Skip to content

Commit 9a043a2

Browse files
authored
Do not require heartbeating to be done in context (#280)
Fixes #279
1 parent 143e257 commit 9a043a2

File tree

4 files changed

+39
-5
lines changed

4 files changed

+39
-5
lines changed

temporalio/lib/temporalio/internal/worker/activity_worker.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ def run_activity(defn, activity, input)
251251
impl = @worker._activity_interceptors.reverse_each.reduce(impl) do |acc, int|
252252
int.intercept_activity(acc)
253253
end
254-
impl.init(OutboundImplementation.new(self))
254+
impl.init(OutboundImplementation.new(self, activity.info.task_token))
255255

256256
# Execute
257257
result = impl.execute(input)
@@ -381,15 +381,16 @@ def execute(input)
381381
end
382382

383383
class OutboundImplementation < Temporalio::Worker::Interceptor::Activity::Outbound
384-
def initialize(worker)
384+
def initialize(worker, task_token)
385385
super(nil) # steep:ignore
386386
@worker = worker
387+
@task_token = task_token
387388
end
388389

389390
def heartbeat(input)
390391
@worker.bridge_worker.record_activity_heartbeat(
391392
Bridge::Api::CoreInterface::ActivityHeartbeat.new(
392-
task_token: Activity::Context.current.info.task_token,
393+
task_token: @task_token,
393394
details: ProtoUtils.convert_to_payload_array(@worker.worker.options.client.data_converter,
394395
input.details)
395396
).to_proto

temporalio/lib/temporalio/worker/thread_pool.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ def _worker_task_completed
125125

126126
private
127127

128-
def locked_assign_worker(&block)
128+
def locked_assign_worker(&block) # rubocop:disable Naming/PredicateMethod
129129
# keep growing if the pool is not at the minimum yet
130130
worker, = @ready.pop || locked_add_busy_worker
131131
if worker

temporalio/sig/temporalio/internal/worker/activity_worker.rbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ module Temporalio
4949
end
5050

5151
class OutboundImplementation < Temporalio::Worker::Interceptor::Activity::Outbound
52-
def initialize: (ActivityWorker worker) -> void
52+
def initialize: (ActivityWorker worker, String task_token) -> void
5353
end
5454
end
5555
end

temporalio/test/worker_activity_test.rb

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,39 @@ def test_heartbeat_details
361361
execute_activity(HeartbeatDetailsActivity, retry_max_attempts: 2, heartbeat_timeout: 0.8)
362362
end
363363

364+
class BackgroundHeartbeatActivity < Temporalio::Activity::Definition
365+
def execute
366+
# First attempt sends a heartbeat with details and fails,
367+
# next attempt just returns the first attempt's details
368+
if Temporalio::Activity::Context.current.info.attempt == 1
369+
# Send heartbeat in background, and wait on background call
370+
# to complete before raising error
371+
queue = Queue.new
372+
context = Temporalio::Activity::Context.current
373+
if Fiber.current_scheduler
374+
Fiber.schedule do
375+
context.heartbeat('some detail')
376+
queue.push(nil)
377+
end
378+
else
379+
Thread.new do
380+
context.heartbeat('some detail')
381+
queue.push(nil)
382+
end
383+
end
384+
queue.pop
385+
raise 'Intentional error'
386+
else
387+
"details: #{Temporalio::Activity::Context.current.info.heartbeat_details}"
388+
end
389+
end
390+
end
391+
392+
def test_background_heartbeat
393+
assert_equal 'details: ["some detail"]',
394+
execute_activity(BackgroundHeartbeatActivity, retry_max_attempts: 2, heartbeat_timeout: 0.8)
395+
end
396+
364397
class ShieldingActivity < Temporalio::Activity::Definition
365398
attr_reader :canceled, :levels_reached
366399

0 commit comments

Comments
 (0)