diff --git a/README.md b/README.md index 28e7207f..a8d9bf96 100644 --- a/README.md +++ b/README.md @@ -571,9 +571,7 @@ Some things to note about the above code: * A timer is represented by `Temporalio::Workflow.sleep`. * Timers are also started on `Temporalio::Workflow.timeout`. - * _Technically_ `Kernel.sleep` and `Timeout.timeout` also delegate to the above calls, but the more explicit workflow - forms are encouraged because they accept more options and are not subject to Ruby standard library implementation - changes. + * `Kernel.sleep` and `Timeout.timeout` are considered illegal by default. * Each timer accepts a `Cancellation`, but if none is given, it defaults to `Temporalio::Workflow.cancellation`. * `Temporalio::Workflow.wait_condition` accepts a block that waits until the evaluated block result is truthy, then returns the value. @@ -586,7 +584,10 @@ Some things to note about the above code: #### Workflow Fiber Scheduling and Cancellation Workflows are backed by a custom, deterministic `Fiber::Scheduler`. All fiber calls inside a workflow use this scheduler -to ensure coroutines run deterministically. +to ensure coroutines run deterministically. Although this means that `Kernel.sleep` and `Mutex` and such should work and +since they are Fiber-aware, Temporal intentionally disables their use by default to prevent accidental use. See +"Workflow Logic Constraints" and "Advanced Workflow Safety and Escaping" for more details, and see "Workflow Utilities" +for alternatives. Every workflow contains a `Temporalio::Cancellation` at `Temporalio::Workflow.cancellation`. This is canceled when the workflow is canceled. For all workflow calls that accept a cancellation token, this is the default. So if a workflow is @@ -678,6 +679,9 @@ from workflows including: nil key for dynamic). `[]=` or `store` can be called on these to update the handlers, though defined handlers are encouraged over runtime-set ones. +There are also classes for `Temporalio::Workflow::Mutex`, `Temporalio::Workflow::Queue`, and +`Temporalio::Workflow::SizedQueue` that are workflow-safe wrappers around the standard library forms. + `Temporalio::Workflow::ContinueAsNewError` can be raised to continue-as-new the workflow. It accepts positional args and defaults the workflow to the same as the current, though it can be changed with the `workflow` kwarg. See API documentation for other details. @@ -714,15 +718,15 @@ Ruby workflows. This means there are several things workflows cannot do such as: * Perform IO (network, disk, stdio, etc) * Access/alter external mutable state -* Do any threading +* Do any threading or blocking calls * Do anything using the system clock (e.g. `Time.Now`) * Make any random calls * Make any not-guaranteed-deterministic calls -This means you can't even call `puts` or logger calls outside of `Temporalio::Workflow.logger` because they use mutexes -which may be hit during periods of high-contention, but they are not completely disabled since users may do quick -debugging with them. See the [Advanced Workflow Safety and Escaping](#advanced-workflow-safety-and-escaping) section if -needing to work around this. +This means you can't even use logger calls outside of `Temporalio::Workflow.logger` because they use mutexes which may +be hit during periods of high-contention, but they are not completely disabled since users may do quick debugging with +them. See the [Advanced Workflow Safety and Escaping](#advanced-workflow-safety-and-escaping) section if needing to work +around this. #### Workflow Testing @@ -928,24 +932,22 @@ See the `WorkflowReplayer` API documentation for more details. #### Advanced Workflow Safety and Escaping -Workflows use a custom fiber scheduler to make things like certain blocking calls and timeouts durable. There is also -call tracing to prevent accidentally making illegal workflow calls. But sometimes in advanced situations, workarounds -may be needed. This section describes advanced situations working with the workflow Fiber scheduler and illegal call -tracer. +Workflows use a custom fiber scheduler to make fibers durable. There is also call tracing to prevent accidentally making +illegal workflow calls. But sometimes in advanced situations, workarounds may be needed. This section describes advanced +situations working with the workflow Fiber scheduler and illegal call tracer. ##### Durable Fiber Scheduler -The custom fiber scheduler that powers workflows makes otherwise-local, blocking things durable. This is why `sleep` and -`Timeout.timeout` and `Queue` and other things work durably. However, there are cases where it may be desired for these -to work locally inside a workflow such as for logging or `puts` or other side-effecting, known-non-deterministic -aspects. +By default, Temporal considers `Logger`, `sleep`, `Timeout.timeout`, `Queue`, etc illegal. However, there are cases +where it may be desired for these to work locally inside a workflow such as for logging or other side-effecting, +known-non-deterministic aspects. Users can pass a block to `Temporalio::Workflow::Unsafe.durable_scheduler_disabled` to not use the durable scheduler. This should be used any time the scheduler needs to be bypassed, e.g. for local stdout. Not doing this can cause -workflows to get hung in high contention situations. For instance, if there is a `puts` or a logger (that isn't the -safe-to-use `Temporalio::Workflow.logger`) in a workflow, _technically_ Ruby surrounds the IO writes with a mutex and -in extreme high contention that mutex may durably block and then the workflow task may complete causing hung workflows -because no event comes to wake the mutex. +workflows to get hung in high contention situations. For instance, if there is a logger (that isn't the safe-to-use +`Temporalio::Workflow.logger`) in a workflow, _technically_ Ruby surrounds the IO writes with a mutex and in extreme +high contention that mutex may durably block and then the workflow task may complete causing hung workflows because no +event comes to wake the mutex. Also, by default anything that relies on IO wait that is not inside `durable_scheduler_disabled` will fail. It is recommended to put things that need this in `durable_scheduler_disabled`, but if the durable scheduler is still needed @@ -957,9 +959,9 @@ Note `durable_scheduler_disabled` implies `illegal_call_tracing_disabled` (see n ##### Illegal Call Tracing -Ruby workflow threads employ a `TracePoint` to catch illegal calls such as `Time.now` or `Thread.new`. The set of -illegal calls can be configured via the `illegal_workflow_calls` parameter when creating a worker. The default set is at -`Temporalio::Worker.default_illegal_workflow_calls`. +Ruby workflow threads employ a `TracePoint` to catch illegal calls such as `sleep` or `Time.now` or `Thread.new`. The +set of illegal calls can be configured via the `illegal_workflow_calls` parameter when creating a worker. The default +set is at `Temporalio::Worker.default_illegal_workflow_calls`. When an illegal call is encountered, an exception is thrown. In advanced cases there may be a need to allow an illegal call that is known to be used deterministically. This code can be in a block passed to diff --git a/temporalio/lib/temporalio/cancellation.rb b/temporalio/lib/temporalio/cancellation.rb index 6d5416d5..9b33b204 100644 --- a/temporalio/lib/temporalio/cancellation.rb +++ b/temporalio/lib/temporalio/cancellation.rb @@ -18,7 +18,7 @@ class Cancellation def initialize(*parents) @canceled = false @canceled_reason = nil - @canceled_mutex = Mutex.new + @canceled_mutex = Workflow::Unsafe.illegal_call_tracing_disabled { Mutex.new } @canceled_cond_var = nil @cancel_callbacks = {} # Keyed by sentinel value, but value iteration still is deterministic @shield_depth = 0 @@ -28,22 +28,22 @@ def initialize(*parents) # @return [Boolean] Whether this cancellation is canceled. def canceled? - @canceled_mutex.synchronize { @canceled } + canceled_mutex_synchronize { @canceled } end # @return [String, nil] Reason for cancellation. Can be nil if not canceled or no reason provided. def canceled_reason - @canceled_mutex.synchronize { @canceled_reason } + canceled_mutex_synchronize { @canceled_reason } end # @return [Boolean] Whether a cancel is pending but currently shielded. def pending_canceled? - @canceled_mutex.synchronize { !@shield_pending_cancel.nil? } + canceled_mutex_synchronize { !@shield_pending_cancel.nil? } end # @return [String, nil] Reason for pending cancellation. Can be nil if not pending canceled or no reason provided. def pending_canceled_reason - @canceled_mutex.synchronize { @shield_pending_cancel&.first } + canceled_mutex_synchronize { @shield_pending_cancel&.first } end # Raise an error if this cancellation is canceled. @@ -71,13 +71,13 @@ def wait return end - @canceled_mutex.synchronize do + canceled_mutex_synchronize do break if @canceled # Add cond var if not present if @canceled_cond_var.nil? @canceled_cond_var = ConditionVariable.new - @cancel_callbacks[Object.new] = proc { @canceled_mutex.synchronize { @canceled_cond_var.broadcast } } + @cancel_callbacks[Object.new] = proc { canceled_mutex_synchronize { @canceled_cond_var.broadcast } } end # Wait on it @@ -94,10 +94,10 @@ def wait def shield raise ArgumentError, 'Block required' unless block_given? - @canceled_mutex.synchronize { @shield_depth += 1 } + canceled_mutex_synchronize { @shield_depth += 1 } yield ensure - callbacks_to_run = @canceled_mutex.synchronize do + callbacks_to_run = canceled_mutex_synchronize do @shield_depth -= 1 if @shield_depth.zero? && @shield_pending_cancel reason = @shield_pending_cancel.first @@ -120,7 +120,7 @@ def shield def add_cancel_callback(&block) raise ArgumentError, 'Must provide block' unless block_given? - callback_to_run_immediately, key = @canceled_mutex.synchronize do + callback_to_run_immediately, key = canceled_mutex_synchronize do break [block, nil] if @canceled key = Object.new @@ -135,7 +135,7 @@ def add_cancel_callback(&block) # # @param key [Object] Key returned from {add_cancel_callback}. def remove_cancel_callback(key) - @canceled_mutex.synchronize do + canceled_mutex_synchronize do @cancel_callbacks.delete(key) end nil @@ -144,7 +144,7 @@ def remove_cancel_callback(key) private def on_cancel(reason:) - callbacks_to_run = @canceled_mutex.synchronize do + callbacks_to_run = canceled_mutex_synchronize do # If we're shielding, set as pending and return nil if @shield_depth.positive? @shield_pending_cancel = [reason] @@ -166,5 +166,9 @@ def prepare_cancel(reason:) @cancel_callbacks.clear to_return.values end + + def canceled_mutex_synchronize(&) + Workflow::Unsafe.illegal_call_tracing_disabled { @canceled_mutex.synchronize(&) } + end end end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb index f5694e51..4661c995 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb @@ -16,7 +16,7 @@ def self.frozen_validated_illegal_calls(illegal_calls) # @type var fixed_val: :all | Worker::IllegalWorkflowCallValidator | Hash[Symbol, TrueClass | Worker::IllegalWorkflowCallValidator] # rubocop:disable Layout/LineLength fixed_val = case val when Temporalio::Worker::IllegalWorkflowCallValidator - if sub_val.method_name + if val.method_name raise ArgumentError, 'Top level IllegalWorkflowCallValidator instances cannot have method name' end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/scheduler.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/scheduler.rb index 330ec874..6663e7a4 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/scheduler.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/scheduler.rb @@ -111,7 +111,7 @@ def block(_blocker, timeout = nil) # We just yield because unblock will resume this. We will just wrap in timeout if needed. if timeout begin - Timeout.timeout(timeout) { Fiber.yield } + Workflow.timeout(timeout) { Fiber.yield } true rescue Timeout::Error false diff --git a/temporalio/lib/temporalio/worker.rb b/temporalio/lib/temporalio/worker.rb index bf065286..830ab65b 100644 --- a/temporalio/lib/temporalio/worker.rb +++ b/temporalio/lib/temporalio/worker.rb @@ -267,7 +267,11 @@ def self.default_illegal_workflow_calls #:write ], 'Kernel' => %i[abort at_exit autoload autoload? eval exec exit fork gets load open rand readline readlines - spawn srand system test trap], + sleep spawn srand system test trap], + # Loggers use mutexes in ways that can hang workflows, so users need to disable the durable scheduler to use + # them + 'Logger' => :all, + 'Monitor' => :all, 'Net::HTTP' => :all, 'Pathname' => :all, # TODO(cretz): Investigate why clock_gettime called from Timeout thread affects this code at all. Stack trace @@ -282,9 +286,14 @@ def self.default_illegal_workflow_calls 'Signal' => :all, 'Socket' => :all, 'Tempfile' => :all, + 'Timeout' => :all, 'Thread' => %i[abort_on_exception= exit fork handle_interrupt ignore_deadlock= kill new pass pending_interrupt? report_on_exception= start stop initialize join name= priority= raise run terminate thread_variable_set wakeup], + 'Thread::ConditionVariable' => :all, + 'Thread::Mutex' => IllegalWorkflowCallValidator.known_safe_mutex_validator, + 'Thread::SizedQueue' => :all, + 'Thread::Queue' => :all, 'Time' => IllegalWorkflowCallValidator.default_time_validators } #: Hash[String, :all | Array[Symbol]] hash.each_value(&:freeze) diff --git a/temporalio/lib/temporalio/worker/illegal_workflow_call_validator.rb b/temporalio/lib/temporalio/worker/illegal_workflow_call_validator.rb index f399e52e..34693d64 100644 --- a/temporalio/lib/temporalio/worker/illegal_workflow_call_validator.rb +++ b/temporalio/lib/temporalio/worker/illegal_workflow_call_validator.rb @@ -39,6 +39,15 @@ def self.default_time_validators ] end + # @return [IllegalWorkflowCallValidator] Workflow call validator that is tailored to disallow most Mutex calls, + # but let others through for certain situations. + def self.known_safe_mutex_validator + @known_safe_mutex_validator ||= IllegalWorkflowCallValidator.new do + # Only Google Protobuf use of Mutex is known to be safe, fail unless any caller location path has protobuf + raise 'disallowed' unless caller_locations&.any? { |loc| loc.path&.include?('google/protobuf/') } + end + end + # @return [String, nil] Method name if this validator is specific to a method. attr_reader :method_name diff --git a/temporalio/lib/temporalio/workflow.rb b/temporalio/lib/temporalio/workflow.rb index 3b0a2adb..92a69222 100644 --- a/temporalio/lib/temporalio/workflow.rb +++ b/temporalio/lib/temporalio/workflow.rb @@ -537,18 +537,30 @@ def self.replaying? # Run a block of code with illegal call tracing disabled. Users should be cautious about using this as it can # often signify unsafe code. # + # If this is invoked outside of a workflow, it just runs the block. + # # @yield Block to run with call tracing disabled # # @return [Object] Result of the block. def self.illegal_call_tracing_disabled(&) - Workflow._current.illegal_call_tracing_disabled(&) + if Workflow.in_workflow? + Workflow._current.illegal_call_tracing_disabled(&) + else + yield + end end # Run a block of code with IO enabled. Specifically this allows the `io_wait` call of the fiber scheduler to work. # Users should be cautious about using this as it can often signify unsafe code. Note, this is often only # applicable to network code as file IO and most process-based IO does not go through scheduler `io_wait`. + # + # If this is invoked outside of a workflow, it just runs the block. def self.io_enabled(&) - Workflow._current.io_enabled(&) + if Workflow.in_workflow? + Workflow._current.io_enabled(&) + else + yield + end end # Run a block of code with the durable/deterministic workflow Fiber scheduler off. This means fallback to default @@ -556,9 +568,41 @@ def self.io_enabled(&) # situations where a third party library does something like use "Timeout" in a way that shouldn't be made # durable. # + # If this is invoked outside of a workflow, it just runs the block. + # # This implies {illegal_call_tracing_disabled}. def self.durable_scheduler_disabled(&) - Workflow._current.durable_scheduler_disabled(&) + if Workflow.in_workflow? + Workflow._current.durable_scheduler_disabled(&) + else + yield + end + end + + # @!visibility private + def self._wrap_ruby_class_as_legal(target_class) + Class.new do + define_method(:initialize) do |*args, **kwargs, &block| + @underlying = Unsafe.illegal_call_tracing_disabled do + target_class.new(*args, **kwargs, &block) # steep:ignore + end + end + + # @!visibility private + def method_missing(name, ...) + if @underlying.respond_to?(name) + # Call with tracing disabled + Unsafe.illegal_call_tracing_disabled { @underlying.public_send(name, ...) } + else + super + end + end + + # @!visibility private + def respond_to_missing?(name, include_all = false) + @underlying.respond_to?(name, include_all) || super + end + end end end @@ -625,5 +669,29 @@ class InvalidWorkflowStateError < Error; end # error can still be used with configuring workflow failure exception types to change non-deterministic errors from # task failures to workflow failures. class NondeterminismError < Error; end + + # Mutex is a workflow-safe wrapper around {::Mutex}. + # + # As of this writing, all methods on Mutex are safe for workflow use and are implicitly made deterministic by the + # Fiber scheduler. The primary reason this is wrapped as safe is to be able to catch unintentional uses of Mutex by + # non-workflow-safe code. However, users may prefer to use the more powerful {wait_condition} approach as a mutex + # (e.g. wait until a certain attribute is set to false then set it to true before continuing). + Mutex = Unsafe._wrap_ruby_class_as_legal(::Mutex) + + # Queue is a workflow-safe wrapper around {::Queue}. + # + # As of this writing, all methods on Queue are safe for workflow use and are implicitly made deterministic by the + # Fiber scheduler. The primary reason this is wrapped as safe is to be able to catch unintentional uses of Queue by + # non-workflow-safe code. However, users may prefer to use the more powerful {wait_condition} approach as a queue + # (e.g. wait until an array is non-empty before continuing). + Queue = Unsafe._wrap_ruby_class_as_legal(::Queue) + + # SizedQueue is a workflow-safe wrapper around {::SizedQueue}. + # + # As of this writing, all methods on SizedQueue are safe for workflow use and are implicitly made deterministic by + # the Fiber scheduler. The primary reason this is wrapped as safe is to be able to catch unintentional uses of + # SizedQueue by non-workflow-safe code. However, users may prefer to use the more powerful {wait_condition} approach + # as a queue (e.g. wait until an array is non-empty before continuing). + SizedQueue = Unsafe._wrap_ruby_class_as_legal(::SizedQueue) end end diff --git a/temporalio/sig/temporalio/cancellation.rbs b/temporalio/sig/temporalio/cancellation.rbs index 7b73fbb2..4d387cce 100644 --- a/temporalio/sig/temporalio/cancellation.rbs +++ b/temporalio/sig/temporalio/cancellation.rbs @@ -9,11 +9,12 @@ module Temporalio def check!: (?Exception err) -> void def to_ary: -> [Cancellation, Proc] def wait: -> void - def shield: [T] { (?) -> untyped } -> T + def shield: [T] { (?) -> T } -> T def add_cancel_callback: { -> untyped } -> Object def remove_cancel_callback: (Object key) -> void private def on_cancel: (reason: Object?) -> void private def prepare_cancel: (reason: Object?) -> Array[Proc]? + private def canceled_mutex_synchronize: [T] { (?) -> T } -> T end end \ No newline at end of file diff --git a/temporalio/sig/temporalio/worker/illegal_workflow_call_validator.rbs b/temporalio/sig/temporalio/worker/illegal_workflow_call_validator.rbs index 4b106e36..211aa20b 100644 --- a/temporalio/sig/temporalio/worker/illegal_workflow_call_validator.rbs +++ b/temporalio/sig/temporalio/worker/illegal_workflow_call_validator.rbs @@ -14,6 +14,7 @@ module Temporalio end def self.default_time_validators: -> Array[IllegalWorkflowCallValidator] + def self.known_safe_mutex_validator: -> IllegalWorkflowCallValidator attr_reader method_name: Symbol? attr_reader block: ^(CallInfo) -> void diff --git a/temporalio/sig/temporalio/workflow.rbs b/temporalio/sig/temporalio/workflow.rbs index 99c736dd..5266bd90 100644 --- a/temporalio/sig/temporalio/workflow.rbs +++ b/temporalio/sig/temporalio/workflow.rbs @@ -155,6 +155,8 @@ module Temporalio def self.io_enabled: [T] { -> T } -> T def self.durable_scheduler_disabled: [T] { -> T } -> T + + def self._wrap_ruby_class_as_legal: (Class target_class) -> Class end class ContinueAsNewError < Error @@ -187,5 +189,12 @@ module Temporalio end class NondeterminismError < Error end + + class Mutex < ::Mutex + end + class Queue < ::Queue + end + class SizedQueue < ::SizedQueue + end end end diff --git a/temporalio/test/testing/workflow_environment_test.rb b/temporalio/test/testing/workflow_environment_test.rb index e8ebc967..49bf1bf6 100644 --- a/temporalio/test/testing/workflow_environment_test.rb +++ b/temporalio/test/testing/workflow_environment_test.rb @@ -17,7 +17,7 @@ class SlowWorkflow < Temporalio::Workflow::Definition TWO_DAYS = 2 * 24 * 60 * 60 def execute - sleep(TWO_DAYS) + Temporalio::Workflow.sleep(TWO_DAYS) 'all done' end diff --git a/temporalio/test/worker_workflow_activity_test.rb b/temporalio/test/worker_workflow_activity_test.rb index 8050e864..850a7b90 100644 --- a/temporalio/test/worker_workflow_activity_test.rb +++ b/temporalio/test/worker_workflow_activity_test.rb @@ -163,7 +163,7 @@ def run(scenario, local) Temporalio::Workflow.execute_local_activity(CancellationSleepActivity, 0.1, schedule_to_close_timeout: 10) else - sleep(0.1) + Temporalio::Workflow.sleep(0.1) end cancel_proc.call diff --git a/temporalio/test/worker_workflow_child_test.rb b/temporalio/test/worker_workflow_child_test.rb index 3a4cc322..f2d708c7 100644 --- a/temporalio/test/worker_workflow_child_test.rb +++ b/temporalio/test/worker_workflow_child_test.rb @@ -78,7 +78,7 @@ def execute(scenario) when :cancel_wait cancellation, cancel_proc = Temporalio::Cancellation.new handle = Temporalio::Workflow.start_child_workflow(CancelChildWorkflow, cancellation:) - sleep(0.1) + Temporalio::Workflow.sleep(0.1) cancel_proc.call handle.result when :cancel_try @@ -88,7 +88,7 @@ def execute(scenario) cancellation:, cancellation_type: Temporalio::Workflow::ChildWorkflowCancellationType::TRY_CANCEL ) - sleep(0.1) + Temporalio::Workflow.sleep(0.1) cancel_proc.call handle.result else diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index 942071a1..18b5b82a 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -20,6 +20,8 @@ def test_simple assert_equal 'Hello, Temporal!', execute_workflow(SimpleWorkflow, 'Temporal') end + IGNORED_LOGGER = Logger.new($stdout) # rubocop:disable Layout/ClassStructure + class IllegalCallsWorkflow < Temporalio::Workflow::Definition def execute(scenario) case scenario.to_sym @@ -53,6 +55,24 @@ def execute(scenario) Time.now when :time_iso8601 Time.iso8601('2011-10-05T22:26:12-04:00') + when :stdlib_logger_write + IGNORED_LOGGER.info('test') + when :workflow_logger_write + Temporalio::Workflow.logger.info('test') + when :sleep + sleep(0.1) + when :timeout + Timeout.timeout(0.1) { 'test' } + when :queue + Queue.new + when :sized_queue + SizedQueue.new + when :mutex + Mutex.new + when :condvar + ConditionVariable.new + when :monitor + Monitor.new.synchronize { 'test' } else raise NotImplementedError end @@ -85,6 +105,15 @@ def test_illegal_calls exec.call(:time_new_explicit, nil) # This call is ok exec.call(:time_now, 'Time now') exec.call(:time_iso8601, nil) # This call is ok + exec.call(:stdlib_logger_write, 'Logger info') + exec.call(:workflow_logger_write, nil) # This call is ok + exec.call(:sleep, 'Kernel sleep') + exec.call(:timeout, 'Timeout timeout') + exec.call(:queue, 'Thread::Queue initialize') + exec.call(:sized_queue, 'Thread::SizedQueue initialize') + exec.call(:mutex, 'Thread::Mutex initialize') + exec.call(:condvar, 'Thread::ConditionVariable initialize') + exec.call(:monitor, 'Monitor synchronize') end class WorkflowInitWorkflow < Temporalio::Workflow::Definition @@ -202,7 +231,7 @@ class HistoryInfoWorkflow < Temporalio::Workflow::Definition def execute # Start 30 10ms timers and wait on them all Temporalio::Workflow::Future.all_of( - *30.times.map { Temporalio::Workflow::Future.new { sleep(0.1) } } + *30.times.map { Temporalio::Workflow::Future.new { Temporalio::Workflow.sleep(0.1) } } ).wait [ @@ -251,13 +280,13 @@ def execute(scenario) @waiting = true Temporalio::Workflow.wait_condition { false } when :timeout - Timeout.timeout(0.1) do + Temporalio::Workflow.timeout(0.1) do Temporalio::Workflow.wait_condition { false } end when :manual_cancel my_cancel, my_cancel_proc = Temporalio::Cancellation.new Temporalio::Workflow::Future.new do - sleep(0.1) + Temporalio::Workflow.sleep(0.1) my_cancel_proc.call(reason: 'my cancel reason') end Temporalio::Workflow.wait_condition(cancellation: my_cancel) { false } @@ -305,18 +334,14 @@ class TimerWorkflow < Temporalio::Workflow::Definition def execute(scenario) case scenario.to_sym - when :sleep_stdlib - sleep(0.11) when :sleep_workflow Temporalio::Workflow.sleep(0.12, summary: 'my summary') - when :sleep_stdlib_workflow_cancel - sleep(1000) when :sleep_workflow_cancel Temporalio::Workflow.sleep(1000) when :sleep_explicit_cancel my_cancel, my_cancel_proc = Temporalio::Cancellation.new Temporalio::Workflow::Future.new do - sleep(0.1) + Temporalio::Workflow.sleep(0.1) my_cancel_proc.call(reason: 'my cancel reason') end Temporalio::Workflow.sleep(1000, cancellation: my_cancel) @@ -324,10 +349,6 @@ def execute(scenario) my_cancel, my_cancel_proc = Temporalio::Cancellation.new my_cancel_proc.call(reason: 'my cancel reason') Temporalio::Workflow.sleep(1000, cancellation: my_cancel) - when :timeout_stdlib - Timeout.timeout(0.16) do - Temporalio::Workflow.wait_condition { false } - end when :timeout_workflow Temporalio::Workflow.timeout(0.17) do Temporalio::Workflow.wait_condition { false } @@ -346,11 +367,11 @@ def execute(scenario) Temporalio::Workflow.wait_condition { false } end when :timeout_workflow_cancel - Timeout.timeout(1000) do + Temporalio::Workflow.timeout(1000) do Temporalio::Workflow.wait_condition { false } end when :timeout_not_reached - Timeout.timeout(1000) do + Temporalio::Workflow.timeout(1000) do Temporalio::Workflow.wait_condition { @return_value } end @waiting = true @@ -373,12 +394,6 @@ def return_value(value) end def test_timer - event = execute_workflow(TimerWorkflow, :sleep_stdlib) do |handle| - handle.result - handle.fetch_history_events.find(&:timer_started_event_attributes) - end - assert_equal 0.11, event.timer_started_event_attributes.start_to_fire_timeout.to_f - event = execute_workflow(TimerWorkflow, :sleep_workflow) do |handle| handle.result handle.fetch_history_events.find(&:timer_started_event_attributes) @@ -386,13 +401,6 @@ def test_timer assert_equal 0.12, event.timer_started_event_attributes.start_to_fire_timeout.to_f # TODO(cretz): Assert summary - execute_workflow(TimerWorkflow, :sleep_stdlib_workflow_cancel) do |handle| - assert_eventually { assert handle.fetch_history_events.any?(&:timer_started_event_attributes) } - handle.cancel - err = assert_raises(Temporalio::Error::WorkflowFailedError) { handle.result } - assert_instance_of Temporalio::Error::CanceledError, err.cause - end - execute_workflow(TimerWorkflow, :sleep_workflow_cancel) do |handle| assert_eventually { assert handle.fetch_history_events.any?(&:timer_started_event_attributes) } handle.cancel @@ -414,11 +422,6 @@ def test_timer assert_instance_of Temporalio::Error::CanceledError, err.cause assert_equal 'my cancel reason', err.cause.message - err = assert_raises(Temporalio::Error::WorkflowFailedError) { execute_workflow(TimerWorkflow, :timeout_stdlib) } - assert_instance_of Temporalio::Error::ApplicationError, err.cause - assert_equal 'execution expired', err.cause.message - assert_equal 'Error', err.cause.type - err = assert_raises(Temporalio::Error::WorkflowFailedError) { execute_workflow(TimerWorkflow, :timeout_workflow) } assert_instance_of Temporalio::Error::ApplicationError, err.cause assert_equal 'execution expired', err.cause.message @@ -600,7 +603,7 @@ def execute # Inside a coroutine and timeout, execute an activity forever Temporalio::Workflow::Future.new do - Timeout.timeout(nil) do + Temporalio::Workflow.timeout(nil) do @expected_traces << ["#{__FILE__}:#{__LINE__ + 1}", "#{__FILE__}:#{__LINE__ - 1}"] Temporalio::Workflow.execute_activity('does-not-exist', task_queue: 'does-not-exist', @@ -734,7 +737,7 @@ class NonDeterminismErrorWorkflow < Temporalio::Workflow::Definition def execute # Do a timer only on non-replay - sleep(0.01) unless Temporalio::Workflow::Unsafe.replaying? + Temporalio::Workflow.sleep(0.01) unless Temporalio::Workflow::Unsafe.replaying? Temporalio::Workflow.wait_condition { @finish } end @@ -801,7 +804,7 @@ def test_non_determinism_error class LoggerWorkflow < Temporalio::Workflow::Definition def initialize - @bad_logger = Logger.new($stdout) + @bad_logger = Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled { Logger.new($stdout) } end def execute @@ -812,7 +815,7 @@ def execute def update Temporalio::Workflow.logger.info('some-log-1') Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled { @bad_logger.info('some-log-2') } - sleep(0.01) + Temporalio::Workflow.sleep(0.01) end workflow_signal @@ -886,7 +889,7 @@ def execute(scenario) when :any_of # Basic any of result = Temporalio::Workflow::Future.any_of( - Temporalio::Workflow::Future.new { sleep(0.01) }, + Temporalio::Workflow::Future.new { Temporalio::Workflow.sleep(0.01) }, Temporalio::Workflow::Future.new { 'done' } ).wait raise unless result == 'done' @@ -894,7 +897,7 @@ def execute(scenario) # Any of with exception begin Temporalio::Workflow::Future.any_of( - Temporalio::Workflow::Future.new { sleep(0.01) }, + Temporalio::Workflow::Future.new { Temporalio::Workflow.sleep(0.01) }, Temporalio::Workflow::Future.new { raise FutureWorkflowError } ).wait raise @@ -904,14 +907,14 @@ def execute(scenario) # Try any of result = Temporalio::Workflow::Future.try_any_of( - Temporalio::Workflow::Future.new { sleep(0.01) }, + Temporalio::Workflow::Future.new { Temporalio::Workflow.sleep(0.01) }, Temporalio::Workflow::Future.new { 'done' } ).wait.wait raise unless result == 'done' # Try any of with exception try_any_of = Temporalio::Workflow::Future.try_any_of( - Temporalio::Workflow::Future.new { sleep(0.01) }, + Temporalio::Workflow::Future.new { Temporalio::Workflow.sleep(0.01) }, Temporalio::Workflow::Future.new { raise FutureWorkflowError } ).wait begin @@ -1346,7 +1349,7 @@ def test_initializer_failure class QueueWorkflow < Temporalio::Workflow::Definition def initialize - @queue = Queue.new + @queue = Temporalio::Workflow::Queue.new end def execute(timeout = nil) @@ -1416,6 +1419,36 @@ def test_queue end end + class SizedQueueWorkflow < Temporalio::Workflow::Definition + def initialize + @queue = Temporalio::Workflow::SizedQueue.new(1) + end + + def execute(timeout = nil) + queue = Temporalio::Workflow::SizedQueue.new(1) + queue.push('some-value') + # Timeout only works on 3.2+ + raise 'Expected nil' if timeout && !queue.push('some-other-value', timeout:).nil? + + queue.pop + end + end + + def test_sized_queue + assert_equal 'some-value', execute_workflow(SizedQueueWorkflow) + + # Timeout not added until 3.2, so can stop test here before then + major, minor = RUBY_VERSION.split('.').take(2).map(&:to_i) + return if major.nil? || major != 3 || minor.nil? || minor < 2 + + execute_workflow(SizedQueueWorkflow, 0.1) do |handle| + assert_equal 'some-value', handle.result + # Make sure a timer event is in there + evt = handle.fetch_history_events.find(&:timer_started_event_attributes) + assert_equal 0.1, evt&.timer_started_event_attributes&.start_to_fire_timeout&.to_f # rubocop:disable Style/SafeNavigationChainLength + end + end + class MutexActivity < Temporalio::Activity::Definition def initialize(queue) @queue = queue @@ -1430,7 +1463,7 @@ class MutexWorkflow < Temporalio::Workflow::Definition workflow_query_attr_reader :results def initialize - @mutex = Mutex.new + @mutex = Temporalio::Workflow::Mutex.new @results = [] end @@ -1465,6 +1498,23 @@ def test_mutex end end + class MutexSleepWorkflow < Temporalio::Workflow::Definition + def initialize + @mutex = Temporalio::Workflow::Mutex.new + end + + def execute + @mutex.synchronize do + @mutex.sleep(0.5) + 'done' + end + end + end + + def test_mutex_sleep + assert_equal 'done', execute_workflow(MutexSleepWorkflow) + end + class UtilitiesWorkflow < Temporalio::Workflow::Definition workflow_query_attr_reader :result @@ -2531,10 +2581,10 @@ def test_hints class NonDurableTimerWorkfow < Temporalio::Workflow::Definition def execute - sleep(0.1) + Temporalio::Workflow.sleep(0.1) Temporalio::Workflow::Unsafe.durable_scheduler_disabled { sleep(0.2) } begin - Timeout.timeout(0.3) { Queue.new.pop } + Temporalio::Workflow.timeout(0.3) { Temporalio::Workflow::Queue.new.pop } raise 'Expected timeout' rescue Timeout::Error # Ignore