Skip to content

Commit 200c1a0

Browse files
committed
Converter hints
Fixes #271
1 parent 143e421 commit 200c1a0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1027
-273
lines changed

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ opinions. Please communicate with us on [Slack](https://t.mp/slack) in the `#rub
4343
- [Cloud Client Using API Key](#cloud-client-using-api-key)
4444
- [Data Conversion](#data-conversion)
4545
- [ActiveModel](#activemodel)
46+
- [Converter Hints](#converter-hints)
4647
- [Workers](#workers)
4748
- [Workflows](#workflows)
4849
- [Workflow Definition](#workflow-definition)
@@ -336,6 +337,25 @@ Now if `include ActiveModelJSONSupport` is present on any ActiveModel class, on
336337
which will use `as_json` which calls the super `as_json` but also includes the fully qualified class name as the JSON
337338
`create_id` key. On deserialization, Ruby JSON then uses this key to know what class to call `json_create` on.
338339

340+
##### Converter Hints
341+
342+
In most places where objects are converted to payloads or vice versa, a "hint" can be provided to tell the converter
343+
something else about the object/payload to assist conversion. The default converters ignore these hints, but custom
344+
converters can be written to take advantage of them. For example, hints may be used to provide a custom converter the
345+
Ruby type to deserialize a payload into.
346+
347+
These hints manifest themselves various ways throughout the API. The most obvious way is when making definitions. An
348+
activity can define `activity_arg_hint` (which accepts multiple) and/or `activity_result_hint` for activity-level hints.
349+
Similarly, a workflow can define `workflow_arg_hint` and/or `workflow_result_hint` for workflow-level hints.
350+
`workflow_signal`, `workflow_query`, and `workflow_update` all similarly accept `arg_hints` and `result_hint` (except
351+
signal of course). These definition-level hints are passed to converters both from the caller side and the
352+
implementation side.
353+
354+
There are some advanced payload uses in the SDK that do not currently have a way to set hints. These include
355+
workflow/schedule memo, workflow get/upsert memo, activity last heartbeat details, and application error details. In
356+
some cases, users can use `Temporalio::Converters::RawValue` and then manually convert with hints. For others, hints can
357+
be added as needed, please open an issue or otherwise contact Temporal.
358+
339359
### Workers
340360

341361
Workers host workflows and/or activities. Here's how to run a worker:

temporalio/lib/temporalio/activity/context.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ def instance
6161
# Users do not have to be concerned with burdening the server by calling this too frequently.
6262
#
6363
# @param details [Array<Object>] Details to record with the heartbeat.
64-
def heartbeat(*details)
64+
# @param detail_hints [Array<Object>, nil] Hints to pass to converter.
65+
def heartbeat(*details, detail_hints: nil)
6566
raise NotImplementedError
6667
end
6768

temporalio/lib/temporalio/activity/definition.rb

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,21 @@ def activity_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParam
7878

7979
@activity_raw_args = value
8080
end
81+
82+
# Add activity hints to be passed to converter for activity args.
83+
#
84+
# @param hints [Array<Object>] Hints to add.
85+
def activity_arg_hint(*hints)
86+
@activity_arg_hints ||= []
87+
@activity_arg_hints.concat(hints)
88+
end
89+
90+
# Set activity result hint to be passed to converter for activity result.
91+
#
92+
# @param hint [Object] Hint to set.
93+
def activity_result_hint(hint)
94+
@activity_result_hint = hint
95+
end
8196
end
8297

8398
# @!visibility private
@@ -96,7 +111,9 @@ def self._activity_definition_details
96111
activity_name:,
97112
activity_executor: @activity_executor || :default,
98113
activity_cancel_raise: @activity_cancel_raise.nil? || @activity_cancel_raise,
99-
activity_raw_args: @activity_raw_args.nil? ? false : @activity_raw_args
114+
activity_raw_args: @activity_raw_args.nil? ? false : @activity_raw_args,
115+
activity_arg_hints: @activity_arg_hints,
116+
activity_result_hint: @activity_result_hint
100117
}
101118
end
102119

@@ -127,6 +144,12 @@ class Info
127144
# @return [Boolean] Whether to use {Converters::RawValue}s as arguments.
128145
attr_reader :raw_args
129146

147+
# @return [Array<Object>, nil] Argument hints.
148+
attr_reader :arg_hints
149+
150+
# @return [Object, nil] Result hint
151+
attr_reader :result_hint
152+
130153
# Obtain definition info representing the given activity, which can be a class, instance, or definition info.
131154
#
132155
# @param activity [Definition, Class<Definition>, Info] Activity to get info for.
@@ -147,7 +170,9 @@ def self.from_activity(activity)
147170
instance: proc { activity.new },
148171
executor: details[:activity_executor],
149172
cancel_raise: details[:activity_cancel_raise],
150-
raw_args: details[:activity_raw_args]
173+
raw_args: details[:activity_raw_args],
174+
arg_hints: details[:activity_arg_hints],
175+
result_hint: details[:activity_result_hint]
151176
) { |*args| Context.current.instance&.execute(*args) }
152177
when Definition
153178
details = activity.class._activity_definition_details
@@ -156,7 +181,9 @@ def self.from_activity(activity)
156181
instance: activity,
157182
executor: details[:activity_executor],
158183
cancel_raise: details[:activity_cancel_raise],
159-
raw_args: details[:activity_raw_args]
184+
raw_args: details[:activity_raw_args],
185+
arg_hints: details[:activity_arg_hints],
186+
result_hint: details[:activity_result_hint]
160187
) { |*args| Context.current.instance&.execute(*args) }
161188
when Info
162189
activity
@@ -172,13 +199,17 @@ def self.from_activity(activity)
172199
# @param executor [Symbol] Name of the executor.
173200
# @param cancel_raise [Boolean] Whether to raise in thread/fiber on cancellation.
174201
# @param raw_args [Boolean] Whether to use {Converters::RawValue}s as arguments.
202+
# @param arg_hints [Array<Object>, nil] Argument hints.
203+
# @param result_hint [Object, nil] Result hint.
175204
# @yield Use this block as the activity.
176205
def initialize(
177206
name:,
178207
instance: nil,
179208
executor: :default,
180209
cancel_raise: true,
181210
raw_args: false,
211+
arg_hints: nil,
212+
result_hint: nil,
182213
&block
183214
)
184215
@name = name
@@ -189,6 +220,8 @@ def initialize(
189220
@executor = executor
190221
@cancel_raise = cancel_raise
191222
@raw_args = raw_args
223+
@arg_hints = arg_hints
224+
@result_hint = result_hint
192225
Internal::ProtoUtils.assert_non_reserved_name(name)
193226
end
194227
end

temporalio/lib/temporalio/client.rb

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,10 @@ def operator_service
221221
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
222222
# This is currently experimental.
223223
# @param priority [Priority] Priority of the workflow. This is currently experimental.
224+
# @param arg_hints [Array<Object>, nil] Overrides converter hints for arguments if any. If unset/nil and the
225+
# workflow definition has arg hints, those are used by default.
226+
# @param result_hint [Object, nil] Overrides converter hint for result if any. If unset/nil and the workflow
227+
# definition has result hint, it is used by default.
224228
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
225229
#
226230
# @return [WorkflowHandle] A workflow handle to the started workflow.
@@ -246,10 +250,15 @@ def start_workflow(
246250
request_eager_start: false,
247251
versioning_override: nil,
248252
priority: Priority.default,
253+
arg_hints: nil,
254+
result_hint: nil,
249255
rpc_options: nil
250256
)
257+
# Take hints from definition if there is a definition
258+
workflow, defn_arg_hints, defn_result_hint =
259+
Workflow::Definition._workflow_type_and_hints_from_workflow_parameter(workflow)
251260
@impl.start_workflow(Interceptor::StartWorkflowInput.new(
252-
workflow: Workflow::Definition._workflow_type_from_workflow_parameter(workflow),
261+
workflow:,
253262
args:,
254263
workflow_id: id,
255264
task_queue:,
@@ -269,6 +278,8 @@ def start_workflow(
269278
headers: {},
270279
versioning_override:,
271280
priority:,
281+
arg_hints: arg_hints || defn_arg_hints,
282+
result_hint: result_hint || defn_result_hint,
272283
rpc_options:
273284
))
274285
end
@@ -304,6 +315,10 @@ def start_workflow(
304315
# @param versioning_override [VersioningOverride, nil] Override the version of the workflow.
305316
# This is currently experimental.
306317
# @param priority [Priority] Priority for the workflow. This is currently experimental.
318+
# @param arg_hints [Array<Object>, nil] Overrides converter hints for arguments if any. If unset/nil and the
319+
# workflow definition has arg hints, those are used by default.
320+
# @param result_hint [Object, nil] Overrides converter hint for result if any. If unset/nil and the workflow
321+
# definition has result hint, it is used by default.
307322
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
308323
#
309324
# @return [Object] Successful result of the workflow.
@@ -330,10 +345,11 @@ def execute_workflow(
330345
request_eager_start: false,
331346
versioning_override: nil,
332347
priority: Priority.default,
333-
follow_runs: true,
348+
arg_hints: nil,
349+
result_hint: nil,
334350
rpc_options: nil
335351
)
336-
handle = start_workflow(
352+
start_workflow(
337353
workflow,
338354
*args,
339355
id:,
@@ -353,9 +369,10 @@ def execute_workflow(
353369
request_eager_start:,
354370
versioning_override:,
355371
priority:,
372+
arg_hints:,
373+
result_hint:,
356374
rpc_options:
357-
)
358-
follow_runs ? handle.result : handle
375+
).result
359376
end
360377

361378
# Get a workflow handle to an existing workflow by its ID.
@@ -365,14 +382,18 @@ def execute_workflow(
365382
# interactions occur on the latest of the workflow ID.
366383
# @param first_execution_run_id [String, nil] First execution run ID used for some calls like cancellation and
367384
# termination to ensure the affected workflow is only within the same chain as this given run ID.
385+
# @param result_hint [Object, nil] Converter hint for the workflow's result.
368386
#
369387
# @return [WorkflowHandle] The workflow handle.
370388
def workflow_handle(
371389
workflow_id,
372390
run_id: nil,
373-
first_execution_run_id: nil
391+
first_execution_run_id: nil,
392+
result_hint: nil
374393
)
375-
WorkflowHandle.new(client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:)
394+
WorkflowHandle.new(
395+
client: self, id: workflow_id, run_id:, result_run_id: run_id, first_execution_run_id:, result_hint:
396+
)
376397
end
377398

378399
# Start an update, possibly starting the workflow at the same time if it doesn't exist (depending upon ID conflict
@@ -386,6 +407,10 @@ def workflow_handle(
386407
# @param wait_for_stage [WorkflowUpdateWaitStage] Required stage to wait until returning. ADMITTED is not
387408
# currently supported. See https://docs.temporal.io/workflows#update for more details.
388409
# @param id [String] ID of the update.
410+
# @param arg_hints [Array<Object>, nil] Overrides converter hints for update arguments if any. If unset/nil and the
411+
# update definition has arg hints, those are used by default.
412+
# @param result_hint [Object, nil] Overrides converter hint for update result if any. If unset/nil and the update
413+
# definition has result hint, it is used by default.
389414
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
390415
#
391416
# @return [WorkflowUpdateHandle] The update handle.
@@ -399,15 +424,20 @@ def start_update_with_start_workflow(
399424
start_workflow_operation:,
400425
wait_for_stage:,
401426
id: SecureRandom.uuid,
427+
arg_hints: nil,
428+
result_hint: nil,
402429
rpc_options: nil
403430
)
431+
update, defn_arg_hints, defn_result_hint = Workflow::Definition::Update._name_and_hints_from_parameter(update)
404432
@impl.start_update_with_start_workflow(
405433
Interceptor::StartUpdateWithStartWorkflowInput.new(
406434
update_id: id,
407-
update: Workflow::Definition::Update._name_from_parameter(update),
435+
update:,
408436
args:,
409437
wait_for_stage:,
410438
start_workflow_operation:,
439+
arg_hints: arg_hints || defn_arg_hints,
440+
result_hint: result_hint || defn_result_hint,
411441
headers: {},
412442
rpc_options:
413443
)
@@ -423,6 +453,10 @@ def start_update_with_start_workflow(
423453
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This must
424454
# have an `id_conflict_policy` set.
425455
# @param id [String] ID of the update.
456+
# @param arg_hints [Array<Object>, nil] Overrides converter hints for update arguments if any. If unset/nil and the
457+
# update definition has arg hints, those are used by default.
458+
# @param result_hint [Object, nil] Overrides converter hint for update result if any. If unset/nil and the update
459+
# definition has result hint, it is used by default.
426460
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
427461
#
428462
# @return [Object] Successful update result.
@@ -436,6 +470,8 @@ def execute_update_with_start_workflow(
436470
*args,
437471
start_workflow_operation:,
438472
id: SecureRandom.uuid,
473+
arg_hints: nil,
474+
result_hint: nil,
439475
rpc_options: nil
440476
)
441477
start_update_with_start_workflow(
@@ -444,6 +480,8 @@ def execute_update_with_start_workflow(
444480
start_workflow_operation:,
445481
wait_for_stage: WorkflowUpdateWaitStage::COMPLETED,
446482
id:,
483+
arg_hints:,
484+
result_hint:,
447485
rpc_options:
448486
).result
449487
end
@@ -454,6 +492,8 @@ def execute_update_with_start_workflow(
454492
# @param args [Array<Object>] Signal arguments.
455493
# @param start_workflow_operation [WithStartWorkflowOperation] Required with-start workflow operation. This may not
456494
# support all `id_conflict_policy` options.
495+
# @param arg_hints [Array<Object>, nil] Overrides converter hints for signal arguments if any. If unset/nil and the
496+
# signal definition has arg hints, those are used by default.
457497
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
458498
#
459499
# @return [WorkflowHandle] A workflow handle to the workflow.
@@ -463,13 +503,16 @@ def signal_with_start_workflow(
463503
signal,
464504
*args,
465505
start_workflow_operation:,
506+
arg_hints: nil,
466507
rpc_options: nil
467508
)
509+
signal, defn_arg_hints = Workflow::Definition::Signal._name_and_hints_from_parameter(signal)
468510
@impl.signal_with_start_workflow(
469511
Interceptor::SignalWithStartWorkflowInput.new(
470-
signal: Workflow::Definition::Signal._name_from_parameter(signal),
512+
signal:,
471513
args:,
472514
start_workflow_operation:,
515+
arg_hints: arg_hints || defn_arg_hints,
473516
rpc_options:
474517
)
475518
)

temporalio/lib/temporalio/client/async_activity_handle.rb

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,23 +27,27 @@ def initialize(client:, task_token:, id_reference:)
2727
# Record a heartbeat for the activity.
2828
#
2929
# @param details [Array<Object>] Details of the heartbeat.
30+
# @param detail_hints [Array<Object>, nil] Converter hints for the details.
3031
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
31-
def heartbeat(*details, rpc_options: nil)
32+
def heartbeat(*details, detail_hints: nil, rpc_options: nil)
3233
@client._impl.heartbeat_async_activity(Interceptor::HeartbeatAsyncActivityInput.new(
3334
task_token_or_id_reference:,
3435
details:,
36+
detail_hints:,
3537
rpc_options:
3638
))
3739
end
3840

3941
# Complete the activity.
4042
#
4143
# @param result [Object, nil] Result of the activity.
44+
# @param result_hint [Object, nil] Converter hint for the result.
4245
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
43-
def complete(result = nil, rpc_options: nil)
46+
def complete(result = nil, result_hint: nil, rpc_options: nil)
4447
@client._impl.complete_async_activity(Interceptor::CompleteAsyncActivityInput.new(
4548
task_token_or_id_reference:,
4649
result:,
50+
result_hint:,
4751
rpc_options:
4852
))
4953
end
@@ -52,25 +56,29 @@ def complete(result = nil, rpc_options: nil)
5256
#
5357
# @param error [Exception] Error for the activity.
5458
# @param last_heartbeat_details [Array<Object>] Last heartbeat details for the activity.
59+
# @param last_heartbeat_detail_hints [Array<Object>, nil] Converter hints for the last heartbeat details.
5560
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
56-
def fail(error, last_heartbeat_details: [], rpc_options: nil)
61+
def fail(error, last_heartbeat_details: [], last_heartbeat_detail_hints: nil, rpc_options: nil)
5762
@client._impl.fail_async_activity(Interceptor::FailAsyncActivityInput.new(
5863
task_token_or_id_reference:,
5964
error:,
6065
last_heartbeat_details:,
66+
last_heartbeat_detail_hints:,
6167
rpc_options:
6268
))
6369
end
6470

6571
# Report the activity as canceled.
6672
#
6773
# @param details [Array<Object>] Cancellation details.
74+
# @param detail_hints [Array<Object>, nil] Converter hints for the details.
6875
# @param rpc_options [RPCOptions, nil] Advanced RPC options.
6976
# @raise [AsyncActivityCanceledError] If the activity has been canceled.
70-
def report_cancellation(*details, rpc_options: nil)
77+
def report_cancellation(*details, detail_hints: nil, rpc_options: nil)
7178
@client._impl.report_cancellation_async_activity(Interceptor::ReportCancellationAsyncActivityInput.new(
7279
task_token_or_id_reference:,
7380
details:,
81+
detail_hints:,
7482
rpc_options:
7583
))
7684
end

0 commit comments

Comments
 (0)