Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ opinions. Please communicate with us on [Slack](https://t.mp/slack) in the `#rub
- [Rails](#rails)
- [ActiveRecord](#activerecord)
- [Lazy/Eager Loading](#lazyeager-loading)
- [Forking](#forking)
- [Ractors](#ractors)
- [Platform Support](#platform-support)
- [Development](#development)
Expand Down Expand Up @@ -1202,6 +1203,16 @@ workflow at the top of the file.

Note, this only affects non-production environments.

### Forking

Objects created with the Temporal Ruby SDK cannot be used across forks. This includes runtimes, clients, and workers. By
default, using `Client.connect` uses `Runtime.default` which is lazily created. If it was already created on the parent,
an exception will occur when trying to reuse it to create clients or workers in a forked child. Similarly any RPC
invocation or worker execution inside of a forked child separate from where the runtime or client or worker were created
will raise an exception.

If forking must be used, make sure Temporal objects are only created _inside_ the fork.

### Ractors

It was an original goal to have workflows actually be Ractors for deterministic state isolation and have the library
Expand Down
14 changes: 14 additions & 0 deletions temporalio/ext/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ macro_rules! rpc_call {

impl Client {
pub fn async_new(runtime: &Runtime, options: Struct, queue: Value) -> Result<(), Error> {
if runtime.handle.pid != std::process::id() {
return Err(error!(
"Cannot create clients across forks (original runtime PID is {}, current is {})",
runtime.handle.pid,
std::process::id()
));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd just lift all four of these out to a fork_check(reason: &'static str)

// Build options
let mut opts_build = ClientOptionsBuilder::default();
let tls = options.child(id!("tls"))?;
Expand Down Expand Up @@ -191,6 +198,13 @@ impl Client {
}

pub fn async_invoke_rpc(&self, args: &[Value]) -> Result<(), Error> {
if self.runtime_handle.pid != std::process::id() {
return Err(error!(
"Cannot use clients across forks (original runtime PID is {}, current is {})",
self.runtime_handle.pid,
std::process::id()
));
}
let args = scan_args::scan_args::<(), (), (), (), _, ()>(args)?;
let (service, rpc, request, retry, metadata, timeout, cancel_token, queue) =
scan_args::get_kwargs::<
Expand Down
2 changes: 2 additions & 0 deletions temporalio/ext/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct Runtime {

#[derive(Clone)]
pub(crate) struct RuntimeHandle {
pub(crate) pid: u32,
pub(crate) core: Arc<CoreRuntime>,
pub(crate) async_command_tx: Sender<AsyncCommand>,
}
Expand Down Expand Up @@ -178,6 +179,7 @@ impl Runtime {
channel();
Ok(Self {
handle: RuntimeHandle {
pid: std::process::id(),
core: Arc::new(core),
async_command_tx,
},
Expand Down
16 changes: 16 additions & 0 deletions temporalio/ext/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ struct PollResult {

impl Worker {
pub fn new(client: &Client, options: Struct) -> Result<Self, Error> {
if client.runtime_handle.pid != std::process::id() {
return Err(error!(
"Cannot create workers across forks (original runtime PID is {}, current is {})",
client.runtime_handle.pid,
std::process::id()
));
}

enter_sync!(client.runtime_handle);

let activity = options.member::<bool>(id!("activity"))?;
Expand Down Expand Up @@ -168,6 +176,14 @@ impl Worker {
.runtime_handle
.clone();

if runtime.pid != std::process::id() {
return Err(error!(
"Cannot use workers across forks (original runtime PID is {}, current is {})",
runtime.pid,
std::process::id()
));
}

// Create streams of poll calls
let worker_streams = workers
.into_iter()
Expand Down
93 changes: 93 additions & 0 deletions temporalio/test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'async'
require 'temporalio/client'
require 'temporalio/testing'
require 'temporalio/worker'
require 'test'

class ClientTest < Test
Expand Down Expand Up @@ -144,4 +145,96 @@ def test_interceptor
assert_equal(%w[list_workflow_page count_workflows], track.calls.map(&:first))
end
end

class SimpleWorkflow < Temporalio::Workflow::Definition
def execute(name)
"Hello, #{name}!"
end
end

def test_fork
# Cannot use client on other side of fork from where created
pre_fork_client = env.client
pid = fork do
pre_fork_client.start_workflow(
'some-workflow', id: "wf-#{SecureRandom.uuid}", task_queue: "tq-#{SecureRandom.uuid}"
)
rescue Temporalio::Internal::Bridge::Error => e
exit! 123 if e.message.start_with?('Cannot use clients across forks')
raise
end
_, status = Process.wait2(pid)
assert_equal 123, status.exitstatus

# Cannot create client on other side of fork from runtime
pid = fork do
Temporalio::Client.connect(env.client.options.connection.target_host, env.client.options.namespace)
rescue Temporalio::Internal::Bridge::Error => e
exit! 234 if e.message.start_with?('Cannot create clients across forks')
raise
end
_, status = Process.wait2(pid)
assert_equal 234, status.exitstatus

# Cannot create worker on other side of fork from runtime. For whatever reason, the exit status is overwritten here
# so we use a pipe to relay back
reader, writer = IO.pipe
pid = fork do
reader.close
Temporalio::Worker.new(
client: pre_fork_client, task_queue: "tq-#{SecureRandom.uuid}", workflows: [SimpleWorkflow]
)
writer.puts 'success'
rescue Temporalio::Internal::Bridge::Error => e
writer.puts e.message.start_with?('Cannot create workers across forks') ? 'fork-fail' : 'fail'
exit!
end
Process.wait2(pid)
writer.close
assert_equal 'fork-fail', reader.read.strip

# Cannot use worker on other side of fork from runtime. For whatever reason, the exit status is overwritten here
# so we use a pipe to relay back
pre_fork_worker = Temporalio::Worker.new(
client: pre_fork_client, task_queue: "tq-#{SecureRandom.uuid}", workflows: [SimpleWorkflow]
)
reader, writer = IO.pipe
pid = fork do
reader.close
pre_fork_worker.run
writer.puts 'success'
rescue Temporalio::Internal::Bridge::Error => e
writer.puts e.message.start_with?('Cannot use workers across forks') ? 'fork-fail' : 'fail'
exit!
end
Process.wait2(pid)
writer.close
assert_equal 'fork-fail', reader.read.strip

# But use of a client and worker in the fork with their own runtime is fine
reader, writer = IO.pipe
pid = fork do
reader.close
client = Temporalio::Client.connect(
env.client.options.connection.target_host,
env.client.options.namespace,
runtime: Temporalio::Runtime.new
)
worker = Temporalio::Worker.new(
client:, task_queue: "tq-#{SecureRandom.uuid}", workflows: [SimpleWorkflow]
)
worker.run do
result = client.execute_workflow(
SimpleWorkflow, 'some-user',
id: "wf-#{SecureRandom.uuid}", task_queue: worker.task_queue
)
writer.puts "Workflow result: #{result}"
end
exit! 0
end
_, status = Process.wait2(pid)
writer.close
assert status.success?
assert_equal 'Workflow result: Hello, some-user!', reader.read.strip
end
end
Loading