-
Notifications
You must be signed in to change notification settings - Fork 22
Priority annotations #274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Priority annotations #274
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -37,6 +37,7 @@ def intercept_client(next_interceptor) | |||||||||
| :start_delay, | ||||||||||
| :request_eager_start, | ||||||||||
| :headers, | ||||||||||
| :priority, | ||||||||||
| :versioning_override, | ||||||||||
|
||||||||||
| :priority, | |
| :versioning_override, | |
| :versioning_override, | |
| :priority, |
Pedantic, but might as well keep in the same order as the parameters of the outer call
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -70,6 +70,7 @@ def start_workflow(input) | |||||
| input.static_summary, input.static_details, @client.data_converter | ||||||
| ), | ||||||
| header: ProtoUtils.headers_to_proto(input.headers, @client.data_converter), | ||||||
| priority: input.priority&._to_proto, | ||||||
|
||||||
| priority: input.priority&._to_proto, | |
| priority: input.priority._to_proto, |
Same for everywhere else where priority actually can't be nil
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| require 'temporalio/api' | ||
|
|
||
| module Temporalio | ||
| # Priority contains metadata that controls relative ordering of task processing when tasks are | ||
| # backlogged in a queue. Initially, Priority will be used in activity and workflow task | ||
| # queues, which are typically where backlogs exist. Priority is (for now) attached to | ||
| # workflows and activities. Activities and child workflows inherit Priority from the workflow | ||
| # that created them, but may override fields when they are started or modified. For each field | ||
| # of a Priority on an activity/workflow, not present or equal to zero/empty string means to | ||
| # inherit the value from the calling workflow, or if there is no calling workflow, then use | ||
| # the default (documented on the field). | ||
| # | ||
| # The overall semantics of Priority are: | ||
| # 1. First, consider "priority_key": lower number goes first. | ||
| # (more will be added here later). | ||
| class Priority | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm wondering if this should be a |
||
| # The priority key, which is a positive integer from 1 to n, where smaller integers | ||
| # correspond to higher priorities (tasks run sooner). In general, tasks in a queue should | ||
| # be processed in close to priority order, although small deviations are possible. The | ||
| # maximum priority value (minimum priority) is determined by server configuration, and | ||
| # defaults to 5. | ||
| # | ||
| # The default priority is (min+max)/2. With the default max of 5 and min of 1, that comes | ||
| # out to 3. | ||
| # | ||
| # @return [Integer, nil] The priority key | ||
| attr_reader :priority_key | ||
|
|
||
| # @!visibility private | ||
| def self._from_proto(priority) | ||
| return default if priority.nil? | ||
|
|
||
| new(priority_key: priority.priority_key.zero? ? nil : priority.priority_key) | ||
| end | ||
|
|
||
| # The default priority instance. | ||
| # | ||
| # @return [Priority] The default priority | ||
| def self.default | ||
| @default ||= new | ||
| end | ||
|
|
||
| # Create a new Priority instance | ||
| # | ||
| # @param priority_key [Integer, nil] The priority key | ||
| def initialize(priority_key: nil) | ||
| @priority_key = priority_key | ||
| end | ||
|
|
||
| # @!visibility private | ||
| def _to_proto | ||
| return nil if priority_key.nil? | ||
|
|
||
| Temporalio::Api::Common::V1::Priority.new(priority_key: priority_key || 0) | ||
| end | ||
|
|
||
| # @return [Boolean] True if this priority is empty/default | ||
| def empty? | ||
| priority_key.nil? | ||
| end | ||
| end | ||
| end | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -130,6 +130,7 @@ def self.deprecate_patch(patch_id) | |
| # optimization on some servers that sends activities back to the same worker as the calling workflow if they can | ||
| # run there. If `false` (the default), eager execution may still be disabled at the worker level or may not be | ||
| # requested due to lack of available slots. | ||
| # @param priority [Priority] Priority of the activity. This is currently experimental. | ||
| # | ||
| # @return [Object] Result of the activity. | ||
| # @raise [Error::ActivityError] Activity failed (and retry was disabled or exhausted). | ||
|
|
@@ -148,12 +149,14 @@ def self.execute_activity( | |
| cancellation: Workflow.cancellation, | ||
| cancellation_type: ActivityCancellationType::TRY_CANCEL, | ||
| activity_id: nil, | ||
| disable_eager_execution: false | ||
| disable_eager_execution: false, | ||
| priority: Priority.default | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add the import to this file and to |
||
| ) | ||
| _current.execute_activity( | ||
| activity, *args, | ||
| task_queue:, summary:, schedule_to_close_timeout:, schedule_to_start_timeout:, start_to_close_timeout:, | ||
| heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution: | ||
| heartbeat_timeout:, retry_policy:, cancellation:, cancellation_type:, activity_id:, disable_eager_execution:, | ||
| priority: | ||
| ) | ||
| end | ||
|
|
||
|
|
@@ -175,13 +178,14 @@ def self.execute_child_workflow( | |
| retry_policy: nil, | ||
| cron_schedule: nil, | ||
| memo: nil, | ||
| search_attributes: nil | ||
| search_attributes: nil, | ||
| priority: Priority.default | ||
| ) | ||
| start_child_workflow( | ||
| workflow, *args, | ||
| id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:, | ||
| parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:, | ||
| retry_policy:, cron_schedule:, memo:, search_attributes: | ||
| retry_policy:, cron_schedule:, memo:, search_attributes:, priority: | ||
| ).result | ||
| end | ||
|
|
||
|
|
@@ -372,6 +376,7 @@ def self.sleep(duration, summary: nil, cancellation: Workflow.cancellation) | |
| # @param cron_schedule [String, nil] Cron schedule. Users should use schedules instead of this. | ||
| # @param memo [Hash{String, Symbol => Object}, nil] Memo for the workflow. | ||
| # @param search_attributes [SearchAttributes, nil] Search attributes for the workflow. | ||
| # @param priority [Priority] Priority of the workflow. This is currently experimental. | ||
| # | ||
| # @return [ChildWorkflowHandle] Workflow handle to the started workflow. | ||
| # @raise [Error::WorkflowAlreadyStartedError] Workflow already exists for the ID. | ||
|
|
@@ -393,13 +398,14 @@ def self.start_child_workflow( | |
| retry_policy: nil, | ||
| cron_schedule: nil, | ||
| memo: nil, | ||
| search_attributes: nil | ||
| search_attributes: nil, | ||
| priority: Priority.default | ||
| ) | ||
| _current.start_child_workflow( | ||
| workflow, *args, | ||
| id:, task_queue:, static_summary:, static_details:, cancellation:, cancellation_type:, | ||
| parent_close_policy:, execution_timeout:, run_timeout:, task_timeout:, id_reuse_policy:, | ||
| retry_policy:, cron_schedule:, memo:, search_attributes: | ||
| retry_policy:, cron_schedule:, memo:, search_attributes:, priority: | ||
| ) | ||
| end | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -8,6 +8,7 @@ module Temporalio | |||||
| attr_reader heartbeat_details: Array[Object?] | ||||||
| attr_reader heartbeat_timeout: Float? | ||||||
| attr_reader local?: bool | ||||||
| attr_reader priority: Temporalio::Priority? | ||||||
|
||||||
| attr_reader priority: Temporalio::Priority? | |
| attr_reader priority: Temporalio::Priority |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this ever be
nil? (same for workflow info)