Skip to content

Commit f5c6ca1

Browse files
authored
Batch Mutable State Metrics (#6655)
## What changed? Extend metrics.Handler interface to include a batching feature which enables implementations to send "wide events". Batch the mutable state metrics. ## Why? Send a single metrics "event" where possible rather than several. ## How did you test it? Unit tests. ## Potential risks None expected. ## Documentation Nothing in `docs/` to update. Release notes to come. ## Is hotfix candidate? No
1 parent f08c9fa commit f5c6ca1

File tree

8 files changed

+248
-45
lines changed

8 files changed

+248
-45
lines changed

common/metrics/metrics.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
package metrics
2828

2929
import (
30+
"io"
3031
"time"
3132

3233
"go.temporal.io/server/common/log"
@@ -58,6 +59,15 @@ type (
5859
Histogram(string, MetricUnit) HistogramIface
5960

6061
Stop(log.Logger)
62+
63+
// StartBatch returns a BatchHandler that can emit a series of metrics as a single "wide event".
64+
// If wide events aren't supported in the underlying implementation, metrics can still be sent individually.
65+
StartBatch(string) BatchHandler
66+
}
67+
68+
BatchHandler interface {
69+
Handler
70+
io.Closer
6171
}
6272

6373
// CounterIface is an ever-increasing counter.
@@ -66,7 +76,7 @@ type (
6676
// Tags provided are merged with the source MetricsHandler
6777
Record(int64, ...Tag)
6878
}
69-
// GaugeIface can be set to any float and repesents a latest value instrument.
79+
// GaugeIface can be set to any float and represents a latest value instrument.
7080
GaugeIface interface {
7181
// Record updates the gauge value.
7282
// Tags provided are merged with the source MetricsHandler

common/metrics/metrics_mock.go

Lines changed: 151 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/metrics/metricstest/capture_handler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,13 @@ func (c *CaptureHandler) Histogram(name string, unit metrics.MetricUnit) metrics
142142
return metrics.HistogramFunc(func(v int64, tags ...metrics.Tag) { c.record(name, v, unit, tags...) })
143143
}
144144

145+
func (c *CaptureHandler) Close() error {
146+
return nil
147+
}
148+
149+
func (c *CaptureHandler) StartBatch(_ string) metrics.BatchHandler {
150+
return c
151+
}
152+
145153
// Stop implements [metrics.Handler.Stop].
146154
func (*CaptureHandler) Stop(log.Logger) {}

common/metrics/noop_impl.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,14 @@ func (*noopMetricsHandler) Histogram(string, MetricUnit) HistogramIface {
6868

6969
func (*noopMetricsHandler) Stop(log.Logger) {}
7070

71+
func (*noopMetricsHandler) Close() error {
72+
return nil
73+
}
74+
75+
func (n *noopMetricsHandler) StartBatch(_ string) BatchHandler {
76+
return n
77+
}
78+
7179
var NoopCounterMetricFunc = CounterFunc(func(i int64, t ...Tag) {})
7280
var NoopGaugeMetricFunc = GaugeFunc(func(f float64, t ...Tag) {})
7381
var NoopTimerMetricFunc = TimerFunc(func(d time.Duration, t ...Tag) {})

common/metrics/otel_metrics_handler.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,14 @@ func (omp *otelMetricsHandler) Stop(l log.Logger) {
204204
omp.provider.Stop(l)
205205
}
206206

207+
func (omp *otelMetricsHandler) Close() error {
208+
return nil
209+
}
210+
211+
func (omp *otelMetricsHandler) StartBatch(_ string) BatchHandler {
212+
return omp
213+
}
214+
207215
// makeSet returns an otel attribute.Set with the given tags merged with the
208216
// otelMetricsHandler's tags.
209217
func (omp *otelMetricsHandler) makeSet(tags []Tag) attribute.Set {

common/metrics/tally_metrics_handler.go

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -66,60 +66,68 @@ func NewTallyMetricsHandler(cfg ClientConfig, scope tally.Scope) *tallyMetricsHa
6666

6767
// WithTags creates a new MetricProvder with provided []Tag
6868
// Tags are merged with registered Tags from the source MetricsHandler
69-
func (tmp *tallyMetricsHandler) WithTags(tags ...Tag) Handler {
69+
func (tmh *tallyMetricsHandler) WithTags(tags ...Tag) Handler {
7070
return &tallyMetricsHandler{
71-
scope: tmp.scope.Tagged(tagsToMap(tags, tmp.excludeTags)),
72-
perUnitBuckets: tmp.perUnitBuckets,
73-
excludeTags: tmp.excludeTags,
71+
scope: tmh.scope.Tagged(tagsToMap(tags, tmh.excludeTags)),
72+
perUnitBuckets: tmh.perUnitBuckets,
73+
excludeTags: tmh.excludeTags,
7474
}
7575
}
7676

7777
// Counter obtains a counter for the given name.
78-
func (tmp *tallyMetricsHandler) Counter(counter string) CounterIface {
78+
func (tmh *tallyMetricsHandler) Counter(counter string) CounterIface {
7979
return CounterFunc(func(i int64, t ...Tag) {
80-
scope := tmp.scope
80+
scope := tmh.scope
8181
if len(t) > 0 {
82-
scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
82+
scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags))
8383
}
8484
scope.Counter(counter).Inc(i)
8585
})
8686
}
8787

8888
// Gauge obtains a gauge for the given name.
89-
func (tmp *tallyMetricsHandler) Gauge(gauge string) GaugeIface {
89+
func (tmh *tallyMetricsHandler) Gauge(gauge string) GaugeIface {
9090
return GaugeFunc(func(f float64, t ...Tag) {
91-
scope := tmp.scope
91+
scope := tmh.scope
9292
if len(t) > 0 {
93-
scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
93+
scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags))
9494
}
9595
scope.Gauge(gauge).Update(f)
9696
})
9797
}
9898

9999
// Timer obtains a timer for the given name.
100-
func (tmp *tallyMetricsHandler) Timer(timer string) TimerIface {
100+
func (tmh *tallyMetricsHandler) Timer(timer string) TimerIface {
101101
return TimerFunc(func(d time.Duration, t ...Tag) {
102-
scope := tmp.scope
102+
scope := tmh.scope
103103
if len(t) > 0 {
104-
scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
104+
scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags))
105105
}
106106
scope.Timer(timer).Record(d)
107107
})
108108
}
109109

110110
// Histogram obtains a histogram for the given name.
111-
func (tmp *tallyMetricsHandler) Histogram(histogram string, unit MetricUnit) HistogramIface {
111+
func (tmh *tallyMetricsHandler) Histogram(histogram string, unit MetricUnit) HistogramIface {
112112
return HistogramFunc(func(i int64, t ...Tag) {
113-
scope := tmp.scope
113+
scope := tmh.scope
114114
if len(t) > 0 {
115-
scope = tmp.scope.Tagged(tagsToMap(t, tmp.excludeTags))
115+
scope = tmh.scope.Tagged(tagsToMap(t, tmh.excludeTags))
116116
}
117-
scope.Histogram(histogram, tmp.perUnitBuckets[unit]).RecordValue(float64(i))
117+
scope.Histogram(histogram, tmh.perUnitBuckets[unit]).RecordValue(float64(i))
118118
})
119119
}
120120

121121
func (*tallyMetricsHandler) Stop(log.Logger) {}
122122

123+
func (*tallyMetricsHandler) Close() error {
124+
return nil
125+
}
126+
127+
func (tmh *tallyMetricsHandler) StartBatch(_ string) BatchHandler {
128+
return tmh
129+
}
130+
123131
func tagsToMap(t1 []Tag, e excludeTags) map[string]string {
124132
if len(t1) == 0 {
125133
return nil

service/history/historybuilder/history_builder_categorization_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ func (h StubHandler) Histogram(_ string, _ metrics.MetricUnit) metrics.Histogram
6666

6767
func (h StubHandler) Stop(_ log.Logger) {}
6868

69+
func (h StubHandler) Close() error {
70+
return nil
71+
}
72+
73+
func (h StubHandler) StartBatch(_ string) metrics.BatchHandler {
74+
return h
75+
}
76+
6977
func TestHistoryBuilder_IsDirty(t *testing.T) {
7078
hb := HistoryBuilder{EventStore: EventStore{}}
7179
if hb.IsDirty() {

0 commit comments

Comments
 (0)