diff --git a/README.md b/README.md index d17f2bb5..f19b5964 100644 --- a/README.md +++ b/README.md @@ -75,6 +75,7 @@ opinions. Please communicate with us on [Slack](https://t.mp/slack) in the `#rub - [Rails](#rails) - [ActiveRecord](#activerecord) - [Lazy/Eager Loading](#lazyeager-loading) + - [Forking](#forking) - [Ractors](#ractors) - [Platform Support](#platform-support) - [Development](#development) @@ -1202,6 +1203,16 @@ workflow at the top of the file. Note, this only affects non-production environments. +### Forking + +Objects created with the Temporal Ruby SDK cannot be used across forks. This includes runtimes, clients, and workers. By +default, using `Client.connect` uses `Runtime.default` which is lazily created. If it was already created on the parent, +an exception will occur when trying to reuse it to create clients or workers in a forked child. Similarly any RPC +invocation or worker execution inside of a forked child separate from where the runtime or client or worker were created +will raise an exception. + +If forking must be used, make sure Temporal objects are only created _inside_ the fork. + ### Ractors It was an original goal to have workflows actually be Ractors for deterministic state isolation and have the library diff --git a/temporalio/ext/src/client.rs b/temporalio/ext/src/client.rs index b3d05d17..e40bbfea 100644 --- a/temporalio/ext/src/client.rs +++ b/temporalio/ext/src/client.rs @@ -83,6 +83,7 @@ macro_rules! rpc_call { impl Client { pub fn async_new(runtime: &Runtime, options: Struct, queue: Value) -> Result<(), Error> { + runtime.handle.fork_check("create client")?; // Build options let mut opts_build = ClientOptionsBuilder::default(); let tls = options.child(id!("tls"))?; @@ -191,6 +192,7 @@ impl Client { } pub fn async_invoke_rpc(&self, args: &[Value]) -> Result<(), Error> { + self.runtime_handle.fork_check("use client")?; let args = scan_args::scan_args::<(), (), (), (), _, ()>(args)?; let (service, rpc, request, retry, metadata, timeout, cancel_token, queue) = scan_args::get_kwargs::< diff --git a/temporalio/ext/src/runtime.rs b/temporalio/ext/src/runtime.rs index cf446fdb..db915027 100644 --- a/temporalio/ext/src/runtime.rs +++ b/temporalio/ext/src/runtime.rs @@ -46,6 +46,7 @@ pub struct Runtime { #[derive(Clone)] pub(crate) struct RuntimeHandle { + pub(crate) pid: u32, pub(crate) core: Arc, pub(crate) async_command_tx: Sender, } @@ -178,6 +179,7 @@ impl Runtime { channel(); Ok(Self { handle: RuntimeHandle { + pid: std::process::id(), core: Arc::new(core), async_command_tx, }, @@ -258,4 +260,17 @@ impl RuntimeHandle { ))); }); } + + pub(crate) fn fork_check(&self, action: &'static str) -> Result<(), Error> { + if self.pid != std::process::id() { + Err(error!( + "Cannot {} across forks (original runtime PID is {}, current is {})", + action, + self.pid, + std::process::id() + )) + } else { + Ok(()) + } + } } diff --git a/temporalio/ext/src/worker.rs b/temporalio/ext/src/worker.rs index 3b178711..baf3b3dc 100644 --- a/temporalio/ext/src/worker.rs +++ b/temporalio/ext/src/worker.rs @@ -96,6 +96,8 @@ struct PollResult { impl Worker { pub fn new(client: &Client, options: Struct) -> Result { + client.runtime_handle.fork_check("create worker")?; + enter_sync!(client.runtime_handle); let activity = options.member::(id!("activity"))?; @@ -168,6 +170,8 @@ impl Worker { .runtime_handle .clone(); + runtime.fork_check("use worker")?; + // Create streams of poll calls let worker_streams = workers .into_iter() diff --git a/temporalio/lib/temporalio/versioning_override.rb b/temporalio/lib/temporalio/versioning_override.rb index 719cad8e..4a5179ca 100644 --- a/temporalio/lib/temporalio/versioning_override.rb +++ b/temporalio/lib/temporalio/versioning_override.rb @@ -31,10 +31,11 @@ def initialize(version) # @!visibility private def _to_proto - Temporalio::Api::Workflow::V1::VersioningOverride.new( - behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED, + Api::Workflow::V1::VersioningOverride.new( + behavior: Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED, pinned_version: @version.to_canonical_string, - pinned: Temporalio::Api::Workflow::V1::VersioningOverride::PinnedOverride.new( + pinned: Api::Workflow::V1::VersioningOverride::PinnedOverride.new( + behavior: Api::Workflow::V1::VersioningOverride::PinnedOverrideBehavior::PINNED_OVERRIDE_BEHAVIOR_PINNED, version: @version._to_proto ) ) @@ -45,8 +46,8 @@ def _to_proto class AutoUpgrade < VersioningOverride # @!visibility private def _to_proto - Temporalio::Api::Workflow::V1::VersioningOverride.new( - behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE, + Api::Workflow::V1::VersioningOverride.new( + behavior: Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE, auto_upgrade: true ) end diff --git a/temporalio/test/client_test.rb b/temporalio/test/client_test.rb index 2bb5962c..ac8861e1 100644 --- a/temporalio/test/client_test.rb +++ b/temporalio/test/client_test.rb @@ -3,6 +3,7 @@ require 'async' require 'temporalio/client' require 'temporalio/testing' +require 'temporalio/worker' require 'test' class ClientTest < Test @@ -144,4 +145,93 @@ def test_interceptor assert_equal(%w[list_workflow_page count_workflows], track.calls.map(&:first)) end end + + class SimpleWorkflow < Temporalio::Workflow::Definition + def execute(name) + "Hello, #{name}!" + end + end + + def test_fork + # Cannot use client on other side of fork from where created + pre_fork_client = env.client + pid = fork do + pre_fork_client.start_workflow( + 'some-workflow', id: "wf-#{SecureRandom.uuid}", task_queue: "tq-#{SecureRandom.uuid}" + ) + rescue Temporalio::Internal::Bridge::Error => e + exit! 123 if e.message.start_with?('Cannot use client across forks') + raise + end + _, status = Process.wait2(pid) + assert_equal 123, status.exitstatus + + # Cannot create client on other side of fork from runtime + pid = fork do + Temporalio::Client.connect(env.client.options.connection.target_host, env.client.options.namespace) + rescue Temporalio::Internal::Bridge::Error => e + exit! 234 if e.message.start_with?('Cannot create client across forks') + raise + end + _, status = Process.wait2(pid) + assert_equal 234, status.exitstatus + + # Cannot create worker on other side of fork from runtime. For whatever reason, the exit status is overwritten here + # so we use a pipe to relay back + reader, writer = IO.pipe + pid = fork do + reader.close + Temporalio::Worker.new( + client: pre_fork_client, task_queue: "tq-#{SecureRandom.uuid}", workflows: [SimpleWorkflow] + ) + writer.puts 'success' + rescue Temporalio::Internal::Bridge::Error => e + writer.puts e.message.start_with?('Cannot create worker across forks') ? 'fork-fail' : 'fail' + exit! + end + Process.wait2(pid) + writer.close + assert_equal 'fork-fail', reader.read.strip + + # Cannot use worker on other side of fork from runtime. For whatever reason, the exit status is overwritten here + # so we use a pipe to relay back + pre_fork_worker = Temporalio::Worker.new( + client: pre_fork_client, task_queue: "tq-#{SecureRandom.uuid}", workflows: [SimpleWorkflow] + ) + reader, writer = IO.pipe + pid = fork do + reader.close + pre_fork_worker.run + writer.puts 'success' + rescue Temporalio::Internal::Bridge::Error => e + writer.puts e.message.start_with?('Cannot use worker across forks') ? 'fork-fail' : 'fail' + exit! + end + Process.wait2(pid) + writer.close + assert_equal 'fork-fail', reader.read.strip + + # But use of a client in the fork with its own runtime is fine + reader, writer = IO.pipe + pid = fork do + reader.close + client = Temporalio::Client.connect( + env.client.options.connection.target_host, + env.client.options.namespace, + runtime: Temporalio::Runtime.new, + logger: Logger.new($stdout) + ) + handle = client.start_workflow( + SimpleWorkflow, 'some-user', + id: "wf-#{SecureRandom.uuid}", task_queue: "tq-#{SecureRandom.uuid}" + ) + writer.puts('started workflow') + handle.terminate + exit! 0 + end + _, status = Process.wait2(pid) + writer.close + assert status.success? + assert_equal 'started workflow', reader.read.strip + end end diff --git a/temporalio/test/test.rb b/temporalio/test/test.rb index b438db73..0a779a83 100644 --- a/temporalio/test/test.rb +++ b/temporalio/test/test.rb @@ -146,7 +146,6 @@ def initialize if ENV['TEMPORAL_TEST_CLIENT_TARGET_HOST'].blank? @server = Temporalio::Testing::WorkflowEnvironment.start_local( logger: Logger.new($stdout), - dev_server_download_version: 'v1.3.1-priority.0', dev_server_extra_args: [ # Allow continue as new to be immediate '--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"', diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index e9184fc1..57764418 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -2282,6 +2282,10 @@ def do_http_call end def test_unsafe_io + # Ruby 3.2 on macOS has Fiber scheduling issues with IO.select + major, minor = RUBY_VERSION.split('.').take(2).map(&:to_i) + return if major.nil? || major != 3 || minor.nil? || minor < 3 + # Not allowed by default execute_workflow(UnsafeIOWorkflow, false) do |handle| assert_eventually_task_fail(handle:, message_contains: 'Cannot perform IO from inside a workflow')