Skip to content

Commit 3e7249e

Browse files
committed
Add workflow check that local activity exists
1 parent 95dc94a commit 3e7249e

File tree

14 files changed

+106
-27
lines changed

14 files changed

+106
-27
lines changed

temporalio/lib/temporalio/internal/worker/activity_worker.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,17 @@ def run_activity(defn, activity, input)
297297
)
298298
end
299299

300+
def assert_valid_activity(activity)
301+
defn = @activities[activity]
302+
defn = @activities[nil] if !defn && !Internal::ProtoUtils.reserved_name?(activity)
303+
304+
return unless defn.nil?
305+
306+
raise ArgumentError,
307+
"Activity #{activity} " \
308+
"is not registered on this worker, available activities: #{@activities.keys.sort.join(', ')}"
309+
end
310+
300311
class RunningActivity < Activity::Context
301312
attr_reader :info, :cancellation, :worker_shutdown_cancellation, :payload_converter, :logger
302313
attr_accessor :instance, :_outbound_impl, :_server_requested_cancel

temporalio/lib/temporalio/internal/worker/workflow_instance.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa
5656
:pending_external_signals, :pending_external_cancels, :in_progress_handlers, :payload_converter,
5757
:failure_converter, :cancellation, :continue_as_new_suggested, :current_history_length,
5858
:current_history_size, :replaying, :random, :signal_handlers, :query_handlers, :update_handlers,
59-
:context_frozen
59+
:context_frozen, :assert_valid_local_activity
6060
attr_accessor :io_enabled, :current_details
6161

6262
def initialize(details)
@@ -108,6 +108,8 @@ def initialize(details)
108108
@query_handlers = HandlerHash.new(details.definition.queries, Workflow::Definition::Query)
109109
@update_handlers = HandlerHash.new(details.definition.updates, Workflow::Definition::Update)
110110

111+
@assert_valid_local_activity = details.assert_valid_local_activity
112+
111113
# Create all things needed from initial job
112114
@init_job = details.initial_activation.jobs.find { |j| !j.initialize_workflow.nil? }&.initialize_workflow
113115
raise 'Missing init job from first activation' unless @init_job

temporalio/lib/temporalio/internal/worker/workflow_instance/details.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ class WorkflowInstance
88
class Details
99
attr_reader :namespace, :task_queue, :definition, :initial_activation, :logger, :metric_meter,
1010
:payload_converter, :failure_converter, :interceptors, :disable_eager_activity_execution,
11-
:illegal_calls, :workflow_failure_exception_types, :unsafe_workflow_io_enabled
11+
:illegal_calls, :workflow_failure_exception_types, :unsafe_workflow_io_enabled,
12+
:assert_valid_local_activity
1213

1314
def initialize(
1415
namespace:,
@@ -23,7 +24,8 @@ def initialize(
2324
disable_eager_activity_execution:,
2425
illegal_calls:,
2526
workflow_failure_exception_types:,
26-
unsafe_workflow_io_enabled:
27+
unsafe_workflow_io_enabled:,
28+
assert_valid_local_activity:
2729
)
2830
@namespace = namespace
2931
@task_queue = task_queue
@@ -38,6 +40,7 @@ def initialize(
3840
@illegal_calls = illegal_calls
3941
@workflow_failure_exception_types = workflow_failure_exception_types
4042
@unsafe_workflow_io_enabled = unsafe_workflow_io_enabled
43+
@assert_valid_local_activity = assert_valid_local_activity
4144
end
4245
end
4346
end

temporalio/lib/temporalio/internal/worker/workflow_instance/outbound_implementation.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ def execute_local_activity(input)
8787
raise ArgumentError, 'Activity must have schedule_to_close_timeout or start_to_close_timeout'
8888
end
8989

90+
@instance.assert_valid_local_activity.call(input.activity)
91+
9092
execute_activity_with_local_backoffs(local: true, cancellation: input.cancellation) do |do_backoff|
9193
seq = (@activity_counter += 1)
9294
@instance.add_command(

temporalio/lib/temporalio/internal/worker/workflow_worker.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ def initialize(
6969
workflow_payload_codec_thread_pool:,
7070
unsafe_workflow_io_enabled:,
7171
debug_mode:,
72-
on_eviction: nil
72+
assert_valid_local_activity:, on_eviction: nil
7373
)
7474
@executor = workflow_executor
7575

@@ -111,7 +111,8 @@ def initialize(
111111

112112
t
113113
end.freeze,
114-
unsafe_workflow_io_enabled:
114+
unsafe_workflow_io_enabled:,
115+
assert_valid_local_activity:
115116
)
116117
@state.on_eviction = on_eviction if on_eviction
117118

@@ -186,14 +187,16 @@ def apply_codec_on_payload_visit(payload_or_payloads, &)
186187
class State
187188
attr_reader :workflow_definitions, :bridge_worker, :logger, :metric_meter, :data_converter, :deadlock_timeout,
188189
:illegal_calls, :namespace, :task_queue, :disable_eager_activity_execution,
189-
:workflow_interceptors, :workflow_failure_exception_types, :unsafe_workflow_io_enabled
190+
:workflow_interceptors, :workflow_failure_exception_types, :unsafe_workflow_io_enabled,
191+
:assert_valid_local_activity
190192

191193
attr_writer :on_eviction
192194

193195
def initialize(
194196
workflow_definitions:, bridge_worker:, logger:, metric_meter:, data_converter:, deadlock_timeout:,
195197
illegal_calls:, namespace:, task_queue:, disable_eager_activity_execution:,
196-
workflow_interceptors:, workflow_failure_exception_types:, unsafe_workflow_io_enabled:
198+
workflow_interceptors:, workflow_failure_exception_types:, unsafe_workflow_io_enabled:,
199+
assert_valid_local_activity:
197200
)
198201
@workflow_definitions = workflow_definitions
199202
@bridge_worker = bridge_worker
@@ -208,6 +211,7 @@ def initialize(
208211
@workflow_interceptors = workflow_interceptors
209212
@workflow_failure_exception_types = workflow_failure_exception_types
210213
@unsafe_workflow_io_enabled = unsafe_workflow_io_enabled
214+
@assert_valid_local_activity = assert_valid_local_activity
211215

212216
@running_workflows = {}
213217
@running_workflows_mutex = Mutex.new

temporalio/lib/temporalio/worker.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,8 @@ def initialize(
490490
workflow_failure_exception_types:,
491491
workflow_payload_codec_thread_pool:,
492492
unsafe_workflow_io_enabled:,
493-
debug_mode:
493+
debug_mode:,
494+
assert_valid_local_activity: ->(activity) { _assert_valid_local_activity(activity) }
494495
)
495496
end
496497

@@ -599,5 +600,17 @@ def _on_shutdown_complete
599600
@workflow_worker&.on_shutdown_complete
600601
@workflow_worker = nil
601602
end
603+
604+
# @!visibility private
605+
def _assert_valid_local_activity(activity)
606+
unless @activity_worker.nil?
607+
@activity_worker.assert_valid_activity(activity)
608+
return
609+
end
610+
611+
raise ArgumentError,
612+
"Activity #{activity} " \
613+
'is not registered on this worker, no available activities.'
614+
end
602615
end
603616
end

temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,8 @@ def create_instance(initial_activation, worker_state)
214214
disable_eager_activity_execution: worker_state.disable_eager_activity_execution,
215215
illegal_calls: worker_state.illegal_calls,
216216
workflow_failure_exception_types: worker_state.workflow_failure_exception_types,
217-
unsafe_workflow_io_enabled: worker_state.unsafe_workflow_io_enabled
217+
unsafe_workflow_io_enabled: worker_state.unsafe_workflow_io_enabled,
218+
assert_valid_local_activity: worker_state.assert_valid_local_activity
218219
)
219220
)
220221
end

temporalio/lib/temporalio/worker/workflow_replayer.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,8 @@ def initialize(
245245
workflow_payload_codec_thread_pool: options.workflow_payload_codec_thread_pool,
246246
unsafe_workflow_io_enabled: options.unsafe_workflow_io_enabled,
247247
debug_mode: options.debug_mode,
248-
on_eviction: proc { |_, remove_job| @last_workflow_remove_job = remove_job } # steep:ignore
248+
on_eviction: proc { |_, remove_job| @last_workflow_remove_job = remove_job }, # steep:ignore
249+
assert_valid_local_activity: ->(_) {}
249250
)
250251

251252
# Create the runner

temporalio/sig/temporalio/internal/worker/activity_worker.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ module Temporalio
2626
Temporalio::Worker::Interceptor::Activity::ExecuteInput input
2727
) -> void
2828

29+
def assert_valid_activity: (String activity) -> void
30+
2931
class RunningActivity < Activity::Context
3032
attr_accessor instance: Activity::Definition?
3133
attr_accessor _outbound_impl: Temporalio::Worker::Interceptor::Activity::Outbound?

temporalio/sig/temporalio/internal/worker/workflow_instance.rbs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ module Temporalio
3333
attr_reader query_handlers: Hash[String?, Workflow::Definition::Query]
3434
attr_reader update_handlers: Hash[String?, Workflow::Definition::Update]
3535
attr_reader context_frozen: bool
36+
attr_reader assert_valid_local_activity: ^(String) -> void
3637

3738
attr_accessor io_enabled: bool
3839
attr_accessor current_details: String?

0 commit comments

Comments
 (0)