Skip to content

Commit 62bffeb

Browse files
committed
Revert change to history handler, add interceptor
1 parent 4baf023 commit 62bffeb

File tree

4 files changed

+146
-225
lines changed

4 files changed

+146
-225
lines changed
Lines changed: 127 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,127 @@
1+
package interceptor
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"go.temporal.io/api/serviceerror"
8+
"go.temporal.io/server/common"
9+
"go.temporal.io/server/common/aggregate"
10+
"go.temporal.io/server/common/log"
11+
"google.golang.org/grpc"
12+
)
13+
14+
// HealthCheckInterceptor is a gRPC interceptor that records health metrics
15+
type HealthCheckInterceptor struct {
16+
historyHealthSignalAggregator HealthSignalAggregator
17+
}
18+
19+
// NewHealthCheckInterceptor creates a new health check interceptor
20+
func NewHealthCheckInterceptor(historyHealthSignalAggregator HealthSignalAggregator) *HealthCheckInterceptor {
21+
return &HealthCheckInterceptor{
22+
historyHealthSignalAggregator: historyHealthSignalAggregator,
23+
}
24+
}
25+
26+
// UnaryIntercept implements the gRPC unary interceptor interface
27+
func (h *HealthCheckInterceptor) UnaryIntercept(
28+
ctx context.Context,
29+
req interface{},
30+
info *grpc.UnaryServerInfo,
31+
handler grpc.UnaryHandler,
32+
) (interface{}, error) {
33+
startTime := time.Now()
34+
resp, err := handler(ctx, req)
35+
elapsed := time.Since(startTime)
36+
37+
h.historyHealthSignalAggregator.Record(elapsed, err)
38+
39+
return resp, err
40+
}
41+
42+
type (
43+
// HealthSignalAggregator interface for tracking RPC health signals
44+
HealthSignalAggregator interface {
45+
Record(latency time.Duration, err error)
46+
AverageLatency() float64
47+
ErrorRatio() float64
48+
}
49+
50+
// HealthSignalAggregatorImpl implements HealthSignalAggregator
51+
HealthSignalAggregatorImpl struct {
52+
status int32
53+
54+
latencyAverage aggregate.MovingWindowAverage
55+
errorRatio aggregate.MovingWindowAverage
56+
57+
logger log.Logger
58+
}
59+
60+
noopSignalAggregator struct{}
61+
)
62+
63+
// NewHealthSignalAggregatorImpl creates a new instance of HealthSignalAggregatorImpl
64+
func NewHealthSignalAggregatorImpl(
65+
windowSize time.Duration,
66+
maxBufferSize int,
67+
logger log.Logger,
68+
) *HealthSignalAggregatorImpl {
69+
ret := &HealthSignalAggregatorImpl{
70+
latencyAverage: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize),
71+
errorRatio: aggregate.NewMovingWindowAvgImpl(windowSize, maxBufferSize),
72+
logger: logger,
73+
}
74+
return ret
75+
}
76+
77+
func (s *HealthSignalAggregatorImpl) Record(latency time.Duration, err error) {
78+
s.latencyAverage.Record(latency.Milliseconds())
79+
80+
if isUnhealthyError(err) {
81+
s.errorRatio.Record(1)
82+
} else {
83+
s.errorRatio.Record(0)
84+
}
85+
}
86+
87+
func (s *HealthSignalAggregatorImpl) AverageLatency() float64 {
88+
return s.latencyAverage.Average()
89+
}
90+
91+
func (s *HealthSignalAggregatorImpl) ErrorRatio() float64 {
92+
return s.errorRatio.Average()
93+
}
94+
95+
func isUnhealthyError(err error) bool {
96+
if err == nil {
97+
return false
98+
}
99+
if common.IsContextCanceledErr(err) {
100+
return true
101+
}
102+
if common.IsContextDeadlineExceededErr(err) {
103+
return true
104+
}
105+
106+
switch err.(type) {
107+
case *serviceerror.Unavailable,
108+
*serviceerror.DeadlineExceeded,
109+
*serviceerror.Canceled:
110+
return true
111+
}
112+
return false
113+
}
114+
115+
var NoopHealthSignalAggregator HealthSignalAggregator = newNoopSignalAggregator()
116+
117+
func newNoopSignalAggregator() *noopSignalAggregator { return &noopSignalAggregator{} }
118+
119+
func (a *noopSignalAggregator) Record(_ time.Duration, _ error) {}
120+
121+
func (a *noopSignalAggregator) AverageLatency() float64 {
122+
return 0
123+
}
124+
125+
func (*noopSignalAggregator) ErrorRatio() float64 {
126+
return 0
127+
}

service/history/fx.go

Lines changed: 18 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -58,12 +58,12 @@ var Module = fx.Options(
5858
fx.Provide(RetryableInterceptorProvider),
5959
fx.Provide(TelemetryInterceptorProvider),
6060
fx.Provide(RateLimitInterceptorProvider),
61+
fx.Provide(HealthCheckInterceptorProvider),
6162
fx.Provide(service.GrpcServerOptionsProvider),
6263
fx.Provide(ESProcessorConfigProvider),
6364
fx.Provide(VisibilityManagerProvider),
6465
fx.Provide(ThrottledLoggerRpsFnProvider),
6566
fx.Provide(PersistenceRateLimitingParamsProvider),
66-
fx.Provide(HealthSignalAggregatorProvider),
6767
service.PersistenceLazyLoadedServiceResolverModule,
6868
fx.Provide(ServiceResolverProvider),
6969
fx.Provide(EventNotifierProvider),
@@ -116,7 +116,6 @@ func HandlerProvider(args NewHandlerArgs) *Handler {
116116
taskQueueManager: args.TaskQueueManager,
117117
taskCategoryRegistry: args.TaskCategoryRegistry,
118118
dlqMetricsEmitter: args.DLQMetricsEmitter,
119-
healthSignalAggregator: args.HealthSignalAggregator,
120119

121120
replicationTaskFetcherFactory: args.ReplicationTaskFetcherFactory,
122121
replicationTaskConverterProvider: args.ReplicationTaskConverterFactory,
@@ -172,6 +171,23 @@ func TelemetryInterceptorProvider(
172171
)
173172
}
174173

174+
func HealthCheckInterceptorProvider(
175+
dynamicCollection *dynamicconfig.Collection,
176+
metricsHandler metrics.Handler,
177+
logger log.ThrottledLogger,
178+
) *interceptor.HealthCheckInterceptor {
179+
if dynamicconfig.HistoryHealthSignalMetricsEnabled.Get(dynamicCollection)() {
180+
return interceptor.NewHealthCheckInterceptor(
181+
interceptor.NewHealthSignalAggregatorImpl(
182+
dynamicconfig.PersistenceHealthSignalWindowSize.Get(dynamicCollection)(),
183+
dynamicconfig.PersistenceHealthSignalBufferSize.Get(dynamicCollection)(),
184+
logger,
185+
))
186+
}
187+
return interceptor.NewHealthCheckInterceptor(
188+
interceptor.NoopHealthSignalAggregator,
189+
)
190+
}
175191
func RateLimitInterceptorProvider(
176192
serviceConfig *configs.Config,
177193
) *interceptor.RateLimitInterceptor {
@@ -292,22 +308,3 @@ func ReplicationProgressCacheProvider(
292308
) replication.ProgressCache {
293309
return replication.NewProgressCache(serviceConfig, logger, handler)
294310
}
295-
296-
func HealthSignalAggregatorProvider(
297-
dynamicCollection *dynamicconfig.Collection,
298-
metricsHandler metrics.Handler,
299-
logger log.ThrottledLogger,
300-
) HealthSignalAggregator {
301-
if dynamicconfig.HistoryHealthSignalMetricsEnabled.Get(dynamicCollection)() {
302-
return NewHealthSignalAggregatorImpl(
303-
dynamicconfig.PersistenceHealthSignalAggregationEnabled.Get(dynamicCollection)(),
304-
dynamicconfig.PersistenceHealthSignalWindowSize.Get(dynamicCollection)(),
305-
dynamicconfig.PersistenceHealthSignalBufferSize.Get(dynamicCollection)(),
306-
metricsHandler,
307-
dynamicconfig.ShardRPSWarnLimit.Get(dynamicCollection),
308-
dynamicconfig.ShardPerNsRPSWarnPercent.Get(dynamicCollection),
309-
logger,
310-
)
311-
}
312-
return NoopHealthSignalAggregator
313-
}

service/history/handler.go

Lines changed: 1 addition & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,7 @@ type (
106106
ThrottledLogger log.ThrottledLogger
107107
PersistenceExecutionManager persistence.ExecutionManager
108108
PersistenceShardManager persistence.ShardManager
109-
PersistenceHealthSignal persistence.historyHealthSignalAggregator
109+
PersistenceHealthSignal persistence.HealthSignalAggregator
110110
HealthServer *health.Server
111111
PersistenceVisibilityManager manager.VisibilityManager
112112
HistoryServiceResolver membership.ServiceResolver
@@ -206,20 +206,6 @@ func (h *Handler) DeepHealthCheck(
206206
if status.Status != healthpb.HealthCheckResponse_SERVING {
207207
return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_DECLINED_SERVING}, nil
208208
}
209-
// Check that the RPC latency doesn't exceed the threshold.
210-
if _, ok := h.historyHealthSignalAggregator.(*noopSignalAggregator); ok {
211-
h.logger.Warn("health signal aggregator is using noop implementation")
212-
}
213-
if h.historyHealthSignalAggregator.AverageLatency() > h.config.HealthRPCLatencyFailure() {
214-
metrics.HistoryHostHealthGauge.With(h.metricsHandler).Record(float64(enumsspb.HEALTH_STATE_NOT_SERVING))
215-
return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_NOT_SERVING}, nil
216-
}
217-
218-
// Check if the RPC error ratio exceeds the threshold
219-
if h.historyHealthSignalAggregator.ErrorRatio() > h.config.HealthRPCErrorRatio() {
220-
metrics.HistoryHostHealthGauge.With(h.metricsHandler).Record(float64(enumsspb.HEALTH_STATE_NOT_SERVING))
221-
return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_NOT_SERVING}, nil
222-
}
223209

224210
// Check if the persistence layer is healthy.
225211
latency := h.persistenceHealthSignal.AverageLatency()

0 commit comments

Comments
 (0)