203 changes: 96 additions & 107 deletions temporalio/Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions temporalio/Cargo.toml
Member Author

@cretz cretz Jul 25, 2024

There are a lot of things in this file that I blindly brought over from https://github.com/temporalio/sdk-core/blob/master/Cargo.toml. This is likely only ever going to be a 1-member workspace (it's just needed at the top level for tooling). Consider getting rid of everything in here except members and moving things to the specific project's toml if they need to be retained.

Member Author

Added docs to the top of this file explaining why it has to be this way right now

@@ -1,3 +1,8 @@
# Ruby and rb-sys seem to require a Cargo.toml here at the gem root (see https://github.com/oxidize-rb/rb-sys/issues/405),
# and Rust is broken if one workspace is nested within another without the ability to exclude (see
# https://github.com/rust-lang/cargo/issues/6745). So we basically must copy sdk-core's workspace for now.
# TODO(cretz): Fix this situation if possible without moving sdk-core submodule.

[workspace]
members = ["./ext"]
resolver = "2"
1 change: 1 addition & 0 deletions temporalio/ext/Cargo.toml
@@ -20,4 +20,5 @@ temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-protos" }
tokio = "1.26"
tonic = "0.11"
tracing = "0.1"
url = "2.2"
46 changes: 46 additions & 0 deletions temporalio/ext/README.md
@@ -0,0 +1,46 @@
# Ruby Rust Bridge

This is a Ruby extension written in Rust to incorporate https://github.com/temporalio/sdk-core (included as a
submodule). This leverages https://github.com/oxidize-rb/rb-sys and https://github.com/matsadler/magnus.

## Tokio Async to Ruby Async

There is no generally accepted way to bridge the gap between async Tokio and async Ruby. It is important not only that
calls to/from Ruby happen with the GVL held, but also that they happen in threads _Ruby_ creates. C extensions cannot
reasonably turn a non-Ruby thread into a Ruby thread.

The current implementation asks the Ruby side to instantiate and run a "command loop". This occupies a single Ruby
thread for the life of the runtime (which is usually the life of the process). The run-command-loop call releases the
GVL and waits on a Rust mpsc channel to feed it work. Ignoring shutdown machinations, the work is just a Ruby callback.
When the channel delivers a callback to this Ruby thread, the thread re-acquires the GVL and runs the callback. Since
callbacks run serially on a single thread, each callback is expected to be very fast.
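
A minimal sketch of the command loop idea, using only the Rust standard library. It is not the bridge's actual code:
`Command`, `run_command_loop`, and `demo` are hypothetical names, a plain OS thread stands in for the Ruby thread, and
the GVL handling is only indicated in comments.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// A unit of work for the loop thread (stands in for a Ruby callback).
type Callback = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical command type: run a callback or stop the loop.
enum Command {
    Run(Callback),
    Shutdown,
}

/// Blocks on the channel and runs callbacks one at a time.
/// Returns how many callbacks were run.
fn run_command_loop(rx: Receiver<Command>) -> usize {
    let mut ran = 0;
    // recv() blocks until a command arrives; the real bridge would have
    // released the GVL while waiting here.
    while let Ok(cmd) = rx.recv() {
        match cmd {
            Command::Run(callback) => {
                // The real bridge would re-acquire the GVL before this call.
                callback();
                ran += 1;
            }
            Command::Shutdown => break,
        }
    }
    ran
}

/// Sends `n` callbacks to a loop thread and returns the order they ran in.
fn demo(n: usize) -> Vec<usize> {
    let (tx, rx) = channel::<Command>();
    let (out_tx, out_rx) = channel::<usize>();
    let loop_thread = thread::spawn(move || run_command_loop(rx));
    for i in 0..n {
        let out_tx = out_tx.clone();
        let callback: Callback = Box::new(move || {
            out_tx.send(i).unwrap();
        });
        tx.send(Command::Run(callback)).unwrap();
    }
    tx.send(Command::Shutdown).unwrap();
    let ran = loop_thread.join().unwrap();
    assert_eq!(ran, n);
    // Drop the last sender so the iterator below terminates.
    drop(out_tx);
    out_rx.iter().collect()
}
```

Because one thread drains the channel serially, `demo(3)` returns the indices in send order. A slow callback would
delay every callback queued behind it, which is why callbacks are expected to be fast.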

Anything that needs to make an async Tokio call invokes a helper with two things: the future to spawn in the Tokio
runtime, and the callback to handle the result in the Ruby thread. All users of this helper are expected to do basically
everything in the async Tokio function, and do only a very simple Ruby block call (or similar) in the callback.

So async calls usually look like this:

* In Ruby, call method implemented in Rust, e.g.

```
queue = Queue.new
some_bridge_thing.do_foo { |result| queue.push(result) }
queue.pop
```

* In Rust, `do_foo` spawns a Tokio task and returns immediately
* Once the Tokio task completes, the Rust side, in the Ruby-thread callback, converts the result to a Ruby value and
invokes the block

This allows Ruby to remain async if in a Fiber, because Ruby `queue.pop` does not block a thread when in a Fiber
context.
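
The flow above can be approximated with standard-library pieces only. This is a sketch under stated assumptions, not
the bridge's helper: `spawn_with_callback` and `do_foo_and_wait` are hypothetical names, a worker thread stands in for
the Tokio task, and an mpsc channel plays the role of the Ruby `Queue`.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// A callback to run on the command-loop thread.
type Callback = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical helper: run `work` off-thread (standing in for a Tokio task),
/// then ship `callback(result)` to the command-loop thread via `loop_tx`.
fn spawn_with_callback<T, W, C>(loop_tx: Sender<Callback>, work: W, callback: C)
where
    T: Send + 'static,
    W: FnOnce() -> T + Send + 'static,
    C: FnOnce(T) + Send + 'static,
{
    thread::spawn(move || {
        // All the heavy lifting happens here, away from the loop thread.
        let result = work();
        // Only the cheap callback is handed to the loop thread.
        let _ = loop_tx.send(Box::new(move || callback(result)));
    });
}

/// Mirrors the Ruby snippet: create a queue, start the call with a
/// callback that pushes onto the queue, then pop the result.
fn do_foo_and_wait() -> String {
    let (loop_tx, loop_rx) = channel::<Callback>();
    // Command-loop thread: runs callbacks serially until all senders are gone.
    let loop_thread = thread::spawn(move || {
        for callback in loop_rx {
            callback();
        }
    });

    // Plays the role of Ruby's Queue.
    let (queue_tx, queue_rx) = channel::<String>();
    spawn_with_callback(
        loop_tx,
        || format!("foo-{}", 6 * 7),
        move |result: String| {
            let _ = queue_tx.send(result);
        },
    );
    // Equivalent of queue.pop: wait for the callback to deliver the value.
    let value = queue_rx.recv().expect("callback never ran");
    loop_thread.join().unwrap();
    value
}
```

In this sketch the caller blocks an OS thread on `recv`, whereas the point of the Ruby version is that `queue.pop`
can yield in a Fiber context instead.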

The invocation of a block with a value is quite cheap in Ruby (`rb_proc_call_kw` C call). There are no obvious
performance savings by trying to push to a Ruby queue from inside Rust directly.

## Argument Passing

For the smallest set of arguments, a simple positional or kwarg is fine. For anything larger, a `Struct` defined on the
Ruby side should be used. Parameters to Ruby functions defined in Rust should have no defaults, so that any missing
argument is caught eagerly (e.g. by the first test that even uses the struct/args).
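
To illustrate the "no defaults" rule, here is a small sketch in plain Rust. The `Options` type, `options_from`, and
`connect_target` are hypothetical stand-ins (a `HashMap` replaces the Ruby `Struct`); the point is only that a missing
member is an error at the first use rather than a silently applied default.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a Ruby Struct passed across the bridge.
struct Options {
    members: HashMap<String, String>,
}

impl Options {
    /// Strict lookup: a missing member is an error, never a default value.
    fn member(&self, name: &str) -> Result<&str, String> {
        self.members
            .get(name)
            .map(String::as_str)
            .ok_or_else(|| format!("missing required member: {name}"))
    }
}

/// Builds an `Options` from name/value pairs (illustration only).
fn options_from(pairs: &[(&str, &str)]) -> Options {
    Options {
        members: pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect(),
    }
}

/// Reads every member it needs up front, so a missing one fails immediately.
fn connect_target(options: &Options) -> Result<String, String> {
    let host = options.member("target_host")?;
    let identity = options.member("identity")?;
    Ok(format!("http://{host} as {identity}"))
}
```

With this shape, the first test that calls `connect_target` without an `identity` member fails with a clear message
instead of proceeding with an unintended default.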
115 changes: 55 additions & 60 deletions temporalio/ext/src/client.rs
@@ -1,4 +1,4 @@
use std::{collections::HashMap, future::Future, time::Duration};
use std::{collections::HashMap, future::Future, marker::PhantomData, time::Duration};

use temporal_client::{
ClientInitError, ClientKeepAliveConfig, ClientOptionsBuilder, ClientTlsConfig,
@@ -13,7 +13,7 @@ use magnus::{
use tonic::{metadata::MetadataKey, Status};
use url::Url;

use super::{error, new_error, ROOT_MOD};
use super::{error, id, new_error, ROOT_MOD};
use crate::{
runtime::{Runtime, RuntimeHandle},
util::Struct,
@@ -82,19 +82,21 @@ impl Client {
let mut opts_build = ClientOptionsBuilder::default();
opts_build
.target_url(
Url::parse(format!("http://{}", options.aref::<String>("target_host")?).as_str())
.map_err(|err| error!("Failed parsing host: {}", err))?,
Url::parse(
format!("http://{}", options.member::<String>(id!("target_host"))?).as_str(),
)
.map_err(|err| error!("Failed parsing host: {}", err))?,
)
.client_name(options.aref::<String>("client_name")?)
.client_version(options.aref::<String>("client_version")?)
.headers(Some(options.aref("rpc_metadata")?))
.api_key(options.aref("api_key")?)
.identity(options.aref("identity")?);
if let Some(tls) = options.child("tls")? {
.client_name(options.member::<String>(id!("client_name"))?)
.client_version(options.member::<String>(id!("client_version"))?)
.headers(Some(options.member(id!("rpc_metadata"))?))
.api_key(options.member(id!("api_key"))?)
.identity(options.member(id!("identity"))?);
if let Some(tls) = options.child(id!("tls"))? {
opts_build.tls_cfg(TlsConfig {
client_tls_config: match (
tls.aref::<Option<RString>>("client_cert")?,
tls.aref::<Option<RString>>("client_private_key")?,
tls.member::<Option<RString>>(id!("client_cert"))?,
tls.member::<Option<RString>>(id!("client_private_key"))?,
) {
(None, None) => None,
(Some(client_cert), Some(client_private_key)) => Some(ClientTlsConfig {
@@ -109,38 +111,38 @@ impl Client {
}
},
server_root_ca_cert: tls
.aref::<Option<RString>>("server_root_ca_cert")?
.member::<Option<RString>>(id!("server_root_ca_cert"))?
.map(|rstr| unsafe { rstr.as_slice().to_vec() }),
domain: tls.aref("domain")?,
domain: tls.member(id!("domain"))?,
});
}
let rpc_retry = options
.child("rpc_retry")?
.child(id!("rpc_retry"))?
.ok_or_else(|| error!("Missing rpc_retry"))?;
opts_build.retry_config(RetryConfig {
initial_interval: Duration::from_millis(rpc_retry.aref("initial_interval_ms")?),
randomization_factor: rpc_retry.aref("randomization_factor")?,
multiplier: rpc_retry.aref("multiplier")?,
max_interval: Duration::from_millis(rpc_retry.aref("max_interval_ms")?),
max_elapsed_time: match rpc_retry.aref::<u64>("max_elapsed_time_ms")? {
initial_interval: Duration::from_millis(rpc_retry.member(id!("initial_interval_ms"))?),
randomization_factor: rpc_retry.member(id!("randomization_factor"))?,
multiplier: rpc_retry.member(id!("multiplier"))?,
max_interval: Duration::from_millis(rpc_retry.member(id!("max_interval_ms"))?),
max_elapsed_time: match rpc_retry.member::<u64>(id!("max_elapsed_time_ms"))? {
// 0 means none
0 => None,
val => Some(Duration::from_millis(val)),
},
max_retries: rpc_retry.aref("max_retries")?,
max_retries: rpc_retry.member(id!("max_retries"))?,
});
if let Some(keep_alive) = options.child("keep_alive")? {
if let Some(keep_alive) = options.child(id!("keep_alive"))? {
opts_build.keep_alive(Some(ClientKeepAliveConfig {
interval: Duration::from_millis(keep_alive.aref("interval_ms")?),
timeout: Duration::from_millis(keep_alive.aref("timeout_ms")?),
interval: Duration::from_millis(keep_alive.member(id!("interval_ms"))?),
timeout: Duration::from_millis(keep_alive.member(id!("timeout_ms"))?),
}));
}
if let Some(proxy) = options.child("http_connect_proxy")? {
if let Some(proxy) = options.child(id!("http_connect_proxy"))? {
opts_build.http_connect_proxy(Some(HttpConnectProxyOptions {
target_addr: proxy.aref("target_host")?,
target_addr: proxy.member(id!("target_host"))?,
basic_auth: match (
proxy.aref::<Option<String>>("basic_auth_user")?,
proxy.aref::<Option<String>>("basic_auth_user")?,
proxy.member::<Option<String>>(id!("basic_auth_user"))?,
proxy.member::<Option<String>>(id!("basic_auth_user"))?,
) {
(None, None) => None,
(Some(user), Some(pass)) => Some((user, pass)),
@@ -165,21 +167,14 @@ impl Client {
},
move |ruby, result: Result<CoreClient, ClientInitError>| {
let block = ruby.get_inner(block);
match result {
Ok(core) => {
let _: Value = block
.call((Client {
core,
runtime_handle,
},))
.expect("Block call failed");
}
Err(err) => {
let _: Value = block
.call((new_error!("Failed client connect: {}", err),))
.expect("Block call failed");
}
let _: Value = match result {
Ok(core) => block.call((Client {
core,
runtime_handle,
},))?,
Err(err) => block.call((new_error!("Failed client connect: {}", err),))?,
};
Ok(())
},
);
Ok(())
@@ -195,12 +190,12 @@ impl Client {
>(
args.keywords,
&[
"service",
"rpc",
"request",
"rpc_retry",
"rpc_metadata",
"rpc_timeout_ms",
id!("service"),
id!("rpc"),
id!("request"),
id!("rpc_retry"),
id!("rpc_metadata"),
id!("rpc_timeout_ms"),
],
&[],
)?
@@ -211,6 +206,7 @@ impl Client {
retry,
metadata,
timeout_ms,
_not_send_sync: PhantomData,
};
let block = Opaque::from(args.block);
match service {
@@ -261,6 +257,11 @@ struct RpcCall<'a> {
retry: bool,
metadata: HashMap<String, String>,
timeout_ms: u64,

// This RPC call contains an unsafe reference to Ruby bytes that does not
// outlive the call, so we prevent it from being sent to another thread.
// !Send/!Sync not yet stable: https://github.com/rust-lang/rust/issues/68318
_not_send_sync: PhantomData<*const ()>,
}
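
The marker-field technique in this hunk can be checked in isolation. The sketch below is an assumption-laden
illustration, not code from the PR: `Guarded`, `Unguarded`, `SendFallback`, and `Probe` are hypothetical names, and the
probe relies on inherent associated constants taking priority over trait ones when their bounds are satisfied.

```rust
use std::marker::PhantomData;

/// Hypothetical struct borrowing bytes, with the same marker field as above.
struct Guarded<'a> {
    bytes: &'a [u8],
    // Raw pointers are neither Send nor Sync, so this zero-sized field
    // removes both auto traits from the whole struct.
    _not_send_sync: PhantomData<*const ()>,
}

/// The same struct without the marker; it stays Send because &[u8] is Send.
struct Unguarded<'a> {
    bytes: &'a [u8],
}

// Compile-time probe: the trait constant is the fallback, and the inherent
// constant wins only when `T: Send` actually holds.
trait SendFallback {
    const IS_SEND: bool = false;
}
impl<T: ?Sized> SendFallback for T {}

struct Probe<T: ?Sized>(PhantomData<T>);
impl<T: ?Sized + Send> Probe<T> {
    const IS_SEND: bool = true;
}

fn guarded_is_send() -> bool {
    <Probe<Guarded<'static>>>::IS_SEND
}

fn unguarded_is_send() -> bool {
    <Probe<Unguarded<'static>>>::IS_SEND
}

/// Uses both structs so the borrowed bytes are actually read.
fn total_len(data: &[u8]) -> usize {
    let guarded = Guarded { bytes: data, _not_send_sync: PhantomData };
    let unguarded = Unguarded { bytes: data };
    guarded.bytes.len() + unguarded.bytes.len()
}
```

The marker costs nothing at runtime, and any attempt to move a `Guarded` into `std::thread::spawn` is rejected at
compile time.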

impl RpcCall<'_> {
@@ -295,19 +296,13 @@ where
async move { fut.await.map(|msg| msg.get_ref().encode_to_vec()) },
move |ruby, result| {
let block = ruby.get_inner(block);
match result {
Ok(val) => {
// TODO(cretz): Any reasonable way to prevent byte copy?
let _: Value = block
.call((RString::from_slice(&val),))
.expect("Block call failed");
}
Err(status) => {
let _: Value = block
.call((RpcFailure { status },))
.expect("Block call failed");
}
let _: Value = match result {
// TODO(cretz): Any reasonable way to prevent byte copy that is just going to get decoded into proto
// object?
Comment on lines +300 to +301
Member

There probably is, but it might also be more effort than it's worth. You'd have to pass around some explicit lifetime-having struct here or something. Hard for me to say without playing with the code myself but I could try it out if you like

Member Author

@cretz cretz Jul 26, 2024

Yeah, I think I found a way with rb_str_new_static (which magnus doesn't expose directly, but uses for static literals in r_string! IIUC). To do it safely, I'd have to probably unhook it from Rust (i.e. in a forgotten box) and put it back for dropping on RString GC. Can't even have a predictable time it could be dropped because I think Ruby GC has certain expectations here even after use.

But yeah it's a dangerous lifetime game to play to save these cycles. I'll consider it premature optimization for now and we can come back later if needed.
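
For reference, the "forgotten box" idea mentioned above might look roughly like the following in plain Rust. This is
only a sketch of the ownership handoff: `release_to_foreign` and `reclaim_from_foreign` are hypothetical names, no Ruby
or magnus API is called, and wiring the reclaim step into Ruby GC is exactly the risky part left out here.

```rust
/// Gives up Rust ownership of the bytes and returns a raw pointer and length
/// that a foreign owner (e.g. a string object) could reference.
/// Note: `into_boxed_slice` may reallocate if the Vec has spare capacity.
fn release_to_foreign(bytes: Vec<u8>) -> (*mut u8, usize) {
    let boxed: Box<[u8]> = bytes.into_boxed_slice();
    let len = boxed.len();
    // into_raw "forgets" the box: Rust will not free it from now on.
    let ptr = Box::into_raw(boxed) as *mut u8;
    (ptr, len)
}

/// Takes ownership back so the allocation is freed when the box drops.
///
/// # Safety
/// `ptr` and `len` must come from `release_to_foreign`, this must be called
/// at most once per pair, and nothing may use `ptr` afterwards.
unsafe fn reclaim_from_foreign(ptr: *mut u8, len: usize) -> Box<[u8]> {
    unsafe { Box::from_raw(std::ptr::slice_from_raw_parts_mut(ptr, len)) }
}
```

If the reclaim step never runs the memory leaks, and if it runs while the foreign side still reads the bytes the
result is a use-after-free, which is the lifetime game described in the comment.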

Ok(val) => block.call((RString::from_slice(&val),))?,
Err(status) => block.call((RpcFailure { status },))?,
};
Ok(())
},
);
Ok(())
8 changes: 8 additions & 0 deletions temporalio/ext/src/lib.rs
@@ -34,6 +34,14 @@ macro_rules! new_error {
};
}

#[macro_export]
macro_rules! id {
($str:expr) => {{
static VAL: magnus::value::LazyId = magnus::value::LazyId::new($str);
*VAL
}};
}
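
The per-call-site caching pattern of this macro can be tried without Ruby. The sketch below is an analogy under
assumptions, not the magnus implementation: `intern` and its table are hypothetical stand-ins for Ruby's symbol
interning, and `std::sync::OnceLock` plays the role of `LazyId`.

```rust
use std::sync::{Mutex, OnceLock};

// Hypothetical global intern table standing in for Ruby's ID table.
static TABLE: Mutex<Vec<String>> = Mutex::new(Vec::new());

/// Returns a stable numeric id for `name`, registering it on first use.
fn intern(name: &str) -> usize {
    let mut table = TABLE.lock().unwrap();
    if let Some(pos) = table.iter().position(|n| n == name) {
        return pos;
    }
    table.push(name.to_string());
    table.len() - 1
}

/// Each expansion gets its own static, so the (comparatively slow) intern
/// lookup runs at most once per call site.
macro_rules! id {
    ($str:expr) => {{
        static VAL: OnceLock<usize> = OnceLock::new();
        *VAL.get_or_init(|| intern($str))
    }};
}

/// Example call site: repeated calls reuse the cached id.
fn target_host_id() -> usize {
    id!("target_host")
}
```

Two call sites using the same name still agree on the id, because the cache sits in front of a shared table rather
than replacing it.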

#[magnus::init]
fn init(ruby: &Ruby) -> Result<(), Error> {
Lazy::force(&ROOT_ERR, ruby);