Skip to content

Commit 3acd1b5

Browse files
committed
Buffered metric support
Fixes #176
1 parent 6218c1f commit 3acd1b5

File tree

8 files changed

+558
-17
lines changed

8 files changed

+558
-17
lines changed

temporalio/ext/src/metric.rs

Lines changed: 235 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
1-
use std::{sync::Arc, time::Duration};
1+
use std::{any::Any, rc::Rc, sync::Arc, time::Duration};
22

33
use magnus::{
4-
class, function, method,
4+
class, function,
5+
gc::register_mark_object,
6+
method,
57
prelude::*,
68
r_hash::ForEach,
7-
value::{IntoId, Qfalse, Qtrue},
8-
DataTypeFunctions, Error, Float, Integer, RHash, RString, Ruby, Symbol, TryConvert, TypedData,
9-
Value,
9+
value::{BoxValue, IntoId, Lazy, Qfalse, Qtrue},
10+
DataTypeFunctions, Error, Float, Integer, RClass, RHash, RModule, RString, Ruby, StaticSymbol,
11+
Symbol, TryConvert, TypedData, Value,
12+
};
13+
use temporal_sdk_core_api::telemetry::metrics::{
14+
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent,
1015
};
11-
use temporal_sdk_core_api::telemetry::metrics;
1216

1317
use crate::{error, id, runtime::Runtime, ROOT_MOD};
1418

@@ -268,3 +272,228 @@ fn metric_key_value(k: Value, v: Value) -> Result<metrics::MetricKeyValue, Error
268272
};
269273
Ok(metrics::MetricKeyValue::new(key, val))
270274
}
275+
276+
#[derive(Clone, Debug)]
277+
pub struct BufferedMetricRef {
278+
value: Rc<BoxValue<Value>>,
279+
}
280+
281+
impl BufferInstrumentRef for BufferedMetricRef {}
282+
283+
// We can't use Ruby Opaque because it doesn't protect the object from being
284+
// GC'd, but we trust ourselves not to access this value outside of Ruby
285+
// context (which has global GVL to ensure thread safety).
286+
unsafe impl Send for BufferedMetricRef {}
287+
unsafe impl Sync for BufferedMetricRef {}
288+
289+
#[derive(Debug)]
290+
struct BufferedMetricAttributes {
291+
value: BoxValue<RHash>,
292+
}
293+
294+
// See Send/Sync for BufferedMetricRef for details on why we do this
295+
unsafe impl Send for BufferedMetricAttributes {}
296+
unsafe impl Sync for BufferedMetricAttributes {}
297+
298+
impl CustomMetricAttributes for BufferedMetricAttributes {
299+
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
300+
self as Arc<dyn Any + Send + Sync>
301+
}
302+
}
303+
304+
static METRIC_BUFFER_UPDATE: Lazy<RClass> = Lazy::new(|ruby| {
305+
let cls = ruby
306+
.class_object()
307+
.const_get::<_, RModule>("Temporalio")
308+
.unwrap()
309+
.const_get::<_, RClass>("Runtime")
310+
.unwrap()
311+
.const_get::<_, RClass>("MetricBuffer")
312+
.unwrap()
313+
.const_get("Update")
314+
.unwrap();
315+
// Make sure class is not GC'd
316+
register_mark_object(cls);
317+
cls
318+
});
319+
320+
static METRIC_BUFFER_METRIC: Lazy<RClass> = Lazy::new(|ruby| {
321+
let cls = ruby
322+
.class_object()
323+
.const_get::<_, RModule>("Temporalio")
324+
.unwrap()
325+
.const_get::<_, RClass>("Runtime")
326+
.unwrap()
327+
.const_get::<_, RClass>("MetricBuffer")
328+
.unwrap()
329+
.const_get("Metric")
330+
.unwrap();
331+
// Make sure class is not GC'd
332+
register_mark_object(cls);
333+
cls
334+
});
335+
336+
static METRIC_KIND_COUNTER: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("counter"));
337+
static METRIC_KIND_GAUGE: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("gauge"));
338+
static METRIC_KIND_HISTOGRAM: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("histogram"));
339+
340+
pub fn convert_metric_events(
341+
ruby: &Ruby,
342+
events: Vec<MetricEvent<BufferedMetricRef>>,
343+
durations_as_seconds: bool,
344+
) -> Result<Vec<Value>, Error> {
345+
let temp: Result<Vec<Option<Value>>, Error> = events
346+
.into_iter()
347+
.map(|e| convert_metric_event(ruby, e, durations_as_seconds))
348+
.collect();
349+
Ok(temp?.into_iter().flatten().collect())
350+
}
351+
352+
fn convert_metric_event(
353+
ruby: &Ruby,
354+
event: MetricEvent<BufferedMetricRef>,
355+
durations_as_seconds: bool,
356+
) -> Result<Option<Value>, Error> {
357+
match event {
358+
// Create the metric and put it on the lazy ref
359+
MetricEvent::Create {
360+
params,
361+
populate_into,
362+
kind,
363+
} => {
364+
let cls = ruby.get_inner(&METRIC_BUFFER_METRIC);
365+
let val: Value = cls.funcall(
366+
"new",
367+
(
368+
// Name
369+
params.name.to_string(),
370+
// Description
371+
Some(params.description)
372+
.filter(|s| !s.is_empty())
373+
.map(|s| s.to_string()),
374+
// Unit
375+
if matches!(kind, metrics::MetricKind::HistogramDuration)
376+
&& params.unit == "duration"
377+
{
378+
if durations_as_seconds {
379+
Some("s".to_owned())
380+
} else {
381+
Some("ms".to_owned())
382+
}
383+
} else if params.unit.is_empty() {
384+
None
385+
} else {
386+
Some(params.unit.to_string())
387+
},
388+
// Kind
389+
match kind {
390+
metrics::MetricKind::Counter => ruby.get_inner(&METRIC_KIND_COUNTER),
391+
metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => {
392+
ruby.get_inner(&METRIC_KIND_GAUGE)
393+
}
394+
metrics::MetricKind::Histogram
395+
| metrics::MetricKind::HistogramF64
396+
| metrics::MetricKind::HistogramDuration => {
397+
ruby.get_inner(&METRIC_KIND_HISTOGRAM)
398+
}
399+
},
400+
),
401+
)?;
402+
// Put on lazy ref
403+
populate_into
404+
.set(Arc::new(BufferedMetricRef {
405+
value: Rc::new(BoxValue::new(val)),
406+
}))
407+
.map_err(|_| error!("Failed setting metric ref"))?;
408+
Ok(None)
409+
}
410+
// Create the attributes and put it on the lazy ref
411+
MetricEvent::CreateAttributes {
412+
populate_into,
413+
append_from,
414+
attributes,
415+
} => {
416+
// Create a hash (from existing or new)
417+
let hash: RHash = match append_from {
418+
Some(existing) => {
419+
let attrs = existing
420+
.get()
421+
.clone()
422+
.as_any()
423+
.downcast::<BufferedMetricAttributes>()
424+
.map_err(|_| {
425+
error!("Unable to downcast to expected buffered metric attributes")
426+
})?;
427+
attrs.value.as_ref().funcall("dup", ())?
428+
}
429+
None => ruby.hash_new_capa(attributes.len()),
430+
};
431+
// Add attributes
432+
for kv in attributes.into_iter() {
433+
match kv.value {
434+
metrics::MetricValue::String(v) => hash.aset(kv.key, v)?,
435+
metrics::MetricValue::Int(v) => hash.aset(kv.key, v)?,
436+
metrics::MetricValue::Float(v) => hash.aset(kv.key, v)?,
437+
metrics::MetricValue::Bool(v) => hash.aset(kv.key, v)?,
438+
};
439+
}
440+
hash.freeze();
441+
// Put on lazy ref
442+
populate_into
443+
.set(Arc::new(BufferedMetricAttributes {
444+
value: BoxValue::new(hash),
445+
}))
446+
.map_err(|_| error!("Failed setting metric attrs"))?;
447+
Ok(None)
448+
}
449+
// Convert to Ruby metric update
450+
MetricEvent::Update {
451+
instrument,
452+
attributes,
453+
update,
454+
} => {
455+
let cls = ruby.get_inner(&METRIC_BUFFER_UPDATE);
456+
Ok(Some(
457+
cls.funcall(
458+
"new",
459+
(
460+
// Metric
461+
**instrument.get().clone().value.clone(),
462+
// Value
463+
match update {
464+
metrics::MetricUpdateVal::Duration(v) if durations_as_seconds => {
465+
ruby.into_value(v.as_secs_f64())
466+
}
467+
metrics::MetricUpdateVal::Duration(v) => {
468+
// As of this writing, https://github.com/matsadler/magnus/pull/136 not released, so we will do
469+
// the logic ourselves
470+
let val = v.as_millis();
471+
if val <= u64::MAX as u128 {
472+
ruby.into_value(val as u64)
473+
} else {
474+
ruby.module_kernel()
475+
.funcall("Integer", (val.to_string(),))
476+
.unwrap()
477+
}
478+
}
479+
metrics::MetricUpdateVal::Delta(v) => ruby.into_value(v),
480+
metrics::MetricUpdateVal::DeltaF64(v) => ruby.into_value(v),
481+
metrics::MetricUpdateVal::Value(v) => ruby.into_value(v),
482+
metrics::MetricUpdateVal::ValueF64(v) => ruby.into_value(v),
483+
},
484+
// Attributes
485+
*attributes
486+
.get()
487+
.clone()
488+
.as_any()
489+
.downcast::<BufferedMetricAttributes>()
490+
.map_err(|_| {
491+
error!("Unable to downcast to expected buffered metric attributes")
492+
})?
493+
.value,
494+
),
495+
)?,
496+
))
497+
}
498+
}
499+
}

temporalio/ext/src/runtime.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,22 @@
11
use super::{error, id, ROOT_MOD};
2+
use crate::metric::{convert_metric_events, BufferedMetricRef};
23
use crate::util::{without_gvl, Struct};
34
use magnus::{
4-
class, function, method, prelude::*, DataTypeFunctions, Error, Ruby, TypedData, Value,
5+
class, function, method, prelude::*, DataTypeFunctions, Error, RArray, Ruby, TypedData, Value,
56
};
67
use std::collections::HashMap;
78
use std::net::SocketAddr;
89
use std::str::FromStr;
910
use std::sync::mpsc::{channel, Receiver, Sender};
1011
use std::time::Duration;
1112
use std::{future::Future, sync::Arc};
12-
use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
13+
use temporal_sdk_core::telemetry::{
14+
build_otlp_metric_exporter, start_prometheus_metric_exporter, MetricsCallBuffer,
15+
};
1316
use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder};
1417
use temporal_sdk_core_api::telemetry::{
15-
Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol,
16-
PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder,
18+
metrics::MetricCallBufferer, Logger, MetricTemporality, OtelCollectorOptionsBuilder,
19+
OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder,
1720
};
1821
use tracing::error as log_error;
1922
use url::Url;
@@ -24,6 +27,10 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
2427
.define_class("Runtime", class::object())?;
2528
class.define_singleton_method("new", function!(Runtime::new, 1))?;
2629
class.define_method("run_command_loop", method!(Runtime::run_command_loop, 0))?;
30+
class.define_method(
31+
"retrieve_buffered_metrics",
32+
method!(Runtime::retrieve_buffered_metrics, 1),
33+
)?;
2734
Ok(())
2835
}
2936

@@ -33,6 +40,7 @@ pub struct Runtime {
3340
/// Separate cloneable handle that can be referenced in other Rust objects.
3441
pub(crate) handle: RuntimeHandle,
3542
async_command_rx: Receiver<AsyncCommand>,
43+
metrics_call_buffer: Option<Arc<MetricsCallBuffer<BufferedMetricRef>>>,
3644
}
3745

3846
#[derive(Clone)]
@@ -94,9 +102,10 @@ impl Runtime {
94102
.map_err(|err| error!("Failed initializing telemetry: {}", err))?;
95103

96104
// Create metrics (created after Core runtime since it needs Tokio handle)
105+
let mut metrics_call_buffer = None;
97106
if let Some(metrics) = telemetry.child(id!("metrics"))? {
98107
let _guard = core.tokio_handle().enter();
99-
match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.child(id!("buffered_with_size"))?) {
108+
match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.member::<Option<usize>>(id!("buffered_with_size"))?) {
100109
// Build OTel
101110
(Some(opentelemetry), None, None) => {
102111
let mut opts_build = OtelCollectorOptionsBuilder::default();
@@ -148,8 +157,11 @@ impl Runtime {
148157
|err| error!("Failed building starting Prometheus exporter: {}", err),
149158
)?.meter);
150159
},
151-
// TODO(cretz): Metric buffering
152-
(None, None, Some(_buffer_size)) => return Err(error!("Metric buffering not yet supported")),
160+
(None, None, Some(buffer_size)) => {
161+
let buffer = Arc::new(MetricsCallBuffer::new(buffer_size));
162+
core.telemetry_mut().attach_late_init_metrics(buffer.clone());
163+
metrics_call_buffer = Some(buffer);
164+
},
153165
_ => return Err(error!("One and only one of opentelemetry, prometheus, or buffered_with_size must be set"))
154166
};
155167
}
@@ -163,6 +175,7 @@ impl Runtime {
163175
async_command_tx,
164176
},
165177
async_command_rx,
178+
metrics_call_buffer,
166179
})
167180
}
168181

@@ -193,6 +206,16 @@ impl Runtime {
193206
}
194207
}
195208
}
209+
210+
pub fn retrieve_buffered_metrics(&self, durations_as_seconds: bool) -> Result<RArray, Error> {
211+
let ruby = Ruby::get().expect("Not in Ruby thread");
212+
let buff = self
213+
.metrics_call_buffer
214+
.clone()
215+
.expect("Attempting to retrieve buffered metrics without buffer");
216+
let updates = convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds)?;
217+
Ok(ruby.ary_new_from_values(&updates))
218+
}
196219
}
197220

198221
impl RuntimeHandle {

0 commit comments

Comments
 (0)