Skip to content

Commit 05dac86

Browse files
committed
Support for custom metrics in workflows and minor README fixes
1 parent 65247ce commit 05dac86

File tree

16 files changed

+183
-58
lines changed

16 files changed

+183
-58
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ class SayHelloActivity < Temporalio::Activity::Definition
114114
end
115115
```
116116

117-
Now to create the workflow, put the following in `say_hello_workflow.rb`:
117+
Workflows are also classes. To create the workflow, put the following in `say_hello_workflow.rb`:
118118

119119
```ruby
120120
require 'temporalio/workflow'
@@ -178,7 +178,7 @@ client = Temporalio::Client.connect('localhost:7233', 'my-namespace')
178178
# Run workflow
179179
result = client.execute_workflow(
180180
SayHelloWorkflow,
181-
'Temporal',
181+
'Temporal', # This is the workflow argument
182182
id: 'my-workflow-id',
183183
task_queue: 'my-task-queue'
184184
)
@@ -641,6 +641,8 @@ from workflows including:
641641
be called outside of a workflow without raising an exception.
642642
* `info` - Immutable workflow information.
643643
* `logger` - A Ruby logger that adds contextual information and takes care not to log on replay.
644+
* `metric_meter` - A metric meter for making custom metrics that adds contextual information and takes care not to log
645+
on replay.
644646
* `random` - A deterministic `Random` instance.
645647
* `memo` - A read-only hash of the memo (updated via `upsert_memo`).
646648
* `search_attributes` - A read-only `SearchAttributes` collection (updated via `upsert_search_attributes`).

temporalio/lib/temporalio/internal/worker/workflow_instance.rb

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
require 'temporalio/internal/worker/workflow_instance/inbound_implementation'
1919
require 'temporalio/internal/worker/workflow_instance/outbound_implementation'
2020
require 'temporalio/internal/worker/workflow_instance/replay_safe_logger'
21+
require 'temporalio/internal/worker/workflow_instance/replay_safe_metric'
2122
require 'temporalio/internal/worker/workflow_instance/scheduler'
2223
require 'temporalio/retry_policy'
2324
require 'temporalio/scoped_logger'
@@ -64,6 +65,7 @@ def initialize(details)
6465
end
6566
@logger = ReplaySafeLogger.new(logger: details.logger, instance: self)
6667
@logger.scoped_values_getter = proc { scoped_logger_info }
68+
@runtime_metric_meter = details.metric_meter
6769
@scheduler = Scheduler.new(self)
6870
@payload_converter = details.payload_converter
6971
@failure_converter = details.failure_converter
@@ -189,6 +191,18 @@ def patch(patch_id:, deprecated:)
189191
end
190192
end
191193

194+
def metric_meter
195+
@metric_meter ||= ReplaySafeMetric::Meter.new(
196+
@runtime_metric_meter.with_additional_attributes(
197+
{
198+
namespace: info.namespace,
199+
task_queue: info.task_queue,
200+
workflow_type: info.workflow_type
201+
}
202+
)
203+
)
204+
end
205+
192206
private
193207

194208
def run_in_scheduler(&)

temporalio/lib/temporalio/internal/worker/workflow_instance/context.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ def memo
136136
@instance.memo
137137
end
138138

139+
def metric_meter
140+
@instance.metric_meter
141+
end
142+
139143
def now
140144
@instance.now
141145
end

temporalio/lib/temporalio/internal/worker/workflow_instance/details.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@ module Worker
66
class WorkflowInstance
77
# Details needed to instantiate a {WorkflowInstance}.
88
class Details
9-
attr_reader :namespace, :task_queue, :definition, :initial_activation, :logger, :payload_converter,
10-
:failure_converter, :interceptors, :disable_eager_activity_execution, :illegal_calls,
11-
:workflow_failure_exception_types
9+
attr_reader :namespace, :task_queue, :definition, :initial_activation, :logger, :metric_meter,
10+
:payload_converter, :failure_converter, :interceptors, :disable_eager_activity_execution,
11+
:illegal_calls, :workflow_failure_exception_types
1212

1313
def initialize(
1414
namespace:,
1515
task_queue:,
1616
definition:,
1717
initial_activation:,
1818
logger:,
19+
metric_meter:,
1920
payload_converter:,
2021
failure_converter:,
2122
interceptors:,
@@ -28,6 +29,7 @@ def initialize(
2829
@definition = definition
2930
@initial_activation = initial_activation
3031
@logger = logger
32+
@metric_meter = metric_meter
3133
@payload_converter = payload_converter
3234
@failure_converter = failure_converter
3335
@interceptors = interceptors
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/scoped_logger'
4+
5+
module Temporalio
6+
module Internal
7+
module Worker
8+
class WorkflowInstance
9+
# Wrapper for a metric that does not log on replay.
10+
class ReplaySafeMetric < SimpleDelegator
11+
def record(value, additional_attributes: nil)
12+
return if Temporalio::Workflow.in_workflow? && Temporalio::Workflow::Unsafe.replaying?
13+
14+
super
15+
end
16+
17+
def with_additional_attributes(additional_attributes)
18+
ReplaySafeMetric.new(super)
19+
end
20+
21+
class Meter < SimpleDelegator
22+
def create_metric(
23+
metric_type,
24+
name,
25+
description: nil,
26+
unit: nil,
27+
value_type: :integer
28+
)
29+
ReplaySafeMetric.new(super)
30+
end
31+
32+
def with_additional_attributes(additional_attributes)
33+
Meter.new(super)
34+
end
35+
end
36+
end
37+
end
38+
end
39+
end
40+
end

temporalio/lib/temporalio/internal/worker/workflow_worker.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def initialize(worker:, bridge_worker:, workflow_definitions:)
5656
workflow_definitions:,
5757
bridge_worker:,
5858
logger: worker.options.logger,
59+
metric_meter: worker.options.client.connection.options.runtime.metric_meter,
5960
data_converter: worker.options.client.data_converter,
6061
deadlock_timeout: worker.options.debug_mode ? nil : 2.0,
6162
# TODO(cretz): Make this more performant for the default set?
@@ -140,18 +141,19 @@ def apply_codec_on_payload_visit(payload_or_payloads, &)
140141
end
141142

142143
class State
143-
attr_reader :workflow_definitions, :bridge_worker, :logger, :data_converter, :deadlock_timeout,
144+
attr_reader :workflow_definitions, :bridge_worker, :logger, :metric_meter, :data_converter, :deadlock_timeout,
144145
:illegal_calls, :namespace, :task_queue, :disable_eager_activity_execution,
145146
:workflow_interceptors, :workflow_failure_exception_types
146147

147148
def initialize(
148-
workflow_definitions:, bridge_worker:, logger:, data_converter:, deadlock_timeout:,
149+
workflow_definitions:, bridge_worker:, logger:, metric_meter:, data_converter:, deadlock_timeout:,
149150
illegal_calls:, namespace:, task_queue:, disable_eager_activity_execution:,
150151
workflow_interceptors:, workflow_failure_exception_types:
151152
)
152153
@workflow_definitions = workflow_definitions
153154
@bridge_worker = bridge_worker
154155
@logger = logger
156+
@metric_meter = metric_meter
155157
@data_converter = data_converter
156158
@deadlock_timeout = deadlock_timeout
157159
@illegal_calls = illegal_calls

temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ def create_instance(initial_activation, worker_state)
203203
definition:,
204204
initial_activation:,
205205
logger: worker_state.logger,
206+
metric_meter: worker_state.metric_meter,
206207
payload_converter: worker_state.data_converter.payload_converter,
207208
failure_converter: worker_state.data_converter.failure_converter,
208209
interceptors: worker_state.workflow_interceptors,

temporalio/lib/temporalio/workflow.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,12 @@ def self.memo
232232
_current.memo
233233
end
234234

235+
# @return [Metric::Meter] Metric meter to create metrics on. This metric meter already contains some
236+
# workflow-specific attributes and takes care not to apply metrics during replay.
237+
def self.metric_meter
238+
_current.metric_meter
239+
end
240+
235241
# @return [Time] Current UTC time for this workflow. This creates and returns a new {::Time} instance every time it
236242
# is invoked, it is not the same instance continually mutated.
237243
def self.now

temporalio/sig/temporalio/internal/worker/workflow_instance.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ module Temporalio
5252

5353
def patch: (patch_id: Symbol | String, deprecated: bool) -> bool
5454

55+
def metric_meter: -> Temporalio::Metric::Meter
56+
5557
def run_in_scheduler: [T] { -> T } -> T
5658

5759
def activate_internal: (untyped activation) -> untyped

temporalio/sig/temporalio/internal/worker/workflow_instance/context.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ module Temporalio
5959

6060
def memo: -> ExternallyImmutableHash[String, Object?]
6161

62+
def metric_meter: -> Temporalio::Metric::Meter
63+
6264
def now: -> Time
6365

6466
def patched: (String patch_id) -> bool

0 commit comments

Comments
 (0)