48 changes: 48 additions & 0 deletions README.md
@@ -61,6 +61,9 @@ opinions. Please communicate with us on [Slack](https://t.mp/slack) in the `#rub
- [Manual Time Skipping](#manual-time-skipping)
- [Mocking Activities](#mocking-activities)
- [Workflow Replay](#workflow-replay)
- [Advanced Workflow Safety and Escaping](#advanced-workflow-safety-and-escaping)
- [Durable Fiber Scheduler](#durable-fiber-scheduler)
- [Illegal Call Tracing](#illegal-call-tracing)
- [Activities](#activities)
- [Activity Definition](#activity-definition)
- [Activity Context](#activity-context)
@@ -716,6 +719,11 @@ Ruby workflows. This means there are several things workflows cannot do such as:
* Make any random calls
* Make any not-guaranteed-deterministic calls

This means workflows should not call `puts` or any logger other than `Temporalio::Workflow.logger`: those calls take
mutexes, which can block under high contention. They are not completely disabled, since users may rely on them for quick
debugging. See the [Advanced Workflow Safety and Escaping](#advanced-workflow-safety-and-escaping) section for how to
work around this.

#### Workflow Testing

Workflow testing can be done in an integration-test fashion against a real server. However, it is hard to simulate
@@ -918,6 +926,46 @@ end

See the `WorkflowReplayer` API documentation for more details.

#### Advanced Workflow Safety and Escaping

Workflows use a custom fiber scheduler to make certain blocking calls and timeouts durable. They also use call tracing
to prevent accidentally making illegal workflow calls. In advanced situations, workarounds may be needed. This section
describes how to work with the workflow fiber scheduler and the illegal call tracer in those situations.

##### Durable Fiber Scheduler

The custom fiber scheduler that powers workflows makes otherwise-local, blocking operations durable. This is why
`sleep`, `Timeout.timeout`, `Queue`, and similar constructs work durably. However, there are cases where these should
work locally inside a workflow, such as for logging, `puts`, or other side-effecting, known-non-deterministic code.

Users can pass a block to `Temporalio::Workflow::Unsafe.durable_scheduler_disabled` to run it without the durable
scheduler. This should be used any time the scheduler needs to be bypassed, e.g. for local stdout. Not doing this can
cause workflows to hang in high-contention situations. For instance, if a workflow calls `puts` or a logger (other than
the safe-to-use `Temporalio::Workflow.logger`), Ruby _technically_ surrounds the IO writes with a mutex. Under extreme
contention that mutex may block durably, and the workflow task may then complete, leaving the workflow hung because no
event ever arrives to wake the mutex.

Also, by default, anything that relies on IO wait outside of `durable_scheduler_disabled` will fail. It is recommended
to put such code in `durable_scheduler_disabled`, but if both the durable scheduler and IO wait are needed, a block
passed to `Temporalio::Workflow::Unsafe.io_enabled` can be used.

Note that `durable_scheduler_disabled` implies `illegal_call_tracing_disabled` (see next section). Many uses of
`durable_scheduler_disabled`, such as for tracing or logging, are wrapped in an
`unless Temporalio::Workflow.replaying?` block so they don't duplicate side effects on replay.
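
As a minimal sketch of that guard pattern, the following uses a hypothetical `FakeWorkflow` stand-in (not part of the
SDK) so it can run without a worker. In a real workflow the two calls would be `Temporalio::Workflow.replaying?` and
`Temporalio::Workflow::Unsafe.durable_scheduler_disabled`; the `trace_locally` helper and the `sink` array are also
assumptions made purely for illustration.

```ruby
# Hypothetical stand-in for the SDK, present only so this sketch runs on its own.
module FakeWorkflow
  class << self
    attr_accessor :replaying

    def replaying?
      replaying ? true : false
    end

    # The real Unsafe.durable_scheduler_disabled swaps out the fiber scheduler
    # for the duration of the block; this stand-in only yields.
    def durable_scheduler_disabled
      yield
    end
  end
end

# Performs a local side effect, skipping it when the workflow is replaying.
def trace_locally(message, sink)
  unless FakeWorkflow.replaying?
    FakeWorkflow.durable_scheduler_disabled { sink << message }
  end
end

sink = []

# First execution: the side effect happens.
FakeWorkflow.replaying = false
trace_locally('order created', sink)

# Replay of the same code: the side effect is skipped.
FakeWorkflow.replaying = true
trace_locally('order created', sink)
```

After both calls `sink` holds the message only once, which is the point of the `replaying?` guard.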

##### Illegal Call Tracing

Ruby workflow threads employ a `TracePoint` to catch illegal calls such as `Time.now` or `Thread.new`. The set of
illegal calls can be configured via the `illegal_workflow_calls` parameter when creating a worker. The default set is at
`Temporalio::Worker.default_illegal_workflow_calls`.

When an illegal call is encountered, an exception is raised. In advanced cases there may be a need to allow an illegal
call that is known to be used deterministically. Such code can be placed in a block passed to
`Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled`. If the code has side-effecting behavior that needs the
non-durable scheduler, use `durable_scheduler_disabled` instead (which implies this, see previous section).
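
The general idea of tracing calls and escaping the trace can be sketched in plain Ruby. This is an illustration under
stated assumptions, not the SDK's implementation: `WallClock`, `IllegalCallError`, `ILLEGAL_CALLS`,
`with_illegal_call_tracing` and `tracing_disabled` are all hypothetical names, and only `TracePoint` itself is a real
Ruby API.

```ruby
# Hypothetical names throughout; none of these come from the Temporal SDK.
class IllegalCallError < StandardError; end

# Stand-in for a non-deterministic API that a workflow should not call.
module WallClock
  def self.now
    Time.now
  end
end

# Receiver => method names treated as illegal (illustrative only).
# Identity comparison avoids calling #hash on arbitrary traced receivers.
ILLEGAL_CALLS = { WallClock => [:now] }.compare_by_identity.freeze

# Escape hatch: suppresses the check for the duration of the block.
def tracing_disabled
  prev = Thread.current[:tracing_disabled]
  Thread.current[:tracing_disabled] = true
  yield
ensure
  Thread.current[:tracing_disabled] = prev
end

# Runs the block with a TracePoint that raises on any configured illegal call.
def with_illegal_call_tracing
  tracer = TracePoint.new(:call, :c_call) do |tp|
    next if Thread.current[:tracing_disabled]

    methods = ILLEGAL_CALLS[tp.self]
    raise IllegalCallError, "Illegal call: #{tp.self}.#{tp.method_id}" if methods&.include?(tp.method_id)
  end
  tracer.enable { yield }
end

# The traced call is rejected...
blocked = begin
  with_illegal_call_tracing { WallClock.now }
  false
rescue IllegalCallError
  true
end

# ...while the same call inside the escape hatch goes through.
escaped = with_illegal_call_tracing { tracing_disabled { WallClock.now } }
```

The escape hatch restores the previous flag in `ensure`, so tracing resumes even if the block raises.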

### Activities

#### Activity Definition
22 changes: 9 additions & 13 deletions temporalio/lib/temporalio/contrib/open_telemetry.rb
@@ -453,19 +453,15 @@ def self.completed_span(
             'temporalRunID' => Temporalio::Workflow.info.run_id }.merge(attributes)

           time = Temporalio::Workflow.now.dup
-          # Disable illegal call tracing because OTel asks for full exception message which uses error highlighting and
-          # such which accesses File#path, and they also use loggers accessing current time
-          Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled do
-            # Disable durable scheduler because 1) synchronous/non-batch span processors in OTel use network (though
-            # could have just used Unsafe.io_enabled for this if not for the next point) and 2) OTel uses Ruby Timeout
-            # which we don't want to use durable timers.
-            Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
-              span = root.tracer.start_span(name, attributes:, links:, start_timestamp: time, kind:) # steep:ignore
-              # Record exception if present
-              span.record_exception(exception) if exception
-              # Finish the span (returns self)
-              span.finish(end_timestamp: time)
-            end
+          # Disable durable scheduler because 1) synchronous/non-batch span processors in OTel use network (though could
+          # have just used Unsafe.io_enabled for this if not for the next point) and 2) OTel uses Ruby Timeout which we
+          # don't want to use durable timers.
+          Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
+            span = root.tracer.start_span(name, attributes:, links:, start_timestamp: time, kind:) # steep:ignore
+            # Record exception if present
+            span.record_exception(exception) if exception
+            # Finish the span (returns self)
+            span.finish(end_timestamp: time)
           end
         end
       end
@@ -3,6 +3,7 @@
 require 'json'
 require 'temporalio/api'
 require 'temporalio/converters/payload_converter/encoding'
+require 'temporalio/workflow'

 module Temporalio
   module Converters
@@ -28,15 +29,31 @@ def encoding

         # (see Encoding.to_payload)
         def to_payload(value, hint: nil) # rubocop:disable Lint/UnusedMethodArgument
-          Api::Common::V1::Payload.new(
-            metadata: { 'encoding' => ENCODING },
-            data: JSON.generate(value, @generate_options).b
-          )
+          # For generate and parse, if we are in a workflow, we need to do this outside of the durable scheduler since
+          # some things like the recent https://github.com/ruby/json/pull/832 may make illegal File.expand_path calls.
+          # And other future things may be slightly illegal in JSON generate/parse and we don't want to break everyone
+          # when it happens.
+          data = if Temporalio::Workflow.in_workflow?
+                   Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
+                     JSON.generate(value, @generate_options).b
+                   end
+                 else
+                   JSON.generate(value, @generate_options).b
+                 end
+
+          Api::Common::V1::Payload.new(metadata: { 'encoding' => ENCODING }, data:)
         end

         # (see Encoding.from_payload)
         def from_payload(payload, hint: nil) # rubocop:disable Lint/UnusedMethodArgument
-          JSON.parse(payload.data, @parse_options)
+          # See comment in to_payload about why we have to do something different in workflow
+          if Temporalio::Workflow.in_workflow?
+            Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
+              JSON.parse(payload.data, @parse_options)
+            end
+          else
+            JSON.parse(payload.data, @parse_options)
+          end
         end
       end
     end
@@ -63,12 +63,13 @@ def deprecate_patch(patch_id)
           end

           def durable_scheduler_disabled(&)
-            prev = Fiber.current_scheduler
-            illegal_call_tracing_disabled { Fiber.set_scheduler(nil) }
-            begin
+            prev = Fiber.scheduler
+            # Imply illegal call tracing disabled
+            illegal_call_tracing_disabled do
+              Fiber.set_scheduler(nil)
               yield
             ensure
-              illegal_call_tracing_disabled { Fiber.set_scheduler(prev) }
+              Fiber.set_scheduler(prev)
             end
           end

@@ -27,8 +27,11 @@ def add(...)
               return true
             end

-            # Disable illegal call tracing for the log call
-            @instance.illegal_call_tracing_disabled { super }
+            # Disable scheduler since logs technically have local mutexes in them that cannot be done durably or they
+            # will block workflows
+            @instance.context.durable_scheduler_disabled do
+              super
+            end
           end
         end
       end
@@ -141,7 +141,8 @@ def io_wait(io, events, timeout)
             unless @instance.io_enabled
               raise Workflow::NondeterminismError,
                     'Cannot perform IO from inside a workflow. If this is known to be safe, ' \
-                    'the code can be run in a Temporalio::Workflow::Unsafe.io_enabled block.'
+                    'the code can be run in a Temporalio::Workflow::Unsafe.durable_scheduler_disabled ' \
+                    'or Temporalio::Workflow::Unsafe.io_enabled block.'
             end

             # Use regular Ruby behavior of blocking this thread. There is no Ruby implementation of io_wait we can just
2 changes: 2 additions & 0 deletions temporalio/lib/temporalio/workflow.rb
@@ -555,6 +555,8 @@ def self.io_enabled(&)
       # fiber scheduler and no workflow helpers will be available in the block. This is usually only needed in advanced
       # situations where a third party library does something like use "Timeout" in a way that shouldn't be made
       # durable.
+      #
+      # This implies {illegal_call_tracing_disabled}.
       def self.durable_scheduler_disabled(&)
         Workflow._current.durable_scheduler_disabled(&)
       end
4 changes: 4 additions & 0 deletions temporalio/test/worker_workflow_test.rb
@@ -571,6 +571,10 @@ def execute
   def test_deadlock
     # TODO(cretz): Do we need more tests? This attempts to interrupt the workflow via a raise on the thread, but do we
     # need to concern ourselves with what happens if that's accidentally swallowed?
+    # TODO(cretz): Note that often mutexes and cond vars are not subject to Timeout.timeout which means they can not be
+    # interrupted by deadlock detection
+    # TODO(cretz): Note that a thread.join that does get deadlock detected may crash the VM on exit with a
+    # "[BUG] pthread_mutex_lock: Invalid argument (EINVAL)"
     # TODO(cretz): Decrease deadlock detection timeout to make test faster? It is 4s now because shutdown waits on
     # second task.
     execute_workflow(DeadlockWorkflow) do |handle|