diff --git a/README.md b/README.md index e5e4a51c..43c107ea 100644 --- a/README.md +++ b/README.md @@ -906,6 +906,11 @@ Some notes about activity definition: "Activity Concurrency and Executors" section later for more details. * Technically an activity definition can be created manually via `Temporalio::Activity::Definition::Info.new` that accepts a proc or a block, but the class form is recommended. +* `activity_dynamic` can be used to mark an activity dynamic. Dynamic activities do not have names and handle any + activity that is not otherwise registered. A worker can only have one dynamic activity. +* `workflow_raw_args` can be used to have activity arguments delivered to `execute` as + `Temporalio::Converters::RawValue`s. These are wrappers for the raw payloads that have not been converted to types + (but they have been decoded by the codec if present). They can be converted with `payload_converter` on the context. #### Activity Context diff --git a/temporalio/lib/temporalio/activity/definition.rb b/temporalio/lib/temporalio/activity/definition.rb index c107ca87..58d8641a 100644 --- a/temporalio/lib/temporalio/activity/definition.rb +++ b/temporalio/lib/temporalio/activity/definition.rb @@ -49,20 +49,47 @@ def activity_executor(executor_name) # @param cancel_raise [Boolean] Whether to raise. def activity_cancel_raise(cancel_raise) unless cancel_raise.is_a?(TrueClass) || cancel_raise.is_a?(FalseClass) - raise ArgumentError, - 'Must be a boolean' + raise ArgumentError, 'Must be a boolean' end @activity_cancel_raise = cancel_raise end + + # Set an activity as dynamic. Dynamic activities do not have names and handle any activity that is not otherwise + # registered. A worker can only have one dynamic activity. It is often useful to use {activity_raw_args} with + # this. + # + # @param value [Boolean] Whether the activity is dynamic. + def activity_dynamic(value = true) # rubocop:disable Style/OptionalBooleanParameter + raise ArgumentError, 'Must be a boolean' unless value.is_a?(TrueClass) || value.is_a?(FalseClass) + + @activity_dynamic = value + end + + # Have activity arguments delivered to `execute` as {Converters::RawValue}s. These are wrappers for the raw + # payloads that have not been converted to types (but they have been decoded by the codec if present). They can + # be converted with {Context#payload_converter}. + # + # @param value [Boolean] Whether the activity accepts raw arguments. + def activity_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParameter + raise ArgumentError, 'Must be a boolean' unless value.is_a?(TrueClass) || value.is_a?(FalseClass) + + @activity_raw_args = value + end end # @!visibility private def self._activity_definition_details + activity_name = @activity_name + raise 'Cannot have activity name specified for dynamic activity' if activity_name && @activity_dynamic + + # Default to unqualified class name if not dynamic + activity_name ||= name.to_s.split('::').last unless @activity_dynamic { - activity_name: @activity_name || name.to_s.split('::').last, + activity_name:, activity_executor: @activity_executor || :default, - activity_cancel_raise: @activity_cancel_raise.nil? ? true : @activity_cancel_raise + activity_cancel_raise: @activity_cancel_raise.nil? ? true : @activity_cancel_raise, + activity_raw_args: @activity_raw_args.nil? ? false : @activity_raw_args } end @@ -75,7 +102,7 @@ def execute(*args) # Definition info of an activity. Activities are usually classes/instances that extend {Definition}, but # definitions can also be manually created with a block via {initialize} here. class Info - # @return [String, Symbol] Name of the activity. + # @return [String, Symbol, nil] Name of the activity, or nil if the activity is dynamic. attr_reader :name # @return [Proc] Proc for the activity. @@ -87,6 +114,9 @@ class Info # @return [Boolean] Whether to raise in thread/fiber on cancellation. Default is `true`. attr_reader :cancel_raise + # @return [Boolean] Whether to use {Converters::RawValue}s as arguments. + attr_reader :raw_args + # Obtain definition info representing the given activity, which can be a class, instance, or definition info. # # @param activity [Definition, Class, Info] Activity to get info for. @@ -105,14 +135,16 @@ def self.from_activity(activity) new( name: details[:activity_name], executor: details[:activity_executor], - cancel_raise: details[:activity_cancel_raise] + cancel_raise: details[:activity_cancel_raise], + raw_args: details[:activity_raw_args] ) { |*args| activity.new.execute(*args) } # Instantiate and call when Definition details = activity.class._activity_definition_details new( name: details[:activity_name], executor: details[:activity_executor], - cancel_raise: details[:activity_cancel_raise] + cancel_raise: details[:activity_cancel_raise], + raw_args: details[:activity_raw_args] ) { |*args| activity.execute(*args) } # Just and call when Info activity @@ -123,17 +155,19 @@ def self.from_activity(activity) # Manually create activity definition info. Most users will use an instance/class of {Definition}. # - # @param name [String, Symbol] Name of the activity. + # @param name [String, Symbol, nil] Name of the activity or nil for dynamic activity. # @param executor [Symbol] Name of the executor. # @param cancel_raise [Boolean] Whether to raise in thread/fiber on cancellation. + # @param raw_args [Boolean] Whether to use {Converters::RawValue}s as arguments. # @yield Use this block as the activity. - def initialize(name:, executor: :default, cancel_raise: true, &block) + def initialize(name:, executor: :default, cancel_raise: true, raw_args: false, &block) @name = name raise ArgumentError, 'Must give block' unless block_given? @proc = block @executor = executor @cancel_raise = cancel_raise + @raw_args = raw_args end end end diff --git a/temporalio/lib/temporalio/internal/worker/activity_worker.rb b/temporalio/lib/temporalio/internal/worker/activity_worker.rb index b727c48b..46cd0f1f 100644 --- a/temporalio/lib/temporalio/internal/worker/activity_worker.rb +++ b/temporalio/lib/temporalio/internal/worker/activity_worker.rb @@ -3,6 +3,7 @@ require 'temporalio/activity' require 'temporalio/activity/definition' require 'temporalio/cancellation' +require 'temporalio/converters/raw_value' require 'temporalio/internal/bridge/api' require 'temporalio/internal/proto_utils' require 'temporalio/scoped_logger' @@ -29,12 +30,13 @@ def initialize(worker:, bridge_worker:) Activity::Context.current_or_nil&._scoped_logger_info } - # Build up activity hash by name, failing if any fail validation + # Build up activity hash by name (can be nil for dynamic), failing if any fail validation @activities = worker.options.activities.each_with_object({}) do |act, hash| # Class means create each time, instance means just call, definition # does nothing special defn = Activity::Definition::Info.from_activity(act) # Confirm name not in use + raise ArgumentError, 'Only one dynamic activity allowed' if !defn.name && hash.key?(defn.name) raise ArgumentError, "Multiple activities named #{defn.name}" if hash.key?(defn.name) # Confirm executor is a known executor and let it initialize @@ -91,8 +93,8 @@ def handle_task(task) def handle_start_task(task_token, start) set_running_activity(task_token, nil) - # Find activity definition - defn = @activities[start.activity_type] + # Find activity definition, falling back to dynamic if present + defn = @activities[start.activity_type] || @activities[nil] if defn.nil? raise Error::ApplicationError.new( "Activity #{start.activity_type} for workflow #{start.workflow_execution.workflow_id} " \ @@ -185,10 +187,15 @@ def execute_activity(task_token, defn, start) # Build input input = Temporalio::Worker::Interceptor::Activity::ExecuteInput.new( proc: defn.proc, - args: ProtoUtils.convert_from_payload_array( - @worker.options.client.data_converter, - start.input.to_ary - ), + # If the activity wants raw_args, we only decode we don't convert + args: if defn.raw_args + payloads = start.input.to_ary + codec = @worker.options.client.data_converter.payload_codec + payloads = codec.decode(payloads) if codec + payloads.map { |p| Temporalio::Converters::RawValue.new(p) } + else + ProtoUtils.convert_from_payload_array(@worker.options.client.data_converter, start.input.to_ary) + end, headers: ProtoUtils.headers_from_proto_map(start.header_fields, @worker.options.client.data_converter) || {} ) diff --git a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb index 5cf46c25..6dc3ee50 100644 --- a/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb +++ b/temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb @@ -64,6 +64,8 @@ def execute_activity(input) else raise ArgumentError, 'Activity must be a definition class, or a symbol/string' end + raise 'Cannot invoke dynamic activities' unless activity_type + execute_activity_with_local_backoffs(local: false, cancellation: input.cancellation) do seq = (@activity_counter += 1) @instance.add_command( @@ -102,6 +104,8 @@ def execute_local_activity(input) else raise ArgumentError, 'Activity must be a definition class, or a symbol/string' end + raise 'Cannot invoke dynamic activities' unless activity_type + execute_activity_with_local_backoffs(local: true, cancellation: input.cancellation) do |do_backoff| seq = (@activity_counter += 1) @instance.add_command( diff --git a/temporalio/lib/temporalio/workflow/definition.rb b/temporalio/lib/temporalio/workflow/definition.rb index d926395a..7395a9b1 100644 --- a/temporalio/lib/temporalio/workflow/definition.rb +++ b/temporalio/lib/temporalio/workflow/definition.rb @@ -38,8 +38,8 @@ def workflow_dynamic(value = true) # rubocop:disable Style/OptionalBooleanParame end # Have workflow arguments delivered to `execute` (and `initialize` if {workflow_init} in use) as - # {Converters::RawValue}s. These are wrappers for the raw payloads that have not been decoded. They can be - # decoded with {Workflow.payload_converter}. + # {Converters::RawValue}s. These are wrappers for the raw payloads that have not been converted to types (but + # they have been decoded by the codec if present). They can be converted with {Workflow.payload_converter}. # # @param value [Boolean] Whether the workflow accepts raw arguments. def workflow_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParameter diff --git a/temporalio/sig/temporalio/activity/definition.rbs b/temporalio/sig/temporalio/activity/definition.rbs index e3bdd324..b838fb7b 100644 --- a/temporalio/sig/temporalio/activity/definition.rbs +++ b/temporalio/sig/temporalio/activity/definition.rbs @@ -4,27 +4,32 @@ module Temporalio def self.activity_name: (String | Symbol name) -> void def self.activity_executor: (Symbol executor_name) -> void def self.activity_cancel_raise: (bool cancel_raise) -> void + def self.activity_dynamic: (?bool value) -> void + def self.activity_raw_args: (?bool value) -> void def self._activity_definition_details: -> { - activity_name: String | Symbol, + activity_name: String | Symbol | nil, activity_executor: Symbol, - activity_cancel_raise: bool + activity_cancel_raise: bool, + activity_raw_args: bool } def execute: (*untyped) -> untyped class Info - attr_reader name: String | Symbol + attr_reader name: String | Symbol | nil attr_reader proc: Proc attr_reader executor: Symbol attr_reader cancel_raise: bool + attr_reader raw_args: bool def self.from_activity: (Definition | singleton(Definition) | Info activity) -> Info def initialize: ( - name: String | Symbol, + name: String | Symbol | nil, ?executor: Symbol, - ?cancel_raise: bool + ?cancel_raise: bool, + ?raw_args: bool ) { (?) -> untyped } -> void end end diff --git a/temporalio/test/worker_activity_test.rb b/temporalio/test/worker_activity_test.rb index 2558e228..5ed29788 100644 --- a/temporalio/test/worker_activity_test.rb +++ b/temporalio/test/worker_activity_test.rb @@ -822,6 +822,41 @@ def test_interceptor_from_client assert_equal ['heartbeat-val'], interceptor.calls[2][1].details end + class DynamicActivity < Temporalio::Activity::Definition + activity_dynamic + + def execute(*args) + "Activity #{Temporalio::Activity::Context.current.info.activity_type} called with #{args}" + end + end + + def test_dynamic_activity + assert_equal 'Activity does-not-exist called with ["arg1", 123]', + execute_activity(DynamicActivity, 'arg1', 123, override_name: 'does-not-exist') + end + + class DynamicActivityRawArgs < Temporalio::Activity::Definition + activity_dynamic + activity_raw_args + + def execute(*args) + metadata_encodings, decoded_args = args.map do |arg| + raise 'Bad type' unless arg.is_a?(Temporalio::Converters::RawValue) + + [arg.payload.metadata['encoding'], + Temporalio::Activity::Context.current.payload_converter.from_payload(arg.payload)] + end.transpose + "Activity #{Temporalio::Activity::Context.current.info.activity_type} called with " \ + "#{decoded_args} that have encodings #{metadata_encodings}" + end + end + + def test_dynamic_activity_raw_args + assert_equal 'Activity does-not-exist called with ' \ + '["arg1", nil, 123] that have encodings ["json/plain", "binary/null", "json/plain"]', + execute_activity(DynamicActivityRawArgs, 'arg1', nil, 123, override_name: 'does-not-exist') + end + # steep:ignore def execute_activity( activity,