Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion temporalio/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ Rake::Task[:build].enhance([:copy_parent_files]) do
end

task :rust_lint do
sh 'cargo', 'clippy', '--', '-Dwarnings'
sh 'cargo', 'clippy', '-p', 'temporalio_bridge', '--no-deps', '--', '-Dwarnings'
sh 'cargo', 'fmt', '--check'
end

Expand Down
1 change: 1 addition & 0 deletions temporalio/extra/proto_gen.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def generate_import_helper_files
require 'temporalio/api/errordetails/v1/message'
require 'temporalio/api/export/v1/message'
require 'temporalio/api/operatorservice'
require 'temporalio/api/sdk/v1/workflow_metadata'
require 'temporalio/api/workflowservice'

module Temporalio
Expand Down
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require 'temporalio/api/errordetails/v1/message'
require 'temporalio/api/export/v1/message'
require 'temporalio/api/operatorservice'
require 'temporalio/api/sdk/v1/workflow_metadata'
require 'temporalio/api/workflowservice'

module Temporalio
Expand Down
20 changes: 20 additions & 0 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ def operator_service
# @param args [Array<Object>] Arguments to the workflow.
# @param id [String] Unique identifier for the workflow execution.
# @param task_queue [String] Task queue to run the workflow on.
# @param static_summary [String, nil] Fixed single-line summary for this workflow execution that may appear in
# CLI/UI. This can be in single-line Temporal markdown format. This is currently experimental.
# @param static_details [String, nil] Fixed details for this workflow execution that may appear in CLI/UI. This can
# be in Temporal markdown format and can be multiple lines. This is a fixed value on the workflow that cannot be
# updated. For details that can be updated, use {Workflow.current_details=} within the workflow. This is currently
# experimental.
# @param execution_timeout [Float, nil] Total workflow execution timeout in seconds including retries and continue
# as new.
# @param run_timeout [Float, nil] Timeout of a single workflow run in seconds.
Expand All @@ -220,6 +226,8 @@ def start_workflow(
*args,
id:,
task_queue:,
static_summary: nil,
static_details: nil,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
Expand All @@ -238,6 +246,8 @@ def start_workflow(
args:,
workflow_id: id,
task_queue:,
static_summary:,
static_details:,
execution_timeout:,
run_timeout:,
task_timeout:,
Expand All @@ -260,6 +270,12 @@ def start_workflow(
# @param args [Array<Object>] Arguments to the workflow.
# @param id [String] Unique identifier for the workflow execution.
# @param task_queue [String] Task queue to run the workflow on.
# @param static_summary [String, nil] Fixed single-line summary for this workflow execution that may appear in
# CLI/UI. This can be in single-line Temporal markdown format. This is currently experimental.
# @param static_details [String, nil] Fixed details for this workflow execution that may appear in CLI/UI. This can
# be in Temporal markdown format and can be multiple lines. This is a fixed value on the workflow that cannot be
# updated. For details that can be updated, use {Workflow.current_details=} within the workflow. This is currently
# experimental.
# @param execution_timeout [Float, nil] Total workflow execution timeout in seconds including retries and continue
# as new.
# @param run_timeout [Float, nil] Timeout of a single workflow run in seconds.
Expand Down Expand Up @@ -287,6 +303,8 @@ def execute_workflow(
*args,
id:,
task_queue:,
static_summary: nil,
static_details: nil,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
Expand All @@ -305,6 +323,8 @@ def execute_workflow(
*args,
id:,
task_queue:,
static_summary:,
static_details:,
execution_timeout:,
run_timeout:,
task_timeout:,
Expand Down
2 changes: 2 additions & 0 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def intercept_client(next_interceptor)
:args,
:workflow_id,
:task_queue,
:static_summary,
:static_details,
:execution_timeout,
:run_timeout,
:task_timeout,
Expand Down
26 changes: 25 additions & 1 deletion temporalio/lib/temporalio/client/schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ def _to_proto(data_converter)
:args,
:id,
:task_queue,
:static_summary,
:static_details,
:execution_timeout,
:run_timeout,
:task_timeout,
Expand All @@ -186,6 +188,14 @@ def _to_proto(data_converter)
# @return [String] Unique identifier for the workflow execution.
# @!attribute task_queue
# @return [String] Task queue to run the workflow on.
# @!attribute static_summary
# @return [String, nil] Fixed single-line summary for this workflow execution that may appear in CLI/UI.
# This can be in single-line Temporal markdown format. This is currently experimental.
# @!attribute static_details
# @return [String, nil] Fixed details for this workflow execution that may appear in CLI/UI. This can be in
# Temporal markdown format and can be multiple lines. This is a fixed value on the workflow that cannot be
# updated. For details that can be updated, use {Workflow.current_details=} within the workflow. This is
# currently experimental.
# @!attribute execution_timeout
# @return [Float, nil] Total workflow execution timeout in seconds including retries and continue as new.
# @!attribute run_timeout
Expand All @@ -212,6 +222,12 @@ class << self
# @param args [Array<Object>] Arguments to the workflow.
# @param id [String] Unique identifier for the workflow execution.
# @param task_queue [String] Task queue to run the workflow on.
# @param static_summary [String, nil] Fixed single-line summary for this workflow execution that may appear
# in CLI/UI. This can be in single-line Temporal markdown format. This is currently experimental.
# @param static_details [String, nil] Fixed details for this workflow execution that may appear in CLI/UI.
# This can be in Temporal markdown format and can be multiple lines. This is a fixed value on the workflow
# that cannot be updated. For details that can be updated, use {Workflow.current_details=} within the
# workflow. This is currently experimental.
# @param execution_timeout [Float, nil] Total workflow execution timeout in seconds including retries and
# continue as new.
# @param run_timeout [Float, nil] Timeout of a single workflow run in seconds.
Expand All @@ -225,6 +241,8 @@ def new(
*args,
id:,
task_queue:,
static_summary: nil,
static_details: nil,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
Expand All @@ -238,6 +256,8 @@ def new(
args:,
id:,
task_queue:,
static_summary:,
static_details:,
execution_timeout:,
run_timeout:,
task_timeout:,
Expand All @@ -251,11 +271,14 @@ def new(

# @!visibility private
def self._from_proto(raw_info, data_converter)
(summary, details) = Internal::ProtoUtils.from_user_metadata(raw_info.user_metadata, data_converter)
StartWorkflow.new(
raw_info.workflow_type.name,
*data_converter.from_payloads(raw_info.input),
id: raw_info.workflow_id,
task_queue: raw_info.task_queue.name,
static_summary: summary,
static_details: details,
execution_timeout: Internal::ProtoUtils.duration_to_seconds(raw_info.workflow_execution_timeout),
run_timeout: Internal::ProtoUtils.duration_to_seconds(raw_info.workflow_run_timeout),
task_timeout: Internal::ProtoUtils.duration_to_seconds(raw_info.workflow_task_timeout),
Expand All @@ -280,7 +303,8 @@ def _to_proto(data_converter)
retry_policy: retry_policy&._to_proto,
memo: Internal::ProtoUtils.memo_to_proto(memo, data_converter),
search_attributes: search_attributes&._to_proto,
header: Internal::ProtoUtils.headers_to_proto(headers, data_converter)
header: Internal::ProtoUtils.headers_to_proto(headers, data_converter),
user_metadata: Internal::ProtoUtils.to_user_metadata(static_summary, static_details, data_converter)
)
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class WithStartWorkflowOperation
:args,
:id,
:task_queue,
:static_summary,
:static_details,
:execution_timeout,
:run_timeout,
:task_timeout,
Expand Down Expand Up @@ -41,6 +43,8 @@ def initialize(
*args,
id:,
task_queue:,
static_summary: nil,
static_details: nil,
execution_timeout: nil,
run_timeout: nil,
task_timeout: nil,
Expand All @@ -58,6 +62,8 @@ def initialize(
args:,
id:,
task_queue:,
static_summary:,
static_details:,
execution_timeout:,
run_timeout:,
task_timeout:,
Expand Down
19 changes: 19 additions & 0 deletions temporalio/lib/temporalio/client/workflow_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,25 @@ class Description < WorkflowExecution
def initialize(raw_description, data_converter)
super(raw_description.workflow_execution_info, data_converter)
@raw_description = raw_description
@data_converter = data_converter
end

# @return [String, nil] Static summary configured on the workflow. This is currently experimental.
def static_summary
user_metadata.first
end

# @return [String, nil] Static details configured on the workflow. This is currently experimental.
def static_details
user_metadata.last
end

private

def user_metadata
@user_metadata ||= Internal::ProtoUtils.from_user_metadata(
@raw_description.execution_config&.user_metadata, @data_converter
)
end
end
end
Expand Down
6 changes: 6 additions & 0 deletions temporalio/lib/temporalio/internal/client/implementation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ def start_workflow(input)
search_attributes: input.search_attributes&._to_proto,
workflow_start_delay: ProtoUtils.seconds_to_duration(input.start_delay),
request_eager_execution: input.request_eager_start,
user_metadata: ProtoUtils.to_user_metadata(
input.static_summary, input.static_details, @client.data_converter
),
header: ProtoUtils.headers_to_proto(input.headers, @client.data_converter)
)

Expand Down Expand Up @@ -319,6 +322,9 @@ def _start_workflow_request_from_with_start_options(klass, start_options)
memo: ProtoUtils.memo_to_proto(start_options.memo, @client.data_converter),
search_attributes: start_options.search_attributes&._to_proto,
workflow_start_delay: ProtoUtils.seconds_to_duration(start_options.start_delay),
user_metadata: ProtoUtils.to_user_metadata(
start_options.static_summary, start_options.static_details, @client.data_converter
),
header: ProtoUtils.headers_to_proto(start_options.headers, @client.data_converter)
)
end
Expand Down
16 changes: 16 additions & 0 deletions temporalio/lib/temporalio/internal/proto_utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,22 @@ def self.reserved_name?(name)
name.start_with?('__temporal_') || name == '__stack_trace' || name == '__enhanced_stack_trace'
end

def self.to_user_metadata(summary, details, converter)
return nil if (!summary || summary.empty?) && (!details || details.empty?)

metadata = Temporalio::Api::Sdk::V1::UserMetadata.new
metadata.summary = converter.to_payload(summary) if summary && !summary.empty?
metadata.details = converter.to_payload(details) if details && !details.empty?
metadata
end

def self.from_user_metadata(metadata, converter)
[
(converter.from_payload(metadata.summary) if metadata&.summary), #: String?
(converter.from_payload(metadata.details) if metadata&.details) #: String?
]
end

class LazyMemo
def initialize(raw_memo, converter)
@raw_memo = raw_memo
Expand Down
28 changes: 27 additions & 1 deletion temporalio/lib/temporalio/internal/worker/workflow_instance.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def self.new_completion_with_failure(run_id:, error:, failure_converter:, payloa
:failure_converter, :cancellation, :continue_as_new_suggested, :current_history_length,
:current_history_size, :replaying, :random, :signal_handlers, :query_handlers, :update_handlers,
:context_frozen
attr_accessor :current_details

def initialize(details)
# Initialize general state
Expand Down Expand Up @@ -386,9 +387,10 @@ def apply_signal(job)
def apply_query(job)
schedule do
# If it's a built-in, run it without interceptors, otherwise do normal behavior
# TODO(cretz): __temporal_workflow_metadata
result = if job.query_type == '__stack_trace'
scheduler.stack_trace
elsif job.query_type == '__temporal_workflow_metadata'
workflow_metadata
else
# Get query definition, falling back to dynamic if not present and not reserved
defn = query_handlers[job.query_type]
Expand Down Expand Up @@ -678,6 +680,30 @@ def convert_args(payload_array:, method_name:, raw_args:, ignore_first_param: fa
end
end

def workflow_metadata
Temporalio::Api::Sdk::V1::WorkflowMetadata.new(
definition: Temporalio::Api::Sdk::V1::WorkflowDefinition.new(
type: info.workflow_type,
query_definitions: query_handlers.values.map do |defn|
Temporalio::Api::Sdk::V1::WorkflowInteractionDefinition.new(
name: defn.name || '', description: defn.description || ''
)
end,
signal_definitions: signal_handlers.values.map do |defn|
Temporalio::Api::Sdk::V1::WorkflowInteractionDefinition.new(
name: defn.name || '', description: defn.description || ''
)
end,
update_definitions: update_handlers.values.map do |defn|
Temporalio::Api::Sdk::V1::WorkflowInteractionDefinition.new(
name: defn.name || '', description: defn.description || ''
)
end
),
current_details: current_details || ''
)
end

def scoped_logger_info
@scoped_logger_info ||= {
attempt: info.attempt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ def continue_as_new_suggested
@instance.continue_as_new_suggested
end

def current_details
@instance.current_details || ''
end

def current_details=(details)
raise 'Details must be a String' unless details.nil? || details.is_a?(String)

@instance.current_details = (details || '')
end

def current_history_length
@instance.current_history_length
end
Expand All @@ -52,6 +62,7 @@ def execute_activity(
activity,
*args,
task_queue:,
summary:,
schedule_to_close_timeout:,
schedule_to_start_timeout:,
start_to_close_timeout:,
Expand All @@ -67,6 +78,7 @@ def execute_activity(
activity:,
args:,
task_queue: task_queue || info.task_queue,
summary:,
schedule_to_close_timeout:,
schedule_to_start_timeout:,
start_to_close_timeout:,
Expand Down Expand Up @@ -191,6 +203,8 @@ def start_child_workflow(
*args,
id:,
task_queue:,
static_summary:,
static_details:,
cancellation:,
cancellation_type:,
parent_close_policy:,
Expand All @@ -209,6 +223,8 @@ def start_child_workflow(
args:,
id:,
task_queue:,
static_summary:,
static_details:,
cancellation:,
cancellation_type:,
parent_close_policy:,
Expand Down
Loading
Loading