Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ opinions. Please communicate with us on [Slack](https://t.mp/slack) in the `#rub
- [Rails](#rails)
- [ActiveRecord](#activerecord)
- [Lazy/Eager Loading](#lazyeager-loading)
- [Forking](#forking)
- [Ractors](#ractors)
- [Platform Support](#platform-support)
- [Development](#development)
Expand Down Expand Up @@ -1202,6 +1203,16 @@ workflow at the top of the file.

Note, this only affects non-production environments.

### Forking

Objects created with the Temporal Ruby SDK cannot be used across forks. This includes runtimes, clients, and workers. By
default, using `Client.connect` uses `Runtime.default` which is lazily created. If it was already created on the parent,
an exception will occur when trying to reuse it to create clients or workers in a forked child. Similarly any RPC
invocation or worker execution inside of a forked child separate from where the runtime or client or worker were created
will raise an exception.

If forking must be used, make sure Temporal objects are only created _inside_ the fork.

### Ractors

It was an original goal to have workflows actually be Ractors for deterministic state isolation and have the library
Expand Down
2 changes: 2 additions & 0 deletions temporalio/ext/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ macro_rules! rpc_call {

impl Client {
pub fn async_new(runtime: &Runtime, options: Struct, queue: Value) -> Result<(), Error> {
runtime.handle.fork_check("create client")?;
// Build options
let mut opts_build = ClientOptionsBuilder::default();
let tls = options.child(id!("tls"))?;
Expand Down Expand Up @@ -191,6 +192,7 @@ impl Client {
}

pub fn async_invoke_rpc(&self, args: &[Value]) -> Result<(), Error> {
self.runtime_handle.fork_check("use client")?;
let args = scan_args::scan_args::<(), (), (), (), _, ()>(args)?;
let (service, rpc, request, retry, metadata, timeout, cancel_token, queue) =
scan_args::get_kwargs::<
Expand Down
15 changes: 15 additions & 0 deletions temporalio/ext/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct Runtime {

#[derive(Clone)]
pub(crate) struct RuntimeHandle {
pub(crate) pid: u32,
pub(crate) core: Arc<CoreRuntime>,
pub(crate) async_command_tx: Sender<AsyncCommand>,
}
Expand Down Expand Up @@ -178,6 +179,7 @@ impl Runtime {
channel();
Ok(Self {
handle: RuntimeHandle {
pid: std::process::id(),
core: Arc::new(core),
async_command_tx,
},
Expand Down Expand Up @@ -258,4 +260,17 @@ impl RuntimeHandle {
)));
});
}

pub(crate) fn fork_check(&self, action: &'static str) -> Result<(), Error> {
if self.pid != std::process::id() {
Err(error!(
"Cannot {} across forks (original runtime PID is {}, current is {})",
action,
self.pid,
std::process::id()
))
} else {
Ok(())
}
}
}
4 changes: 4 additions & 0 deletions temporalio/ext/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ struct PollResult {

impl Worker {
pub fn new(client: &Client, options: Struct) -> Result<Self, Error> {
client.runtime_handle.fork_check("create worker")?;

enter_sync!(client.runtime_handle);

let activity = options.member::<bool>(id!("activity"))?;
Expand Down Expand Up @@ -168,6 +170,8 @@ impl Worker {
.runtime_handle
.clone();

runtime.fork_check("use worker")?;

// Create streams of poll calls
let worker_streams = workers
.into_iter()
Expand Down
11 changes: 6 additions & 5 deletions temporalio/lib/temporalio/versioning_override.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ def initialize(version)

# @!visibility private
def _to_proto
Temporalio::Api::Workflow::V1::VersioningOverride.new(
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED,
Api::Workflow::V1::VersioningOverride.new(
behavior: Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_PINNED,
pinned_version: @version.to_canonical_string,
pinned: Temporalio::Api::Workflow::V1::VersioningOverride::PinnedOverride.new(
pinned: Api::Workflow::V1::VersioningOverride::PinnedOverride.new(
behavior: Api::Workflow::V1::VersioningOverride::PinnedOverrideBehavior::PINNED_OVERRIDE_BEHAVIOR_PINNED,
version: @version._to_proto
)
)
Expand All @@ -45,8 +46,8 @@ def _to_proto
class AutoUpgrade < VersioningOverride
# @!visibility private
def _to_proto
Temporalio::Api::Workflow::V1::VersioningOverride.new(
behavior: Temporalio::Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE,
Api::Workflow::V1::VersioningOverride.new(
behavior: Api::Enums::V1::VersioningBehavior::VERSIONING_BEHAVIOR_AUTO_UPGRADE,
auto_upgrade: true
)
end
Expand Down
90 changes: 90 additions & 0 deletions temporalio/test/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'async'
require 'temporalio/client'
require 'temporalio/testing'
require 'temporalio/worker'
require 'test'

class ClientTest < Test
Expand Down Expand Up @@ -144,4 +145,93 @@ def test_interceptor
assert_equal(%w[list_workflow_page count_workflows], track.calls.map(&:first))
end
end

class SimpleWorkflow < Temporalio::Workflow::Definition
def execute(name)
"Hello, #{name}!"
end
end

def test_fork
# Cannot use client on other side of fork from where created
pre_fork_client = env.client
pid = fork do
pre_fork_client.start_workflow(
'some-workflow', id: "wf-#{SecureRandom.uuid}", task_queue: "tq-#{SecureRandom.uuid}"
)
rescue Temporalio::Internal::Bridge::Error => e
exit! 123 if e.message.start_with?('Cannot use client across forks')
raise
end
_, status = Process.wait2(pid)
assert_equal 123, status.exitstatus

# Cannot create client on other side of fork from runtime
pid = fork do
Temporalio::Client.connect(env.client.options.connection.target_host, env.client.options.namespace)
rescue Temporalio::Internal::Bridge::Error => e
exit! 234 if e.message.start_with?('Cannot create client across forks')
raise
end
_, status = Process.wait2(pid)
assert_equal 234, status.exitstatus

# Cannot create worker on other side of fork from runtime. For whatever reason, the exit status is overwritten here
# so we use a pipe to relay back
reader, writer = IO.pipe
pid = fork do
reader.close
Temporalio::Worker.new(
client: pre_fork_client, task_queue: "tq-#{SecureRandom.uuid}", workflows: [SimpleWorkflow]
)
writer.puts 'success'
rescue Temporalio::Internal::Bridge::Error => e
writer.puts e.message.start_with?('Cannot create worker across forks') ? 'fork-fail' : 'fail'
exit!
end
Process.wait2(pid)
writer.close
assert_equal 'fork-fail', reader.read.strip

# Cannot use worker on other side of fork from runtime. For whatever reason, the exit status is overwritten here
# so we use a pipe to relay back
pre_fork_worker = Temporalio::Worker.new(
client: pre_fork_client, task_queue: "tq-#{SecureRandom.uuid}", workflows: [SimpleWorkflow]
)
reader, writer = IO.pipe
pid = fork do
reader.close
pre_fork_worker.run
writer.puts 'success'
rescue Temporalio::Internal::Bridge::Error => e
writer.puts e.message.start_with?('Cannot use worker across forks') ? 'fork-fail' : 'fail'
exit!
end
Process.wait2(pid)
writer.close
assert_equal 'fork-fail', reader.read.strip

# But use of a client in the fork with its own runtime is fine
reader, writer = IO.pipe
pid = fork do
reader.close
client = Temporalio::Client.connect(
env.client.options.connection.target_host,
env.client.options.namespace,
runtime: Temporalio::Runtime.new,
logger: Logger.new($stdout)
)
handle = client.start_workflow(
SimpleWorkflow, 'some-user',
id: "wf-#{SecureRandom.uuid}", task_queue: "tq-#{SecureRandom.uuid}"
)
writer.puts('started workflow')
handle.terminate
exit! 0
end
_, status = Process.wait2(pid)
writer.close
assert status.success?
assert_equal 'started workflow', reader.read.strip
end
end
1 change: 0 additions & 1 deletion temporalio/test/test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ def initialize
if ENV['TEMPORAL_TEST_CLIENT_TARGET_HOST'].blank?
@server = Temporalio::Testing::WorkflowEnvironment.start_local(
logger: Logger.new($stdout),
dev_server_download_version: 'v1.3.1-priority.0',
dev_server_extra_args: [
# Allow continue as new to be immediate
'--dynamic-config-value', 'history.workflowIdReuseMinimalInterval="0s"',
Expand Down
4 changes: 4 additions & 0 deletions temporalio/test/worker_workflow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2282,6 +2282,10 @@ def do_http_call
end

def test_unsafe_io
# Ruby 3.2 on macOS has Fiber scheduling issues with IO.select
major, minor = RUBY_VERSION.split('.').take(2).map(&:to_i)
return if major.nil? || major != 3 || minor.nil? || minor < 3

# Not allowed by default
execute_workflow(UnsafeIOWorkflow, false) do |handle|
assert_eventually_task_fail(handle:, message_contains: 'Cannot perform IO from inside a workflow')
Expand Down
Loading