Skip to content

Commit 7658b4b

Browse files
committed
Allow for DeepHealthCheck handler to check data from interceptor
1 parent dec2e4d commit 7658b4b

File tree

3 files changed

+48
-11
lines changed

3 files changed

+48
-11
lines changed

common/rpc/interceptor/health_check.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type (
5757
logger log.Logger
5858
}
5959

60-
noopSignalAggregator struct{}
60+
NoopSignalAggregator struct{}
6161
)
6262

6363
// NewHealthSignalAggregatorImpl creates a new instance of HealthSignalAggregatorImpl
@@ -114,14 +114,14 @@ func isUnhealthyError(err error) bool {
114114

115115
var NoopHealthSignalAggregator HealthSignalAggregator = newNoopSignalAggregator()
116116

117-
func newNoopSignalAggregator() *noopSignalAggregator { return &noopSignalAggregator{} }
117+
func newNoopSignalAggregator() *NoopSignalAggregator { return &NoopSignalAggregator{} }
118118

119-
func (a *noopSignalAggregator) Record(_ time.Duration, _ error) {}
119+
func (a *NoopSignalAggregator) Record(_ time.Duration, _ error) {}
120120

121-
func (a *noopSignalAggregator) AverageLatency() float64 {
121+
func (a *NoopSignalAggregator) AverageLatency() float64 {
122122
return 0
123123
}
124124

125-
func (*noopSignalAggregator) ErrorRatio() float64 {
125+
func (*NoopSignalAggregator) ErrorRatio() float64 {
126126
return 0
127127
}

service/history/fx.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ var Module = fx.Options(
5858
fx.Provide(RetryableInterceptorProvider),
5959
fx.Provide(TelemetryInterceptorProvider),
6060
fx.Provide(RateLimitInterceptorProvider),
61+
fx.Provide(HealthSignalAggregatorProvider),
6162
fx.Provide(HealthCheckInterceptorProvider),
6263
fx.Provide(service.GrpcServerOptionsProvider),
6364
fx.Provide(ESProcessorConfigProvider),
@@ -101,6 +102,7 @@ func HandlerProvider(args NewHandlerArgs) *Handler {
101102
persistenceVisibilityManager: args.PersistenceVisibilityManager,
102103
persistenceHealthSignal: args.PersistenceHealthSignal,
103104
healthServer: args.HealthServer,
105+
historyHealthSignal: args.HistoryHealthSignal,
104106
historyServiceResolver: args.HistoryServiceResolver,
105107
metricsHandler: args.MetricsHandler,
106108
payloadSerializer: args.PayloadSerializer,
@@ -171,18 +173,26 @@ func TelemetryInterceptorProvider(
171173
)
172174
}
173175

174-
func HealthCheckInterceptorProvider(
176+
func HealthSignalAggregatorProvider(
175177
dynamicCollection *dynamicconfig.Collection,
176178
metricsHandler metrics.Handler,
177179
logger log.ThrottledLogger,
180+
) *interceptor.HealthSignalAggregatorImpl {
181+
return interceptor.NewHealthSignalAggregatorImpl(
182+
dynamicconfig.PersistenceHealthSignalWindowSize.Get(dynamicCollection)(),
183+
dynamicconfig.PersistenceHealthSignalBufferSize.Get(dynamicCollection)(),
184+
logger,
185+
)
186+
}
187+
188+
func HealthCheckInterceptorProvider(
189+
dynamicCollection *dynamicconfig.Collection,
190+
healthSignalAggregator *interceptor.HealthSignalAggregatorImpl,
178191
) *interceptor.HealthCheckInterceptor {
179192
if dynamicconfig.HistoryHealthSignalMetricsEnabled.Get(dynamicCollection)() {
180193
return interceptor.NewHealthCheckInterceptor(
181-
interceptor.NewHealthSignalAggregatorImpl(
182-
dynamicconfig.PersistenceHealthSignalWindowSize.Get(dynamicCollection)(),
183-
dynamicconfig.PersistenceHealthSignalBufferSize.Get(dynamicCollection)(),
184-
logger,
185-
))
194+
healthSignalAggregator,
195+
)
186196
}
187197
return interceptor.NewHealthCheckInterceptor(
188198
interceptor.NoopHealthSignalAggregator,

service/history/handler.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"go.temporal.io/server/common/persistence/serialization"
3838
"go.temporal.io/server/common/persistence/visibility/manager"
3939
"go.temporal.io/server/common/primitives/timestamp"
40+
"go.temporal.io/server/common/rpc/interceptor"
4041
"go.temporal.io/server/common/searchattribute"
4142
serviceerrors "go.temporal.io/server/common/serviceerror"
4243
"go.temporal.io/server/common/tasktoken"
@@ -77,6 +78,7 @@ type (
7778
persistenceVisibilityManager manager.VisibilityManager
7879
persistenceHealthSignal persistence.HealthSignalAggregator
7980
healthServer *health.Server
81+
historyHealthSignal interceptor.HealthSignalAggregator
8082
historyServiceResolver membership.ServiceResolver
8183
metricsHandler metrics.Handler
8284
payloadSerializer serialization.Serializer
@@ -106,6 +108,7 @@ type (
106108
PersistenceExecutionManager persistence.ExecutionManager
107109
PersistenceShardManager persistence.ShardManager
108110
PersistenceHealthSignal persistence.HealthSignalAggregator
111+
HistoryHealthSignal interceptor.HealthSignalAggregator
109112
HealthServer *health.Server
110113
PersistenceVisibilityManager manager.VisibilityManager
111114
HistoryServiceResolver membership.ServiceResolver
@@ -206,6 +209,11 @@ func (h *Handler) DeepHealthCheck(
206209
return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_DECLINED_SERVING}, nil
207210
}
208211

212+
rsp := h.checkHistoryHealthSignals()
213+
if rsp != nil {
214+
return rsp, nil
215+
}
216+
209217
latency := h.persistenceHealthSignal.AverageLatency()
210218
errRatio := h.persistenceHealthSignal.ErrorRatio()
211219

@@ -217,6 +225,25 @@ func (h *Handler) DeepHealthCheck(
217225
return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_SERVING}, nil
218226
}
219227

228+
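// checkHistoryHealthSignals checks the history health signals (average RPC latency and error ratio) captured by the interceptor. It returns a NOT_SERVING response when either exceeds its configured threshold, and nil otherwise.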
229+
func (h *Handler) checkHistoryHealthSignals() *historyservice.DeepHealthCheckResponse {
230+
// Warn if the noop aggregator is in use, then check that the RPC latency doesn't exceed the threshold.
231+
if _, ok := h.historyHealthSignal.(*interceptor.NoopSignalAggregator); ok {
232+
h.logger.Warn("health signal aggregator is using noop implementation")
233+
}
234+
if h.historyHealthSignal.AverageLatency() > h.config.HealthRPCLatencyFailure() {
235+
metrics.HistoryHostHealthGauge.With(h.metricsHandler).Record(float64(enumsspb.HEALTH_STATE_NOT_SERVING))
236+
return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_NOT_SERVING}
237+
}
238+
239+
// Check if the RPC error ratio exceeds the threshold
240+
if h.historyHealthSignal.ErrorRatio() > h.config.HealthRPCErrorRatio() {
241+
metrics.HistoryHostHealthGauge.With(h.metricsHandler).Record(float64(enumsspb.HEALTH_STATE_NOT_SERVING))
242+
return &historyservice.DeepHealthCheckResponse{State: enumsspb.HEALTH_STATE_NOT_SERVING}
243+
}
244+
return nil
245+
}
246+
220247
// IsWorkflowTaskValid - whether workflow task is still valid
221248
func (h *Handler) IsWorkflowTaskValid(ctx context.Context, request *historyservice.IsWorkflowTaskValidRequest) (_ *historyservice.IsWorkflowTaskValidResponse, retError error) {
222249
defer log.CapturePanic(h.logger, &retError)

0 commit comments

Comments
 (0)