
Commit c80a12c

Cancel History long polls before deadline
1 parent 3b1b8d0 commit c80a12c

File tree

14 files changed: +373 additions, -139 deletions


common/contextutil/timeout.go

Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+package contextutil
+
+import (
+    "context"
+    "time"
+)
+
+// WithDeadlineBuffer returns a child context whose deadline is the earlier of:
+//   - the parent's deadline minus `buffer`, and
+//   - `maxTimeout` from now.
+func WithDeadlineBuffer(
+    parent context.Context,
+    maxTimeout time.Duration,
+    buffer time.Duration,
+) (context.Context, context.CancelFunc) {
+    noop := func() {}
+
+    // If the parent is already done, don't bother creating a child.
+    if parent.Err() != nil {
+        return parent, noop
+    }
+
+    deadline, hasDeadline := parent.Deadline()
+    if !hasDeadline {
+        if maxTimeout == 0 {
+            return parent, noop
+        }
+        return context.WithTimeout(parent, maxTimeout)
+    }
+
+    // Inherit the parent's deadline if the caller doesn't set a cap.
+    if maxTimeout == 0 {
+        return parent, noop
+    }
+
+    remaining := time.Until(deadline) - buffer
+    if remaining <= 0 {
+        return parent, noop
+    }
+    if remaining < maxTimeout {
+        maxTimeout = remaining
+    }
+    return context.WithTimeout(parent, maxTimeout)
+}
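
This capping logic is what lets a History long poll give up safely before its caller's RPC deadline, per the commit message. A minimal, hypothetical usage sketch (not part of this commit; waitForEvent is an illustrative stand-in for the real long-poll body):

package main

import (
    "context"
    "fmt"
    "time"

    "go.temporal.io/server/common/contextutil"
)

// waitForEvent stands in for the real long-poll body: it blocks until new
// work arrives (not modelled here) or the context expires.
func waitForEvent(ctx context.Context) error {
    <-ctx.Done()
    return ctx.Err()
}

func main() {
    // The caller's RPC context: 5s total.
    parent, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Poll for at most 20s, but stop 1s before the parent's deadline so
    // there is still time to marshal and send a response.
    pollCtx, cancelPoll := contextutil.WithDeadlineBuffer(parent, 20*time.Second, time.Second)
    defer cancelPoll()

    start := time.Now()
    err := waitForEvent(pollCtx)
    // Prints roughly: "poll ended after 4s: context deadline exceeded",
    // i.e. min(20s, 5s-1s) = 4s.
    fmt.Printf("poll ended after %v: %v\n", time.Since(start).Round(time.Second), err)
}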

common/metrics/grpc_test.go

Lines changed: 9 additions & 29 deletions

@@ -8,6 +8,7 @@ import (
     "github.com/stretchr/testify/suite"
     metricsspb "go.temporal.io/server/api/metrics/v1"
     "go.temporal.io/server/common/log"
+    "go.temporal.io/server/common/testing/rpctest"
     "go.uber.org/mock/gomock"
     "google.golang.org/grpc"
     "google.golang.org/grpc/metadata"
@@ -36,7 +37,7 @@ func (s *grpcSuite) TearDownTest() {}
 func (s *grpcSuite) TestMetadataMetricInjection() {
     logger := log.NewMockLogger(s.controller)
     ctx := context.Background()
-    ssts := newMockServerTransportStream()
+    ssts := rpctest.NewMockServerTransportStream("/temporal.test/MetadataMetricInjection")
     ctx = grpc.NewContextWithServerTransportStream(ctx, ssts)
     anyMetricName := "any_metric_name"

@@ -73,8 +74,9 @@ func (s *grpcSuite) TestMetadataMetricInjection() {
     )

     s.Nil(err)
-    s.Equal(len(ssts.trailers), 1)
-    propagationContextBlobs := ssts.trailers[0].Get(metricsTrailerKey)
+    trailers := ssts.CapturedTrailers()
+    s.Equal(1, len(trailers))
+    propagationContextBlobs := trailers[0].Get(metricsTrailerKey)
     s.NotNil(propagationContextBlobs)
     s.Equal(1, len(propagationContextBlobs))
     baggage := &metricsspb.Baggage{}
@@ -93,7 +95,7 @@ func (s *grpcSuite) TestMetadataMetricInjection() {
 func (s *grpcSuite) TestMetadataMetricInjection_NoMetricPresent() {
     logger := log.NewMockLogger(s.controller)
     ctx := context.Background()
-    ssts := newMockServerTransportStream()
+    ssts := rpctest.NewMockServerTransportStream("/temporal.test/MetadataMetricInjectionNoMetric")
     ctx = grpc.NewContextWithServerTransportStream(ctx, ssts)

     smcii := NewServerMetricsContextInjectorInterceptor()
@@ -128,8 +130,9 @@ func (s *grpcSuite) TestMetadataMetricInjection_NoMetricPresent() {
     )

     s.Nil(err)
-    s.Equal(len(ssts.trailers), 1)
-    propagationContextBlobs := ssts.trailers[0].Get(metricsTrailerKey)
+    trailers := ssts.CapturedTrailers()
+    s.Equal(1, len(trailers))
+    propagationContextBlobs := trailers[0].Get(metricsTrailerKey)
     s.NotNil(propagationContextBlobs)
     s.Equal(1, len(propagationContextBlobs))
     baggage := &metricsspb.Baggage{}
@@ -162,26 +165,3 @@ func (s *grpcSuite) TestContextCounterAddNoMetricsContext() {
     testCounterName := "test_counter"
     ContextCounterAdd(context.Background(), testCounterName, 3)
 }
-
-func newMockServerTransportStream() *mockServerTransportStream {
-    return &mockServerTransportStream{trailers: []*metadata.MD{}}
-}
-
-type mockServerTransportStream struct {
-    trailers []*metadata.MD
-}
-
-func (s *mockServerTransportStream) Method() string {
-    return "mockssts"
-}
-func (s *mockServerTransportStream) SetHeader(md metadata.MD) error {
-    return nil
-}
-func (s *mockServerTransportStream) SendHeader(md metadata.MD) error {
-    return nil
-}
-func (s *mockServerTransportStream) SetTrailer(md metadata.MD) error {
-    mdCopy := md.Copy()
-    s.trailers = append(s.trailers, &mdCopy)
-    return nil
-}
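
These tests now rely on a shared rpctest.MockServerTransportStream. That helper is among the 14 changed files but is not shown in this excerpt; based on the methods the tests call (NewMockServerTransportStream, CapturedTrailers, CapturedHeaders, the SetHeaderFunc override), a sketch of what it plausibly looks like, assuming these names:

package rpctest

import (
    "sync"

    "google.golang.org/grpc/metadata"
)

// MockServerTransportStream implements grpc.ServerTransportStream and
// records the headers and trailers that interceptors set on it.
type MockServerTransportStream struct {
    // SetHeaderFunc, when non-nil, replaces SetHeader (used to inject failures).
    SetHeaderFunc func(md metadata.MD) error

    method   string
    mu       sync.Mutex
    headers  metadata.MD
    trailers []*metadata.MD
}

func NewMockServerTransportStream(method string) *MockServerTransportStream {
    return &MockServerTransportStream{method: method, headers: metadata.MD{}}
}

func (s *MockServerTransportStream) Method() string { return s.method }

func (s *MockServerTransportStream) SetHeader(md metadata.MD) error {
    if s.SetHeaderFunc != nil {
        return s.SetHeaderFunc(md)
    }
    s.mu.Lock()
    defer s.mu.Unlock()
    for k, v := range md {
        s.headers[k] = append(s.headers[k], v...)
    }
    return nil
}

// SendHeader simply records the metadata the same way SetHeader does.
func (s *MockServerTransportStream) SendHeader(md metadata.MD) error { return s.SetHeader(md) }

func (s *MockServerTransportStream) SetTrailer(md metadata.MD) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    mdCopy := md.Copy()
    s.trailers = append(s.trailers, &mdCopy)
    return nil
}

// CapturedHeaders returns a copy of all header metadata set so far.
func (s *MockServerTransportStream) CapturedHeaders() metadata.MD {
    s.mu.Lock()
    defer s.mu.Unlock()
    return s.headers.Copy()
}

// CapturedTrailers returns a copy of all trailer metadata set so far.
func (s *MockServerTransportStream) CapturedTrailers() []*metadata.MD {
    s.mu.Lock()
    defer s.mu.Unlock()
    return append([]*metadata.MD(nil), s.trailers...)
}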

common/rpc/grpc.go

Lines changed: 0 additions & 55 deletions

@@ -3,21 +3,17 @@ package rpc
 import (
     "context"
     "crypto/tls"
-    "errors"
     "time"

-    "go.temporal.io/api/serviceerror"
     "go.temporal.io/server/common/headers"
     "go.temporal.io/server/common/log"
-    "go.temporal.io/server/common/log/tag"
     "go.temporal.io/server/common/metrics"
     "go.temporal.io/server/common/rpc/interceptor"
     serviceerrors "go.temporal.io/server/common/serviceerror"
     "google.golang.org/grpc"
     "google.golang.org/grpc/backoff"
     "google.golang.org/grpc/credentials"
     "google.golang.org/grpc/credentials/insecure"
-    "google.golang.org/grpc/metadata"
     "google.golang.org/grpc/status"
 )

@@ -44,14 +40,6 @@ const (

     // maxInternodeRecvPayloadSize indicates the internode max receive payload size.
     maxInternodeRecvPayloadSize = 128 * 1024 * 1024 // 128 Mb
-
-    // ResourceExhaustedCauseHeader will be added to rpc response if request returns ResourceExhausted error.
-    // Value of this header will be ResourceExhaustedCause.
-    ResourceExhaustedCauseHeader = "X-Resource-Exhausted-Cause"
-
-    // ResourceExhaustedScopeHeader will be added to rpc response if request returns ResourceExhausted error.
-    // Value of this header will be the scope of exhausted resource.
-    ResourceExhaustedScopeHeader = "X-Resource-Exhausted-Scope"
 )

 // Dial creates a client connection to the given target with default options.
@@ -121,46 +109,3 @@ func headersInterceptor(
     ctx = headers.Propagate(ctx)
     return invoker(ctx, method, req, reply, cc, opts...)
 }
-
-func NewFrontendServiceErrorInterceptor(
-    logger log.Logger,
-) grpc.UnaryServerInterceptor {
-    return func(
-        ctx context.Context,
-        req interface{},
-        _ *grpc.UnaryServerInfo,
-        handler grpc.UnaryHandler,
-    ) (interface{}, error) {
-
-        resp, err := handler(ctx, req)
-
-        if err == nil {
-            return resp, err
-        }
-
-        // mask some internal service errors at frontend
-        switch err.(type) {
-        case *serviceerrors.ShardOwnershipLost:
-            err = serviceerror.NewUnavailable("shard unavailable, please backoff and retry")
-        case *serviceerror.DataLoss:
-            err = serviceerror.NewUnavailable("internal history service error")
-        }
-
-        addHeadersForResourceExhausted(ctx, logger, err)
-
-        return resp, err
-    }
-}
-
-func addHeadersForResourceExhausted(ctx context.Context, logger log.Logger, err error) {
-    var reErr *serviceerror.ResourceExhausted
-    if errors.As(err, &reErr) {
-        headerErr := grpc.SetHeader(ctx, metadata.Pairs(
-            ResourceExhaustedCauseHeader, reErr.Cause.String(),
-            ResourceExhaustedScopeHeader, reErr.Scope.String(),
-        ))
-        if headerErr != nil {
-            logger.Error("Failed to add Resource-Exhausted headers to response", tag.Error(headerErr))
-        }
-    }
-}
Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
+package interceptor
+
+import (
+    "context"
+
+    "go.temporal.io/api/serviceerror"
+    "go.temporal.io/server/common/api"
+    "go.temporal.io/server/common/log"
+    "go.temporal.io/server/common/log/tag"
+    serviceerrors "go.temporal.io/server/common/serviceerror"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/metadata"
+)
+
+const (
+    // ResourceExhaustedCauseHeader is added to the rpc response if the request returns a ResourceExhausted error.
+    ResourceExhaustedCauseHeader = "X-Resource-Exhausted-Cause"
+
+    // ResourceExhaustedScopeHeader is added to the rpc response if the request returns a ResourceExhausted error.
+    ResourceExhaustedScopeHeader = "X-Resource-Exhausted-Scope"
+)
+
+// NewFrontendServiceErrorInterceptor returns a gRPC interceptor that has two responsibilities:
+// 1. Mask certain internal service error details.
+// 2. Propagate resource exhaustion details via gRPC headers.
+func NewFrontendServiceErrorInterceptor(
+    logger log.Logger,
+) grpc.UnaryServerInterceptor {
+    return func(
+        ctx context.Context,
+        req interface{},
+        info *grpc.UnaryServerInfo,
+        handler grpc.UnaryHandler,
+    ) (interface{}, error) {
+        resp, err := handler(ctx, req)
+        if err == nil {
+            return resp, nil
+        }
+
+        switch serviceErr := err.(type) {
+        case *serviceerrors.ShardOwnershipLost:
+            err = serviceerror.NewUnavailable("shard unavailable, please backoff and retry")
+        case *serviceerror.DataLoss:
+            err = serviceerror.NewUnavailable("internal history service error")
+        case *serviceerror.ResourceExhausted:
+            if headerErr := grpc.SetHeader(ctx, metadata.Pairs(
+                ResourceExhaustedCauseHeader, serviceErr.Cause.String(),
+                ResourceExhaustedScopeHeader, serviceErr.Scope.String(),
+            )); headerErr != nil {
+                // While this is not a user-facing error in itself, it can
+                // indicate larger connection issues at play.
+                logger.Error("Failed to add Resource-Exhausted headers to response",
+                    tag.Operation(api.MethodName(info.FullMethod)),
+                    tag.Error(headerErr))
+            }
+        }
+
+        return resp, err
+    }
+}
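
For completeness, a hypothetical client-side counterpart (not part of this commit) showing how a caller could read these headers off a failed unary call. grpc.Header and ClientConn.Invoke are standard grpc-go APIs; the function and its wiring are illustrative, and the header names mirror the constants above:

package example

import (
    "context"
    "fmt"

    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

// callWithExhaustionInfo invokes a unary method and, on failure, reports any
// resource-exhaustion details the frontend attached as response headers.
func callWithExhaustionInfo(ctx context.Context, conn *grpc.ClientConn, method string, req, resp any) error {
    var header metadata.MD
    err := conn.Invoke(ctx, method, req, resp, grpc.Header(&header))
    if err != nil {
        // metadata.MD.Get is case-insensitive, so the X-* names match as-is.
        if cause := header.Get("X-Resource-Exhausted-Cause"); len(cause) > 0 {
            scope := header.Get("X-Resource-Exhausted-Scope")
            fmt.Printf("resource exhausted: cause=%v scope=%v\n", cause, scope)
        }
    }
    return err
}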
Lines changed: 119 additions & 0 deletions

@@ -0,0 +1,119 @@
+package interceptor
+
+import (
+    "context"
+    "errors"
+    "testing"
+
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+    enumspb "go.temporal.io/api/enums/v1"
+    "go.temporal.io/api/serviceerror"
+    "go.temporal.io/server/common/log/tag"
+    serviceerrors "go.temporal.io/server/common/serviceerror"
+    "go.temporal.io/server/common/testing/rpctest"
+    "go.temporal.io/server/common/testing/testlogger"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/metadata"
+)
+
+func TestFrontendServiceErrorInterceptor(t *testing.T) {
+    tests := []struct {
+        name            string
+        handlerErr      error
+        configureStream func(s *rpctest.MockServerTransportStream)
+        verifyFn        func(t *testing.T, err error, stream *rpctest.MockServerTransportStream)
+        expectLogErr    string
+    }{
+        {
+            name:       "Passthrough",
+            handlerErr: nil,
+            verifyFn: func(t *testing.T, err error, _ *rpctest.MockServerTransportStream) {
+                require.NoError(t, err)
+            },
+        },
+        {
+            name:       "Mask ShardOwnershipLost",
+            handlerErr: serviceerrors.NewShardOwnershipLost("owner-host", "current-host"),
+            verifyFn: func(t *testing.T, err error, _ *rpctest.MockServerTransportStream) {
+                require.Error(t, err)
+
+                var unavail *serviceerror.Unavailable
+                require.ErrorAs(t, err, &unavail)
+                assert.Contains(t, unavail.Error(), "shard unavailable")
+            },
+        },
+        {
+            name:       "Mask DataLoss",
+            handlerErr: serviceerror.NewDataLoss("..."),
+            verifyFn: func(t *testing.T, err error, _ *rpctest.MockServerTransportStream) {
+                require.Error(t, err)
+
+                var unavail *serviceerror.Unavailable
+                require.ErrorAs(t, err, &unavail)
+                assert.Equal(t, "internal history service error", unavail.Error())
+            },
+        },
+        {
+            name: "Set ResourceExhaustedHeaders",
+            handlerErr: &serviceerror.ResourceExhausted{
+                Cause: enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT,
+                Scope: enumspb.RESOURCE_EXHAUSTED_SCOPE_SYSTEM,
+            },
+            verifyFn: func(t *testing.T, err error, s *rpctest.MockServerTransportStream) {
+                require.Error(t, err)
+
+                hdr := s.CapturedHeaders()
+                require.NotNil(t, hdr)
+                assert.Equal(t, []string{
+                    enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT.String()},
+                    hdr.Get(ResourceExhaustedCauseHeader))
+                assert.Equal(t, []string{
+                    enumspb.RESOURCE_EXHAUSTED_SCOPE_SYSTEM.String()},
+                    hdr.Get(ResourceExhaustedScopeHeader))
+            },
+        },
+        {
+            name:       "Set ResourceExhaustedHeaders Failure",
+            handlerErr: serviceerror.NewResourceExhausted(enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, "rate limit exceeded"),
+            configureStream: func(s *rpctest.MockServerTransportStream) {
+                s.SetHeaderFunc = func(md metadata.MD) error { return errors.New("injected header failure") }
+            },
+            expectLogErr: "Failed to add Resource-Exhausted headers to response",
+            verifyFn: func(t *testing.T, err error, _ *rpctest.MockServerTransportStream) {
+                require.Error(t, err)
+
+                var re *serviceerror.ResourceExhausted
+                require.ErrorAs(t, err, &re)
+                assert.Equal(t, enumspb.RESOURCE_EXHAUSTED_CAUSE_RPS_LIMIT, re.Cause)
+            },
+        },
+    }
+
+    for _, tc := range tests {
+        t.Run(tc.name, func(t *testing.T) {
+            method := "/test/method"
+
+            tl := testlogger.NewTestLogger(t, testlogger.FailOnAnyUnexpectedError)
+            if tc.expectLogErr != "" {
+                tl.Expect(testlogger.Error, tc.expectLogErr, tag.Operation("method"))
+            }
+
+            stream := rpctest.NewMockServerTransportStream(method)
+            if tc.configureStream != nil {
+                tc.configureStream(stream)
+            }
+            ctx := grpc.NewContextWithServerTransportStream(context.Background(), stream)
+
+            var interceptorFn grpc.UnaryServerInterceptor
+            interceptorFn = NewFrontendServiceErrorInterceptor(tl)
+            info := &grpc.UnaryServerInfo{FullMethod: method}
+            _, err := interceptorFn(ctx, nil, info,
+                func(_ context.Context, _ any) (any, error) {
+                    return nil, tc.handlerErr
+                })
+
+            tc.verifyFn(t, err, stream)
+        })
+    }
+}
