Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions temporalio/ext/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,15 @@ impl Client {
.await?;
Ok(core)
},
move |_, result: Result<CoreClient, ClientInitError>| match result {
Ok(core) => callback.push(Client {
core,
runtime_handle,
}),
Err(err) => callback.push(new_error!("Failed client connect: {}", err)),
move |ruby, result: Result<CoreClient, ClientInitError>| match result {
Ok(core) => callback.push(
&ruby,
Client {
core,
runtime_handle,
},
),
Err(err) => callback.push(&ruby, new_error!("Failed client connect: {}", err)),
},
);
Ok(())
Expand Down Expand Up @@ -325,12 +328,12 @@ where
};
res.map(|msg| msg.get_ref().encode_to_vec())
},
move |_, result| {
move |ruby, result| {
match result {
// TODO(cretz): Any reasonable way to prevent byte copy that is just going to get decoded into proto
// object?
Ok(val) => callback.push(RString::from_slice(&val)),
Err(status) => callback.push(RpcFailure { status }),
Ok(val) => callback.push(&ruby, RString::from_slice(&val)),
Err(status) => callback.push(&ruby, RpcFailure { status }),
}
},
);
Expand Down
236 changes: 229 additions & 7 deletions temporalio/ext/src/metric.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use std::{sync::Arc, time::Duration};
use std::{any::Any, sync::Arc, time::Duration};

use magnus::{
class, function, method,
class, function,
gc::register_mark_object,
method,
prelude::*,
r_hash::ForEach,
value::{IntoId, Qfalse, Qtrue},
DataTypeFunctions, Error, Float, Integer, RHash, RString, Ruby, Symbol, TryConvert, TypedData,
Value,
value::{IntoId, Lazy, Qfalse, Qtrue},
DataTypeFunctions, Error, Float, Integer, RClass, RHash, RModule, RString, Ruby, StaticSymbol,
Symbol, TryConvert, TypedData, Value,
};
use temporal_sdk_core_api::telemetry::metrics::{
self, BufferInstrumentRef, CustomMetricAttributes, MetricEvent,
};
use temporal_sdk_core_api::telemetry::metrics;

use crate::{error, id, runtime::Runtime, ROOT_MOD};
use crate::{error, id, runtime::Runtime, util::SendSyncBoxValue, ROOT_MOD};

pub fn init(ruby: &Ruby) -> Result<(), Error> {
let root_mod = ruby.get_inner(&ROOT_MOD);
Expand Down Expand Up @@ -268,3 +272,221 @@ fn metric_key_value(k: Value, v: Value) -> Result<metrics::MetricKeyValue, Error
};
Ok(metrics::MetricKeyValue::new(key, val))
}

/// Instrument reference used for buffered metrics.
///
/// When a metric-creation event is converted, the Ruby `MetricBuffer::Metric` object built for it
/// is wrapped in one of these and stored on the event's lazy instrument reference, so that later
/// update events can hand the same Ruby object back to Ruby.
#[derive(Clone, Debug)]
pub struct BufferedMetricRef {
// The Ruby metric object, boxed and shared so clones refer to the same Ruby value.
value: Arc<SendSyncBoxValue<Value>>,
}

// Empty impl: the trait has no methods implemented here. Presumably it is what allows
// `BufferedMetricRef` to be used as the instrument type of `MetricEvent` — confirm against
// `temporal_sdk_core_api::telemetry::metrics`.
impl BufferInstrumentRef for BufferedMetricRef {}

/// Attribute set used for buffered metrics.
///
/// Wraps the frozen Ruby hash of attribute key/values that is built when an attribute-creation
/// event is converted; update events later downcast back to this type to retrieve the hash.
#[derive(Debug)]
struct BufferedMetricAttributes {
// The Ruby hash holding the attribute key/value pairs.
value: SendSyncBoxValue<RHash>,
}

impl CustomMetricAttributes for BufferedMetricAttributes {
    /// Exposes this attribute set as a type-erased `Any` so callers can downcast it back to
    /// `BufferedMetricAttributes`.
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        // The return position performs the unsizing coercion to the trait object.
        self
    }
}

/// Lazily resolved Ruby class `Temporalio::Runtime::MetricBuffer::Update`.
///
/// Panics on first use if any constant along that path cannot be resolved as the expected type.
static METRIC_BUFFER_UPDATE: Lazy<RClass> = Lazy::new(|ruby| {
    let temporalio: RModule = ruby.class_object().const_get("Temporalio").unwrap();
    let runtime: RClass = temporalio.const_get("Runtime").unwrap();
    let metric_buffer: RClass = runtime.const_get("MetricBuffer").unwrap();
    let update_class: RClass = metric_buffer.const_get("Update").unwrap();
    // Register with the GC so the class object stays alive for as long as this static is used.
    register_mark_object(update_class);
    update_class
});

/// Lazily resolved Ruby class `Temporalio::Runtime::MetricBuffer::Metric`.
///
/// Panics on first use if any constant along that path cannot be resolved as the expected type.
static METRIC_BUFFER_METRIC: Lazy<RClass> = Lazy::new(|ruby| {
    let temporalio: RModule = ruby.class_object().const_get("Temporalio").unwrap();
    let runtime: RClass = temporalio.const_get("Runtime").unwrap();
    let metric_buffer: RClass = runtime.const_get("MetricBuffer").unwrap();
    let metric_class: RClass = metric_buffer.const_get("Metric").unwrap();
    // Register with the GC so the class object stays alive for as long as this static is used.
    register_mark_object(metric_class);
    metric_class
});

// Lazily created Ruby symbols passed as the "kind" argument when constructing a
// `MetricBuffer::Metric` Ruby object.
// `:counter` — used for `MetricKind::Counter`.
static METRIC_KIND_COUNTER: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("counter"));
// `:gauge` — used for `MetricKind::Gauge` and `MetricKind::GaugeF64`.
static METRIC_KIND_GAUGE: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("gauge"));
// `:histogram` — used for `MetricKind::Histogram`, `HistogramF64` and `HistogramDuration`.
static METRIC_KIND_HISTOGRAM: Lazy<StaticSymbol> = Lazy::new(|ruby| ruby.sym_new("histogram"));

/// Converts a batch of buffered metric events into Ruby values.
///
/// Events are processed in order. Events that only populate lazy references (metric or attribute
/// creation) contribute nothing to the output; every update event contributes one Ruby value.
/// `durations_as_seconds` is forwarded unchanged to the per-event conversion.
///
/// # Errors
/// Returns the first error produced while converting an event; later events are not processed.
///
/// NOTE(review): the returned Ruby values live in a heap-allocated `Vec` until the caller places
/// them somewhere Ruby can see — confirm this is safe with respect to Ruby's GC.
pub fn convert_metric_events(
    ruby: &Ruby,
    events: Vec<MetricEvent<BufferedMetricRef>>,
    durations_as_seconds: bool,
) -> Result<Vec<Value>, Error> {
    let mut converted = Vec::new();
    for event in events {
        if let Some(value) = convert_metric_event(ruby, event, durations_as_seconds)? {
            converted.push(value);
        }
    }
    Ok(converted)
}

fn convert_metric_event(
ruby: &Ruby,
event: MetricEvent<BufferedMetricRef>,
durations_as_seconds: bool,
) -> Result<Option<Value>, Error> {
match event {
// Create the metric and put it on the lazy ref
MetricEvent::Create {
params,
populate_into,
kind,
} => {
let cls = ruby.get_inner(&METRIC_BUFFER_METRIC);
let val: Value = cls.funcall(
"new",
(
// Name
params.name.to_string(),
// Description
Some(params.description)
.filter(|s| !s.is_empty())
.map(|s| s.to_string()),
// Unit
if matches!(kind, metrics::MetricKind::HistogramDuration)
&& params.unit == "duration"
{
if durations_as_seconds {
Some("s".to_owned())
} else {
Some("ms".to_owned())
}
} else if params.unit.is_empty() {
None
} else {
Some(params.unit.to_string())
},
// Kind
match kind {
metrics::MetricKind::Counter => ruby.get_inner(&METRIC_KIND_COUNTER),
metrics::MetricKind::Gauge | metrics::MetricKind::GaugeF64 => {
ruby.get_inner(&METRIC_KIND_GAUGE)
}
metrics::MetricKind::Histogram
| metrics::MetricKind::HistogramF64
| metrics::MetricKind::HistogramDuration => {
ruby.get_inner(&METRIC_KIND_HISTOGRAM)
}
},
),
)?;
// Put on lazy ref
populate_into
.set(Arc::new(BufferedMetricRef {
value: Arc::new(SendSyncBoxValue::new(val)),
}))
.map_err(|_| error!("Failed setting metric ref"))?;
Ok(None)
}
// Create the attributes and put it on the lazy ref
MetricEvent::CreateAttributes {
populate_into,
append_from,
attributes,
} => {
// Create a hash (from existing or new)
let hash: RHash = match append_from {
Some(existing) => {
let attrs = existing
.get()
.clone()
.as_any()
.downcast::<BufferedMetricAttributes>()
.map_err(|_| {
error!("Unable to downcast to expected buffered metric attributes")
})?
.value
.value(ruby);
attrs.funcall("dup", ())?
}
None => ruby.hash_new_capa(attributes.len()),
};
// Add attributes
for kv in attributes.into_iter() {
match kv.value {
metrics::MetricValue::String(v) => hash.aset(kv.key, v)?,
metrics::MetricValue::Int(v) => hash.aset(kv.key, v)?,
metrics::MetricValue::Float(v) => hash.aset(kv.key, v)?,
metrics::MetricValue::Bool(v) => hash.aset(kv.key, v)?,
};
}
hash.freeze();
// Put on lazy ref
populate_into
.set(Arc::new(BufferedMetricAttributes {
value: SendSyncBoxValue::new(hash),
}))
.map_err(|_| error!("Failed setting metric attrs"))?;
Ok(None)
}
// Convert to Ruby metric update
MetricEvent::Update {
instrument,
attributes,
update,
} => {
let cls = ruby.get_inner(&METRIC_BUFFER_UPDATE);
Ok(Some(
cls.funcall(
"new",
(
// Metric
instrument.get().clone().value.clone().value(ruby),
// Value
match update {
metrics::MetricUpdateVal::Duration(v) if durations_as_seconds => {
ruby.into_value(v.as_secs_f64())
}
metrics::MetricUpdateVal::Duration(v) => {
// As of this writing, https://github.com/matsadler/magnus/pull/136 not released, so we will do
// the logic ourselves
let val = v.as_millis();
if val <= u64::MAX as u128 {
ruby.into_value(val as u64)
} else {
ruby.module_kernel()
.funcall("Integer", (val.to_string(),))
.unwrap()
}
}
metrics::MetricUpdateVal::Delta(v) => ruby.into_value(v),
metrics::MetricUpdateVal::DeltaF64(v) => ruby.into_value(v),
metrics::MetricUpdateVal::Value(v) => ruby.into_value(v),
metrics::MetricUpdateVal::ValueF64(v) => ruby.into_value(v),
},
// Attributes
attributes
.get()
.clone()
.as_any()
.downcast::<BufferedMetricAttributes>()
.map_err(|_| {
error!("Unable to downcast to expected buffered metric attributes")
})?
.value
.value(ruby),
),
)?,
))
}
}
}
37 changes: 30 additions & 7 deletions temporalio/ext/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use super::{error, id, ROOT_MOD};
use crate::metric::{convert_metric_events, BufferedMetricRef};
use crate::util::{without_gvl, Struct};
use magnus::{
class, function, method, prelude::*, DataTypeFunctions, Error, Ruby, TypedData, Value,
class, function, method, prelude::*, DataTypeFunctions, Error, RArray, Ruby, TypedData, Value,
};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::Duration;
use std::{future::Future, sync::Arc};
use temporal_sdk_core::telemetry::{build_otlp_metric_exporter, start_prometheus_metric_exporter};
use temporal_sdk_core::telemetry::{
build_otlp_metric_exporter, start_prometheus_metric_exporter, MetricsCallBuffer,
};
use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder};
use temporal_sdk_core_api::telemetry::{
Logger, MetricTemporality, OtelCollectorOptionsBuilder, OtlpProtocol,
PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder,
metrics::MetricCallBufferer, Logger, MetricTemporality, OtelCollectorOptionsBuilder,
OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder,
};
use tracing::error as log_error;
use url::Url;
Expand All @@ -24,6 +27,10 @@ pub fn init(ruby: &Ruby) -> Result<(), Error> {
.define_class("Runtime", class::object())?;
class.define_singleton_method("new", function!(Runtime::new, 1))?;
class.define_method("run_command_loop", method!(Runtime::run_command_loop, 0))?;
class.define_method(
"retrieve_buffered_metrics",
method!(Runtime::retrieve_buffered_metrics, 1),
)?;
Ok(())
}

Expand All @@ -33,6 +40,7 @@ pub struct Runtime {
/// Separate cloneable handle that can be referenced in other Rust objects.
pub(crate) handle: RuntimeHandle,
async_command_rx: Receiver<AsyncCommand>,
metrics_call_buffer: Option<Arc<MetricsCallBuffer<BufferedMetricRef>>>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -94,9 +102,10 @@ impl Runtime {
.map_err(|err| error!("Failed initializing telemetry: {}", err))?;

// Create metrics (created after Core runtime since it needs Tokio handle)
let mut metrics_call_buffer = None;
if let Some(metrics) = telemetry.child(id!("metrics"))? {
let _guard = core.tokio_handle().enter();
match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.child(id!("buffered_with_size"))?) {
match (metrics.child(id!("opentelemetry"))?, metrics.child(id!("prometheus"))?, metrics.member::<Option<usize>>(id!("buffered_with_size"))?) {
// Build OTel
(Some(opentelemetry), None, None) => {
let mut opts_build = OtelCollectorOptionsBuilder::default();
Expand Down Expand Up @@ -148,8 +157,11 @@ impl Runtime {
|err| error!("Failed building starting Prometheus exporter: {}", err),
)?.meter);
},
// TODO(cretz): Metric buffering
(None, None, Some(_buffer_size)) => return Err(error!("Metric buffering not yet supported")),
(None, None, Some(buffer_size)) => {
let buffer = Arc::new(MetricsCallBuffer::new(buffer_size));
core.telemetry_mut().attach_late_init_metrics(buffer.clone());
metrics_call_buffer = Some(buffer);
},
_ => return Err(error!("One and only one of opentelemetry, prometheus, or buffered_with_size must be set"))
};
}
Expand All @@ -163,6 +175,7 @@ impl Runtime {
async_command_tx,
},
async_command_rx,
metrics_call_buffer,
})
}

Expand Down Expand Up @@ -193,6 +206,16 @@ impl Runtime {
}
}
}

pub fn retrieve_buffered_metrics(&self, durations_as_seconds: bool) -> Result<RArray, Error> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not store whether durations are seconds on the buffer itself? Feels weird to pass it every time

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, no real reason, just seemed to be a conversion-time parameter as opposed to struct state and is how we did it in Python. It is class state from a user POV though.

let ruby = Ruby::get().expect("Not in Ruby thread");
let buff = self
.metrics_call_buffer
.clone()
.expect("Attempting to retrieve buffered metrics without buffer");
let updates = convert_metric_events(&ruby, buff.retrieve(), durations_as_seconds)?;
Ok(ruby.ary_new_from_values(&updates))
}
}

impl RuntimeHandle {
Expand Down
Loading
Loading