Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions temporalio/lib/temporalio/contrib/open_telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -452,15 +452,20 @@ def self.completed_span(
attributes = { 'temporalWorkflowID' => Temporalio::Workflow.info.workflow_id,
'temporalRunID' => Temporalio::Workflow.info.run_id }.merge(attributes)

# Create span, which has to be done with illegal call disabling because OTel asks for full exception message
# which uses error highlighting and such which accesses File#path
time = Temporalio::Workflow.now.dup
# Disable illegal call tracing because OTel asks for full exception message which uses error highlighting and
# such which accesses File#path, and they also use loggers accessing current time
Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled do
time = Temporalio::Workflow.now.dup
span = root.tracer.start_span(name, attributes:, links:, start_timestamp: time, kind:) # steep:ignore
# Record exception if present
span.record_exception(exception) if exception
# Finish the span (returns self)
span.finish(end_timestamp: time)
# Disable durable scheduler because 1) synchronous/non-batch span processors in OTel use network (though
# could have just used Unafe.io_enabled for this if not for the next point) and 2) OTel uses Ruby Timeout
# which we don't want to use durable timers.
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
span = root.tracer.start_span(name, attributes:, links:, start_timestamp: time, kind:) # steep:ignore
# Record exception if present
span.record_exception(exception) if exception
# Finish the span (returns self)
span.finish(end_timestamp: time)
end
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ def deprecate_patch(patch_id)
@instance.patch(patch_id:, deprecated: true)
end

def durable_scheduler_disabled(&)
prev = Fiber.current_scheduler
illegal_call_tracing_disabled { Fiber.set_scheduler(nil) }
begin
yield
ensure
illegal_call_tracing_disabled { Fiber.set_scheduler(prev) }
end
end

def execute_activity(
activity,
*args,
Expand Down
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
require 'temporalio/internal/worker/workflow_instance'
require 'temporalio/internal/worker/workflow_worker'
require 'temporalio/worker/activity_executor'
require 'temporalio/worker/deployment_options'
require 'temporalio/worker/interceptor'
require 'temporalio/worker/poller_behavior'
require 'temporalio/worker/thread_pool'
Expand Down
8 changes: 8 additions & 0 deletions temporalio/lib/temporalio/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,14 @@ def self.illegal_call_tracing_disabled(&)
def self.io_enabled(&)
Workflow._current.io_enabled(&)
end

# Run a block of code with the durable/deterministic workflow Fiber scheduler off. This means fallback to default
# fiber scheduler and no workflow helpers will be available in the block. This is usually only needed in advanced
# situations where a third party library does something like use "Timeout" in a way that shouldn't be made
# durable.
def self.durable_scheduler_disabled(&)
Workflow._current.durable_scheduler_disabled(&)
end
end

# Error that is raised by a workflow out of the primary workflow method to issue a continue-as-new.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ module Temporalio

def deprecate_patch: (Symbol | String patch_id) -> void

def durable_scheduler_disabled: [T] { -> T } -> T

def execute_activity: (
singleton(Activity::Definition) | Symbol | String activity,
*Object? args,
Expand Down
2 changes: 2 additions & 0 deletions temporalio/sig/temporalio/workflow.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ module Temporalio
def self.illegal_call_tracing_disabled: [T] { -> T } -> T

def self.io_enabled: [T] { -> T } -> T

def self.durable_scheduler_disabled: [T] { -> T } -> T
end

class ContinueAsNewError < Error
Expand Down
31 changes: 31 additions & 0 deletions temporalio/test/worker_workflow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2319,6 +2319,37 @@ def test_no_local_activity
assert_eventually_task_fail(handle:, message_contains:)
end
end

class NonDurableTimerWorkfow < Temporalio::Workflow::Definition
def execute
sleep(0.1)
Temporalio::Workflow::Unsafe.durable_scheduler_disabled { sleep(0.2) }
begin
Timeout.timeout(0.3) { Queue.new.pop }
raise 'Expected timeout'
rescue Timeout::Error
# Ignore
end
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
Timeout.timeout(0.4) { Queue.new.pop }
raise 'Expected timeout'
rescue Timeout::Error
# Ignore
end
end
end

def test_non_durable_timer
execute_workflow(NonDurableTimerWorkfow) do |handle|
# Let workflow complete
handle.result
# Confirm only the durable timers of 0.1 and 0.3 were set
assert_equal([0.1, 0.3], handle.fetch_history_events
.map(&:timer_started_event_attributes)
.compact
.map { |a| a.start_to_fire_timeout.to_f })
end
end
end

# TODO(cretz): To test
Expand Down
Loading