Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/activity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'temporalio/activity/context'
require 'temporalio/activity/definition'
require 'temporalio/activity/info'
require 'temporalio/priority'

module Temporalio
# All activity related classes.
Expand Down
3 changes: 3 additions & 0 deletions temporalio/lib/temporalio/activity/info.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Activity
:heartbeat_details,
:heartbeat_timeout,
:local?,
:priority,
:schedule_to_close_timeout,
:scheduled_time,
:start_to_close_timeout,
Expand Down Expand Up @@ -38,6 +39,8 @@ module Activity
# @return [Float, nil] Heartbeat timeout set by the caller.
# @!attribute local?
# @return [Boolean] Whether the activity is a local activity or not.
# @!attribute priority
# @return [Priority] The priority of this activity.
# @!attribute schedule_to_close_timeout
# @return [Float, nil] Schedule to close timeout set by the caller.
# @!attribute scheduled_time
Expand Down
11 changes: 11 additions & 0 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
require 'temporalio/converters'
require 'temporalio/error'
require 'temporalio/internal/client/implementation'
require 'temporalio/priority'
require 'temporalio/retry_policy'
require 'temporalio/runtime'
require 'temporalio/search_attributes'
Expand Down Expand Up @@ -217,6 +218,9 @@ def operator_service
# with `cron_schedule`.
# @param request_eager_start [Boolean] Potentially reduce the latency to start this workflow by encouraging the
# server to start it on a local worker running with this same client. This is currently experimental.
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
# This is currently experimental.
# @param priority [Priority] Priority of the workflow. This is currently experimental.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [WorkflowHandle] A workflow handle to the started workflow.
Expand All @@ -241,6 +245,7 @@ def start_workflow(
start_delay: nil,
request_eager_start: false,
versioning_override: nil,
priority: Priority.default,
rpc_options: nil
)
@impl.start_workflow(Interceptor::StartWorkflowInput.new(
Expand All @@ -263,6 +268,7 @@ def start_workflow(
request_eager_start:,
headers: {},
versioning_override:,
priority:,
rpc_options:
))
end
Expand Down Expand Up @@ -295,6 +301,9 @@ def start_workflow(
# with `cron_schedule`.
# @param request_eager_start [Boolean] Potentially reduce the latency to start this workflow by encouraging the
# server to start it on a local worker running with this same client. This is currently experimental.
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
# This is currently experimental.
# @param priority [Priority] Priority for the workflow. This is currently experimental.
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
#
# @return [Object] Successful result of the workflow.
Expand All @@ -320,6 +329,7 @@ def execute_workflow(
start_delay: nil,
request_eager_start: false,
versioning_override: nil,
priority: Priority.default,
follow_runs: true,
rpc_options: nil
)
Expand All @@ -342,6 +352,7 @@ def execute_workflow(
start_delay:,
request_eager_start:,
versioning_override:,
priority:,
rpc_options:
)
follow_runs ? handle.result : handle
Expand Down
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/client/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def intercept_client(next_interceptor)
:request_eager_start,
:headers,
:versioning_override,
:priority,
:rpc_options
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def start_workflow(input)
input.static_summary, input.static_details, @client.data_converter
),
header: ProtoUtils.headers_to_proto(input.headers, @client.data_converter),
priority: input.priority._to_proto,
versioning_override: input.versioning_override&._to_proto
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def execute_activity(task_token, defn, start)
),
heartbeat_timeout: Internal::ProtoUtils.duration_to_seconds(start.heartbeat_timeout),
local?: start.is_local,
priority: Priority._from_proto(start.priority),
schedule_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.schedule_to_close_timeout),
scheduled_time: Internal::ProtoUtils.timestamp_to_time(start.scheduled_time) || raise, # Never nil
start_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.start_to_close_timeout),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def initialize(details)
workflow_id: @init_job.parent_workflow_info.workflow_id
)
end,
priority: Priority._from_proto(@init_job.priority),
retry_policy: (RetryPolicy._from_proto(@init_job.retry_policy) if @init_job.retry_policy),
root: if @init_job.root_workflow
Workflow::Info::RootInfo.new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ def execute_activity(
cancellation:,
cancellation_type:,
activity_id:,
disable_eager_execution:
disable_eager_execution:,
priority:
)
activity = case activity
when Class
Expand All @@ -102,6 +103,7 @@ def execute_activity(
cancellation_type:,
activity_id:,
disable_eager_execution: disable_eager_execution || @instance.disable_eager_activity_execution,
priority:,
headers: {}
)
)
Expand Down Expand Up @@ -249,7 +251,8 @@ def start_child_workflow(
retry_policy:,
cron_schedule:,
memo:,
search_attributes:
search_attributes:,
priority:
)
@outbound.start_child_workflow(
Temporalio::Worker::Interceptor::Workflow::StartChildWorkflowInput.new(
Expand All @@ -270,6 +273,7 @@ def start_child_workflow(
cron_schedule:,
memo:,
search_attributes:,
priority:,
headers: {}
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def execute_activity(input)
heartbeat_timeout: ProtoUtils.seconds_to_duration(input.heartbeat_timeout),
retry_policy: input.retry_policy&._to_proto,
cancellation_type: input.cancellation_type,
do_not_eagerly_execute: input.disable_eager_execution
do_not_eagerly_execute: input.disable_eager_execution,
priority: input.priority._to_proto
),
user_metadata: ProtoUtils.to_user_metadata(input.summary, nil, @instance.payload_converter)
)
Expand Down Expand Up @@ -337,7 +338,8 @@ def start_child_workflow(input)
headers: ProtoUtils.headers_to_proto_hash(input.headers, @instance.payload_converter),
memo: ProtoUtils.memo_to_proto_hash(input.memo, @instance.payload_converter),
search_attributes: input.search_attributes&._to_proto_hash,
cancellation_type: input.cancellation_type
cancellation_type: input.cancellation_type,
priority: input.priority._to_proto
),
user_metadata: ProtoUtils.to_user_metadata(
input.static_summary, input.static_details, @instance.payload_converter
Expand Down
59 changes: 59 additions & 0 deletions temporalio/lib/temporalio/priority.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# frozen_string_literal: true

require 'temporalio/api'

module Temporalio
Priority = Data.define(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you want to default a field for this you can still leave the initialize the way you had it in the class and just call super, e.g.

def initialize(priority_key: nil)
  super
end

But requiring it is fine too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, honestly requiring it makes more sense

:priority_key
)

# Priority contains metadata that controls relative ordering of task processing when tasks are
# backlogged in a queue. Initially, Priority will be used in activity and workflow task
# queues, which are typically where backlogs exist. Priority is (for now) attached to
# workflows and activities. Activities and child workflows inherit Priority from the workflow
# that created them, but may override fields when they are started or modified. For each field
# of a Priority on an activity/workflow, not present or equal to zero/empty string means to
# inherit the value from the calling workflow, or if there is no calling workflow, then use
# the default (documented on the field).
#
# The overall semantics of Priority are:
# 1. First, consider "priority_key": lower number goes first.
# (more will be added here later).
#
# @!attribute priority_key
# @return [Integer, nil] The priority key, which is a positive integer from 1 to n, where
# smaller integers correspond to higher priorities (tasks run sooner). In general, tasks in a
# queue should be processed in close to priority order, although small deviations are possible.
# The maximum priority value (minimum priority) is determined by server configuration, and
# defaults to 5.
#
# The default priority is (min+max)/2. With the default max of 5 and min of 1, that comes
# out to 3.
class Priority
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this should be a Data class (and get all of the goodies it provides). I think it makes sense like record did for .NET and @dataclass did for Python. Still will need much of what you have here (so after the Data.define, re-open the class Priority to add stuff).

# @!visibility private
def self._from_proto(priority)
return default if priority.nil?

new(priority_key: priority.priority_key.zero? ? nil : priority.priority_key)
end

# The default priority instance.
#
# @return [Priority] The default priority
def self.default
@default ||= new(priority_key: nil)
end

# @!visibility private
def _to_proto
return nil if priority_key.nil?

Temporalio::Api::Common::V1::Priority.new(priority_key: priority_key || 0)
end

# @return [Boolean] True if this priority is empty/default
def empty?
priority_key.nil?
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def self.default_info
heartbeat_details: [],
heartbeat_timeout: nil,
local?: false,
priority: Temporalio::Priority.default,
schedule_to_close_timeout: 1.0,
scheduled_time: Time.at(0),
start_to_close_timeout: 1.0,
Expand Down
6 changes: 4 additions & 2 deletions temporalio/lib/temporalio/worker/interceptor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ def handle_update(input)
:cancellation_type,
:activity_id,
:disable_eager_execution,
:headers
:headers,
:priority
)

# Input for {Outbound.execute_local_activity}.
Expand Down Expand Up @@ -284,7 +285,8 @@ def handle_update(input)
:cron_schedule,
:memo,
:search_attributes,
:headers
:headers,
:priority
)

# Outbound interceptor for intercepting outbound workflow calls. This should be extended by users needing to
Expand Down
2 changes: 1 addition & 1 deletion temporalio/lib/temporalio/worker/workflow_replayer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def initialize(
)._to_bridge_options,
identity_override: options.identity,
max_cached_workflows: 2,
max_concurrent_workflow_task_polls: 1,
max_concurrent_workflow_task_polls: 2,
nonsticky_to_sticky_poll_ratio: 1.0,
max_concurrent_activity_task_polls: 1,
no_remote_activities: true,
Expand Down
19 changes: 13 additions & 6 deletions temporalio/lib/temporalio/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'random/formatter'
require 'temporalio/error'
require 'temporalio/priority'
require 'temporalio/workflow/activity_cancellation_type'
require 'temporalio/workflow/child_workflow_cancellation_type'
require 'temporalio/workflow/child_workflow_handle'
Expand Down Expand Up @@ -130,6 +131,7 @@ def self.deprecate_patch(patch_id)
# optimization on some servers that sends activities back to the same worker as the calling workflow if they can
# run there. If `false` (the default), eager execution may still be disabled at the worker level or may not be
# requested due to lack of available slots.
# @param priority [Priority] Priority of the activity. This is currently experimental.
#
# @return [Object] Result of the activity.
# @raise [Error::ActivityError] Activity failed (and retry was disabled or exhausted).
Expand All @@ -148,12 +150,14 @@ def self.execute_activity(
cancellation: Workflow.cancellation,
cancellation_type: ActivityCancellationType::TRY_CANCEL,
activity_id: nil,
disable_eager_execution: false
disable_eager_execution: false,
priority: Priority.default
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add the import to this file and to activity.rb? I know it doesn't make sense to add everywhere, but along with temporalio/client, these two are kinda "single require entry points" to the SDK.

)
_current.execute_activity(
activity, *args,
task_queue:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:,
heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:
heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:,
priority:
)
end

Expand All @@ -175,13 +179,14 @@ def self.execute_child_workflow(
retry_policy: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil
search_attributes: nil,
priority: Priority.default
)
start_child_workflow(
workflow, *args,
id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:,
parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:,
retry_policy:, cron_schedule:, memo:, search_attributes:
retry_policy:, cron_schedule:, memo:, search_attributes:, priority:
).result
end

Expand Down Expand Up @@ -372,6 +377,7 @@ def self.sleep(duration, summary: nil, cancellation: Workflow.cancellation)
# @param cron_schedule [String, nil] Cron schedule. Users should use schedules instead of this.
# @param memo [Hash{String, Symbol => Object}, nil] Memo for the workflow.
# @param search_attributes [SearchAttributes, nil] Search attributes for the workflow.
# @param priority [Priority] Priority of the workflow. This is currently experimental.
#
# @return [ChildWorkflowHandle] Workflow handle to the started workflow.
# @raise [Error::WorkflowAlreadyStartedError] Workflow already exists for the ID.
Expand All @@ -393,13 +399,14 @@ def self.start_child_workflow(
retry_policy: nil,
cron_schedule: nil,
memo: nil,
search_attributes: nil
search_attributes: nil,
priority: Priority.default
)
_current.start_child_workflow(
workflow, *args,
id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:,
parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:,
retry_policy:, cron_schedule:, memo:, search_attributes:
retry_policy:, cron_schedule:, memo:, search_attributes:, priority:
)
end

Expand Down
3 changes: 3 additions & 0 deletions temporalio/lib/temporalio/workflow/info.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ module Workflow
:last_result,
:namespace,
:parent,
:priority,
:retry_policy,
:root,
:run_id,
Expand Down Expand Up @@ -44,6 +45,8 @@ module Workflow
# @return [String] Namespace for the workflow.
# @!attribute parent
# @return [ParentInfo, nil] Parent information for the workflow if this is a child.
# @!attribute priority
# @return [Priority] The priority of this workflow.
# @!attribute retry_policy
# @return [RetryPolicy, nil] Retry policy for the workflow.
# @!attribute root
Expand Down
2 changes: 2 additions & 0 deletions temporalio/sig/temporalio/activity/info.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Temporalio
attr_reader heartbeat_details: Array[Object?]
attr_reader heartbeat_timeout: Float?
attr_reader local?: bool
attr_reader priority: Temporalio::Priority?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
attr_reader priority: Temporalio::Priority?
attr_reader priority: Temporalio::Priority

attr_reader schedule_to_close_timeout: Float?
attr_reader scheduled_time: Time
attr_reader start_to_close_timeout: Float?
Expand All @@ -27,6 +28,7 @@ module Temporalio
heartbeat_details: Array[Object?],
heartbeat_timeout: Float?,
local?: bool,
priority: Temporalio::Priority?,
schedule_to_close_timeout: Float?,
scheduled_time: Time,
start_to_close_timeout: Float?,
Expand Down
Loading
Loading