diff --git a/temporalio/lib/temporalio/activity/context.rb b/temporalio/lib/temporalio/activity/context.rb index edaef5e3..5957a6e6 100644 --- a/temporalio/lib/temporalio/activity/context.rb +++ b/temporalio/lib/temporalio/activity/context.rb @@ -108,9 +108,16 @@ def _scoped_logger_info end # @return [Metric::Meter] Metric meter to create metrics on, with some activity-specific attributes already set. + # @raise [RuntimeError] Called within a {Testing::ActivityEnvironment} and it was not set. def metric_meter raise NotImplementedError end + + # @return [Client] Temporal client this activity worker is running in. + # @raise [RuntimeError] Called within a {Testing::ActivityEnvironment} and it was not set. + def client + raise NotImplementedError + end end end end diff --git a/temporalio/lib/temporalio/internal/worker/activity_worker.rb b/temporalio/lib/temporalio/internal/worker/activity_worker.rb index 595a63e6..3cd7c742 100644 --- a/temporalio/lib/temporalio/internal/worker/activity_worker.rb +++ b/temporalio/lib/temporalio/internal/worker/activity_worker.rb @@ -201,6 +201,7 @@ def execute_activity(task_token, defn, start) # Run activity = RunningActivity.new( + worker: @worker, info:, cancellation: Cancellation.new, worker_shutdown_cancellation: @worker._worker_shutdown_cancellation, @@ -299,6 +300,7 @@ class RunningActivity < Activity::Context attr_accessor :instance, :_outbound_impl, :_server_requested_cancel def initialize( # rubocop:disable Lint/MissingSuper + worker:, info:, cancellation:, worker_shutdown_cancellation:, @@ -306,6 +308,7 @@ def initialize( # rubocop:disable Lint/MissingSuper logger:, runtime_metric_meter: ) + @worker = worker @info = info @cancellation = cancellation @worker_shutdown_cancellation = worker_shutdown_cancellation @@ -334,6 +337,10 @@ def metric_meter } ) end + + def client + @worker.client + end end class InboundImplementation < Temporalio::Worker::Interceptor::Activity::Inbound diff --git a/temporalio/lib/temporalio/testing/activity_environment.rb b/temporalio/lib/temporalio/testing/activity_environment.rb index 6d504ac3..73720364 100644 --- a/temporalio/lib/temporalio/testing/activity_environment.rb +++ b/temporalio/lib/temporalio/testing/activity_environment.rb @@ -47,6 +47,9 @@ def self.default_info # @param payload_converter [Converters::PayloadConverter] Value for {Activity::Context#payload_converter}. # @param logger [Logger] Value for {Activity::Context#logger}. # @param activity_executors [Hash] Executors that activities can run within. + # @param metric_meter [Metric::Meter, nil] Value for {Activity::Context#metric_meter}, or nil to raise when + # called. + # @param client [Client, nil] Value for {Activity::Context#client}, or nil to raise when called. def initialize( info: ActivityEnvironment.default_info, on_heartbeat: nil, @@ -54,7 +57,9 @@ def initialize( worker_shutdown_cancellation: Cancellation.new, payload_converter: Converters::PayloadConverter.default, logger: Logger.new(nil), - activity_executors: Worker::ActivityExecutor.defaults + activity_executors: Worker::ActivityExecutor.defaults, + metric_meter: nil, + client: nil ) @info = info @on_heartbeat = on_heartbeat @@ -63,6 +68,8 @@ def initialize( @payload_converter = payload_converter @logger = logger @activity_executors = activity_executors + @metric_meter = metric_meter + @client = client end # Run an activity and returns its result or raises its exception. @@ -86,7 +93,9 @@ def run(activity, *args) cancellation: @cancellation, worker_shutdown_cancellation: @worker_shutdown_cancellation, payload_converter: @payload_converter, - logger: @logger + logger: @logger, + metric_meter: @metric_meter, + client: @client )) queue.push([defn.proc.call(*args), nil]) rescue Exception => e # rubocop:disable Lint/RescueException Intentionally capturing all exceptions @@ -113,7 +122,9 @@ def initialize( # rubocop:disable Lint/MissingSuper cancellation:, worker_shutdown_cancellation:, payload_converter:, - logger: + logger:, + metric_meter:, + client: ) @info = info @instance = instance @@ -122,12 +133,24 @@ def initialize( # rubocop:disable Lint/MissingSuper @worker_shutdown_cancellation = worker_shutdown_cancellation @payload_converter = payload_converter @logger = logger + @metric_meter = metric_meter + @client = client end # @!visibility private def heartbeat(*details) @on_heartbeat&.call(details) end + + # @!visibility private + def metric_meter + @metric_meter or raise 'No metric meter configured in this test environment' + end + + # @!visibility private + def client + @client or raise 'No client configured in this test environment' + end end private_constant :Context diff --git a/temporalio/lib/temporalio/worker.rb b/temporalio/lib/temporalio/worker.rb index 4a0cb216..846e8093 100644 --- a/temporalio/lib/temporalio/worker.rb +++ b/temporalio/lib/temporalio/worker.rb @@ -55,6 +55,8 @@ class Worker ) # Options as returned from {options} for `**to_h` splat use in {initialize}. See {initialize} for details. + # + # Note, the `client` within can be replaced via client setter. class Options; end # rubocop:disable Lint/EmptyClass # @return [String] Memoized default build ID. This default value is built as a checksum of all of the loaded Ruby @@ -484,6 +486,9 @@ def initialize( # Validate worker @bridge_worker.validate + + # Mutex needed for accessing and replacing a client + @client_mutex = Mutex.new end # @return [String] Task queue set on the worker options. @@ -491,6 +496,25 @@ def task_queue @options.task_queue end + # @return [Client] Client for this worker. This is the same as {Options.client} in {options}, but surrounded by a + # mutex to be safe for client replacement in {client=}. + def client + @client_mutex.synchronize { @options.client } + end + + # Replace the worker's client. When this is called, the client is replaced on the internal worker which means any + # new calls will be made on the new client (but existing calls will still complete on the previous one). This is + # commonly used for providing a new client with updated authentication credentials. + # + # @param new_client [Client] New client to use for new calls. + def client=(new_client) + @client_mutex.synchronize do + @bridge_worker.replace_client(new_client.connection._core_client) + @options = @options.with(client: new_client) + new_client + end + end + # Run this worker until cancellation or optional block completes. When the cancellation or block is complete, the # worker is shut down. This will return the block result if everything successful or raise an error if not. # diff --git a/temporalio/sig/temporalio/activity/context.rbs b/temporalio/sig/temporalio/activity/context.rbs index 930522b4..2aefb2b1 100644 --- a/temporalio/sig/temporalio/activity/context.rbs +++ b/temporalio/sig/temporalio/activity/context.rbs @@ -19,6 +19,7 @@ module Temporalio def _scoped_logger_info: -> Hash[Symbol, Object] def metric_meter: -> Metric::Meter + def client: -> Client end end end \ No newline at end of file diff --git a/temporalio/sig/temporalio/internal/worker/activity_worker.rbs b/temporalio/sig/temporalio/internal/worker/activity_worker.rbs index 651f3caf..16709839 100644 --- a/temporalio/sig/temporalio/internal/worker/activity_worker.rbs +++ b/temporalio/sig/temporalio/internal/worker/activity_worker.rbs @@ -32,6 +32,7 @@ module Temporalio attr_accessor _server_requested_cancel: bool def initialize: ( + worker: Temporalio::Worker, info: Activity::Info, cancellation: Cancellation, worker_shutdown_cancellation: Cancellation, diff --git a/temporalio/sig/temporalio/testing/activity_environment.rbs b/temporalio/sig/temporalio/testing/activity_environment.rbs index 5480c6df..e2379b24 100644 --- a/temporalio/sig/temporalio/testing/activity_environment.rbs +++ b/temporalio/sig/temporalio/testing/activity_environment.rbs @@ -10,7 +10,9 @@ module Temporalio ?worker_shutdown_cancellation: Cancellation, ?payload_converter: Converters::PayloadConverter, ?logger: Logger, - ?activity_executors: Hash[Symbol, Worker::ActivityExecutor] + ?activity_executors: Hash[Symbol, Worker::ActivityExecutor], + ?metric_meter: Metric::Meter?, + ?client: Client? ) -> void def run: ( diff --git a/temporalio/sig/temporalio/worker.rbs b/temporalio/sig/temporalio/worker.rbs index 9da961c7..f5c3e465 100644 --- a/temporalio/sig/temporalio/worker.rbs +++ b/temporalio/sig/temporalio/worker.rbs @@ -60,6 +60,8 @@ module Temporalio workflow_payload_codec_thread_pool: ThreadPool?, debug_mode: bool ) -> void + + def with: (**Object kwargs) -> Options end def self.default_build_id: -> String @@ -110,6 +112,9 @@ module Temporalio def task_queue: -> String + def client: -> Client + def client=: (Client new_client) -> void + def run: [T] ( ?cancellation: Cancellation, ?shutdown_signals: Array[String | Integer], diff --git a/temporalio/test/worker_activity_test.rb b/temporalio/test/worker_activity_test.rb index 8c870066..a95c0e6b 100644 --- a/temporalio/test/worker_activity_test.rb +++ b/temporalio/test/worker_activity_test.rb @@ -901,6 +901,19 @@ def test_context_instance execute_activity(shared_instance, interceptors: [ContextInstanceInterceptor.new]) end + class ClientAccessActivity < Temporalio::Activity::Definition + def execute + desc = Temporalio::Activity::Context.current.client.workflow_handle( + Temporalio::Activity::Context.current.info.workflow_id + ).describe + desc.raw_description.pending_activities.first.activity_type.name + end + end + + def test_client_access + assert_equal 'ClientAccessActivity', execute_activity(ClientAccessActivity) + end + # steep:ignore def execute_activity( activity, diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index 50a5100f..57b71268 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -1769,13 +1769,59 @@ def test_context_instance execute_workflow(ContextInstanceWorkflow, interceptors: [ContextInstanceInterceptor.new]) end + class WorkerClientReplacementWorkflow < Temporalio::Workflow::Definition + def execute + Temporalio::Workflow.wait_condition { @complete } + end + + workflow_signal + def complete(value) + @complete = value + end + end + + def test_worker_client_replacement + # Create a second ephemeral server and start workflow on both servers + Temporalio::Testing::WorkflowEnvironment.start_local do |env2| + # Start both workflows on different servers + task_queue = "tq-#{SecureRandom.uuid}" + handle1 = env.client.start_workflow(WorkerClientReplacementWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue:) + handle2 = env2.client.start_workflow(WorkerClientReplacementWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue:) + + # Run worker on the first env. Make sure cache is off and only 1 max poller + worker = Temporalio::Worker.new( + client: env.client, task_queue:, workflows: [WorkerClientReplacementWorkflow], + max_cached_workflows: 0, max_concurrent_workflow_task_polls: 1 + ) + worker.run do + # Confirm first workflow has a task complete but not the second + assert_eventually do + refute_nil handle1.fetch_history_events.find(&:workflow_task_completed_event_attributes) + end + assert_nil handle2.fetch_history_events.find(&:workflow_task_completed_event_attributes) + + # Replace the client + worker.client = env2.client + + # Signal both which should allow the current poll to wake up and it'll be a task failure when trying to submit + # that to the new client which is ignored. But also the new client will poll for the new workflow, which we will + # wait for it to complete. + handle1.signal(WorkerClientReplacementWorkflow.complete, 'done1') + handle2.signal(WorkerClientReplacementWorkflow.complete, 'done2') + + # Confirm second workflow on new server completes + assert_equal 'done2', handle2.result + handle1.terminate + end + end + end + # TODO(cretz): To test # * Common # * Eager workflow start # * Unawaited futures that have exceptions, need to log warning like Java does # * Enhanced stack trace? # * Separate abstract/interface demonstration - # * Replace worker client # * Reset update randomness seed # * Confirm thread pool does not leak, meaning thread/worker goes away after last workflow # * Test workflow cancel causing other cancels at the same time but in different coroutines