Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa
:failure_converter, :cancellation, :continue_as_new_suggested, :current_history_length,
:current_history_size, :replaying, :random, :signal_handlers, :query_handlers, :update_handlers,
:context_frozen
attr_accessor :current_details
attr_accessor :io_enabled, :current_details

def initialize(details)
# Initialize general state
Expand All @@ -68,6 +68,7 @@ def initialize(details)
@logger = ReplaySafeLogger.new(logger: details.logger, instance: self)
@logger.scoped_values_getter = proc { scoped_logger_info }
@runtime_metric_meter = details.metric_meter
@io_enabled = details.unsafe_workflow_io_enabled
@scheduler = Scheduler.new(self)
@payload_converter = details.payload_converter
@failure_converter = details.failure_converter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ def initialize_continue_as_new_error(error)
)
end

def io_enabled(&)
prev = @instance.io_enabled
@instance.io_enabled = true
begin
yield
ensure
@instance.io_enabled = prev
end
end

def logger
@instance.logger
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class WorkflowInstance
class Details
attr_reader :namespace, :task_queue, :definition, :initial_activation, :logger, :metric_meter,
:payload_converter, :failure_converter, :interceptors, :disable_eager_activity_execution,
:illegal_calls, :workflow_failure_exception_types
:illegal_calls, :workflow_failure_exception_types, :unsafe_workflow_io_enabled

def initialize(
namespace:,
Expand All @@ -22,7 +22,8 @@ def initialize(
interceptors:,
disable_eager_activity_execution:,
illegal_calls:,
workflow_failure_exception_types:
workflow_failure_exception_types:,
unsafe_workflow_io_enabled:
)
@namespace = namespace
@task_queue = task_queue
Expand All @@ -36,6 +37,7 @@ def initialize(
@disable_eager_activity_execution = disable_eager_activity_execution
@illegal_calls = illegal_calls
@workflow_failure_exception_types = workflow_failure_exception_types
@unsafe_workflow_io_enabled = unsafe_workflow_io_enabled
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,28 @@ def fiber(&block)
end

def io_wait(io, events, timeout)
# TODO(cretz): This in a blocking fashion?
raise NotImplementedError, 'TODO'
# Do not allow if IO disabled
unless @instance.io_enabled
raise Workflow::NondeterminismError,
'Cannot perform IO from inside a workflow. If this is known to be safe, ' \
'the code can be run in a Temporalio::Workflow::Unsafe.io_enabled block.'
end

# Use regular Ruby behavior of blocking this thread. There is no Ruby implementation of io_wait we can just
# delegate to at this time (or default scheduler or anything like that), so we had to implement this
# ourselves.
readers = events.nobits?(IO::READABLE) ? nil : [io]
writers = events.nobits?(IO::WRITABLE) ? nil : [io]
priority = events.nobits?(IO::PRIORITY) ? nil : [io]
ready = IO.select(readers, writers, priority, timeout) # steep:ignore

result = 0
unless ready.nil?
result |= IO::READABLE if ready[0]&.include?(io)
result |= IO::WRITABLE if ready[1]&.include?(io)
result |= IO::PRIORITY if ready[2]&.include?(io)
end
result
end

def kernel_sleep(duration = nil)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def initialize(
illegal_workflow_calls:,
workflow_failure_exception_types:,
workflow_payload_codec_thread_pool:,
unsafe_workflow_io_enabled:,
debug_mode:,
on_eviction: nil
)
Expand Down Expand Up @@ -109,7 +110,8 @@ def initialize(
end

t
end.freeze
end.freeze,
unsafe_workflow_io_enabled:
)
@state.on_eviction = on_eviction if on_eviction

Expand Down Expand Up @@ -184,14 +186,14 @@ def apply_codec_on_payload_visit(payload_or_payloads, &)
class State
attr_reader :workflow_definitions, :bridge_worker, :logger, :metric_meter, :data_converter, :deadlock_timeout,
:illegal_calls, :namespace, :task_queue, :disable_eager_activity_execution,
:workflow_interceptors, :workflow_failure_exception_types
:workflow_interceptors, :workflow_failure_exception_types, :unsafe_workflow_io_enabled

attr_writer :on_eviction

def initialize(
workflow_definitions:, bridge_worker:, logger:, metric_meter:, data_converter:, deadlock_timeout:,
illegal_calls:, namespace:, task_queue:, disable_eager_activity_execution:,
workflow_interceptors:, workflow_failure_exception_types:
workflow_interceptors:, workflow_failure_exception_types:, unsafe_workflow_io_enabled:
)
@workflow_definitions = workflow_definitions
@bridge_worker = bridge_worker
Expand All @@ -205,6 +207,7 @@ def initialize(
@disable_eager_activity_execution = disable_eager_activity_execution
@workflow_interceptors = workflow_interceptors
@workflow_failure_exception_types = workflow_failure_exception_types
@unsafe_workflow_io_enabled = unsafe_workflow_io_enabled

@running_workflows = {}
@running_workflows_mutex = Mutex.new
Expand Down
7 changes: 7 additions & 0 deletions temporalio/lib/temporalio/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Worker
:illegal_workflow_calls,
:workflow_failure_exception_types,
:workflow_payload_codec_thread_pool,
:unsafe_workflow_io_enabled,
:debug_mode
)

Expand Down Expand Up @@ -347,6 +348,9 @@ def self.default_illegal_workflow_calls
# @param workflow_payload_codec_thread_pool [ThreadPool, nil] Thread pool to run payload codec encode/decode within.
# This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially block
# execution which is why they need to be run in the background.
# @param unsafe_workflow_io_enabled [Boolean] If false, the default, workflow code that invokes io_wait on the fiber
# scheduler will fail. Instead of setting this to true, users are encouraged to use {Workflow::Unsafe.io_enabled}
# with a block for narrower enabling of IO.
# @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks
# if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is
# `true` or `1`.
Expand Down Expand Up @@ -378,6 +382,7 @@ def initialize(
illegal_workflow_calls: Worker.default_illegal_workflow_calls,
workflow_failure_exception_types: [],
workflow_payload_codec_thread_pool: nil,
unsafe_workflow_io_enabled: false,
debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase)
)
raise ArgumentError, 'Must have at least one activity or workflow' if activities.empty? && workflows.empty?
Expand Down Expand Up @@ -412,6 +417,7 @@ def initialize(
illegal_workflow_calls:,
workflow_failure_exception_types:,
workflow_payload_codec_thread_pool:,
unsafe_workflow_io_enabled:,
debug_mode:
).freeze

Expand Down Expand Up @@ -483,6 +489,7 @@ def initialize(
illegal_workflow_calls:,
workflow_failure_exception_types:,
workflow_payload_codec_thread_pool:,
unsafe_workflow_io_enabled:,
debug_mode:
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ def create_instance(initial_activation, worker_state)
interceptors: worker_state.workflow_interceptors,
disable_eager_activity_execution: worker_state.disable_eager_activity_execution,
illegal_calls: worker_state.illegal_calls,
workflow_failure_exception_types: worker_state.workflow_failure_exception_types
workflow_failure_exception_types: worker_state.workflow_failure_exception_types,
unsafe_workflow_io_enabled: worker_state.unsafe_workflow_io_enabled
)
)
end
Expand Down
7 changes: 7 additions & 0 deletions temporalio/lib/temporalio/worker/workflow_replayer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class WorkflowReplayer
:illegal_workflow_calls,
:workflow_failure_exception_types,
:workflow_payload_codec_thread_pool,
:unsafe_workflow_io_enabled,
:debug_mode,
:runtime
)
Expand Down Expand Up @@ -69,6 +70,9 @@ class Options; end # rubocop:disable Lint/EmptyClass
# @param workflow_payload_codec_thread_pool [ThreadPool, nil] Thread pool to run payload codec encode/decode
# within. This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially
# block execution which is why they need to be run in the background.
# @param unsafe_workflow_io_enabled [Boolean] If false, the default, workflow code that invokes io_wait on the
# fiber scheduler will fail. Instead of setting this to true, users are encouraged to use
# {Workflow::Unsafe.io_enabled} with a block for narrower enabling of IO.
# @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks
# if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is
# `true` or `1`.
Expand All @@ -89,6 +93,7 @@ def initialize(
illegal_workflow_calls: Worker.default_illegal_workflow_calls,
workflow_failure_exception_types: [],
workflow_payload_codec_thread_pool: nil,
unsafe_workflow_io_enabled: false,
debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase),
runtime: Runtime.default,
&
Expand All @@ -106,6 +111,7 @@ def initialize(
illegal_workflow_calls:,
workflow_failure_exception_types:,
workflow_payload_codec_thread_pool:,
unsafe_workflow_io_enabled:,
debug_mode:,
runtime:
).freeze
Expand Down Expand Up @@ -237,6 +243,7 @@ def initialize(
illegal_workflow_calls: options.illegal_workflow_calls,
workflow_failure_exception_types: options.workflow_failure_exception_types,
workflow_payload_codec_thread_pool: options.workflow_payload_codec_thread_pool,
unsafe_workflow_io_enabled: options.unsafe_workflow_io_enabled,
debug_mode: options.debug_mode,
on_eviction: proc { |_, remove_job| @last_workflow_remove_job = remove_job } # steep:ignore
)
Expand Down
7 changes: 7 additions & 0 deletions temporalio/lib/temporalio/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,13 @@ def self.replaying?
def self.illegal_call_tracing_disabled(&)
Workflow._current.illegal_call_tracing_disabled(&)
end

# Run a block of code with IO enabled. Specifically this allows the `io_wait` call of the fiber scheduler to work.
# Users should be cautious about using this as it can often signify unsafe code. Note, this is often only
# applicable to network code as file IO and most process-based IO does not go through scheduler `io_wait`.
def self.io_enabled(&)
Workflow._current.io_enabled(&)
end
end

# Error that is raised by a workflow out of the primary workflow method to issue a continue-as-new.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ module Temporalio
attr_reader update_handlers: Hash[String?, Workflow::Definition::Update]
attr_reader context_frozen: bool

attr_accessor io_enabled: bool
attr_accessor current_details: String?

def initialize: (Details details) -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ module Temporalio

def initialize_continue_as_new_error: (Workflow::ContinueAsNewError error) -> void

def io_enabled: [T] { -> T } -> T

def logger: -> ReplaySafeLogger

def memo: -> ExternallyImmutableHash[String, Object?]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module Temporalio
attr_reader disable_eager_activity_execution: bool
attr_reader illegal_calls: Hash[String, :all | Hash[Symbol, bool]]
attr_reader workflow_failure_exception_types: Array[singleton(Exception)]
attr_reader unsafe_workflow_io_enabled: bool

def initialize: (
namespace: String,
Expand All @@ -28,7 +29,8 @@ module Temporalio
interceptors: Array[Temporalio::Worker::Interceptor::Workflow],
disable_eager_activity_execution: bool,
illegal_calls: Hash[String, :all | Hash[Symbol, bool]],
workflow_failure_exception_types: Array[singleton(Exception)]
workflow_failure_exception_types: Array[singleton(Exception)],
unsafe_workflow_io_enabled: bool
) -> void
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module Temporalio
illegal_workflow_calls: Hash[String, :all | Array[Symbol]],
workflow_failure_exception_types: Array[singleton(Exception)],
workflow_payload_codec_thread_pool: Temporalio::Worker::ThreadPool?,
unsafe_workflow_io_enabled: bool,
debug_mode: bool,
?on_eviction: (^(String run_id, untyped cache_remove_job) -> void)?
) -> void
Expand Down Expand Up @@ -61,6 +62,7 @@ module Temporalio
attr_reader disable_eager_activity_execution: bool
attr_reader workflow_interceptors: Array[Temporalio::Worker::Interceptor::Workflow]
attr_reader workflow_failure_exception_types: Array[singleton(Exception)]
attr_reader unsafe_workflow_io_enabled: bool

attr_writer on_eviction: ^(String run_id, untyped cache_remove_job) -> void

Expand All @@ -76,7 +78,8 @@ module Temporalio
task_queue: String,
disable_eager_activity_execution: bool,
workflow_interceptors: Array[Temporalio::Worker::Interceptor::Workflow],
workflow_failure_exception_types: Array[singleton(Exception)]
workflow_failure_exception_types: Array[singleton(Exception)],
unsafe_workflow_io_enabled: bool
) -> void

def get_or_create_running_workflow: [T] (String run_id) { -> T } -> T
Expand Down
3 changes: 3 additions & 0 deletions temporalio/sig/temporalio/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ module Temporalio
attr_reader illegal_workflow_calls: Hash[String, :all | Array[Symbol]]
attr_reader workflow_failure_exception_types: Array[singleton(Exception)]
attr_reader workflow_payload_codec_thread_pool: ThreadPool?
attr_reader unsafe_workflow_io_enabled: bool
attr_reader debug_mode: bool

def initialize: (
Expand Down Expand Up @@ -58,6 +59,7 @@ module Temporalio
illegal_workflow_calls: Hash[String, :all | Array[Symbol]],
workflow_failure_exception_types: Array[singleton(Exception)],
workflow_payload_codec_thread_pool: ThreadPool?,
unsafe_workflow_io_enabled: bool,
debug_mode: bool
) -> void

Expand Down Expand Up @@ -107,6 +109,7 @@ module Temporalio
?illegal_workflow_calls: Hash[String, :all | Array[Symbol]],
?workflow_failure_exception_types: Array[singleton(Exception)],
?workflow_payload_codec_thread_pool: ThreadPool?,
?unsafe_workflow_io_enabled: bool,
?debug_mode: bool
) -> void

Expand Down
3 changes: 3 additions & 0 deletions temporalio/sig/temporalio/worker/workflow_replayer.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Temporalio
attr_reader illegal_workflow_calls: Hash[String, :all | Array[Symbol]]
attr_reader workflow_failure_exception_types: Array[singleton(Exception)]
attr_reader workflow_payload_codec_thread_pool: ThreadPool?
attr_reader unsafe_workflow_io_enabled: bool
attr_reader debug_mode: bool
attr_reader runtime: Runtime

Expand All @@ -30,6 +31,7 @@ module Temporalio
illegal_workflow_calls: Hash[String, :all | Array[Symbol]],
workflow_failure_exception_types: Array[singleton(Exception)],
workflow_payload_codec_thread_pool: ThreadPool?,
unsafe_workflow_io_enabled: bool,
debug_mode: bool,
runtime: Runtime
) -> void
Expand All @@ -50,6 +52,7 @@ module Temporalio
?illegal_workflow_calls: Hash[String, :all | Array[Symbol]],
?workflow_failure_exception_types: Array[singleton(Exception)],
?workflow_payload_codec_thread_pool: ThreadPool?,
?unsafe_workflow_io_enabled: bool,
?debug_mode: bool,
?runtime: Runtime
) ?{ (ReplayWorker worker) -> untyped } -> void
Expand Down
2 changes: 2 additions & 0 deletions temporalio/sig/temporalio/workflow.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ module Temporalio
def self.replaying?: -> bool

def self.illegal_call_tracing_disabled: [T] { -> T } -> T

def self.io_enabled: [T] { -> T } -> T
end

class ContinueAsNewError < Error
Expand Down
3 changes: 2 additions & 1 deletion temporalio/test/sig/workflow_utils.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ module WorkflowUtils
?id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::enum,
?max_heartbeat_throttle_interval: Float,
?task_timeout: duration?,
?on_worker_run: Proc?
?on_worker_run: Proc?,
?unsafe_workflow_io_enabled: bool
) { (Temporalio::Client::WorkflowHandle, Temporalio::Worker) -> T } -> T

def assert_eventually_task_fail: (
Expand Down
Loading
Loading