Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,11 @@ Some notes about activity definition:
"Activity Concurrency and Executors" section later for more details.
* Technically an activity definition can be created manually via `Temporalio::Activity::Definition::Info.new` that
accepts a proc or a block, but the class form is recommended.
* `activity_dynamic` can be used to mark an activity dynamic. Dynamic activities do not have names and handle any
activity that is not otherwise registered. A worker can only have one dynamic activity.
* `workflow_raw_args` can be used to have activity arguments delivered to `execute` as
`Temporalio::Converters::RawValue`s. These are wrappers for the raw payloads that have not been converted to types
(but they have been decoded by the codec if present). They can be converted with `payload_converter` on the context.

#### Activity Context

Expand Down
52 changes: 43 additions & 9 deletions temporalio/lib/temporalio/activity/definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,47 @@ def activity_executor(executor_name)
# @param cancel_raise [Boolean] Whether to raise.
def activity_cancel_raise(cancel_raise)
unless cancel_raise.is_a?(TrueClass) || cancel_raise.is_a?(FalseClass)
raise ArgumentError,
'Must be a boolean'
raise ArgumentError, 'Must be a boolean'
end

@activity_cancel_raise = cancel_raise
end

# Set an activity as dynamic. Dynamic activities do not have names and handle any activity that is not otherwise
# registered. A worker can only have one dynamic activity. It is often useful to use {activity_raw_args} with
# this.
#
# @param value [Boolean] Whether the activity is dynamic.
def activity_dynamic(value = true) # rubocop:disable Style/OptionalBooleanParameter
raise ArgumentError, 'Must be a boolean' unless value.is_a?(TrueClass) || value.is_a?(FalseClass)

@activity_dynamic = value
end

# Have activity arguments delivered to `execute` as {Converters::RawValue}s. These are wrappers for the raw
# payloads that have not been converted to types (but they have been decoded by the codec if present). They can
# be converted with {Context#payload_converter}.
#
# @param value [Boolean] Whether the activity accepts raw arguments.
def activity_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParameter
raise ArgumentError, 'Must be a boolean' unless value.is_a?(TrueClass) || value.is_a?(FalseClass)

@activity_raw_args = value
end
end

# @!visibility private
def self._activity_definition_details
activity_name = @activity_name
raise 'Cannot have activity name specified for dynamic activity' if activity_name && @activity_dynamic

# Default to unqualified class name if not dynamic
activity_name ||= name.to_s.split('::').last unless @activity_dynamic
{
activity_name: @activity_name || name.to_s.split('::').last,
activity_name:,
activity_executor: @activity_executor || :default,
activity_cancel_raise: @activity_cancel_raise.nil? ? true : @activity_cancel_raise
activity_cancel_raise: @activity_cancel_raise.nil? ? true : @activity_cancel_raise,
activity_raw_args: @activity_raw_args.nil? ? false : @activity_raw_args
}
end

Expand All @@ -75,7 +102,7 @@ def execute(*args)
# Definition info of an activity. Activities are usually classes/instances that extend {Definition}, but
# definitions can also be manually created with a block via {initialize} here.
class Info
# @return [String, Symbol] Name of the activity.
# @return [String, Symbol, nil] Name of the activity, or nil if the activity is dynamic.
attr_reader :name

# @return [Proc] Proc for the activity.
Expand All @@ -87,6 +114,9 @@ class Info
# @return [Boolean] Whether to raise in thread/fiber on cancellation. Default is `true`.
attr_reader :cancel_raise

# @return [Boolean] Whether to use {Converters::RawValue}s as arguments.
attr_reader :raw_args

# Obtain definition info representing the given activity, which can be a class, instance, or definition info.
#
# @param activity [Definition, Class<Definition>, Info] Activity to get info for.
Expand All @@ -105,14 +135,16 @@ def self.from_activity(activity)
new(
name: details[:activity_name],
executor: details[:activity_executor],
cancel_raise: details[:activity_cancel_raise]
cancel_raise: details[:activity_cancel_raise],
raw_args: details[:activity_raw_args]
) { |*args| activity.new.execute(*args) } # Instantiate and call
when Definition
details = activity.class._activity_definition_details
new(
name: details[:activity_name],
executor: details[:activity_executor],
cancel_raise: details[:activity_cancel_raise]
cancel_raise: details[:activity_cancel_raise],
raw_args: details[:activity_raw_args]
) { |*args| activity.execute(*args) } # Just and call
when Info
activity
Expand All @@ -123,17 +155,19 @@ def self.from_activity(activity)

# Manually create activity definition info. Most users will use an instance/class of {Definition}.
#
# @param name [String, Symbol] Name of the activity.
# @param name [String, Symbol, nil] Name of the activity or nil for dynamic activity.
# @param executor [Symbol] Name of the executor.
# @param cancel_raise [Boolean] Whether to raise in thread/fiber on cancellation.
# @param raw_args [Boolean] Whether to use {Converters::RawValue}s as arguments.
# @yield Use this block as the activity.
def initialize(name:, executor: :default, cancel_raise: true, &block)
def initialize(name:, executor: :default, cancel_raise: true, raw_args: false, &block)
@name = name
raise ArgumentError, 'Must give block' unless block_given?

@proc = block
@executor = executor
@cancel_raise = cancel_raise
@raw_args = raw_args
end
end
end
Expand Down
21 changes: 14 additions & 7 deletions temporalio/lib/temporalio/internal/worker/activity_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'temporalio/activity'
require 'temporalio/activity/definition'
require 'temporalio/cancellation'
require 'temporalio/converters/raw_value'
require 'temporalio/internal/bridge/api'
require 'temporalio/internal/proto_utils'
require 'temporalio/scoped_logger'
Expand All @@ -29,12 +30,13 @@ def initialize(worker:, bridge_worker:)
Activity::Context.current_or_nil&._scoped_logger_info
}

# Build up activity hash by name, failing if any fail validation
# Build up activity hash by name (can be nil for dynamic), failing if any fail validation
@activities = worker.options.activities.each_with_object({}) do |act, hash|
# Class means create each time, instance means just call, definition
# does nothing special
defn = Activity::Definition::Info.from_activity(act)
# Confirm name not in use
raise ArgumentError, 'Only one dynamic activity allowed' if !defn.name && hash.key?(defn.name)
raise ArgumentError, "Multiple activities named #{defn.name}" if hash.key?(defn.name)

# Confirm executor is a known executor and let it initialize
Expand Down Expand Up @@ -91,8 +93,8 @@ def handle_task(task)
def handle_start_task(task_token, start)
set_running_activity(task_token, nil)

# Find activity definition
defn = @activities[start.activity_type]
# Find activity definition, falling back to dynamic if present
defn = @activities[start.activity_type] || @activities[nil]
if defn.nil?
raise Error::ApplicationError.new(
"Activity #{start.activity_type} for workflow #{start.workflow_execution.workflow_id} " \
Expand Down Expand Up @@ -185,10 +187,15 @@ def execute_activity(task_token, defn, start)
# Build input
input = Temporalio::Worker::Interceptor::Activity::ExecuteInput.new(
proc: defn.proc,
args: ProtoUtils.convert_from_payload_array(
@worker.options.client.data_converter,
start.input.to_ary
),
# If the activity wants raw_args, we only decode we don't convert
args: if defn.raw_args
payloads = start.input.to_ary
codec = @worker.options.client.data_converter.payload_codec
payloads = codec.decode(payloads) if codec
payloads.map { |p| Temporalio::Converters::RawValue.new(p) }
else
ProtoUtils.convert_from_payload_array(@worker.options.client.data_converter, start.input.to_ary)
end,
headers: ProtoUtils.headers_from_proto_map(start.header_fields, @worker.options.client.data_converter) || {}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ def execute_activity(input)
else
raise ArgumentError, 'Activity must be a definition class, or a symbol/string'
end
raise 'Cannot invoke dynamic activities' unless activity_type

execute_activity_with_local_backoffs(local: false, cancellation: input.cancellation) do
seq = (@activity_counter += 1)
@instance.add_command(
Expand Down Expand Up @@ -102,6 +104,8 @@ def execute_local_activity(input)
else
raise ArgumentError, 'Activity must be a definition class, or a symbol/string'
end
raise 'Cannot invoke dynamic activities' unless activity_type

execute_activity_with_local_backoffs(local: true, cancellation: input.cancellation) do |do_backoff|
seq = (@activity_counter += 1)
@instance.add_command(
Expand Down
4 changes: 2 additions & 2 deletions temporalio/lib/temporalio/workflow/definition.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def workflow_dynamic(value = true) # rubocop:disable Style/OptionalBooleanParame
end

# Have workflow arguments delivered to `execute` (and `initialize` if {workflow_init} in use) as
# {Converters::RawValue}s. These are wrappers for the raw payloads that have not been decoded. They can be
# decoded with {Workflow.payload_converter}.
# {Converters::RawValue}s. These are wrappers for the raw payloads that have not been converted to types (but
# they have been decoded by the codec if present). They can be converted with {Workflow.payload_converter}.
#
# @param value [Boolean] Whether the workflow accepts raw arguments.
def workflow_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParameter
Expand Down
15 changes: 10 additions & 5 deletions temporalio/sig/temporalio/activity/definition.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,32 @@ module Temporalio
def self.activity_name: (String | Symbol name) -> void
def self.activity_executor: (Symbol executor_name) -> void
def self.activity_cancel_raise: (bool cancel_raise) -> void
def self.activity_dynamic: (?bool value) -> void
def self.activity_raw_args: (?bool value) -> void

def self._activity_definition_details: -> {
activity_name: String | Symbol,
activity_name: String | Symbol | nil,
activity_executor: Symbol,
activity_cancel_raise: bool
activity_cancel_raise: bool,
activity_raw_args: bool
}

def execute: (*untyped) -> untyped

class Info
attr_reader name: String | Symbol
attr_reader name: String | Symbol | nil
attr_reader proc: Proc
attr_reader executor: Symbol
attr_reader cancel_raise: bool
attr_reader raw_args: bool

def self.from_activity: (Definition | singleton(Definition) | Info activity) -> Info

def initialize: (
name: String | Symbol,
name: String | Symbol | nil,
?executor: Symbol,
?cancel_raise: bool
?cancel_raise: bool,
?raw_args: bool
) { (?) -> untyped } -> void
end
end
Expand Down
35 changes: 35 additions & 0 deletions temporalio/test/worker_activity_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,41 @@ def test_interceptor_from_client
assert_equal ['heartbeat-val'], interceptor.calls[2][1].details
end

class DynamicActivity < Temporalio::Activity::Definition
activity_dynamic

def execute(*args)
"Activity #{Temporalio::Activity::Context.current.info.activity_type} called with #{args}"
end
end

def test_dynamic_activity
assert_equal 'Activity does-not-exist called with ["arg1", 123]',
execute_activity(DynamicActivity, 'arg1', 123, override_name: 'does-not-exist')
end

class DynamicActivityRawArgs < Temporalio::Activity::Definition
activity_dynamic
activity_raw_args

def execute(*args)
metadata_encodings, decoded_args = args.map do |arg|
raise 'Bad type' unless arg.is_a?(Temporalio::Converters::RawValue)

[arg.payload.metadata['encoding'],
Temporalio::Activity::Context.current.payload_converter.from_payload(arg.payload)]
end.transpose
"Activity #{Temporalio::Activity::Context.current.info.activity_type} called with " \
"#{decoded_args} that have encodings #{metadata_encodings}"
end
end

def test_dynamic_activity_raw_args
assert_equal 'Activity does-not-exist called with ' \
'["arg1", nil, 123] that have encodings ["json/plain", "binary/null", "json/plain"]',
execute_activity(DynamicActivityRawArgs, 'arg1', nil, 123, override_name: 'does-not-exist')
end

# steep:ignore
def execute_activity(
activity,
Expand Down
Loading