Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions temporalio/lib/temporalio/activity/context.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,16 @@ def _scoped_logger_info
end

# @return [Metric::Meter] Metric meter to create metrics on, with some activity-specific attributes already set.
# @raise [RuntimeError] Called within a {Testing::ActivityEnvironment} and it was not set.
def metric_meter
raise NotImplementedError
end

# @return [Client] Temporal client this activity worker is running in.
# @raise [RuntimeError] Called within a {Testing::ActivityEnvironment} and it was not set.
def client
raise NotImplementedError
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def execute_activity(task_token, defn, start)

# Run
activity = RunningActivity.new(
worker: @worker,
info:,
cancellation: Cancellation.new,
worker_shutdown_cancellation: @worker._worker_shutdown_cancellation,
Expand Down Expand Up @@ -299,13 +300,15 @@ class RunningActivity < Activity::Context
attr_accessor :instance, :_outbound_impl, :_server_requested_cancel

def initialize( # rubocop:disable Lint/MissingSuper
worker:,
info:,
cancellation:,
worker_shutdown_cancellation:,
payload_converter:,
logger:,
runtime_metric_meter:
)
@worker = worker
@info = info
@cancellation = cancellation
@worker_shutdown_cancellation = worker_shutdown_cancellation
Expand Down Expand Up @@ -334,6 +337,10 @@ def metric_meter
}
)
end

def client
@worker.client
end
end

class InboundImplementation < Temporalio::Worker::Interceptor::Activity::Inbound
Expand Down
29 changes: 26 additions & 3 deletions temporalio/lib/temporalio/testing/activity_environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,19 @@ def self.default_info
# @param payload_converter [Converters::PayloadConverter] Value for {Activity::Context#payload_converter}.
# @param logger [Logger] Value for {Activity::Context#logger}.
# @param activity_executors [Hash<Symbol, Worker::ActivityExecutor>] Executors that activities can run within.
# @param metric_meter [Metric::Meter, nil] Value for {Activity::Context#metric_meter}, or nil to raise when
# called.
# @param client [Client, nil] Value for {Activity::Context#client}, or nil to raise when called.
def initialize(
info: ActivityEnvironment.default_info,
on_heartbeat: nil,
cancellation: Cancellation.new,
worker_shutdown_cancellation: Cancellation.new,
payload_converter: Converters::PayloadConverter.default,
logger: Logger.new(nil),
activity_executors: Worker::ActivityExecutor.defaults
activity_executors: Worker::ActivityExecutor.defaults,
metric_meter: nil,
client: nil
)
@info = info
@on_heartbeat = on_heartbeat
Expand All @@ -63,6 +68,8 @@ def initialize(
@payload_converter = payload_converter
@logger = logger
@activity_executors = activity_executors
@metric_meter = metric_meter
@client = client
end

# Run an activity and returns its result or raises its exception.
Expand All @@ -86,7 +93,9 @@ def run(activity, *args)
cancellation: @cancellation,
worker_shutdown_cancellation: @worker_shutdown_cancellation,
payload_converter: @payload_converter,
logger: @logger
logger: @logger,
metric_meter: @metric_meter,
client: @client
))
queue.push([defn.proc.call(*args), nil])
rescue Exception => e # rubocop:disable Lint/RescueException Intentionally capturing all exceptions
Expand All @@ -113,7 +122,9 @@ def initialize( # rubocop:disable Lint/MissingSuper
cancellation:,
worker_shutdown_cancellation:,
payload_converter:,
logger:
logger:,
metric_meter:,
client:
)
@info = info
@instance = instance
Expand All @@ -122,12 +133,24 @@ def initialize( # rubocop:disable Lint/MissingSuper
@worker_shutdown_cancellation = worker_shutdown_cancellation
@payload_converter = payload_converter
@logger = logger
@metric_meter = metric_meter
@client = client
end

# @!visibility private
def heartbeat(*details)
@on_heartbeat&.call(details)
end

# @!visibility private
def metric_meter
@metric_meter or raise 'No metric meter configured in this test environment'
end

# @!visibility private
def client
@client or raise 'No client configured in this test environment'
end
end

private_constant :Context
Expand Down
24 changes: 24 additions & 0 deletions temporalio/lib/temporalio/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class Worker
)

# Options as returned from {options} for `**to_h` splat use in {initialize}. See {initialize} for details.
#
# Note, the `client` within can be replaced via client setter.
class Options; end # rubocop:disable Lint/EmptyClass

# @return [String] Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby
Expand Down Expand Up @@ -484,13 +486,35 @@ def initialize(

# Validate worker
@bridge_worker.validate

# Mutex needed for accessing and replacing a client
@client_mutex = Mutex.new
end

# @return [String] Task queue set on the worker options.
def task_queue
@options.task_queue
end

# @return [Client] Client for this worker. This is the same as {Options.client} in {options}, but surrounded by a
# mutex to be safe for client replacement in {client=}.
def client
@client_mutex.synchronize { @options.client }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to acquire the mutex to read the client? Even during client replacement, that prop would be, at any moment, either the previous client or the new client, which are both valid values.

Copy link
Member Author

@cretz cretz Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While they both may be valid, we would prefer it be the "current" client. In Ruby (and similar languages), people expect an attribute reader to be atomic with an attribute writer of the same name, and they are for simple get/set, but this adds extra to the set side which means we need a mutex to preserve that expectation (and it's basically harmless to do so).

end

# Replace the worker's client. When this is called, the client is replaced on the internal worker which means any
# new calls will be made on the new client (but existing calls will still complete on the previous one). This is
# commonly used for providing a new client with updated authentication credentials.
#
# @param new_client [Client] New client to use for new calls.
def client=(new_client)
@client_mutex.synchronize do
@bridge_worker.replace_client(new_client.connection._core_client)
@options = @options.with(client: new_client)
new_client
end
end

# Run this worker until cancellation or optional block completes. When the cancellation or block is complete, the
# worker is shut down. This will return the block result if everything successful or raise an error if not.
#
Expand Down
1 change: 1 addition & 0 deletions temporalio/sig/temporalio/activity/context.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ module Temporalio
def _scoped_logger_info: -> Hash[Symbol, Object]

def metric_meter: -> Metric::Meter
def client: -> Client
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ module Temporalio
attr_accessor _server_requested_cancel: bool

def initialize: (
worker: Temporalio::Worker,
info: Activity::Info,
cancellation: Cancellation,
worker_shutdown_cancellation: Cancellation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ module Temporalio
?worker_shutdown_cancellation: Cancellation,
?payload_converter: Converters::PayloadConverter,
?logger: Logger,
?activity_executors: Hash[Symbol, Worker::ActivityExecutor]
?activity_executors: Hash[Symbol, Worker::ActivityExecutor],
?metric_meter: Metric::Meter?,
?client: Client?
) -> void

def run: (
Expand Down
5 changes: 5 additions & 0 deletions temporalio/sig/temporalio/worker.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ module Temporalio
workflow_payload_codec_thread_pool: ThreadPool?,
debug_mode: bool
) -> void

def with: (**Object kwargs) -> Options
end

def self.default_build_id: -> String
Expand Down Expand Up @@ -110,6 +112,9 @@ module Temporalio

def task_queue: -> String

def client: -> Client
def client=: (Client new_client) -> void

def run: [T] (
?cancellation: Cancellation,
?shutdown_signals: Array[String | Integer],
Expand Down
13 changes: 13 additions & 0 deletions temporalio/test/worker_activity_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,19 @@ def test_context_instance
execute_activity(shared_instance, interceptors: [ContextInstanceInterceptor.new])
end

class ClientAccessActivity < Temporalio::Activity::Definition
def execute
desc = Temporalio::Activity::Context.current.client.workflow_handle(
Temporalio::Activity::Context.current.info.workflow_id
).describe
desc.raw_description.pending_activities.first.activity_type.name
end
end

def test_client_access
assert_equal 'ClientAccessActivity', execute_activity(ClientAccessActivity)
end

# steep:ignore
def execute_activity(
activity,
Expand Down
48 changes: 47 additions & 1 deletion temporalio/test/worker_workflow_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1769,13 +1769,59 @@ def test_context_instance
execute_workflow(ContextInstanceWorkflow, interceptors: [ContextInstanceInterceptor.new])
end

class WorkerClientReplacementWorkflow < Temporalio::Workflow::Definition
def execute
Temporalio::Workflow.wait_condition { @complete }
end

workflow_signal
def complete(value)
@complete = value
end
end

def test_worker_client_replacement
# Create a second ephemeral server and start workflow on both servers
Temporalio::Testing::WorkflowEnvironment.start_local do |env2|
# Start both workflows on different servers
task_queue = "tq-#{SecureRandom.uuid}"
handle1 = env.client.start_workflow(WorkerClientReplacementWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue:)
handle2 = env2.client.start_workflow(WorkerClientReplacementWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue:)

# Run worker on the first env. Make sure cache is off and only 1 max poller
worker = Temporalio::Worker.new(
client: env.client, task_queue:, workflows: [WorkerClientReplacementWorkflow],
max_cached_workflows: 0, max_concurrent_workflow_task_polls: 1
)
worker.run do
# Confirm first workflow has a task complete but not the second
assert_eventually do
refute_nil handle1.fetch_history_events.find(&:workflow_task_completed_event_attributes)
end
assert_nil handle2.fetch_history_events.find(&:workflow_task_completed_event_attributes)

# Replace the client
worker.client = env2.client

# Signal both which should allow the current poll to wake up and it'll be a task failure when trying to submit
# that to the new client which is ignored. But also the new client will poll for the new workflow, which we will
# wait for it to complete.
handle1.signal(WorkerClientReplacementWorkflow.complete, 'done1')
handle2.signal(WorkerClientReplacementWorkflow.complete, 'done2')

# Confirm second workflow on new server completes
assert_equal 'done2', handle2.result
handle1.terminate
end
end
end

# TODO(cretz): To test
# * Common
# * Eager workflow start
# * Unawaited futures that have exceptions, need to log warning like Java does
# * Enhanced stack trace?
# * Separate abstract/interface demonstration
# * Replace worker client
# * Reset update randomness seed
# * Confirm thread pool does not leak, meaning thread/worker goes away after last workflow
# * Test workflow cancel causing other cancels at the same time but in different coroutines
Expand Down
Loading