Skip to content

Commit 51cab70

Browse files
committed
Allow for enabling/disabling aggregator without restart
1 parent 812acb9 commit 51cab70

File tree

2 files changed

+22
-25
lines changed

2 files changed

+22
-25
lines changed

common/rpc/interceptor/health_check.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"go.temporal.io/api/serviceerror"
88
"go.temporal.io/server/common"
99
"go.temporal.io/server/common/aggregate"
10+
"go.temporal.io/server/common/dynamicconfig"
1011
"go.temporal.io/server/common/log"
1112
"google.golang.org/grpc"
1213
)
@@ -51,6 +52,8 @@ type (
5152
HealthSignalAggregatorImpl struct {
5253
status int32
5354

55+
aggregatorEnabled dynamicconfig.BoolPropertyFn
56+
5457
latencyAverage aggregate.MovingWindowAverage
5558
errorRatio aggregate.MovingWindowAverage
5659

@@ -62,19 +65,25 @@ type (
6265

6366
// NewHealthSignalAggregator creates a new instance of HealthSignalAggregatorImpl
6467
func NewHealthSignalAggregator(
68+
logger log.Logger,
69+
aggregatorEnabled dynamicconfig.BoolPropertyFn,
6570
windowSize time.Duration,
6671
maxBufferSize int,
67-
logger log.Logger,
6872
) *HealthSignalAggregatorImpl {
6973
ret := &HealthSignalAggregatorImpl{
70-
latencyAverage: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize),
71-
errorRatio: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize),
72-
logger: logger,
74+
logger: logger,
75+
aggregatorEnabled: aggregatorEnabled,
76+
latencyAverage: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize),
77+
errorRatio: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize),
7378
}
7479
return ret
7580
}
7681

7782
func (s *HealthSignalAggregatorImpl) Record(latency time.Duration, err error) {
83+
if !s.aggregatorEnabled() {
84+
s.logger.Debug("health signal aggregator is disabled")
85+
return
86+
}
7887
s.latencyAverage.Record(latency.Milliseconds())
7988

8089
if isUnhealthyError(err) {
@@ -85,10 +94,16 @@ func (s *HealthSignalAggregatorImpl) Record(latency time.Duration, err error) {
8594
}
8695

8796
func (s *HealthSignalAggregatorImpl) AverageLatency() float64 {
97+
if !s.aggregatorEnabled() {
98+
s.logger.Debug("health signal aggregator is disabled")
99+
}
88100
return s.latencyAverage.Average()
89101
}
90102

91103
func (s *HealthSignalAggregatorImpl) ErrorRatio() float64 {
104+
if !s.aggregatorEnabled() {
105+
s.logger.Debug("health signal aggregator is disabled")
106+
}
92107
return s.errorRatio.Average()
93108
}
94109

@@ -111,17 +126,3 @@ func isUnhealthyError(err error) bool {
111126
}
112127
return false
113128
}
114-
115-
var NoopHealthSignalAggregator HealthSignalAggregator = newNoopSignalAggregator()
116-
117-
func newNoopSignalAggregator() *NoopSignalAggregator { return &NoopSignalAggregator{} }
118-
119-
func (a *NoopSignalAggregator) Record(_ time.Duration, _ error) {}
120-
121-
func (a *NoopSignalAggregator) AverageLatency() float64 {
122-
return 0
123-
}
124-
125-
func (*NoopSignalAggregator) ErrorRatio() float64 {
126-
return 0
127-
}

service/history/fx.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -179,23 +179,19 @@ func HealthSignalAggregatorProvider(
179179
logger log.ThrottledLogger,
180180
) interceptor.HealthSignalAggregator {
181181
return interceptor.NewHealthSignalAggregator(
182+
logger,
183+
dynamicconfig.HistoryHealthSignalMetricsEnabled.Get(dynamicCollection),
182184
dynamicconfig.PersistenceHealthSignalWindowSize.Get(dynamicCollection)(),
183185
dynamicconfig.PersistenceHealthSignalBufferSize.Get(dynamicCollection)(),
184-
logger,
185186
)
186187
}
187188

188189
func HealthCheckInterceptorProvider(
189190
dynamicCollection *dynamicconfig.Collection,
190191
healthSignalAggregator *interceptor.HealthSignalAggregatorImpl,
191192
) *interceptor.HealthCheckInterceptor {
192-
if dynamicconfig.HistoryHealthSignalMetricsEnabled.Get(dynamicCollection)() {
193-
return interceptor.NewHealthCheckInterceptor(
194-
healthSignalAggregator,
195-
)
196-
}
197193
return interceptor.NewHealthCheckInterceptor(
198-
interceptor.NoopHealthSignalAggregator,
194+
healthSignalAggregator,
199195
)
200196
}
201197
func RateLimitInterceptorProvider(

0 commit comments

Comments
 (0)