Skip to content

Commit 9fb8e46

Browse files
committed
Implement statsd exporter for otel
1 parent 5416e0f commit 9fb8e46

File tree

5 files changed

+628
-29
lines changed

5 files changed

+628
-29
lines changed

common/metrics/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ func MetricsHandlerFromConfig(logger log.Logger, c *Config) (Handler, error) {
461461

462462
if c.Prometheus != nil && c.Prometheus.Framework == FrameworkOpentelemetry {
463463
fatalOnListenerError := true
464-
otelProvider, err := NewOpenTelemetryProvider(logger, c.Prometheus, &c.ClientConfig, fatalOnListenerError)
464+
otelProvider, err := NewOpenTelemetryProvider(logger, c.Prometheus, &c.ClientConfig, fatalOnListenerError, c.Statsd)
465465
if err != nil {
466466
logger.Fatal(err.Error())
467467
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
# StatsD Support for OpenTelemetry Provider
2+
3+
This document shows how to use the new StatsD support in the Temporal OpenTelemetry metrics provider.
4+
5+
## Configuration
6+
7+
To enable StatsD support, you need to provide a `StatsdConfig` when creating the OpenTelemetry provider:
8+
9+
```go
10+
package main
11+
12+
import (
13+
"context"
14+
"log"
15+
16+
"go.temporal.io/server/common/metrics"
17+
"go.temporal.io/server/common/log"
18+
)
19+
20+
func main() {
21+
logger := log.NewNoopLogger()
22+
23+
// Configure StatsD
24+
statsdConfig := &metrics.StatsdConfig{
25+
HostPort: "localhost:8125", // StatsD server address
26+
Prefix: "temporal", // Metric prefix
27+
Reporter: metrics.StatsdReporterConfig{
28+
TagSeparator: ",", // Tag separator for StatsD tags
29+
},
30+
}
31+
32+
// Configure client settings
33+
clientConfig := &metrics.ClientConfig{
34+
PerUnitHistogramBoundaries: map[string][]float64{
35+
metrics.Dimensionless: {1, 5, 10, 25, 50, 100},
36+
metrics.Bytes: {1024, 4096, 16384, 65536},
37+
metrics.Milliseconds: {1, 5, 10, 25, 50, 100, 250, 500, 1000},
38+
metrics.Seconds: {0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0},
39+
},
40+
}
41+
42+
// Create OpenTelemetry provider with StatsD support
43+
provider, err := metrics.NewOpenTelemetryProvider(
44+
logger,
45+
nil, // No Prometheus config
46+
clientConfig,
47+
false, // Don't fatal on listener error
48+
statsdConfig, // StatsD config
49+
)
50+
if err != nil {
51+
log.Fatal("Failed to create OpenTelemetry provider:", err)
52+
}
53+
defer provider.Stop(logger)
54+
55+
// Get a meter and create metrics
56+
meter := provider.GetMeter()
57+
58+
// Create a counter
59+
counter, err := meter.Int64Counter("requests_total")
60+
if err != nil {
61+
log.Fatal("Failed to create counter:", err)
62+
}
63+
64+
// Create a histogram
65+
histogram, err := meter.Float64Histogram("request_duration")
66+
if err != nil {
67+
log.Fatal("Failed to create histogram:", err)
68+
}
69+
70+
// Use the metrics
71+
counter.Add(context.Background(), 1)
72+
histogram.Record(context.Background(), 0.123)
73+
74+
// Metrics will be sent to StatsD automatically via the PeriodicReader
75+
}
76+
```
77+
78+
## Using Both Prometheus and StatsD
79+
80+
You can also configure both Prometheus and StatsD exporters simultaneously:
81+
82+
```go
83+
// Configure both Prometheus and StatsD
84+
prometheusConfig := &metrics.PrometheusConfig{
85+
ListenAddress: "127.0.0.1:9090",
86+
HandlerPath: "/metrics",
87+
}
88+
89+
statsdConfig := &metrics.StatsdConfig{
90+
HostPort: "localhost:8125",
91+
Prefix: "temporal",
92+
Reporter: metrics.StatsdReporterConfig{
93+
TagSeparator: ",",
94+
},
95+
}
96+
97+
// Create provider with both exporters
98+
provider, err := metrics.NewOpenTelemetryProvider(
99+
logger,
100+
prometheusConfig, // Prometheus config
101+
clientConfig,
102+
false,
103+
statsdConfig, // StatsD config
104+
)
105+
```
106+
107+
## Metric Types Supported
108+
109+
The StatsD exporter supports the following OpenTelemetry metric types:
110+
111+
### Counters (Sum metrics)
112+
- **Monotonic sums**: Exported as StatsD counters using `Inc()`
113+
- **Non-monotonic sums**: Exported as StatsD gauges using `Gauge()`
114+
115+
### Gauges
116+
- Exported as StatsD gauges using `Gauge()`
117+
118+
### Histograms
119+
- Exported as multiple StatsD metrics:
120+
- `{metric_name}.count`: Total count of observations
121+
- `{metric_name}.sum`: Sum of all observed values
122+
- `{metric_name}.bucket_le_{bound}`: Count of observations in each bucket
123+
124+
## Configuration Options
125+
126+
### StatsdConfig Fields
127+
128+
- `HostPort`: The address of the StatsD server (e.g., "localhost:8125")
129+
- `Prefix`: A prefix to add to all metric names
130+
- `Reporter.TagSeparator`: The separator to use for StatsD tags (default: ",")
131+
132+
### Notes
133+
134+
- The StatsD exporter uses the `github.com/cactus/go-statsd-client/v5` library
135+
- Metrics are sent via UDP, so there's no guarantee of delivery
136+
- The exporter automatically converts OpenTelemetry attributes to StatsD tags
137+
- Float64 values are converted to int64 for StatsD compatibility where necessary
138+
- The exporter uses a PeriodicReader to automatically send metrics at regular intervals
139+
140+
## Error Handling
141+
142+
The StatsD exporter logs errors but doesn't fail the application if StatsD is unavailable. This ensures that your application continues to work even if the StatsD server is down.

common/metrics/opentelemetry_provider.go

Lines changed: 73 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ type (
2323
}
2424

2525
openTelemetryProviderImpl struct {
26-
meter metric.Meter
27-
config *PrometheusConfig
28-
server *http.Server
26+
meter metric.Meter
27+
config *PrometheusConfig
28+
server *http.Server
29+
statsdExporter *statsdExporter
2930
}
3031
)
3132

@@ -34,22 +35,50 @@ func NewOpenTelemetryProvider(
3435
prometheusConfig *PrometheusConfig,
3536
clientConfig *ClientConfig,
3637
fatalOnListenerError bool,
38+
statsdConfig *StatsdConfig,
3739
) (*openTelemetryProviderImpl, error) {
38-
reg := prometheus.NewRegistry()
39-
exporterOpts := []exporters.Option{exporters.WithRegisterer(reg)}
40-
if clientConfig.WithoutUnitSuffix {
41-
exporterOpts = append(exporterOpts, exporters.WithoutUnits())
42-
}
43-
if clientConfig.WithoutCounterSuffix {
44-
exporterOpts = append(exporterOpts, exporters.WithoutCounterSuffixes())
40+
var readers []sdkmetrics.Reader
41+
var metricServer *http.Server
42+
var statsdExp *statsdExporter
43+
44+
// Set up Prometheus exporter if config is provided
45+
if prometheusConfig != nil {
46+
reg := prometheus.NewRegistry()
47+
exporterOpts := []exporters.Option{exporters.WithRegisterer(reg)}
48+
if clientConfig.WithoutUnitSuffix {
49+
exporterOpts = append(exporterOpts, exporters.WithoutUnits())
50+
}
51+
if clientConfig.WithoutCounterSuffix {
52+
exporterOpts = append(exporterOpts, exporters.WithoutCounterSuffixes())
53+
}
54+
if clientConfig.Prefix != "" {
55+
exporterOpts = append(exporterOpts, exporters.WithNamespace(clientConfig.Prefix))
56+
}
57+
exporter, err := exporters.New(exporterOpts...)
58+
if err != nil {
59+
logger.Error("Failed to initialize prometheus exporter.", tag.Error(err))
60+
return nil, err
61+
}
62+
readers = append(readers, exporter)
63+
metricServer = initPrometheusListener(prometheusConfig, reg, logger, fatalOnListenerError)
4564
}
46-
if clientConfig.Prefix != "" {
47-
exporterOpts = append(exporterOpts, exporters.WithNamespace(clientConfig.Prefix))
65+
66+
// Set up StatsD exporter if config is provided
67+
if statsdConfig != nil {
68+
var err error
69+
statsdExp, err = NewStatsdExporter(statsdConfig, logger)
70+
if err != nil {
71+
logger.Error("Failed to initialize statsd exporter.", tag.Error(err))
72+
return nil, err
73+
}
74+
// Create a PeriodicReader with the StatsD exporter
75+
statsdReader := sdkmetrics.NewPeriodicReader(statsdExp)
76+
readers = append(readers, statsdReader)
4877
}
49-
exporter, err := exporters.New(exporterOpts...)
50-
if err != nil {
51-
logger.Error("Failed to initialize prometheus exporter.", tag.Error(err))
52-
return nil, err
78+
79+
// If no exporters are configured, log a warning
80+
if len(readers) == 0 {
81+
logger.Warn("No metric exporters configured (neither Prometheus nor StatsD)")
5382
}
5483

5584
var views []sdkmetrics.View
@@ -66,16 +95,20 @@ func NewOpenTelemetryProvider(
6695
},
6796
))
6897
}
69-
provider := sdkmetrics.NewMeterProvider(
70-
sdkmetrics.WithReader(exporter),
71-
sdkmetrics.WithView(views...),
72-
)
73-
metricServer := initPrometheusListener(prometheusConfig, reg, logger, fatalOnListenerError)
98+
99+
meterProviderOpts := []sdkmetrics.Option{sdkmetrics.WithView(views...)}
100+
for _, reader := range readers {
101+
meterProviderOpts = append(meterProviderOpts, sdkmetrics.WithReader(reader))
102+
}
103+
104+
provider := sdkmetrics.NewMeterProvider(meterProviderOpts...)
74105
meter := provider.Meter("temporal")
106+
75107
reporter := &openTelemetryProviderImpl{
76-
meter: meter,
77-
config: prometheusConfig,
78-
server: metricServer,
108+
meter: meter,
109+
config: prometheusConfig,
110+
server: metricServer,
111+
statsdExporter: statsdExp,
79112
}
80113

81114
return reporter, nil
@@ -124,9 +157,21 @@ func (r *openTelemetryProviderImpl) GetMeter() metric.Meter {
124157
}
125158

126159
func (r *openTelemetryProviderImpl) Stop(logger log.Logger) {
127-
ctx, closeCtx := context.WithTimeout(context.Background(), time.Second)
128-
defer closeCtx()
129-
if err := r.server.Shutdown(ctx); !(err == nil || err == http.ErrServerClosed) {
130-
logger.Error("Prometheus metrics server shutdown failure.", tag.Address(r.config.ListenAddress), tag.Error(err))
160+
// Shutdown Prometheus server if it exists
161+
if r.server != nil {
162+
ctx, closeCtx := context.WithTimeout(context.Background(), time.Second)
163+
defer closeCtx()
164+
if err := r.server.Shutdown(ctx); !(err == nil || err == http.ErrServerClosed) {
165+
logger.Error("Prometheus metrics server shutdown failure.", tag.Address(r.config.ListenAddress), tag.Error(err))
166+
}
167+
}
168+
169+
// Shutdown StatsD exporter if it exists
170+
if r.statsdExporter != nil {
171+
ctx, closeCtx := context.WithTimeout(context.Background(), time.Second)
172+
defer closeCtx()
173+
if err := r.statsdExporter.Shutdown(ctx); err != nil {
174+
logger.Error("StatsD exporter shutdown failure.", tag.Error(err))
175+
}
131176
}
132177
}

0 commit comments

Comments
 (0)