Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion temporalio/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ Metrics/AbcSize:
Metrics/BlockLength:
Max: 100

# The default is too small
Metrics/BlockNesting:
Max: 5

# The default is too small
Metrics/ClassLength:
Max: 1000
Expand All @@ -59,7 +63,7 @@ Metrics/CyclomaticComplexity:

# The default is too small
Metrics/MethodLength:
Max: 100
Max: 200

# The default is too small
Metrics/ModuleLength:
Expand Down
105 changes: 104 additions & 1 deletion temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
require 'temporalio/client/interceptor'
require 'temporalio/client/schedule'
require 'temporalio/client/schedule_handle'
require 'temporalio/client/with_start_workflow_operation'
require 'temporalio/client/workflow_execution'
require 'temporalio/client/workflow_execution_count'
require 'temporalio/client/workflow_handle'
require 'temporalio/client/workflow_query_reject_condition'
require 'temporalio/client/workflow_update_handle'
require 'temporalio/client/workflow_update_wait_stage'
require 'temporalio/common_enums'
require 'temporalio/converters'
require 'temporalio/error'
Expand Down Expand Up @@ -155,7 +158,7 @@ def initialize(
default_workflow_query_reject_condition:
).freeze
# Initialize interceptors
@impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int|
@impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int| # steep:ignore
int.intercept_client(acc)
end
end
Expand Down Expand Up @@ -334,6 +337,106 @@ def workflow_handle(
WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
end

# Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict
# policy). Note that in some cases this may fail but the workflow will still be started, and the handle can then be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is unclear in this sentence.

Suggested change
# policy). Note that in some cases this may fail but the workflow will still be started, and the handle can then be
# policy). Note that in some cases, the Update message may fail though the Workflow was sucessfully started; the handle can then be

Copy link
Member Author

@cretz cretz Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method doc saying "this will fail" means "this method will fail". I don't want to say the update message may fail because that may be a different meaning. I am telling the user to watch out, an exception may still mean partial success due to the unfortunate lack of atomicness of this API (unlike signal with start which will all-fail or all-succeed).

# retrieved on the start workflow operation.
#
# @param update [Workflow::Definition::Update, Symbol, String] Update definition or name.
# @param args [Array<Object>] Update arguments.
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must
# have an `id_conflict_policy` set.
# @param wait_for_stage [WorkflowUpdateWaitStage] Required stage to wait until returning. ADMITTED is not
# currently supported. See https://docs.temporal.io/workflows#update for more details.
# @param id [String] ID of the update.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [WorkflowUpdateHandle] The update handle.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow.
# @raise [Error::WorkflowUpdateRPCTimeoutOrCanceledError] This update call timed out or was canceled. This doesn't
# mean the update itself was timed out or canceled, and this doesn't mean the workflow did not start.
# @raise [Error::RPCError] RPC error from call.
def start_update_with_start_workflow(
update,
*args,
start_workflow_operation:,
wait_for_stage:,
id: SecureRandom.uuid,
rpc_options: nil
)
@impl.start_update_with_start_workflow(
Interceptor::StartUpdateWithStartWorkflowInput.new(
update_id: id,
update:,
args:,
wait_for_stage:,
start_workflow_operation:,
headers: {},
rpc_options:
)
)
end

# Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict
# policy), and wait for update result. This is a shortcut for {start_update_with_start_workflow} +
# {WorkflowUpdateHandle.result}.
#
# @param update [Workflow::Definition::Update, Symbol, String] Update definition or name.
# @param args [Array<Object>] Update arguments.
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must
# have an `id_conflict_policy` set.
# @param id [String] ID of the update.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [Object] Successful update result.
# @raise [Error::WorkflowUpdateFailedError] If the update failed.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow.
# @raise [Error::WorkflowUpdateRPCTimeoutOrCanceledError] This update call timed out or was canceled. This doesn't
# mean the update itself was timed out or canceled, and this doesn't mean the workflow did not start.
# @raise [Error::RPCError] RPC error from call.
def execute_update_with_start_workflow(
update,
*args,
start_workflow_operation:,
id: SecureRandom.uuid,
rpc_options: nil
)
start_update_with_start_workflow(
update,
*args,
start_workflow_operation:,
wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
id:,
rpc_options:
).result
end

# Send a signal, possibly starting the workflow at the same time if it doesn't exist.
#
# @param signal [Workflow::Definition::Signal, Symbol, String] Signal definition or name.
# @param args [Array<Object>] Signal arguments.
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This may not
# support all `id_conflict_policy` options.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [WorkflowHandle] A workflow handle to the workflow.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow.
# @raise [Error::RPCError] RPC error from call.
def signal_with_start_workflow(
signal,
*args,
start_workflow_operation:,
rpc_options: nil
)
@impl.signal_with_start_workflow(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a user's POV, is there any important difference between the classic, dedicated SignalWithStart API and this one, based on the MultiOps API? I mean, is there anything that could really surprise users coming from other SDKs and could be worth mentioning in docs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no difference. This does not use multi-ops, this uses the classic. This is just us trying to have a consistent user-facing API even though the gRPC API is inconsistent.

Interceptor::SignalWithStartWorkflowInput.new(
signal:,
args:,
start_workflow_operation:,
rpc_options:
)
)
end

# List workflows.
#
# @param query [String, nil] A Temporal visibility list filter.
Expand Down
37 changes: 37 additions & 0 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,26 @@ def intercept_client(next_interceptor)
:rpc_options
)

# Input for {Outbound.start_update_with_start_workflow}.
StartUpdateWithStartWorkflowInput = Data.define(
:update_id,
:update,
:args,
:wait_for_stage,
:start_workflow_operation,
:headers,
:rpc_options
)

# Input for {Outbound.signal_with_start_workflow}.
SignalWithStartWorkflowInput = Data.define(
:signal,
:args,
:start_workflow_operation,
# Headers intentionally not defined here, because they are not separate from start_workflow_operation
:rpc_options
)

# Input for {Outbound.list_workflows}.
ListWorkflowsInput = Data.define(
:query,
Expand Down Expand Up @@ -240,6 +260,23 @@ def start_workflow(input)
next_interceptor.start_workflow(input)
end

# Called for every {Client.start_update_with_start_workflow} and {Client.execute_update_with_start_workflow}
# call.
#
# @param input [StartUpdateWithStartWorkflowInput] Input.
# @return [WorkflowUpdateHandle] Workflow update handle.
def start_update_with_start_workflow(input)
next_interceptor.start_update_with_start_workflow(input)
end

# Called for every {Client.signal_with_start_workflow}.
#
# @param input [SignalWithStartWorkflowInput] Input.
# @return [WorkflowHandle] Workflow handle.
def signal_with_start_workflow(input)
next_interceptor.signal_with_start_workflow(input)
end

# Called for every {Client.list_workflows} call.
#
# @param input [ListWorkflowsInput] Input.
Expand Down
109 changes: 109 additions & 0 deletions temporalio/lib/temporalio/client/with_start_workflow_operation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# frozen_string_literal: true

require 'temporalio/common_enums'

module Temporalio
class Client
# Start operation used by {Client.start_update_with_start_workflow}, {Client.execute_update_with_start_workflow},
# and {Client.signal_with_start_workflow}.
class WithStartWorkflowOperation
Options = Data.define(
:workflow,
:args,
:id,
:task_queue,
:execution_timeout,
:run_timeout,
:task_timeout,
:id_reuse_policy,
:id_conflict_policy,
:retry_policy,
:cron_schedule,
:memo,
:search_attributes,
:start_delay,
:headers
)

# Options the operation was created with.
class Options; end # rubocop:disable Lint/EmptyClass

# @return [Options] Options the operation was created with.
attr_accessor :options

# Create a with-start workflow operation. These are mostly the same options as {Client.start_workflow}, see that
# documentation for more details.
#
# Note, for {Client.start_update_with_start_workflow} and {Client.execute_update_with_start_workflow},
# `id_conflict_policy` is required.
def initialize(
workflow,
*args,
id:,
task_queue:,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE,
id_conflict_policy: WorkflowIDConflictPolicy::UNSPECIFIED,
retry_policy: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil,
start_delay: nil,
headers: {}
)
@options = Options.new(
workflow:,
args:,
id:,
task_queue:,
execution_timeout:,
run_timeout:,
task_timeout:,
id_reuse_policy:,
id_conflict_policy:,
retry_policy:,
cron_schedule:,
memo:,
search_attributes:,
start_delay:,
headers:
)
@workflow_handle_mutex = Mutex.new
@workflow_handle_cond_var = ConditionVariable.new
end

# Get the workflow handle, possibly waiting until set, or raise an error if the workflow start was unsuccessful.
#
# @param wait [Boolean] True to wait until it is set, false to return immediately.
#
# @return [WorkflowHandle, nil] The workflow handle when available or `nil` if `wait` is false and it is not set
# yet.
# @raise [Error] Any error that occurred during the call before the workflow start returned.
def workflow_handle(wait: true)
@workflow_handle_mutex.synchronize do
@workflow_handle_cond_var.wait(@workflow_handle_mutex) unless @workflow_handle || !wait
raise @workflow_handle if @workflow_handle.is_a?(Exception)

@workflow_handle
end
end

# @!visibility private
def _set_workflow_handle(value)
@workflow_handle_mutex.synchronize do
@workflow_handle ||= value
@workflow_handle_cond_var.broadcast
end
end

# @!visibility private
def _mark_used
raise 'Start operation already used' if @in_use

@in_use = true
end
end
end
end
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def grpc_status

def create_grpc_status
return Api::Common::V1::GrpcStatus.new(code: @code) unless @raw_grpc_status
return @raw_grpc_status if @raw_grpc_status.is_a?(Api::Common::V1::GrpcStatus)

Api::Common::V1::GrpcStatus.decode(@raw_grpc_status)
end
Expand Down
Loading
Loading