Merged
50 changes: 26 additions & 24 deletions README.md
@@ -571,9 +571,7 @@ Some things to note about the above code:

* A timer is represented by `Temporalio::Workflow.sleep`.
* Timers are also started on `Temporalio::Workflow.timeout`.
* _Technically_ `Kernel.sleep` and `Timeout.timeout` also delegate to the above calls, but the more explicit workflow
forms are encouraged because they accept more options and are not subject to Ruby standard library implementation
changes.
* `Kernel.sleep` and `Timeout.timeout` are considered illegal by default.
* Each timer accepts a `Cancellation`, but if none is given, it defaults to `Temporalio::Workflow.cancellation`.
* `Temporalio::Workflow.wait_condition` accepts a block that waits until the evaluated block result is truthy, then
returns the value.
@@ -586,7 +584,10 @@ Some things to note about the above code:
#### Workflow Fiber Scheduling and Cancellation

Workflows are backed by a custom, deterministic `Fiber::Scheduler`. All fiber calls inside a workflow use this scheduler
to ensure coroutines run deterministically.
to ensure coroutines run deterministically. Although this means that `Kernel.sleep`, `Mutex`, and similar constructs
would work because they are Fiber-aware, Temporal intentionally disables them by default to prevent accidental use. See
"Workflow Logic Constraints" and "Advanced Workflow Safety and Escaping" for more details, and see "Workflow Utilities"
for alternatives.

Every workflow contains a `Temporalio::Cancellation` at `Temporalio::Workflow.cancellation`. This is canceled when the
workflow is canceled. For all workflow calls that accept a cancellation token, this is the default. So if a workflow is
@@ -678,6 +679,9 @@ from workflows including:
nil key for dynamic). `[]=` or `store` can be called on these to update the handlers, though defined handlers are
encouraged over runtime-set ones.

There are also classes for `Temporalio::Workflow::Mutex`, `Temporalio::Workflow::Queue`, and
`Temporalio::Workflow::SizedQueue` that are workflow-safe wrappers around the standard library forms.
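
The wrapper idea can be illustrated without the SDK. The following is only a sketch of the delegation pattern, not the SDK's implementation: `FakeUnsafe` and `wrap_class_as_legal` are hypothetical names, and the stand-in guard merely counts how often it runs instead of disabling any real call tracing.

```ruby
# Hypothetical stand-in for the SDK's "tracing disabled" helper. It only counts
# invocations so the delegation can be observed.
module FakeUnsafe
  @disabled_calls = 0

  class << self
    attr_reader :disabled_calls

    def illegal_call_tracing_disabled
      @disabled_calls += 1
      yield
    end
  end
end

# Builds an anonymous class whose instances forward every public method of
# target_class to an underlying instance, always inside the guard block.
def wrap_class_as_legal(target_class)
  Class.new do
    define_method(:initialize) do |*args, **kwargs, &block|
      @underlying = FakeUnsafe.illegal_call_tracing_disabled do
        target_class.new(*args, **kwargs, &block)
      end
    end

    def method_missing(name, *args, **kwargs, &block)
      if @underlying.respond_to?(name)
        FakeUnsafe.illegal_call_tracing_disabled do
          @underlying.public_send(name, *args, **kwargs, &block)
        end
      else
        super
      end
    end

    def respond_to_missing?(name, include_all = false)
      @underlying.respond_to?(name, include_all) || super
    end
  end
end

# Usage: the wrapped class behaves like the standard library Queue.
SafeQueue = wrap_class_as_legal(Queue)
queue = SafeQueue.new
queue.push(:a)
queue << :b
```

One limitation of this pattern is that methods already defined on `Object` (such as `inspect` or `==`) never reach `method_missing`, so they act on the wrapper rather than on the wrapped instance.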

`Temporalio::Workflow::ContinueAsNewError` can be raised to continue-as-new the workflow. It accepts positional args and
defaults the workflow to the same as the current, though it can be changed with the `workflow` kwarg. See API
documentation for other details.
@@ -714,15 +718,15 @@ Ruby workflows. This means there are several things workflows cannot do such as:

* Perform IO (network, disk, stdio, etc)
* Access/alter external mutable state
* Do any threading
* Do any threading or blocking calls
* Do anything using the system clock (e.g. `Time.now`)
* Make any random calls
* Make any not-guaranteed-deterministic calls

This means you can't even call `puts` or logger calls outside of `Temporalio::Workflow.logger` because they use mutexes
which may be hit during periods of high-contention, but they are not completely disabled since users may do quick
debugging with them. See the [Advanced Workflow Safety and Escaping](#advanced-workflow-safety-and-escaping) section if
needing to work around this.
This means you can't even use logger calls outside of `Temporalio::Workflow.logger` because they use mutexes which may
be hit during periods of high-contention, but they are not completely disabled since users may do quick debugging with
them. See the [Advanced Workflow Safety and Escaping](#advanced-workflow-safety-and-escaping) section if needing to work
around this.

#### Workflow Testing

@@ -928,24 +932,22 @@ See the `WorkflowReplayer` API documentation for more details.

#### Advanced Workflow Safety and Escaping

Workflows use a custom fiber scheduler to make things like certain blocking calls and timeouts durable. There is also
call tracing to prevent accidentally making illegal workflow calls. But sometimes in advanced situations, workarounds
may be needed. This section describes advanced situations working with the workflow Fiber scheduler and illegal call
tracer.
Workflows use a custom fiber scheduler to make fibers durable. There is also call tracing to prevent accidentally making
illegal workflow calls. But sometimes in advanced situations, workarounds may be needed. This section describes advanced
situations working with the workflow Fiber scheduler and illegal call tracer.

##### Durable Fiber Scheduler

The custom fiber scheduler that powers workflows makes otherwise-local, blocking things durable. This is why `sleep` and
`Timeout.timeout` and `Queue` and other things work durably. However, there are cases where it may be desired for these
to work locally inside a workflow such as for logging or `puts` or other side-effecting, known-non-deterministic
aspects.
By default, Temporal considers `Logger`, `sleep`, `Timeout.timeout`, `Queue`, etc. illegal. However, there are cases
where it may be desirable for these to work locally inside a workflow, such as for logging or other side-effecting,
known-non-deterministic aspects.

Users can pass a block to `Temporalio::Workflow::Unsafe.durable_scheduler_disabled` to not use the durable scheduler.
This should be used any time the scheduler needs to be bypassed, e.g. for local stdout. Not doing this can cause
workflows to get hung in high contention situations. For instance, if there is a `puts` or a logger (that isn't the
safe-to-use `Temporalio::Workflow.logger`) in a workflow, _technically_ Ruby surrounds the IO writes with a mutex and
in extreme high contention that mutex may durably block and then the workflow task may complete causing hung workflows
because no event comes to wake the mutex.
workflows to hang in high-contention situations. For instance, if a workflow uses a logger other than the safe-to-use
`Temporalio::Workflow.logger`, Ruby _technically_ surrounds the IO writes with a mutex. Under extreme contention that
mutex may block durably, and the workflow task may then complete, leaving the workflow hung because no event arrives to
wake the mutex.

Also, by default anything that relies on IO wait that is not inside `durable_scheduler_disabled` will fail. It is
recommended to put things that need this in `durable_scheduler_disabled`, but if the durable scheduler is still needed
@@ -957,9 +959,9 @@ Note `durable_scheduler_disabled` implies `illegal_call_tracing_disabled` (see n

##### Illegal Call Tracing

Ruby workflow threads employ a `TracePoint` to catch illegal calls such as `Time.now` or `Thread.new`. The set of
illegal calls can be configured via the `illegal_workflow_calls` parameter when creating a worker. The default set is at
`Temporalio::Worker.default_illegal_workflow_calls`.
Ruby workflow threads employ a `TracePoint` to catch illegal calls such as `sleep` or `Time.now` or `Thread.new`. The
set of illegal calls can be configured via the `illegal_workflow_calls` parameter when creating a worker. The default
set is at `Temporalio::Worker.default_illegal_workflow_calls`.
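
To make the mechanism concrete, here is a minimal, SDK-independent sketch of `TracePoint`-based call checking. It is an assumption-laden illustration rather than the SDK's tracer: `IllegalCallError`, `with_illegal_call_tracing`, and `SketchClock` are hypothetical names, and the rule format (class name mapped to `:all` or an array of method names) only loosely mirrors the shape of the default set.

```ruby
# Hypothetical error type for this sketch.
class IllegalCallError < StandardError; end

# Runs the block with a TracePoint that raises on any call listed in
# illegal_calls, a Hash of class name => :all or Array of method names.
def with_illegal_call_tracing(illegal_calls)
  tracer = TracePoint.new(:call, :c_call) do |tp|
    owner = tp.defined_class
    next unless Module === owner

    # For class-level methods the owner is a singleton class, so use the
    # receiver's name instead. Subclass receivers are not resolved here.
    class_name = if owner.singleton_class?
                   Module === tp.self ? tp.self.name : nil
                 else
                   owner.name
                 end

    rule = illegal_calls[class_name]
    next unless rule == :all || (rule.is_a?(Array) && rule.include?(tp.callee_id))

    raise IllegalCallError, "Illegal call to #{class_name} #{tp.callee_id}"
  end

  result = nil
  tracer.enable { result = yield }
  result
end

# Hypothetical class standing in for something like a system clock.
class SketchClock
  def self.now
    42
  end

  def self.fixed
    7
  end
end

SKETCH_RULES = { 'SketchClock' => %i[now] }.freeze
allowed_value = with_illegal_call_tracing(SKETCH_RULES) { SketchClock.fixed }
```

Calling `SketchClock.now` inside the block would raise `IllegalCallError`, while `SketchClock.fixed` passes through because it is not listed in the rules.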

When an illegal call is encountered, an exception is raised. In advanced cases there may be a need to allow an illegal
call that is known to be used deterministically. This code can be in a block passed to
28 changes: 16 additions & 12 deletions temporalio/lib/temporalio/cancellation.rb
@@ -18,7 +18,7 @@ class Cancellation
def initialize(*parents)
@canceled = false
@canceled_reason = nil
@canceled_mutex = Mutex.new
@canceled_mutex = Workflow::Unsafe.illegal_call_tracing_disabled { Mutex.new }
Member

Why not just use the workflow mutex here? Is it also used from non-workflow places?

Member Author

Yes. This is a general purpose "cancellation token" that is also used for activities (which is why it's not in the Temporalio::Workflow module, but rather here in the top level one)
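
Because the token is shared between workflows and activities, the helper it calls has to be a plain passthrough outside a workflow. A rough, self-contained sketch of that pattern follows; every name in it (`SketchWorkflow`, `Context`, `SketchToken`) is hypothetical, and the fake context only records events where the real one would toggle call tracing.

```ruby
module SketchWorkflow
  # Stand-in for a per-workflow instance; it just records what happened.
  class Context
    attr_reader :events

    def initialize
      @events = []
    end

    def illegal_call_tracing_disabled
      @events << :tracing_disabled
      yield
    ensure
      @events << :tracing_restored
    end
  end

  def self.current
    Thread.current[:sketch_workflow]
  end

  def self.in_workflow?
    !current.nil?
  end

  # Marks the block as running "inside a workflow" for this sketch.
  def self.run(context)
    previous = Thread.current[:sketch_workflow]
    Thread.current[:sketch_workflow] = context
    yield
  ensure
    Thread.current[:sketch_workflow] = previous
  end

  module Unsafe
    # Delegates to the workflow context when there is one, else just yields.
    def self.illegal_call_tracing_disabled(&block)
      if SketchWorkflow.in_workflow?
        SketchWorkflow.current.illegal_call_tracing_disabled(&block)
      else
        yield
      end
    end
  end
end

# General-purpose token usable both inside and outside the fake workflow.
class SketchToken
  def initialize
    @mutex = SketchWorkflow::Unsafe.illegal_call_tracing_disabled { Mutex.new }
    @canceled = false
  end

  def cancel
    synchronize { @canceled = true }
  end

  def canceled?
    synchronize { @canceled }
  end

  private

  def synchronize(&block)
    SketchWorkflow::Unsafe.illegal_call_tracing_disabled { @mutex.synchronize(&block) }
  end
end

# Outside a workflow the helper is a no-op wrapper.
outside_token = SketchToken.new
outside_token.cancel

# Inside the fake workflow every mutex touch goes through the context.
sketch_context = SketchWorkflow::Context.new
inside_canceled = SketchWorkflow.run(sketch_context) { SketchToken.new.canceled? }
```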

@canceled_cond_var = nil
@cancel_callbacks = {} # Keyed by sentinel value, but value iteration still is deterministic
@shield_depth = 0
@@ -28,22 +28,22 @@ def initialize(*parents)

# @return [Boolean] Whether this cancellation is canceled.
def canceled?
@canceled_mutex.synchronize { @canceled }
canceled_mutex_synchronize { @canceled }
end

# @return [String, nil] Reason for cancellation. Can be nil if not canceled or no reason provided.
def canceled_reason
@canceled_mutex.synchronize { @canceled_reason }
canceled_mutex_synchronize { @canceled_reason }
end

# @return [Boolean] Whether a cancel is pending but currently shielded.
def pending_canceled?
@canceled_mutex.synchronize { !@shield_pending_cancel.nil? }
canceled_mutex_synchronize { !@shield_pending_cancel.nil? }
end

# @return [String, nil] Reason for pending cancellation. Can be nil if not pending canceled or no reason provided.
def pending_canceled_reason
@canceled_mutex.synchronize { @shield_pending_cancel&.first }
canceled_mutex_synchronize { @shield_pending_cancel&.first }
end

# Raise an error if this cancellation is canceled.
@@ -71,13 +71,13 @@ def wait
return
end

@canceled_mutex.synchronize do
canceled_mutex_synchronize do
break if @canceled

# Add cond var if not present
if @canceled_cond_var.nil?
@canceled_cond_var = ConditionVariable.new
@cancel_callbacks[Object.new] = proc { @canceled_mutex.synchronize { @canceled_cond_var.broadcast } }
@cancel_callbacks[Object.new] = proc { canceled_mutex_synchronize { @canceled_cond_var.broadcast } }
end

# Wait on it
@@ -94,10 +94,10 @@ def wait
def shield
raise ArgumentError, 'Block required' unless block_given?

@canceled_mutex.synchronize { @shield_depth += 1 }
canceled_mutex_synchronize { @shield_depth += 1 }
yield
ensure
callbacks_to_run = @canceled_mutex.synchronize do
callbacks_to_run = canceled_mutex_synchronize do
@shield_depth -= 1
if @shield_depth.zero? && @shield_pending_cancel
reason = @shield_pending_cancel.first
@@ -120,7 +120,7 @@ def shield
def add_cancel_callback(&block)
raise ArgumentError, 'Must provide block' unless block_given?

callback_to_run_immediately, key = @canceled_mutex.synchronize do
callback_to_run_immediately, key = canceled_mutex_synchronize do
break [block, nil] if @canceled

key = Object.new
@@ -135,7 +135,7 @@ def add_cancel_callback(&block)
#
# @param key [Object] Key returned from {add_cancel_callback}.
def remove_cancel_callback(key)
@canceled_mutex.synchronize do
canceled_mutex_synchronize do
@cancel_callbacks.delete(key)
end
nil
@@ -144,7 +144,7 @@ def remove_cancel_callback(key)
private

def on_cancel(reason:)
callbacks_to_run = @canceled_mutex.synchronize do
callbacks_to_run = canceled_mutex_synchronize do
# If we're shielding, set as pending and return nil
if @shield_depth.positive?
@shield_pending_cancel = [reason]
@@ -166,5 +166,9 @@ def prepare_cancel(reason:)
@cancel_callbacks.clear
to_return.values
end

def canceled_mutex_synchronize(&)
Workflow::Unsafe.illegal_call_tracing_disabled { @canceled_mutex.synchronize(&) }
end
end
end
@@ -16,7 +16,7 @@ def self.frozen_validated_illegal_calls(illegal_calls)
# @type var fixed_val: :all | Worker::IllegalWorkflowCallValidator | Hash[Symbol, TrueClass | Worker::IllegalWorkflowCallValidator] # rubocop:disable Layout/LineLength
fixed_val = case val
when Temporalio::Worker::IllegalWorkflowCallValidator
if sub_val.method_name
if val.method_name
raise ArgumentError,
'Top level IllegalWorkflowCallValidator instances cannot have method name'
end
@@ -106,7 +106,7 @@ def block(_blocker, timeout = nil)
# We just yield because unblock will resume this. We will just wrap in timeout if needed.
if timeout
begin
Timeout.timeout(timeout) { Fiber.yield }
Workflow.timeout(timeout) { Fiber.yield }
true
rescue Timeout::Error
false
11 changes: 10 additions & 1 deletion temporalio/lib/temporalio/worker.rb
@@ -267,7 +267,11 @@ def self.default_illegal_workflow_calls
#:write
],
'Kernel' => %i[abort at_exit autoload autoload? eval exec exit fork gets load open rand readline readlines
spawn srand system test trap],
sleep spawn srand system test trap],
# Loggers use mutexes in ways that can hang workflows, so users need to disable the durable scheduler to use
# them
'Logger' => :all,
'Monitor' => :all,
'Net::HTTP' => :all,
'Pathname' => :all,
# TODO(cretz): Investigate why clock_gettime called from Timeout thread affects this code at all. Stack trace
@@ -282,9 +286,14 @@ def self.default_illegal_workflow_calls
'Signal' => :all,
'Socket' => :all,
'Tempfile' => :all,
'Timeout' => :all,
'Thread' => %i[abort_on_exception= exit fork handle_interrupt ignore_deadlock= kill new pass
pending_interrupt? report_on_exception= start stop initialize join name= priority= raise run
terminate thread_variable_set wakeup],
'Thread::ConditionVariable' => :all,
'Thread::Mutex' => IllegalWorkflowCallValidator.known_safe_mutex_validator,
'Thread::SizedQueue' => :all,
'Thread::Queue' => :all,
'Time' => IllegalWorkflowCallValidator.default_time_validators
} #: Hash[String, :all | Array[Symbol]]
hash.each_value(&:freeze)
@@ -39,6 +39,16 @@ def self.default_time_validators
]
end

# @return [IllegalWorkflowCallValidator] Workflow call validator that is tailored to disallow most Mutex calls,
# but let others through for certain situations.
def self.known_safe_mutex_validator
@known_safe_mutex_validator ||= IllegalWorkflowCallValidator.new do
# Only Google Protobuf use of Mutex is known to be safe, fail unless any caller location label starts with
# 'Google::Protobuf'
raise 'disallowed' unless caller_locations&.any? { |loc| loc.label&.start_with?('Google::Protobuf') }
end
end

# @return [String, nil] Method name if this validator is specific to a method.
attr_reader :method_name

74 changes: 71 additions & 3 deletions temporalio/lib/temporalio/workflow.rb
@@ -537,28 +537,72 @@ def self.replaying?
# Run a block of code with illegal call tracing disabled. Users should be cautious about using this as it can
# often signify unsafe code.
#
# If this is invoked outside of a workflow, it just runs the block.
#
# @yield Block to run with call tracing disabled
#
# @return [Object] Result of the block.
def self.illegal_call_tracing_disabled(&)
Workflow._current.illegal_call_tracing_disabled(&)
if Workflow.in_workflow?
Workflow._current.illegal_call_tracing_disabled(&)
else
yield
end
end

# Run a block of code with IO enabled. Specifically this allows the `io_wait` call of the fiber scheduler to work.
# Users should be cautious about using this as it can often signify unsafe code. Note, this is often only
# applicable to network code as file IO and most process-based IO does not go through scheduler `io_wait`.
#
# If this is invoked outside of a workflow, it just runs the block.
def self.io_enabled(&)
Workflow._current.io_enabled(&)
if Workflow.in_workflow?
Workflow._current.io_enabled(&)
else
yield
end
end

# Run a block of code with the durable/deterministic workflow Fiber scheduler off. This means fallback to default
# fiber scheduler and no workflow helpers will be available in the block. This is usually only needed in advanced
# situations where a third party library does something like use "Timeout" in a way that shouldn't be made
# durable.
#
# If this is invoked outside of a workflow, it just runs the block.
#
# This implies {illegal_call_tracing_disabled}.
def self.durable_scheduler_disabled(&)
Workflow._current.durable_scheduler_disabled(&)
if Workflow.in_workflow?
Workflow._current.durable_scheduler_disabled(&)
else
yield
end
end

# @!visibility private
def self._wrap_ruby_class_as_legal(target_class)
Class.new do
define_method(:initialize) do |*args, **kwargs, &block|
@underlying = Unsafe.illegal_call_tracing_disabled do
target_class.new(*args, **kwargs, &block) # steep:ignore
end
end

# @!visibility private
def method_missing(name, ...)
if @underlying.respond_to?(name)
# Call with tracing disabled
Unsafe.illegal_call_tracing_disabled { @underlying.public_send(name, ...) }
else
super
end
end

# @!visibility private
def respond_to_missing?(name, include_all = false)
@underlying.respond_to?(name, include_all) || super
end
end
end
end

@@ -625,5 +669,29 @@ class InvalidWorkflowStateError < Error; end
# error can still be used with configuring workflow failure exception types to change non-deterministic errors from
# task failures to workflow failures.
class NondeterminismError < Error; end

# Mutex is a workflow-safe wrapper around {::Mutex}.
#
# As of this writing, all methods on Mutex are safe for workflow use and are implicitly made deterministic by the
# Fiber scheduler. The primary reason this is wrapped as safe is to be able to catch unintentional uses of Mutex by
# non-workflow-safe code. However, users may prefer to use the more powerful {wait_condition} approach as a mutex
# (e.g. wait until a certain attribute is set to false then set it to true before continuing).
Mutex = Unsafe._wrap_ruby_class_as_legal(::Mutex)

# Queue is a workflow-safe wrapper around {::Queue}.
#
# As of this writing, all methods on Queue are safe for workflow use and are implicitly made deterministic by the
# Fiber scheduler. The primary reason this is wrapped as safe is to be able to catch unintentional uses of Queue by
# non-workflow-safe code. However, users may prefer to use the more powerful {wait_condition} approach as a queue
# (e.g. wait until an array is non-empty before continuing).
Queue = Unsafe._wrap_ruby_class_as_legal(::Queue)

# SizedQueue is a workflow-safe wrapper around {::SizedQueue}.
#
# As of this writing, all methods on SizedQueue are safe for workflow use and are implicitly made deterministic by
# the Fiber scheduler. The primary reason this is wrapped as safe is to be able to catch unintentional uses of
# SizedQueue by non-workflow-safe code. However, users may prefer to use the more powerful {wait_condition} approach
# as a queue (e.g. wait until an array is non-empty before continuing).
SizedQueue = Unsafe._wrap_ruby_class_as_legal(::SizedQueue)
Comment on lines +679 to +695
Member

I'm a little worried that just wrapping and re-exporting leaves us vulnerable to a change in the underlying implementation causing NDE. Maybe it would be better to just not export these at all and push people harder towards using wait condition, or otherwise explicitly opt-in to unsafe usage at the workflow code level

Member Author

@cretz cretz Aug 14, 2025

I think recently as part of temporalio/features#500 we had decided to make at least mutex/lock-or-semaphore a workflow primitive in every SDK even if they do just wrap wait condition. The main reason for disallowing stdlib constructs was less about fear of Ruby changing its stdlib, and more about accidental use of these now-durable constructs in places that aren't meant to be durable.

I think if we remove these, we 1) would need to add Mutex back anyways but hand-written, and 2) accept that there may be envy towards Java's WorkflowQueue or Python's asyncio.Queue.

Having said that, I can remove these and make a homemade Mutex if we want to. No super strong opinion.

Member

I thought we had firsthand experience of that exact thing happening in Python, though? If we're not worried about it changing here, in Ruby (and we can substantiate why that's unlikely) I'm fine with it. Otherwise I'd suggest the handwritten Mutex.

Member Author

@cretz cretz Aug 14, 2025

We luckily haven't had that happen in Python yet exactly. We did have temporalio/sdk-python#429 (and temporalio/sdk-python#518) because of Python async primitives having non-deterministic behavior. That would be akin to Ruby Fiber helpers changing how it called scheduler functions, which would be trouble for us no matter what with these utilities.

Queue is quite heavily used in Ruby, and I would prefer not to have to match the Ruby interface with our own (it supports timeouts, which we implicitly make durable as well). I think there is not a high risk of the basic Fiber scheduler calls these utilities make changing: these very popular primitives are quite stable, they each make only about one obvious Fiber scheduler call (e.g. block or unblock), and the Fiber::Scheduler interface is quite stable as well.

Member

Ok, sounds good to me. We can proceed with the wrappers and re-evaluate later if necessary.
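
For comparison, the `wait_condition`-as-mutex alternative raised in this thread can be sketched with plain fibers. This is a toy model under stated assumptions: the top-level `wait_condition` here is a hypothetical stand-in that simply yields to a hand-written round-robin runner, whereas the real `Temporalio::Workflow.wait_condition` is driven by the workflow scheduler, and `SketchWorkflowState` and `run_round_robin` are made-up names.

```ruby
# Hypothetical stand-in for a wait condition: yield to the runner until the
# block returns a truthy value, then return that value.
def wait_condition
  loop do
    result = yield
    return result if result

    Fiber.yield
  end
end

class SketchWorkflowState
  attr_reader :log

  def initialize
    @locked = false
    @log = []
  end

  # Wait until the flag is false, set it to true, and clear it when done.
  def exclusive(name)
    wait_condition { !@locked }
    @locked = true
    begin
      @log << "#{name}:start"
      Fiber.yield # simulated suspension point, e.g. a timer or activity
      @log << "#{name}:end"
    ensure
      @locked = false
    end
  end
end

# Resume every live fiber in order until all have finished.
def run_round_robin(fibers)
  pending = fibers.dup
  until pending.empty?
    pending.each(&:resume)
    pending.select!(&:alive?)
  end
end

sketch_state = SketchWorkflowState.new
run_round_robin(%w[a b].map { |name| Fiber.new { sketch_state.exclusive(name) } })
```

Without the flag check the two critical sections would interleave at the simulated suspension point; with it, the second fiber only starts after the first one has finished.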

end
end
3 changes: 2 additions & 1 deletion temporalio/sig/temporalio/cancellation.rbs
@@ -9,11 +9,12 @@ module Temporalio
def check!: (?Exception err) -> void
def to_ary: -> [Cancellation, Proc]
def wait: -> void
def shield: [T] { (?) -> untyped } -> T
def shield: [T] { (?) -> T } -> T
def add_cancel_callback: { -> untyped } -> Object
def remove_cancel_callback: (Object key) -> void

private def on_cancel: (reason: Object?) -> void
private def prepare_cancel: (reason: Object?) -> Array[Proc]?
private def canceled_mutex_synchronize: [T] { (?) -> T } -> T
end
end
@@ -14,6 +14,7 @@ module Temporalio
end

def self.default_time_validators: -> Array[IllegalWorkflowCallValidator]
def self.known_safe_mutex_validator: -> IllegalWorkflowCallValidator

attr_reader method_name: Symbol?
attr_reader block: ^(CallInfo) -> void