diff --git a/README.md b/README.md index 7707d966..5f2e4d26 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,7 @@ opinions. Please communicate with us on [Slack](https://t.mp/slack) in the `#rub - [Cloud Client Using API Key](#cloud-client-using-api-key) - [Data Conversion](#data-conversion) - [ActiveModel](#activemodel) + - [Converter Hints](#converter-hints) - [Workers](#workers) - [Workflows](#workflows) - [Workflow Definition](#workflow-definition) @@ -336,6 +337,25 @@ Now if `include ActiveModelJSONSupport` is present on any ActiveModel class, on which will use `as_json` which calls the super `as_json` but also includes the fully qualified class name as the JSON `create_id` key. On deserialization, Ruby JSON then uses this key to know what class to call `json_create` on. +##### Converter Hints + +In most places where objects are converted to payloads or vice versa, a "hint" can be provided to tell the converter +something else about the object/payload to assist conversion. The default converters ignore these hints, but custom +converters can be written to take advantage of them. For example, hints may be used to provide a custom converter the +Ruby type to deserialize a payload into. + +These hints manifest themselves various ways throughout the API. The most obvious way is when making definitions. An +activity can define `activity_arg_hint` (which accepts multiple) and/or `activity_result_hint` for activity-level hints. +Similarly, a workflow can define `workflow_arg_hint` and/or `workflow_result_hint` for workflow-level hints. +`workflow_signal`, `workflow_query`, and `workflow_update` all similarly accept `arg_hints` and `result_hint` (except +signal of course). These definition-level hints are passed to converters both from the caller side and the +implementation side. + +There are some advanced payload uses in the SDK that do not currently have a way to set hints. These include +workflow/schedule memo, workflow get/upsert memo, activity last heartbeat details, and application error details. In +some cases, users can use `Temporalio::Converters::RawValue` and then manually convert with hints. For others, hints can +be added as needed, please open an issue or otherwise contact Temporal. + ### Workers Workers host workflows and/or activities. Here's how to run a worker: diff --git a/temporalio/lib/temporalio/activity/context.rb b/temporalio/lib/temporalio/activity/context.rb index 64bdfb17..4e8ee734 100644 --- a/temporalio/lib/temporalio/activity/context.rb +++ b/temporalio/lib/temporalio/activity/context.rb @@ -61,7 +61,8 @@ def instance # Users do not have to be concerned with burdening the server by calling this too frequently. # # @param details [Array] Details to record with the heartbeat. - def heartbeat(*details) + # @param detail_hints [Array, nil] Hints to pass to converter. + def heartbeat(*details, detail_hints: nil) raise NotImplementedError end diff --git a/temporalio/lib/temporalio/activity/definition.rb b/temporalio/lib/temporalio/activity/definition.rb index 4941c4c9..10637a81 100644 --- a/temporalio/lib/temporalio/activity/definition.rb +++ b/temporalio/lib/temporalio/activity/definition.rb @@ -78,6 +78,21 @@ def activity_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParam @activity_raw_args = value end + + # Add activity hints to be passed to converter for activity args. + # + # @param hints [Array] Hints to add. + def activity_arg_hint(*hints) + @activity_arg_hints ||= [] + @activity_arg_hints.concat(hints) + end + + # Set activity result hint to be passed to converter for activity result. + # + # @param hint [Object] Hint to set. + def activity_result_hint(hint) + @activity_result_hint = hint + end end # @!visibility private @@ -96,7 +111,9 @@ def self._activity_definition_details activity_name:, activity_executor: @activity_executor || :default, activity_cancel_raise: @activity_cancel_raise.nil? || @activity_cancel_raise, - activity_raw_args: @activity_raw_args.nil? ? false : @activity_raw_args + activity_raw_args: @activity_raw_args.nil? ? false : @activity_raw_args, + activity_arg_hints: @activity_arg_hints, + activity_result_hint: @activity_result_hint } end @@ -127,6 +144,12 @@ class Info # @return [Boolean] Whether to use {Converters::RawValue}s as arguments. attr_reader :raw_args + # @return [Array, nil] Argument hints. + attr_reader :arg_hints + + # @return [Object, nil] Result hint + attr_reader :result_hint + # Obtain definition info representing the given activity, which can be a class, instance, or definition info. # # @param activity [Definition, Class, Info] Activity to get info for. @@ -147,7 +170,9 @@ def self.from_activity(activity) instance: proc { activity.new }, executor: details[:activity_executor], cancel_raise: details[:activity_cancel_raise], - raw_args: details[:activity_raw_args] + raw_args: details[:activity_raw_args], + arg_hints: details[:activity_arg_hints], + result_hint: details[:activity_result_hint] ) { |*args| Context.current.instance&.execute(*args) } when Definition details = activity.class._activity_definition_details @@ -156,7 +181,9 @@ def self.from_activity(activity) instance: activity, executor: details[:activity_executor], cancel_raise: details[:activity_cancel_raise], - raw_args: details[:activity_raw_args] + raw_args: details[:activity_raw_args], + arg_hints: details[:activity_arg_hints], + result_hint: details[:activity_result_hint] ) { |*args| Context.current.instance&.execute(*args) } when Info activity @@ -172,6 +199,8 @@ def self.from_activity(activity) # @param executor [Symbol] Name of the executor. # @param cancel_raise [Boolean] Whether to raise in thread/fiber on cancellation. # @param raw_args [Boolean] Whether to use {Converters::RawValue}s as arguments. + # @param arg_hints [Array, nil] Argument hints. + # @param result_hint [Object, nil] Result hint. # @yield Use this block as the activity. def initialize( name:, @@ -179,6 +208,8 @@ def initialize( executor: :default, cancel_raise: true, raw_args: false, + arg_hints: nil, + result_hint: nil, &block ) @name = name @@ -189,6 +220,8 @@ def initialize( @executor = executor @cancel_raise = cancel_raise @raw_args = raw_args + @arg_hints = arg_hints + @result_hint = result_hint Internal::ProtoUtils.assert_non_reserved_name(name) end end diff --git a/temporalio/lib/temporalio/client.rb b/temporalio/lib/temporalio/client.rb index b18a6bfd..b60f3c6e 100644 --- a/temporalio/lib/temporalio/client.rb +++ b/temporalio/lib/temporalio/client.rb @@ -231,6 +231,10 @@ def operator_service # @param versioning_override [VersioningOverride, nil] Override the version of the workflow. # This is currently experimental. # @param priority [Priority] Priority of the workflow. This is currently experimental. + # @param arg_hints [Array, nil] Overrides converter hints for arguments if any. If unset/nil and the + # workflow definition has arg hints, those are used by default. + # @param result_hint [Object, nil] Overrides converter hint for result if any. If unset/nil and the workflow + # definition has result hint, it is used by default. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [WorkflowHandle] A workflow handle to the started workflow. @@ -256,10 +260,15 @@ def start_workflow( request_eager_start: false, versioning_override: nil, priority: Priority.default, + arg_hints: nil, + result_hint: nil, rpc_options: nil ) + # Take hints from definition if there is a definition + workflow, defn_arg_hints, defn_result_hint = + Workflow::Definition._workflow_type_and_hints_from_workflow_parameter(workflow) @impl.start_workflow(Interceptor::StartWorkflowInput.new( - workflow: Workflow::Definition._workflow_type_from_workflow_parameter(workflow), + workflow:, args:, workflow_id: id, task_queue:, @@ -279,6 +288,8 @@ def start_workflow( headers: {}, versioning_override:, priority:, + arg_hints: arg_hints || defn_arg_hints, + result_hint: result_hint || defn_result_hint, rpc_options: )) end @@ -314,6 +325,10 @@ def start_workflow( # @param versioning_override [VersioningOverride, nil] Override the version of the workflow. # This is currently experimental. # @param priority [Priority] Priority for the workflow. This is currently experimental. + # @param arg_hints [Array, nil] Overrides converter hints for arguments if any. If unset/nil and the + # workflow definition has arg hints, those are used by default. + # @param result_hint [Object, nil] Overrides converter hint for result if any. If unset/nil and the workflow + # definition has result hint, it is used by default. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [Object] Successful result of the workflow. @@ -340,10 +355,11 @@ def execute_workflow( request_eager_start: false, versioning_override: nil, priority: Priority.default, - follow_runs: true, + arg_hints: nil, + result_hint: nil, rpc_options: nil ) - handle = start_workflow( + start_workflow( workflow, *args, id:, @@ -363,9 +379,10 @@ def execute_workflow( request_eager_start:, versioning_override:, priority:, + arg_hints:, + result_hint:, rpc_options: - ) - follow_runs ? handle.result : handle + ).result end # Get a workflow handle to an existing workflow by its ID. @@ -375,14 +392,18 @@ def execute_workflow( # interactions occur on the latest of the workflow ID. # @param first_execution_run_id [String, nil] First execution run ID used for some calls like cancellation and # termination to ensure the affected workflow is only within the same chain as this given run ID. + # @param result_hint [Object, nil] Converter hint for the workflow's result. # # @return [WorkflowHandle] The workflow handle. def workflow_handle( workflow_id, run_id: nil, - first_execution_run_id: nil + first_execution_run_id: nil, + result_hint: nil ) - WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:) + WorkflowHandle.new( + client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:, result_hint: + ) end # Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict @@ -396,6 +417,10 @@ def workflow_handle( # @param wait_for_stage [WorkflowUpdateWaitStage] Required stage to wait until returning. ADMITTED is not # currently supported. See https://docs.temporal.io/workflows#update for more details. # @param id [String] ID of the update. + # @param arg_hints [Array, nil] Overrides converter hints for update arguments if any. If unset/nil and the + # update definition has arg hints, those are used by default. + # @param result_hint [Object, nil] Overrides converter hint for update result if any. If unset/nil and the update + # definition has result hint, it is used by default. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [WorkflowUpdateHandle] The update handle. @@ -409,15 +434,20 @@ def start_update_with_start_workflow( start_workflow_operation:, wait_for_stage:, id: SecureRandom.uuid, + arg_hints: nil, + result_hint: nil, rpc_options: nil ) + update, defn_arg_hints, defn_result_hint = Workflow::Definition::Update._name_and_hints_from_parameter(update) @impl.start_update_with_start_workflow( Interceptor::StartUpdateWithStartWorkflowInput.new( update_id: id, - update: Workflow::Definition::Update._name_from_parameter(update), + update:, args:, wait_for_stage:, start_workflow_operation:, + arg_hints: arg_hints || defn_arg_hints, + result_hint: result_hint || defn_result_hint, headers: {}, rpc_options: ) @@ -433,6 +463,10 @@ def start_update_with_start_workflow( # @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must # have an `id_conflict_policy` set. # @param id [String] ID of the update. + # @param arg_hints [Array, nil] Overrides converter hints for update arguments if any. If unset/nil and the + # update definition has arg hints, those are used by default. + # @param result_hint [Object, nil] Overrides converter hint for update result if any. If unset/nil and the update + # definition has result hint, it is used by default. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [Object] Successful update result. @@ -446,6 +480,8 @@ def execute_update_with_start_workflow( *args, start_workflow_operation:, id: SecureRandom.uuid, + arg_hints: nil, + result_hint: nil, rpc_options: nil ) start_update_with_start_workflow( @@ -454,6 +490,8 @@ def execute_update_with_start_workflow( start_workflow_operation:, wait_for_stage: WorkflowUpdateWaitStage::COMPLETED, id:, + arg_hints:, + result_hint:, rpc_options: ).result end @@ -464,6 +502,8 @@ def execute_update_with_start_workflow( # @param args [Array] Signal arguments. # @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This may not # support all `id_conflict_policy` options. + # @param arg_hints [Array, nil] Overrides converter hints for signal arguments if any. If unset/nil and the + # signal definition has arg hints, those are used by default. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [WorkflowHandle] A workflow handle to the workflow. @@ -473,13 +513,16 @@ def signal_with_start_workflow( signal, *args, start_workflow_operation:, + arg_hints: nil, rpc_options: nil ) + signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal) @impl.signal_with_start_workflow( Interceptor::SignalWithStartWorkflowInput.new( - signal: Workflow::Definition::Signal._name_from_parameter(signal), + signal:, args:, start_workflow_operation:, + arg_hints: arg_hints || defn_arg_hints, rpc_options: ) ) diff --git a/temporalio/lib/temporalio/client/async_activity_handle.rb b/temporalio/lib/temporalio/client/async_activity_handle.rb index 4a94467d..2010a6c3 100644 --- a/temporalio/lib/temporalio/client/async_activity_handle.rb +++ b/temporalio/lib/temporalio/client/async_activity_handle.rb @@ -27,11 +27,13 @@ def initialize(client:, task_token:, id_reference:) # Record a heartbeat for the activity. # # @param details [Array] Details of the heartbeat. + # @param detail_hints [Array, nil] Converter hints for the details. # @param rpc_options [RPCOptions, nil] Advanced RPC options. - def heartbeat(*details, rpc_options: nil) + def heartbeat(*details, detail_hints: nil, rpc_options: nil) @client._impl.heartbeat_async_activity(Interceptor::HeartbeatAsyncActivityInput.new( task_token_or_id_reference:, details:, + detail_hints:, rpc_options: )) end @@ -39,11 +41,13 @@ def heartbeat(*details, rpc_options: nil) # Complete the activity. # # @param result [Object, nil] Result of the activity. + # @param result_hint [Object, nil] Converter hint for the result. # @param rpc_options [RPCOptions, nil] Advanced RPC options. - def complete(result = nil, rpc_options: nil) + def complete(result = nil, result_hint: nil, rpc_options: nil) @client._impl.complete_async_activity(Interceptor::CompleteAsyncActivityInput.new( task_token_or_id_reference:, result:, + result_hint:, rpc_options: )) end @@ -52,12 +56,14 @@ def complete(result = nil, rpc_options: nil) # # @param error [Exception] Error for the activity. # @param last_heartbeat_details [Array] Last heartbeat details for the activity. + # @param last_heartbeat_detail_hints [Array, nil] Converter hints for the last heartbeat details. # @param rpc_options [RPCOptions, nil] Advanced RPC options. - def fail(error, last_heartbeat_details: [], rpc_options: nil) + def fail(error, last_heartbeat_details: [], last_heartbeat_detail_hints: nil, rpc_options: nil) @client._impl.fail_async_activity(Interceptor::FailAsyncActivityInput.new( task_token_or_id_reference:, error:, last_heartbeat_details:, + last_heartbeat_detail_hints:, rpc_options: )) end @@ -65,12 +71,14 @@ def fail(error, last_heartbeat_details: [], rpc_options: nil) # Report the activity as canceled. # # @param details [Array] Cancellation details. + # @param detail_hints [Array, nil] Converter hints for the details. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # @raise [AsyncActivityCanceledError] If the activity has been canceled. - def report_cancellation(*details, rpc_options: nil) + def report_cancellation(*details, detail_hints: nil, rpc_options: nil) @client._impl.report_cancellation_async_activity(Interceptor::ReportCancellationAsyncActivityInput.new( task_token_or_id_reference:, details:, + detail_hints:, rpc_options: )) end diff --git a/temporalio/lib/temporalio/client/interceptor.rb b/temporalio/lib/temporalio/client/interceptor.rb index f039dfab..361acd1c 100644 --- a/temporalio/lib/temporalio/client/interceptor.rb +++ b/temporalio/lib/temporalio/client/interceptor.rb @@ -36,9 +36,11 @@ def intercept_client(next_interceptor) :search_attributes, :start_delay, :request_eager_start, - :headers, :versioning_override, :priority, + :arg_hints, + :result_hint, + :headers, :rpc_options ) @@ -49,6 +51,8 @@ def intercept_client(next_interceptor) :args, :wait_for_stage, :start_workflow_operation, + :arg_hints, + :result_hint, :headers, :rpc_options ) @@ -58,6 +62,7 @@ def intercept_client(next_interceptor) :signal, :args, :start_workflow_operation, + :arg_hints, # Headers intentionally not defined here, because they are not separate from start_workflow_operation :rpc_options ) @@ -99,6 +104,7 @@ def intercept_client(next_interceptor) :run_id, :signal, :args, + :arg_hints, :headers, :rpc_options ) @@ -110,6 +116,8 @@ def intercept_client(next_interceptor) :query, :args, :reject_condition, + :arg_hints, + :result_hint, :headers, :rpc_options ) @@ -122,6 +130,8 @@ def intercept_client(next_interceptor) :update, :args, :wait_for_stage, + :arg_hints, + :result_hint, :headers, :rpc_options ) @@ -220,6 +230,7 @@ def intercept_client(next_interceptor) HeartbeatAsyncActivityInput = Data.define( :task_token_or_id_reference, :details, + :detail_hints, :rpc_options ) @@ -227,6 +238,7 @@ def intercept_client(next_interceptor) CompleteAsyncActivityInput = Data.define( :task_token_or_id_reference, :result, + :result_hint, :rpc_options ) @@ -235,6 +247,7 @@ def intercept_client(next_interceptor) :task_token_or_id_reference, :error, :last_heartbeat_details, + :last_heartbeat_detail_hints, :rpc_options ) @@ -242,6 +255,7 @@ def intercept_client(next_interceptor) ReportCancellationAsyncActivityInput = Data.define( :task_token_or_id_reference, :details, + :detail_hints, :rpc_options ) diff --git a/temporalio/lib/temporalio/client/schedule.rb b/temporalio/lib/temporalio/client/schedule.rb index 95e71e04..1de6298d 100644 --- a/temporalio/lib/temporalio/client/schedule.rb +++ b/temporalio/lib/temporalio/client/schedule.rb @@ -175,6 +175,7 @@ def _to_proto(data_converter) :retry_policy, :memo, :search_attributes, + :arg_hints, :headers ) @@ -208,6 +209,9 @@ def _to_proto(data_converter) # @return [Hash, nil] Memo for the workflow. # @!attribute search_attributes # @return [SearchAttributes, nil] Search attributes for the workflow. + # @!attribute arg_hints + # @return [Array, nil] Converter hints for workflow arguments. This is only user-set (e.g. on create) + # and is not persisted and therefore will not be set when describing a workflow. # @!attribute headers # @return [Hash, nil] Headers for the workflow. class StartWorkflow @@ -249,10 +253,13 @@ def new( retry_policy: nil, memo: nil, search_attributes: nil, + arg_hints: nil, headers: nil ) + workflow, defn_arg_hints, = + Workflow::Definition._workflow_type_and_hints_from_workflow_parameter(workflow) _original_new( # steep:ignore - workflow: Workflow::Definition._workflow_type_from_workflow_parameter(workflow), + workflow:, args:, id:, task_queue:, @@ -264,6 +271,7 @@ def new( retry_policy:, memo:, search_attributes:, + arg_hints: arg_hints || defn_arg_hints, headers: ) end @@ -296,7 +304,7 @@ def _to_proto(data_converter) workflow_id: id, workflow_type: Api::Common::V1::WorkflowType.new(name: workflow), task_queue: Api::TaskQueue::V1::TaskQueue.new(name: task_queue), - input: data_converter.to_payloads(args), + input: data_converter.to_payloads(args, hints: arg_hints), workflow_execution_timeout: Internal::ProtoUtils.seconds_to_duration(execution_timeout), workflow_run_timeout: Internal::ProtoUtils.seconds_to_duration(run_timeout), workflow_task_timeout: Internal::ProtoUtils.seconds_to_duration(task_timeout), diff --git a/temporalio/lib/temporalio/client/with_start_workflow_operation.rb b/temporalio/lib/temporalio/client/with_start_workflow_operation.rb index ef3c1b78..17743197 100644 --- a/temporalio/lib/temporalio/client/with_start_workflow_operation.rb +++ b/temporalio/lib/temporalio/client/with_start_workflow_operation.rb @@ -24,6 +24,8 @@ class WithStartWorkflowOperation :memo, :search_attributes, :start_delay, + :arg_hints, + :result_hint, :headers ) @@ -55,10 +57,14 @@ def initialize( memo: nil, search_attributes: nil, start_delay: nil, + arg_hints: nil, + result_hint: nil, headers: {} ) + workflow, defn_arg_hints, defn_result_hint = + Workflow::Definition._workflow_type_and_hints_from_workflow_parameter(workflow) @options = Options.new( - workflow: Workflow::Definition._workflow_type_from_workflow_parameter(workflow), + workflow:, args:, id:, task_queue:, @@ -74,6 +80,8 @@ def initialize( memo:, search_attributes:, start_delay:, + arg_hints: arg_hints || defn_arg_hints, + result_hint: result_hint || defn_result_hint, headers: ) @workflow_handle_mutex = Mutex.new diff --git a/temporalio/lib/temporalio/client/workflow_handle.rb b/temporalio/lib/temporalio/client/workflow_handle.rb index 77a183f5..4fdae65f 100644 --- a/temporalio/lib/temporalio/client/workflow_handle.rb +++ b/temporalio/lib/temporalio/client/workflow_handle.rb @@ -48,13 +48,21 @@ class WorkflowHandle # @return [String, nil] First execution run ID. attr_reader :first_execution_run_id + # Result hint for the result of this workflow. If this handle was created via {Client.start_workflow}, this is set + # from there (either via result hint on that call or workflow definition's result hint). Otherwise, the result + # hint is set by the creator of the handle. + # + # @return [Object, nil] Result hint. + attr_reader :result_hint + # @!visibility private - def initialize(client:, id:, run_id:, result_run_id:, first_execution_run_id:) + def initialize(client:, id:, run_id:, result_run_id:, first_execution_run_id:, result_hint:) @client = client @id = id @run_id = run_id @result_run_id = result_run_id @first_execution_run_id = first_execution_run_id + @result_hint = result_hint end # Wait for the result of the workflow. @@ -64,6 +72,8 @@ def initialize(client:, id:, run_id:, result_run_id:, first_execution_run_id:) # # @param follow_runs [Boolean] If +true+, workflow runs will be continually fetched across retries and continue as # new until the latest one is found. If +false+, the first result is used. + # @param result_hint [Object, nil] Override the result hint for the result. If unset/nil, uses one on the handle + # itself. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [Object] Result of the workflow after being converted by the data converter. @@ -71,7 +81,7 @@ def initialize(client:, id:, run_id:, result_run_id:, first_execution_run_id:) # @raise [Error::WorkflowFailedError] Workflow failed with +cause+ as the cause. # @raise [Error::WorkflowContinuedAsNewError] Workflow continued as new and +follow_runs+ is +false+. # @raise [Error::RPCError] RPC error from call. - def result(follow_runs: true, rpc_options: nil) + def result(follow_runs: true, result_hint: nil, rpc_options: nil) # Wait on the close event, following as needed hist_run_id = result_run_id loop do @@ -91,7 +101,7 @@ def result(follow_runs: true, rpc_options: nil) hist_run_id = attrs.new_execution_run_id next if follow_runs && hist_run_id && !hist_run_id.empty? - return @client.data_converter.from_payloads(attrs.result).first + return @client.data_converter.from_payloads(attrs.result, hints: Array(@result_hint || result_hint)).first when :EVENT_TYPE_WORKFLOW_EXECUTION_FAILED attrs = event.workflow_execution_failed_event_attributes hist_run_id = attrs.new_execution_run_id @@ -212,18 +222,22 @@ def fetch_history_events( # # @param signal [Workflow::Definition::Signal, Symbol, String] Signal definition or name. # @param args [Array] Signal arguments. + # @param arg_hints [Array, nil] Signal argument hints. If unset/nil and a signal definition is passed, + # uses the ones on the signal definition if present. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @raise [Error::RPCError] RPC error from call. # # @note Handles created as a result of {Client.start_workflow} will signal the latest workflow with the same # workflow ID even if it is unrelated to the started workflow. - def signal(signal, *args, rpc_options: nil) + def signal(signal, *args, arg_hints: nil, rpc_options: nil) + signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal) @client._impl.signal_workflow(Interceptor::SignalWorkflowInput.new( workflow_id: id, run_id:, - signal: Workflow::Definition::Signal._name_from_parameter(signal), + signal:, args:, + arg_hints: arg_hints || defn_arg_hints, headers: {}, rpc_options: )) @@ -235,6 +249,10 @@ def signal(signal, *args, rpc_options: nil) # @param query [Workflow::Definition::Query, Symbol, String] Query definition or name. # @param args [Array] Query arguments. # @param reject_condition [WorkflowQueryRejectCondition, nil] Condition for rejecting the query. + # @param arg_hints [Array, nil] Query argument hints. If unset/nil and a query definition is passed, + # uses the ones on the query definition if present. + # @param result_hint [Object, nil] Query result hints. If unset/nil and a query definition is passed, uses the + # one on the query definition if present. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [Object, nil] Query result. @@ -249,14 +267,19 @@ def query( query, *args, reject_condition: @client.options.default_workflow_query_reject_condition, + arg_hints: nil, + result_hint: nil, rpc_options: nil ) + query, defn_arg_hints, defn_result_hint = Workflow::Definition::Query._name_and_hints_from_parameter(query) @client._impl.query_workflow(Interceptor::QueryWorkflowInput.new( workflow_id: id, run_id:, - query: Workflow::Definition::Query._name_from_parameter(query), + query:, args:, reject_condition:, + arg_hints: arg_hints || defn_arg_hints, + result_hint: result_hint || defn_result_hint, headers: {}, rpc_options: )) @@ -270,6 +293,10 @@ def query( # @param wait_for_stage [WorkflowUpdateWaitStage] Required stage to wait until returning. ADMITTED is not # currently supported. See https://docs.temporal.io/workflows#update for more details. # @param id [String] ID of the update. + # @param arg_hints [Array, nil] Update argument hints. If unset/nil and am update definition is passed, + # uses the ones on the update definition if present. + # @param result_hint [Object, nil] Update result hints. If unset/nil and an update definition is passed, uses the + # one on the update definition if present. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [WorkflowUpdateHandle] The update handle. @@ -285,15 +312,20 @@ def start_update( *args, wait_for_stage:, id: SecureRandom.uuid, + arg_hints: nil, + result_hint: nil, rpc_options: nil ) + update, defn_arg_hints, defn_result_hint = Workflow::Definition::Update._name_and_hints_from_parameter(update) @client._impl.start_workflow_update(Interceptor::StartWorkflowUpdateInput.new( workflow_id: self.id, run_id:, update_id: id, - update: Workflow::Definition::Update._name_from_parameter(update), + update:, args:, wait_for_stage:, + arg_hints: arg_hints || defn_arg_hints, + result_hint: result_hint || defn_result_hint, headers: {}, rpc_options: )) @@ -305,6 +337,10 @@ def start_update( # @param update [Workflow::Definition::Update, Symbol, String] Update definition or name. # @param args [Array] Update arguments. # @param id [String] ID of the update. + # @param arg_hints [Array, nil] Update argument hints. If unset/nil and am update definition is passed, + # uses the ones on the update definition if present. + # @param result_hint [Object, nil] Update result hints. If unset/nil and an update definition is passed, uses the + # one on the update definition if present. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [Object, nil] Update result. @@ -316,12 +352,14 @@ def start_update( # # @note Handles created as a result of {Client.start_workflow} will send updates the latest workflow with the same # workflow ID even if it is unrelated to the started workflow. - def execute_update(update, *args, id: SecureRandom.uuid, rpc_options: nil) + def execute_update(update, *args, id: SecureRandom.uuid, arg_hints: nil, result_hint: nil, rpc_options: nil) start_update( update, *args, wait_for_stage: WorkflowUpdateWaitStage::COMPLETED, id:, + arg_hints:, + result_hint:, rpc_options: ).result end @@ -331,15 +369,17 @@ def execute_update(update, *args, id: SecureRandom.uuid, rpc_options: nil) # @param id [String] ID of the update. # @param specific_run_id [String, nil] Workflow run ID to get update handle for. Default is the {run_id}. Most # users will not need to set this and instead use the one on the class. + # @param result_hint [Object, nil] Result hint for the update result to set on the handle. # # @return [WorkflowUpdateHandle] The update handle. - def update_handle(id, specific_run_id: run_id) + def update_handle(id, specific_run_id: run_id, result_hint: nil) WorkflowUpdateHandle.new( client: @client, id:, workflow_id: self.id, workflow_run_id: specific_run_id, - known_outcome: nil + known_outcome: nil, + result_hint: ) end diff --git a/temporalio/lib/temporalio/client/workflow_update_handle.rb b/temporalio/lib/temporalio/client/workflow_update_handle.rb index 7d48f7cb..c59bb2bf 100644 --- a/temporalio/lib/temporalio/client/workflow_update_handle.rb +++ b/temporalio/lib/temporalio/client/workflow_update_handle.rb @@ -18,13 +18,17 @@ class WorkflowUpdateHandle # @return [String, nil] Run ID for the workflow. attr_reader :workflow_run_id + # @return [Object, nil] Result hint if one set when the handle was created. + attr_reader :result_hint + # @!visibility private - def initialize(client:, id:, workflow_id:, workflow_run_id:, known_outcome:) + def initialize(client:, id:, workflow_id:, workflow_run_id:, known_outcome:, result_hint:) @client = client @id = id @workflow_id = workflow_id @workflow_run_id = workflow_run_id @known_outcome = known_outcome + @result_hint = result_hint end # @return [Boolean] True if the result is already known and {result} will not make a blocking call, false if @@ -36,6 +40,7 @@ def result_obtained? # Wait for and return the result of the update. The result may already be known in which case no network call is # made. Otherwise the result will be polled for until it is returned. # + # @param result_hint [Object, nil] If not nil, overrides handle-level result hint for getting the result. # @param rpc_options [RPCOptions, nil] Advanced RPC options. # # @return [Object, nil] Update result. @@ -44,7 +49,7 @@ def result_obtained? # @raise [Error::WorkflowUpdateRPCTimeoutOrCanceledError] This update call timed out or was canceled. This doesn't # mean the update itself was timed out or canceled. # @raise [Error::RPCError] RPC error from call. - def result(rpc_options: nil) + def result(result_hint: nil, rpc_options: nil) @known_outcome ||= @client._impl.poll_workflow_update(Interceptor::PollWorkflowUpdateInput.new( workflow_id:, run_id: workflow_run_id, @@ -56,7 +61,8 @@ def result(rpc_options: nil) raise Error::WorkflowUpdateFailedError.new, cause: @client.data_converter.from_failure(@known_outcome.failure) end - results = @client.data_converter.from_payloads(@known_outcome.success) + results = @client.data_converter.from_payloads(@known_outcome.success, + hints: Array(result_hint || @result_hint)) warn("Expected 0 or 1 update result, got #{results.size}") if results.size > 1 results.first end diff --git a/temporalio/lib/temporalio/converters/data_converter.rb b/temporalio/lib/temporalio/converters/data_converter.rb index f518ac7e..72e7ccf3 100644 --- a/temporalio/lib/temporalio/converters/data_converter.rb +++ b/temporalio/lib/temporalio/converters/data_converter.rb @@ -40,9 +40,10 @@ def initialize( # Convert a Ruby value to a payload and encode it. # # @param value [Object] Ruby value. + # @param hint [Object, nil] Hint, if any, to assist conversion. # @return [Api::Common::V1::Payload] Converted and encoded payload. - def to_payload(value) - payload = payload_converter.to_payload(value) + def to_payload(value, hint: nil) + payload = payload_converter.to_payload(value, hint:) payload = payload_codec.encode([payload]).first if payload_codec payload end @@ -50,9 +51,13 @@ def to_payload(value) # Convert multiple Ruby values to a payload set and encode it. # # @param values [Object] Ruby values, converted to array via {::Array}. + # @param hints [Array, nil] Hints, if any, to assist conversion. Note, when using the default converter + # that converts a payload at a time, hints for each value are taken from the array at that value's index. So if + # there are fewer hints than values, some values will not have a hint. Similarly if there are more hints than + # values, the trailing hints are not used. # @return [Api::Common::V1::Payloads] Converted and encoded payload set. - def to_payloads(values) - payloads = payload_converter.to_payloads(values) + def to_payloads(values, hints: nil) + payloads = payload_converter.to_payloads(values, hints:) payloads.payloads.replace(payload_codec.encode(payloads.payloads)) if payload_codec && !payloads.payloads.empty? payloads end @@ -60,23 +65,28 @@ def to_payloads(values) # Decode and convert a payload to a Ruby value. # # @param payload [Api::Common::V1::Payload] Encoded payload. + # @param hint [Object, nil] Hint, if any, to assist conversion. # @return [Object] Decoded and converted Ruby value. - def from_payload(payload) + def from_payload(payload, hint: nil) payload = payload_codec.decode([payload]).first if payload_codec - payload_converter.from_payload(payload) + payload_converter.from_payload(payload, hint:) end # Decode and convert a payload set to Ruby values. # # @param payloads [Api::Common::V1::Payloads, nil] Encoded payload set. + # @param hints [Array, nil] Hints, if any, to assist conversion. Note, when using the default converter + # that converts a value at a time, hints for each payload are taken from the array at that payload's index. So + # if there are fewer hints than payloads, some payloads will not have a hint. Similarly if there are more hints + # than payloads, the trailing hints are not used. # @return [Array] Decoded and converted Ruby values. - def from_payloads(payloads) + def from_payloads(payloads, hints: nil) return [] unless payloads && !payloads.payloads.empty? if payload_codec && !payloads.payloads.empty? payloads = Api::Common::V1::Payloads.new(payloads: payload_codec.decode(payloads.payloads)) end - payload_converter.from_payloads(payloads) + payload_converter.from_payloads(payloads, hints:) end # Convert a Ruby error to a Temporal failure and encode it. diff --git a/temporalio/lib/temporalio/converters/payload_converter.rb b/temporalio/lib/temporalio/converters/payload_converter.rb index 81c680ec..ed305326 100644 --- a/temporalio/lib/temporalio/converters/payload_converter.rb +++ b/temporalio/lib/temporalio/converters/payload_converter.rb @@ -34,37 +34,47 @@ def self.new_with_defaults(json_parse_options: { create_additions: true }, json_ # Convert a Ruby value to a payload. # # @param value [Object] Ruby value. + # @param hint [Object, nil] Hint, if any, to assist conversion. # @return [Api::Common::V1::Payload] Converted payload. - def to_payload(value) + def to_payload(value, hint: nil) raise NotImplementedError end # Convert multiple Ruby values to a payload set. # # @param values [Object] Ruby values, converted to array via {::Array}. + # @param hints [Array, nil] Hints, if any, to assist conversion. Note, when using the default converter + # that converts a payload at a time, hints for each value are taken from the array at that value's index. So if + # there are fewer hints than values, some values will not have a hint. Similarly if there are more hints than + # values, the trailing hints are not used. # @return [Api::Common::V1::Payloads] Converted payload set. - def to_payloads(values) + def to_payloads(values, hints: nil) Api::Common::V1::Payloads.new( - payloads: Array(values).map { |value| to_payload(value) } + payloads: Array(values).zip(Array(hints)).map { |value, hint| to_payload(value, hint:) } ) end # Convert a payload to a Ruby value. # # @param payload [Api::Common::V1::Payload] Payload. + # @param hint [Object, nil] Hint, if any, to assist conversion. # @return [Object] Converted Ruby value. - def from_payload(payload) + def from_payload(payload, hint: nil) raise NotImplementedError end # Convert a payload set to Ruby values. # # @param payloads [Api::Common::V1::Payloads, nil] Payload set. + # @param hints [Array, nil] Hints, if any, to assist conversion. Note, when using the default converter + # that converts a value at a time, hints for each payload are taken from the array at that payload's index. So + # if there are fewer hints than payloads, some payloads will not have a hint. Similarly if there are more hints + # than payloads, the trailing hints are not used. # @return [Array] Converted Ruby values. - def from_payloads(payloads) + def from_payloads(payloads, hints: nil) return [] unless payloads - payloads.payloads.map { |payload| from_payload(payload) } + payloads.payloads.zip(Array(hints)).map { |payload, hint| from_payload(payload, hint:) } end end end diff --git a/temporalio/lib/temporalio/converters/payload_converter/binary_null.rb b/temporalio/lib/temporalio/converters/payload_converter/binary_null.rb index 304bddb4..5459df43 100644 --- a/temporalio/lib/temporalio/converters/payload_converter/binary_null.rb +++ b/temporalio/lib/temporalio/converters/payload_converter/binary_null.rb @@ -16,7 +16,7 @@ def encoding end # (see Encoding.to_payload) - def to_payload(value) + def to_payload(value, hint: nil) # rubocop:disable Lint/UnusedMethodArgument return nil unless value.nil? Api::Common::V1::Payload.new( @@ -25,7 +25,7 @@ def to_payload(value) end # (see Encoding.from_payload) - def from_payload(payload) # rubocop:disable Lint/UnusedMethodArgument + def from_payload(payload, hint: nil) # rubocop:disable Lint/UnusedMethodArgument nil end end diff --git a/temporalio/lib/temporalio/converters/payload_converter/binary_plain.rb b/temporalio/lib/temporalio/converters/payload_converter/binary_plain.rb index c6f77296..076d38e0 100644 --- a/temporalio/lib/temporalio/converters/payload_converter/binary_plain.rb +++ b/temporalio/lib/temporalio/converters/payload_converter/binary_plain.rb @@ -16,7 +16,7 @@ def encoding end # (see Encoding.to_payload) - def to_payload(value) + def to_payload(value, hint: nil) # rubocop:disable Lint/UnusedMethodArgument return nil unless value.is_a?(String) && value.encoding == ::Encoding::ASCII_8BIT Temporalio::Api::Common::V1::Payload.new( @@ -26,7 +26,7 @@ def to_payload(value) end # (see Encoding.from_payload) - def from_payload(payload) + def from_payload(payload, hint: nil) # rubocop:disable Lint/UnusedMethodArgument payload.data end end diff --git a/temporalio/lib/temporalio/converters/payload_converter/binary_protobuf.rb b/temporalio/lib/temporalio/converters/payload_converter/binary_protobuf.rb index 6061134b..e13f894b 100644 --- a/temporalio/lib/temporalio/converters/payload_converter/binary_protobuf.rb +++ b/temporalio/lib/temporalio/converters/payload_converter/binary_protobuf.rb @@ -17,7 +17,7 @@ def encoding end # (see Encoding.to_payload) - def to_payload(value) + def to_payload(value, hint: nil) # rubocop:disable Lint/UnusedMethodArgument return nil unless value.is_a?(Google::Protobuf::MessageExts) # @type var value: Google::Protobuf::MessageExts @@ -28,7 +28,7 @@ def to_payload(value) end # (see Encoding.from_payload) - def from_payload(payload) + def from_payload(payload, hint: nil) # rubocop:disable Lint/UnusedMethodArgument type = payload.metadata['messageType'] # @type var desc: untyped desc = Google::Protobuf::DescriptorPool.generated_pool.lookup(type) diff --git a/temporalio/lib/temporalio/converters/payload_converter/composite.rb b/temporalio/lib/temporalio/converters/payload_converter/composite.rb index d5cadabe..d81f8cab 100644 --- a/temporalio/lib/temporalio/converters/payload_converter/composite.rb +++ b/temporalio/lib/temporalio/converters/payload_converter/composite.rb @@ -32,14 +32,15 @@ def initialize(*converters) # Convert Ruby value to a payload by going over each encoding converter in order until one can convert. # # @param value [Object] Ruby value to convert. + # @param hint [Object, nil] Hint, if any, to assist conversion. # @return [Api::Common::V1::Payload] Converted payload. # @raise [ConverterNotFound] If no converters can process the value. - def to_payload(value) + def to_payload(value, hint: nil) # As a special case, raw values just return the payload within return value.payload if value.is_a?(RawValue) converters.each_value do |converter| - payload = converter.to_payload(value) + payload = converter.to_payload(value, hint:) return payload unless payload.nil? end raise ConverterNotFound, "Value of type #{value} has no known converter" @@ -48,17 +49,18 @@ def to_payload(value) # Convert payload to Ruby value based on its +encoding+ metadata on the payload. # # @param payload [Api::Common::V1::Payload] Payload to convert. + # @param hint [Object, nil] Hint, if any, to assist conversion. # @return [Object] Converted Ruby value. # @raise [EncodingNotSet] If encoding not set on the metadata. # @raise [ConverterNotFound] If no converter found for the encoding. - def from_payload(payload) + def from_payload(payload, hint: nil) encoding = payload.metadata['encoding'] raise EncodingNotSet, 'Missing payload encoding' unless encoding converter = converters[encoding] raise ConverterNotFound, "No converter for encoding #{encoding}" unless converter - converter.from_payload(payload) + converter.from_payload(payload, hint:) end end end diff --git a/temporalio/lib/temporalio/converters/payload_converter/encoding.rb b/temporalio/lib/temporalio/converters/payload_converter/encoding.rb index 55ab5398..36c6e7ca 100644 --- a/temporalio/lib/temporalio/converters/payload_converter/encoding.rb +++ b/temporalio/lib/temporalio/converters/payload_converter/encoding.rb @@ -16,8 +16,9 @@ def encoding # handle it, the resulting payload must have +encoding+ metadata on the payload set to the value of {encoding}. # # @param value [Object] Ruby value to possibly convert. + # @param hint [Object, nil] Hint, if any, to assist conversion. # @return [Api::Common::V1::Payload, nil] Converted payload if it can handle it, +nil+ otherwise. - def to_payload(value) + def to_payload(value, hint: nil) raise NotImplementedError end @@ -25,8 +26,9 @@ def to_payload(value) # will error if it cannot convert. # # @param payload [Api::Common::V1::Payload] Payload to convert. + # @param hint [Object, nil] Hint, if any, to assist conversion. # @return [Object] Converted Ruby value. - def from_payload(payload) + def from_payload(payload, hint: nil) raise NotImplementedError end end diff --git a/temporalio/lib/temporalio/converters/payload_converter/json_plain.rb b/temporalio/lib/temporalio/converters/payload_converter/json_plain.rb index 14ed58a1..f71e10e2 100644 --- a/temporalio/lib/temporalio/converters/payload_converter/json_plain.rb +++ b/temporalio/lib/temporalio/converters/payload_converter/json_plain.rb @@ -27,7 +27,7 @@ def encoding end # (see Encoding.to_payload) - def to_payload(value) + def to_payload(value, hint: nil) # rubocop:disable Lint/UnusedMethodArgument Api::Common::V1::Payload.new( metadata: { 'encoding' => ENCODING }, data: JSON.generate(value, @generate_options).b @@ -35,7 +35,7 @@ def to_payload(value) end # (see Encoding.from_payload) - def from_payload(payload) + def from_payload(payload, hint: nil) # rubocop:disable Lint/UnusedMethodArgument JSON.parse(payload.data, @parse_options) end end diff --git a/temporalio/lib/temporalio/converters/payload_converter/json_protobuf.rb b/temporalio/lib/temporalio/converters/payload_converter/json_protobuf.rb index 96e7f31a..897b7c79 100644 --- a/temporalio/lib/temporalio/converters/payload_converter/json_protobuf.rb +++ b/temporalio/lib/temporalio/converters/payload_converter/json_protobuf.rb @@ -17,7 +17,7 @@ def encoding end # (see Encoding.to_payload) - def to_payload(value) + def to_payload(value, hint: nil) # rubocop:disable Lint/UnusedMethodArgument return nil unless value.is_a?(Google::Protobuf::MessageExts) Api::Common::V1::Payload.new( @@ -27,7 +27,7 @@ def to_payload(value) end # (see Encoding.from_payload) - def from_payload(payload) + def from_payload(payload, hint: nil) # rubocop:disable Lint/UnusedMethodArgument type = payload.metadata['messageType'] # @type var desc: untyped desc = Google::Protobuf::DescriptorPool.generated_pool.lookup(type) diff --git a/temporalio/lib/temporalio/internal/client/implementation.rb b/temporalio/lib/temporalio/internal/client/implementation.rb index 8f5bc12c..8466d9b0 100644 --- a/temporalio/lib/temporalio/internal/client/implementation.rb +++ b/temporalio/lib/temporalio/internal/client/implementation.rb @@ -53,7 +53,7 @@ def start_workflow(input) workflow_type: Api::Common::V1::WorkflowType.new(name: input.workflow), workflow_id: input.workflow_id, task_queue: Api::TaskQueue::V1::TaskQueue.new(name: input.task_queue.to_s), - input: @client.data_converter.to_payloads(input.args), + input: @client.data_converter.to_payloads(input.args, hints: input.arg_hints), workflow_execution_timeout: ProtoUtils.seconds_to_duration(input.execution_timeout), workflow_run_timeout: ProtoUtils.seconds_to_duration(input.run_timeout), workflow_task_timeout: ProtoUtils.seconds_to_duration(input.task_timeout), @@ -103,7 +103,8 @@ def start_workflow(input) id: input.workflow_id, run_id: nil, result_run_id: resp.run_id, - first_execution_run_id: resp.run_id + first_execution_run_id: resp.run_id, + result_hint: input.result_hint ) end @@ -139,7 +140,7 @@ def start_update_with_start_workflow(input) ), input: Api::Update::V1::Input.new( name: input.update, - args: @client.data_converter.to_payloads(input.args), + args: @client.data_converter.to_payloads(input.args, hints: input.arg_hints), header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter) ) ), @@ -169,7 +170,8 @@ def start_update_with_start_workflow(input) id: start_options.id, run_id: nil, result_run_id: run_id, - first_execution_run_id: run_id + first_execution_run_id: run_id, + result_hint: start_options.result_hint ) ) update_resp = resp.responses.last.update_workflow @@ -245,7 +247,8 @@ def start_update_with_start_workflow(input) id: input.update_id, workflow_id: start_options.id, workflow_run_id: run_id, - known_outcome: update_resp.outcome + known_outcome: update_resp.outcome, + result_hint: input.result_hint ) end @@ -261,7 +264,7 @@ def signal_with_start_workflow(input) Api::WorkflowService::V1::SignalWithStartWorkflowExecutionRequest, start_options ) req.signal_name = input.signal - req.signal_input = @client.data_converter.to_payloads(input.args) + req.signal_input = @client.data_converter.to_payloads(input.args, hints: input.arg_hints) # Send request begin @@ -294,7 +297,8 @@ def signal_with_start_workflow(input) id: start_options.id, run_id: nil, result_run_id: resp.run_id, - first_execution_run_id: resp.run_id + first_execution_run_id: resp.run_id, + result_hint: start_options.result_hint ) input.start_workflow_operation._set_workflow_handle(handle) handle @@ -307,7 +311,7 @@ def _start_workflow_request_from_with_start_options(klass, start_options) workflow_type: Api::Common::V1::WorkflowType.new(name: start_options.workflow), workflow_id: start_options.id, task_queue: Api::TaskQueue::V1::TaskQueue.new(name: start_options.task_queue.to_s), - input: @client.data_converter.to_payloads(start_options.args), + input: @client.data_converter.to_payloads(start_options.args, hints: start_options.arg_hints), workflow_execution_timeout: ProtoUtils.seconds_to_duration(start_options.execution_timeout), workflow_run_timeout: ProtoUtils.seconds_to_duration(start_options.run_timeout), workflow_task_timeout: ProtoUtils.seconds_to_duration(start_options.task_timeout), @@ -413,7 +417,7 @@ def signal_workflow(input) run_id: input.run_id || '' ), signal_name: input.signal, - input: @client.data_converter.to_payloads(input.args), + input: @client.data_converter.to_payloads(input.args, hints: input.arg_hints), header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter), identity: @client.connection.identity, request_id: SecureRandom.uuid @@ -434,7 +438,7 @@ def query_workflow(input) ), query: Api::Query::V1::WorkflowQuery.new( query_type: input.query, - query_args: @client.data_converter.to_payloads(input.args), + query_args: @client.data_converter.to_payloads(input.args, hints: input.arg_hints), header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter) ), query_reject_condition: input.reject_condition || 0 @@ -454,7 +458,7 @@ def query_workflow(input) )) end - results = @client.data_converter.from_payloads(resp.query_result) + results = @client.data_converter.from_payloads(resp.query_result, hints: Array(input.result_hint)) warn("Expected 0 or 1 query result, got #{results.size}") if results.size > 1 results&.first end @@ -477,7 +481,7 @@ def start_workflow_update(input) ), input: Api::Update::V1::Input.new( name: input.update, - args: @client.data_converter.to_payloads(input.args), + args: @client.data_converter.to_payloads(input.args, hints: input.arg_hints), header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter) ) ), @@ -534,7 +538,8 @@ def start_workflow_update(input) id: input.update_id, workflow_id: input.workflow_id, workflow_run_id: input.run_id, - known_outcome: resp.outcome + known_outcome: resp.outcome, + result_hint: input.result_hint ) end @@ -809,7 +814,7 @@ def heartbeat_async_activity(input) activity_id: input.task_token_or_id_reference.activity_id, namespace: @client.namespace, identity: @client.connection.identity, - details: @client.data_converter.to_payloads(input.details) + details: @client.data_converter.to_payloads(input.details, hints: input.detail_hints) ), rpc_options: Implementation.with_default_rpc_options(input.rpc_options) ) @@ -819,7 +824,7 @@ def heartbeat_async_activity(input) task_token: input.task_token_or_id_reference, namespace: @client.namespace, identity: @client.connection.identity, - details: @client.data_converter.to_payloads(input.details) + details: @client.data_converter.to_payloads(input.details, hints: input.detail_hints) ), rpc_options: Implementation.with_default_rpc_options(input.rpc_options) ) @@ -838,7 +843,7 @@ def complete_async_activity(input) activity_id: input.task_token_or_id_reference.activity_id, namespace: @client.namespace, identity: @client.connection.identity, - result: @client.data_converter.to_payloads([input.result]) + result: @client.data_converter.to_payloads([input.result], hints: Array(input.result_hint)) ), rpc_options: Implementation.with_default_rpc_options(input.rpc_options) ) @@ -848,7 +853,7 @@ def complete_async_activity(input) task_token: input.task_token_or_id_reference, namespace: @client.namespace, identity: @client.connection.identity, - result: @client.data_converter.to_payloads([input.result]) + result: @client.data_converter.to_payloads([input.result], hints: Array(input.result_hint)) ), rpc_options: Implementation.with_default_rpc_options(input.rpc_options) ) @@ -857,6 +862,14 @@ def complete_async_activity(input) end def fail_async_activity(input) + last_heartbeat_details = if input.last_heartbeat_details.empty? + nil + else + @client.data_converter.to_payloads( + input.last_heartbeat_details, + hints: input.last_heartbeat_detail_hints + ) + end if input.task_token_or_id_reference.is_a?(Temporalio::Client::ActivityIDReference) @client.workflow_service.respond_activity_task_failed_by_id( Api::WorkflowService::V1::RespondActivityTaskFailedByIdRequest.new( @@ -866,11 +879,7 @@ def fail_async_activity(input) namespace: @client.namespace, identity: @client.connection.identity, failure: @client.data_converter.to_failure(input.error), - last_heartbeat_details: if input.last_heartbeat_details.empty? - nil - else - @client.data_converter.to_payloads(input.last_heartbeat_details) - end + last_heartbeat_details: ), rpc_options: Implementation.with_default_rpc_options(input.rpc_options) ) @@ -881,11 +890,7 @@ def fail_async_activity(input) namespace: @client.namespace, identity: @client.connection.identity, failure: @client.data_converter.to_failure(input.error), - last_heartbeat_details: if input.last_heartbeat_details.empty? - nil - else - @client.data_converter.to_payloads(input.last_heartbeat_details) - end + last_heartbeat_details: ), rpc_options: Implementation.with_default_rpc_options(input.rpc_options) ) @@ -902,7 +907,7 @@ def report_cancellation_async_activity(input) activity_id: input.task_token_or_id_reference.activity_id, namespace: @client.namespace, identity: @client.connection.identity, - details: @client.data_converter.to_payloads(input.details) + details: @client.data_converter.to_payloads(input.details, hints: input.detail_hints) ), rpc_options: Implementation.with_default_rpc_options(input.rpc_options) ) @@ -912,7 +917,7 @@ def report_cancellation_async_activity(input) task_token: input.task_token_or_id_reference, namespace: @client.namespace, identity: @client.connection.identity, - details: @client.data_converter.to_payloads(input.details) + details: @client.data_converter.to_payloads(input.details, hints: input.detail_hints) ), rpc_options: Implementation.with_default_rpc_options(input.rpc_options) ) diff --git a/temporalio/lib/temporalio/internal/proto_utils.rb b/temporalio/lib/temporalio/internal/proto_utils.rb index 6ebfa33c..e94a4b7d 100644 --- a/temporalio/lib/temporalio/internal/proto_utils.rb +++ b/temporalio/lib/temporalio/internal/proto_utils.rb @@ -94,16 +94,16 @@ def self.enum_to_int(enum_mod, enum_val, zero_means_nil: false) enum_val end - def self.convert_from_payload_array(converter, payloads) + def self.convert_from_payload_array(converter, payloads, hints:) return [] if payloads.empty? - converter.from_payloads(Api::Common::V1::Payloads.new(payloads:)) + converter.from_payloads(Api::Common::V1::Payloads.new(payloads:), hints:) end - def self.convert_to_payload_array(converter, values) + def self.convert_to_payload_array(converter, values, hints:) return [] if values.empty? - converter.to_payloads(values).payloads.to_ary + converter.to_payloads(values, hints:).payloads.to_ary end def self.assert_non_reserved_name(name) diff --git a/temporalio/lib/temporalio/internal/worker/activity_worker.rb b/temporalio/lib/temporalio/internal/worker/activity_worker.rb index 89a8be77..7c1bf7e8 100644 --- a/temporalio/lib/temporalio/internal/worker/activity_worker.rb +++ b/temporalio/lib/temporalio/internal/worker/activity_worker.rb @@ -178,7 +178,8 @@ def execute_activity(task_token, defn, start) ) || raise, # Never nil heartbeat_details: ProtoUtils.convert_from_payload_array( @worker.options.client.data_converter, - start.heartbeat_details.to_ary + start.heartbeat_details.to_ary, + hints: nil ), heartbeat_timeout: Internal::ProtoUtils.duration_to_seconds(start.heartbeat_timeout), local?: start.is_local, @@ -205,8 +206,13 @@ def execute_activity(task_token, defn, start) payloads = codec.decode(payloads) if codec payloads.map { |p| Temporalio::Converters::RawValue.new(p) } else - ProtoUtils.convert_from_payload_array(@worker.options.client.data_converter, start.input.to_ary) + ProtoUtils.convert_from_payload_array( + @worker.options.client.data_converter, + start.input.to_ary, + hints: defn.arg_hints + ) end, + result_hint: defn.result_hint, headers: ProtoUtils.headers_from_proto_map(start.header_fields, @worker.options.client.data_converter) || {} ) @@ -267,7 +273,7 @@ def run_activity(defn, activity, input) # Success Bridge::Api::ActivityResult::ActivityExecutionResult.new( completed: Bridge::Api::ActivityResult::Success.new( - result: @worker.options.client.data_converter.to_payload(result) + result: @worker.options.client.data_converter.to_payload(result, hint: input.result_hint) ) ) rescue Exception => e # rubocop:disable Lint/RescueException -- We are intending to catch everything here @@ -372,13 +378,15 @@ def initialize( # rubocop:disable Lint/MissingSuper @_server_requested_cancel = false end - def heartbeat(*details) + def heartbeat(*details, detail_hints: nil) raise 'Implementation not set yet' if _outbound_impl.nil? # No-op if local return if info.local? - _outbound_impl.heartbeat(Temporalio::Worker::Interceptor::Activity::HeartbeatInput.new(details:)) + _outbound_impl.heartbeat( + Temporalio::Worker::Interceptor::Activity::HeartbeatInput.new(details:, detail_hints:) + ) end def metric_meter @@ -437,7 +445,8 @@ def heartbeat(input) Bridge::Api::CoreInterface::ActivityHeartbeat.new( task_token: @task_token, details: ProtoUtils.convert_to_payload_array(@worker.worker.options.client.data_converter, - input.details) + input.details, + hints: input.detail_hints) ).to_proto ) end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb index 1c9dad3f..6ec101e4 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance.rb @@ -312,7 +312,8 @@ def create_instance # Convert workflow arguments @workflow_arguments = convert_args(payload_array: @init_job.arguments, method_name: :execute, - raw_args: @definition.raw_args) + raw_args: @definition.raw_args, + arg_hints: @definition.arg_hints) # Initialize interceptors @inbound = @interceptors.reverse_each.reduce(InboundImplementation.new(self)) do |acc, int| @@ -420,6 +421,7 @@ def apply_signal(job) end def apply_query(job) + result_hint = nil schedule do # If it's a built-in, run it without interceptors, otherwise do normal behavior result = if job.query_type == '__stack_trace' @@ -437,6 +439,7 @@ def apply_query(job) raise "Query handler for #{job.query_type} expected but not found, " \ "known queries: [#{query_handlers.keys.compact.sort.join(', ')}]" end + result_hint = defn.result_hint with_context_frozen do @inbound.handle_query( @@ -460,7 +463,7 @@ def apply_query(job) respond_to_query: Bridge::Api::WorkflowCommands::QueryResult.new( query_id: job.query_id, succeeded: Bridge::Api::WorkflowCommands::QuerySuccess.new( - response: @payload_converter.to_payload(result) + response: @payload_converter.to_payload(result, hint: result_hint) ) ) ) @@ -549,7 +552,7 @@ def apply_update(job) Bridge::Api::WorkflowCommands::WorkflowCommand.new( update_response: Bridge::Api::WorkflowCommands::UpdateResponse.new( protocol_instance_id: job.protocol_instance_id, - completed: @payload_converter.to_payload(result) + completed: @payload_converter.to_payload(result, hint: defn.result_hint) ) ) ) @@ -579,7 +582,7 @@ def run_workflow add_command( Bridge::Api::WorkflowCommands::WorkflowCommand.new( complete_workflow_execution: Bridge::Api::WorkflowCommands::CompleteWorkflowExecution.new( - result: @payload_converter.to_payload(result) + result: @payload_converter.to_payload(result, hint: @definition.result_hint) ) ) ) @@ -607,14 +610,19 @@ def schedule( def on_top_level_exception(err) if err.is_a?(Workflow::ContinueAsNewError) @logger.debug('Workflow requested continue as new') + workflow_type, defn_arg_hints, = + if err.workflow + Workflow::Definition._workflow_type_and_hints_from_workflow_parameter(err.workflow) + else + [nil, @definition.arg_hints, nil] + end add_command( Bridge::Api::WorkflowCommands::WorkflowCommand.new( continue_as_new_workflow_execution: Bridge::Api::WorkflowCommands::ContinueAsNewWorkflowExecution.new( - workflow_type: if err.workflow - Workflow::Definition._workflow_type_from_workflow_parameter(err.workflow) - end, + workflow_type:, task_queue: err.task_queue, - arguments: ProtoUtils.convert_to_payload_array(payload_converter, err.args), + arguments: ProtoUtils.convert_to_payload_array(payload_converter, err.args, + hints: err.arg_hints || defn_arg_hints), workflow_run_timeout: ProtoUtils.seconds_to_duration(err.run_timeout), workflow_task_timeout: ProtoUtils.seconds_to_duration(err.task_timeout), memo: ProtoUtils.memo_to_proto_hash(err.memo, payload_converter), @@ -669,11 +677,12 @@ def convert_handler_args(payload_array:, defn:) payload_array:, method_name: defn.to_invoke.is_a?(Symbol) ? defn.to_invoke : nil, raw_args: defn.raw_args, + arg_hints: defn.arg_hints, ignore_first_param: defn.name.nil? # Dynamic ) end - def convert_args(payload_array:, method_name:, raw_args:, ignore_first_param: false) + def convert_args(payload_array:, method_name:, raw_args:, arg_hints:, ignore_first_param: false) # Just in case it is not an array payload_array = payload_array.to_ary @@ -713,7 +722,7 @@ def convert_args(payload_array:, method_name:, raw_args:, ignore_first_param: fa if raw_args payload_array.map { |p| Converters::RawValue.new(p) } else - ProtoUtils.convert_from_payload_array(@payload_converter, payload_array) + ProtoUtils.convert_from_payload_array(@payload_converter, payload_array, hints: arg_hints) end end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/child_workflow_handle.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/child_workflow_handle.rb index e6c5656e..1a0a0e3c 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/child_workflow_handle.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/child_workflow_handle.rb @@ -10,18 +10,20 @@ module Worker class WorkflowInstance # Implementation of the child workflow handle. class ChildWorkflowHandle < Workflow::ChildWorkflowHandle - attr_reader :id, :first_execution_run_id + attr_reader :id, :first_execution_run_id, :result_hint - def initialize(id:, first_execution_run_id:, instance:, cancellation:, cancel_callback_key:) # rubocop:disable Lint/MissingSuper + def initialize(id:, first_execution_run_id:, instance:, # rubocop:disable Lint/MissingSuper + cancellation:, cancel_callback_key:, result_hint:) @id = id @first_execution_run_id = first_execution_run_id @instance = instance @cancellation = cancellation @cancel_callback_key = cancel_callback_key + @result_hint = result_hint @resolution = nil end - def result + def result(result_hint: nil) # Notice that we actually provide a detached cancellation here instead of defaulting to workflow # cancellation because we don't want workflow cancellation (or a user-provided cancellation to this result # call) to be able to interrupt waiting on a child that may be processing the cancellation. @@ -29,7 +31,7 @@ def result case @resolution.status when :completed - @instance.payload_converter.from_payload(@resolution.completed.result) + @instance.payload_converter.from_payload(@resolution.completed.result, hint: result_hint || @result_hint) when :failed raise @instance.failure_converter.from_failure(@resolution.failed.failure, @instance.payload_converter) when :cancelled @@ -44,8 +46,8 @@ def _resolve(resolution) @resolution = resolution end - def signal(signal, *args, cancellation: Workflow.cancellation) - @instance.context._signal_child_workflow(id:, signal:, args:, cancellation:) + def signal(signal, *args, cancellation: Workflow.cancellation, arg_hints: nil) + @instance.context._signal_child_workflow(id:, signal:, args:, cancellation:, arg_hints:) end end end diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb index 84380d52..eaddb83f 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb @@ -86,16 +86,21 @@ def execute_activity( cancellation_type:, activity_id:, disable_eager_execution:, - priority: + priority:, + arg_hints:, + result_hint: ) - activity = case activity - when Class - Activity::Definition::Info.from_activity(activity).name&.to_s - when Symbol, String - activity.to_s - else - raise ArgumentError, 'Activity must be a definition class, or a symbol/string' - end + activity, defn_arg_hints, defn_result_hint = + case activity + when Class + defn = Activity::Definition::Info.from_activity(activity) + [defn.name&.to_s, defn.arg_hints, defn.result_hint] + when Symbol, String + [activity.to_s, nil, nil] + else + raise ArgumentError, + 'Activity must be a definition class, or a symbol/string' + end raise 'Cannot invoke dynamic activities' unless activity @outbound.execute_activity( @@ -114,6 +119,8 @@ def execute_activity( activity_id:, disable_eager_execution: disable_eager_execution || @instance.disable_eager_activity_execution, priority:, + arg_hints: arg_hints || defn_arg_hints, + result_hint: result_hint || defn_result_hint, headers: {} ) ) @@ -129,16 +136,20 @@ def execute_local_activity( local_retry_threshold:, cancellation:, cancellation_type:, - activity_id: + activity_id:, + arg_hints:, + result_hint: ) - activity = case activity - when Class - Activity::Definition::Info.from_activity(activity).name&.to_s - when Symbol, String - activity.to_s - else - raise ArgumentError, 'Activity must be a definition class, or a symbol/string' - end + activity, defn_arg_hints, defn_result_hint = + case activity + when Class + defn = Activity::Definition::Info.from_activity(activity) + [defn.name&.to_s, defn.arg_hints, defn.result_hint] + when Symbol, String + [activity.to_s, nil, nil] + else + raise ArgumentError, 'Activity must be a definition class, or a symbol/string' + end raise 'Cannot invoke dynamic activities' unless activity @outbound.execute_local_activity( @@ -153,6 +164,8 @@ def execute_local_activity( cancellation:, cancellation_type:, activity_id:, + arg_hints: arg_hints || defn_arg_hints, + result_hint: result_hint || defn_result_hint, headers: {} ) ) @@ -262,11 +275,15 @@ def start_child_workflow( cron_schedule:, memo:, search_attributes:, - priority: + priority:, + arg_hints:, + result_hint: ) + workflow, defn_arg_hints, defn_result_hint = + Workflow::Definition._workflow_type_and_hints_from_workflow_parameter(workflow) @outbound.start_child_workflow( Temporalio::Worker::Interceptor::Workflow::StartChildWorkflowInput.new( - workflow: Workflow::Definition._workflow_type_from_workflow_parameter(workflow), + workflow:, args:, id:, task_queue:, @@ -284,6 +301,8 @@ def start_child_workflow( memo:, search_attributes:, priority:, + arg_hints: arg_hints || defn_arg_hints, + result_hint: result_hint || defn_result_hint, headers: {} ) ) @@ -370,26 +389,30 @@ def _outbound=(outbound) @outbound = outbound end - def _signal_child_workflow(id:, signal:, args:, cancellation:) + def _signal_child_workflow(id:, signal:, args:, cancellation:, arg_hints:) + signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal) @outbound.signal_child_workflow( Temporalio::Worker::Interceptor::Workflow::SignalChildWorkflowInput.new( id:, - signal: Workflow::Definition::Signal._name_from_parameter(signal), + signal:, args:, cancellation:, + arg_hints: arg_hints || defn_arg_hints, headers: {} ) ) end - def _signal_external_workflow(id:, run_id:, signal:, args:, cancellation:) + def _signal_external_workflow(id:, run_id:, signal:, args:, cancellation:, arg_hints:) + signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal) @outbound.signal_external_workflow( Temporalio::Worker::Interceptor::Workflow::SignalExternalWorkflowInput.new( id:, run_id:, - signal: Workflow::Definition::Signal._name_from_parameter(signal), + signal:, args:, cancellation:, + arg_hints: arg_hints || defn_arg_hints, headers: {} ) ) diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/external_workflow_handle.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/external_workflow_handle.rb index c880f2c3..fb6cf494 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/external_workflow_handle.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/external_workflow_handle.rb @@ -18,8 +18,8 @@ def initialize(id:, run_id:, instance:) # rubocop:disable Lint/MissingSuper @instance = instance end - def signal(signal, *args, cancellation: Workflow.cancellation) - @instance.context._signal_external_workflow(id:, run_id:, signal:, args:, cancellation:) + def signal(signal, *args, cancellation: Workflow.cancellation, arg_hints: nil) + @instance.context._signal_external_workflow(id:, run_id:, signal:, args:, cancellation:, arg_hints:) end def cancel diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb index 99ad747d..a1bf206e 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb @@ -56,7 +56,8 @@ def execute_activity(input) raise ArgumentError, 'Activity must have schedule_to_close_timeout or start_to_close_timeout' end - execute_activity_with_local_backoffs(local: false, cancellation: input.cancellation) do + execute_activity_with_local_backoffs(local: false, cancellation: input.cancellation, + result_hint: input.result_hint) do seq = (@activity_counter += 1) @instance.add_command( Bridge::Api::WorkflowCommands::WorkflowCommand.new( @@ -66,7 +67,9 @@ def execute_activity(input) activity_type: input.activity, task_queue: input.task_queue, headers: ProtoUtils.headers_to_proto_hash(input.headers, @instance.payload_converter), - arguments: ProtoUtils.convert_to_payload_array(@instance.payload_converter, input.args), + arguments: ProtoUtils.convert_to_payload_array( + @instance.payload_converter, input.args, hints: input.arg_hints + ), schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout), schedule_to_start_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_start_timeout), start_to_close_timeout: ProtoUtils.seconds_to_duration(input.start_to_close_timeout), @@ -90,7 +93,8 @@ def execute_local_activity(input) @instance.assert_valid_local_activity.call(input.activity) - execute_activity_with_local_backoffs(local: true, cancellation: input.cancellation) do |do_backoff| + execute_activity_with_local_backoffs(local: true, cancellation: input.cancellation, + result_hint: input.result_hint) do |do_backoff| seq = (@activity_counter += 1) @instance.add_command( Bridge::Api::WorkflowCommands::WorkflowCommand.new( @@ -99,7 +103,9 @@ def execute_local_activity(input) activity_id: input.activity_id || seq.to_s, activity_type: input.activity, headers: ProtoUtils.headers_to_proto_hash(input.headers, @instance.payload_converter), - arguments: ProtoUtils.convert_to_payload_array(@instance.payload_converter, input.args), + arguments: ProtoUtils.convert_to_payload_array( + @instance.payload_converter, input.args, hints: input.arg_hints + ), schedule_to_close_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_close_timeout), schedule_to_start_timeout: ProtoUtils.seconds_to_duration(input.schedule_to_start_timeout), start_to_close_timeout: ProtoUtils.seconds_to_duration(input.start_to_close_timeout), @@ -115,7 +121,7 @@ def execute_local_activity(input) end end - def execute_activity_with_local_backoffs(local:, cancellation:, &) + def execute_activity_with_local_backoffs(local:, cancellation:, result_hint:, &) # We do not even want to schedule if the cancellation is already cancelled. We choose to use canceled # failure instead of wrapping in activity failure which is similar to what other SDKs do, with the accepted # tradeoff that it makes rescue more difficult (hence the presence of Error.canceled? helper). @@ -124,7 +130,7 @@ def execute_activity_with_local_backoffs(local:, cancellation:, &) # This has to be done in a loop for local activity backoff last_local_backoff = nil loop do - result = execute_activity_once(local:, cancellation:, last_local_backoff:, &) + result = execute_activity_once(local:, cancellation:, last_local_backoff:, result_hint:, &) return result unless result.is_a?(Bridge::Api::ActivityResult::DoBackoff) # @type var result: untyped @@ -136,7 +142,7 @@ def execute_activity_with_local_backoffs(local:, cancellation:, &) end # If this doesn't raise, it returns success | DoBackoff - def execute_activity_once(local:, cancellation:, last_local_backoff:, &) + def execute_activity_once(local:, cancellation:, last_local_backoff:, result_hint:, &) # Add to pending activities (removed by the resolver) seq = yield last_local_backoff @instance.pending_activities[seq] = Fiber.current @@ -169,7 +175,7 @@ def execute_activity_once(local:, cancellation:, last_local_backoff:, &) case resolution.status when :completed - @instance.payload_converter.from_payload(resolution.completed.result) + @instance.payload_converter.from_payload(resolution.completed.result, hint: result_hint) when :failed raise @instance.failure_converter.from_failure(resolution.failed.failure, @instance.payload_converter) when :cancelled @@ -193,6 +199,7 @@ def signal_child_workflow(input) signal: input.signal, args: input.args, cancellation: input.cancellation, + arg_hints: input.arg_hints, headers: input.headers ) end @@ -205,11 +212,12 @@ def signal_external_workflow(input) signal: input.signal, args: input.args, cancellation: input.cancellation, + arg_hints: input.arg_hints, headers: input.headers ) end - def _signal_external_workflow(id:, run_id:, child:, signal:, args:, cancellation:, headers:) + def _signal_external_workflow(id:, run_id:, child:, signal:, args:, cancellation:, arg_hints:, headers:) raise Error::CanceledError, 'Signal canceled before scheduled' if cancellation.canceled? # Add command @@ -217,7 +225,7 @@ def _signal_external_workflow(id:, run_id:, child:, signal:, args:, cancellation cmd = Bridge::Api::WorkflowCommands::SignalExternalWorkflowExecution.new( seq:, signal_name: signal, - args: ProtoUtils.convert_to_payload_array(@instance.payload_converter, args), + args: ProtoUtils.convert_to_payload_array(@instance.payload_converter, args, hints: arg_hints), headers: ProtoUtils.headers_to_proto_hash(headers, @instance.payload_converter) ) if child @@ -327,7 +335,8 @@ def start_child_workflow(input) workflow_id: input.id, workflow_type: input.workflow, task_queue: input.task_queue, - input: ProtoUtils.convert_to_payload_array(@instance.payload_converter, input.args), + input: ProtoUtils.convert_to_payload_array(@instance.payload_converter, input.args, + hints: input.arg_hints), workflow_execution_timeout: ProtoUtils.seconds_to_duration(input.execution_timeout), workflow_run_timeout: ProtoUtils.seconds_to_duration(input.run_timeout), workflow_task_timeout: ProtoUtils.seconds_to_duration(input.task_timeout), @@ -374,7 +383,8 @@ def start_child_workflow(input) first_execution_run_id: resolution.succeeded.run_id, instance: @instance, cancellation: input.cancellation, - cancel_callback_key: + cancel_callback_key:, + result_hint: input.result_hint ) @instance.pending_child_workflows[seq] = handle handle diff --git a/temporalio/lib/temporalio/testing/workflow_environment.rb b/temporalio/lib/temporalio/testing/workflow_environment.rb index 569370e1..d5e4d23c 100644 --- a/temporalio/lib/temporalio/testing/workflow_environment.rb +++ b/temporalio/lib/temporalio/testing/workflow_environment.rb @@ -394,8 +394,8 @@ def initialize(handle, env) end # @!visibility private - def result(follow_runs: true, rpc_options: nil) - @env.time_skipping_unlocked { super(follow_runs:, rpc_options:) } + def result(follow_runs: true, result_hint: nil, rpc_options: nil) + @env.time_skipping_unlocked { super(follow_runs:, result_hint:, rpc_options:) } end end end diff --git a/temporalio/lib/temporalio/worker/interceptor.rb b/temporalio/lib/temporalio/worker/interceptor.rb index 30fb8afb..fffc12ea 100644 --- a/temporalio/lib/temporalio/worker/interceptor.rb +++ b/temporalio/lib/temporalio/worker/interceptor.rb @@ -22,6 +22,7 @@ def intercept_activity(next_interceptor) ExecuteInput = Data.define( :proc, :args, + :result_hint, :headers ) @@ -59,7 +60,8 @@ def execute(input) # Input for {Outbound.heartbeat}. HeartbeatInput = Data.define( - :details + :details, + :detail_hints ) # Outbound interceptor for intercepting outbound activity calls. This should be extended by users needing to @@ -216,8 +218,10 @@ def handle_update(input) :cancellation_type, :activity_id, :disable_eager_execution, - :headers, - :priority + :priority, + :arg_hints, + :result_hint, + :headers ) # Input for {Outbound.execute_local_activity}. @@ -232,6 +236,8 @@ def handle_update(input) :cancellation, :cancellation_type, :activity_id, + :arg_hints, + :result_hint, :headers ) @@ -246,6 +252,7 @@ def handle_update(input) :signal, :args, :cancellation, + :arg_hints, :headers ) @@ -256,6 +263,7 @@ def handle_update(input) :signal, :args, :cancellation, + :arg_hints, :headers ) @@ -285,8 +293,10 @@ def handle_update(input) :cron_schedule, :memo, :search_attributes, - :headers, - :priority + :priority, + :arg_hints, + :result_hint, + :headers ) # Outbound interceptor for intercepting outbound workflow calls. This should be extended by users needing to diff --git a/temporalio/lib/temporalio/workflow.rb b/temporalio/lib/temporalio/workflow.rb index bbe41668..28c4a573 100644 --- a/temporalio/lib/temporalio/workflow.rb +++ b/temporalio/lib/temporalio/workflow.rb @@ -132,6 +132,10 @@ def self.deprecate_patch(patch_id) # run there. If `false` (the default), eager execution may still be disabled at the worker level or may not be # requested due to lack of available slots. # @param priority [Priority] Priority of the activity. This is currently experimental. + # @param arg_hints [Array, nil] Overrides converter hints for arguments if any. If unset/nil and the + # activity definition has arg hints, those are used by default. + # @param result_hint [Object, nil] Overrides converter hint for result if any. If unset/nil and the activity + # definition has result hint, it is used by default. # # @return [Object] Result of the activity. # @raise [Error::ActivityError] Activity failed (and retry was disabled or exhausted). @@ -151,13 +155,15 @@ def self.execute_activity( cancellation_type: ActivityCancellationType::TRY_CANCEL, activity_id: nil, disable_eager_execution: false, - priority: Priority.default + priority: Priority.default, + arg_hints: nil, + result_hint: nil ) _current.execute_activity( activity, *args, task_queue:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:, - priority: + priority:, arg_hints:, result_hint: ) end @@ -180,13 +186,15 @@ def self.execute_child_workflow( cron_schedule: nil, memo: nil, search_attributes: nil, - priority: Priority.default + priority: Priority.default, + arg_hints: nil, + result_hint: nil ) start_child_workflow( workflow, *args, id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:, parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:, - retry_policy:, cron_schedule:, memo:, search_attributes:, priority: + retry_policy:, cron_schedule:, memo:, search_attributes:, priority:, arg_hints:, result_hint: ).result end @@ -217,6 +225,10 @@ def self.execute_child_workflow( # workflow. # @param activity_id [String, nil] Optional unique identifier for the activity. This is an advanced setting that # should not be set unless users are sure they need to. Contact Temporal before setting this value. + # @param arg_hints [Array, nil] Overrides converter hints for arguments if any. If unset/nil and the + # activity definition has arg hints, those are used by default. + # @param result_hint [Object, nil] Overrides converter hint for result if any. If unset/nil and the activity + # definition has result hint, it is used by default. # # @return [Object] Result of the activity. # @raise [Error::ActivityError] Activity failed (and retry was disabled or exhausted). @@ -232,12 +244,15 @@ def self.execute_local_activity( local_retry_threshold: nil, cancellation: Workflow.cancellation, cancellation_type: ActivityCancellationType::TRY_CANCEL, - activity_id: nil + activity_id: nil, + arg_hints: nil, + result_hint: nil ) _current.execute_local_activity( activity, *args, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, - retry_policy:, local_retry_threshold:, cancellation:, cancellation_type:, activity_id: + retry_policy:, local_retry_threshold:, cancellation:, cancellation_type:, + activity_id:, arg_hints:, result_hint: ) end @@ -378,6 +393,10 @@ def self.sleep(duration, summary: nil, cancellation: Workflow.cancellation) # @param memo [Hash{String, Symbol => Object}, nil] Memo for the workflow. # @param search_attributes [SearchAttributes, nil] Search attributes for the workflow. # @param priority [Priority] Priority of the workflow. This is currently experimental. + # @param arg_hints [Array, nil] Overrides converter hints for arguments if any. If unset/nil and the + # workflow definition has arg hints, those are used by default. + # @param result_hint [Object, nil] Overrides converter hint for result if any. If unset/nil and the workflow + # definition has result hint, it is used by default. # # @return [ChildWorkflowHandle] Workflow handle to the started workflow. # @raise [Error::WorkflowAlreadyStartedError] Workflow already exists for the ID. @@ -400,13 +419,15 @@ def self.start_child_workflow( cron_schedule: nil, memo: nil, search_attributes: nil, - priority: Priority.default + priority: Priority.default, + arg_hints: nil, + result_hint: nil ) _current.start_child_workflow( workflow, *args, id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:, parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:, - retry_policy:, cron_schedule:, memo:, search_attributes:, priority: + retry_policy:, cron_schedule:, memo:, search_attributes:, priority:, arg_hints:, result_hint: ) end @@ -542,7 +563,7 @@ def self.durable_scheduler_disabled(&) # Error that is raised by a workflow out of the primary workflow method to issue a continue-as-new. class ContinueAsNewError < Error attr_accessor :args, :workflow, :task_queue, :run_timeout, :task_timeout, - :retry_policy, :memo, :search_attributes, :headers + :retry_policy, :memo, :search_attributes, :arg_hints, :headers # Create a continue as new error. # @@ -561,6 +582,8 @@ class ContinueAsNewError < Error # is used. # @param search_attributes [SearchAttributes, nil] Search attributes for the workflow. If unset/nil, the current # workflow search attributes are used. + # @param arg_hints [Array, nil] Overrides converter hints for arguments if any. If unset/nil and the + # workflow definition has arg hints, those are used by default. # @param headers [Hash] Headers for the workflow. The default is _not_ carried over from the # current workflow. def initialize( @@ -572,6 +595,7 @@ def initialize( retry_policy: nil, memo: nil, search_attributes: nil, + arg_hints: nil, headers: {} ) super('Continue as new') @@ -583,6 +607,7 @@ def initialize( @retry_policy = retry_policy @memo = memo @search_attributes = search_attributes + @arg_hints = arg_hints @headers = headers Workflow._current.initialize_continue_as_new_error(self) end diff --git a/temporalio/lib/temporalio/workflow/child_workflow_handle.rb b/temporalio/lib/temporalio/workflow/child_workflow_handle.rb index 12cf8119..43086504 100644 --- a/temporalio/lib/temporalio/workflow/child_workflow_handle.rb +++ b/temporalio/lib/temporalio/workflow/child_workflow_handle.rb @@ -21,12 +21,18 @@ def first_execution_run_id raise NotImplementedError end + # @return [Object, nil] Hint for the result if any. + def result_hint + raise NotImplementedError + end + # Wait for the result. # + # @param result_hint [Object, nil] Override the result hint, or if nil uses the one on the handle. # @return [Object] Result of the child workflow. # # @raise [Error::ChildWorkflowError] Workflow failed with +cause+ as the cause. - def result + def result(result_hint: nil) raise NotImplementedError end @@ -35,7 +41,9 @@ def result # @param signal [Workflow::Definition::Signal, Symbol, String] Signal definition or name. # @param args [Array] Signal args. # @param cancellation [Cancellation] Cancellation for canceling the signalling. - def signal(signal, *args, cancellation: Workflow.cancellation) + # @param arg_hints [Array, nil] Overrides converter hints for arguments if any. If unset/nil and the + # signal definition has arg hints, those are used by default. + def signal(signal, *args, cancellation: Workflow.cancellation, arg_hints: nil) raise NotImplementedError end end diff --git a/temporalio/lib/temporalio/workflow/definition.rb b/temporalio/lib/temporalio/workflow/definition.rb index 2f21f5e3..33d0dfe0 100644 --- a/temporalio/lib/temporalio/workflow/definition.rb +++ b/temporalio/lib/temporalio/workflow/definition.rb @@ -47,6 +47,21 @@ def workflow_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParam @workflow_raw_args = value end + # Add workflow hints to be passed to converter for workflow args. + # + # @param hints [Array] Hints to add. + def workflow_arg_hint(*hints) + @workflow_arg_hints ||= [] + @workflow_arg_hints.concat(hints) + end + + # Set workflow result hint to be passed to converter for workflow result. + # + # @param hint [Object] Hint to set. + def workflow_result_hint(hint) + @workflow_result_hint = hint + end + # Configure workflow failure exception types. This sets the types of exceptions that, if a # workflow-thrown exception extends, will cause the workflow/update to fail instead of suspending the workflow # via task failure. These are applied in addition to the worker option. If {::Exception} is set, it effectively @@ -121,16 +136,20 @@ def workflow_init(value = true) # rubocop:disable Style/OptionalBooleanParameter # {Converters::RawValue} which is a raw payload wrapper, convertible with {Workflow.payload_converter}. # @param unfinished_policy [HandlerUnfinishedPolicy] How to treat unfinished handlers if they are still running # when the workflow ends. The default warns, but this can be disabled. + # @param arg_hints [Array, nil] Argument hint(s) for the signal. def workflow_signal( name: nil, description: nil, dynamic: false, raw_args: false, - unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON + unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON, + arg_hints: nil ) raise 'Cannot provide name if dynamic is true' if name && dynamic - self.pending_handler_details = { type: :signal, name:, description:, dynamic:, raw_args:, unfinished_policy: } + self.pending_handler_details = + { type: :signal, name:, description:, dynamic:, raw_args:, unfinished_policy:, + arg_hints: Array(arg_hints) } end # Mark the next method as a workflow query with a default name as the name of the method. Queries can not have @@ -144,15 +163,20 @@ def workflow_signal( # it is useful to have the second parameter be `*args` and `raw_args` be true. # @param raw_args [Boolean] If true, does not convert arguments, but instead provides each argument as # {Converters::RawValue} which is a raw payload wrapper, convertible with {Workflow.payload_converter}. + # @param arg_hints [Object, Array, nil] Argument hint(s) for the query. + # @param result_hint [Object, nil] Result hint for the query. def workflow_query( name: nil, description: nil, dynamic: false, - raw_args: false + raw_args: false, + arg_hints: nil, + result_hint: nil ) raise 'Cannot provide name if dynamic is true' if name && dynamic - self.pending_handler_details = { type: :query, name:, description:, dynamic:, raw_args: } + self.pending_handler_details = { type: :query, name:, description:, dynamic:, raw_args:, + arg_hints: Array(arg_hints), result_hint: } end # Mark the next method as a workflow update with a default name as the name of the method. Updates can return @@ -168,16 +192,21 @@ def workflow_query( # {Converters::RawValue} which is a raw payload wrapper, convertible with {Workflow.payload_converter}. # @param unfinished_policy [HandlerUnfinishedPolicy] How to treat unfinished handlers if they are still running # when the workflow ends. The default warns, but this can be disabled. + # @param arg_hints [Object, Array, nil] Argument hint(s) for the update. + # @param result_hint [Object, nil] Result hint for the update. def workflow_update( name: nil, description: nil, dynamic: false, raw_args: false, - unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON + unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON, + arg_hints: nil, + result_hint: nil ) raise 'Cannot provide name if dynamic is true' if name && dynamic - self.pending_handler_details = { type: :update, name:, description:, dynamic:, raw_args:, unfinished_policy: } + self.pending_handler_details = { type: :update, name:, description:, dynamic:, raw_args:, unfinished_policy:, + arg_hints: Array(arg_hints), result_hint: } end # Mark the next method as a workflow update validator to the given update method. The validator is expected to @@ -275,14 +304,17 @@ def self.method_added(method_name) to_invoke: method_name, description: handler[:description], raw_args: handler[:raw_args], - unfinished_policy: handler[:unfinished_policy] + unfinished_policy: handler[:unfinished_policy], + arg_hints: handler[:arg_hints] ), @workflow_signals, [@workflow_queries, @workflow_updates]] when :query [Query.new( name: handler[:dynamic] ? nil : (handler[:name] || method_name).to_s, to_invoke: method_name, description: handler[:description], - raw_args: handler[:raw_args] + raw_args: handler[:raw_args], + arg_hints: handler[:arg_hints], + result_hint: handler[:result_hint] ), @workflow_queries, [@workflow_signals, @workflow_updates]] when :update [Update.new( @@ -290,7 +322,9 @@ def self.method_added(method_name) to_invoke: method_name, description: handler[:description], raw_args: handler[:raw_args], - unfinished_policy: handler[:unfinished_policy] + unfinished_policy: handler[:unfinished_policy], + arg_hints: handler[:arg_hints], + result_hint: handler[:result_hint] ), @workflow_updates, [@workflow_signals, @workflow_queries]] when :dynamic_options raise 'Dynamic options method already set' if @dynamic_options_method @@ -334,7 +368,7 @@ def self._workflow_definition end # @!visibility private - def self._workflow_type_from_workflow_parameter(workflow) + def self._workflow_type_and_hints_from_workflow_parameter(workflow) case workflow when Class unless workflow < Definition @@ -342,11 +376,15 @@ def self._workflow_type_from_workflow_parameter(workflow) end info = Info.from_class(workflow) - info.name || raise(ArgumentError, 'Cannot pass dynamic workflow to start') + raise(ArgumentError, 'Cannot pass dynamic workflow to start') unless info.name + + [info.name.to_s, info.arg_hints, info.result_hint] when Info - workflow.name || raise(ArgumentError, 'Cannot pass dynamic workflow to start') + raise(ArgumentError, 'Cannot pass dynamic workflow to start') unless workflow.name + + [workflow.name.to_s, nil, nil] when String, Symbol - workflow.to_s + [workflow.to_s, nil, nil] else raise ArgumentError, 'Workflow is not a workflow class or string/symbol' end @@ -448,7 +486,9 @@ def self._build_workflow_definition queries:, updates:, versioning_behavior: @versioning_behavior || VersioningBehavior::UNSPECIFIED, - dynamic_options_method: @dynamic_options_method + dynamic_options_method: @dynamic_options_method, + arg_hints: @workflow_arg_hints, + result_hint: @workflow_result_hint ) end @@ -462,7 +502,7 @@ def execute(*args) class Info attr_reader :workflow_class, :override_name, :dynamic, :init, :raw_args, :failure_exception_types, :signals, :queries, :updates, :versioning_behavior, - :dynamic_options_method + :dynamic_options_method, :arg_hints, :result_hint # Derive the workflow definition info from the class. # @@ -489,7 +529,9 @@ def initialize( queries: {}, updates: {}, versioning_behavior: VersioningBehavior::UNSPECIFIED, - dynamic_options_method: nil + dynamic_options_method: nil, + arg_hints: nil, + result_hint: nil ) @workflow_class = workflow_class @override_name = override_name @@ -502,6 +544,8 @@ def initialize( @updates = updates.dup.freeze @versioning_behavior = versioning_behavior @dynamic_options_method = dynamic_options_method + @arg_hints = arg_hints + @result_hint = result_hint Internal::ProtoUtils.assert_non_reserved_name(name) end @@ -514,15 +558,17 @@ def name # A signal definition. This is usually built as a result of a {Definition.workflow_signal} method, but can be # manually created to set at runtime on {Workflow.signal_handlers}. class Signal - attr_reader :name, :to_invoke, :description, :raw_args, :unfinished_policy + attr_reader :name, :to_invoke, :description, :raw_args, :unfinished_policy, :arg_hints # @!visibility private - def self._name_from_parameter(signal) + def self._name_and_hints_from_parameter(signal) case signal when Workflow::Definition::Signal - signal.name || raise(ArgumentError, 'Cannot call dynamic signal directly') + raise(ArgumentError, 'Cannot call dynamic signal directly') unless signal.name + + [signal.name, signal.arg_hints] when String, Symbol - signal.to_s + [signal.to_s, nil] else raise ArgumentError, 'Signal is not a definition or string/symbol' end @@ -538,18 +584,21 @@ def self._name_from_parameter(signal) # @param raw_args [Boolean] Whether the parameters should be raw values. # @param unfinished_policy [HandlerUnfinishedPolicy] How the workflow reacts when this handler is still running # on workflow completion. + # @param arg_hints [Array, nil] Argument hints for the signal. def initialize( name:, to_invoke:, description: nil, raw_args: false, - unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON + unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON, + arg_hints: nil ) @name = name @to_invoke = to_invoke @description = description @raw_args = raw_args @unfinished_policy = unfinished_policy + @arg_hints = arg_hints Internal::ProtoUtils.assert_non_reserved_name(name) end end @@ -557,15 +606,17 @@ def initialize( # A query definition. This is usually built as a result of a {Definition.workflow_query} method, but can be # manually created to set at runtime on {Workflow.query_handlers}. class Query - attr_reader :name, :to_invoke, :description, :raw_args + attr_reader :name, :to_invoke, :description, :raw_args, :arg_hints, :result_hint # @!visibility private - def self._name_from_parameter(query) + def self._name_and_hints_from_parameter(query) case query when Workflow::Definition::Query - query.name || raise(ArgumentError, 'Cannot call dynamic query directly') + raise(ArgumentError, 'Cannot call dynamic query directly') unless query.name + + [query.name, query.arg_hints, query.result_hint] when String, Symbol - query.to_s + [query.to_s, nil, nil] else raise ArgumentError, 'Query is not a definition or string/symbol' end @@ -579,16 +630,22 @@ def self._name_from_parameter(query) # @param description [String, nil] Description for this handler that may appear in CLI/UI. This is currently # experimental. # @param raw_args [Boolean] Whether the parameters should be raw values. + # @param arg_hints [Array, nil] Argument hints for the query. + # @param result_hint [Object, nil] Result hints for the query. def initialize( name:, to_invoke:, description: nil, - raw_args: false + raw_args: false, + arg_hints: nil, + result_hint: nil ) @name = name @to_invoke = to_invoke @description = description @raw_args = raw_args + @arg_hints = arg_hints + @result_hint = result_hint Internal::ProtoUtils.assert_non_reserved_name(name) end end @@ -596,15 +653,18 @@ def initialize( # An update definition. This is usually built as a result of a {Definition.workflow_update} method, but can be # manually created to set at runtime on {Workflow.update_handlers}. class Update - attr_reader :name, :to_invoke, :description, :raw_args, :unfinished_policy, :validator_to_invoke + attr_reader :name, :to_invoke, :description, :raw_args, :unfinished_policy, :validator_to_invoke, + :arg_hints, :result_hint # @!visibility private - def self._name_from_parameter(update) + def self._name_and_hints_from_parameter(update) case update when Workflow::Definition::Update - update.name || raise(ArgumentError, 'Cannot call dynamic update directly') + raise(ArgumentError, 'Cannot call dynamic update directly') unless update.name + + [update.name, update.arg_hints, update.result_hint] when String, Symbol - update.to_s + [update.to_s, nil, nil] else raise ArgumentError, 'Update is not a definition or string/symbol' end @@ -621,13 +681,17 @@ def self._name_from_parameter(update) # @param unfinished_policy [HandlerUnfinishedPolicy] How the workflow reacts when this handler is still running # on workflow completion. # @param validator_to_invoke [Symbol, Proc, nil] Method name or proc validator to invoke. + # @param arg_hints [Array, nil] Argument hints for the update. + # @param result_hint [Object, nil] Result hints for the update. def initialize( name:, to_invoke:, description: nil, raw_args: false, unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON, - validator_to_invoke: nil + validator_to_invoke: nil, + arg_hints: nil, + result_hint: nil ) @name = name @to_invoke = to_invoke @@ -635,6 +699,8 @@ def initialize( @raw_args = raw_args @unfinished_policy = unfinished_policy @validator_to_invoke = validator_to_invoke + @arg_hints = arg_hints + @result_hint = result_hint Internal::ProtoUtils.assert_non_reserved_name(name) end @@ -646,7 +712,9 @@ def _with_validator_to_invoke(validator_to_invoke) description:, raw_args:, unfinished_policy:, - validator_to_invoke: + validator_to_invoke:, + arg_hints:, + result_hint: ) end end diff --git a/temporalio/lib/temporalio/workflow/external_workflow_handle.rb b/temporalio/lib/temporalio/workflow/external_workflow_handle.rb index 37e22100..0e2a0ef7 100644 --- a/temporalio/lib/temporalio/workflow/external_workflow_handle.rb +++ b/temporalio/lib/temporalio/workflow/external_workflow_handle.rb @@ -28,7 +28,9 @@ def run_id # @param signal [Workflow::Definition::Signal, Symbol, String] Signal definition or name. # @param args [Array] Signal args. # @param cancellation [Cancellation] Cancellation for canceling the signalling. - def signal(signal, *args, cancellation: Workflow.cancellation) + # @param arg_hints [Array, nil] Overrides converter hints for arguments if any. If unset/nil and the + # signal definition has arg hints, those are used by default. + def signal(signal, *args, cancellation: Workflow.cancellation, arg_hints: nil) raise NotImplementedError end diff --git a/temporalio/sig/temporalio/activity/context.rbs b/temporalio/sig/temporalio/activity/context.rbs index dc671331..a3f3070c 100644 --- a/temporalio/sig/temporalio/activity/context.rbs +++ b/temporalio/sig/temporalio/activity/context.rbs @@ -10,7 +10,7 @@ module Temporalio def info: -> Info def instance: -> Definition? - def heartbeat: (*Object? details) -> void + def heartbeat: (*Object? details, ?detail_hints: Array[Object]?) -> void def cancellation: -> Cancellation def cancellation_details: -> CancellationDetails? def worker_shutdown_cancellation: -> Cancellation diff --git a/temporalio/sig/temporalio/activity/definition.rbs b/temporalio/sig/temporalio/activity/definition.rbs index 9b46d29e..70e3c978 100644 --- a/temporalio/sig/temporalio/activity/definition.rbs +++ b/temporalio/sig/temporalio/activity/definition.rbs @@ -6,12 +6,16 @@ module Temporalio def self.activity_cancel_raise: (bool cancel_raise) -> void def self.activity_dynamic: (?bool value) -> void def self.activity_raw_args: (?bool value) -> void + def self.activity_arg_hint: (*Object hints) -> void + def self.activity_result_hint: (Object? hint) -> void def self._activity_definition_details: -> { activity_name: String | Symbol | nil, activity_executor: Symbol, activity_cancel_raise: bool, - activity_raw_args: bool + activity_raw_args: bool, + activity_arg_hints: Array[Object]?, + activity_result_hint: Object? } def execute: (*untyped) -> untyped @@ -23,6 +27,8 @@ module Temporalio attr_reader executor: Symbol attr_reader cancel_raise: bool attr_reader raw_args: bool + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? def self.from_activity: (Definition | singleton(Definition) | Info activity) -> Info @@ -31,7 +37,9 @@ module Temporalio ?instance: Object | Proc | nil, ?executor: Symbol, ?cancel_raise: bool, - ?raw_args: bool + ?raw_args: bool, + ?arg_hints: Array[Object]?, + ?result_hint: Object? ) { (?) -> untyped } -> void end end diff --git a/temporalio/sig/temporalio/client.rbs b/temporalio/sig/temporalio/client.rbs index 2ac202ab..84a63d42 100644 --- a/temporalio/sig/temporalio/client.rbs +++ b/temporalio/sig/temporalio/client.rbs @@ -85,8 +85,10 @@ module Temporalio ?search_attributes: SearchAttributes?, ?start_delay: duration?, ?request_eager_start: bool, - ?priority: Priority, ?versioning_override: VersioningOverride?, + ?priority: Priority, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> WorkflowHandle @@ -108,16 +110,18 @@ module Temporalio ?search_attributes: SearchAttributes?, ?start_delay: duration?, ?request_eager_start: bool, - ?priority: Priority, ?versioning_override: VersioningOverride?, - ?follow_runs: bool, + ?priority: Priority, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> Object? def workflow_handle: ( String workflow_id, ?run_id: String?, - ?first_execution_run_id: String? + ?first_execution_run_id: String?, + ?result_hint: Object? ) -> WorkflowHandle def start_update_with_start_workflow: ( @@ -126,6 +130,8 @@ module Temporalio start_workflow_operation: WithStartWorkflowOperation, wait_for_stage: WorkflowUpdateWaitStage::enum, ?id: String, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> WorkflowUpdateHandle @@ -134,6 +140,8 @@ module Temporalio *Object? args, start_workflow_operation: WithStartWorkflowOperation, ?id: String, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> Object? @@ -141,6 +149,7 @@ module Temporalio Workflow::Definition::Signal | Symbol | String signal, *Object? args, start_workflow_operation: WithStartWorkflowOperation, + ?arg_hints: Array[Object]?, ?rpc_options: RPCOptions? ) -> WorkflowHandle diff --git a/temporalio/sig/temporalio/client/async_activity_handle.rbs b/temporalio/sig/temporalio/client/async_activity_handle.rbs index 5d993bde..3f65aba6 100644 --- a/temporalio/sig/temporalio/client/async_activity_handle.rbs +++ b/temporalio/sig/temporalio/client/async_activity_handle.rbs @@ -12,22 +12,26 @@ module Temporalio def heartbeat: ( *Object? details, + ?detail_hints: Array[Object]?, ?rpc_options: RPCOptions? ) -> void def complete: ( ?Object? result, + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> void def fail: ( Exception error, ?last_heartbeat_details: Array[Object?], + ?last_heartbeat_detail_hints: Array[Object]?, ?rpc_options: RPCOptions? ) -> void def report_cancellation: ( *Object? details, + ?detail_hints: Array[Object]?, ?rpc_options: RPCOptions? ) -> void diff --git a/temporalio/sig/temporalio/client/interceptor.rbs b/temporalio/sig/temporalio/client/interceptor.rbs index 3e1e227e..2d0aee02 100644 --- a/temporalio/sig/temporalio/client/interceptor.rbs +++ b/temporalio/sig/temporalio/client/interceptor.rbs @@ -21,9 +21,11 @@ module Temporalio attr_reader search_attributes: SearchAttributes? attr_reader start_delay: duration? attr_reader request_eager_start: bool - attr_reader headers: Hash[String, Object?] - attr_reader priority: Priority attr_reader versioning_override: VersioningOverride? + attr_reader priority: Priority + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? + attr_reader headers: Hash[String, Object?] attr_reader rpc_options: RPCOptions? def initialize: ( @@ -44,9 +46,11 @@ module Temporalio search_attributes: SearchAttributes?, start_delay: duration?, request_eager_start: bool, - headers: Hash[String, Object?], - priority: Priority, versioning_override: VersioningOverride?, + priority: Priority, + arg_hints: Array[Object]?, + result_hint: Object?, + headers: Hash[String, Object?], rpc_options: RPCOptions? ) -> void end @@ -57,6 +61,8 @@ module Temporalio attr_reader args: Array[Object?] attr_reader wait_for_stage: WorkflowUpdateWaitStage::enum attr_reader start_workflow_operation: WithStartWorkflowOperation + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? attr_reader headers: Hash[String, Object?] attr_reader rpc_options: RPCOptions? @@ -66,6 +72,8 @@ module Temporalio args: Array[Object?], wait_for_stage: WorkflowUpdateWaitStage::enum, start_workflow_operation: WithStartWorkflowOperation, + arg_hints: Array[Object]?, + result_hint: Object?, headers: Hash[String, Object?], rpc_options: RPCOptions? ) -> void @@ -75,12 +83,14 @@ module Temporalio attr_reader signal: String attr_reader args: Array[Object?] attr_reader start_workflow_operation: WithStartWorkflowOperation + attr_reader arg_hints: Array[Object]? attr_reader rpc_options: RPCOptions? def initialize: ( signal: String, args: Array[Object?], start_workflow_operation: WithStartWorkflowOperation, + arg_hints: Array[Object]?, rpc_options: RPCOptions? ) -> void end @@ -144,6 +154,7 @@ module Temporalio attr_reader run_id: String? attr_reader signal: String attr_reader args: Array[Object?] + attr_reader arg_hints: Array[Object]? attr_reader headers: Hash[String, Object?] attr_reader rpc_options: RPCOptions? @@ -152,6 +163,7 @@ module Temporalio run_id: String?, signal: String, args: Array[Object?], + arg_hints: Array[Object]?, headers: Hash[String, Object?], rpc_options: RPCOptions? ) -> void @@ -163,6 +175,8 @@ module Temporalio attr_reader query: String attr_reader args: Array[Object?] attr_reader reject_condition: WorkflowQueryRejectCondition::enum? + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? attr_reader headers: Hash[String, Object?] attr_reader rpc_options: RPCOptions? @@ -172,6 +186,8 @@ module Temporalio query: String, args: Array[Object?], reject_condition: WorkflowQueryRejectCondition::enum?, + arg_hints: Array[Object]?, + result_hint: Object?, headers: Hash[String, Object?], rpc_options: RPCOptions? ) -> void @@ -184,6 +200,8 @@ module Temporalio attr_reader update: String attr_reader args: Array[Object?] attr_reader wait_for_stage: WorkflowUpdateWaitStage::enum + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? attr_reader headers: Hash[String, Object?] attr_reader rpc_options: RPCOptions? @@ -194,6 +212,8 @@ module Temporalio update: String, args: Array[Object?], wait_for_stage: WorkflowUpdateWaitStage::enum, + arg_hints: Array[Object]?, + result_hint: Object?, headers: Hash[String, Object?], rpc_options: RPCOptions? ) -> void @@ -358,11 +378,13 @@ module Temporalio class HeartbeatAsyncActivityInput attr_reader task_token_or_id_reference: String | ActivityIDReference attr_reader details: Array[Object?] + attr_reader detail_hints: Array[Object]? attr_reader rpc_options: RPCOptions? def initialize: ( task_token_or_id_reference: String | ActivityIDReference, details: Array[Object?], + detail_hints: Array[Object]?, rpc_options: RPCOptions? ) -> void end @@ -370,11 +392,13 @@ module Temporalio class CompleteAsyncActivityInput attr_reader task_token_or_id_reference: String | ActivityIDReference attr_reader result: Object? + attr_reader result_hint: Object? attr_reader rpc_options: RPCOptions? def initialize: ( task_token_or_id_reference: String | ActivityIDReference, result: Object?, + result_hint: Object?, rpc_options: RPCOptions? ) -> void end @@ -383,12 +407,14 @@ module Temporalio attr_reader task_token_or_id_reference: String | ActivityIDReference attr_reader error: Exception attr_reader last_heartbeat_details: Array[Object?] + attr_reader last_heartbeat_detail_hints: Array[Object]? attr_reader rpc_options: RPCOptions? def initialize: ( task_token_or_id_reference: String | ActivityIDReference, error: Exception, last_heartbeat_details: Array[Object?], + last_heartbeat_detail_hints: Array[Object]?, rpc_options: RPCOptions? ) -> void end @@ -396,12 +422,13 @@ module Temporalio class ReportCancellationAsyncActivityInput attr_reader task_token_or_id_reference: String | ActivityIDReference attr_reader details: Array[Object?] + attr_reader detail_hints: Array[Object]? attr_reader rpc_options: RPCOptions? def initialize: ( task_token_or_id_reference: String | ActivityIDReference, details: Array[Object?], - rpc_options: RPCOptions? + detail_hints: Array[Object]?, rpc_options: RPCOptions? ) -> void end diff --git a/temporalio/sig/temporalio/client/schedule.rbs b/temporalio/sig/temporalio/client/schedule.rbs index 44014961..24f92d02 100644 --- a/temporalio/sig/temporalio/client/schedule.rbs +++ b/temporalio/sig/temporalio/client/schedule.rbs @@ -74,6 +74,7 @@ module Temporalio attr_reader retry_policy: RetryPolicy? attr_reader memo: Hash[String, Object?]? attr_reader search_attributes: SearchAttributes? + attr_reader arg_hints: Array[Object]? attr_reader headers: Hash[String, Object?]? def self.new: ( @@ -89,6 +90,7 @@ module Temporalio ?retry_policy: RetryPolicy?, ?memo: Hash[String, Object?]?, ?search_attributes: SearchAttributes?, + ?arg_hints: Array[Object]?, ?headers: Hash[String, Object?]? ) -> StartWorkflow diff --git a/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs b/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs index 6f21e864..7d66d74a 100644 --- a/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs +++ b/temporalio/sig/temporalio/client/with_start_workflow_operation.rbs @@ -18,6 +18,8 @@ module Temporalio attr_reader memo: Hash[String | Symbol, Object?]? attr_reader search_attributes: SearchAttributes? attr_reader start_delay: duration? + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? attr_reader headers: Hash[String, Object?] def initialize: ( @@ -37,6 +39,8 @@ module Temporalio memo: Hash[String | Symbol, Object?]?, search_attributes: SearchAttributes?, start_delay: duration?, + arg_hints: Array[Object]?, + result_hint: Object?, headers: Hash[String, Object?] ) -> void end @@ -60,6 +64,8 @@ module Temporalio ?memo: Hash[String | Symbol, Object?]?, ?search_attributes: SearchAttributes?, ?start_delay: duration?, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, ?headers: Hash[String, Object?] ) -> void diff --git a/temporalio/sig/temporalio/client/workflow_handle.rbs b/temporalio/sig/temporalio/client/workflow_handle.rbs index a0ad22fb..e95ffaa5 100644 --- a/temporalio/sig/temporalio/client/workflow_handle.rbs +++ b/temporalio/sig/temporalio/client/workflow_handle.rbs @@ -5,17 +5,20 @@ module Temporalio attr_reader run_id: String? attr_reader result_run_id: String? attr_reader first_execution_run_id: String? + attr_reader result_hint: Object? def initialize: ( client: Client, id: String, run_id: String?, result_run_id: String?, - first_execution_run_id: String? + first_execution_run_id: String?, + result_hint: Object? ) -> void def result: ( ?follow_runs: bool, + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> Object? @@ -39,6 +42,7 @@ module Temporalio def signal: ( Workflow::Definition::Signal | Symbol | String signal, *Object? args, + ?arg_hints: Array[Object]?, ?rpc_options: RPCOptions? ) -> void @@ -46,6 +50,8 @@ module Temporalio Workflow::Definition::Query | Symbol | String query, *Object? args, ?reject_condition: WorkflowQueryRejectCondition::enum?, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> Object? @@ -54,6 +60,8 @@ module Temporalio *Object? args, wait_for_stage: WorkflowUpdateWaitStage::enum, ?id: String, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> WorkflowUpdateHandle @@ -61,12 +69,15 @@ module Temporalio Workflow::Definition::Update | Symbol | String update, *Object? args, ?id: String, + ?arg_hints: Array[Object]?, + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> Object? def update_handle: ( String id, - ?specific_run_id: String? + ?specific_run_id: String?, + ?result_hint: Object? ) -> WorkflowUpdateHandle def cancel: ( diff --git a/temporalio/sig/temporalio/client/workflow_update_handle.rbs b/temporalio/sig/temporalio/client/workflow_update_handle.rbs index 5bf4c3f8..8c13985c 100644 --- a/temporalio/sig/temporalio/client/workflow_update_handle.rbs +++ b/temporalio/sig/temporalio/client/workflow_update_handle.rbs @@ -4,18 +4,21 @@ module Temporalio attr_reader id: String attr_reader workflow_id: String attr_reader workflow_run_id: String? + attr_reader result_hint: Object? def initialize: ( client: Client, id: String, workflow_id: String, workflow_run_id: String?, - known_outcome: untyped? + known_outcome: untyped?, + result_hint: Object? ) -> void def result_obtained?: -> bool def result: ( + ?result_hint: Object?, ?rpc_options: RPCOptions? ) -> Object? end diff --git a/temporalio/sig/temporalio/converters/data_converter.rbs b/temporalio/sig/temporalio/converters/data_converter.rbs index 9f1b89b3..7056142c 100644 --- a/temporalio/sig/temporalio/converters/data_converter.rbs +++ b/temporalio/sig/temporalio/converters/data_converter.rbs @@ -13,11 +13,11 @@ module Temporalio ?payload_codec: PayloadCodec? ) -> void - def to_payload: (Object? value) -> untyped - def to_payloads: (Array[Object?] values) -> untyped + def to_payload: (Object? value, ?hint: Object?) -> untyped + def to_payloads: (Array[Object?] values, ?hints: Array[Object]?) -> untyped - def from_payload: (untyped payload) -> Object? - def from_payloads: (untyped payloads) -> Array[Object?] + def from_payload: (untyped payload, ?hint: Object?) -> Object? + def from_payloads: (untyped payloads, ?hints: Array[Object]?) -> Array[Object?] def to_failure: (Exception error) -> untyped def from_failure: (untyped failure) -> Exception diff --git a/temporalio/sig/temporalio/converters/payload_converter.rbs b/temporalio/sig/temporalio/converters/payload_converter.rbs index f0f6d9a9..113de88d 100644 --- a/temporalio/sig/temporalio/converters/payload_converter.rbs +++ b/temporalio/sig/temporalio/converters/payload_converter.rbs @@ -8,11 +8,11 @@ module Temporalio ?json_generate_options: Hash[Symbol, untyped] ) -> PayloadConverter - def to_payload: (Object? value) -> untyped - def to_payloads: (Array[Object?] values) -> untyped + def to_payload: (Object? value, ?hint: Object?) -> untyped + def to_payloads: (Array[Object?] values, ?hints: Array[Object]?) -> untyped - def from_payload: (untyped payload) -> Object? - def from_payloads: (untyped payloads) -> Array[Object?] + def from_payload: (untyped payload, ?hint: Object?) -> Object? + def from_payloads: (untyped payloads, ?hints: Array[Object]?) -> Array[Object?] end end end \ No newline at end of file diff --git a/temporalio/sig/temporalio/converters/payload_converter/encoding.rbs b/temporalio/sig/temporalio/converters/payload_converter/encoding.rbs index af7b8714..2bbee826 100644 --- a/temporalio/sig/temporalio/converters/payload_converter/encoding.rbs +++ b/temporalio/sig/temporalio/converters/payload_converter/encoding.rbs @@ -4,9 +4,9 @@ module Temporalio class Encoding def encoding: -> String - def to_payload: (Object? value) -> untyped? + def to_payload: (Object? value, ?hint: Object?) -> untyped? - def from_payload: (untyped payload) -> Object? + def from_payload: (untyped payload, ?hint: Object?) -> Object? end end end diff --git a/temporalio/sig/temporalio/internal/proto_utils.rbs b/temporalio/sig/temporalio/internal/proto_utils.rbs index 8163528b..47180ea4 100644 --- a/temporalio/sig/temporalio/internal/proto_utils.rbs +++ b/temporalio/sig/temporalio/internal/proto_utils.rbs @@ -51,12 +51,14 @@ module Temporalio def self.convert_from_payload_array: ( Converters::DataConverter | Converters::PayloadConverter converter, - Array[untyped] payloads + Array[untyped] payloads, + hints: Array[Object]? ) -> Array[Object?] def self.convert_to_payload_array: ( Converters::DataConverter | Converters::PayloadConverter converter, - Array[Object?] values + Array[Object?] values, + hints: Array[Object]? ) -> Array[untyped] def self.assert_non_reserved_name: (String | Symbol | nil name) -> void diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs index 378eebde..63561199 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance.rbs @@ -91,6 +91,7 @@ module Temporalio payload_array: Array[untyped], method_name: Symbol?, raw_args: bool, + arg_hints: Array[Object]?, ?ignore_first_param: bool ) -> Array[Object?] diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance/child_workflow_handle.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance/child_workflow_handle.rbs index ce0be17c..b55a8ed0 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance/child_workflow_handle.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance/child_workflow_handle.rbs @@ -8,7 +8,8 @@ module Temporalio first_execution_run_id: String, instance: WorkflowInstance, cancellation: Cancellation, - cancel_callback_key: Object + cancel_callback_key: Object, + result_hint: Object? ) -> void def _resolve: (untyped resolution) -> void diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs index 1c1b9b10..e3ffda6e 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs @@ -40,7 +40,9 @@ module Temporalio cancellation_type: Workflow::ActivityCancellationType::enum, activity_id: String?, disable_eager_execution: bool, - priority: Priority + priority: Priority, + arg_hints: Array[Object]?, + result_hint: Object? ) -> Object? def execute_local_activity: ( @@ -53,7 +55,9 @@ module Temporalio local_retry_threshold: duration?, cancellation: Cancellation, cancellation_type: Workflow::ActivityCancellationType::enum, - activity_id: String? + activity_id: String?, + arg_hints: Array[Object]?, + result_hint: Object? ) -> Object? def external_workflow_handle: (String workflow_id, ?run_id: String?) -> ExternalWorkflowHandle @@ -110,7 +114,9 @@ module Temporalio cron_schedule: String?, memo: Hash[String | Symbol, Object?]?, search_attributes: SearchAttributes?, - priority: Priority + priority: Priority, + arg_hints: Array[Object]?, + result_hint: Object? ) -> ChildWorkflowHandle def storage: -> Hash[Object, Object] @@ -138,7 +144,8 @@ module Temporalio id: String, signal: Workflow::Definition::Signal | Symbol | String, args: Array[Object?], - cancellation: Cancellation + cancellation: Cancellation, + arg_hints: Array[Object]? ) -> void def _signal_external_workflow: ( @@ -146,7 +153,8 @@ module Temporalio run_id: String?, signal: Workflow::Definition::Signal | Symbol | String, args: Array[Object?], - cancellation: Cancellation + cancellation: Cancellation, + arg_hints: Array[Object]? ) -> void end end diff --git a/temporalio/sig/temporalio/internal/worker/workflow_instance/outbound_implementation.rbs b/temporalio/sig/temporalio/internal/worker/workflow_instance/outbound_implementation.rbs index ae78b428..e3d55140 100644 --- a/temporalio/sig/temporalio/internal/worker/workflow_instance/outbound_implementation.rbs +++ b/temporalio/sig/temporalio/internal/worker/workflow_instance/outbound_implementation.rbs @@ -7,13 +7,15 @@ module Temporalio def execute_activity_with_local_backoffs: ( local: bool, - cancellation: Cancellation + cancellation: Cancellation, + result_hint: Object? ) { (untyped?) -> Integer } -> Object? def execute_activity_once: ( local: bool, cancellation: Cancellation, - last_local_backoff: untyped? + last_local_backoff: untyped?, + result_hint: Object? ) { (untyped?) -> Integer } -> Object? def _signal_external_workflow: ( @@ -23,6 +25,7 @@ module Temporalio signal: Workflow::Definition::Signal | Symbol | String, args: Array[Object?], cancellation: Cancellation, + arg_hints: Array[Object]?, headers: Hash[String, Object?] ) -> void end diff --git a/temporalio/sig/temporalio/worker/interceptor.rbs b/temporalio/sig/temporalio/worker/interceptor.rbs index 4a7d4b80..78053e5b 100644 --- a/temporalio/sig/temporalio/worker/interceptor.rbs +++ b/temporalio/sig/temporalio/worker/interceptor.rbs @@ -7,11 +7,14 @@ module Temporalio class ExecuteInput attr_reader proc: Proc attr_reader args: Array[Object?] + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? attr_reader headers: Hash[String, Object?] def initialize: ( proc: Proc, args: Array[Object?], + result_hint: Object?, headers: Hash[String, Object?] ) -> void end @@ -28,8 +31,9 @@ module Temporalio class HeartbeatInput attr_reader details: Array[Object?] + attr_reader detail_hints: Array[Object]? - def initialize: (details: Array[Object?]) -> void + def initialize: (details: Array[Object?], detail_hints: Array[Object]?) -> void end class Outbound @@ -142,8 +146,10 @@ module Temporalio attr_reader cancellation_type: Temporalio::Workflow::ActivityCancellationType::enum attr_reader activity_id: String? attr_reader disable_eager_execution: bool - attr_reader headers: Hash[String, Object?] attr_reader priority: Temporalio::Priority + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? + attr_reader headers: Hash[String, Object?] def initialize: ( activity: String, @@ -159,8 +165,10 @@ module Temporalio cancellation_type: Temporalio::Workflow::ActivityCancellationType::enum, activity_id: String?, disable_eager_execution: bool, - headers: Hash[String, Object?], - priority: Temporalio::Priority + priority: Temporalio::Priority, + arg_hints: Array[Object]?, + result_hint: Object?, + headers: Hash[String, Object?] ) -> void end @@ -175,6 +183,8 @@ module Temporalio attr_reader cancellation: Cancellation attr_reader cancellation_type: Temporalio::Workflow::ActivityCancellationType::enum attr_reader activity_id: String? + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? attr_reader headers: Hash[String, Object?] def initialize: ( @@ -188,6 +198,8 @@ module Temporalio cancellation: Cancellation, cancellation_type: Temporalio::Workflow::ActivityCancellationType::enum, activity_id: String?, + arg_hints: Array[Object]?, + result_hint: Object?, headers: Hash[String, Object?] ) -> void end @@ -205,6 +217,7 @@ module Temporalio attr_reader signal: String attr_reader args: Array[Object?] attr_reader cancellation: Cancellation + attr_reader arg_hints: Array[Object]? attr_reader headers: Hash[String, Object?] def initialize: ( @@ -212,6 +225,7 @@ module Temporalio signal: String, args: Array[Object?], cancellation: Cancellation, + arg_hints: Array[Object]?, headers: Hash[String, Object?] ) -> void end @@ -222,6 +236,7 @@ module Temporalio attr_reader signal: String attr_reader args: Array[Object?] attr_reader cancellation: Cancellation + attr_reader arg_hints: Array[Object]? attr_reader headers: Hash[String, Object?] def initialize: ( @@ -230,6 +245,7 @@ module Temporalio signal: String, args: Array[Object?], cancellation: Cancellation, + arg_hints: Array[Object]?, headers: Hash[String, Object?] ) -> void end @@ -264,8 +280,10 @@ module Temporalio attr_reader cron_schedule: String? attr_reader memo: Hash[String | Symbol, Object?]? attr_reader search_attributes: SearchAttributes? - attr_reader headers: Hash[String, Object?] attr_reader priority: Temporalio::Priority + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? + attr_reader headers: Hash[String, Object?] def initialize: ( workflow: String, @@ -285,8 +303,10 @@ module Temporalio cron_schedule: String?, memo: Hash[String | Symbol, Object?]?, search_attributes: SearchAttributes?, - headers: Hash[String, Object?], - priority: Temporalio::Priority + priority: Temporalio::Priority, + arg_hints: Array[Object]?, + result_hint: Object?, + headers: Hash[String, Object?] ) -> void end diff --git a/temporalio/sig/temporalio/workflow.rbs b/temporalio/sig/temporalio/workflow.rbs index 2bbdcbad..99c736dd 100644 --- a/temporalio/sig/temporalio/workflow.rbs +++ b/temporalio/sig/temporalio/workflow.rbs @@ -31,7 +31,9 @@ module Temporalio ?cancellation_type: ActivityCancellationType::enum, ?activity_id: String?, ?disable_eager_execution: bool, - ?priority: Priority + ?priority: Priority, + ?arg_hints: Array[Object]?, + ?result_hint: Object? ) -> Object? def self.execute_child_workflow: ( @@ -52,7 +54,9 @@ module Temporalio ?cron_schedule: String?, ?memo: Hash[String | Symbol, Object?]?, ?search_attributes: SearchAttributes?, - ?priority: Priority + ?priority: Priority, + ?arg_hints: Array[Object]?, + ?result_hint: Object? ) -> Object? def self.execute_local_activity: ( @@ -65,7 +69,9 @@ module Temporalio ?local_retry_threshold: duration?, ?cancellation: Cancellation, ?cancellation_type: ActivityCancellationType::enum, - ?activity_id: String? + ?activity_id: String?, + ?arg_hints: Array[Object]?, + ?result_hint: Object? ) -> Object? def self.external_workflow_handle: (String workflow_id, ?run_id: String?) -> ExternalWorkflowHandle @@ -116,7 +122,9 @@ module Temporalio ?cron_schedule: String?, ?memo: Hash[String | Symbol, Object?]?, ?search_attributes: SearchAttributes?, - ?priority: Priority + ?priority: Priority, + ?arg_hints: Array[Object]?, + ?result_hint: Object? ) -> ChildWorkflowHandle def self.storage: -> Hash[Object, Object] @@ -158,6 +166,7 @@ module Temporalio attr_accessor retry_policy: RetryPolicy? attr_accessor memo: Hash[String | Symbol, Object?]? attr_accessor search_attributes: SearchAttributes? + attr_accessor arg_hints: Array[Object]? attr_accessor headers: Hash[String, Object?] def initialize: ( @@ -169,6 +178,7 @@ module Temporalio ?retry_policy: RetryPolicy?, ?memo: Hash[String | Symbol, Object?]?, ?search_attributes: SearchAttributes?, + ?arg_hints: Array[Object]?, ?headers: Hash[String, Object?] ) -> void end diff --git a/temporalio/sig/temporalio/workflow/child_workflow_handle.rbs b/temporalio/sig/temporalio/workflow/child_workflow_handle.rbs index b818abf7..ddce2945 100644 --- a/temporalio/sig/temporalio/workflow/child_workflow_handle.rbs +++ b/temporalio/sig/temporalio/workflow/child_workflow_handle.rbs @@ -3,13 +3,15 @@ module Temporalio class ChildWorkflowHandle def id: -> String def first_execution_run_id: -> String + def result_hint: -> Object? - def result: -> Object? + def result: (?result_hint: Object?) -> Object? def signal: ( Workflow::Definition::Signal | Symbol | String signal, *Object? args, - ?cancellation: Cancellation + ?cancellation: Cancellation, + ?arg_hints: Array[Object]? ) -> void end end diff --git a/temporalio/sig/temporalio/workflow/definition.rbs b/temporalio/sig/temporalio/workflow/definition.rbs index 0f034683..09268c4e 100644 --- a/temporalio/sig/temporalio/workflow/definition.rbs +++ b/temporalio/sig/temporalio/workflow/definition.rbs @@ -4,6 +4,8 @@ module Temporalio def self.workflow_name: (String | Symbol workflow_name) -> void def self.workflow_dynamic: (?bool value) -> void def self.workflow_raw_args: (?bool value) -> void + def self.workflow_arg_hint: (*Object hints) -> void + def self.workflow_result_hint: (Object hint) -> void def self.workflow_failure_exception_type: (*singleton(Exception) types) -> void def self.workflow_query_attr_reader: (*Symbol attr_names, ?description: String?) -> void @@ -16,14 +18,17 @@ module Temporalio ?description: String?, ?dynamic: bool, ?raw_args: bool, - ?unfinished_policy: HandlerUnfinishedPolicy::enum + ?unfinished_policy: HandlerUnfinishedPolicy::enum, + ?arg_hints: Object | Array[Object] | nil ) -> void def self.workflow_query: ( ?name: String | Symbol | nil, ?description: String?, ?dynamic: bool, - ?raw_args: bool + ?raw_args: bool, + ?arg_hints: Object | Array[Object] | nil, + ?result_hint: Object? ) -> void def self.workflow_update: ( @@ -31,7 +36,9 @@ module Temporalio ?description: String?, ?dynamic: bool, ?raw_args: bool, - ?unfinished_policy: HandlerUnfinishedPolicy::enum + ?unfinished_policy: HandlerUnfinishedPolicy::enum, + ?arg_hints: Object | Array[Object] | nil, + ?result_hint: Object? ) -> void def self.workflow_update_validator: (Symbol update_method) -> void @@ -43,9 +50,9 @@ module Temporalio def self._workflow_definition: -> Info - def self._workflow_type_from_workflow_parameter: ( + def self._workflow_type_and_hints_from_workflow_parameter: ( singleton(Workflow::Definition) | Workflow::Definition::Info | Symbol | String workflow - ) -> String + ) -> [String, Array[Object]?, Object?] def self._build_workflow_definition: -> Info @@ -63,6 +70,8 @@ module Temporalio attr_reader updates: Hash[String?, Update] attr_reader versioning_behavior: Integer? attr_reader dynamic_options_method: Symbol? + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? def self.from_class: (singleton(Definition) workflow_class) -> Info @@ -77,7 +86,9 @@ module Temporalio ?queries: Hash[String, Query], ?updates: Hash[String, Update], ?versioning_behavior: Integer?, - ?dynamic_options_method: Symbol? + ?dynamic_options_method: Symbol?, + ?arg_hints: Array[Object]?, + ?result_hint: Object? ) -> void def name: -> String? @@ -89,15 +100,19 @@ module Temporalio attr_reader description: String? attr_reader raw_args: bool attr_reader unfinished_policy: HandlerUnfinishedPolicy::enum + attr_reader arg_hints: Array[Object]? - def self._name_from_parameter: (Workflow::Definition::Signal | String | Symbol) -> String + def self._name_and_hints_from_parameter: ( + Workflow::Definition::Signal | String | Symbol signal + ) -> [String, Array[Object]?] def initialize: ( name: String?, to_invoke: Symbol | Proc, ?description: String?, ?raw_args: bool, - ?unfinished_policy: HandlerUnfinishedPolicy::enum + ?unfinished_policy: HandlerUnfinishedPolicy::enum, + ?arg_hints: Array[Object]? ) -> void end @@ -106,14 +121,20 @@ module Temporalio attr_reader to_invoke: Symbol | Proc attr_reader description: String? attr_reader raw_args: bool + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? - def self._name_from_parameter: (Workflow::Definition::Query | String | Symbol) -> String + def self._name_and_hints_from_parameter: ( + Workflow::Definition::Query | String | Symbol query + ) -> [String, Array[Object]?, Object?] def initialize: ( name: String?, to_invoke: Symbol | Proc, ?description: String?, - ?raw_args: bool + ?raw_args: bool, + ?arg_hints: Array[Object]?, + ?result_hint: Object? ) -> void end @@ -124,8 +145,12 @@ module Temporalio attr_reader raw_args: bool attr_reader unfinished_policy: HandlerUnfinishedPolicy::enum attr_reader validator_to_invoke: Symbol | Proc | nil + attr_reader arg_hints: Array[Object]? + attr_reader result_hint: Object? - def self._name_from_parameter: (Workflow::Definition::Update | String | Symbol) -> String + def self._name_and_hints_from_parameter: ( + Workflow::Definition::Update | String | Symbol update + ) -> [String, Array[Object]?, Object?] def initialize: ( name: String?, @@ -133,7 +158,9 @@ module Temporalio ?description: String?, ?raw_args: bool, ?unfinished_policy: HandlerUnfinishedPolicy::enum, - ?validator_to_invoke: Symbol | Proc | nil + ?validator_to_invoke: Symbol | Proc | nil, + ?arg_hints: Array[Object]?, + ?result_hint: Object? ) -> void def _with_validator_to_invoke: (Symbol | Proc | nil validator_to_invoke) -> Update diff --git a/temporalio/sig/temporalio/workflow/external_workflow_handle.rbs b/temporalio/sig/temporalio/workflow/external_workflow_handle.rbs index 4142fefb..762ca16e 100644 --- a/temporalio/sig/temporalio/workflow/external_workflow_handle.rbs +++ b/temporalio/sig/temporalio/workflow/external_workflow_handle.rbs @@ -7,7 +7,8 @@ module Temporalio def signal: ( Workflow::Definition::Signal | Symbol | String signal, *Object? args, - ?cancellation: Cancellation + ?cancellation: Cancellation, + ?arg_hints: Array[Object]? ) -> void def cancel: -> void diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index f5692864..e9184fc1 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -1786,16 +1786,16 @@ def test_workflow_buffered_metrics end class FailWorkflowPayloadConverter < Temporalio::Converters::PayloadConverter - def to_payload(value) + def to_payload(value, hint: nil) if value == 'fail-on-this-result' raise Temporalio::Error::ApplicationError.new('Intentional error', type: 'IntentionalError') end - Temporalio::Converters::PayloadConverter.default.to_payload(value) + Temporalio::Converters::PayloadConverter.default.to_payload(value, hint:) end - def from_payload(payload) - value = Temporalio::Converters::PayloadConverter.default.from_payload(payload) + def from_payload(payload, hint: nil) + value = Temporalio::Converters::PayloadConverter.default.from_payload(payload, hint:) if value == 'fail-on-this' raise Temporalio::Error::ApplicationError.new('Intentional error', type: 'IntentionalError') end @@ -2329,6 +2329,198 @@ def test_no_local_activity end end + class HintTrackingJSONConverter < Temporalio::Converters::PayloadConverter::JSONPlain + attr_accessor :outbound_hints, :inbound_hints + + def to_payload(value, hint: nil) + (@outbound_hints ||= []) << { value:, hint: } + super + end + + def from_payload(payload, hint: nil) + super.tap { |value| (@inbound_hints ||= []) << { value:, hint: } } + end + end + + class HintActivity < Temporalio::Activity::Definition + activity_arg_hint :activity_arg1, :activity_arg2 + activity_result_hint :activity_result + + def execute(_arg1, _arg2) + 'act_result' + end + end + + class HintWorkflow < Temporalio::Workflow::Definition + workflow_arg_hint :workflow_arg1, :workflow_arg2 + workflow_result_hint :workflow_result + + def execute(_arg1, _arg2) + # Complete if we're continued + return 'wf_result' if Temporalio::Workflow.info.continued_run_id + + Temporalio::Workflow.execute_activity(HintActivity, 'act_arg1', 'act_arg2', start_to_close_timeout: 10) + Temporalio::Workflow.wait_condition { @got_update } + raise Temporalio::Workflow::ContinueAsNewError.new('cont_wf_arg1', 'cont_wf_arg2') + end + + workflow_signal arg_hints: :signal_arg1 + def my_signal(_arg1) + # No-op + end + + workflow_query arg_hints: :query_arg1, result_hint: :query_result + def my_query(_arg1) + 'que_result' + end + + workflow_update arg_hints: :update_arg1, result_hint: :update_result + def my_update(_arg1) + # Start child workflow, send signal to it, wait for completion + handle = Temporalio::Workflow.start_child_workflow(HintChildWorkflow, 'child_wf_arg1') + handle.signal(HintChildWorkflow.my_signal, 'child_sig_arg1') + handle.result + @got_update = true + 'upd_result' + end + end + + class HintChildWorkflow < Temporalio::Workflow::Definition + workflow_arg_hint :child_workflow_arg1 + workflow_result_hint :child_workflow_result + + def execute(_arg1) + Temporalio::Workflow.wait_condition { @got_signal } + 'child_wf_result' + end + + workflow_signal arg_hints: :child_signal_arg1 + def my_signal(_arg1) + @got_signal = true + end + end + + class HintWithStartWorkflow < Temporalio::Workflow::Definition + # Intentionally one less hint than args and no result hint + workflow_arg_hint :with_start_arg1 + + def execute(_arg1, _arg2) + Temporalio::Workflow.wait_condition { @result } + end + + workflow_signal arg_hints: :with_start_signal_arg1 + def my_signal(_arg1) + @result = 'sig_with_start_wf_result' + end + + # Intentionally no arg hint + workflow_update result_hint: :with_start_update_result + def my_update(_arg1) + @result = 'upd_with_start_wf_result' + 'upd_with_start_upd_result' + end + end + + def test_hints + # New client with tracking JSON converter + conv = HintTrackingJSONConverter.new + client = Temporalio::Client.new(**env.client.options.with( + data_converter: Temporalio::Converters::DataConverter.new( + payload_converter: Temporalio::Converters::PayloadConverter::Composite.new( + *Temporalio::Converters::PayloadConverter.default.converters.values.map do |c| + c.is_a?(Temporalio::Converters::PayloadConverter::JSONPlain) ? conv : c + end + ) + ) + ).to_h) + @expected_outbound_hints = [] + @expected_inbound_hints = [] + + # Start worker + task_queue = "tq-#{SecureRandom.uuid}" + Temporalio::Worker.new(client:, task_queue:, + workflows: [HintWorkflow, HintChildWorkflow, HintWithStartWorkflow], + activities: [HintActivity]).run do + # Run workflow + wf_hints = [{ value: 'wf_arg1', hint: :workflow_arg1 }, + { value: 'wf_arg2', hint: :workflow_arg2 }, + { value: 'wf_result', hint: :workflow_result }] + act_hints = [{ value: 'act_arg1', hint: :activity_arg1 }, + { value: 'act_arg2', hint: :activity_arg2 }, + { value: 'act_result', hint: :activity_result }] + @expected_outbound_hints.push(*wf_hints, *act_hints) + @expected_inbound_hints.push(*wf_hints, *act_hints) + handle = client.start_workflow( + HintWorkflow, + 'wf_arg1', 'wf_arg2', + id: "wf-#{SecureRandom.uuid}", task_queue: + ) + + # Send messages + msg_hints = [{ value: 'sig_arg1', hint: :signal_arg1 }, + { value: 'que_arg1', hint: :query_arg1 }, + { value: 'que_result', hint: :query_result }, + { value: 'upd_arg1', hint: :update_arg1 }, + { value: 'upd_result', hint: :update_result }] + @expected_outbound_hints.push(*msg_hints) + @expected_inbound_hints.push(*msg_hints) + handle.signal(HintWorkflow.my_signal, 'sig_arg1') + assert_equal 'que_result', handle.query(HintWorkflow.my_query, 'que_arg1') + assert_equal 'upd_result', handle.execute_update(HintWorkflow.my_update, 'upd_arg1') + + # Other things that happened + child_hints = [{ value: 'child_wf_arg1', hint: :child_workflow_arg1 }, + { value: 'child_sig_arg1', hint: :child_signal_arg1 }, + { value: 'child_wf_result', hint: :child_workflow_result }] + cont_hints = [{ value: 'cont_wf_arg1', hint: :workflow_arg1 }, + { value: 'cont_wf_arg2', hint: :workflow_arg2 }] + @expected_outbound_hints.push(*child_hints, *cont_hints) + @expected_inbound_hints.push(*child_hints, *cont_hints) + + # Check result + assert_equal 'wf_result', handle.result + + # Signal with start + sig_with_start_hints = [{ value: 'sig_with_start_wf_arg1', hint: :with_start_arg1 }, + { value: 'sig_with_start_wf_arg2', hint: nil }, + { value: 'sig_with_start_sig_arg1', hint: :with_start_signal_arg1 }, + { value: 'sig_with_start_wf_result', hint: nil }] + @expected_outbound_hints.push(*sig_with_start_hints) + @expected_inbound_hints.push(*sig_with_start_hints) + start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new( + HintWithStartWorkflow, 'sig_with_start_wf_arg1', 'sig_with_start_wf_arg2', + id: "wf-#{SecureRandom.uuid}", task_queue: + ) + client.signal_with_start_workflow( + HintWithStartWorkflow.my_signal, 'sig_with_start_sig_arg1', + start_workflow_operation: + ) + assert_equal 'sig_with_start_wf_result', start_workflow_operation.workflow_handle.result + + # Update with start + upd_with_start_hints = [{ value: 'upd_with_start_wf_arg1', hint: :with_start_arg1 }, + { value: 'upd_with_start_wf_arg2', hint: nil }, + { value: 'upd_with_start_upd_arg1', hint: nil }, + { value: 'upd_with_start_upd_result', hint: :with_start_update_result }, + { value: 'upd_with_start_wf_result', hint: nil }] + @expected_outbound_hints.push(*upd_with_start_hints) + @expected_inbound_hints.push(*upd_with_start_hints) + start_workflow_operation = Temporalio::Client::WithStartWorkflowOperation.new( + HintWithStartWorkflow, 'upd_with_start_wf_arg1', 'upd_with_start_wf_arg2', + id: "wf-#{SecureRandom.uuid}", task_queue:, id_conflict_policy: Temporalio::WorkflowIDConflictPolicy::FAIL + ) + assert_equal 'upd_with_start_upd_result', client.execute_update_with_start_workflow( + HintWithStartWorkflow.my_update, 'upd_with_start_upd_arg1', + start_workflow_operation: + ) + assert_equal 'upd_with_start_wf_result', start_workflow_operation.workflow_handle.result + end + + # Check hints + assert_equal @expected_outbound_hints.to_set, conv.outbound_hints.to_set + assert_equal @expected_inbound_hints.to_set, conv.inbound_hints.to_set + end + class NonDurableTimerWorkfow < Temporalio::Workflow::Definition def execute sleep(0.1)