1 change: 1 addition & 0 deletions .github/.golangci.yml
@@ -20,6 +20,7 @@ linters:
staticcheck:
checks:
- "all"
- "-ST1000" # disable: package comment is missing
godox:
keywords:
- FIXME
37 changes: 37 additions & 0 deletions common/contextutil/deadline.go
@@ -0,0 +1,37 @@
package contextutil

import (
	"context"
	"time"
)

var noop = func() {}

// WithDeadlineBuffer creates a child context with the desired timeout.
// If the parent context has a deadline, the child's timeout is capped at
// min(time until the parent's deadline minus buffer, timeout). Use this
// to create a child context that must not consume all of the parent's
// deadline, leaving the parent some time for post-work.
func WithDeadlineBuffer(
Contributor Author: Moved this here out of Matching - with some minor modifications.
	parent context.Context,
	timeout time.Duration,
	buffer time.Duration,
) (context.Context, context.CancelFunc) {
	if parent.Err() != nil {
		return parent, noop
	}

	deadline, hasDeadline := parent.Deadline()

	if !hasDeadline {
		return context.WithTimeout(parent, timeout)
	}

	remaining := time.Until(deadline) - buffer
	if remaining < timeout {
		// Cap the timeout to the remaining time minus buffer.
		timeout = max(0, remaining)
	}
	return context.WithTimeout(parent, timeout)
}
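
For readers skimming the diff, here is a minimal usage sketch of WithDeadlineBuffer. It is not part of this PR; the handleRequest, doWork, and recordResult names are hypothetical and only illustrate the "leave time for post-work" pattern described in the doc comment.

package example

import (
	"context"
	"fmt"
	"time"

	"go.temporal.io/server/common/contextutil" // assumed import path for the new package
)

// handleRequest is a hypothetical caller: it gives downstream work at most 30s,
// but always keeps 5s of the parent's deadline for post-work such as recording
// the result, which is the pattern WithDeadlineBuffer is designed for.
func handleRequest(parent context.Context) error {
	childCtx, cancel := contextutil.WithDeadlineBuffer(parent, 30*time.Second, 5*time.Second)
	defer cancel()

	if err := doWork(childCtx); err != nil {
		return err
	}
	// Roughly `buffer` worth of the parent's deadline is still available here.
	return recordResult(parent)
}

// doWork and recordResult are placeholders for real request handling.
func doWork(ctx context.Context) error       { return ctx.Err() }
func recordResult(ctx context.Context) error { fmt.Println("recorded"); return nil }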
59 changes: 59 additions & 0 deletions common/contextutil/deadline_test.go
@@ -0,0 +1,59 @@
package contextutil

import (
	"context"
	"math"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

const testTolerance = 5 * time.Second

func TestWithDeadlineBuffer(t *testing.T) {
	const timeout = 10 * time.Minute
	const buffer = 1 * time.Minute
	start := time.Now()

	t.Run("parent is cancelled", func(t *testing.T) {
		parent, cancel := context.WithCancel(context.Background())
		cancel()

		child, _ := WithDeadlineBuffer(parent, timeout, buffer)
		require.Equal(t, parent, child)
	})

	t.Run("parent has no deadline", func(t *testing.T) {
		parent := context.Background()

		t.Run("timeout specified", func(t *testing.T) {
			child, _ := WithDeadlineBuffer(parent, timeout, 0)
			dl, _ := child.Deadline()
			require.WithinDuration(t, start.Add(timeout), dl, testTolerance)
		})
	})

	t.Run("parent has deadline", func(t *testing.T) {
		parent, parentCancel := context.WithTimeout(context.Background(), timeout)
		defer parentCancel()
		parentDeadline, _ := parent.Deadline()

		t.Run("enough buffer left", func(t *testing.T) {
			child, _ := WithDeadlineBuffer(parent, math.MaxInt, buffer)
			dl, _ := child.Deadline()
			require.WithinDuration(t, parentDeadline.Add(-buffer), dl, testTolerance)
		})

		t.Run("no buffer left", func(t *testing.T) {
			child, _ := WithDeadlineBuffer(parent, math.MaxInt, math.MaxInt)
			require.Equal(t, child.Err(), context.DeadlineExceeded)
		})

		t.Run("enough buffer left but less than max timeout", func(t *testing.T) {
			child, _ := WithDeadlineBuffer(parent, timeout/2, buffer)
			dl, _ := child.Deadline()
			require.WithinDuration(t, parentDeadline.Add(-timeout/2), dl, testTolerance)
		})
	})
}
38 changes: 9 additions & 29 deletions common/metrics/grpc_test.go
@@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/suite"
metricsspb "go.temporal.io/server/api/metrics/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/testing/rpctest"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
@@ -36,7 +37,7 @@ func (s *grpcSuite) TearDownTest() {}
func (s *grpcSuite) TestMetadataMetricInjection() {
logger := log.NewMockLogger(s.controller)
ctx := context.Background()
ssts := newMockServerTransportStream()
Contributor Author: Moved this to a separate package for re-use.
ssts := rpctest.NewMockServerTransportStream("/temporal.test/MetadataMetricInjection")
ctx = grpc.NewContextWithServerTransportStream(ctx, ssts)
anyMetricName := "any_metric_name"

@@ -73,8 +74,9 @@ func (s *grpcSuite) TestMetadataMetricInjection() {
)

s.Nil(err)
s.Equal(len(ssts.trailers), 1)
propagationContextBlobs := ssts.trailers[0].Get(metricsTrailerKey)
trailers := ssts.CapturedTrailers()
s.Equal(1, len(trailers))
propagationContextBlobs := trailers[0].Get(metricsTrailerKey)
s.NotNil(propagationContextBlobs)
s.Equal(1, len(propagationContextBlobs))
baggage := &metricsspb.Baggage{}
@@ -93,7 +95,7 @@ func (s *grpcSuite) TestMetadataMetricInjection() {
func (s *grpcSuite) TestMetadataMetricInjection_NoMetricPresent() {
logger := log.NewMockLogger(s.controller)
ctx := context.Background()
ssts := newMockServerTransportStream()
ssts := rpctest.NewMockServerTransportStream("/temporal.test/MetadataMetricInjectionNoMetric")
ctx = grpc.NewContextWithServerTransportStream(ctx, ssts)

smcii := NewServerMetricsContextInjectorInterceptor()
@@ -128,8 +130,9 @@ func (s *grpcSuite) TestMetadataMetricInjection_NoMetricPresent() {
)

s.Nil(err)
s.Equal(len(ssts.trailers), 1)
propagationContextBlobs := ssts.trailers[0].Get(metricsTrailerKey)
trailers := ssts.CapturedTrailers()
s.Equal(1, len(trailers))
propagationContextBlobs := trailers[0].Get(metricsTrailerKey)
s.NotNil(propagationContextBlobs)
s.Equal(1, len(propagationContextBlobs))
baggage := &metricsspb.Baggage{}
@@ -162,26 +165,3 @@ func (s *grpcSuite) TestContextCounterAddNoMetricsContext() {
testCounterName := "test_counter"
ContextCounterAdd(context.Background(), testCounterName, 3)
}

func newMockServerTransportStream() *mockServerTransportStream {
return &mockServerTransportStream{trailers: []*metadata.MD{}}
}

type mockServerTransportStream struct {
trailers []*metadata.MD
}

func (s *mockServerTransportStream) Method() string {
return "mockssts"
}
func (s *mockServerTransportStream) SetHeader(md metadata.MD) error {
return nil
}
func (s *mockServerTransportStream) SendHeader(md metadata.MD) error {
return nil
}
func (s *mockServerTransportStream) SetTrailer(md metadata.MD) error {
mdCopy := md.Copy()
s.trailers = append(s.trailers, &mdCopy)
return nil
}
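
The rpctest helper that replaces the deleted mock above is not shown in this diff. The following is a minimal sketch inferred from its call sites (NewMockServerTransportStream taking a full method name, CapturedTrailers returning the recorded trailers) and from the removed implementation; the actual package may differ.

// Package rpctest (sketch): a grpc.ServerTransportStream test double that
// records trailers so tests can assert on them.
package rpctest

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

type MockServerTransportStream struct {
	method   string
	trailers []*metadata.MD
}

// Compile-time check that the mock satisfies the gRPC interface.
var _ grpc.ServerTransportStream = (*MockServerTransportStream)(nil)

// NewMockServerTransportStream returns a stream that reports the given full method name.
func NewMockServerTransportStream(method string) *MockServerTransportStream {
	return &MockServerTransportStream{method: method, trailers: []*metadata.MD{}}
}

func (s *MockServerTransportStream) Method() string                  { return s.method }
func (s *MockServerTransportStream) SetHeader(md metadata.MD) error  { return nil }
func (s *MockServerTransportStream) SendHeader(md metadata.MD) error { return nil }

// SetTrailer captures a copy of the trailer metadata.
func (s *MockServerTransportStream) SetTrailer(md metadata.MD) error {
	mdCopy := md.Copy()
	s.trailers = append(s.trailers, &mdCopy)
	return nil
}

// CapturedTrailers returns all trailers recorded so far.
func (s *MockServerTransportStream) CapturedTrailers() []*metadata.MD {
	return s.trailers
}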
55 changes: 0 additions & 55 deletions common/rpc/grpc.go
@@ -3,21 +3,17 @@ package rpc
import (
"context"
"crypto/tls"
"errors"
"time"

"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/rpc/interceptor"
serviceerrors "go.temporal.io/server/common/serviceerror"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)

@@ -44,14 +40,6 @@ const (

// maxInternodeRecvPayloadSize indicates the internode max receive payload size.
maxInternodeRecvPayloadSize = 128 * 1024 * 1024 // 128 Mb

// ResourceExhaustedCauseHeader will be added to rpc response if request returns ResourceExhausted error.
// Value of this header will be ResourceExhaustedCause.
ResourceExhaustedCauseHeader = "X-Resource-Exhausted-Cause"

// ResourceExhaustedScopeHeader will be added to rpc response if request returns ResourceExhausted error.
// Value of this header will be the scope of exhausted resource.
ResourceExhaustedScopeHeader = "X-Resource-Exhausted-Scope"
)

// Dial creates a client connection to the given target with default options.
@@ -121,46 +109,3 @@ func headersInterceptor(
ctx = headers.Propagate(ctx)
return invoker(ctx, method, req, reply, cc, opts...)
}

func NewFrontendServiceErrorInterceptor(
logger log.Logger,
) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
_ *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {

resp, err := handler(ctx, req)

if err == nil {
return resp, err
}

// mask some internal service errors at frontend
switch err.(type) {
case *serviceerrors.ShardOwnershipLost:
err = serviceerror.NewUnavailable("shard unavailable, please backoff and retry")
case *serviceerror.DataLoss:
err = serviceerror.NewUnavailable("internal history service error")
}

addHeadersForResourceExhausted(ctx, logger, err)

return resp, err
}
}

func addHeadersForResourceExhausted(ctx context.Context, logger log.Logger, err error) {
var reErr *serviceerror.ResourceExhausted
if errors.As(err, &reErr) {
headerErr := grpc.SetHeader(ctx, metadata.Pairs(
ResourceExhaustedCauseHeader, reErr.Cause.String(),
ResourceExhaustedScopeHeader, reErr.Scope.String(),
))
if headerErr != nil {
logger.Error("Failed to add Resource-Exhausted headers to response", tag.Error(headerErr))
}
}
}
60 changes: 60 additions & 0 deletions common/rpc/interceptor/frontend_service_error.go
@@ -0,0 +1,60 @@
package interceptor

import (
	"context"

	"go.temporal.io/api/serviceerror"
	"go.temporal.io/server/common/api"
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/log/tag"
	serviceerrors "go.temporal.io/server/common/serviceerror"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

const (
	// ResourceExhaustedCauseHeader is added to the rpc response if the request returns a ResourceExhausted error.
	ResourceExhaustedCauseHeader = "X-Resource-Exhausted-Cause"

	// ResourceExhaustedScopeHeader is added to the rpc response if the request returns a ResourceExhausted error.
	ResourceExhaustedScopeHeader = "X-Resource-Exhausted-Scope"
)

// NewFrontendServiceErrorInterceptor returns a gRPC interceptor that has two responsibilities:
// 1. Mask certain internal service error details.
// 2. Propagate resource exhaustion details via gRPC headers.
func NewFrontendServiceErrorInterceptor(
Contributor Author: Moved this here from common/rpc/grpc.go and merged the helper into it.
	logger log.Logger,
) grpc.UnaryServerInterceptor {
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		resp, err := handler(ctx, req)
		if err == nil {
			return resp, nil
		}

		switch serviceErr := err.(type) {
		case *serviceerrors.ShardOwnershipLost:
			err = serviceerror.NewUnavailable("shard unavailable, please backoff and retry")
		case *serviceerror.DataLoss:
			err = serviceerror.NewUnavailable("internal history service error")
		case *serviceerror.ResourceExhausted:
Member: Do the history/matching services not need this header?

Contributor Author (@stephanos, Aug 26, 2025): I don't think they do, since this is the frontend adding extra headers for the user's benefit. The behavior hasn't changed from current main there.

			if headerErr := grpc.SetHeader(ctx, metadata.Pairs(
				ResourceExhaustedCauseHeader, serviceErr.Cause.String(),
				ResourceExhaustedScopeHeader, serviceErr.Scope.String(),
			)); headerErr != nil {
				// While this is *not* a user-facing error or problem in itself,
				// it indicates that there might be larger connection issues at play.
				logger.Error("Failed to add Resource-Exhausted headers to response",
					tag.Operation(api.MethodName(info.FullMethod)),
					tag.Error(headerErr))
			}
		}

		return resp, err
	}
}
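
As a usage note (not part of this PR), the interceptor would typically be wired into the frontend's gRPC server chain. The sketch below assumes a plain grpc.NewServer and omits the other interceptors the frontend actually registers; newFrontendServer is a hypothetical helper.

package example

import (
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/rpc/interceptor"
	"google.golang.org/grpc"
)

// newFrontendServer shows where the new interceptor would sit; the real frontend
// builds its server elsewhere with a longer interceptor chain.
func newFrontendServer(logger log.Logger) *grpc.Server {
	return grpc.NewServer(
		grpc.ChainUnaryInterceptor(
			interceptor.NewFrontendServiceErrorInterceptor(logger),
			// ...other frontend interceptors would follow here...
		),
	)
}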