Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions temporalio/extra/proto_gen.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def generate_import_helper_files
require 'temporalio/api/errordetails/v1/message'
require 'temporalio/api/export/v1/message'
require 'temporalio/api/operatorservice'
require 'temporalio/api/sdk/v1/workflow_metadata'
require 'temporalio/api/workflowservice'

module Temporalio
Expand Down
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'temporalio/api/errordetails/v1/message'
require 'temporalio/api/export/v1/message'
require 'temporalio/api/operatorservice'
require 'temporalio/api/sdk/v1/workflow_metadata'
require 'temporalio/api/workflowservice'

module Temporalio
Expand Down
20 changes: 20 additions & 0 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ def operator_service
# @param args [Array<Object>] Arguments to the workflow.
# @param id [String] Unique identifier for the workflow execution.
# @param task_queue [String] Task queue to run the workflow on.
# @param static_summary [String, nil] Fixed single-line summary for this workflow execution that may appear in
# CLI/UI. This can be in single-line Temporal markdown format. This is currently experimental.
# @param static_details [String, nil] Fixed details for this workflow execution that may appear in CLI/UI. This can
# be in Temporal markdown format and can be multiple lines. This is a fixed value on the workflow that cannot be
# updated. For details that can be updated, use {Workflow.current_details=} within the workflow. This is currently
# experimental.
# @param execution_timeout [Float, nil] Total workflow execution timeout in seconds including retries and continue
# as new.
# @param run_timeout [Float, nil] Timeout of a single workflow run in seconds.
Expand All @@ -220,6 +226,8 @@ def start_workflow(
*args,
id:,
task_queue:,
static_summary: nil,
static_details: nil,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
Expand All @@ -238,6 +246,8 @@ def start_workflow(
args:,
workflow_id: id,
task_queue:,
static_summary:,
static_details:,
execution_timeout:,
run_timeout:,
task_timeout:,
Expand All @@ -260,6 +270,12 @@ def start_workflow(
# @param args [Array<Object>] Arguments to the workflow.
# @param id [String] Unique identifier for the workflow execution.
# @param task_queue [String] Task queue to run the workflow on.
# @param static_summary [String, nil] Fixed single-line summary for this workflow execution that may appear in
# CLI/UI. This can be in single-line Temporal markdown format. This is currently experimental.
# @param static_details [String, nil] Fixed details for this workflow execution that may appear in CLI/UI. This can
# be in Temporal markdown format and can be multiple lines. This is a fixed value on the workflow that cannot be
# updated. For details that can be updated, use {Workflow.current_details=} within the workflow. This is currently
# experimental.
# @param execution_timeout [Float, nil] Total workflow execution timeout in seconds including retries and continue
# as new.
# @param run_timeout [Float, nil] Timeout of a single workflow run in seconds.
Expand Down Expand Up @@ -287,6 +303,8 @@ def execute_workflow(
*args,
id:,
task_queue:,
static_summary: nil,
static_details: nil,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
Expand All @@ -305,6 +323,8 @@ def execute_workflow(
*args,
id:,
task_queue:,
static_summary:,
static_details:,
execution_timeout:,
run_timeout:,
task_timeout:,
Expand Down
2 changes: 2 additions & 0 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def intercept_client(next_interceptor)
:args,
:workflow_id,
:task_queue,
:static_summary,
:static_details,
:execution_timeout,
:run_timeout,
:task_timeout,
Expand Down
26 changes: 25 additions & 1 deletion temporalio/lib/temporalio/client/schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ def _to_proto(data_converter)
:args,
:id,
:task_queue,
:static_summary,
:static_details,
:execution_timeout,
:run_timeout,
:task_timeout,
Expand All @@ -186,6 +188,14 @@ def _to_proto(data_converter)
# @return [String] Unique identifier for the workflow execution.
# @!attribute task_queue
# @return [String] Task queue to run the workflow on.
# @!attribute static_summary
# @return [String, nil] Fixed single-line summary for this workflow execution that may appear in CLI/UI.
# This can be in single-line Temporal markdown format. This is currently experimental.
# @!attribute static_details
# @return [String, nil] Fixed details for this workflow execution that may appear in CLI/UI. This can be in
# Temporal markdown format and can be multiple lines. This is a fixed value on the workflow that cannot be
# updated. For details that can be updated, use {Workflow.current_details=} within the workflow. This is
# currently experimental.
# @!attribute execution_timeout
# @return [Float, nil] Total workflow execution timeout in seconds including retries and continue as new.
# @!attribute run_timeout
Expand All @@ -212,6 +222,12 @@ class << self
# @param args [Array<Object>] Arguments to the workflow.
# @param id [String] Unique identifier for the workflow execution.
# @param task_queue [String] Task queue to run the workflow on.
# @param static_summary [String, nil] Fixed single-line summary for this workflow execution that may appear
# in CLI/UI. This can be in single-line Temporal markdown format. This is currently experimental.
# @param static_details [String, nil] Fixed details for this workflow execution that may appear in CLI/UI.
# This can be in Temporal markdown format and can be multiple lines. This is a fixed value on the workflow
# that cannot be updated. For details that can be updated, use {Workflow.current_details=} within the
# workflow. This is currently experimental.
# @param execution_timeout [Float, nil] Total workflow execution timeout in seconds including retries and
# continue as new.
# @param run_timeout [Float, nil] Timeout of a single workflow run in seconds.
Expand All @@ -225,6 +241,8 @@ def new(
*args,
id:,
task_queue:,
static_summary: nil,
static_details: nil,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
Expand All @@ -238,6 +256,8 @@ def new(
args:,
id:,
task_queue:,
static_summary:,
static_details:,
execution_timeout:,
run_timeout:,
task_timeout:,
Expand All @@ -251,11 +271,14 @@ def new(

# @!visibility private
def self._from_proto(raw_info, data_converter)
(summary, details) = Internal::ProtoUtils.from_user_metadata(raw_info.user_metadata, data_converter)
StartWorkflow.new(
raw_info.workflow_type.name,
*data_converter.from_payloads(raw_info.input),
id: raw_info.workflow_id,
task_queue: raw_info.task_queue.name,
static_summary: summary,
static_details: details,
execution_timeout: Internal::ProtoUtils.duration_to_seconds(raw_info.workflow_execution_timeout),
run_timeout: Internal::ProtoUtils.duration_to_seconds(raw_info.workflow_run_timeout),
task_timeout: Internal::ProtoUtils.duration_to_seconds(raw_info.workflow_task_timeout),
Expand All @@ -280,7 +303,8 @@ def _to_proto(data_converter)
retry_policy: retry_policy&._to_proto,
memo: Internal::ProtoUtils.memo_to_proto(memo, data_converter),
search_attributes: search_attributes&._to_proto,
header: Internal::ProtoUtils.headers_to_proto(headers, data_converter)
header: Internal::ProtoUtils.headers_to_proto(headers, data_converter),
user_metadata: Internal::ProtoUtils.to_user_metadata(static_summary, static_details, data_converter)
)
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class WithStartWorkflowOperation
:args,
:id,
:task_queue,
:static_summary,
:static_details,
:execution_timeout,
:run_timeout,
:task_timeout,
Expand Down Expand Up @@ -41,6 +43,8 @@ def initialize(
*args,
id:,
task_queue:,
static_summary: nil,
static_details: nil,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
Expand All @@ -58,6 +62,8 @@ def initialize(
args:,
id:,
task_queue:,
static_summary:,
static_details:,
execution_timeout:,
run_timeout:,
task_timeout:,
Expand Down
19 changes: 19 additions & 0 deletions temporalio/lib/temporalio/client/workflow_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ class Description < WorkflowExecution
def initialize(raw_description, data_converter)
super(raw_description.workflow_execution_info, data_converter)
@raw_description = raw_description
@data_converter = data_converter
end

# @return [String, nil] Static summary configured on the workflow. This is currently experimental.
def static_summary
user_metadata.first
end

# @return [String, nil] Static details configured on the workflow. This is currently experimental.
def static_details
user_metadata.last
end

private

def user_metadata
@user_metadata ||= Internal::ProtoUtils.from_user_metadata(
@raw_description.execution_config&.user_metadata, @data_converter
)
end
end
end
Expand Down
6 changes: 6 additions & 0 deletions temporalio/lib/temporalio/internal/client/implementation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ def start_workflow(input)
search_attributes: input.search_attributes&._to_proto,
workflow_start_delay: ProtoUtils.seconds_to_duration(input.start_delay),
request_eager_execution: input.request_eager_start,
user_metadata: ProtoUtils.to_user_metadata(
input.static_summary, input.static_details, @client.data_converter
),
header: ProtoUtils.headers_to_proto(input.headers, @client.data_converter)
)

Expand Down Expand Up @@ -319,6 +322,9 @@ def _start_workflow_request_from_with_start_options(klass, start_options)
memo: ProtoUtils.memo_to_proto(start_options.memo, @client.data_converter),
search_attributes: start_options.search_attributes&._to_proto,
workflow_start_delay: ProtoUtils.seconds_to_duration(start_options.start_delay),
user_metadata: ProtoUtils.to_user_metadata(
start_options.static_summary, start_options.static_details, @client.data_converter
),
header: ProtoUtils.headers_to_proto(start_options.headers, @client.data_converter)
)
end
Expand Down
16 changes: 16 additions & 0 deletions temporalio/lib/temporalio/internal/proto_utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ def self.reserved_name?(name)
name.start_with?('__temporal_') || name == '__stack_trace' || name == '__enhanced_stack_trace'
end

def self.to_user_metadata(summary, details, converter)
return nil if (!summary || summary.empty?) && (!details || details.empty?)

metadata = Temporalio::Api::Sdk::V1::UserMetadata.new
metadata.summary = converter.to_payload(summary) if summary && !summary.empty?
metadata.details = converter.to_payload(details) if details && !details.empty?
metadata
end

def self.from_user_metadata(metadata, converter)
[
(converter.from_payload(metadata.summary) if metadata&.summary), #: String?
(converter.from_payload(metadata.details) if metadata&.details) #: String?
]
end

class LazyMemo
def initialize(raw_memo, converter)
@raw_memo = raw_memo
Expand Down
28 changes: 27 additions & 1 deletion temporalio/lib/temporalio/internal/worker/workflow_instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa
:failure_converter, :cancellation, :continue_as_new_suggested, :current_history_length,
:current_history_size, :replaying, :random, :signal_handlers, :query_handlers, :update_handlers,
:context_frozen
attr_accessor :current_details

def initialize(details)
# Initialize general state
Expand Down Expand Up @@ -386,9 +387,10 @@ def apply_signal(job)
def apply_query(job)
schedule do
# If it's a built-in, run it without interceptors, otherwise do normal behavior
# TODO(cretz): __temporal_workflow_metadata
result = if job.query_type == '__stack_trace'
scheduler.stack_trace
elsif job.query_type == '__temporal_workflow_metadata'
workflow_metadata
else
# Get query definition, falling back to dynamic if not present and not reserved
defn = query_handlers[job.query_type]
Expand Down Expand Up @@ -678,6 +680,30 @@ def convert_args(payload_array:, method_name:, raw_args:, ignore_first_param: fa
end
end

def workflow_metadata
Temporalio::Api::Sdk::V1::WorkflowMetadata.new(
definition: Temporalio::Api::Sdk::V1::WorkflowDefinition.new(
type: info.workflow_type,
query_definitions: query_handlers.values.map do |defn|
Temporalio::Api::Sdk::V1::WorkflowInteractionDefinition.new(
name: defn.name || '', description: defn.description || ''
)
end,
signal_definitions: signal_handlers.values.map do |defn|
Temporalio::Api::Sdk::V1::WorkflowInteractionDefinition.new(
name: defn.name || '', description: defn.description || ''
)
end,
update_definitions: update_handlers.values.map do |defn|
Temporalio::Api::Sdk::V1::WorkflowInteractionDefinition.new(
name: defn.name || '', description: defn.description || ''
)
end
),
current_details: current_details || ''
)
end

def scoped_logger_info
@scoped_logger_info ||= {
attempt: info.attempt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ def continue_as_new_suggested
@instance.continue_as_new_suggested
end

def current_details
@instance.current_details || ''
end

def current_details=(details)
raise 'Details must be String' unless details.nil? || details.is_a?(String)

@instance.current_details = (details || '')
end

def current_history_length
@instance.current_history_length
end
Expand All @@ -52,6 +62,7 @@ def execute_activity(
activity,
*args,
task_queue:,
summary:,
schedule_to_close_timeout:,
schedule_to_start_timeout:,
start_to_close_timeout:,
Expand All @@ -67,6 +78,7 @@ def execute_activity(
activity:,
args:,
task_queue: task_queue || info.task_queue,
summary:,
schedule_to_close_timeout:,
schedule_to_start_timeout:,
start_to_close_timeout:,
Expand Down Expand Up @@ -191,6 +203,8 @@ def start_child_workflow(
*args,
id:,
task_queue:,
static_summary:,
static_details:,
cancellation:,
cancellation_type:,
parent_close_policy:,
Expand All @@ -209,6 +223,8 @@ def start_child_workflow(
args:,
id:,
task_queue:,
static_summary:,
static_details:,
cancellation:,
cancellation_type:,
parent_close_policy:,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def execute_activity(input)
retry_policy: input.retry_policy&._to_proto,
cancellation_type: input.cancellation_type,
do_not_eagerly_execute: input.disable_eager_execution
)
),
user_metadata: ProtoUtils.to_user_metadata(input.summary, nil, @instance.payload_converter)
)
)
seq
Expand Down Expand Up @@ -300,7 +301,8 @@ def sleep(input)
start_timer: Bridge::Api::WorkflowCommands::StartTimer.new(
seq:,
start_to_fire_timeout: ProtoUtils.seconds_to_duration(duration)
)
),
user_metadata: ProtoUtils.to_user_metadata(input.summary, nil, @instance.payload_converter)
)
)
@instance.pending_timers[seq] = Fiber.current
Expand Down Expand Up @@ -354,6 +356,9 @@ def start_child_workflow(input)
memo: ProtoUtils.memo_to_proto_hash(input.memo, @instance.payload_converter),
search_attributes: input.search_attributes&._to_proto_hash,
cancellation_type: input.cancellation_type
),
user_metadata: ProtoUtils.to_user_metadata(
input.static_summary, input.static_details, @instance.payload_converter
)
)
)
Expand Down
Loading
Loading