Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,9 @@ signal of course). These definition-level hints are passed to converters both fr
implementation side.

There are some advanced payload uses in the SDK that do not currently have a way to set hints. These include
workflow/schedule memo, workflow get/upsert memo, activity last heartbeat details, and application error details. In
some cases, users can use `Temporalio::Converters::RawValue` and then manually convert with hints. For others, hints can
be added as needed, please open an issue or otherwise contact Temporal.
workflow/schedule memo, workflow get/upsert memo, and application error details. In some cases, users can use
`Temporalio::Converters::RawValue` and then manually convert with hints. For others, hints can be added as needed,
please open an issue or otherwise contact Temporal.

### Workers

Expand Down
26 changes: 22 additions & 4 deletions temporalio/lib/temporalio/activity/info.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
# frozen_string_literal: true

require 'temporalio/activity/context'
require 'temporalio/internal/proto_utils'

module Temporalio
module Activity
Info = Data.define(
:activity_id,
:activity_type,
:attempt,
:current_attempt_scheduled_time,
:heartbeat_details,
:heartbeat_timeout,
:local?,
:priority,
:raw_heartbeat_details,
:schedule_to_close_timeout,
:scheduled_time,
:start_to_close_timeout,
Expand All @@ -33,14 +36,15 @@ module Activity
# @return [Integer] Attempt the activity is on.
# @!attribute current_attempt_scheduled_time
# @return [Time] When the current attempt was scheduled.
# @!attribute heartbeat_details
# @return [Array<Object>] Details from the last heartbeat of the last attempt.
# @!attribute heartbeat_timeout
# @return [Float, nil] Heartbeat timeout set by the caller.
# @!attribute local?
# @return [Boolean] Whether the activity is a local activity or not.
# @!attribute priority
# @return [Priority] The priority of this activity.
# @!attribute raw_heartbeat_details
# @return [Array<Converter::RawValue>] Raw details from the last heartbeat of the last attempt. Can use
# {heartbeat_details} to get lazily-converted values.
# @!attribute schedule_to_close_timeout
# @return [Float, nil] Schedule to close timeout set by the caller.
# @!attribute scheduled_time
Expand All @@ -65,6 +69,20 @@ module Activity
#
# @note WARNING: This class may have required parameters added to its constructor. Users should not instantiate this
# class or it may break in incompatible ways.
class Info; end # rubocop:disable Lint/EmptyClass
class Info
# Convert raw heartbeat details into Ruby types.
#
# Note, this live-converts every invocation.
#
# @param hints [Array<Object>, nil] Hints, if any, to assist conversion.
# @return [Array<Object>] Converted details.
def heartbeat_details(hints: nil)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is already covered by existing tests

Internal::ProtoUtils.convert_from_payload_array(
Context.current.payload_converter,
raw_heartbeat_details.map(&:payload),
hints:
)
end
end
end
end
13 changes: 7 additions & 6 deletions temporalio/lib/temporalio/internal/worker/activity_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,15 @@ def execute_activity(task_token, defn, start)
current_attempt_scheduled_time: Internal::ProtoUtils.timestamp_to_time(
start.current_attempt_scheduled_time
) || raise, # Never nil
heartbeat_details: ProtoUtils.convert_from_payload_array(
@worker.options.client.data_converter,
start.heartbeat_details.to_ary,
hints: nil
),
heartbeat_timeout: Internal::ProtoUtils.duration_to_seconds(start.heartbeat_timeout),
local?: start.is_local,
priority: Priority._from_proto(start.priority),
raw_heartbeat_details: begin
payloads = start.heartbeat_details.to_ary
codec = @worker.options.client.data_converter.payload_codec
payloads = codec.decode(payloads) if codec
payloads.map { |p| Temporalio::Converters::RawValue.new(p) }
end,
schedule_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.schedule_to_close_timeout),
scheduled_time: Internal::ProtoUtils.timestamp_to_time(start.scheduled_time) || raise, # Never nil
start_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.start_to_close_timeout),
Expand All @@ -194,7 +195,7 @@ def execute_activity(task_token, defn, start)
workflow_namespace: start.workflow_namespace,
workflow_run_id: start.workflow_execution.run_id,
workflow_type: start.workflow_type
).freeze
)

# Build input
input = Temporalio::Worker::Interceptor::Activity::ExecuteInput.new(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ def self.default_info
activity_type: 'unknown',
attempt: 1,
current_attempt_scheduled_time: Time.at(0),
heartbeat_details: [],
heartbeat_timeout: nil,
local?: false,
priority: Temporalio::Priority.default,
raw_heartbeat_details: [],
schedule_to_close_timeout: 1.0,
scheduled_time: Time.at(0),
start_to_close_timeout: 1.0,
Expand Down
6 changes: 4 additions & 2 deletions temporalio/sig/temporalio/activity/info.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ module Temporalio
attr_reader activity_type: String
attr_reader attempt: Integer
attr_reader current_attempt_scheduled_time: Time
attr_reader heartbeat_details: Array[Object?]
attr_reader heartbeat_timeout: Float?
attr_reader local?: bool
attr_reader priority: Temporalio::Priority
attr_reader raw_heartbeat_details: Array[Converters::RawValue]
attr_reader schedule_to_close_timeout: Float?
attr_reader scheduled_time: Time
attr_reader start_to_close_timeout: Float?
Expand All @@ -25,10 +25,10 @@ module Temporalio
activity_type: String,
attempt: Integer,
current_attempt_scheduled_time: Time,
heartbeat_details: Array[Object?],
heartbeat_timeout: Float?,
local?: bool,
priority: Temporalio::Priority?,
raw_heartbeat_details: Array[Converters::RawValue],
schedule_to_close_timeout: Float?,
scheduled_time: Time,
start_to_close_timeout: Float?,
Expand All @@ -41,6 +41,8 @@ module Temporalio
workflow_type: String
) -> void

def heartbeat_details: (?hints: Array[Object]?) -> Array[Object?]

def with: (**untyped) -> Info
end
end
Expand Down
Loading