Skip to content

Commit 52b1abd

Browse files
committed
Reduce the scope of the illegal call tracer and other tracing fixes
1 parent 46ebcb4 commit 52b1abd

File tree

5 files changed

+157
-96
lines changed

5 files changed

+157
-96
lines changed

temporalio/lib/temporalio/internal/worker/workflow_instance.rb

Lines changed: 78 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -162,86 +162,9 @@ def initialize(details)
162162
end
163163

164164
def activate(activation)
165-
# Run inside of scheduler
166-
run_in_scheduler { activate_internal(activation) }
167-
end
168-
169-
def add_command(command)
170-
raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen
171-
172-
@commands << command
173-
end
174-
175-
def instance
176-
@instance or raise 'Instance accessed before created'
177-
end
178-
179-
def search_attributes
180-
# Lazy on first access
181-
@search_attributes ||= SearchAttributes._from_proto(
182-
@init_job.search_attributes, disable_mutations: true, never_nil: true
183-
) || raise
184-
end
185-
186-
def memo
187-
# Lazy on first access
188-
@memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {})
189-
end
190-
191-
def now
192-
# Create each time
193-
ProtoUtils.timestamp_to_time(@now_timestamp) or raise 'Time unexpectedly not present'
194-
end
195-
196-
def illegal_call_tracing_disabled(&)
197-
@tracer.disable(&)
198-
end
199-
200-
def patch(patch_id:, deprecated:)
201-
# Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the
202-
# command.
203-
patch_id = patch_id.to_s
204-
@patches_memoized ||= {}
205-
@patches_memoized.fetch(patch_id) do
206-
patched = !replaying || @patches_notified.include?(patch_id)
207-
@patches_memoized[patch_id] = patched
208-
if patched
209-
add_command(
210-
Bridge::Api::WorkflowCommands::WorkflowCommand.new(
211-
set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
212-
)
213-
)
214-
end
215-
patched
216-
end
217-
end
218-
219-
def metric_meter
220-
@metric_meter ||= ReplaySafeMetric::Meter.new(
221-
@runtime_metric_meter.with_additional_attributes(
222-
{
223-
namespace: info.namespace,
224-
task_queue: info.task_queue,
225-
workflow_type: info.workflow_type
226-
}
227-
)
228-
)
229-
end
230-
231-
private
232-
233-
def run_in_scheduler(&)
165+
# Run inside of scheduler (removed on ensure)
234166
Fiber.set_scheduler(@scheduler)
235-
if @tracer
236-
@tracer.enable(&)
237-
else
238-
yield
239-
end
240-
ensure
241-
Fiber.set_scheduler(nil)
242-
end
243167

244-
def activate_internal(activation)
245168
# Reset some activation state
246169
@commands = []
247170
@current_activation_error = nil
@@ -259,15 +182,20 @@ def activate_internal(activation)
259182
# Create instance if it doesn't already exist
260183
@instance ||= with_context_frozen { create_instance }
261184

262-
# Apply jobs
185+
# Apply jobs without call checking
186+
# illegal_call_tracing_disabled { activation.jobs.each { |job| apply(job) } }
263187
activation.jobs.each { |job| apply(job) }
264188

265189
# Schedule primary 'execute' if not already running (i.e. this is
266190
# the first activation)
267191
@primary_fiber ||= schedule(top_level: true) { run_workflow }
268192

269-
# Run the event loop
270-
@scheduler.run_until_all_yielded
193+
# Run the event loop in the tracer if it exists
194+
if @tracer
195+
@tracer.enable { @scheduler.run_until_all_yielded }
196+
else
197+
@scheduler.run_until_all_yielded
198+
end
271199
rescue Exception => e # rubocop:disable Lint/RescueException
272200
on_top_level_exception(e)
273201
end
@@ -306,8 +234,77 @@ def activate_internal(activation)
306234
ensure
307235
@commands = nil
308236
@current_activation_error = nil
237+
Fiber.set_scheduler(nil)
238+
end
239+
240+
def add_command(command)
241+
raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen
242+
243+
@commands << command
244+
end
245+
246+
def instance
247+
@instance or raise 'Instance accessed before created'
248+
end
249+
250+
def search_attributes
251+
# Lazy on first access
252+
@search_attributes ||= SearchAttributes._from_proto(
253+
@init_job.search_attributes, disable_mutations: true, never_nil: true
254+
) || raise
255+
end
256+
257+
def memo
258+
# Lazy on first access
259+
@memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {})
260+
end
261+
262+
def now
263+
# Create each time
264+
ProtoUtils.timestamp_to_time(@now_timestamp) or raise 'Time unexpectedly not present'
309265
end
310266

267+
def illegal_call_tracing_disabled(&)
268+
if @tracer
269+
@tracer.disable_temporarily(&)
270+
else
271+
yield
272+
end
273+
end
274+
275+
def patch(patch_id:, deprecated:)
276+
# Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the
277+
# command.
278+
patch_id = patch_id.to_s
279+
@patches_memoized ||= {}
280+
@patches_memoized.fetch(patch_id) do
281+
patched = !replaying || @patches_notified.include?(patch_id)
282+
@patches_memoized[patch_id] = patched
283+
if patched
284+
add_command(
285+
Bridge::Api::WorkflowCommands::WorkflowCommand.new(
286+
set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
287+
)
288+
)
289+
end
290+
patched
291+
end
292+
end
293+
294+
def metric_meter
295+
@metric_meter ||= ReplaySafeMetric::Meter.new(
296+
@runtime_metric_meter.with_additional_attributes(
297+
{
298+
namespace: info.namespace,
299+
task_queue: info.task_queue,
300+
workflow_type: info.workflow_type
301+
}
302+
)
303+
)
304+
end
305+
306+
private
307+
311308
def create_instance
312309
# Convert workflow arguments
313310
@workflow_arguments = convert_args(payload_array: @init_job.arguments,

temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def initialize(illegal_calls)
8484
when :all
8585
''
8686
when Temporalio::Worker::IllegalWorkflowCallValidator
87-
disable do
87+
disable_temporarily do
8888
vals.block.call(Temporalio::Worker::IllegalWorkflowCallValidator::CallInfo.new(
8989
class_name:, method_name: tp.callee_id, trace_point: tp
9090
))
@@ -98,7 +98,7 @@ def initialize(illegal_calls)
9898
when true
9999
''
100100
when Temporalio::Worker::IllegalWorkflowCallValidator
101-
disable do
101+
disable_temporarily do
102102
per_method.block.call(Temporalio::Worker::IllegalWorkflowCallValidator::CallInfo.new(
103103
class_name:, method_name: tp.callee_id, trace_point: tp
104104
))
@@ -118,8 +118,11 @@ def initialize(illegal_calls)
118118
end
119119

120120
def enable(&block)
121-
# We've seen leaking issues in Ruby 3.2 where the TracePoint inadvertently remains enabled even for threads
122-
# that it was not started on. So we will check the thread ourselves.
121+
# This is not reentrant and not expected to be called as such. We've seen leaking issues in Ruby 3.2 where
122+
# the TracePoint inadvertently remains enabled even for threads that it was not started on. So we will check
123+
# the thread ourselves. We also use the "enabled thread" concept for disabling checks too, see
124+
# disable_temporarily for more details.
125+
123126
@enabled_thread = Thread.current
124127
@tracepoint.enable do
125128
block.call
@@ -128,13 +131,17 @@ def enable(&block)
128131
end
129132
end
130133

131-
def disable(&block)
134+
def disable_temporarily(&)
135+
# An earlier version of this used @tracepoint.disable, but in some versions of Ruby, the observed behavior
136+
# is confusingly not reentrant or at least not predictable. Therefore, instead of calling
137+
# @tracepoint.disable, we are just unsetting the enabled thread. This means the tracer is still running, but
138+
# no checks are performed. This is effectively a no-op if tracing was never enabled.
139+
132140
previous_thread = @enabled_thread
133-
@tracepoint.disable do
134-
block.call
135-
ensure
136-
@enabled_thread = previous_thread
137-
end
141+
@enabled_thread = nil
142+
yield
143+
ensure
144+
@enabled_thread = previous_thread
138145
end
139146
end
140147
end

temporalio/sig/temporalio/internal/worker/workflow_instance.rbs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,6 @@ module Temporalio
5858

5959
def metric_meter: -> Temporalio::Metric::Meter
6060

61-
def run_in_scheduler: [T] { -> T } -> T
62-
63-
def activate_internal: (untyped activation) -> untyped
64-
6561
def create_instance: -> Temporalio::Workflow::Definition
6662

6763
def apply: (untyped job) -> void

temporalio/sig/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module Temporalio
1212
) -> void
1313

1414
def enable: [T] { -> T } -> T
15-
def disable: [T] { -> T } -> T
15+
def disable_temporarily: [T] { -> T } -> T
1616
end
1717
end
1818
end

temporalio/test/worker_workflow_child_test.rb

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,4 +276,65 @@ def test_search_attributes
276276
)
277277
assert_equal({ ATTR_KEY_TEXT.name => 'changed-text', ATTR_KEY_KEYWORD.name => 'some-keyword' }, results)
278278
end
279+
280+
class ManyChildrenActivity < Temporalio::Activity::Definition
281+
def execute(name)
282+
"Hello #{name}"
283+
end
284+
end
285+
286+
class ManyChildrenChildWorkflow < Temporalio::Workflow::Definition
287+
def execute(name)
288+
Temporalio::Workflow.execute_activity(
289+
ManyChildrenActivity,
290+
name,
291+
start_to_close_timeout: 30
292+
)
293+
end
294+
end
295+
296+
class ManyChildrenWorkflow < Temporalio::Workflow::Definition
297+
COUNT = 500
298+
299+
def execute
300+
futures = ManyChildrenWorkflow::COUNT.times.map do |i|
301+
Temporalio::Workflow::Future.new do
302+
Temporalio::Workflow.execute_child_workflow(ManyChildrenChildWorkflow, "Test #{i}")
303+
end
304+
end
305+
306+
Temporalio::Workflow::Future.all_of(*futures).wait
307+
308+
'done'
309+
end
310+
end
311+
312+
def test_many_children
313+
worker = Temporalio::Worker.new(
314+
client: env.client,
315+
task_queue: "tq-#{SecureRandom.uuid}",
316+
activities: [ManyChildrenActivity],
317+
workflows: [ManyChildrenWorkflow, ManyChildrenChildWorkflow],
318+
# This is a slow test, so we need to beef up the tuner and pollers
319+
tuner: Temporalio::Worker::Tuner.create_fixed(
320+
workflow_slots: ManyChildrenWorkflow::COUNT + 1,
321+
activity_slots: ManyChildrenWorkflow::COUNT
322+
),
323+
max_concurrent_workflow_task_polls: 60,
324+
max_concurrent_activity_task_polls: 60
325+
)
326+
worker.run do
327+
handle = env.client.start_workflow(
328+
ManyChildrenWorkflow,
329+
id: "wf-#{SecureRandom.uuid}",
330+
task_queue: worker.task_queue
331+
)
332+
assert_equal('done', handle.result)
333+
# Confirm there are expected number of child completions
334+
assert_equal(
335+
ManyChildrenWorkflow::COUNT,
336+
handle.fetch_history_events.count(&:child_workflow_execution_completed_event_attributes)
337+
)
338+
end
339+
end
279340
end

0 commit comments

Comments
 (0)