diff --git a/temporalio/ext/src/client.rs b/temporalio/ext/src/client.rs index 8a9f448f..1b8faa75 100644 --- a/temporalio/ext/src/client.rs +++ b/temporalio/ext/src/client.rs @@ -176,12 +176,15 @@ impl Client { .await?; Ok(core) }, - move |_, result: Result| match result { - Ok(core) => callback.push(Client { - core, - runtime_handle, - }), - Err(err) => callback.push(new_error!("Failed client connect: {}", err)), + move |ruby, result: Result| match result { + Ok(core) => callback.push( + &ruby, + Client { + core, + runtime_handle, + }, + ), + Err(err) => callback.push(&ruby, new_error!("Failed client connect: {}", err)), }, ); Ok(()) @@ -325,12 +328,12 @@ where }; res.map(|msg| msg.get_ref().encode_to_vec()) }, - move |_, result| { + move |ruby, result| { match result { // TODO(cretz): Any reasonable way to prevent byte copy that is just going to get decoded into proto // object? - Ok(val) => callback.push(RString::from_slice(&val)), - Err(status) => callback.push(RpcFailure { status }), + Ok(val) => callback.push(&ruby, RString::from_slice(&val)), + Err(status) => callback.push(&ruby, RpcFailure { status }), } }, ); diff --git a/temporalio/ext/src/metric.rs b/temporalio/ext/src/metric.rs index 17cc09c7..8090e690 100644 --- a/temporalio/ext/src/metric.rs +++ b/temporalio/ext/src/metric.rs @@ -1,16 +1,20 @@ -use std::{sync::Arc, time::Duration}; +use std::{any::Any, sync::Arc, time::Duration}; use magnus::{ - class, function, method, + class, function, + gc::register_mark_object, + method, prelude::*, r_hash::ForEach, - value::{IntoId, Qfalse, Qtrue}, - DataTypeFunctions, Error, Float, Integer, RHash, RString, Ruby, Symbol, TryConvert, TypedData, - Value, + value::{IntoId, Lazy, Qfalse, Qtrue}, + DataTypeFunctions, Error, Float, Integer, RClass, RHash, RModule, RString, Ruby, StaticSymbol, + Symbol, TryConvert, TypedData, Value, +}; +use temporal_sdk_core_api::telemetry::metrics::{ + self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent, }; -use temporal_sdk_core_api::telemetry::metrics; -use crate::{error, id, runtime::Runtime, ROOT_MOD}; +use crate::{error, id, runtime::Runtime, util::SendSyncBoxValue, ROOT_MOD}; pub fn init(ruby: &Ruby) -> Result<(), Error> { let root_mod = ruby.get_inner(&ROOT_MOD); @@ -268,3 +272,221 @@ fn metric_key_value(k: Value, v: Value) -> Result>, +} + +impl BufferInstrumentRef for BufferedMetricRef {} + +#[derive(Debug)] +struct BufferedMetricAttributes { + value: SendSyncBoxValue, +} + +impl CustomMetricAttributes for BufferedMetricAttributes { + fn as_any(self: Arc) -> Arc { + self as Arc + } +} + +static METRIC_BUFFER_UPDATE: Lazy = Lazy::new(|ruby| { + let cls = ruby + .class_object() + .const_get::<_, RModule>("Temporalio") + .unwrap() + .const_get::<_, RClass>("Runtime") + .unwrap() + .const_get::<_, RClass>("MetricBuffer") + .unwrap() + .const_get("Update") + .unwrap(); + // Make sure class is not GC'd + register_mark_object(cls); + cls +}); + +static METRIC_BUFFER_METRIC: Lazy = Lazy::new(|ruby| { + let cls = ruby + .class_object() + .const_get::<_, RModule>("Temporalio") + .unwrap() + .const_get::<_, RClass>("Runtime") + .unwrap() + .const_get::<_, RClass>("MetricBuffer") + .unwrap() + .const_get("Metric") + .unwrap(); + // Make sure class is not GC'd + register_mark_object(cls); + cls +}); + +static METRIC_KIND_COUNTER: Lazy = Lazy::new(|ruby| ruby.sym_new("counter")); +static METRIC_KIND_GAUGE: Lazy = Lazy::new(|ruby| ruby.sym_new("gauge")); +static METRIC_KIND_HISTOGRAM: Lazy = Lazy::new(|ruby| ruby.sym_new("histogram")); + +pub fn convert_metric_events( + ruby: &Ruby, + events: Vec>, + durations_as_seconds: bool, +) -> Result, Error> { + let temp: Result>, Error> = events + .into_iter() + .map(|e| convert_metric_event(ruby, e, durations_as_seconds)) + .collect(); + Ok(temp?.into_iter().flatten().collect()) +} + +fn convert_metric_event( + ruby: &Ruby, + event: MetricEvent, + durations_as_seconds: bool, +) -> Result, Error> { + match event { + // Create the metric and put it on the lazy ref + MetricEvent::Create { + params, + populate_into, + kind, + } => { + let cls = ruby.get_inner(&METRIC_BUFFER_METRIC); + let val: Value = cls.funcall( + "new", + ( + // Name + params.name.to_string(), + // Description + Some(params.description) + .filter(|s| !s.is_empty()) + .map(|s| s.to_string()), + // Unit + if matches!(kind, metrics::MetricKind::HistogramDuration) + && params.unit == "duration" + { + if durations_as_seconds { + Some("s".to_owned()) + } else { + Some("ms".to_owned()) + } + } else if params.unit.is_empty() { + None + } else { + Some(params.unit.to_string()) + }, + // Kind + match kind { + metrics::MetricKind::Counter => ruby.get_inner(&METRIC_KIND_COUNTER), + metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => { + ruby.get_inner(&METRIC_KIND_GAUGE) + } + metrics::MetricKind::Histogram + | metrics::MetricKind::HistogramF64 + | metrics::MetricKind::HistogramDuration => { + ruby.get_inner(&METRIC_KIND_HISTOGRAM) + } + }, + ), + )?; + // Put on lazy ref + populate_into + .set(Arc::new(BufferedMetricRef { + value: Arc::new(SendSyncBoxValue::new(val)), + })) + .map_err(|_| error!("Failed setting metric ref"))?; + Ok(None) + } + // Create the attributes and put it on the lazy ref + MetricEvent::CreateAttributes { + populate_into, + append_from, + attributes, + } => { + // Create a hash (from existing or new) + let hash: RHash = match append_from { + Some(existing) => { + let attrs = existing + .get() + .clone() + .as_any() + .downcast::() + .map_err(|_| { + error!("Unable to downcast to expected buffered metric attributes") + })? + .value + .value(ruby); + attrs.funcall("dup", ())? + } + None => ruby.hash_new_capa(attributes.len()), + }; + // Add attributes + for kv in attributes.into_iter() { + match kv.value { + metrics::MetricValue::String(v) => hash.aset(kv.key, v)?, + metrics::MetricValue::Int(v) => hash.aset(kv.key, v)?, + metrics::MetricValue::Float(v) => hash.aset(kv.key, v)?, + metrics::MetricValue::Bool(v) => hash.aset(kv.key, v)?, + }; + } + hash.freeze(); + // Put on lazy ref + populate_into + .set(Arc::new(BufferedMetricAttributes { + value: SendSyncBoxValue::new(hash), + })) + .map_err(|_| error!("Failed setting metric attrs"))?; + Ok(None) + } + // Convert to Ruby metric update + MetricEvent::Update { + instrument, + attributes, + update, + } => { + let cls = ruby.get_inner(&METRIC_BUFFER_UPDATE); + Ok(Some( + cls.funcall( + "new", + ( + // Metric + instrument.get().clone().value.clone().value(ruby), + // Value + match update { + metrics::MetricUpdateVal::Duration(v) if durations_as_seconds => { + ruby.into_value(v.as_secs_f64()) + } + metrics::MetricUpdateVal::Duration(v) => { + // As of this writing, https://github.com/matsadler/magnus/pull/136 not released, so we will do + // the logic ourselves + let val = v.as_millis(); + if val <= u64::MAX as u128 { + ruby.into_value(val as u64) + } else { + ruby.module_kernel() + .funcall("Integer", (val.to_string(),)) + .unwrap() + } + } + metrics::MetricUpdateVal::Delta(v) => ruby.into_value(v), + metrics::MetricUpdateVal::DeltaF64(v) => ruby.into_value(v), + metrics::MetricUpdateVal::Value(v) => ruby.into_value(v), + metrics::MetricUpdateVal::ValueF64(v) => ruby.into_value(v), + }, + // Attributes + attributes + .get() + .clone() + .as_any() + .downcast::() + .map_err(|_| { + error!("Unable to downcast to expected buffered metric attributes") + })? + .value + .value(ruby), + ), + )?, + )) + } + } +} diff --git a/temporalio/ext/src/runtime.rs b/temporalio/ext/src/runtime.rs index 608d2627..2b9737b6 100644 --- a/temporalio/ext/src/runtime.rs +++ b/temporalio/ext/src/runtime.rs @@ -1,7 +1,8 @@ use super::{error, id, ROOT_MOD}; +use crate::metric::{convert_metric_events, BufferedMetricRef}; use crate::util::{without_gvl, Struct}; use magnus::{ - class, function, method, prelude::*, DataTypeFunctions, Error, Ruby, TypedData, Value, + class, function, method, prelude::*, DataTypeFunctions, Error, RArray, Ruby, TypedData, Value, }; use std::collections::HashMap; use std::net::SocketAddr; @@ -9,11 +10,13 @@ use std::str::FromStr; use std::sync::mpsc::{channel, Receiver, Sender}; use std::time::Duration; use std::{future::Future, sync::Arc}; -use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter}; +use temporal_sdk_core::telemetry::{ + build_otlp_metric_exporter, start_prometheus_metric_exporter, MetricsCallBuffer, +}; use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder}; use temporal_sdk_core_api::telemetry::{ - Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol, - PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, + metrics::MetricCallBufferer, Logger, MetricTemporality, OtelCollectorOptionsBuilder, + OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder, }; use tracing::error as log_error; use url::Url; @@ -24,6 +27,10 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> { .define_class("Runtime", class::object())?; class.define_singleton_method("new", function!(Runtime::new, 1))?; class.define_method("run_command_loop", method!(Runtime::run_command_loop, 0))?; + class.define_method( + "retrieve_buffered_metrics", + method!(Runtime::retrieve_buffered_metrics, 1), + )?; Ok(()) } @@ -33,6 +40,7 @@ pub struct Runtime { /// Separate cloneable handle that can be referenced in other Rust objects. pub(crate) handle: RuntimeHandle, async_command_rx: Receiver, + metrics_call_buffer: Option>>, } #[derive(Clone)] @@ -94,9 +102,10 @@ impl Runtime { .map_err(|err| error!("Failed initializing telemetry: {}", err))?; // Create metrics (created after Core runtime since it needs Tokio handle) + let mut metrics_call_buffer = None; if let Some(metrics) = telemetry.child(id!("metrics"))? { let _guard = core.tokio_handle().enter(); - match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.child(id!("buffered_with_size"))?) { + match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.member::>(id!("buffered_with_size"))?) { // Build OTel (Some(opentelemetry), None, None) => { let mut opts_build = OtelCollectorOptionsBuilder::default(); @@ -148,8 +157,11 @@ impl Runtime { |err| error!("Failed building starting Prometheus exporter: {}", err), )?.meter); }, - // TODO(cretz): Metric buffering - (None, None, Some(_buffer_size)) => return Err(error!("Metric buffering not yet supported")), + (None, None, Some(buffer_size)) => { + let buffer = Arc::new(MetricsCallBuffer::new(buffer_size)); + core.telemetry_mut().attach_late_init_metrics(buffer.clone()); + metrics_call_buffer = Some(buffer); + }, _ => return Err(error!("One and only one of opentelemetry, prometheus, or buffered_with_size must be set")) }; } @@ -163,6 +175,7 @@ impl Runtime { async_command_tx, }, async_command_rx, + metrics_call_buffer, }) } @@ -193,6 +206,16 @@ impl Runtime { } } } + + pub fn retrieve_buffered_metrics(&self, durations_as_seconds: bool) -> Result { + let ruby = Ruby::get().expect("Not in Ruby thread"); + let buff = self + .metrics_call_buffer + .clone() + .expect("Attempting to retrieve buffered metrics without buffer"); + let updates = convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds)?; + Ok(ruby.ary_new_from_values(&updates)) + } } impl RuntimeHandle { diff --git a/temporalio/ext/src/testing.rs b/temporalio/ext/src/testing.rs index d0e93b0f..a444f3e2 100644 --- a/temporalio/ext/src/testing.rs +++ b/temporalio/ext/src/testing.rs @@ -77,13 +77,16 @@ impl EphemeralServer { let runtime_handle = runtime.handle.clone(); runtime.handle.spawn( async move { opts.start_server().await }, - move |_, result| match result { - Ok(core) => callback.push(EphemeralServer { - target: core.target.clone(), - core: Mutex::new(Some(core)), - runtime_handle, - }), - Err(err) => callback.push(new_error!("Failed starting server: {}", err)), + move |ruby, result| match result { + Ok(core) => callback.push( + &ruby, + EphemeralServer { + target: core.target.clone(), + core: Mutex::new(Some(core)), + runtime_handle, + }, + ), + Err(err) => callback.push(&ruby, new_error!("Failed starting server: {}", err)), }, ); Ok(()) @@ -109,13 +112,16 @@ impl EphemeralServer { let runtime_handle = runtime.handle.clone(); runtime.handle.spawn( async move { opts.start_server().await }, - move |_, result| match result { - Ok(core) => callback.push(EphemeralServer { - target: core.target.clone(), - core: Mutex::new(Some(core)), - runtime_handle, - }), - Err(err) => callback.push(new_error!("Failed starting server: {}", err)), + move |ruby, result| match result { + Ok(core) => callback.push( + &ruby, + EphemeralServer { + target: core.target.clone(), + core: Mutex::new(Some(core)), + runtime_handle, + }, + ), + Err(err) => callback.push(&ruby, new_error!("Failed starting server: {}", err)), }, ); Ok(()) @@ -149,9 +155,9 @@ impl EphemeralServer { .spawn( async move { core.shutdown().await }, move |ruby, result| match result { - Ok(_) => callback.push(ruby.qnil()), + Ok(_) => callback.push(&ruby, ruby.qnil()), Err(err) => { - callback.push(new_error!("Failed shutting down server: {}", err)) + callback.push(&ruby, new_error!("Failed shutting down server: {}", err)) } }, ) diff --git a/temporalio/ext/src/util.rs b/temporalio/ext/src/util.rs index aa0844a8..083d6ca0 100644 --- a/temporalio/ext/src/util.rs +++ b/temporalio/ext/src/util.rs @@ -103,27 +103,44 @@ where } } +/// Utility for pushing a result to a queue in an async callback. pub(crate) struct AsyncCallback { - queue: BoxValue, + queue: SendSyncBoxValue, } -// We trust our usage of this across threads. We would use Opaque but we can't -// box that properly/safely. The inner queue is always expected to be a Ruby -// Queue. -unsafe impl Send for AsyncCallback {} -unsafe impl Sync for AsyncCallback {} - impl AsyncCallback { pub(crate) fn from_queue(queue: Value) -> Self { Self { - queue: BoxValue::new(queue), + queue: SendSyncBoxValue::new(queue), } } - pub(crate) fn push(&self, value: V) -> Result<(), Error> + pub(crate) fn push(&self, ruby: &Ruby, value: V) -> Result<(), Error> where V: IntoValue, { - self.queue.funcall(id!("push"), (value,)).map(|_: Value| ()) + let queue = self.queue.value(ruby); + queue.funcall(id!("push"), (value,)).map(|_: Value| ()) + } +} + +/// Utility that basically combines Magnus BoxValue with Magnus Opaque. It's a +/// Send/Sync safe Ruby value that prevents GC until dropped and is only +/// accessible from a Ruby thread. +#[derive(Debug)] +pub(crate) struct SendSyncBoxValue(BoxValue); + +// We trust our usage of this across threads. We would use Opaque but we can't +// box that properly/safely to ensure it does not get GC'd. +unsafe impl Send for SendSyncBoxValue {} +unsafe impl Sync for SendSyncBoxValue {} + +impl SendSyncBoxValue { + pub fn new(val: T) -> Self { + Self(BoxValue::new(val)) + } + + pub fn value(&self, _: &Ruby) -> T { + *self.0 } } diff --git a/temporalio/ext/src/worker.rs b/temporalio/ext/src/worker.rs index ea39a061..2dc50fe0 100644 --- a/temporalio/ext/src/worker.rs +++ b/temporalio/ext/src/worker.rs @@ -225,21 +225,23 @@ impl Worker { Ok(None) => ruby.qnil().as_value(), Err(err) => new_error!("Poll failure: {}", err).as_value(), }; - callback.push(ruby.ary_new_from_values(&[ - poll_result.worker_index.into_value(), - worker_type.into_value(), - result, - ])) + callback.push( + &ruby, + ruby.ary_new_from_values(&[ + poll_result.worker_index.into_value(), + worker_type.into_value(), + result, + ]), + ) }))); } }, move |ruby, _| { // Call with nil, nil, nil to say done - complete_callback.push(ruby.ary_new_from_values(&[ - ruby.qnil(), - ruby.qnil(), - ruby.qnil(), - ])) + complete_callback.push( + &ruby, + ruby.ary_new_from_values(&[ruby.qnil(), ruby.qnil(), ruby.qnil()]), + ) }, ); Ok(()) @@ -289,13 +291,16 @@ impl Worker { }, move |ruby, errs| { if errs.is_empty() { - callback.push(ruby.qnil()) + callback.push(&ruby, ruby.qnil()) } else { - callback.push(new_error!( - "{} worker(s) failed to finalize, reasons: {}", - errs.len(), - errs.join(", ") - )) + callback.push( + &ruby, + new_error!( + "{} worker(s) failed to finalize, reasons: {}", + errs.len(), + errs.join(", ") + ), + ) } }, ); @@ -308,8 +313,8 @@ impl Worker { self.runtime_handle.spawn( async move { temporal_sdk_core_api::Worker::validate(&*worker).await }, move |ruby, result| match result { - Ok(()) => callback.push(ruby.qnil()), - Err(err) => callback.push(new_error!("Failed validating worker: {}", err)), + Ok(()) => callback.push(&ruby, ruby.qnil()), + Err(err) => callback.push(&ruby, new_error!("Failed validating worker: {}", err)), }, ); Ok(()) @@ -325,8 +330,8 @@ impl Worker { temporal_sdk_core_api::Worker::complete_activity_task(&*worker, completion).await }, move |ruby, result| match result { - Ok(()) => callback.push((ruby.qnil(),)), - Err(err) => callback.push((new_error!("Completion failure: {}", err),)), + Ok(()) => callback.push(&ruby, (ruby.qnil(),)), + Err(err) => callback.push(&ruby, (new_error!("Completion failure: {}", err),)), }, ); Ok(()) @@ -357,16 +362,19 @@ impl Worker { .await }, move |ruby, result| { - callback.push(ruby.ary_new_from_values(&[ - (-1).into_value_with(&ruby), - run_id.into_value_with(&ruby), - match result { - Ok(()) => ruby.qnil().into_value_with(&ruby), - Err(err) => { - new_error!("Completion failure: {}", err).into_value_with(&ruby) - } - }, - ])) + callback.push( + &ruby, + ruby.ary_new_from_values(&[ + (-1).into_value_with(&ruby), + run_id.into_value_with(&ruby), + match result { + Ok(()) => ruby.qnil().into_value_with(&ruby), + Err(err) => { + new_error!("Completion failure: {}", err).into_value_with(&ruby) + } + }, + ]), + ) }, ); Ok(()) diff --git a/temporalio/lib/temporalio/runtime.rb b/temporalio/lib/temporalio/runtime.rb index 945e21fe..3360d58a 100644 --- a/temporalio/lib/temporalio/runtime.rb +++ b/temporalio/lib/temporalio/runtime.rb @@ -4,6 +4,7 @@ require 'temporalio/internal/bridge/runtime' require 'temporalio/internal/metric' require 'temporalio/metric' +require 'temporalio/runtime/metric_buffer' module Temporalio # Runtime for Temporal Ruby SDK. @@ -107,6 +108,7 @@ def _to_bridge MetricsOptions = Data.define( :opentelemetry, :prometheus, + :buffer, :attach_service_name, :global_tags, :metric_prefix @@ -116,10 +118,13 @@ def _to_bridge # # @!attribute opentelemetry # @return [OpenTelemetryMetricsOptions, nil] OpenTelemetry options if using OpenTelemetry. This is mutually - # exclusive with +prometheus+. + # exclusive with `prometheus` and `buffer`. # @!attribute prometheus # @return [PrometheusMetricsOptions, nil] Prometheus options if using Prometheus. This is mutually exclusive with - # +opentelemetry+. + # `opentelemetry` and `buffer`. + # @!attribute buffer + # @return [MetricBuffer, nil] Metric buffer to send all metrics to. This is mutually exclusive with `prometheus` + # and `opentelemetry`. # @!attribute attach_service_name # @return [Boolean] Whether to put the service_name on every metric. # @!attribute global_tags @@ -130,19 +135,26 @@ class MetricsOptions # Create metrics options. Either `opentelemetry` or `prometheus` required, but not both. # # @param opentelemetry [OpenTelemetryMetricsOptions, nil] OpenTelemetry options if using OpenTelemetry. This is - # mutually exclusive with `prometheus`. + # mutually exclusive with `prometheus` and `buffer`. # @param prometheus [PrometheusMetricsOptions, nil] Prometheus options if using Prometheus. This is mutually - # exclusive with `opentelemetry`. + # exclusive with `opentelemetry` and `buffer`. + # @param buffer [MetricBuffer, nil] Metric buffer to send all metrics to. This is mutually exclusive with + # `prometheus` and `opentelemetry`. # @param attach_service_name [Boolean] Whether to put the service_name on every metric. # @param global_tags [Hash, nil] Resource tags to be applied to all metrics. # @param metric_prefix [String, nil] Prefix to put on every Temporal metric. If unset, defaults to `temporal_`. def initialize( opentelemetry: nil, prometheus: nil, + buffer: nil, attach_service_name: true, global_tags: nil, metric_prefix: nil ) + if [opentelemetry, prometheus, buffer].count { |v| !v.nil? } > 1 + raise 'Can only have one of opentelemetry, prometheus, or buffer' + end + super end @@ -152,6 +164,7 @@ def _to_bridge Internal::Bridge::Runtime::MetricsOptions.new( opentelemetry: opentelemetry&._to_bridge, prometheus: prometheus&._to_bridge, + buffered_with_size: buffer&._buffer_size, attach_service_name:, global_tags:, metric_prefix: @@ -301,6 +314,9 @@ def self.default=(runtime) # # @param telemetry [TelemetryOptions] Telemetry options to set. def initialize(telemetry: TelemetryOptions.new) + # Set runtime on the buffer which will fail if the buffer is used on another runtime + telemetry.metrics&.buffer&._set_runtime(self) + @core_runtime = Internal::Bridge::Runtime.new( Internal::Bridge::Runtime::Options.new(telemetry: telemetry._to_bridge) ) diff --git a/temporalio/lib/temporalio/runtime/metric_buffer.rb b/temporalio/lib/temporalio/runtime/metric_buffer.rb new file mode 100644 index 00000000..43e8e710 --- /dev/null +++ b/temporalio/lib/temporalio/runtime/metric_buffer.rb @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +module Temporalio + class Runtime + # Metric buffer for use with a runtime to capture metrics. Only one metric buffer can be associated with a runtime + # and {retrieve_updates} cannot be called before the runtime is created. Once runtime created, users should + # regularly call {retrieve_updates} to drain the buffer. + # + # @note WARNING: It is important that the buffer size is set to a high number and that {retrieve_updates} is called + # regularly to drain the buffer. If the buffer is full, metric updates will be dropped and an error will be + # logged. + class MetricBuffer + # Enumerates for the duration format. + module DurationFormat + # Durations are millisecond integers. + MILLISECONDS = :milliseconds + + # Durations are second floats. + SECONDS = :seconds + end + + Update = Data.define(:metric, :value, :attributes) + + # Metric buffer update. + # + # @note WARNING: The constructor of this class should not be invoked by users and may change in incompatible ways + # in the future. + # + # @!attribute metric + # @return [Metric] Metric for this update. For performance reasons, this is created lazily on first use and is + # the same object each time an update on this metric exists. + # @!attribute value + # @return [Integer, Float] Metric value for this update. + # @!attribute attributes + # @return [Hash{String => String, Integer, Float, Boolean}] Attributes for this value as a frozen hash. + # For performance reasons this is sometimes the same hash if the attribute set is reused at a metric level. + class Update # rubocop:disable Lint/EmptyClass + # DEV NOTE: This class is instantiated inside Rust, be careful changing it. + end + + Metric = Data.define(:name, :description, :unit, :kind) + + # Metric definition present on an update. + # + # @!attribute name + # @return [String] Name of the metric. + # @!attribute description + # @return [String, nil] Description of the metric if any. + # @!attribute unit + # @return [String, nil] Unit of the metric if any. + # @!attribute kind + # @return [:counter, :histogram, :gauge] Kind of the metric. + class Metric # rubocop:disable Lint/EmptyClass + # DEV NOTE: This class is instantiated inside Rust, be careful changing it. + end + + # Create a metric buffer with the given size. + # + # @note WARNING: It is important that the buffer size is set to a high number and is drained regularly. See + # {MetricBuffer} warning. + # + # @param buffer_size [Integer] Maximum size of the buffer before metrics will be dropped. + # @param duration_format [DurationFormat] How durations are represented. + def initialize(buffer_size, duration_format: DurationFormat::MILLISECONDS) + @buffer_size = buffer_size + @duration_format = duration_format + @runtime = nil + end + + # Drain the buffer and return all metric updates. + # + # @note WARNING: It is important that this is called regularly. See {MetricBuffer} warning. + # + # @return [Array] Updates since last time this was called. + def retrieve_updates + raise 'Attempting to retrieve updates before runtime created' unless @runtime + + @runtime._core_runtime.retrieve_buffered_metrics(@duration_format == DurationFormat::SECONDS) + end + + # @!visibility private + def _buffer_size + @buffer_size + end + + # @!visibility private + def _set_runtime(runtime) + raise 'Metric buffer already attached to a runtime' if @runtime + + @runtime = runtime + end + end + end +end diff --git a/temporalio/sig/temporalio/internal/bridge/runtime.rbs b/temporalio/sig/temporalio/internal/bridge/runtime.rbs index dd71721e..92cf5e97 100644 --- a/temporalio/sig/temporalio/internal/bridge/runtime.rbs +++ b/temporalio/sig/temporalio/internal/bridge/runtime.rbs @@ -29,6 +29,7 @@ module Temporalio class MetricsOptions attr_accessor opentelemetry: OpenTelemetryMetricsOptions? attr_accessor prometheus: PrometheusMetricsOptions? + attr_accessor buffered_with_size: Integer? attr_accessor attach_service_name: bool attr_accessor global_tags: Hash[String, String]? attr_accessor metric_prefix: String? @@ -36,6 +37,7 @@ module Temporalio def initialize: ( opentelemetry: OpenTelemetryMetricsOptions?, prometheus: PrometheusMetricsOptions?, + buffered_with_size: Integer?, attach_service_name: bool, global_tags: Hash[String, String]?, metric_prefix: String? @@ -79,6 +81,8 @@ module Temporalio def self.new: (Options options) -> Runtime def run_command_loop: -> void + + def retrieve_buffered_metrics: (bool durations_as_seconds) -> Array[Temporalio::Runtime::MetricBuffer::Update] end end end diff --git a/temporalio/sig/temporalio/runtime.rbs b/temporalio/sig/temporalio/runtime.rbs index 0fa4a9bd..adfebd32 100644 --- a/temporalio/sig/temporalio/runtime.rbs +++ b/temporalio/sig/temporalio/runtime.rbs @@ -39,6 +39,7 @@ module Temporalio class MetricsOptions attr_reader opentelemetry: OpenTelemetryMetricsOptions? attr_reader prometheus: PrometheusMetricsOptions? + attr_reader buffer: MetricBuffer? attr_reader attach_service_name: bool attr_reader global_tags: Hash[String, String]? attr_reader metric_prefix: String? @@ -46,6 +47,7 @@ module Temporalio def initialize: ( ?opentelemetry: OpenTelemetryMetricsOptions?, ?prometheus: PrometheusMetricsOptions?, + ?buffer: MetricBuffer?, ?attach_service_name: bool, ?global_tags: Hash[String, String]?, ?metric_prefix: String? diff --git a/temporalio/sig/temporalio/runtime/metric_buffer.rbs b/temporalio/sig/temporalio/runtime/metric_buffer.rbs new file mode 100644 index 00000000..f3e9f20a --- /dev/null +++ b/temporalio/sig/temporalio/runtime/metric_buffer.rbs @@ -0,0 +1,49 @@ +module Temporalio + class Runtime + class MetricBuffer + module DurationFormat + type enum = Symbol + + MILLISECONDS: enum + SECONDS: enum + end + + class Update + attr_reader metric: Metric + attr_reader value: Integer | Float + attr_reader attributes: Hash[String, String | Integer | Float | bool] + + def initialize: ( + metric: Metric, + value: Integer | Float, + attributes: Hash[String, String | Integer | Float | bool] + ) -> void + end + + class Metric + attr_reader name: String + attr_reader description: String? + attr_reader unit: String? + attr_reader kind: (:counter | :histogram | :gauge) + + def initialize: ( + name: String, + description: String?, + unit: String?, + kind: (:counter | :histogram | :gauge) + ) -> void + end + + def initialize: ( + Integer buffer_size, + ?duration_format: DurationFormat::enum + ) -> void + + def retrieve_updates: -> Array[Update] + + def _buffer_size: -> Integer + + def _set_runtime: (Runtime runtime) -> void + end + end +end \ No newline at end of file diff --git a/temporalio/test/worker_workflow_test.rb b/temporalio/test/worker_workflow_test.rb index b3fbe553..18799ede 100644 --- a/temporalio/test/worker_workflow_test.rb +++ b/temporalio/test/worker_workflow_test.rb @@ -1630,6 +1630,130 @@ def test_custom_metrics assert line.end_with?(' 4560') end + def test_workflow_buffered_metrics + # Create runtime with metric buffer + buffer = Temporalio::Runtime::MetricBuffer.new(10_000) + runtime = Temporalio::Runtime.new( + telemetry: Temporalio::Runtime::TelemetryOptions.new(metrics: Temporalio::Runtime::MetricsOptions.new(buffer:)) + ) + + # Confirm nothing there yet + assert_equal [], buffer.retrieve_updates + + # Create a counter and make one with more attrs + runtime_counter = runtime.metric_meter.create_metric( + :counter, 'runtime-counter', description: 'runtime-counter-desc', unit: 'runtime-counter-unit' + ) + runtime_counter_with_attrs = runtime_counter.with_additional_attributes({ 'foo' => 'bar', 'baz' => 123 }) + + # Send adds to both + runtime_counter.record(100) + runtime_counter_with_attrs.record(200) + + # Get updates and check their values + updates1 = buffer.retrieve_updates + assert_equal [ + Temporalio::Runtime::MetricBuffer::Update.new( + metric: Temporalio::Runtime::MetricBuffer::Metric.new( + name: 'runtime-counter', description: 'runtime-counter-desc', unit: 'runtime-counter-unit', kind: :counter + ), + value: 100, + attributes: { 'service_name' => 'temporal-core-sdk' } + ), + Temporalio::Runtime::MetricBuffer::Update.new( + metric: Temporalio::Runtime::MetricBuffer::Metric.new( + name: 'runtime-counter', description: 'runtime-counter-desc', unit: 'runtime-counter-unit', kind: :counter + ), + value: 200, + attributes: { 'service_name' => 'temporal-core-sdk', 'foo' => 'bar', 'baz' => 123 } + ) + ], updates1 + # Also confirm that for performance reasons the metrics are actually the same object + assert_same updates1.first.metric, updates1.last.metric + + # Confirm no more updates + assert_equal [], buffer.retrieve_updates + + # Send some more adds and check + runtime_counter.record(300) + runtime_counter_with_attrs.record(400) + updates2 = buffer.retrieve_updates + assert_equal 2, updates2.size + assert_same updates1.first.metric, updates2.first.metric + assert_same updates1.first.attributes, updates2.first.attributes + assert_equal 300, updates2.first.value + assert_same updates1.last.metric, updates2.last.metric + assert_same updates1.last.attributes, updates2.last.attributes + assert_equal 400, updates2.last.value + + # Confirm no more updates + assert_equal [], buffer.retrieve_updates + + # Test simple gauge + runtime_gauge = runtime.metric_meter.create_metric(:gauge, 'runtime-gauge', value_type: :float) + runtime_gauge.record(1.23, additional_attributes: { 'somekey' => true }) + updates3 = buffer.retrieve_updates + assert_equal [ + Temporalio::Runtime::MetricBuffer::Update.new( + metric: Temporalio::Runtime::MetricBuffer::Metric.new( + name: 'runtime-gauge', description: nil, unit: nil, kind: :gauge + ), + value: 1.23, + attributes: { 'service_name' => 'temporal-core-sdk', 'somekey' => true } + ) + ], updates3 + + # Confirm no more updates + assert_equal [], buffer.retrieve_updates + + # Create a new client on the runtime and execute the custom metric workflow + conn_opts = env.client.connection.options.with(runtime:) + client_opts = env.client.options.with( + connection: Temporalio::Client::Connection.new(**conn_opts.to_h) + ) + client = Temporalio::Client.new(**client_opts.to_h) # steep:ignore + task_queue = "tq-#{SecureRandom.uuid}" + assert_equal 'done', execute_workflow( + CustomMetricsWorkflow, + activities: [CustomMetricsActivity], + client:, + task_queue: + ) + + # Drain updates and confirm updates exist as expected + updates = buffer.retrieve_updates + # Workflow histogram + assert_includes updates, Temporalio::Runtime::MetricBuffer::Update.new( + metric: Temporalio::Runtime::MetricBuffer::Metric.new( + name: 'my-workflow-histogram', description: nil, unit: nil, kind: :histogram + ), + value: 4560, + attributes: { + 'service_name' => 'temporal-core-sdk', + 'namespace' => 'default', + 'task_queue' => task_queue, + 'workflow_type' => 'CustomMetricsWorkflow', + 'someattr' => 'someval2', + 'anotherattr' => 'anotherval2' + } + ) + # Activity counter + assert_includes updates, Temporalio::Runtime::MetricBuffer::Update.new( + metric: Temporalio::Runtime::MetricBuffer::Metric.new( + name: 'my-activity-counter', description: nil, unit: nil, kind: :counter + ), + value: 123, + attributes: { + 'service_name' => 'temporal-core-sdk', + 'namespace' => 'default', + 'task_queue' => task_queue, + 'activity_type' => 'CustomMetricsActivity', + 'someattr' => 'someval1', + 'anotherattr' => 'anotherval1' + } + ) + end + class FailWorkflowPayloadConverter < Temporalio::Converters::PayloadConverter def to_payload(value) if value == 'fail-on-this-result'