diff --git a/README.md b/README.md index 7c63d118..80f473a2 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,10 @@ opinions. Please communicate with us on [Slack](https://t.mp/slack) in the `#rub - [Activity Worker Shutdown](#activity-worker-shutdown) - [Activity Concurrency and Executors](#activity-concurrency-and-executors) - [Activity Testing](#activity-testing) + - [Telemetry](#telemetry) + - [Metrics](#metrics) + - [OpenTelemetry Tracing](#opentelemetry-tracing) + - [OpenTelemetry Tracing in Workflows](#opentelemetry-tracing-in-workflows) - [Ractors](#ractors) - [Platform Support](#platform-support) - [Development](#development) @@ -1034,6 +1038,122 @@ it will raise the error raised in the activity. The constructor of the environment has multiple keyword arguments that can be set to affect the activity context for the activity. +### Telemetry + +#### Metrics + +Metrics can be configured on a `Temporalio::Runtime`. Only one runtime is expected to be created for the entire +application and it should be created before any clients are created. For example, this configures Prometheus to export +metrics at `http://127.0.0.1:9000/metrics`: + +```ruby +require 'temporalio/runtime' + +Temporalio::Runtime.default = Temporalio::Runtime.new( + telemetry: Temporalio::Runtime::TelemetryOptions.new( + metrics: Temporalio::Runtime::MetricsOptions.new( + prometheus: Temporalio::Runtime::PrometheusMetricsOptions.new( + bind_address: '127.0.0.1:9000' + ) + ) + ) +) +``` + +Now every client created will use this runtime. Setting the default will fail if a runtime has already been requested or +a default already set. Technically a runtime can be created without setting the default and be set on each client via +the `runtime` parameter, but this is discouraged because a runtime represents a heavy internal engine not meant to be +created multiple times. + +OpenTelemetry metrics can be configured instead by passing `Temporalio::Runtime::OpenTelemetryMetricsOptions` as the +`opentelemetry` parameter to the metrics options. See API documentation for details. + +#### OpenTelemetry Tracing + +OpenTelemetry tracing for clients, activities, and workflows can be enabled using the +`Temporalio::Contrib::OpenTelemetry::TracingInterceptor`. Specifically, when creating a client, set the interceptor like +so: + +```ruby +require 'opentelemetry/api' +require 'opentelemetry/sdk' +require 'temporalio/client' +require 'temporalio/contrib/open_telemetry' + +# ... assumes my_otel_tracer_provider is a tracer provider created by the user +my_tracer = my_otel_tracer_provider.tracer('my-otel-tracer') + +my_client = Temporalio::Client.connect( + 'localhost:7233', 'my-namespace', + interceptors: [Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new(my_tracer)] +) +``` + +Now many high-level client calls and activities/workflows on workers using this client will have spans created on that +OpenTelemetry tracer. + +##### OpenTelemetry Tracing in Workflows + +OpenTelemetry works by creating spans as necessary and in some cases serializing them to Temporal headers to be +deserialized by workflows/activities to be set on the context. However, OpenTelemetry requires spans to be finished +where they start, so spans cannot be resumed. This is fine for client calls and activity attempts, but Temporal +workflows are resumable functions that may start on a different machine than they complete. Due to this, spans created +by workflows are immediately closed since there is no way for the span to actually span machines. They are also not +created during replay. The spans still become the proper parents of other spans if they are created. + +Custom spans can be created inside of workflows using class methods on the +`Temporalio::Contrib::OpenTelemetry::Workflow` module. For example: + +```ruby +class MyWorkflow < Temporalio::Workflow::Definition + def execute + # Sleep for a bit + Temporalio::Workflow.sleep(10) + # Run activity in span + Temporalio::Contrib::OpenTelemetry::Workflow.with_completed_span( + 'my-span', + attributes: { 'my-attr' => 'some val' } + ) do + # Execute an activity + Temporalio::Workflow.execute_activity(MyActivity, start_to_close_timeout: 10) + end + end +end +``` + +If this all executes on one worker (because Temporal has a concept of stickiness that caches instances), the span tree +may look like: + +``` +StartWorkflow:MyWorkflow <-- created by client outbound + RunWorkflow:MyWorkflow <-- created inside workflow on first task + my-span <-- created inside workflow by code + StartActivity:MyActivity <-- created inside workflow when first called + RunActivity:MyActivity <-- created inside activity attempt 1 + CompleteWorkflow:MyWorkflow <-- created inside workflow on last task +``` + +However if, say, the worker crashed during the 10s sleep and the workflow was resumed (i.e. replayed) on another worker, +the span tree may look like: + +``` +StartWorkflow:MyWorkflow <-- created by client outbound + RunWorkflow:MyWorkflow <-- created by workflow inbound on first task + my-span <-- created inside the workflow + StartActivity:MyActivity <-- created by workflow outbound + RunActivity:MyActivity <-- created by activity attempt 1 inbound + CompleteWorkflow:MyWorkflow <-- created by workflow inbound on last task +``` + +Notice how the spans are no longer under `RunWorkflow`. This is because spans inside the workflow are not created on +replay, so there is no parent on replay. But there are no orphans because we still have the overarching parent of +`StartWorkflow` that was created by the client and is serialized into Temporal headers so it can always be the parent. + +And reminder that `StartWorkflow` and `RunActivity` spans do last the length of their calls (so time to start the +workflow and time to run the activity attempt respectively), but the other spans have no measurable time because they +are created in workflows and closed immediately since long-lived spans cannot work for durable software that may resume +on other machines. + ### Ractors It was an original goal to have workflows actually be Ractors for deterministic state isolation and have the library diff --git a/temporalio/Gemfile b/temporalio/Gemfile index 0ceddee2..afae2942 100644 --- a/temporalio/Gemfile +++ b/temporalio/Gemfile @@ -12,6 +12,10 @@ group :development do gem 'grpc', '~> 1.69' gem 'grpc-tools', '~> 1.69' gem 'minitest' + # We are intentionally not pinning OTel versions here so that CI tests the latest. This also means that the OTel + # contrib library also does not require specific versions, we are relying on the compatibility rigor of OTel. + gem 'opentelemetry-api' + gem 'opentelemetry-sdk' gem 'rake' gem 'rake-compiler' gem 'rbs', '~> 3.5.3' diff --git a/temporalio/lib/temporalio/activity/info.rb b/temporalio/lib/temporalio/activity/info.rb index 77f07751..dc7aa799 100644 --- a/temporalio/lib/temporalio/activity/info.rb +++ b/temporalio/lib/temporalio/activity/info.rb @@ -59,6 +59,9 @@ module Activity # @return [String] Workflow run ID that started this activity. # @!attribute workflow_type # @return [String] Workflow type name that started this activity. + # + # @note WARNING: This class may have required parameters added to its constructor. Users should not instantiate this + # class or it may break in incompatible ways. class Info; end # rubocop:disable Lint/EmptyClass end end diff --git a/temporalio/lib/temporalio/client.rb b/temporalio/lib/temporalio/client.rb index 696aba55..d584da40 100644 --- a/temporalio/lib/temporalio/client.rb +++ b/temporalio/lib/temporalio/client.rb @@ -242,7 +242,7 @@ def start_workflow( rpc_options: nil ) @impl.start_workflow(Interceptor::StartWorkflowInput.new( - workflow:, + workflow: Workflow::Definition._workflow_type_from_workflow_parameter(workflow), args:, workflow_id: id, task_queue:, @@ -386,7 +386,7 @@ def start_update_with_start_workflow( @impl.start_update_with_start_workflow( Interceptor::StartUpdateWithStartWorkflowInput.new( update_id: id, - update:, + update: Workflow::Definition::Update._name_from_parameter(update), args:, wait_for_stage:, start_workflow_operation:, @@ -449,7 +449,7 @@ def signal_with_start_workflow( ) @impl.signal_with_start_workflow( Interceptor::SignalWithStartWorkflowInput.new( - signal:, + signal: Workflow::Definition::Signal._name_from_parameter(signal), args:, start_workflow_operation:, rpc_options: diff --git a/temporalio/lib/temporalio/client/with_start_workflow_operation.rb b/temporalio/lib/temporalio/client/with_start_workflow_operation.rb index 8d667a10..ef3c1b78 100644 --- a/temporalio/lib/temporalio/client/with_start_workflow_operation.rb +++ b/temporalio/lib/temporalio/client/with_start_workflow_operation.rb @@ -58,7 +58,7 @@ def initialize( headers: {} ) @options = Options.new( - workflow:, + workflow: Workflow::Definition._workflow_type_from_workflow_parameter(workflow), args:, id:, task_queue:, diff --git a/temporalio/lib/temporalio/client/workflow_handle.rb b/temporalio/lib/temporalio/client/workflow_handle.rb index 5b2c2d67..77a183f5 100644 --- a/temporalio/lib/temporalio/client/workflow_handle.rb +++ b/temporalio/lib/temporalio/client/workflow_handle.rb @@ -222,7 +222,7 @@ def signal(signal, *args, rpc_options: nil) @client._impl.signal_workflow(Interceptor::SignalWorkflowInput.new( workflow_id: id, run_id:, - signal:, + signal: Workflow::Definition::Signal._name_from_parameter(signal), args:, headers: {}, rpc_options: @@ -254,7 +254,7 @@ def query( @client._impl.query_workflow(Interceptor::QueryWorkflowInput.new( workflow_id: id, run_id:, - query:, + query: Workflow::Definition::Query._name_from_parameter(query), args:, reject_condition:, headers: {}, @@ -291,7 +291,7 @@ def start_update( workflow_id: self.id, run_id:, update_id: id, - update:, + update: Workflow::Definition::Update._name_from_parameter(update), args:, wait_for_stage:, headers: {}, diff --git a/temporalio/lib/temporalio/contrib/open_telemetry.rb b/temporalio/lib/temporalio/contrib/open_telemetry.rb new file mode 100644 index 00000000..5838b5cf --- /dev/null +++ b/temporalio/lib/temporalio/contrib/open_telemetry.rb @@ -0,0 +1,454 @@ +# frozen_string_literal: true + +require 'English' +require 'opentelemetry' # This import will intentionally fail if the user does not have OTel gem available +require 'temporalio/client/interceptor' +require 'temporalio/converters/payload_converter' +require 'temporalio/worker/interceptor' + +module Temporalio + module Contrib + module OpenTelemetry + # Tracing interceptor to add OpenTelemetry traces to clients, activities, and workflows. + class TracingInterceptor + include Client::Interceptor + include Worker::Interceptor::Activity + include Worker::Interceptor::Workflow + + # @return [OpenTelemetry::Trace::Tracer] Tracer in use. + attr_reader :tracer + + # Create interceptor. + # + # @param tracer [OpenTelemetry::Trace::Tracer] Tracer to use. + # @param header_key [String] Temporal header name to serialize spans to/from. Most users should not change this. + # @param propagator [Object] Propagator to use. Most users should not change this. + def initialize( + tracer, + header_key: '_tracer-data', + propagator: ::OpenTelemetry::Context::Propagation::CompositeTextMapPropagator.compose_propagators( + [ + ::OpenTelemetry::Trace::Propagation::TraceContext::TextMapPropagator.new, + ::OpenTelemetry::Baggage::Propagation::TextMapPropagator.new + ] + ) + ) + @tracer = tracer + @header_key = header_key + @propagator = propagator + end + + # @!visibility private + def intercept_client(next_interceptor) + ClientOutbound.new(self, next_interceptor) + end + + # @!visibility private + def intercept_activity(next_interceptor) + ActivityInbound.new(self, next_interceptor) + end + + # @!visibility private + def intercept_workflow(next_interceptor) + WorkflowInbound.new(self, next_interceptor) + end + + # @!visibility private + def _apply_context_to_headers(headers, context: ::OpenTelemetry::Context.current) + carrier = {} + @propagator.inject(carrier, context:) + headers[@header_key] = carrier unless carrier.empty? + end + + # @!visibility private + def _attach_context(headers) + context = _context_from_headers(headers) + ::OpenTelemetry::Context.attach(context) if context + end + + # @!visibility private + def _context_from_headers(headers) + carrier = headers[@header_key] + @propagator.extract(carrier) if carrier.is_a?(Hash) && !carrier.empty? + end + + # @!visibility private + def _with_started_span( + name:, + kind:, + attributes: nil, + outbound_input: nil + ) + tracer.in_span(name, attributes:, kind:) do + _apply_context_to_headers(outbound_input.headers) if outbound_input + yield + end + end + + # @!visibility private + class ClientOutbound < Client::Interceptor::Outbound + def initialize(root, next_interceptor) + super(next_interceptor) + @root = root + end + + # @!visibility private + def start_workflow(input) + @root._with_started_span( + name: "StartWorkflow:#{input.workflow}", + kind: :client, + attributes: { 'temporalWorkflowID' => input.workflow_id }, + outbound_input: input + ) { super } + end + + # @!visibility private + def start_update_with_start_workflow(input) + @root._with_started_span( + name: "UpdateWithStartWorkflow:#{input.update}", + kind: :client, + attributes: { 'temporalWorkflowID' => input.start_workflow_operation.options.id, + 'temporalUpdateID' => input.update_id }, + outbound_input: input + ) do + # Also add to start headers + if input.headers[@header_key] + input.start_workflow_operation.options.headers[@header_key] = input.headers[@header_key] + end + super + end + end + + # @!visibility private + def signal_with_start_workflow(input) + @root._with_started_span( + name: "SignalWithStartWorkflow:#{input.workflow}", + kind: :client, + attributes: { 'temporalWorkflowID' => input.start_workflow_operation.options.id }, + outbound_input: input + ) do + # Also add to start headers + if input.headers[@header_key] + input.start_workflow_operation.options.headers[@header_key] = input.headers[@header_key] + end + super + end + end + + # @!visibility private + def signal_workflow(input) + @root._with_started_span( + name: "SignalWorkflow:#{input.signal}", + kind: :client, + attributes: { 'temporalWorkflowID' => input.workflow_id }, + outbound_input: input + ) { super } + end + + # @!visibility private + def query_workflow(input) + @root._with_started_span( + name: "QueryWorkflow:#{input.query}", + kind: :client, + attributes: { 'temporalWorkflowID' => input.workflow_id }, + outbound_input: input + ) { super } + end + + # @!visibility private + def start_workflow_update(input) + @root._with_started_span( + name: "StartWorkflowUpdate:#{input.update}", + kind: :client, + attributes: { 'temporalWorkflowID' => input.workflow_id, 'temporalUpdateID' => input.update_id }, + outbound_input: input + ) { super } + end + end + + # @!visibility private + class ActivityInbound < Worker::Interceptor::Activity::Inbound + def initialize(root, next_interceptor) + super(next_interceptor) + @root = root + end + + # @!visibility private + def execute(input) + @root._attach_context(input.headers) + info = Activity::Context.current.info + @root._with_started_span( + name: "RunActivity:#{info.activity_type}", + kind: :server, + attributes: { + 'temporalWorkflowID' => info.workflow_id, + 'temporalRunID' => info.workflow_run_id, + 'temporalActivityID' => info.activity_id + } + ) { super } + end + end + + # @!visibility private + class WorkflowInbound < Worker::Interceptor::Workflow::Inbound + def initialize(root, next_interceptor) + super(next_interceptor) + @root = root + end + + # @!visibility private + def init(outbound) + # Set root on storage + Temporalio::Workflow.storage[:__temporal_opentelemetry_tracing_interceptor] = @root + super(WorkflowOutbound.new(@root, outbound)) + end + + # @!visibility private + def execute(input) + @root._attach_context(Temporalio::Workflow.info.headers) + Workflow.with_completed_span("RunWorkflow:#{Temporalio::Workflow.info.workflow_type}", kind: :server) do + super + ensure + Workflow.completed_span( + "CompleteWorkflow:#{Temporalio::Workflow.info.workflow_type}", + kind: :internal, + exception: $ERROR_INFO # steep:ignore + ) + end + end + + # @!visibility private + def handle_signal(input) + @root._attach_context(Temporalio::Workflow.info.headers) + Workflow.with_completed_span( + "HandleSignal:#{input.signal}", + links: _links_from_headers(input.headers), + kind: :server + ) do + super + rescue Exception => e # rubocop:disable Lint/RescueException + Workflow.completed_span("FailHandleSignal:#{input.signal}", kind: :internal, exception: e) + raise + end + end + + # @!visibility private + def handle_query(input) + @root._attach_context(Temporalio::Workflow.info.headers) + Workflow.with_completed_span( + "HandleQuery:#{input.query}", + links: _links_from_headers(input.headers), + kind: :server, + even_during_replay: true + ) do + super + rescue Exception => e # rubocop:disable Lint/RescueException + Workflow.completed_span( + "FailHandleQuery:#{input.query}", + kind: :internal, + exception: e, + even_during_replay: true + ) + raise + end + end + + # @!visibility private + def validate_update(input) + @root._attach_context(Temporalio::Workflow.info.headers) + Workflow.with_completed_span( + "ValidateUpdate:#{input.update}", + attributes: { 'temporalUpdateID' => input.id }, + links: _links_from_headers(input.headers), + kind: :server, + even_during_replay: true + ) do + super + rescue Exception => e # rubocop:disable Lint/RescueException + Workflow.completed_span( + "FailValidateUpdate:#{input.update}", + attributes: { 'temporalUpdateID' => input.id }, + kind: :internal, + exception: e, + even_during_replay: true + ) + raise + end + end + + # @!visibility private + def handle_update(input) + @root._attach_context(Temporalio::Workflow.info.headers) + Workflow.with_completed_span( + "HandleUpdate:#{input.update}", + attributes: { 'temporalUpdateID' => input.id }, + links: _links_from_headers(input.headers), + kind: :server + ) do + super + rescue Exception => e # rubocop:disable Lint/RescueException + Workflow.completed_span( + "FailHandleUpdate:#{input.update}", + attributes: { 'temporalUpdateID' => input.id }, + kind: :internal, + exception: e + ) + raise + end + end + + # @!visibility private + def _links_from_headers(headers) + context = @root._context_from_headers(headers) + span = ::OpenTelemetry::Trace.current_span(context) if context + if span && span != ::OpenTelemetry::Trace::Span::INVALID + [::OpenTelemetry::Trace::Link.new(span.context)] + else + [] + end + end + end + + # @!visibility private + class WorkflowOutbound < Worker::Interceptor::Workflow::Outbound + def initialize(root, next_interceptor) + super(next_interceptor) + @root = root + end + + # @!visibility private + def execute_activity(input) + _apply_span_to_headers(input.headers, + Workflow.completed_span("StartActivity:#{input.activity}", kind: :client)) + super + end + + # @!visibility private + def execute_local_activity(input) + _apply_span_to_headers(input.headers, + Workflow.completed_span("StartActivity:#{input.activity}", kind: :client)) + super + end + + # @!visibility private + def initialize_continue_as_new_error(input) + # Just apply the current context to headers + @root._apply_context_to_headers(input.error.headers) + super + end + + # @!visibility private + def signal_child_workflow(input) + _apply_span_to_headers(input.headers, + Workflow.completed_span("SignalChildWorkflow:#{input.signal}", kind: :client)) + super + end + + # @!visibility private + def signal_external_workflow(input) + _apply_span_to_headers(input.headers, + Workflow.completed_span("SignalExternalWorkflow:#{input.signal}", kind: :client)) + super + end + + # @!visibility private + def start_child_workflow(input) + _apply_span_to_headers(input.headers, + Workflow.completed_span("StartChildWorkflow:#{input.workflow}", kind: :client)) + super + end + + # @!visibility private + def _apply_span_to_headers(headers, span) + @root._apply_context_to_headers(headers, context: ::OpenTelemetry::Trace.context_with_span(span)) if span + end + end + + private_constant :ClientOutbound + private_constant :ActivityInbound + private_constant :WorkflowInbound + private_constant :WorkflowOutbound + end + + # Contains workflow methods that can be used for OpenTelemetry. + module Workflow + # Create a completed span and execute block with the span set on the context. + # + # @param name [String] Span name. + # @param attributes [Hash] Span attributes. These will have workflow and run ID automatically added. + # @param links [Array, nil] Span links. + # @param kind [Symbol, nil] Span kind. + # @param exception [Exception, nil] Exception to record on the span. + # @param even_during_replay [Boolean] Set to true to record this span even during replay. Most users should + # never set this. + # @yield Block to call. It is UNSAFE to expect any parameters in this block. + # @return [Object] Result of the block. + def self.with_completed_span( + name, + attributes: {}, + links: nil, + kind: nil, + exception: nil, + even_during_replay: false + ) + span = completed_span(name, attributes:, links:, kind:, exception:, even_during_replay:) + if span + ::OpenTelemetry::Trace.with_span(span) do # rubocop:disable Style/ExplicitBlockArgument + # Yield with no parameters + yield + end + else + yield + end + end + + # Create a completed span only if not replaying (or `even_during_replay` is true). + # + # @note WARNING: It is UNSAFE to rely on the result of this method as it may be different/absent on replay. + # + # @param name [String] Span name. + # @param attributes [Hash] Span attributes. These will have workflow and run ID automatically added. + # @param links [Array, nil] Span links. + # @param kind [Symbol, nil] Span kind. + # @param exception [Exception, nil] Exception to record on the span. + # @param even_during_replay [Boolean] Set to true to record this span even during replay. Most users should + # never set this. + # @return [OpenTelemetry::Trace::Span, nil] Span if one was created. WARNING: It is UNSAFE to use this value. + def self.completed_span( + name, + attributes: {}, + links: nil, + kind: nil, + exception: nil, + even_during_replay: false + ) + # Get root interceptor, which also checks if in workflow + root = Temporalio::Workflow.storage[:__temporal_opentelemetry_tracing_interceptor] + raise 'Tracing interceptor not configured' unless root + + # Do nothing if replaying and not wanted during replay + return nil if !even_during_replay && Temporalio::Workflow::Unsafe.replaying? + + # Do nothing if there is no span on the context. We do not want orphan spans coming from workflows, so we + # require a parent (i.e. a current). + # TODO(cretz): This matches Python behavior but not .NET behavior (which will create no matter what), is that + # ok? + return nil if ::OpenTelemetry::Trace.current_span == ::OpenTelemetry::Trace::Span::INVALID + + # Create attributes, adding user-defined ones + attributes = { 'temporalWorkflowID' => Temporalio::Workflow.info.workflow_id, + 'temporalRunID' => Temporalio::Workflow.info.run_id }.merge(attributes) + + # Create span + time = Temporalio::Workflow.now + timestamp = (time.to_i * 1_000_000_000) + time.nsec + span = root.tracer.start_span(name, attributes:, links:, start_timestamp: timestamp, kind:) # steep:ignore + # Record exception if present + span.record_exception(exception) if exception + # Finish the span (returns self) + span.finish(end_timestamp: timestamp) + end + end + end + end +end diff --git a/temporalio/lib/temporalio/internal/client/implementation.rb b/temporalio/lib/temporalio/internal/client/implementation.rb index 06242a53..d3b35624 100644 --- a/temporalio/lib/temporalio/internal/client/implementation.rb +++ b/temporalio/lib/temporalio/internal/client/implementation.rb @@ -47,13 +47,10 @@ def initialize(client) end def start_workflow(input) - # TODO(cretz): Signal/update with start req = Api::WorkflowService::V1::StartWorkflowExecutionRequest.new( request_id: SecureRandom.uuid, namespace: @client.namespace, - workflow_type: Api::Common::V1::WorkflowType.new( - name: Workflow::Definition._workflow_type_from_workflow_parameter(input.workflow) - ), + workflow_type: Api::Common::V1::WorkflowType.new(name: input.workflow), workflow_id: input.workflow_id, task_queue: Api::TaskQueue::V1::TaskQueue.new(name: input.task_queue.to_s), input: @client.data_converter.to_payloads(input.args), @@ -139,7 +136,7 @@ def start_update_with_start_workflow(input) identity: @client.connection.identity ), input: Api::Update::V1::Input.new( - name: Workflow::Definition::Update._name_from_parameter(input.update), + name: input.update, args: @client.data_converter.to_payloads(input.args), header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter) ) @@ -261,7 +258,7 @@ def signal_with_start_workflow(input) req = _start_workflow_request_from_with_start_options( Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest, start_options ) - req.signal_name = Workflow::Definition::Signal._name_from_parameter(input.signal) + req.signal_name = input.signal req.signal_input = @client.data_converter.to_payloads(input.args) # Send request @@ -305,9 +302,7 @@ def _start_workflow_request_from_with_start_options(klass, start_options) klass.new( request_id: SecureRandom.uuid, namespace: @client.namespace, - workflow_type: Api::Common::V1::WorkflowType.new( - name: Workflow::Definition._workflow_type_from_workflow_parameter(start_options.workflow) - ), + workflow_type: Api::Common::V1::WorkflowType.new(name: start_options.workflow), workflow_id: start_options.id, task_queue: Api::TaskQueue::V1::TaskQueue.new(name: start_options.task_queue.to_s), input: @client.data_converter.to_payloads(start_options.args), @@ -416,7 +411,7 @@ def signal_workflow(input) workflow_id: input.workflow_id, run_id: input.run_id || '' ), - signal_name: Workflow::Definition::Signal._name_from_parameter(input.signal), + signal_name: input.signal, input: @client.data_converter.to_payloads(input.args), header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter), identity: @client.connection.identity, @@ -437,7 +432,7 @@ def query_workflow(input) run_id: input.run_id || '' ), query: Api::Query::V1::WorkflowQuery.new( - query_type: Workflow::Definition::Query._name_from_parameter(input.query), + query_type: input.query, query_args: @client.data_converter.to_payloads(input.args), header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter) ), @@ -480,7 +475,7 @@ def start_workflow_update(input) identity: @client.connection.identity ), input: Api::Update::V1::Input.new( - name: Workflow::Definition::Update._name_from_parameter(input.update), + name: input.update, args: @client.data_converter.to_payloads(input.args), header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter) ) diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb index 24b07070..07b9ab98 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb @@ -117,6 +117,7 @@ def initialize(details) continued_run_id: ProtoUtils.string_or(@init_job.continued_from_execution_run_id), cron_schedule: ProtoUtils.string_or(@init_job.cron_schedule), execution_timeout: ProtoUtils.duration_to_seconds(@init_job.workflow_execution_timeout), + headers: ProtoUtils.headers_from_proto_map(@init_job.headers, @payload_converter) || {}, last_failure: if @init_job.continued_failure @failure_converter.from_failure(@init_job.continued_failure, @payload_converter) end, @@ -539,7 +540,7 @@ def run_workflow result = @inbound.execute( Temporalio::Worker::Interceptor::Workflow::ExecuteInput.new( args: @workflow_arguments, - headers: ProtoUtils.headers_from_proto_map(@init_job.headers, @payload_converter) || {} + headers: @info.headers ) ) add_command( diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb index b66b6827..84a360d2 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb @@ -73,6 +73,16 @@ def execute_activity( activity_id:, disable_eager_execution: ) + activity = case activity + when Class + Activity::Definition::Info.from_activity(activity).name&.to_s + when Symbol, String + activity.to_s + else + raise ArgumentError, 'Activity must be a definition class, or a symbol/string' + end + raise 'Cannot invoke dynamic activities' unless activity + @outbound.execute_activity( Temporalio::Worker::Interceptor::Workflow::ExecuteActivityInput.new( activity:, @@ -105,6 +115,16 @@ def execute_local_activity( cancellation_type:, activity_id: ) + activity = case activity + when Class + Activity::Definition::Info.from_activity(activity).name&.to_s + when Symbol, String + activity.to_s + else + raise ArgumentError, 'Activity must be a definition class, or a symbol/string' + end + raise 'Cannot invoke dynamic activities' unless activity + @outbound.execute_local_activity( Temporalio::Worker::Interceptor::Workflow::ExecuteLocalActivityInput.new( activity:, @@ -219,7 +239,7 @@ def start_child_workflow( ) @outbound.start_child_workflow( Temporalio::Worker::Interceptor::Workflow::StartChildWorkflowInput.new( - workflow:, + workflow: Workflow::Definition._workflow_type_from_workflow_parameter(workflow), args:, id:, task_queue:, @@ -241,6 +261,10 @@ def start_child_workflow( ) end + def storage + @storage ||= {} + end + def timeout(duration, exception_class, *exception_args, summary:, &) raise 'Block required for timeout' unless block_given? @@ -322,7 +346,7 @@ def _signal_child_workflow(id:, signal:, args:, cancellation:) @outbound.signal_child_workflow( Temporalio::Worker::Interceptor::Workflow::SignalChildWorkflowInput.new( id:, - signal:, + signal: Workflow::Definition::Signal._name_from_parameter(signal), args:, cancellation:, headers: {} @@ -335,7 +359,7 @@ def _signal_external_workflow(id:, run_id:, signal:, args:, cancellation:) Temporalio::Worker::Interceptor::Workflow::SignalExternalWorkflowInput.new( id:, run_id:, - signal:, + signal: Workflow::Definition::Signal._name_from_parameter(signal), args:, cancellation:, headers: {} diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb index 5825b602..54a7748d 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb @@ -56,16 +56,6 @@ def execute_activity(input) raise ArgumentError, 'Activity must have schedule_to_close_timeout or start_to_close_timeout' end - activity_type = case input.activity - when Class - Activity::Definition::Info.from_activity(input.activity).name - when Symbol, String - input.activity.to_s - else - raise ArgumentError, 'Activity must be a definition class, or a symbol/string' - end - raise 'Cannot invoke dynamic activities' unless activity_type - execute_activity_with_local_backoffs(local: false, cancellation: input.cancellation) do seq = (@activity_counter += 1) @instance.add_command( @@ -73,7 +63,7 @@ def execute_activity(input) schedule_activity: Bridge::Api::WorkflowCommands::ScheduleActivity.new( seq:, activity_id: input.activity_id || seq.to_s, - activity_type:, + activity_type: input.activity, task_queue: input.task_queue, headers: ProtoUtils.headers_to_proto_hash(input.headers, @instance.payload_converter), arguments: ProtoUtils.convert_to_payload_array(@instance.payload_converter, input.args), @@ -97,16 +87,6 @@ def execute_local_activity(input) raise ArgumentError, 'Activity must have schedule_to_close_timeout or start_to_close_timeout' end - activity_type = case input.activity - when Class - Activity::Definition::Info.from_activity(input.activity).name - when Symbol, String - input.activity.to_s - else - raise ArgumentError, 'Activity must be a definition class, or a symbol/string' - end - raise 'Cannot invoke dynamic activities' unless activity_type - execute_activity_with_local_backoffs(local: true, cancellation: input.cancellation) do |do_backoff| seq = (@activity_counter += 1) @instance.add_command( @@ -114,7 +94,7 @@ def execute_local_activity(input) schedule_local_activity: Bridge::Api::WorkflowCommands::ScheduleLocalActivity.new( seq:, activity_id: input.activity_id || seq.to_s, - activity_type:, + activity_type: input.activity, headers: ProtoUtils.headers_to_proto_hash(input.headers, @instance.payload_converter), arguments: ProtoUtils.convert_to_payload_array(@instance.payload_converter, input.args), schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout), @@ -233,7 +213,7 @@ def _signal_external_workflow(id:, run_id:, child:, signal:, args:, cancellation seq = (@external_signal_counter += 1) cmd = Bridge::Api::WorkflowCommands::SignalExternalWorkflowExecution.new( seq:, - signal_name: Workflow::Definition::Signal._name_from_parameter(signal), + signal_name: signal, args: ProtoUtils.convert_to_payload_array(@instance.payload_converter, args), headers: ProtoUtils.headers_to_proto_hash(headers, @instance.payload_converter) ) @@ -342,7 +322,7 @@ def start_child_workflow(input) seq:, namespace: @instance.info.namespace, workflow_id: input.id, - workflow_type: Workflow::Definition._workflow_type_from_workflow_parameter(input.workflow), + workflow_type: input.workflow, task_queue: input.task_queue, input: ProtoUtils.convert_to_payload_array(@instance.payload_converter, input.args), workflow_execution_timeout: ProtoUtils.seconds_to_duration(input.execution_timeout), diff --git a/temporalio/lib/temporalio/testing/activity_environment.rb b/temporalio/lib/temporalio/testing/activity_environment.rb index b7d302a4..fe6f5198 100644 --- a/temporalio/lib/temporalio/testing/activity_environment.rb +++ b/temporalio/lib/temporalio/testing/activity_environment.rb @@ -13,8 +13,8 @@ module Testing # cancellation can be set, users create this for each activity that is run. There is no real performance penalty for # creating an environment for every run. class ActivityEnvironment - # @return [Activity::Info] The activity info used by default. This is frozen, but can be dup'd and mutated to pass - # in to {initialize}. + # @return [Activity::Info] The activity info used by default. This is frozen, but `with` can be used to make a new + # instance with changes to pass in to {initialize}. def self.default_info @default_info ||= Activity::Info.new( activity_id: 'test', @@ -34,12 +34,13 @@ def self.default_info workflow_namespace: 'default', workflow_run_id: 'test-run', workflow_type: 'test' - ).freeze + ) end # Create a test environment for activities. # - # @param info [Activity::Info] Value for {Activity::Context#info}. + # @param info [Activity::Info] Value for {Activity::Context#info}. Users should not try to instantiate this + # themselves, but rather use `with` on {default_info}. # @param on_heartbeat [Proc(Array), nil] Proc that is called with all heartbeat details when # {Activity::Context#heartbeat} is called. # @param cancellation [Cancellation] Value for {Activity::Context#cancellation}. diff --git a/temporalio/lib/temporalio/workflow.rb b/temporalio/lib/temporalio/workflow.rb index 8a20bcce..d3f61014 100644 --- a/temporalio/lib/temporalio/workflow.rb +++ b/temporalio/lib/temporalio/workflow.rb @@ -391,6 +391,12 @@ def self.start_child_workflow( ) end + # @return [Hash] General in-workflow storage. Most users will store state on the workflow class + # instance instead, this is only for utilities without access to the class instance. + def self.storage + _current.storage + end + # Run the block until the timeout is reached. This is backed by {sleep}. This does not accept cancellation because # it is expected the block within will properly handle/bubble cancellation. # diff --git a/temporalio/lib/temporalio/workflow/info.rb b/temporalio/lib/temporalio/workflow/info.rb index cb5ba1d4..46348bcf 100644 --- a/temporalio/lib/temporalio/workflow/info.rb +++ b/temporalio/lib/temporalio/workflow/info.rb @@ -7,6 +7,7 @@ module Workflow :continued_run_id, :cron_schedule, :execution_timeout, + :headers, :last_failure, :last_result, :namespace, @@ -32,6 +33,8 @@ module Workflow # @return [String, nil] Cron schedule if applicable. # @!attribute execution_timeout # @return [Float, nil] Execution timeout for the workflow. + # @!attribute headers + # @return [Hash] Headers. # @!attribute last_failure # @return [Exception, nil] Failure if this workflow run is a continuation of a failure. # @!attribute last_result diff --git a/temporalio/sig/temporalio/client/interceptor.rbs b/temporalio/sig/temporalio/client/interceptor.rbs index 6e1d4ebd..3d3c6ff1 100644 --- a/temporalio/sig/temporalio/client/interceptor.rbs +++ b/temporalio/sig/temporalio/client/interceptor.rbs @@ -4,7 +4,7 @@ module Temporalio def intercept_client: (Outbound next_interceptor) -> Outbound class StartWorkflowInput - attr_reader workflow: singleton(Workflow::Definition) | Workflow::Definition::Info | Symbol | String + attr_reader workflow: String attr_reader args: Array[Object?] attr_reader workflow_id: String attr_reader task_queue: String @@ -25,7 +25,7 @@ module Temporalio attr_reader rpc_options: RPCOptions? def initialize: ( - workflow: singleton(Workflow::Definition) | Workflow::Definition::Info | Symbol | String, + workflow: String, args: Array[Object?], workflow_id: String, task_queue: String, @@ -49,7 +49,7 @@ module Temporalio class StartUpdateWithStartWorkflowInput attr_reader update_id: String - attr_reader update: Workflow::Definition::Update | Symbol | String + attr_reader update: String attr_reader args: Array[Object?] attr_reader wait_for_stage: WorkflowUpdateWaitStage::enum attr_reader start_workflow_operation: WithStartWorkflowOperation @@ -58,7 +58,7 @@ module Temporalio def initialize: ( update_id: String, - update: Workflow::Definition::Update | Symbol | String, + update: String, args: Array[Object?], wait_for_stage: WorkflowUpdateWaitStage::enum, start_workflow_operation: WithStartWorkflowOperation, @@ -68,13 +68,13 @@ module Temporalio end class SignalWithStartWorkflowInput - attr_reader signal: Workflow::Definition::Signal | Symbol | String + attr_reader signal: String attr_reader args: Array[Object?] attr_reader start_workflow_operation: WithStartWorkflowOperation attr_reader rpc_options: RPCOptions? def initialize: ( - signal: Workflow::Definition::Signal | Symbol | String, + signal: String, args: Array[Object?], start_workflow_operation: WithStartWorkflowOperation, rpc_options: RPCOptions? @@ -134,7 +134,7 @@ module Temporalio class SignalWorkflowInput attr_reader workflow_id: String attr_reader run_id: String? - attr_reader signal: Workflow::Definition::Signal | Symbol | String + attr_reader signal: String attr_reader args: Array[Object?] attr_reader headers: Hash[String, Object?] attr_reader rpc_options: RPCOptions? @@ -142,7 +142,7 @@ module Temporalio def initialize: ( workflow_id: String, run_id: String?, - signal: Workflow::Definition::Signal | Symbol | String, + signal: String, args: Array[Object?], headers: Hash[String, Object?], rpc_options: RPCOptions? @@ -152,7 +152,7 @@ module Temporalio class QueryWorkflowInput attr_reader workflow_id: String attr_reader run_id: String? - attr_reader query: Workflow::Definition::Query | Symbol | String + attr_reader query: String attr_reader args: Array[Object?] attr_reader reject_condition: WorkflowQueryRejectCondition::enum? attr_reader headers: Hash[String, Object?] @@ -161,7 +161,7 @@ module Temporalio def initialize: ( workflow_id: String, run_id: String?, - query: Workflow::Definition::Query | Symbol | String, + query: String, args: Array[Object?], reject_condition: WorkflowQueryRejectCondition::enum?, headers: Hash[String, Object?], @@ -173,7 +173,7 @@ module Temporalio attr_reader workflow_id: String attr_reader run_id: String? attr_reader update_id: String - attr_reader update: Workflow::Definition::Update | Symbol | String + attr_reader update: String attr_reader args: Array[Object?] attr_reader wait_for_stage: WorkflowUpdateWaitStage::enum attr_reader headers: Hash[String, Object?] @@ -183,7 +183,7 @@ module Temporalio workflow_id: String, run_id: String?, update_id: String, - update: Workflow::Definition::Update | Symbol | String, + update: String, args: Array[Object?], wait_for_stage: WorkflowUpdateWaitStage::enum, headers: Hash[String, Object?], diff --git a/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs b/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs index 36b97c17..6f21e864 100644 --- a/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs +++ b/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs @@ -2,7 +2,7 @@ module Temporalio class Client class WithStartWorkflowOperation class Options - attr_reader workflow: singleton(Workflow::Definition) | Workflow::Definition::Info | Symbol | String + attr_reader workflow: String attr_reader args: Array[Object?] attr_reader id: String attr_reader task_queue: String @@ -21,7 +21,7 @@ module Temporalio attr_reader headers: Hash[String, Object?] def initialize: ( - workflow: singleton(Workflow::Definition) | Workflow::Definition::Info | Symbol | String, + workflow: String, args: Array[Object?], id: String, task_queue: String, diff --git a/temporalio/sig/temporalio/contrib/open_telemetry.rbs b/temporalio/sig/temporalio/contrib/open_telemetry.rbs new file mode 100644 index 00000000..a0c60416 --- /dev/null +++ b/temporalio/sig/temporalio/contrib/open_telemetry.rbs @@ -0,0 +1,61 @@ +module Temporalio + module Contrib + module OpenTelemetry + class TracingInterceptor + include Client::Interceptor + include Worker::Interceptor::Activity + include Worker::Interceptor::Workflow + + attr_reader tracer: untyped + + def initialize: ( + untyped tracer, + ?header_key: String, + ?propagator: untyped + ) -> void + + def _apply_context_to_headers: (Hash[String, untyped] headers, ?context: untyped) -> void + def _attach_context: (Hash[String, untyped] headers) -> void + def _context_from_headers: (Hash[String, untyped] headers) -> untyped + def _with_started_span: [T] ( + name: String, + kind: Symbol, + ?attributes: Hash[untyped, untyped]?, + ?outbound_input: untyped + ) { () -> T } -> T + + class WorkflowInbound < Worker::Interceptor::Workflow::Inbound + def initialize: (TracingInterceptor root, Worker::Interceptor::Workflow::Inbound next_interceptor) -> void + + def _links_from_headers: (Hash[String, untyped] headers) -> Array[untyped] + end + + class WorkflowOutbound < Worker::Interceptor::Workflow::Outbound + def initialize: (TracingInterceptor root, Worker::Interceptor::Workflow::Outbound next_interceptor) -> void + + def _apply_span_to_headers: (Hash[String, untyped] headers, untyped span) -> void + end + end + + module Workflow + def self.with_completed_span: [T] ( + String name, + ?attributes: Hash[untyped, untyped], + ?links: Array[untyped]?, + ?kind: Symbol?, + ?exception: Exception?, + ?even_during_replay: bool + ) { -> T } -> T + + def self.completed_span: ( + String name, + ?attributes: Hash[untyped, untyped], + ?links: Array[untyped]?, + ?kind: Symbol?, + ?exception: Exception?, + ?even_during_replay: bool + ) -> untyped + end + end + end +end \ No newline at end of file diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs index 3307df42..bfb71144 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs @@ -105,6 +105,8 @@ module Temporalio search_attributes: SearchAttributes? ) -> ChildWorkflowHandle + def storage: -> Hash[Object, Object] + def timeout: [T] ( duration? duration, singleton(Exception) exception_class, diff --git a/temporalio/sig/temporalio/worker/interceptor.rbs b/temporalio/sig/temporalio/worker/interceptor.rbs index 731fa8f1..072fb850 100644 --- a/temporalio/sig/temporalio/worker/interceptor.rbs +++ b/temporalio/sig/temporalio/worker/interceptor.rbs @@ -129,7 +129,7 @@ module Temporalio end class ExecuteActivityInput - attr_reader activity: singleton(Temporalio::Activity::Definition) | Symbol | String + attr_reader activity: String attr_reader args: Array[Object?] attr_reader task_queue: String attr_reader summary: String? @@ -145,7 +145,7 @@ module Temporalio attr_reader headers: Hash[String, Object?] def initialize: ( - activity: singleton(Temporalio::Activity::Definition) | Symbol | String, + activity: String, args: Array[Object?], task_queue: String, summary: String?, @@ -163,7 +163,7 @@ module Temporalio end class ExecuteLocalActivityInput - attr_reader activity: singleton(Temporalio::Activity::Definition) | Symbol | String + attr_reader activity: String attr_reader args: Array[Object?] attr_reader schedule_to_close_timeout: duration? attr_reader schedule_to_start_timeout: duration? @@ -176,7 +176,7 @@ module Temporalio attr_reader headers: Hash[String, Object?] def initialize: ( - activity: singleton(Temporalio::Activity::Definition) | Symbol | String, + activity: String, args: Array[Object?], schedule_to_close_timeout: duration?, schedule_to_start_timeout: duration?, @@ -200,14 +200,14 @@ module Temporalio class SignalChildWorkflowInput attr_reader id: String - attr_reader signal: Temporalio::Workflow::Definition::Signal | Symbol | String + attr_reader signal: String attr_reader args: Array[Object?] attr_reader cancellation: Cancellation attr_reader headers: Hash[String, Object?] def initialize: ( id: String, - signal: Temporalio::Workflow::Definition::Signal | Symbol | String, + signal: String, args: Array[Object?], cancellation: Cancellation, headers: Hash[String, Object?] @@ -217,7 +217,7 @@ module Temporalio class SignalExternalWorkflowInput attr_reader id: String attr_reader run_id: String? - attr_reader signal: Temporalio::Workflow::Definition::Signal | Symbol | String + attr_reader signal: String attr_reader args: Array[Object?] attr_reader cancellation: Cancellation attr_reader headers: Hash[String, Object?] @@ -225,7 +225,7 @@ module Temporalio def initialize: ( id: String, run_id: String?, - signal: Temporalio::Workflow::Definition::Signal | Symbol | String, + signal: String, args: Array[Object?], cancellation: Cancellation, headers: Hash[String, Object?] @@ -245,7 +245,7 @@ module Temporalio end class StartChildWorkflowInput - attr_reader workflow: singleton(Temporalio::Workflow::Definition) | Temporalio::Workflow::Definition::Info | Symbol | String + attr_reader workflow: String attr_reader args: Array[Object?] attr_reader id: String attr_reader task_queue: String @@ -265,7 +265,7 @@ module Temporalio attr_reader headers: Hash[String, Object?] def initialize: ( - workflow: singleton(Temporalio::Workflow::Definition) | Temporalio::Workflow::Definition::Info | Symbol | String, + workflow: String, args: Array[Object?], id: String, task_queue: String, diff --git a/temporalio/sig/temporalio/workflow.rbs b/temporalio/sig/temporalio/workflow.rbs index cbc75e24..a05093b4 100644 --- a/temporalio/sig/temporalio/workflow.rbs +++ b/temporalio/sig/temporalio/workflow.rbs @@ -114,6 +114,8 @@ module Temporalio ?search_attributes: SearchAttributes? ) -> ChildWorkflowHandle + def self.storage: -> Hash[Object, Object] + def self.timeout: [T] ( duration? duration, ?singleton(Exception) exception_class, diff --git a/temporalio/sig/temporalio/workflow/info.rbs b/temporalio/sig/temporalio/workflow/info.rbs index 0bd8af92..713bd4a3 100644 --- a/temporalio/sig/temporalio/workflow/info.rbs +++ b/temporalio/sig/temporalio/workflow/info.rbs @@ -5,6 +5,7 @@ module Temporalio attr_reader continued_run_id: String? attr_reader cron_schedule: String? attr_reader execution_timeout: Float? + attr_reader headers: Hash[String, untyped] attr_reader last_failure: Exception? attr_reader last_result: Object? attr_reader namespace: String @@ -23,6 +24,7 @@ module Temporalio continued_run_id: String?, cron_schedule: String?, execution_timeout: Float?, + headers: Hash[String, untyped], last_failure: Exception?, last_result: Object?, namespace: String, diff --git a/temporalio/test/contrib/open_telemetry_test.rb b/temporalio/test/contrib/open_telemetry_test.rb new file mode 100644 index 00000000..0c64e83c --- /dev/null +++ b/temporalio/test/contrib/open_telemetry_test.rb @@ -0,0 +1,519 @@ +# frozen_string_literal: true + +require 'opentelemetry/sdk' +require 'set' +require 'temporalio/contrib/open_telemetry' +require 'test' + +module Contrib + class OpenTelemetryTest < Test + class TestActivity < Temporalio::Activity::Definition + def initialize(tracer) + @tracer = tracer + end + + def execute(scenario) + case scenario.to_sym + when :fail_first_attempt + raise 'Intentional activity failure' if Temporalio::Activity::Context.current.info.attempt == 1 + + @tracer.in_span('custom-activity-span') { 'activity-done' } + else + raise NotImplementedError + end + end + end + + class TestWorkflow < Temporalio::Workflow::Definition + def execute(scenario) + case scenario.to_sym + when :complete + 'workflow-done' + when :fail + raise Temporalio::Error::ApplicationError, 'Intentional workflow failure' + when :fail_task + raise 'Intentional workflow task failure' + when :wait_on_signal + Temporalio::Workflow.wait_condition { @finish } + 'workflow-done' + when :continue_as_new + Temporalio::Contrib::OpenTelemetry::Workflow.with_completed_span('custom-can-span') do + raise Temporalio::Workflow::ContinueAsNewError, :complete + end + else + raise NotImplementedError + end + end + + workflow_signal + def signal(scenario) + case scenario.to_sym + when :complete + # Do nothng + when :fail + raise Temporalio::Error::ApplicationError, 'Intentional signal failure' + when :fail_task + raise 'Intentional signal task failure' + when :mark_finished + @finish = true + else + raise NotImplementedError + end + end + + workflow_query + def query(scenario) + case scenario.to_sym + when :complete + 'query-done' + when :fail + raise 'Intentional query failure' + else + raise NotImplementedError + end + end + + workflow_update + def update(scenario) + case scenario.to_sym + when :complete + 'update-done' + when :fail + raise Temporalio::Error::ApplicationError, 'Intentional update failure' + when :fail_task + raise 'Intentional update task failure' + when :call_activity + # Do it in a custom span + Temporalio::Contrib::OpenTelemetry::Workflow.with_completed_span( + 'custom-workflow-span', attributes: { 'foo' => 'bar' } + ) do + Temporalio::Workflow.execute_activity(TestActivity, :fail_first_attempt, start_to_close_timeout: 30) + end + when :call_local_activity + # Do it in a custom span + Temporalio::Contrib::OpenTelemetry::Workflow.with_completed_span( + 'custom-workflow-span', attributes: { 'baz' => 'qux' } + ) do + Temporalio::Workflow.execute_local_activity(TestActivity, :fail_first_attempt, start_to_close_timeout: 30) + end + when :child_workflow + # Start a child, send child signal and external signal, finish + handle = Temporalio::Workflow.start_child_workflow(TestWorkflow, :wait_on_signal) + handle.signal(TestWorkflow.signal, :complete) + Temporalio::Workflow.external_workflow_handle(handle.id).signal(TestWorkflow.signal, :mark_finished) + [handle.id, handle.first_execution_run_id, handle.result] + else + raise NotImplementedError + end + end + + workflow_update_validator(:update_with_validator) + def validate_update_with_validator(scenario) + case scenario.to_sym + when :complete + # Do nothing + when :fail + raise Temporalio::Error::ApplicationError, 'Intentional update validator failure' + else + raise NotImplementedError + end + end + + workflow_update + def update_with_validator(scenario) + case scenario.to_sym + when :complete + 'update-with-validator-done' + else + raise NotImplementedError + end + end + end + + def init_tracer_and_exporter + exporter = OpenTelemetry::SDK::Trace::Export::InMemorySpanExporter.new + tracer_provider = OpenTelemetry::SDK::Trace::TracerProvider.new + tracer_provider.add_span_processor(OpenTelemetry::SDK::Trace::Export::SimpleSpanProcessor.new(exporter)) + tracer = tracer_provider.tracer('test-tracer') + [tracer, exporter] + end + + def trace(tracer_and_exporter: init_tracer_and_exporter, &) + tracer, exporter = tracer_and_exporter + + # Make client with interceptors + interceptor = Temporalio::Contrib::OpenTelemetry::TracingInterceptor.new(tracer) + new_options = env.client.options.with(interceptors: [interceptor]) + client = Temporalio::Client.new(**new_options.to_h) # steep:ignore + + # Run all in one outer span + tracer_and_exporter.first.in_span('root') do + yield client + end + + # Convert spans, confirm there is only the outer, and return children + spans = ExpectedSpan.from_span_data(exporter.finished_spans) + assert_equal 1, spans.size + assert_equal 'root', spans.first&.name + spans.first + end + + def trace_workflow(scenario, tracer_and_exporter: init_tracer_and_exporter, &) + trace(tracer_and_exporter:) do |client| + # Must capture and attach outer context + outer_context = OpenTelemetry::Context.current + attach_token = nil + execute_workflow( + TestWorkflow, + scenario, + client:, + activities: [TestActivity.new(tracer_and_exporter.first)], + # Have to reattach outer context inside worker run to check outer span + on_worker_run: proc { attach_token = OpenTelemetry::Context.attach(outer_context) } + ) do |handle| + yield handle + ensure + OpenTelemetry::Context.detach(attach_token) + end + end + end + + def test_simple_successes + exp_root = ExpectedSpan.new(name: 'root') + history = nil + act_root = trace_workflow(:wait_on_signal) do |handle| + exp_cl_attrs = { 'temporalWorkflowID' => handle.id } + exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id }) + exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs) + exp_run_wf = exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs) + + # Basic query + exp_query = exp_root.add_child(name: 'QueryWorkflow:query', attributes: exp_cl_attrs) + exp_start_wf.add_child(name: 'HandleQuery:query', attributes: exp_run_attrs, links: [exp_query]) + assert_equal 'query-done', handle.query(TestWorkflow.query, :complete) + + # Basic update + exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update', + attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' })) + exp_start_wf.add_child(name: 'HandleUpdate:update', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }), + links: [exp_update]) + assert_equal 'update-done', handle.execute_update(TestWorkflow.update, :complete, id: 'my-update-id') + + # Basic update with validator + exp_update2 = exp_root.add_child(name: 'StartWorkflowUpdate:update_with_validator', + attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id2' })) + exp_start_wf.add_child(name: 'ValidateUpdate:update_with_validator', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id2' }), + links: [exp_update2]) + exp_start_wf.add_child(name: 'HandleUpdate:update_with_validator', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id2' }), + links: [exp_update2]) + assert_equal 'update-with-validator-done', + handle.execute_update(TestWorkflow.update_with_validator, :complete, id: 'my-update-id2') + + # Basic signal + exp_signal = exp_root.add_child(name: 'SignalWorkflow:signal', attributes: exp_cl_attrs) + exp_start_wf.add_child(name: 'HandleSignal:signal', attributes: exp_run_attrs, links: [exp_signal]) + handle.signal(TestWorkflow.signal, :mark_finished) + + # Workflow complete + exp_run_wf.add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_run_attrs) + assert_equal 'workflow-done', handle.result + history = handle.fetch_history + end + assert_equal exp_root.to_s_indented, act_root.to_s_indented + + # Run in replayer with a tracer and confirm nothing comes up because it's all replay + act_replay_root = trace do |client| + replayer = Temporalio::Worker::WorkflowReplayer.new( + workflows: [TestWorkflow], + interceptors: client.options.interceptors # steep:ignore + ) + replayer.replay_workflow(history || raise) + end + assert_equal ExpectedSpan.new(name: 'root').to_s_indented, act_replay_root.to_s_indented + end + + def test_handler_failures + exp_root = ExpectedSpan.new(name: 'root') + act_root = trace_workflow(:wait_on_signal) do |handle| + exp_cl_attrs = { 'temporalWorkflowID' => handle.id } + exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id }) + exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs) + exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs) + + # Basic query + exp_query = exp_root.add_child(name: 'QueryWorkflow:query', attributes: exp_cl_attrs, + exception_message: 'Intentional query failure') + exp_start_wf.add_child(name: 'HandleQuery:query', attributes: exp_run_attrs, links: [exp_query]) + .add_child(name: 'FailHandleQuery:query', attributes: exp_run_attrs, + exception_message: 'Intentional query failure') + err = assert_raises(Temporalio::Error::WorkflowQueryFailedError) do + handle.query(TestWorkflow.query, :fail) + end + assert_equal 'Intentional query failure', err.message + + # Basic update + exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update', + attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' })) + exp_start_wf.add_child(name: 'HandleUpdate:update', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }), + links: [exp_update]) + .add_child(name: 'FailHandleUpdate:update', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }), + exception_message: 'Intentional update failure') + err = assert_raises(Temporalio::Error::WorkflowUpdateFailedError) do + handle.execute_update(TestWorkflow.update, :fail, id: 'my-update-id') + end + assert_equal 'Intentional update failure', err.cause.message + + # Basic update with validator + exp_update2 = exp_root.add_child(name: 'StartWorkflowUpdate:update_with_validator', + attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id2' })) + exp_start_wf.add_child(name: 'ValidateUpdate:update_with_validator', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id2' }), + links: [exp_update2]) + .add_child(name: 'FailValidateUpdate:update_with_validator', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id2' }), + exception_message: 'Intentional update validator failure') + err = assert_raises(Temporalio::Error::WorkflowUpdateFailedError) do + handle.execute_update(TestWorkflow.update_with_validator, :fail, id: 'my-update-id2') + end + assert_equal 'Intentional update validator failure', err.cause.message + + # Basic signal, where failure fails the workflow + exp_signal = exp_root.add_child(name: 'SignalWorkflow:signal', attributes: exp_cl_attrs) + exp_start_wf.add_child(name: 'HandleSignal:signal', attributes: exp_run_attrs, links: [exp_signal]) + .add_child(name: 'FailHandleSignal:signal', attributes: exp_run_attrs, + exception_message: 'Intentional signal failure') + handle.signal(TestWorkflow.signal, :fail) + + # Workflow complete + err = assert_raises(Temporalio::Error::WorkflowFailedError) do + handle.result + end + assert_equal 'Intentional signal failure', err.cause.message + end + assert_equal exp_root.to_s_indented, act_root.to_s_indented + end + + def test_activity + exp_root = ExpectedSpan.new(name: 'root') + act_root = trace_workflow(:wait_on_signal) do |handle| + exp_cl_attrs = { 'temporalWorkflowID' => handle.id } + exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id }) + exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs) + exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs) + + # Wait for task completion so update isn't accidentally first before run + assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) } + + # Update calls activity inside custom span and that activity fails on first attempt + exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update', + attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' })) + exp_start_act = exp_start_wf.add_child( + name: 'HandleUpdate:update', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }), + links: [exp_update] + ) + .add_child(name: 'custom-workflow-span', + attributes: exp_run_attrs.merge({ 'foo' => 'bar' })) + .add_child(name: 'StartActivity:TestActivity', attributes: exp_run_attrs) + # Two run activity calls because first one fails + exp_start_act.add_child( + name: 'RunActivity:TestActivity', + attributes: exp_run_attrs.merge({ 'temporalActivityID' => '1' }), + exception_message: 'Intentional activity failure' + ) + exp_start_act.add_child( + name: 'RunActivity:TestActivity', + attributes: exp_run_attrs.merge({ 'temporalActivityID' => '1' }) + ).add_child(name: 'custom-activity-span') + assert_equal 'activity-done', handle.execute_update(TestWorkflow.update, :call_activity, id: 'my-update-id') + + # Local activity too + exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update', + attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id2' })) + exp_start_act = exp_start_wf.add_child( + name: 'HandleUpdate:update', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id2' }), + links: [exp_update] + ) + .add_child(name: 'custom-workflow-span', + attributes: exp_run_attrs.merge({ 'baz' => 'qux' })) + .add_child(name: 'StartActivity:TestActivity', attributes: exp_run_attrs) + # Two run activity calls because first one fails + exp_start_act.add_child( + name: 'RunActivity:TestActivity', + attributes: exp_run_attrs.merge({ 'temporalActivityID' => '2' }), + exception_message: 'Intentional activity failure' + ) + exp_start_act.add_child( + name: 'RunActivity:TestActivity', + attributes: exp_run_attrs.merge({ 'temporalActivityID' => '2' }) + ).add_child(name: 'custom-activity-span') + assert_equal 'activity-done', + handle.execute_update(TestWorkflow.update, :call_local_activity, id: 'my-update-id2') + end + assert_equal exp_root.to_s_indented, act_root.to_s_indented + end + + def test_client_fail + # Workflow start fail + exp_root = ExpectedSpan.new(name: 'root') + act_root = trace do |client| + # Will fail with unknown search attribute + err = assert_raises(Temporalio::Error::RPCError) do + client.start_workflow( + 'does-not-exist', + id: 'does-not-exist', task_queue: 'does-not-exist', + search_attributes: Temporalio::SearchAttributes.new( + { + Temporalio::SearchAttributes::Key.new( + 'does-not-exist', Temporalio::SearchAttributes::IndexedValueType::TEXT + ) => 'does-not-exist' + } + ) + ) + end + exp_root.add_child(name: 'StartWorkflow:does-not-exist', + attributes: { 'temporalWorkflowID' => 'does-not-exist' }, + exception_message: err.message) + end + assert_equal exp_root.to_s_indented, act_root.to_s_indented + + # Workflow update fail + exp_root = ExpectedSpan.new(name: 'root') + act_root = trace do |client| + err = assert_raises(Temporalio::Error::RPCError) do + client.workflow_handle('does-not-exist').execute_update('does-not-exist', id: 'my-update-id') + end + exp_root.add_child(name: 'StartWorkflowUpdate:does-not-exist', + attributes: { 'temporalWorkflowID' => 'does-not-exist', + 'temporalUpdateID' => 'my-update-id' }, + exception_message: err.message) + end + assert_equal exp_root.to_s_indented, act_root.to_s_indented + end + + def test_child_and_external + exp_root = ExpectedSpan.new(name: 'root') + act_root = trace_workflow(:wait_on_signal) do |handle| + exp_cl_attrs = { 'temporalWorkflowID' => handle.id } + exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id }) + exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs) + exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs) + + # Wait for task completion so update isn't accidentally first before run + assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) } + + # Update calls child and sends signals to it in two ways + child_id, child_run_id, child_result = handle.execute_update(TestWorkflow.update, + :child_workflow, id: 'my-update-id') + exp_update = exp_root.add_child(name: 'StartWorkflowUpdate:update', + attributes: exp_cl_attrs.merge({ 'temporalUpdateID' => 'my-update-id' })) + # Expected span for update + exp_hnd_update = exp_start_wf.add_child( + name: 'HandleUpdate:update', + attributes: exp_run_attrs.merge({ 'temporalUpdateID' => 'my-update-id' }), + links: [exp_update] + ) + # Expected for children + exp_child_run_attrs = { 'temporalWorkflowID' => child_id, 'temporalRunID' => child_run_id } + exp_child_start = exp_hnd_update.add_child(name: 'StartChildWorkflow:TestWorkflow', attributes: exp_run_attrs) + exp_child_start + .add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_child_run_attrs) + .add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_child_run_attrs) + # Two signals we send to the child + exp_sig_child = exp_hnd_update.add_child(name: 'SignalChildWorkflow:signal', attributes: exp_run_attrs) + exp_sig_ext = exp_hnd_update.add_child(name: 'SignalExternalWorkflow:signal', attributes: exp_run_attrs) + exp_child_start.add_child(name: 'HandleSignal:signal', attributes: exp_child_run_attrs, links: [exp_sig_child]) + exp_child_start.add_child(name: 'HandleSignal:signal', attributes: exp_child_run_attrs, links: [exp_sig_ext]) + + assert_equal 'workflow-done', child_result + end + assert_equal exp_root.to_s_indented, act_root.to_s_indented + end + + def test_continue_as_new + exp_root = ExpectedSpan.new(name: 'root') + act_root = trace_workflow(:continue_as_new) do |handle| + exp_cl_attrs = { 'temporalWorkflowID' => handle.id } + exp_run_attrs = exp_cl_attrs.merge({ 'temporalRunID' => handle.result_run_id }) + exp_start_wf = exp_root.add_child(name: 'StartWorkflow:TestWorkflow', attributes: exp_cl_attrs) + exp_run_wf = exp_start_wf.add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_run_attrs) + + # Get continue as new error then get final result + cont_err = assert_raises(Temporalio::Error::WorkflowContinuedAsNewError) { handle.result(follow_runs: false) } + assert_equal 'workflow-done', handle.result + + exp_can_attrs = exp_cl_attrs.merge({ 'temporalRunID' => cont_err.new_run_id }) + exp_run_wf.add_child(name: 'custom-can-span', attributes: exp_run_attrs) + .add_child(name: 'RunWorkflow:TestWorkflow', attributes: exp_can_attrs) + .add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_can_attrs) + exp_run_wf.add_child(name: 'CompleteWorkflow:TestWorkflow', attributes: exp_run_attrs, + exception_message: 'Continue as new') + end + assert_equal exp_root.to_s_indented, act_root.to_s_indented + end + + ExpectedSpan = Data.define(:name, :children, :attributes, :links, :exception_message) # rubocop:disable Layout/ClassStructure + + class ExpectedSpan + # Only returns unparented + def self.from_span_data(all_spans) + # Create a hash of spans by their ID + by_id = all_spans.to_h do |span| + [ + span.span_id, + [ + ExpectedSpan.new( + name: span.name, + attributes: span.attributes, + exception_message: span.events&.find { |e| e.name == 'exception' }&.attributes&.[]('exception.message') + ), + span + ] + ] + end + # Go over every span, associating children and links + by_id.each_value do |(span, raw_span)| + if raw_span.parent_span_id && raw_span.parent_span_id != OpenTelemetry::Trace::INVALID_SPAN_ID + by_id[raw_span.parent_span_id].first.children << span + end + raw_span.links&.each do |link| + span.links << by_id[link.span_context.span_id].first + end + end + # Return only spans with no parent + by_id.map do |_, (span, raw_span)| + span if !raw_span.parent_span_id || raw_span.parent_span_id == OpenTelemetry::Trace::INVALID_SPAN_ID + end.compact + end + + def initialize(name:, children: [], attributes: {}, links: [], exception_message: nil) + children = children.to_set + super + end + + def add_child(name:, attributes: {}, links: [], exception_message: nil) + span = ExpectedSpan.new(name:, attributes:, links:, exception_message:) + children << span + span + end + + def to_s_indented(indent: '') + ret = "#{name} (attrs: #{attributes}" + ret += ", links: [#{links.map(&:name).join(', ')}]" unless links.empty? + ret += ", exception: '#{exception_message}'" if exception_message + ret += ')' + indent += ' ' + children.each do |child| + ret += "\n#{indent}#{child.to_s_indented(indent:)}" + end + ret + end + end + end +end diff --git a/temporalio/test/sig/contrib/open_telemetry_test.rbs b/temporalio/test/sig/contrib/open_telemetry_test.rbs new file mode 100644 index 00000000..d7d32bd5 --- /dev/null +++ b/temporalio/test/sig/contrib/open_telemetry_test.rbs @@ -0,0 +1,39 @@ +module Contrib + class OpenTelemetryTest < Test + def init_tracer_and_exporter: -> [untyped, untyped] + def trace: ( + ?tracer_and_exporter: [untyped, untyped] + ) { (Temporalio::Client) -> void } -> untyped + def trace_workflow: ( + Symbol scenario, + ?tracer_and_exporter: [untyped, untyped] + ) { (Temporalio::Client::WorkflowHandle) -> void } -> untyped + + class ExpectedSpan + def self.from_span_data: (untyped all_spans) -> Array[ExpectedSpan] + + attr_reader name: String + attr_reader children: Array[ExpectedSpan] + attr_reader attributes: Hash[untyped, untyped] + attr_reader links: Array[ExpectedSpan] + attr_reader exception_message: String? + + def initialize: ( + name: String, + ?children: Array[ExpectedSpan], + ?attributes: Hash[untyped, untyped], + ?links: Array[ExpectedSpan], + ?exception_message: String? + ) -> void + + def add_child: ( + name: String, + ?attributes: Hash[untyped, untyped], + ?links: Array[ExpectedSpan], + ?exception_message: String? + ) -> ExpectedSpan + + def to_s_indented: (?indent: String) -> String + end + end +end \ No newline at end of file diff --git a/temporalio/test/sig/workflow_utils.rbs b/temporalio/test/sig/workflow_utils.rbs index 813f33b6..3b7144fa 100644 --- a/temporalio/test/sig/workflow_utils.rbs +++ b/temporalio/test/sig/workflow_utils.rbs @@ -16,7 +16,8 @@ module WorkflowUtils ?workflow_payload_codec_thread_pool: Temporalio::Worker::ThreadPool?, ?id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::enum, ?max_heartbeat_throttle_interval: Float, - ?task_timeout: duration? + ?task_timeout: duration?, + ?on_worker_run: Proc? ) -> Object? | [T] ( singleton(Temporalio::Workflow::Definition) workflow, @@ -35,7 +36,8 @@ module WorkflowUtils ?workflow_payload_codec_thread_pool: Temporalio::Worker::ThreadPool?, ?id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::enum, ?max_heartbeat_throttle_interval: Float, - ?task_timeout: duration? + ?task_timeout: duration?, + ?on_worker_run: Proc? ) { (Temporalio::Client::WorkflowHandle, Temporalio::Worker) -> T } -> T def assert_eventually_task_fail: ( diff --git a/temporalio/test/workflow_utils.rb b/temporalio/test/workflow_utils.rb index 61afc0b2..ea0c914b 100644 --- a/temporalio/test/workflow_utils.rb +++ b/temporalio/test/workflow_utils.rb @@ -28,7 +28,8 @@ def execute_workflow( id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::UNSPECIFIED, max_heartbeat_throttle_interval: 60.0, task_timeout: nil, - interceptors: [] + interceptors: [], + on_worker_run: nil ) worker = Temporalio::Worker.new( client:, @@ -43,6 +44,7 @@ def execute_workflow( interceptors: ) worker.run do + on_worker_run&.call handle = client.start_workflow( workflow, *args,