Merged
241 changes: 235 additions & 6 deletions temporalio/ext/src/metric.rs
@@ -1,14 +1,18 @@
use std::{sync::Arc, time::Duration};
use std::{any::Any, rc::Rc, sync::Arc, time::Duration};

use magnus::{
class, function, method,
class, function,
gc::register_mark_object,
method,
prelude::*,
r_hash::ForEach,
value::{IntoId, Qfalse, Qtrue},
DataTypeFunctions, Error, Float, Integer, RHash, RString, Ruby, Symbol, TryConvert, TypedData,
Value,
value::{BoxValue, IntoId, Lazy, Qfalse, Qtrue},
DataTypeFunctions, Error, Float, Integer, RClass, RHash, RModule, RString, Ruby, StaticSymbol,
Symbol, TryConvert, TypedData, Value,
};
use temporal_sdk_core_api::telemetry::metrics::{
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent,
};
use temporal_sdk_core_api::telemetry::metrics;

use crate::{error, id, runtime::Runtime, ROOT_MOD};

@@ -268,3 +272,228 @@ fn metric_key_value(k: Value, v: Value) -> Result<metrics::MetricKeyValue, Error
};
Ok(metrics::MetricKeyValue::new(key, val))
}

#[derive(Clone, Debug)]
pub struct BufferedMetricRef {
value: Rc<BoxValue<Value>>,
}

impl BufferInstrumentRef for BufferedMetricRef {}

// We can't use Ruby Opaque because it doesn't protect the object from being
// GC'd, but we trust ourselves not to access this value outside of Ruby
// context (which has global GVL to ensure thread safety).
unsafe impl Send for BufferedMetricRef {}
unsafe impl Sync for BufferedMetricRef {}
Member

It's clear to me this is only ever used from one thread at a time (Sync), but it's not clear to me that it will always be from the same thread (Send).

Since it's stored on the runtime and then cloned in the retrieve_buffered_metrics call, the Rc could have reference-counting errors if a user calls that method from a different thread. Ideally we would somehow force it to always be the same thread; failing that, I think we need to warn users that they can only ever call it from the thread the runtime was created on.

Member Author

With the GVL, yes, we can trust that only one thread runs at a time. As for which thread: for our use case, any Magnus Ruby Value is accessible from any Ruby thread. There is a utility called Opaque (https://docs.rs/magnus/latest/magnus/value/struct.Opaque.html) that does mark a value as Send, but it has the downside that it doesn't prevent GC, and we can't combine the two due to their constraints.

So we have to decide: do we want BoxValue, which prevents GC but requires us to mark Send/Sync ourselves, or Opaque, which has Send/Sync but does not prevent GC? Looking at the implementations and seeing that Opaque is just a transparent Send/Sync wrapper that prevents access without a Ruby object (so it requires the GVL to be held), I chose the BoxValue approach. I could have used Opaque and manually called rb_gc_register_address on new and rb_gc_unregister_address on drop, but this seems better.
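The alternative mentioned above (register on creation, unregister on drop) is the usual RAII guard shape. The sketch below only illustrates that shape: `RootSet` and `RootGuard` are hypothetical names, and the in-memory set stands in for rb_gc_register_address / rb_gc_unregister_address, which are not called here.

```rust
use std::cell::RefCell;
use std::collections::HashSet;

/// Hypothetical stand-in for the GC's set of registered root addresses.
#[derive(Default)]
struct RootSet {
    roots: RefCell<HashSet<u64>>,
}

impl RootSet {
    fn register(&self, id: u64) {
        self.roots.borrow_mut().insert(id);
    }

    fn unregister(&self, id: u64) {
        self.roots.borrow_mut().remove(&id);
    }

    fn is_rooted(&self, id: u64) -> bool {
        self.roots.borrow().contains(&id)
    }
}

/// Keeps `id` registered as a root for exactly as long as the guard lives.
struct RootGuard<'a> {
    roots: &'a RootSet,
    id: u64,
}

impl<'a> RootGuard<'a> {
    fn new(roots: &'a RootSet, id: u64) -> Self {
        roots.register(id);
        RootGuard { roots, id }
    }
}

impl Drop for RootGuard<'_> {
    fn drop(&mut self) {
        // Unregister automatically, so callers cannot forget to do it.
        self.roots.unregister(self.id);
    }
}

/// Reports whether `id` is rooted while the guard is alive and after it is dropped.
fn rooted_during_and_after(id: u64) -> (bool, bool) {
    let roots = RootSet::default();
    let guard = RootGuard::new(&roots, id);
    let during = roots.is_rooted(id);
    drop(guard);
    (during, roots.is_rooted(id))
}
```

BoxValue packages the same idea, which is presumably why hand-rolling it around Opaque was judged the less attractive option.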

Is there a Rust-side concern with Send I am missing if I trust how I use the value itself to be safe?

Member

Yeah, so Opaque can be Send because it's only declared as such when wrapping a Ruby value - and it makes the statement that Ruby values are fine to move across threads - cool.

The problem here is almost unrelated to Ruby: it's about the Rc. Rc's reference count is a plain, non-atomic counter, so Rc is only sound as long as it never leaves the thread it was created on (which is why the standard library does not implement Send for it). So, if our handler that uses it gets called from a thread other than the one it was created on, clone changes the refcount from that other thread, and that can be UB.
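A small standard-library sketch of the distinction, independent of Magnus: `Arc` satisfies a `Send` bound and can be cloned on other threads, while the same bound rejects `Rc` at compile time. The helper names are illustrative only.

```rust
use std::rc::Rc;
use std::sync::Arc;
use std::thread;

/// Compiles only for types that are allowed to move to another thread.
fn assert_send<T: Send>() {}

/// Clones an `Arc` once per worker thread and returns the strong count
/// observed while every clone is still alive.
fn arc_count_across_threads(workers: usize) -> usize {
    assert_send::<Arc<String>>();
    // assert_send::<Rc<String>>(); // rejected by the compiler: Rc is not Send

    let shared = Arc::new(String::from("metric"));
    let handles: Vec<thread::JoinHandle<Arc<String>>> = (0..workers)
        .map(|_| {
            let local = Arc::clone(&shared);
            // The thread hands its clone back so it stays alive until joined.
            thread::spawn(move || local)
        })
        .collect();
    let clones: Vec<Arc<String>> = handles
        .into_iter()
        .map(|h| h.join().expect("worker thread panicked"))
        .collect();
    let count = Arc::strong_count(&shared);
    drop(clones);
    count
}

/// `Rc` is fine as long as every clone stays on one thread.
fn rc_count_single_thread() -> usize {
    let first = Rc::new(String::from("metric"));
    let _second = Rc::clone(&first);
    Rc::strong_count(&first)
}
```

Writing `unsafe impl Send` on a struct that contains an `Rc` silences exactly the compiler check shown in the commented-out line, which is how the issue slipped in.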

Member Author

@cretz, Apr 1, 2025

Oh, yes, of course: I should not use Rc if I want to be Send-safe. I forgot I had used it. This is bad coding on my part, where I forced Rc to be Send/Sync when it's not. I should change my code to:

#[derive(Clone, Debug)]
pub struct BufferedMetricRef {
    value: Arc<SendSyncBoxValue<Value>>,
}

struct SendSyncBoxValue<T>(BoxValue<T>);

unsafe impl<T> Send for SendSyncBoxValue<T> {}
unsafe impl<T> Sync for SendSyncBoxValue<T> {}

Or something like that. That way the manual Send/Sync claim applies only to the wrapped Ruby value, and Arc supplies its own thread safety. I will fix this.

Member

Yeah, that would be fine

Member Author

@cretz, Apr 2, 2025

Ok, I added this utility in 7ab900b, but since I also applied it to another place where we were doing the same thing, it somewhat expanded the Rust code that was changed. Basically, like Opaque, it requires a &Ruby reference to prove that the caller is on a Ruby thread before accessing the value.
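For readers who have not opened 7ab900b, here is a rough sketch of the token-gated wrapper idea described above. It is not the code from that commit: `VmToken` is a hypothetical stand-in for `&Ruby`, `FakeBoxValue` stands in for `BoxValue<Value>`, and the safety argument in the comments is an assumed invariant.

```rust
use std::marker::PhantomData;
use std::sync::Arc;

/// Hypothetical stand-in for `magnus::Ruby`. The raw-pointer marker makes it
/// neither Send nor Sync, so a `&VmToken` cannot be carried to another thread.
pub struct VmToken {
    _not_send: PhantomData<*const ()>,
}

impl VmToken {
    /// Assumption for this sketch: only called on a thread that holds the VM lock.
    pub fn acquire() -> Self {
        VmToken { _not_send: PhantomData }
    }
}

/// Hypothetical stand-in for a boxed Ruby value; the raw pointer makes it
/// neither Send nor Sync on its own.
pub struct FakeBoxValue {
    _addr: *const u8,
    pub id: u64,
}

/// Wrapper that carries the manual Send/Sync claim for the inner value only.
pub struct SendSyncBox<T>(T);

// SAFETY (assumed invariant): the inner value is reachable only through
// `get`, which demands a `VmToken` as proof of being on a VM thread.
unsafe impl<T> Send for SendSyncBox<T> {}
unsafe impl<T> Sync for SendSyncBox<T> {}

impl<T> SendSyncBox<T> {
    pub fn new(value: T) -> Self {
        SendSyncBox(value)
    }

    /// Access requires a token, mirroring how Opaque requires a `&Ruby`.
    pub fn get(&self, _token: &VmToken) -> &T {
        &self.0
    }
}

fn assert_send_sync<T: Send + Sync>() {}

/// Wraps a fake value, shares it through `Arc`, and reads it back with a token.
pub fn demo_id(id: u64) -> u64 {
    // Arc provides the thread-safe refcount; the wrapper covers the value.
    assert_send_sync::<Arc<SendSyncBox<FakeBoxValue>>>();
    let shared = Arc::new(SendSyncBox::new(FakeBoxValue {
        _addr: std::ptr::null(),
        id,
    }));
    let token = VmToken::acquire();
    shared.get(&token).id
}
```

The design point is that the unsafe claim is scoped to one small type with one accessor, instead of being asserted on a struct that happens to contain other non-thread-safe parts.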


#[derive(Debug)]
struct BufferedMetricAttributes {
value: BoxValue<RHash>,
}

// See Send/Sync for BufferedMetricRef for details on why we do this
unsafe impl Send for BufferedMetricAttributes {}
unsafe impl Sync for BufferedMetricAttributes {}

impl CustomMetricAttributes for BufferedMetricAttributes {
fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self as Arc<dyn Any + Send + Sync>
}
}

static METRIC_BUFFER_UPDATE: Lazy<RClass> = Lazy::new(|ruby| {
let cls = ruby
.class_object()
.const_get::<_, RModule>("Temporalio")
.unwrap()
.const_get::<_, RClass>("Runtime")
.unwrap()
.const_get::<_, RClass>("MetricBuffer")
.unwrap()
.const_get("Update")
.unwrap();
// Make sure class is not GC'd
register_mark_object(cls);
cls
});

static METRIC_BUFFER_METRIC: Lazy<RClass> = Lazy::new(|ruby| {
let cls = ruby
.class_object()
.const_get::<_, RModule>("Temporalio")
.unwrap()
.const_get::<_, RClass>("Runtime")
.unwrap()
.const_get::<_, RClass>("MetricBuffer")
.unwrap()
.const_get("Metric")
.unwrap();
// Make sure class is not GC'd
register_mark_object(cls);
cls
});

static METRIC_KIND_COUNTER: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("counter"));
static METRIC_KIND_GAUGE: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("gauge"));
static METRIC_KIND_HISTOGRAM: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("histogram"));

pub fn convert_metric_events(
ruby: &Ruby,
events: Vec<MetricEvent<BufferedMetricRef>>,
durations_as_seconds: bool,
) -> Result<Vec<Value>, Error> {
let temp: Result<Vec<Option<Value>>, Error> = events
.into_iter()
.map(|e| convert_metric_event(ruby, e, durations_as_seconds))
.collect();
Ok(temp?.into_iter().flatten().collect())
}

fn convert_metric_event(
ruby: &Ruby,
event: MetricEvent<BufferedMetricRef>,
durations_as_seconds: bool,
) -> Result<Option<Value>, Error> {
match event {
// Create the metric and put it on the lazy ref
MetricEvent::Create {
params,
populate_into,
kind,
} => {
let cls = ruby.get_inner(&METRIC_BUFFER_METRIC);
let val: Value = cls.funcall(
"new",
(
// Name
params.name.to_string(),
// Description
Some(params.description)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
// Unit
if matches!(kind, metrics::MetricKind::HistogramDuration)
&& params.unit == "duration"
{
if durations_as_seconds {
Some("s".to_owned())
} else {
Some("ms".to_owned())
}
} else if params.unit.is_empty() {
None
} else {
Some(params.unit.to_string())
},
// Kind
match kind {
metrics::MetricKind::Counter => ruby.get_inner(&METRIC_KIND_COUNTER),
metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => {
ruby.get_inner(&METRIC_KIND_GAUGE)
}
metrics::MetricKind::Histogram
| metrics::MetricKind::HistogramF64
| metrics::MetricKind::HistogramDuration => {
ruby.get_inner(&METRIC_KIND_HISTOGRAM)
}
},
),
)?;
// Put on lazy ref
populate_into
.set(Arc::new(BufferedMetricRef {
value: Rc::new(BoxValue::new(val)),
}))
.map_err(|_| error!("Failed setting metric ref"))?;
Ok(None)
}
// Create the attributes and put it on the lazy ref
MetricEvent::CreateAttributes {
populate_into,
append_from,
attributes,
} => {
// Create a hash (from existing or new)
let hash: RHash = match append_from {
Some(existing) => {
let attrs = existing
.get()
.clone()
.as_any()
.downcast::<BufferedMetricAttributes>()
.map_err(|_| {
error!("Unable to downcast to expected buffered metric attributes")
})?;
attrs.value.as_ref().funcall("dup", ())?
}
None => ruby.hash_new_capa(attributes.len()),
};
// Add attributes
for kv in attributes.into_iter() {
match kv.value {
metrics::MetricValue::String(v) => hash.aset(kv.key, v)?,
metrics::MetricValue::Int(v) => hash.aset(kv.key, v)?,
metrics::MetricValue::Float(v) => hash.aset(kv.key, v)?,
metrics::MetricValue::Bool(v) => hash.aset(kv.key, v)?,
};
}
hash.freeze();
// Put on lazy ref
populate_into
.set(Arc::new(BufferedMetricAttributes {
value: BoxValue::new(hash),
}))
.map_err(|_| error!("Failed setting metric attrs"))?;
Ok(None)
}
// Convert to Ruby metric update
MetricEvent::Update {
instrument,
attributes,
update,
} => {
let cls = ruby.get_inner(&METRIC_BUFFER_UPDATE);
Ok(Some(
cls.funcall(
"new",
(
// Metric
**instrument.get().clone().value.clone(),
// Value
match update {
metrics::MetricUpdateVal::Duration(v) if durations_as_seconds => {
ruby.into_value(v.as_secs_f64())
}
metrics::MetricUpdateVal::Duration(v) => {
// As of this writing, https://github.com/matsadler/magnus/pull/136 not released, so we will do
// the logic ourselves
let val = v.as_millis();
if val <= u64::MAX as u128 {
ruby.into_value(val as u64)
} else {
ruby.module_kernel()
.funcall("Integer", (val.to_string(),))
.unwrap()
}
}
metrics::MetricUpdateVal::Delta(v) => ruby.into_value(v),
metrics::MetricUpdateVal::DeltaF64(v) => ruby.into_value(v),
metrics::MetricUpdateVal::Value(v) => ruby.into_value(v),
metrics::MetricUpdateVal::ValueF64(v) => ruby.into_value(v),
},
// Attributes
*attributes
.get()
.clone()
.as_any()
.downcast::<BufferedMetricAttributes>()
.map_err(|_| {
error!("Unable to downcast to expected buffered metric attributes")
})?
.value,
),
)?,
))
}
}
}
37 changes: 30 additions & 7 deletions temporalio/ext/src/runtime.rs
@@ -1,19 +1,22 @@
use super::{error, id, ROOT_MOD};
use crate::metric::{convert_metric_events, BufferedMetricRef};
use crate::util::{without_gvl, Struct};
use magnus::{
class, function, method, prelude::*, DataTypeFunctions, Error, Ruby, TypedData, Value,
class, function, method, prelude::*, DataTypeFunctions, Error, RArray, Ruby, TypedData, Value,
};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::Duration;
use std::{future::Future, sync::Arc};
use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
use temporal_sdk_core::telemetry::{
build_otlp_metric_exporter, start_prometheus_metric_exporter, MetricsCallBuffer,
};
use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder};
use temporal_sdk_core_api::telemetry::{
Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol,
PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder,
metrics::MetricCallBufferer, Logger, MetricTemporality, OtelCollectorOptionsBuilder,
OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder,
};
use tracing::error as log_error;
use url::Url;
@@ -24,6 +27,10 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
.define_class("Runtime", class::object())?;
class.define_singleton_method("new", function!(Runtime::new, 1))?;
class.define_method("run_command_loop", method!(Runtime::run_command_loop, 0))?;
class.define_method(
"retrieve_buffered_metrics",
method!(Runtime::retrieve_buffered_metrics, 1),
)?;
Ok(())
}

@@ -33,6 +40,7 @@ pub struct Runtime {
/// Separate cloneable handle that can be referenced in other Rust objects.
pub(crate) handle: RuntimeHandle,
async_command_rx: Receiver<AsyncCommand>,
metrics_call_buffer: Option<Arc<MetricsCallBuffer<BufferedMetricRef>>>,
}

#[derive(Clone)]
@@ -94,9 +102,10 @@ impl Runtime {
.map_err(|err| error!("Failed initializing telemetry: {}", err))?;

// Create metrics (created after Core runtime since it needs Tokio handle)
let mut metrics_call_buffer = None;
if let Some(metrics) = telemetry.child(id!("metrics"))? {
let _guard = core.tokio_handle().enter();
match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.child(id!("buffered_with_size"))?) {
match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.member::<Option<usize>>(id!("buffered_with_size"))?) {
// Build OTel
(Some(opentelemetry), None, None) => {
let mut opts_build = OtelCollectorOptionsBuilder::default();
@@ -148,8 +157,11 @@ impl Runtime {
|err| error!("Failed building starting Prometheus exporter: {}", err),
)?.meter);
},
// TODO(cretz): Metric buffering
(None, None, Some(_buffer_size)) => return Err(error!("Metric buffering not yet supported")),
(None, None, Some(buffer_size)) => {
let buffer = Arc::new(MetricsCallBuffer::new(buffer_size));
core.telemetry_mut().attach_late_init_metrics(buffer.clone());
metrics_call_buffer = Some(buffer);
},
_ => return Err(error!("One and only one of opentelemetry, prometheus, or buffered_with_size must be set"))
};
}
@@ -163,6 +175,7 @@ impl Runtime {
async_command_tx,
},
async_command_rx,
metrics_call_buffer,
})
}

@@ -193,6 +206,16 @@ impl Runtime {
}
}
}

pub fn retrieve_buffered_metrics(&self, durations_as_seconds: bool) -> Result<RArray, Error> {
Member

Why not store whether durations are in seconds on the buffer itself? It feels odd to pass it on every call.

Member Author

Yeah, no real reason. It just seemed like a conversion-time parameter rather than struct state, and it's how we did it in Python. It is class state from a user's point of view, though.
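To make the two options in this thread concrete, here is a standalone sketch. `DurationValue`, `convert_duration`, and `BufferConfig` are hypothetical names for illustration and do not appear in the PR; the seconds-versus-milliseconds rule mirrors the conversion in metric.rs.

```rust
use std::time::Duration;

/// Hypothetical result type: float seconds or whole milliseconds.
#[derive(Debug, PartialEq)]
enum DurationValue {
    Seconds(f64),
    Millis(u128),
}

/// Option 1: the flag is a conversion-time parameter passed on every call.
fn convert_duration(d: Duration, durations_as_seconds: bool) -> DurationValue {
    if durations_as_seconds {
        DurationValue::Seconds(d.as_secs_f64())
    } else {
        DurationValue::Millis(d.as_millis())
    }
}

/// Option 2: the flag is state fixed when the buffer is configured.
struct BufferConfig {
    durations_as_seconds: bool,
}

impl BufferConfig {
    fn convert(&self, d: Duration) -> DurationValue {
        // Delegates to the same conversion, so both options agree.
        convert_duration(d, self.durations_as_seconds)
    }
}
```

Either shape produces the same values; the difference is only whether each caller must supply the flag or the owner of the buffer decides it once.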

let ruby = Ruby::get().expect("Not in Ruby thread");
let buff = self
.metrics_call_buffer
.clone()
.expect("Attempting to retrieve buffered metrics without buffer");
let updates = convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds)?;
Ok(ruby.ary_new_from_values(&updates))
}
}

impl RuntimeHandle {