diff --git a/temporalio/lib/temporalio/internal/client/implementation.rb b/temporalio/lib/temporalio/internal/client/implementation.rb
index c181e54f..395769be 100644
--- a/temporalio/lib/temporalio/internal/client/implementation.rb
+++ b/temporalio/lib/temporalio/internal/client/implementation.rb
@@ -492,6 +492,10 @@ def start_workflow_update(input)
           # the user cannot specify sooner than ACCEPTED)
           # @type var resp: untyped
           resp = nil
+          expected_stage = ProtoUtils.enum_to_int(Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage,
+                                                  req.wait_policy.lifecycle_stage)
+          accepted_stage = ProtoUtils.enum_to_int(Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage,
+                                                  Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED)
           loop do
             resp = @client.workflow_service.update_workflow_execution(
               req,
@@ -500,10 +504,8 @@ def start_workflow_update(input)
 
             # We're only done if the response stage is after the requested stage
             # or the response stage is accepted
-            if resp.stage >= req.wait_policy.lifecycle_stage ||
-               resp.stage >= Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED
-              break
-            end
+            actual_stage = ProtoUtils.enum_to_int(Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage, resp.stage)
+            break if actual_stage >= expected_stage || actual_stage >= accepted_stage
           rescue Error::RPCError => e
             # Deadline exceeded or cancel is a special error type
             if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELED
diff --git a/temporalio/test/worker_workflow_handler_test.rb b/temporalio/test/worker_workflow_handler_test.rb
index 595aaee3..4d941a51 100644
--- a/temporalio/test/worker_workflow_handler_test.rb
+++ b/temporalio/test/worker_workflow_handler_test.rb
@@ -619,6 +619,56 @@ def test_update_info
     end)
   end
 
+  class RetryStartUpdateWorkflow < Temporalio::Workflow::Definition
+    def execute
+      Temporalio::Workflow.wait_condition { false }
+    end
+
+    workflow_update
+    def some_update
+      'done'
+    end
+  end
+
+  def test_retry_start_update
+    # This test confirms that the UpdateWorkflowExecution call is retried while the update has not reached accepted.
+    # We count the calls via the 'temporal_long_request' metric.
+    buffer = Temporalio::Runtime::MetricBuffer.new(10_000)
+    Temporalio::Testing::WorkflowEnvironment.start_local(
+      runtime: Temporalio::Runtime.new(
+        telemetry: Temporalio::Runtime::TelemetryOptions.new(metrics: Temporalio::Runtime::MetricsOptions.new(buffer:))
+      ),
+      logger: Logger.new($stdout),
+      # Reduce the long-poll expiration interval (to 3s) so the call is tried again sooner
+      dev_server_extra_args: ['--dynamic-config-value', 'history.longPollExpirationInterval="3s"']
+    ) do |local_env|
+      # Start the workflow, then issue the update while no worker is running on the task queue
+      task_queue = "tq-#{SecureRandom.uuid}"
+      handle = local_env.client.start_workflow(RetryStartUpdateWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue:)
+
+      # Execute the update in the background; its result is handed back through the queue
+      queue = Queue.new
+      run_in_background { queue << handle.execute_update(RetryStartUpdateWorkflow.some_update) }
+
+      # Wait until at least 2 update requests have been counted (the total accumulates across attempts)
+      update_requests = 0
+      assert_eventually do
+        buffer.retrieve_updates.each do |update|
+          if update.metric.name == 'temporal_long_request' &&
+             update.attributes['operation'] == 'UpdateWorkflowExecution'
+            update_requests += update.value # steep:ignore
+          end
+        end
+        assert update_requests >= 2
+      end
+
+      # Now run the worker so the pending update can be processed, and check its result
+      Temporalio::Worker.new(client: local_env.client, task_queue:, workflows: [RetryStartUpdateWorkflow]).run do
+        assert_equal 'done', queue.pop(timeout: 5)
+      end
+    end
+  end
+
   class UpdateWithStartWorkflow < Temporalio::Workflow::Definition
     workflow_query_attr_reader :counter
 