Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions temporalio/ext/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use temporal_sdk_core::telemetry::{
build_otlp_metric_exporter, start_prometheus_metric_exporter, MetricsCallBuffer,
};
use temporal_sdk_core::{CoreRuntime, TokioRuntimeBuilder};
use temporal_sdk_core_api::telemetry::HistogramBucketOverrides;
use temporal_sdk_core_api::telemetry::{
metrics::MetricCallBufferer, Logger, MetricTemporality, OtelCollectorOptionsBuilder,
OtlpProtocol, PrometheusExporterOptionsBuilder, TelemetryOptionsBuilder,
Expand Down Expand Up @@ -129,6 +130,9 @@ impl Runtime {
if opentelemetry.member::<bool>(id!("http"))? {
opts_build.protocol(OtlpProtocol::Http);
}
if let Some(overrides) = opentelemetry.member::<Option<HashMap<String, Vec<f64>>>>(id!("histogram_bucket_overrides"))? {
opts_build.histogram_bucket_overrides(HistogramBucketOverrides { overrides });
}
let opts = opts_build
.build()
.map_err(|err| error!("Invalid OpenTelemetry options: {}", err))?;
Expand All @@ -150,6 +154,9 @@ impl Runtime {
if let Some(global_tags) = metrics.member::<Option<HashMap<String, String>>>(id!("global_tags"))? {
opts_build.global_tags(global_tags);
}
if let Some(overrides) = prom.member::<Option<HashMap<String, Vec<f64>>>>(id!("histogram_bucket_overrides"))? {
opts_build.histogram_bucket_overrides(HistogramBucketOverrides { overrides });
}
let opts = opts_build
.build()
.map_err(|err| error!("Invalid Prometheus options: {}", err))?;
Expand Down
2 changes: 2 additions & 0 deletions temporalio/lib/temporalio/internal/bridge/runtime.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Runtime
:metric_temporality_delta,
:durations_as_seconds,
:http,
:histogram_bucket_overrides, # Optional
keyword_init: true
)

Expand All @@ -46,6 +47,7 @@ class Runtime
:counters_total_suffix,
:unit_suffix,
:durations_as_seconds,
:histogram_bucket_overrides, # Optional
keyword_init: true
)
end
Expand Down
28 changes: 22 additions & 6 deletions temporalio/lib/temporalio/runtime.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ def _to_bridge
:metric_periodicity,
:metric_temporality,
:durations_as_seconds,
:http
:http,
:histogram_bucket_overrides
)

# Options for exporting metrics to OpenTelemetry.
Expand All @@ -197,6 +198,9 @@ def _to_bridge
# +false+.
# @!attribute http
# @return [Boolean] True if the protocol is HTTP, false if gRPC (the default).
# @!attribute histogram_bucket_overrides
# @return [Hash<String, Array<Numeric>>, nil] Override default histogram buckets. Key of the hash it the metric
# name, value is an array of floats for the set of buckets.
class OpenTelemetryMetricsOptions
# OpenTelemetry metric temporality.
module MetricTemporality
Expand All @@ -213,13 +217,16 @@ module MetricTemporality
# @param durations_as_seconds [Boolean] Whether to use float seconds instead of integer milliseconds for
# durations.
# @param http [Boolean] True if the protocol is HTTP, false if gRPC (the default).
# @param histogram_bucket_overrides [Hash<String, Array<Numeric>>, nil] Override default histogram buckets. Key of
# the hash it the metric name, value is an array of floats for the set of buckets.
def initialize(
url:,
headers: nil,
metric_periodicity: nil,
metric_temporality: MetricTemporality::CUMULATIVE,
durations_as_seconds: false,
http: false
http: false,
histogram_bucket_overrides: nil
)
super
end
Expand All @@ -237,7 +244,8 @@ def _to_bridge
else raise 'Unrecognized metric temporality'
end,
durations_as_seconds:,
http:
http:,
histogram_bucket_overrides:
)
end
end
Expand All @@ -246,7 +254,8 @@ def _to_bridge
:bind_address,
:counters_total_suffix,
:unit_suffix,
:durations_as_seconds
:durations_as_seconds,
:histogram_bucket_overrides
)

# Options for exporting metrics to Prometheus.
Expand All @@ -259,6 +268,9 @@ def _to_bridge
# @return [Boolean] If `true`, all histograms will include the unit in their name as a suffix.
# @!attribute durations_as_seconds
# @return [Boolean] Whether to use float seconds instead of integer milliseconds for durations.
# @!attribute histogram_bucket_overrides
# @return [Hash<String, Array<Numeric>>, nil] Override default histogram buckets. Key of the hash it the metric
# name, value is an array of floats for the set of buckets.
class PrometheusMetricsOptions
# Create Prometheus options.
#
Expand All @@ -267,11 +279,14 @@ class PrometheusMetricsOptions
# @param unit_suffix [Boolean] If `true`, all histograms will include the unit in their name as a suffix.
# @param durations_as_seconds [Boolean] Whether to use float seconds instead of integer milliseconds for
# durations.
# @param histogram_bucket_overrides [Hash<String, Array<Numeric>>, nil] Override default histogram buckets. Key of
# the hash it the metric name, value is an array of floats for the set of buckets.
def initialize(
bind_address:,
counters_total_suffix: false,
unit_suffix: false,
durations_as_seconds: false
durations_as_seconds: false,
histogram_bucket_overrides: nil
)
super
end
Expand All @@ -283,7 +298,8 @@ def _to_bridge
bind_address:,
counters_total_suffix:,
unit_suffix:,
durations_as_seconds:
durations_as_seconds:,
histogram_bucket_overrides:
)
end
end
Expand Down
8 changes: 6 additions & 2 deletions temporalio/sig/temporalio/internal/bridge/runtime.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ module Temporalio
attr_accessor metric_temporality_delta: bool
attr_accessor durations_as_seconds: bool
attr_accessor http: bool
attr_accessor histogram_bucket_overrides: Hash[String, Array[Numeric]]?

def initialize: (
url: String,
headers: Hash[String, String]?,
metric_periodicity: Float?,
metric_temporality_delta: bool,
durations_as_seconds: bool,
http: bool
http: bool,
histogram_bucket_overrides: Hash[String, Array[Numeric]]?
) -> void
end

Expand All @@ -67,12 +69,14 @@ module Temporalio
attr_accessor counters_total_suffix: bool
attr_accessor unit_suffix: bool
attr_accessor durations_as_seconds: bool
attr_accessor histogram_bucket_overrides: Hash[String, Array[Numeric]]?

def initialize: (
bind_address: String,
counters_total_suffix: bool,
unit_suffix: bool,
durations_as_seconds: bool
durations_as_seconds: bool,
histogram_bucket_overrides: Hash[String, Array[Numeric]]?
) -> void
end

Expand Down
8 changes: 6 additions & 2 deletions temporalio/sig/temporalio/runtime.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,16 @@ module Temporalio
attr_reader metric_temporality: MetricTemporality
attr_reader durations_as_seconds: bool
attr_reader http: bool
attr_reader histogram_bucket_overrides: Hash[String, Array[Numeric]]?

def initialize: (
url: String,
?headers: Hash[String, String]?,
?metric_periodicity: Float?,
?metric_temporality: MetricTemporality,
?durations_as_seconds: bool,
?http: bool
?http: bool,
?histogram_bucket_overrides: Hash[String, Array[Numeric]]?
) -> void

def _to_bridge: -> Internal::Bridge::Runtime::OpenTelemetryMetricsOptions
Expand All @@ -88,12 +90,14 @@ module Temporalio
attr_reader counters_total_suffix: bool
attr_reader unit_suffix: bool
attr_reader durations_as_seconds: bool
attr_reader histogram_bucket_overrides: Hash[String, Array[Numeric]]?

def initialize: (
bind_address: String,
?counters_total_suffix: bool,
?unit_suffix: bool,
?durations_as_seconds: bool
?durations_as_seconds: bool,
?histogram_bucket_overrides: Hash[String, Array[Numeric]]?
) -> void

def _to_bridge: -> Internal::Bridge::Runtime::PrometheusMetricsOptions
Expand Down
39 changes: 39 additions & 0 deletions temporalio/test/runtime_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,43 @@ def test_metric_basics
assert_bad_call { counter_int.record(1, additional_attributes: { 123 => 'foo' }) }
# steep:ignore:end
end

def test_histogram_bucket_overrides
# Prom metrics with custom histogram buckets
prom_addr = "127.0.0.1:#{find_free_port}"
runtime = Temporalio::Runtime.new(
telemetry: Temporalio::Runtime::TelemetryOptions.new(
metrics: Temporalio::Runtime::MetricsOptions.new(
prometheus: Temporalio::Runtime::PrometheusMetricsOptions.new(
bind_address: prom_addr,
histogram_bucket_overrides: {
'temporal_request_latency' => [123.4, 567.89],
'custom_histogram' => [5, 6, 7]
}
)
)
)
)
conn_opts = env.client.connection.options.with(runtime:)
client_opts = env.client.options.with(
connection: Temporalio::Client::Connection.new(**conn_opts.to_h) # steep:ignore
)
client = Temporalio::Client.new(**client_opts.to_h) # steep:ignore

# Generate metrics
client.workflow_service.get_system_info(Temporalio::Api::WorkflowService::V1::GetSystemInfoRequest.new)
hist = runtime.metric_meter.create_metric(:histogram, 'custom_histogram', value_type: :float)
hist.record(4.5)
hist.record(5.5)
hist.record(6.5)
hist.record(7.5)

# Check metrics
dump = Net::HTTP.get(URI("http://#{prom_addr}/metrics"))
assert_metric_line(dump, 'temporal_request_latency_bucket', le: '123.4')
assert_metric_line(dump, 'temporal_request_latency_bucket', le: '567.89')
assert_equal '1', assert_metric_line(dump, 'custom_histogram_bucket', le: '5')
assert_equal '2', assert_metric_line(dump, 'custom_histogram_bucket', le: '6')
assert_equal '3', assert_metric_line(dump, 'custom_histogram_bucket', le: '7')
end
end
Loading