Skip to content

Commit fc00929

Browse files
committed
Cancel History long polls before deadline
1 parent 96c3aae commit fc00929

File tree

15 files changed

+413
-131
lines changed

15 files changed

+413
-131
lines changed

.github/.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ linters:
2020
staticcheck:
2121
checks:
2222
- "all"
23+
- "-ST1000" # disable: package comment is missing
2324
godox:
2425
keywords:
2526
- FIXME

common/contextutil/deadline.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package contextutil
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// noop is the cancel function returned when no new context was created.
var noop = func() {}

// WithDeadlineBuffer returns a child of parent that expires after at most
// timeout while leaving at least buffer of the parent's remaining time
// unused. Use it when the child must not consume the parent's whole deadline
// because the parent still needs time for some post-work.
//
//   - If parent is already done, parent itself is returned together with a
//     no-op cancel function.
//   - If parent has no deadline, the child simply expires after timeout.
//   - Otherwise the child's timeout is
//     min(timeout, time.Until(parentDeadline) - buffer), floored at zero.
//     A zero timeout yields a child that is already expired.
//
// A negative buffer is treated as zero: the child can never outlive its
// parent anyway, so a negative buffer has nothing to extend.
//
// Callers should invoke the returned CancelFunc once the child is no longer
// needed so that its timer is released.
func WithDeadlineBuffer(
	parent context.Context,
	timeout time.Duration,
	buffer time.Duration,
) (context.Context, context.CancelFunc) {
	if parent.Err() != nil {
		return parent, noop
	}

	deadline, hasDeadline := parent.Deadline()
	if !hasDeadline {
		return context.WithTimeout(parent, timeout)
	}

	// Subtracting a very large negative buffer would overflow time.Duration
	// and wrap around to a negative value, which would expire the child
	// immediately. Clamp it instead.
	buffer = max(0, buffer)

	remaining := time.Until(deadline) - buffer
	if remaining < timeout {
		// Cap the timeout to the parent's remaining time minus the buffer.
		timeout = max(0, remaining)
	}
	return context.WithTimeout(parent, timeout)
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package contextutil
2+
3+
import (
4+
"context"
5+
"math"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
const testTolerance = 5 * time.Second
13+
14+
func TestWithDeadlineBuffer(t *testing.T) {
15+
const timeout = 10 * time.Minute
16+
const buffer = 1 * time.Minute
17+
start := time.Now()
18+
19+
t.Run("parent is cancelled", func(t *testing.T) {
20+
parent, cancel := context.WithCancel(context.Background())
21+
cancel()
22+
23+
child, _ := WithDeadlineBuffer(parent, timeout, buffer)
24+
require.Equal(t, parent, child)
25+
})
26+
27+
t.Run("parent has no deadline", func(t *testing.T) {
28+
parent := context.Background()
29+
30+
t.Run("timeout specified", func(t *testing.T) {
31+
child, _ := WithDeadlineBuffer(parent, timeout, 0)
32+
dl, _ := child.Deadline()
33+
require.WithinDuration(t, start.Add(timeout), dl, testTolerance)
34+
})
35+
})
36+
37+
t.Run("parent has deadline", func(t *testing.T) {
38+
parent, parentCancel := context.WithTimeout(context.Background(), timeout)
39+
defer parentCancel()
40+
parentDeadline, _ := parent.Deadline()
41+
42+
t.Run("enough buffer left", func(t *testing.T) {
43+
child, _ := WithDeadlineBuffer(parent, math.MaxInt, buffer)
44+
dl, _ := child.Deadline()
45+
require.WithinDuration(t, parentDeadline.Add(-buffer), dl, testTolerance)
46+
})
47+
48+
t.Run("no buffer left", func(t *testing.T) {
49+
child, _ := WithDeadlineBuffer(parent, math.MaxInt, math.MaxInt)
50+
require.Equal(t, child.Err(), context.DeadlineExceeded)
51+
})
52+
53+
t.Run("enough buffer left but less than max timeout", func(t *testing.T) {
54+
child, _ := WithDeadlineBuffer(parent, timeout/2, buffer)
55+
dl, _ := child.Deadline()
56+
require.WithinDuration(t, parentDeadline.Add(-timeout/2), dl, testTolerance)
57+
})
58+
})
59+
}

common/metrics/grpc_test.go

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/stretchr/testify/suite"
99
metricsspb "go.temporal.io/server/api/metrics/v1"
1010
"go.temporal.io/server/common/log"
11+
"go.temporal.io/server/common/testing/rpctest"
1112
"go.uber.org/mock/gomock"
1213
"google.golang.org/grpc"
1314
"google.golang.org/grpc/metadata"
@@ -36,7 +37,7 @@ func (s *grpcSuite) TearDownTest() {}
3637
func (s *grpcSuite) TestMetadataMetricInjection() {
3738
logger := log.NewMockLogger(s.controller)
3839
ctx := context.Background()
39-
ssts := newMockServerTransportStream()
40+
ssts := rpctest.NewMockServerTransportStream("/temporal.test/MetadataMetricInjection")
4041
ctx = grpc.NewContextWithServerTransportStream(ctx, ssts)
4142
anyMetricName := "any_metric_name"
4243

@@ -73,8 +74,9 @@ func (s *grpcSuite) TestMetadataMetricInjection() {
7374
)
7475

7576
s.Nil(err)
76-
s.Equal(len(ssts.trailers), 1)
77-
propagationContextBlobs := ssts.trailers[0].Get(metricsTrailerKey)
77+
trailers := ssts.CapturedTrailers()
78+
s.Equal(1, len(trailers))
79+
propagationContextBlobs := trailers[0].Get(metricsTrailerKey)
7880
s.NotNil(propagationContextBlobs)
7981
s.Equal(1, len(propagationContextBlobs))
8082
baggage := &metricsspb.Baggage{}
@@ -93,7 +95,7 @@ func (s *grpcSuite) TestMetadataMetricInjection() {
9395
func (s *grpcSuite) TestMetadataMetricInjection_NoMetricPresent() {
9496
logger := log.NewMockLogger(s.controller)
9597
ctx := context.Background()
96-
ssts := newMockServerTransportStream()
98+
ssts := rpctest.NewMockServerTransportStream("/temporal.test/MetadataMetricInjectionNoMetric")
9799
ctx = grpc.NewContextWithServerTransportStream(ctx, ssts)
98100

99101
smcii := NewServerMetricsContextInjectorInterceptor()
@@ -128,8 +130,9 @@ func (s *grpcSuite) TestMetadataMetricInjection_NoMetricPresent() {
128130
)
129131

130132
s.Nil(err)
131-
s.Equal(len(ssts.trailers), 1)
132-
propagationContextBlobs := ssts.trailers[0].Get(metricsTrailerKey)
133+
trailers := ssts.CapturedTrailers()
134+
s.Equal(1, len(trailers))
135+
propagationContextBlobs := trailers[0].Get(metricsTrailerKey)
133136
s.NotNil(propagationContextBlobs)
134137
s.Equal(1, len(propagationContextBlobs))
135138
baggage := &metricsspb.Baggage{}
@@ -162,26 +165,3 @@ func (s *grpcSuite) TestContextCounterAddNoMetricsContext() {
162165
testCounterName := "test_counter"
163166
ContextCounterAdd(context.Background(), testCounterName, 3)
164167
}
165-
166-
func newMockServerTransportStream() *mockServerTransportStream {
167-
return &mockServerTransportStream{trailers: []*metadata.MD{}}
168-
}
169-
170-
type mockServerTransportStream struct {
171-
trailers []*metadata.MD
172-
}
173-
174-
func (s *mockServerTransportStream) Method() string {
175-
return "mockssts"
176-
}
177-
func (s *mockServerTransportStream) SetHeader(md metadata.MD) error {
178-
return nil
179-
}
180-
func (s *mockServerTransportStream) SendHeader(md metadata.MD) error {
181-
return nil
182-
}
183-
func (s *mockServerTransportStream) SetTrailer(md metadata.MD) error {
184-
mdCopy := md.Copy()
185-
s.trailers = append(s.trailers, &mdCopy)
186-
return nil
187-
}

common/rpc/grpc.go

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,17 @@ package rpc
33
import (
44
"context"
55
"crypto/tls"
6-
"errors"
76
"time"
87

9-
"go.temporal.io/api/serviceerror"
108
"go.temporal.io/server/common/headers"
119
"go.temporal.io/server/common/log"
12-
"go.temporal.io/server/common/log/tag"
1310
"go.temporal.io/server/common/metrics"
1411
"go.temporal.io/server/common/rpc/interceptor"
1512
serviceerrors "go.temporal.io/server/common/serviceerror"
1613
"google.golang.org/grpc"
1714
"google.golang.org/grpc/backoff"
1815
"google.golang.org/grpc/credentials"
1916
"google.golang.org/grpc/credentials/insecure"
20-
"google.golang.org/grpc/metadata"
2117
"google.golang.org/grpc/status"
2218
)
2319

@@ -44,14 +40,6 @@ const (
4440

4541
// maxInternodeRecvPayloadSize indicates the internode max receive payload size.
4642
maxInternodeRecvPayloadSize = 128 * 1024 * 1024 // 128 Mb
47-
48-
// ResourceExhaustedCauseHeader will be added to rpc response if request returns ResourceExhausted error.
49-
// Value of this header will be ResourceExhaustedCause.
50-
ResourceExhaustedCauseHeader = "X-Resource-Exhausted-Cause"
51-
52-
// ResourceExhaustedScopeHeader will be added to rpc response if request returns ResourceExhausted error.
53-
// Value of this header will be the scope of exhausted resource.
54-
ResourceExhaustedScopeHeader = "X-Resource-Exhausted-Scope"
5543
)
5644

5745
// Dial creates a client connection to the given target with default options.
@@ -121,46 +109,3 @@ func headersInterceptor(
121109
ctx = headers.Propagate(ctx)
122110
return invoker(ctx, method, req, reply, cc, opts...)
123111
}
124-
125-
func NewFrontendServiceErrorInterceptor(
126-
logger log.Logger,
127-
) grpc.UnaryServerInterceptor {
128-
return func(
129-
ctx context.Context,
130-
req interface{},
131-
_ *grpc.UnaryServerInfo,
132-
handler grpc.UnaryHandler,
133-
) (interface{}, error) {
134-
135-
resp, err := handler(ctx, req)
136-
137-
if err == nil {
138-
return resp, err
139-
}
140-
141-
// mask some internal service errors at frontend
142-
switch err.(type) {
143-
case *serviceerrors.ShardOwnershipLost:
144-
err = serviceerror.NewUnavailable("shard unavailable, please backoff and retry")
145-
case *serviceerror.DataLoss:
146-
err = serviceerror.NewUnavailable("internal history service error")
147-
}
148-
149-
addHeadersForResourceExhausted(ctx, logger, err)
150-
151-
return resp, err
152-
}
153-
}
154-
155-
func addHeadersForResourceExhausted(ctx context.Context, logger log.Logger, err error) {
156-
var reErr *serviceerror.ResourceExhausted
157-
if errors.As(err, &reErr) {
158-
headerErr := grpc.SetHeader(ctx, metadata.Pairs(
159-
ResourceExhaustedCauseHeader, reErr.Cause.String(),
160-
ResourceExhaustedScopeHeader, reErr.Scope.String(),
161-
))
162-
if headerErr != nil {
163-
logger.Error("Failed to add Resource-Exhausted headers to response", tag.Error(headerErr))
164-
}
165-
}
166-
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package interceptor
2+
3+
import (
	"context"
	"errors"

	"go.temporal.io/api/serviceerror"
	"go.temporal.io/server/common/api"
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/log/tag"
	serviceerrors "go.temporal.io/server/common/serviceerror"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)
14+
15+
const (
16+
// ResourceExhaustedCauseHeader is added to rpc response if request returns ResourceExhausted error.
17+
ResourceExhaustedCauseHeader = "X-Resource-Exhausted-Cause"
18+
19+
// ResourceExhaustedScopeHeader is added to rpc response if request returns ResourceExhausted error.
20+
ResourceExhaustedScopeHeader = "X-Resource-Exhausted-Scope"
21+
)
22+
23+
// NewFrontendServiceErrorInterceptor returns a gRPC interceptor that has two responsibilities:
24+
// 1. Mask certain internal service error details.
25+
// 2. Propagate resource exhaustion details via gRPC headers.
26+
func NewFrontendServiceErrorInterceptor(
27+
logger log.Logger,
28+
) grpc.UnaryServerInterceptor {
29+
return func(
30+
ctx context.Context,
31+
req interface{},
32+
info *grpc.UnaryServerInfo,
33+
handler grpc.UnaryHandler,
34+
) (interface{}, error) {
35+
resp, err := handler(ctx, req)
36+
if err == nil {
37+
return resp, nil
38+
}
39+
40+
switch serviceErr := err.(type) {
41+
case *serviceerrors.ShardOwnershipLost:
42+
err = serviceerror.NewUnavailable("shard unavailable, please backoff and retry")
43+
case *serviceerror.DataLoss:
44+
err = serviceerror.NewUnavailable("internal history service error")
45+
case *serviceerror.ResourceExhausted:
46+
if headerErr := grpc.SetHeader(ctx, metadata.Pairs(
47+
ResourceExhaustedCauseHeader, serviceErr.Cause.String(),
48+
ResourceExhaustedScopeHeader, serviceErr.Scope.String(),
49+
)); headerErr != nil {
50+
// So while this is *not* a user-facing error or problem in itself,
51+
// it indicates that there might be larger connection issues at play.
52+
logger.Error("Failed to add Resource-Exhausted headers to response",
53+
tag.Operation(api.MethodName(info.FullMethod)),
54+
tag.Error(headerErr))
55+
}
56+
}
57+
58+
return resp, err
59+
}
60+
}

0 commit comments

Comments
 (0)