Skip to content

Commit dc83329

Browse files
authored
Allow unsafe disabling of durable workflow scheduler (#284)
Fixes #251
1 parent 7eb50a5 commit dc83329

File tree

7 files changed

+67
-8
lines changed

7 files changed

+67
-8
lines changed

temporalio/lib/temporalio/contrib/open_telemetry.rb

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -452,15 +452,20 @@ def self.completed_span(
452452
attributes = { 'temporalWorkflowID' => Temporalio::Workflow.info.workflow_id,
453453
'temporalRunID' => Temporalio::Workflow.info.run_id }.merge(attributes)
454454

455-
# Create span, which has to be done with illegal call disabling because OTel asks for full exception message
456-
# which uses error highlighting and such which accesses File#path
455+
time = Temporalio::Workflow.now.dup
456+
# Disable illegal call tracing because OTel asks for full exception message which uses error highlighting and
457+
# such which accesses File#path, and they also use loggers accessing current time
457458
Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled do
458-
time = Temporalio::Workflow.now.dup
459-
span = root.tracer.start_span(name, attributes:, links:, start_timestamp: time, kind:) # steep:ignore
460-
# Record exception if present
461-
span.record_exception(exception) if exception
462-
# Finish the span (returns self)
463-
span.finish(end_timestamp: time)
459+
# Disable durable scheduler because 1) synchronous/non-batch span processors in OTel use network (though
460+
# could have just used Unafe.io_enabled for this if not for the next point) and 2) OTel uses Ruby Timeout
461+
# which we don't want to use durable timers.
462+
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
463+
span = root.tracer.start_span(name, attributes:, links:, start_timestamp: time, kind:) # steep:ignore
464+
# Record exception if present
465+
span.record_exception(exception) if exception
466+
# Finish the span (returns self)
467+
span.finish(end_timestamp: time)
468+
end
464469
end
465470
end
466471
end

temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ def deprecate_patch(patch_id)
6262
@instance.patch(patch_id:, deprecated: true)
6363
end
6464

65+
def durable_scheduler_disabled(&)
66+
prev = Fiber.current_scheduler
67+
illegal_call_tracing_disabled { Fiber.set_scheduler(nil) }
68+
begin
69+
yield
70+
ensure
71+
illegal_call_tracing_disabled { Fiber.set_scheduler(prev) }
72+
end
73+
end
74+
6575
def execute_activity(
6676
activity,
6777
*args,

temporalio/lib/temporalio/worker.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
require 'temporalio/internal/worker/workflow_instance'
1313
require 'temporalio/internal/worker/workflow_worker'
1414
require 'temporalio/worker/activity_executor'
15+
require 'temporalio/worker/deployment_options'
1516
require 'temporalio/worker/illegal_workflow_call_validator'
1617
require 'temporalio/worker/interceptor'
1718
require 'temporalio/worker/poller_behavior'

temporalio/lib/temporalio/workflow.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,14 @@ def self.illegal_call_tracing_disabled(&)
529529
def self.io_enabled(&)
530530
Workflow._current.io_enabled(&)
531531
end
532+
533+
# Run a block of code with the durable/deterministic workflow Fiber scheduler off. This means fallback to default
534+
# fiber scheduler and no workflow helpers will be available in the block. This is usually only needed in advanced
535+
# situations where a third party library does something like use "Timeout" in a way that shouldn't be made
536+
# durable.
537+
def self.durable_scheduler_disabled(&)
538+
Workflow._current.durable_scheduler_disabled(&)
539+
end
532540
end
533541

534542
# Error that is raised by a workflow out of the primary workflow method to issue a continue-as-new.

temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ module Temporalio
2424

2525
def deprecate_patch: (Symbol | String patch_id) -> void
2626

27+
def durable_scheduler_disabled: [T] { -> T } -> T
28+
2729
def execute_activity: (
2830
singleton(Activity::Definition) | Symbol | String activity,
2931
*Object? args,

temporalio/sig/temporalio/workflow.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ module Temporalio
145145
def self.illegal_call_tracing_disabled: [T] { -> T } -> T
146146

147147
def self.io_enabled: [T] { -> T } -> T
148+
149+
def self.durable_scheduler_disabled: [T] { -> T } -> T
148150
end
149151

150152
class ContinueAsNewError < Error

temporalio/test/worker_workflow_test.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2328,6 +2328,37 @@ def test_no_local_activity
23282328
assert_eventually_task_fail(handle:, message_contains:)
23292329
end
23302330
end
2331+
2332+
class NonDurableTimerWorkfow < Temporalio::Workflow::Definition
2333+
def execute
2334+
sleep(0.1)
2335+
Temporalio::Workflow::Unsafe.durable_scheduler_disabled { sleep(0.2) }
2336+
begin
2337+
Timeout.timeout(0.3) { Queue.new.pop }
2338+
raise 'Expected timeout'
2339+
rescue Timeout::Error
2340+
# Ignore
2341+
end
2342+
Temporalio::Workflow::Unsafe.durable_scheduler_disabled do
2343+
Timeout.timeout(0.4) { Queue.new.pop }
2344+
raise 'Expected timeout'
2345+
rescue Timeout::Error
2346+
# Ignore
2347+
end
2348+
end
2349+
end
2350+
2351+
def test_non_durable_timer
2352+
execute_workflow(NonDurableTimerWorkfow) do |handle|
2353+
# Let workflow complete
2354+
handle.result
2355+
# Confirm only the durable timers of 0.1 and 0.3 were set
2356+
assert_equal([0.1, 0.3], handle.fetch_history_events
2357+
.map(&:timer_started_event_attributes)
2358+
.compact
2359+
.map { |a| a.start_to_fire_timeout.to_f })
2360+
end
2361+
end
23312362
end
23322363

23332364
# TODO(cretz): To test

0 commit comments

Comments
 (0)