Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions temporalio/lib/temporalio/internal/client/implementation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ def start_workflow_update(input)
# the user cannot specify sooner than ACCEPTED)
# @type var resp: untyped
resp = nil
expected_stage = ProtoUtils.enum_to_int(Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage,
req.wait_policy.lifecycle_stage)
accepted_stage = ProtoUtils.enum_to_int(Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage,
Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED)
loop do
resp = @client.workflow_service.update_workflow_execution(
req,
Expand All @@ -500,10 +504,8 @@ def start_workflow_update(input)

# We're only done if the response stage is after the requested stage
# or the response stage is accepted
if resp.stage >= req.wait_policy.lifecycle_stage ||
resp.stage >= Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED
break
end
actual_stage = ProtoUtils.enum_to_int(Api::Enums::V1::UpdateWorkflowExecutionLifecycleStage, resp.stage)
break if actual_stage >= expected_stage || actual_stage >= accepted_stage
rescue Error::RPCError => e
# Deadline exceeded or cancel is a special error type
if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELED
Expand Down
50 changes: 50 additions & 0 deletions temporalio/test/worker_workflow_handler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,56 @@ def test_update_info
end)
end

class RetryStartUpdateWorkflow < Temporalio::Workflow::Definition
def execute
Temporalio::Workflow.wait_condition { false }
end

workflow_update
def some_update
'done'
end
end

def test_retry_start_update
# This test confirms that the UpdateWorkflowExecution call occurs multiple times if it hasn't reached accepted. We
# check this via metrics.
buffer = Temporalio::Runtime::MetricBuffer.new(10_000)
Temporalio::Testing::WorkflowEnvironment.start_local(
runtime: Temporalio::Runtime.new(
telemetry: Temporalio::Runtime::TelemetryOptions.new(metrics: Temporalio::Runtime::MetricsOptions.new(buffer:))
),
logger: Logger.new($stdout),
# Reduce poll interval so it tries again sooner
dev_server_extra_args: ['--dynamic-config-value', 'history.longPollExpirationInterval="3s"']
) do |local_env|
# Start workflow, then issue update without a running worker
task_queue = "tq-#{SecureRandom.uuid}"
handle = local_env.client.start_workflow(RetryStartUpdateWorkflow, id: "wf-#{SecureRandom.uuid}", task_queue:)

# Wait for update in background
queue = Queue.new
run_in_background { queue << handle.execute_update(RetryStartUpdateWorkflow.some_update) }

# Wait for there to be at least 2 update requests
update_requests = 0
assert_eventually do
buffer.retrieve_updates.each do |update|
if update.metric.name == 'temporal_long_request' &&
update.attributes['operation'] == 'UpdateWorkflowExecution'
update_requests += update.value # steep:ignore
end
end
assert update_requests >= 2
end

# Now run the worker that will let the update be processed
Temporalio::Worker.new(client: local_env.client, task_queue:, workflows: [RetryStartUpdateWorkflow]).run do
assert_equal 'done', queue.pop(timeout: 5)
end
end
end

class UpdateWithStartWorkflow < Temporalio::Workflow::Definition
workflow_query_attr_reader :counter

Expand Down
Loading