Skip to content

Commit d3391ec

Browse files
authored
Reduce the scope of the illegal call tracer and other tracing fixes (#332)
1 parent 4fc6a58 commit d3391ec

File tree

9 files changed

+406
-100
lines changed

9 files changed

+406
-100
lines changed

temporalio/Steepfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ target :lib do
99

1010
ignore 'lib/temporalio/api', 'lib/temporalio/internal/bridge/api'
1111

12-
library 'uri'
12+
library 'uri', 'objspace'
1313

1414
configure_code_diagnostics do |hash|
1515
# TODO(cretz): Fix as more protos are generated

temporalio/lib/temporalio/internal/worker/workflow_instance.rb

Lines changed: 76 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -162,86 +162,9 @@ def initialize(details)
162162
end
163163

164164
def activate(activation)
165-
# Run inside of scheduler
166-
run_in_scheduler { activate_internal(activation) }
167-
end
168-
169-
def add_command(command)
170-
raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen
171-
172-
@commands << command
173-
end
174-
175-
def instance
176-
@instance or raise 'Instance accessed before created'
177-
end
178-
179-
def search_attributes
180-
# Lazy on first access
181-
@search_attributes ||= SearchAttributes._from_proto(
182-
@init_job.search_attributes, disable_mutations: true, never_nil: true
183-
) || raise
184-
end
185-
186-
def memo
187-
# Lazy on first access
188-
@memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {})
189-
end
190-
191-
def now
192-
# Create each time
193-
ProtoUtils.timestamp_to_time(@now_timestamp) or raise 'Time unexpectedly not present'
194-
end
195-
196-
def illegal_call_tracing_disabled(&)
197-
@tracer.disable(&)
198-
end
199-
200-
def patch(patch_id:, deprecated:)
201-
# Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the
202-
# command.
203-
patch_id = patch_id.to_s
204-
@patches_memoized ||= {}
205-
@patches_memoized.fetch(patch_id) do
206-
patched = !replaying || @patches_notified.include?(patch_id)
207-
@patches_memoized[patch_id] = patched
208-
if patched
209-
add_command(
210-
Bridge::Api::WorkflowCommands::WorkflowCommand.new(
211-
set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
212-
)
213-
)
214-
end
215-
patched
216-
end
217-
end
218-
219-
def metric_meter
220-
@metric_meter ||= ReplaySafeMetric::Meter.new(
221-
@runtime_metric_meter.with_additional_attributes(
222-
{
223-
namespace: info.namespace,
224-
task_queue: info.task_queue,
225-
workflow_type: info.workflow_type
226-
}
227-
)
228-
)
229-
end
230-
231-
private
232-
233-
def run_in_scheduler(&)
165+
# Run inside of scheduler (removed on ensure)
234166
Fiber.set_scheduler(@scheduler)
235-
if @tracer
236-
@tracer.enable(&)
237-
else
238-
yield
239-
end
240-
ensure
241-
Fiber.set_scheduler(nil)
242-
end
243167

244-
def activate_internal(activation)
245168
# Reset some activation state
246169
@commands = []
247170
@current_activation_error = nil
@@ -266,8 +189,12 @@ def activate_internal(activation)
266189
# the first activation)
267190
@primary_fiber ||= schedule(top_level: true) { run_workflow }
268191

269-
# Run the event loop
270-
@scheduler.run_until_all_yielded
192+
# Run the event loop in the tracer if it exists
193+
if @tracer
194+
@tracer.enable { @scheduler.run_until_all_yielded }
195+
else
196+
@scheduler.run_until_all_yielded
197+
end
271198
rescue Exception => e # rubocop:disable Lint/RescueException
272199
on_top_level_exception(e)
273200
end
@@ -306,8 +233,77 @@ def activate_internal(activation)
306233
ensure
307234
@commands = nil
308235
@current_activation_error = nil
236+
Fiber.set_scheduler(nil)
237+
end
238+
239+
def add_command(command)
240+
raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context' if @context_frozen
241+
242+
@commands << command
243+
end
244+
245+
def instance
246+
@instance or raise 'Instance accessed before created'
247+
end
248+
249+
def search_attributes
250+
# Lazy on first access
251+
@search_attributes ||= SearchAttributes._from_proto(
252+
@init_job.search_attributes, disable_mutations: true, never_nil: true
253+
) || raise
254+
end
255+
256+
def memo
257+
# Lazy on first access
258+
@memo ||= ExternallyImmutableHash.new(ProtoUtils.memo_from_proto(@init_job.memo, payload_converter) || {})
259+
end
260+
261+
def now
262+
# Create each time
263+
ProtoUtils.timestamp_to_time(@now_timestamp) or raise 'Time unexpectedly not present'
309264
end
310265

266+
def illegal_call_tracing_disabled(&)
267+
if @tracer
268+
@tracer.disable_temporarily(&)
269+
else
270+
yield
271+
end
272+
end
273+
274+
def patch(patch_id:, deprecated:)
275+
# Use memoized result if present. If this is being deprecated, we can still use memoized result and skip the
276+
# command.
277+
patch_id = patch_id.to_s
278+
@patches_memoized ||= {}
279+
@patches_memoized.fetch(patch_id) do
280+
patched = !replaying || @patches_notified.include?(patch_id)
281+
@patches_memoized[patch_id] = patched
282+
if patched
283+
add_command(
284+
Bridge::Api::WorkflowCommands::WorkflowCommand.new(
285+
set_patch_marker: Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
286+
)
287+
)
288+
end
289+
patched
290+
end
291+
end
292+
293+
def metric_meter
294+
@metric_meter ||= ReplaySafeMetric::Meter.new(
295+
@runtime_metric_meter.with_additional_attributes(
296+
{
297+
namespace: info.namespace,
298+
task_queue: info.task_queue,
299+
workflow_type: info.workflow_type
300+
}
301+
)
302+
)
303+
end
304+
305+
private
306+
311307
def create_instance
312308
# Convert workflow arguments
313309
@workflow_arguments = convert_args(payload_array: @init_job.arguments,

temporalio/lib/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rb

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def initialize(illegal_calls)
8484
when :all
8585
''
8686
when Temporalio::Worker::IllegalWorkflowCallValidator
87-
disable do
87+
disable_temporarily do
8888
vals.block.call(Temporalio::Worker::IllegalWorkflowCallValidator::CallInfo.new(
8989
class_name:, method_name: tp.callee_id, trace_point: tp
9090
))
@@ -98,7 +98,7 @@ def initialize(illegal_calls)
9898
when true
9999
''
100100
when Temporalio::Worker::IllegalWorkflowCallValidator
101-
disable do
101+
disable_temporarily do
102102
per_method.block.call(Temporalio::Worker::IllegalWorkflowCallValidator::CallInfo.new(
103103
class_name:, method_name: tp.callee_id, trace_point: tp
104104
))
@@ -118,8 +118,11 @@ def initialize(illegal_calls)
118118
end
119119

120120
def enable(&block)
121-
# We've seen leaking issues in Ruby 3.2 where the TracePoint inadvertently remains enabled even for threads
122-
# that it was not started on. So we will check the thread ourselves.
121+
# This is not reentrant and not expected to be called as such. We've seen leaking issues in Ruby 3.2 where
122+
# the TracePoint inadvertently remains enabled even for threads that it was not started on. So we will check
123+
# the thread ourselves. We also use the "enabled thread" concept for disabling checks too, see
124+
# disable_temporarily for more details.
125+
123126
@enabled_thread = Thread.current
124127
@tracepoint.enable do
125128
block.call
@@ -128,13 +131,17 @@ def enable(&block)
128131
end
129132
end
130133

131-
def disable(&block)
134+
def disable_temporarily(&)
135+
# An earlier version of this used @tracepoint.disable, but in some versions of Ruby, the observed behavior
136+
# is confusingly not reentrant or at least not predictable. Therefore, instead of calling
137+
# @tracepoint.disable, we are just unsetting the enabled thread. This means the tracer is still running, but
138+
# no checks are performed. This is effectively a no-op if tracing was never enabled.
139+
132140
previous_thread = @enabled_thread
133-
@tracepoint.disable do
134-
block.call
135-
ensure
136-
@enabled_thread = previous_thread
137-
end
141+
@enabled_thread = nil
142+
yield
143+
ensure
144+
@enabled_thread = previous_thread
138145
end
139146
end
140147
end

temporalio/sig/temporalio/internal/worker/workflow_instance.rbs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,6 @@ module Temporalio
5858

5959
def metric_meter: -> Temporalio::Metric::Meter
6060

61-
def run_in_scheduler: [T] { -> T } -> T
62-
63-
def activate_internal: (untyped activation) -> untyped
64-
6561
def create_instance: -> Temporalio::Workflow::Definition
6662

6763
def apply: (untyped job) -> void

temporalio/sig/temporalio/internal/worker/workflow_instance/illegal_call_tracer.rbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ module Temporalio
1212
) -> void
1313

1414
def enable: [T] { -> T } -> T
15-
def disable: [T] { -> T } -> T
15+
def disable_temporarily: [T] { -> T } -> T
1616
end
1717
end
1818
end

0 commit comments

Comments
 (0)