Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion temporalio/.rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ Metrics/AbcSize:
Metrics/BlockLength:
Max: 100

# The default is too small
Metrics/BlockNesting:
Max: 5

# The default is too small
Metrics/ClassLength:
Max: 1000
Expand All @@ -59,7 +63,7 @@ Metrics/CyclomaticComplexity:

# The default is too small
Metrics/MethodLength:
Max: 100
Max: 200

# The default is too small
Metrics/ModuleLength:
Expand Down
105 changes: 104 additions & 1 deletion temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
require 'temporalio/client/interceptor'
require 'temporalio/client/schedule'
require 'temporalio/client/schedule_handle'
require 'temporalio/client/with_start_workflow_operation'
require 'temporalio/client/workflow_execution'
require 'temporalio/client/workflow_execution_count'
require 'temporalio/client/workflow_handle'
require 'temporalio/client/workflow_query_reject_condition'
require 'temporalio/client/workflow_update_handle'
require 'temporalio/client/workflow_update_wait_stage'
require 'temporalio/common_enums'
require 'temporalio/converters'
require 'temporalio/error'
Expand Down Expand Up @@ -155,7 +158,7 @@ def initialize(
default_workflow_query_reject_condition:
).freeze
# Initialize interceptors
@impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int|
@impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int| # steep:ignore
int.intercept_client(acc)
end
end
Expand Down Expand Up @@ -334,6 +337,106 @@ def workflow_handle(
WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
end

# Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict
# policy). Note that in some cases this may fail but the workflow will still be started, and the handle can then be
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is unclear in this sentence.

Suggested change
# policy). Note that in some cases this may fail but the workflow will still be started, and the handle can then be
# policy). Note that in some cases, the Update message may fail though the Workflow was sucessfully started; the handle can then be

Copy link
Member Author

@cretz cretz Feb 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method doc saying "this will fail" means "this method will fail". I don't want to say the update message may fail because that may be a different meaning. I am telling the user to watch out, an exception may still mean partial success due to the unfortunate lack of atomicness of this API (unlike signal with start which will all-fail or all-succeed).

# retrieved on the start workflow operation.
#
# @param update [Workflow::Definition::Update, Symbol, String] Update definition or name.
# @param args [Array<Object>] Update arguments.
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must
# have an `id_conflict_policy` set.
# @param wait_for_stage [WorkflowUpdateWaitStage] Required stage to wait until returning. ADMITTED is not
# currently supported. See https://docs.temporal.io/workflows#update for more details.
# @param id [String] ID of the update.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [WorkflowUpdateHandle] The update handle.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow.
# @raise [Error::WorkflowUpdateRPCTimeoutOrCanceledError] This update call timed out or was canceled. This doesn't
# mean the update itself was timed out or canceled, and this doesn't mean the workflow did not start.
# @raise [Error::RPCError] RPC error from call.
def start_update_with_start_workflow(
update,
*args,
start_workflow_operation:,
wait_for_stage:,
id: SecureRandom.uuid,
rpc_options: nil
)
@impl.start_update_with_start_workflow(
Interceptor::StartUpdateWithStartWorkflowInput.new(
update_id: id,
update:,
args:,
wait_for_stage:,
start_workflow_operation:,
headers: {},
rpc_options:
)
)
end

# Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict
# policy), and wait for update result. This is a shortcut for {start_update_with_start_workflow} +
# {WorkflowUpdateHandle.result}.
#
# @param update [Workflow::Definition::Update, Symbol, String] Update definition or name.
# @param args [Array<Object>] Update arguments.
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must
# have an `id_conflict_policy` set.
# @param id [String] ID of the update.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [Object] Successful update result.
# @raise [Error::WorkflowUpdateFailedError] If the update failed.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow.
# @raise [Error::WorkflowUpdateRPCTimeoutOrCanceledError] This update call timed out or was canceled. This doesn't
# mean the update itself was timed out or canceled, and this doesn't mean the workflow did not start.
# @raise [Error::RPCError] RPC error from call.
def execute_update_with_start_workflow(
update,
*args,
start_workflow_operation:,
id: SecureRandom.uuid,
rpc_options: nil
)
start_update_with_start_workflow(
update,
*args,
start_workflow_operation:,
wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
id:,
rpc_options:
).result
end

# Send a signal, possibly starting the workflow at the same time if it doesn't exist.
#
# @param signal [Workflow::Definition::Signal, Symbol, String] Signal definition or name.
# @param args [Array<Object>] Signal arguments.
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This may not
# support all `id_conflict_policy` options.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [WorkflowHandle] A workflow handle to the workflow.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists and conflict/reuse policy does not allow.
# @raise [Error::RPCError] RPC error from call.
def signal_with_start_workflow(
signal,
*args,
start_workflow_operation:,
rpc_options: nil
)
@impl.signal_with_start_workflow(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a user's POV, is there any important difference between the classic, dedicated SignalWithStart API and this one, based on the MultiOps API? I mean, is there anything that could really surprise users coming from other SDKs and could be worth mentioning in docs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no difference. This does not use multi-ops, this uses the classic. This is just us trying to have a consistent user-facing API even though the gRPC API is inconsistent.

Interceptor::SignalWithStartWorkflowInput.new(
signal:,
args:,
start_workflow_operation:,
rpc_options:
)
)
end

# List workflows.
#
# @param query [String, nil] A Temporal visibility list filter.
Expand Down
37 changes: 37 additions & 0 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,26 @@ def intercept_client(next_interceptor)
:rpc_options
)

# Input for {Outbound.start_update_with_start_workflow}.
StartUpdateWithStartWorkflowInput = Data.define(
:update_id,
:update,
:args,
:wait_for_stage,
:start_workflow_operation,
:headers,
:rpc_options
)

# Input for {Outbound.signal_with_start_workflow}.
SignalWithStartWorkflowInput = Data.define(
:signal,
:args,
:start_workflow_operation,
# Headers intentionally not defined here, because they are not separate from start_workflow_operation
:rpc_options
)

# Input for {Outbound.list_workflows}.
ListWorkflowsInput = Data.define(
:query,
Expand Down Expand Up @@ -240,6 +260,23 @@ def start_workflow(input)
next_interceptor.start_workflow(input)
end

# Called for every {Client.start_update_with_start_workflow} and {Client.execute_update_with_start_workflow}
# call.
#
# @param input [StartUpdateWithStartWorkflowInput] Input.
# @return [WorkflowUpdateHandle] Workflow update handle.
def start_update_with_start_workflow(input)
next_interceptor.start_update_with_start_workflow(input)
end

# Called for every {Client.signal_with_start_workflow}.
#
# @param input [SignalWithStartWorkflowInput] Input.
# @return [WorkflowHandle] Workflow handle.
def signal_with_start_workflow(input)
next_interceptor.signal_with_start_workflow(input)
end

# Called for every {Client.list_workflows} call.
#
# @param input [ListWorkflowsInput] Input.
Expand Down
109 changes: 109 additions & 0 deletions temporalio/lib/temporalio/client/with_start_workflow_operation.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# frozen_string_literal: true

require 'temporalio/common_enums'

module Temporalio
class Client
# Start operation used by {Client.start_update_with_start_workflow}, {Client.execute_update_with_start_workflow},
# and {Client.signal_with_start_workflow}.
class WithStartWorkflowOperation
Options = Data.define(
:workflow,
:args,
:id,
:task_queue,
:execution_timeout,
:run_timeout,
:task_timeout,
:id_reuse_policy,
:id_conflict_policy,
:retry_policy,
:cron_schedule,
:memo,
:search_attributes,
:start_delay,
:headers
)

# Options the operation was created with.
class Options; end # rubocop:disable Lint/EmptyClass

# @return [Options] Options the operation was created with.
attr_accessor :options

# Create a with-start workflow operation. These are mostly the same options as {Client.start_workflow}, see that
# documentation for more details.
#
# Note, for {Client.start_update_with_start_workflow} and {Client.execute_update_with_start_workflow},
# `id_conflict_policy` is required.
def initialize(
workflow,
*args,
id:,
task_queue:,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
id_reuse_policy: WorkflowIDReusePolicy::ALLOW_DUPLICATE,
id_conflict_policy: WorkflowIDConflictPolicy::UNSPECIFIED,
retry_policy: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil,
start_delay: nil,
headers: {}
)
@options = Options.new(
workflow:,
args:,
id:,
task_queue:,
execution_timeout:,
run_timeout:,
task_timeout:,
id_reuse_policy:,
id_conflict_policy:,
retry_policy:,
cron_schedule:,
memo:,
search_attributes:,
start_delay:,
headers:
)
@workflow_handle_mutex = Mutex.new
@workflow_handle_cond_var = ConditionVariable.new
end

# Get the workflow handle, possibly waiting until set, or raise an error if the workflow start was unsuccessful.
#
# @param wait [Boolean] True to wait until it is set, false to return immediately.
#
# @return [WorkflowHandle, nil] The workflow handle when available or `nil` if `wait` is false and it is not set
# yet.
# @raise [Error] Any error that occurred during the call before the workflow start returned.
def workflow_handle(wait: true)
@workflow_handle_mutex.synchronize do
@workflow_handle_cond_var.wait(@workflow_handle_mutex) unless @workflow_handle || !wait
raise @workflow_handle if @workflow_handle.is_a?(Exception)

@workflow_handle
end
end

# @!visibility private
def _set_workflow_handle(value)
@workflow_handle_mutex.synchronize do
@workflow_handle ||= value
@workflow_handle_cond_var.broadcast
end
end

# @!visibility private
def _mark_used
raise 'Start operation already used' if @in_use

@in_use = true
end
end
end
end
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def grpc_status

def create_grpc_status
return Api::Common::V1::GrpcStatus.new(code: @code) unless @raw_grpc_status
return @raw_grpc_status if @raw_grpc_status.is_a?(Api::Common::V1::GrpcStatus)

Api::Common::V1::GrpcStatus.decode(@raw_grpc_status)
end
Expand Down
Loading
Loading