Skip to content

Commit 61b9b65

Browse files
authored
Improve extensibility of optentelemetry metrics (#2521)
1 parent 73f40bf commit 61b9b65

File tree

6 files changed

+160
-79
lines changed

6 files changed

+160
-79
lines changed

common/metrics/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func InitReporterFromPrometheusConfig(logger log.Logger, config *PrometheusConfi
267267
case FrameworkTally:
268268
return NewTallyReporterFromPrometheusConfig(logger, config, clientConfig), nil
269269
case FrameworkOpentelemetry:
270-
return NewOpentelemeteryReporter(logger, config, clientConfig)
270+
return NewOpentelemeteryReporterWithMust(logger, config, clientConfig)
271271
default:
272272
err := fmt.Errorf("unsupported framework type specified in config: %q", config.Framework)
273273
logger.Error(err.Error())

common/metrics/opentelemetry_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type (
4646

4747
// NewOpentelemeteryClientByReporter creates and returns a new instance of Client implementation
4848
// serviceIdx indicates the service type in (InputhostIndex, ... StorageIndex)
49-
func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx, reporter OpentelemetryReporter, logger log.Logger, gaugeCache OtelGaugeCache) (Client, error) {
49+
func NewOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx, reporter OpentelemetryReporter, logger log.Logger, gaugeCache OtelGaugeCache) (Client, error) {
5050
tagsFilterConfig := NewTagFilteringScopeConfig(clientConfig.ExcludeTags)
5151

5252
scopeWrapper := func(impl internalScope) internalScope {
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// The MIT License
2+
//
3+
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
4+
//
5+
// Copyright (c) 2020 Uber Technologies, Inc.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
package metrics
26+
27+
import (
28+
"context"
29+
"net/http"
30+
"time"
31+
32+
"go.opentelemetry.io/otel/exporters/prometheus"
33+
"go.opentelemetry.io/otel/metric"
34+
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
35+
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
36+
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
37+
"go.opentelemetry.io/otel/sdk/resource"
38+
39+
"go.temporal.io/server/common/log"
40+
"go.temporal.io/server/common/log/tag"
41+
)
42+
43+
var _ OpentelemetryMustProvider = (*opentelemetryMustProviderImpl)(nil)
44+
45+
type (
46+
OpentelemetryMustProvider interface {
47+
Stop(logger log.Logger)
48+
GetMeterMust() metric.MeterMust
49+
}
50+
51+
opentelemetryMustProviderImpl struct {
52+
exporter *prometheus.Exporter
53+
meter metric.Meter
54+
meterMust metric.MeterMust
55+
config *PrometheusConfig
56+
server *http.Server
57+
}
58+
)
59+
60+
func NewOpentelemetryMustProvider(
61+
logger log.Logger,
62+
prometheusConfig *PrometheusConfig,
63+
clientConfig *ClientConfig,
64+
) (*opentelemetryMustProviderImpl, error) {
65+
histogramBoundaries := prometheusConfig.DefaultHistogramBoundaries
66+
if len(histogramBoundaries) == 0 {
67+
histogramBoundaries = defaultHistogramBoundaries
68+
}
69+
70+
c := controller.New(
71+
processor.NewFactory(
72+
NewOtelAggregatorSelector(
73+
histogramBoundaries,
74+
clientConfig.PerUnitHistogramBoundaries,
75+
),
76+
aggregation.CumulativeTemporalitySelector(),
77+
processor.WithMemory(true),
78+
),
79+
controller.WithResource(resource.Empty()),
80+
)
81+
exporter, err := prometheus.New(prometheus.Config{DefaultHistogramBoundaries: histogramBoundaries}, c)
82+
83+
if err != nil {
84+
logger.Error("Failed to initialize prometheus exporter.", tag.Error(err))
85+
return nil, err
86+
}
87+
88+
metricServer := initPrometheusListener(prometheusConfig, logger, exporter)
89+
90+
meter := c.Meter("temporal")
91+
meterMust := metric.Must(meter)
92+
reporter := &opentelemetryMustProviderImpl{
93+
exporter: exporter,
94+
meter: meter,
95+
meterMust: meterMust,
96+
config: prometheusConfig,
97+
server: metricServer,
98+
}
99+
100+
return reporter, nil
101+
}
102+
103+
func initPrometheusListener(config *PrometheusConfig, logger log.Logger, exporter *prometheus.Exporter) *http.Server {
104+
handlerPath := config.HandlerPath
105+
if handlerPath == "" {
106+
handlerPath = "/metrics"
107+
}
108+
109+
handler := http.NewServeMux()
110+
handler.HandleFunc(handlerPath, exporter.ServeHTTP)
111+
112+
if config.ListenAddress == "" {
113+
logger.Fatal("Listen address must be specified.", tag.Address(config.ListenAddress))
114+
}
115+
server := &http.Server{Addr: config.ListenAddress, Handler: handler}
116+
117+
go func() {
118+
err := server.ListenAndServe()
119+
if err != http.ErrServerClosed {
120+
logger.Fatal("Failed to initialize prometheus listener.", tag.Address(config.ListenAddress))
121+
}
122+
}()
123+
124+
return server
125+
}
126+
127+
func (r *opentelemetryMustProviderImpl) GetMeterMust() metric.MeterMust {
128+
return r.meterMust
129+
}
130+
131+
func (r *opentelemetryMustProviderImpl) Stop(logger log.Logger) {
132+
ctx, closeCtx := context.WithTimeout(context.Background(), time.Second)
133+
defer closeCtx()
134+
if err := r.server.Shutdown(ctx); !(err == nil || err == http.ErrServerClosed) {
135+
logger.Error("Prometheus metrics server shutdown failure.", tag.Address(r.config.ListenAddress), tag.Error(err))
136+
}
137+
}

common/metrics/opentelemetry_reporter.go

Lines changed: 18 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,15 @@
2525
package metrics
2626

2727
import (
28-
"context"
29-
"net/http"
30-
"time"
31-
3228
"go.opentelemetry.io/otel/exporters/prometheus"
3329
"go.opentelemetry.io/otel/metric"
34-
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
35-
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
36-
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
37-
"go.opentelemetry.io/otel/sdk/resource"
3830

3931
"go.temporal.io/server/common/log"
40-
"go.temporal.io/server/common/log/tag"
4132
)
4233

4334
var _ Reporter = (OpentelemetryReporter)(nil)
4435
var _ OpentelemetryReporter = (*opentelemetryReporterImpl)(nil)
36+
var _ OpentelemetryMustProvider = (*opentelemetryMustProviderImpl)(nil)
4537

4638
type (
4739
OpentelemetryReporter interface {
@@ -56,105 +48,57 @@ type (
5648
exporter *prometheus.Exporter
5749
meter metric.Meter
5850
meterMust metric.MeterMust
59-
config *PrometheusConfig
60-
server *http.Server
6151
clientConfig *ClientConfig
6252
gaugeCache OtelGaugeCache
6353
userScope UserScope
54+
mustProvider OpentelemetryMustProvider
6455
}
6556

6657
OpentelemetryListener struct {
6758
}
6859
)
6960

70-
func NewOpentelemeteryReporter(
71-
logger log.Logger,
61+
func NewOpentelemeteryReporterWithMust(
62+
logger log.Logger, // keeping this to maintain API in case of adding more logging later
7263
prometheusConfig *PrometheusConfig,
7364
clientConfig *ClientConfig,
7465
) (*opentelemetryReporterImpl, error) {
75-
histogramBoundaries := prometheusConfig.DefaultHistogramBoundaries
76-
if len(histogramBoundaries) == 0 {
77-
histogramBoundaries = defaultHistogramBoundaries
78-
}
79-
80-
c := controller.New(
81-
processor.NewFactory(
82-
NewOtelAggregatorSelector(
83-
histogramBoundaries,
84-
clientConfig.PerUnitHistogramBoundaries,
85-
),
86-
aggregation.CumulativeTemporalitySelector(),
87-
processor.WithMemory(true),
88-
),
89-
controller.WithResource(resource.Empty()),
90-
)
91-
exporter, err := prometheus.New(
92-
prometheus.Config{DefaultHistogramBoundaries: histogramBoundaries}, c)
93-
66+
mustProvider, err := NewOpentelemetryMustProvider(logger, prometheusConfig, clientConfig)
9467
if err != nil {
95-
logger.Error("Failed to initialize prometheus exporter.", tag.Error(err))
9668
return nil, err
9769
}
70+
return NewOpentelemeteryReporter(logger, clientConfig, mustProvider)
71+
}
9872

99-
metricServer := initPrometheusListener(prometheusConfig, logger, exporter)
100-
101-
meter := c.Meter("temporal")
102-
meterMust := metric.Must(meter)
73+
func NewOpentelemeteryReporter(
74+
logger log.Logger, // keeping this to maintain API in case of adding more logging later
75+
clientConfig *ClientConfig,
76+
mustProvider OpentelemetryMustProvider,
77+
) (*opentelemetryReporterImpl, error) {
78+
meterMust := mustProvider.GetMeterMust()
10379
gaugeCache := NewOtelGaugeCache(meterMust)
104-
userScope := newOpentelemetryUserScope(meterMust, clientConfig.Tags, gaugeCache)
80+
userScope := NewOpentelemetryUserScope(meterMust, clientConfig.Tags, gaugeCache)
10581
reporter := &opentelemetryReporterImpl{
106-
exporter: exporter,
107-
meter: meter,
108-
meterMust: meterMust,
109-
config: prometheusConfig,
110-
server: metricServer,
11182
clientConfig: clientConfig,
11283
gaugeCache: gaugeCache,
11384
userScope: userScope,
85+
mustProvider: mustProvider,
11486
}
11587

11688
return reporter, nil
11789
}
11890

119-
func initPrometheusListener(config *PrometheusConfig, logger log.Logger, exporter *prometheus.Exporter) *http.Server {
120-
handlerPath := config.HandlerPath
121-
if handlerPath == "" {
122-
handlerPath = "/metrics"
123-
}
124-
125-
handler := http.NewServeMux()
126-
handler.HandleFunc(handlerPath, exporter.ServeHTTP)
127-
128-
if config.ListenAddress == "" {
129-
logger.Fatal("Listen address must be specified.", tag.Address(config.ListenAddress))
130-
}
131-
server := &http.Server{Addr: config.ListenAddress, Handler: handler}
132-
133-
go func() {
134-
err := server.ListenAndServe()
135-
if err != http.ErrServerClosed {
136-
logger.Fatal("Failed to initialize prometheus listener.", tag.Address(config.ListenAddress))
137-
}
138-
}()
139-
140-
return server
141-
}
142-
14391
func (r *opentelemetryReporterImpl) GetMeterMust() metric.MeterMust {
144-
return r.meterMust
92+
return r.mustProvider.GetMeterMust()
14593
}
14694

14795
func (r *opentelemetryReporterImpl) NewClient(logger log.Logger, serviceIdx ServiceIdx) (Client, error) {
14896

149-
return newOpentelemeteryClient(r.clientConfig, serviceIdx, r, logger, r.gaugeCache)
97+
return NewOpentelemeteryClient(r.clientConfig, serviceIdx, r, logger, r.gaugeCache)
15098
}
15199

152100
func (r *opentelemetryReporterImpl) Stop(logger log.Logger) {
153-
ctx, closeCtx := context.WithTimeout(context.Background(), time.Second)
154-
defer closeCtx()
155-
if err := r.server.Shutdown(ctx); !(err == nil || err == http.ErrServerClosed) {
156-
logger.Error("Prometheus metrics server shutdown failure.", tag.Address(r.config.ListenAddress), tag.Error(err))
157-
}
101+
r.mustProvider.Stop(logger)
158102
}
159103

160104
func (r *opentelemetryReporterImpl) UserScope() UserScope {

common/metrics/opentelemetry_user_scope.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type opentelemetryUserScope struct {
4040
gaugeCache OtelGaugeCache
4141
}
4242

43-
func newOpentelemetryUserScope(
43+
func NewOpentelemetryUserScope(
4444
meterMust metric.MeterMust,
4545
tags map[string]string,
4646
gaugeCache OtelGaugeCache,
@@ -95,5 +95,5 @@ func (o opentelemetryUserScope) Tagged(tags map[string]string) UserScope {
9595
for key, value := range tags {
9696
tagMap[key] = value
9797
}
98-
return newOpentelemetryUserScope(o.meterMust, tagMap, o.gaugeCache)
98+
return NewOpentelemetryUserScope(o.meterMust, tagMap, o.gaugeCache)
9999
}

common/metrics/otel_metric_test_utility.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func NewOtelMetricTestUtility() *OtelMetricTestUtility {
6262
}
6363

6464
func (t *OtelMetricTestUtility) GetClient(config *ClientConfig, idx ServiceIdx) Client {
65-
result, err := newOpentelemeteryClient(config, idx, t.reporter, log.NewNoopLogger(), t.gaugeCache)
65+
result, err := NewOpentelemeteryClient(config, idx, t.reporter, log.NewNoopLogger(), t.gaugeCache)
6666
if err != nil {
6767
panic(err)
6868
}

0 commit comments

Comments
 (0)