Skip to content

Commit 7ab900b

Browse files
committed
Add Ruby Rust utility for Send/Sync BoxValue
1 parent 3acd1b5 commit 7ab900b

File tree

5 files changed

+114
-87
lines changed

5 files changed

+114
-87
lines changed

temporalio/ext/src/client.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,15 @@ impl Client {
176176
.await?;
177177
Ok(core)
178178
},
179-
move |_, result: Result<CoreClient, ClientInitError>| match result {
180-
Ok(core) => callback.push(Client {
181-
core,
182-
runtime_handle,
183-
}),
184-
Err(err) => callback.push(new_error!("Failed client connect: {}", err)),
179+
move |ruby, result: Result<CoreClient, ClientInitError>| match result {
180+
Ok(core) => callback.push(
181+
&ruby,
182+
Client {
183+
core,
184+
runtime_handle,
185+
},
186+
),
187+
Err(err) => callback.push(&ruby, new_error!("Failed client connect: {}", err)),
185188
},
186189
);
187190
Ok(())
@@ -325,12 +328,12 @@ where
325328
};
326329
res.map(|msg| msg.get_ref().encode_to_vec())
327330
},
328-
move |_, result| {
331+
move |ruby, result| {
329332
match result {
330333
// TODO(cretz): Any reasonable way to prevent byte copy that is just going to get decoded into proto
331334
// object?
332-
Ok(val) => callback.push(RString::from_slice(&val)),
333-
Err(status) => callback.push(RpcFailure { status }),
335+
Ok(val) => callback.push(&ruby, RString::from_slice(&val)),
336+
Err(status) => callback.push(&ruby, RpcFailure { status }),
334337
}
335338
},
336339
);

temporalio/ext/src/metric.rs

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
1-
use std::{any::Any, rc::Rc, sync::Arc, time::Duration};
1+
use std::{any::Any, sync::Arc, time::Duration};
22

33
use magnus::{
44
class, function,
55
gc::register_mark_object,
66
method,
77
prelude::*,
88
r_hash::ForEach,
9-
value::{BoxValue, IntoId, Lazy, Qfalse, Qtrue},
9+
value::{IntoId, Lazy, Qfalse, Qtrue},
1010
DataTypeFunctions, Error, Float, Integer, RClass, RHash, RModule, RString, Ruby, StaticSymbol,
1111
Symbol, TryConvert, TypedData, Value,
1212
};
1313
use temporal_sdk_core_api::telemetry::metrics::{
1414
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent,
1515
};
1616

17-
use crate::{error, id, runtime::Runtime, ROOT_MOD};
17+
use crate::{error, id, runtime::Runtime, util::SendSyncBoxValue, ROOT_MOD};
1818

1919
pub fn init(ruby: &Ruby) -> Result<(), Error> {
2020
let root_mod = ruby.get_inner(&ROOT_MOD);
@@ -275,26 +275,16 @@ fn metric_key_value(k: Value, v: Value) -> Result<metrics::MetricKeyValue, Error
275275

276276
#[derive(Clone, Debug)]
277277
pub struct BufferedMetricRef {
278-
value: Rc<BoxValue<Value>>,
278+
value: Arc<SendSyncBoxValue<Value>>,
279279
}
280280

281281
impl BufferInstrumentRef for BufferedMetricRef {}
282282

283-
// We can't use Ruby Opaque because it doesn't protect the object from being
284-
// GC'd, but we trust ourselves not to access this value outside of Ruby
285-
// context (which has global GVL to ensure thread safety).
286-
unsafe impl Send for BufferedMetricRef {}
287-
unsafe impl Sync for BufferedMetricRef {}
288-
289283
#[derive(Debug)]
290284
struct BufferedMetricAttributes {
291-
value: BoxValue<RHash>,
285+
value: SendSyncBoxValue<RHash>,
292286
}
293287

294-
// See Send/Sync for BufferedMetricRef for details on why we do this
295-
unsafe impl Send for BufferedMetricAttributes {}
296-
unsafe impl Sync for BufferedMetricAttributes {}
297-
298288
impl CustomMetricAttributes for BufferedMetricAttributes {
299289
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
300290
self as Arc<dyn Any + Send + Sync>
@@ -402,7 +392,7 @@ fn convert_metric_event(
402392
// Put on lazy ref
403393
populate_into
404394
.set(Arc::new(BufferedMetricRef {
405-
value: Rc::new(BoxValue::new(val)),
395+
value: Arc::new(SendSyncBoxValue::new(val)),
406396
}))
407397
.map_err(|_| error!("Failed setting metric ref"))?;
408398
Ok(None)
@@ -423,8 +413,10 @@ fn convert_metric_event(
423413
.downcast::<BufferedMetricAttributes>()
424414
.map_err(|_| {
425415
error!("Unable to downcast to expected buffered metric attributes")
426-
})?;
427-
attrs.value.as_ref().funcall("dup", ())?
416+
})?
417+
.value
418+
.value(ruby);
419+
attrs.funcall("dup", ())?
428420
}
429421
None => ruby.hash_new_capa(attributes.len()),
430422
};
@@ -441,7 +433,7 @@ fn convert_metric_event(
441433
// Put on lazy ref
442434
populate_into
443435
.set(Arc::new(BufferedMetricAttributes {
444-
value: BoxValue::new(hash),
436+
value: SendSyncBoxValue::new(hash),
445437
}))
446438
.map_err(|_| error!("Failed setting metric attrs"))?;
447439
Ok(None)
@@ -458,7 +450,7 @@ fn convert_metric_event(
458450
"new",
459451
(
460452
// Metric
461-
**instrument.get().clone().value.clone(),
453+
instrument.get().clone().value.clone().value(ruby),
462454
// Value
463455
match update {
464456
metrics::MetricUpdateVal::Duration(v) if durations_as_seconds => {
@@ -482,15 +474,16 @@ fn convert_metric_event(
482474
metrics::MetricUpdateVal::ValueF64(v) => ruby.into_value(v),
483475
},
484476
// Attributes
485-
*attributes
477+
attributes
486478
.get()
487479
.clone()
488480
.as_any()
489481
.downcast::<BufferedMetricAttributes>()
490482
.map_err(|_| {
491483
error!("Unable to downcast to expected buffered metric attributes")
492484
})?
493-
.value,
485+
.value
486+
.value(ruby),
494487
),
495488
)?,
496489
))

temporalio/ext/src/testing.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,16 @@ impl EphemeralServer {
7777
let runtime_handle = runtime.handle.clone();
7878
runtime.handle.spawn(
7979
async move { opts.start_server().await },
80-
move |_, result| match result {
81-
Ok(core) => callback.push(EphemeralServer {
82-
target: core.target.clone(),
83-
core: Mutex::new(Some(core)),
84-
runtime_handle,
85-
}),
86-
Err(err) => callback.push(new_error!("Failed starting server: {}", err)),
80+
move |ruby, result| match result {
81+
Ok(core) => callback.push(
82+
&ruby,
83+
EphemeralServer {
84+
target: core.target.clone(),
85+
core: Mutex::new(Some(core)),
86+
runtime_handle,
87+
},
88+
),
89+
Err(err) => callback.push(&ruby, new_error!("Failed starting server: {}", err)),
8790
},
8891
);
8992
Ok(())
@@ -109,13 +112,16 @@ impl EphemeralServer {
109112
let runtime_handle = runtime.handle.clone();
110113
runtime.handle.spawn(
111114
async move { opts.start_server().await },
112-
move |_, result| match result {
113-
Ok(core) => callback.push(EphemeralServer {
114-
target: core.target.clone(),
115-
core: Mutex::new(Some(core)),
116-
runtime_handle,
117-
}),
118-
Err(err) => callback.push(new_error!("Failed starting server: {}", err)),
115+
move |ruby, result| match result {
116+
Ok(core) => callback.push(
117+
&ruby,
118+
EphemeralServer {
119+
target: core.target.clone(),
120+
core: Mutex::new(Some(core)),
121+
runtime_handle,
122+
},
123+
),
124+
Err(err) => callback.push(&ruby, new_error!("Failed starting server: {}", err)),
119125
},
120126
);
121127
Ok(())
@@ -149,9 +155,9 @@ impl EphemeralServer {
149155
.spawn(
150156
async move { core.shutdown().await },
151157
move |ruby, result| match result {
152-
Ok(_) => callback.push(ruby.qnil()),
158+
Ok(_) => callback.push(&ruby, ruby.qnil()),
153159
Err(err) => {
154-
callback.push(new_error!("Failed shutting down server: {}", err))
160+
callback.push(&ruby, new_error!("Failed shutting down server: {}", err))
155161
}
156162
},
157163
)

temporalio/ext/src/util.rs

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -103,27 +103,44 @@ where
103103
}
104104
}
105105

106+
/// Utility for pushing a result to a queue in an async callback.
106107
pub(crate) struct AsyncCallback {
107-
queue: BoxValue<Value>,
108+
queue: SendSyncBoxValue<Value>,
108109
}
109110

110-
// We trust our usage of this across threads. We would use Opaque but we can't
111-
// box that properly/safely. The inner queue is always expected to be a Ruby
112-
// Queue.
113-
unsafe impl Send for AsyncCallback {}
114-
unsafe impl Sync for AsyncCallback {}
115-
116111
impl AsyncCallback {
117112
pub(crate) fn from_queue(queue: Value) -> Self {
118113
Self {
119-
queue: BoxValue::new(queue),
114+
queue: SendSyncBoxValue::new(queue),
120115
}
121116
}
122117

123-
pub(crate) fn push<V>(&self, value: V) -> Result<(), Error>
118+
pub(crate) fn push<V>(&self, ruby: &Ruby, value: V) -> Result<(), Error>
124119
where
125120
V: IntoValue,
126121
{
127-
self.queue.funcall(id!("push"), (value,)).map(|_: Value| ())
122+
let queue = self.queue.value(ruby);
123+
queue.funcall(id!("push"), (value,)).map(|_: Value| ())
124+
}
125+
}
126+
127+
/// Utility that basically combines Magnus BoxValue with Magnus Opaque. It's a
128+
/// Send/Sync safe Ruby value that prevents GC until dropped and is only
129+
/// accessible from a Ruby thread.
130+
#[derive(Debug)]
131+
pub(crate) struct SendSyncBoxValue<T: ReprValue>(BoxValue<T>);
132+
133+
// We trust our usage of this across threads. We would use Opaque but we can't
134+
// box that properly/safely to ensure it does not get GC'd.
135+
unsafe impl<T: ReprValue> Send for SendSyncBoxValue<T> {}
136+
unsafe impl<T: ReprValue> Sync for SendSyncBoxValue<T> {}
137+
138+
impl<T: ReprValue> SendSyncBoxValue<T> {
139+
pub fn new(val: T) -> Self {
140+
Self(BoxValue::new(val))
141+
}
142+
143+
pub fn value(&self, _: &Ruby) -> T {
144+
*self.0
128145
}
129146
}

temporalio/ext/src/worker.rs

Lines changed: 38 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -225,21 +225,23 @@ impl Worker {
225225
Ok(None) => ruby.qnil().as_value(),
226226
Err(err) => new_error!("Poll failure: {}", err).as_value(),
227227
};
228-
callback.push(ruby.ary_new_from_values(&[
229-
poll_result.worker_index.into_value(),
230-
worker_type.into_value(),
231-
result,
232-
]))
228+
callback.push(
229+
&ruby,
230+
ruby.ary_new_from_values(&[
231+
poll_result.worker_index.into_value(),
232+
worker_type.into_value(),
233+
result,
234+
]),
235+
)
233236
})));
234237
}
235238
},
236239
move |ruby, _| {
237240
// Call with nil, nil, nil to say done
238-
complete_callback.push(ruby.ary_new_from_values(&[
239-
ruby.qnil(),
240-
ruby.qnil(),
241-
ruby.qnil(),
242-
]))
241+
complete_callback.push(
242+
&ruby,
243+
ruby.ary_new_from_values(&[ruby.qnil(), ruby.qnil(), ruby.qnil()]),
244+
)
243245
},
244246
);
245247
Ok(())
@@ -289,13 +291,16 @@ impl Worker {
289291
},
290292
move |ruby, errs| {
291293
if errs.is_empty() {
292-
callback.push(ruby.qnil())
294+
callback.push(&ruby, ruby.qnil())
293295
} else {
294-
callback.push(new_error!(
295-
"{} worker(s) failed to finalize, reasons: {}",
296-
errs.len(),
297-
errs.join(", ")
298-
))
296+
callback.push(
297+
&ruby,
298+
new_error!(
299+
"{} worker(s) failed to finalize, reasons: {}",
300+
errs.len(),
301+
errs.join(", ")
302+
),
303+
)
299304
}
300305
},
301306
);
@@ -308,8 +313,8 @@ impl Worker {
308313
self.runtime_handle.spawn(
309314
async move { temporal_sdk_core_api::Worker::validate(&*worker).await },
310315
move |ruby, result| match result {
311-
Ok(()) => callback.push(ruby.qnil()),
312-
Err(err) => callback.push(new_error!("Failed validating worker: {}", err)),
316+
Ok(()) => callback.push(&ruby, ruby.qnil()),
317+
Err(err) => callback.push(&ruby, new_error!("Failed validating worker: {}", err)),
313318
},
314319
);
315320
Ok(())
@@ -325,8 +330,8 @@ impl Worker {
325330
temporal_sdk_core_api::Worker::complete_activity_task(&*worker, completion).await
326331
},
327332
move |ruby, result| match result {
328-
Ok(()) => callback.push((ruby.qnil(),)),
329-
Err(err) => callback.push((new_error!("Completion failure: {}", err),)),
333+
Ok(()) => callback.push(&ruby, (ruby.qnil(),)),
334+
Err(err) => callback.push(&ruby, (new_error!("Completion failure: {}", err),)),
330335
},
331336
);
332337
Ok(())
@@ -357,16 +362,19 @@ impl Worker {
357362
.await
358363
},
359364
move |ruby, result| {
360-
callback.push(ruby.ary_new_from_values(&[
361-
(-1).into_value_with(&ruby),
362-
run_id.into_value_with(&ruby),
363-
match result {
364-
Ok(()) => ruby.qnil().into_value_with(&ruby),
365-
Err(err) => {
366-
new_error!("Completion failure: {}", err).into_value_with(&ruby)
367-
}
368-
},
369-
]))
365+
callback.push(
366+
&ruby,
367+
ruby.ary_new_from_values(&[
368+
(-1).into_value_with(&ruby),
369+
run_id.into_value_with(&ruby),
370+
match result {
371+
Ok(()) => ruby.qnil().into_value_with(&ruby),
372+
Err(err) => {
373+
new_error!("Completion failure: {}", err).into_value_with(&ruby)
374+
}
375+
},
376+
]),
377+
)
370378
},
371379
);
372380
Ok(())

0 commit comments

Comments
 (0)