Skip to content

Commit 2d764b5

Browse files
authored
Properly retry start update if not ready yet (#282)
Fixes #278
1 parent 9a043a2 commit 2d764b5

File tree

2 files changed

+56
-4
lines changed

2 files changed

+56
-4
lines changed

temporalio/lib/temporalio/internal/client/implementation.rb

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,10 @@ def start_workflow_update(input)
492492
# the user cannot specify sooner than ACCEPTED)
493493
# @type var resp: untyped
494494
resp = nil
495+
expected_stage = ProtoUtils.enum_to_int(Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage,
496+
req.wait_policy.lifecycle_stage)
497+
accepted_stage = ProtoUtils.enum_to_int(Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage,
498+
Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED)
495499
loop do
496500
resp = @client.workflow_service.update_workflow_execution(
497501
req,
@@ -500,10 +504,8 @@ def start_workflow_update(input)
500504

501505
# We're only done if the response stage is after the requested stage
502506
# or the response stage is accepted
503-
if resp.stage >= req.wait_policy.lifecycle_stage ||
504-
resp.stage >= Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED
505-
break
506-
end
507+
actual_stage = ProtoUtils.enum_to_int(Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage, resp.stage)
508+
break if actual_stage >= expected_stage || actual_stage >= accepted_stage
507509
rescue Error::RPCError => e
508510
# Deadline exceeded or cancel is a special error type
509511
if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELED

temporalio/test/worker_workflow_handler_test.rb

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,56 @@ def test_update_info
619619
end)
620620
end
621621

622+
class RetryStartUpdateWorkflow < Temporalio::Workflow::Definition
623+
def execute
624+
Temporalio::Workflow.wait_condition { false }
625+
end
626+
627+
workflow_update
628+
def some_update
629+
'done'
630+
end
631+
end
632+
633+
def test_retry_start_update
634+
# This test confirms that the UpdateWorkflowExecution call occurs multiple times if it hasn't reached accepted. We
635+
# check this via metrics.
636+
buffer = Temporalio::Runtime::MetricBuffer.new(10_000)
637+
Temporalio::Testing::WorkflowEnvironment.start_local(
638+
runtime: Temporalio::Runtime.new(
639+
telemetry: Temporalio::Runtime::TelemetryOptions.new(metrics: Temporalio::Runtime::MetricsOptions.new(buffer:))
640+
),
641+
logger: Logger.new($stdout),
642+
# Reduce poll interval so it tries again sooner
643+
dev_server_extra_args: ['--dynamic-config-value', 'history.longPollExpirationInterval="3s"']
644+
) do |local_env|
645+
# Start workflow, then issue update without a running worker
646+
task_queue = "tq-#{SecureRandom.uuid}"
647+
handle = local_env.client.start_workflow(RetryStartUpdateWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue:)
648+
649+
# Wait for update in background
650+
queue = Queue.new
651+
run_in_background { queue << handle.execute_update(RetryStartUpdateWorkflow.some_update) }
652+
653+
# Wait for there to be at least 2 update requests
654+
update_requests = 0
655+
assert_eventually do
656+
buffer.retrieve_updates.each do |update|
657+
if update.metric.name == 'temporal_long_request' &&
658+
update.attributes['operation'] == 'UpdateWorkflowExecution'
659+
update_requests += update.value # steep:ignore
660+
end
661+
end
662+
assert update_requests >= 2
663+
end
664+
665+
# Now run the worker that will let the update be processed
666+
Temporalio::Worker.new(client: local_env.client, task_queue:, workflows: [RetryStartUpdateWorkflow]).run do
667+
assert_equal 'done', queue.pop(timeout: 5)
668+
end
669+
end
670+
end
671+
622672
class UpdateWithStartWorkflow < Temporalio::Workflow::Definition
623673
workflow_query_attr_reader :counter
624674

0 commit comments

Comments
 (0)