Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Removed

- The `deprecated StreamServerInterceptor function` from `otelgrpc` is removed.
Comment thread
MrAlias marked this conversation as resolved.
Outdated

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,6 @@ func BenchmarkNoInstrumentation(b *testing.B) {
benchmark(b, nil, nil)
}

func BenchmarkStreamServerInterceptor(b *testing.B) {
benchmark(b, nil, []grpc.ServerOption{
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(
otelgrpc.WithTracerProvider(tracerProvider),
)),
})
}

func BenchmarkStreamClientInterceptor(b *testing.B) {
benchmark(b, []grpc.DialOption{
grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(
Expand Down
69 changes: 10 additions & 59 deletions instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@

// serverStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and
// SendMsg method call.
//
//nolint:unused
Comment thread
MrAlias marked this conversation as resolved.
Outdated
type serverStream struct {
grpc.ServerStream
ctx context.Context
Expand All @@ -209,11 +211,12 @@
sentEvent bool
}

//nolint:unused
func (w *serverStream) Context() context.Context {
return w.ctx
}

func (w *serverStream) RecvMsg(m interface{}) error {

Check failure on line 219 in instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

View workflow job for this annotation

GitHub Actions / lint

func (*serverStream).RecvMsg is unused (unused)
err := w.ServerStream.RecvMsg(m)

if err == nil {
Expand All @@ -226,7 +229,7 @@
return err
}

func (w *serverStream) SendMsg(m interface{}) error {

Check failure on line 232 in instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

View workflow job for this annotation

GitHub Actions / lint

func (*serverStream).SendMsg is unused (unused)
err := w.ServerStream.SendMsg(m)

w.sentMessageID++
Expand All @@ -237,6 +240,7 @@
return err
}

//nolint:unused
func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *serverStream {
return &serverStream{
ServerStream: ss,
Expand All @@ -246,68 +250,13 @@
}
}

// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call.
//
// Deprecated: Use [NewServerHandler] instead.
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
cfg := newConfig(opts)
tracer := cfg.TracerProvider.Tracer(
ScopeName,
trace.WithInstrumentationVersion(Version()),
)

return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
ctx := ss.Context()
i := &InterceptorInfo{
StreamServerInfo: info,
Type: StreamServer,
}
if cfg.InterceptorFilter != nil && !cfg.InterceptorFilter(i) {
return handler(srv, wrapServerStream(ctx, ss, cfg))
}

ctx = extract(ctx, cfg.Propagators)
name, attr := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))

startOpts := append([]trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
},
cfg.SpanStartOptions...,
)

ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
startOpts...,
)
defer span.End()

err := handler(srv, wrapServerStream(ctx, ss, cfg))
if err != nil {
s, _ := status.FromError(err)
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
span.SetAttributes(statusCodeAttr(s.Code()))
} else {
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
}

return err
}
}

// telemetryAttributes returns a span name and span and metric attributes from
// the gRPC method and peer address.
func telemetryAttributes(fullMethod, sererAddr string) (string, []attribute.KeyValue) {
//
//nolint:unused
func telemetryAttributes(fullMethod, serverAddr string) (string, []attribute.KeyValue) {
name, methodAttrs := internal.ParseFullMethod(fullMethod)
srvAttrs := serverAddrAttrs(sererAddr)
srvAttrs := serverAddrAttrs(serverAddr)

attrs := make([]attribute.KeyValue, 0, 1+len(methodAttrs)+len(srvAttrs))
attrs = append(attrs, semconv.RPCSystemGRPC)
Expand All @@ -334,6 +283,8 @@
}

// peerFromCtx returns a peer address from a context, if one exists.
//
//nolint:unused
func peerFromCtx(ctx context.Context) string {
p, ok := peer.FromContext(ctx)
if !ok {
Expand Down
198 changes: 1 addition & 197 deletions instrumentation/google.golang.org/grpc/otelgrpc/test/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ func TestInterceptors(t *testing.T) {
clientStreamSR := tracetest.NewSpanRecorder()
clientStreamTP := trace.NewTracerProvider(trace.WithSpanProcessor(clientStreamSR))

serverStreamSR := tracetest.NewSpanRecorder()
serverStreamTP := trace.NewTracerProvider(trace.WithSpanProcessor(serverStreamSR))

listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err, "failed to open port")
client := newGrpcTest(t, listener,
Expand All @@ -93,13 +90,7 @@ func TestInterceptors(t *testing.T) {
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
},
[]grpc.ServerOption{
//nolint:staticcheck // Interceptors are deprecated and will be removed in the next release.
grpc.StreamInterceptor(otelgrpc.StreamServerInterceptor(
otelgrpc.WithTracerProvider(serverStreamTP),
otelgrpc.WithMessageEvents(otelgrpc.ReceivedEvents, otelgrpc.SentEvents),
)),
},
[]grpc.ServerOption{},
)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -108,10 +99,6 @@ func TestInterceptors(t *testing.T) {
t.Run("StreamClientSpans", func(t *testing.T) {
checkStreamClientSpans(t, clientStreamSR.Ended(), listener.Addr().String())
})

t.Run("StreamServerSpans", func(t *testing.T) {
checkStreamServerSpans(t, serverStreamSR.Ended())
})
}

func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan, addr string) {
Expand Down Expand Up @@ -287,189 +274,6 @@ func checkStreamClientSpans(t *testing.T, spans []trace.ReadOnlySpan, addr strin
}, pingPong.Attributes())
}

func checkStreamServerSpans(t *testing.T, spans []trace.ReadOnlySpan) {
require.Len(t, spans, 3)

streamInput := spans[0]
assert.False(t, streamInput.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/StreamingInputCall", streamInput.Name())
// sizes from reqSizes in "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/test".
assertEvents(t, []trace.Event{
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(1),
semconv.RPCMessageTypeKey.String("RECEIVED"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(2),
semconv.RPCMessageTypeKey.String("RECEIVED"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(3),
semconv.RPCMessageTypeKey.String("RECEIVED"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(4),
semconv.RPCMessageTypeKey.String("RECEIVED"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(1),
semconv.RPCMessageTypeKey.String("SENT"),
},
},
}, streamInput.Events())
port, ok := findAttribute(streamInput.Attributes(), semconv.ServerPortKey)
assert.True(t, ok)

assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod("StreamingInputCall"),
semconv.RPCService("grpc.testing.TestService"),
semconv.RPCSystemGRPC,
semconv.RPCGRPCStatusCodeOk,
semconv.ServerAddress("127.0.0.1"),
port,
}, streamInput.Attributes())

streamOutput := spans[1]
assert.False(t, streamOutput.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/StreamingOutputCall", streamOutput.Name())
// sizes from respSizes in "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/test".
assertEvents(t, []trace.Event{
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(1),
semconv.RPCMessageTypeKey.String("RECEIVED"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(1),
semconv.RPCMessageTypeKey.String("SENT"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(2),
semconv.RPCMessageTypeKey.String("SENT"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(3),
semconv.RPCMessageTypeKey.String("SENT"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(4),
semconv.RPCMessageTypeKey.String("SENT"),
},
},
}, streamOutput.Events())

port, ok = findAttribute(streamOutput.Attributes(), semconv.ServerPortKey)
assert.True(t, ok)

assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod("StreamingOutputCall"),
semconv.RPCService("grpc.testing.TestService"),
semconv.RPCSystemGRPC,
semconv.RPCGRPCStatusCodeOk,
semconv.ServerAddress("127.0.0.1"),
port,
}, streamOutput.Attributes())

pingPong := spans[2]
assert.False(t, pingPong.EndTime().IsZero())
assert.Equal(t, "grpc.testing.TestService/FullDuplexCall", pingPong.Name())
assertEvents(t, []trace.Event{
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(1),
semconv.RPCMessageTypeKey.String("RECEIVED"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(1),
semconv.RPCMessageTypeKey.String("SENT"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(2),
semconv.RPCMessageTypeKey.String("RECEIVED"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(2),
semconv.RPCMessageTypeKey.String("SENT"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(3),
semconv.RPCMessageTypeKey.String("RECEIVED"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(3),
semconv.RPCMessageTypeKey.String("SENT"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(4),
semconv.RPCMessageTypeKey.String("RECEIVED"),
},
},
{
Name: "message",
Attributes: []attribute.KeyValue{
semconv.RPCMessageIDKey.Int(4),
semconv.RPCMessageTypeKey.String("SENT"),
},
},
}, pingPong.Events())
port, ok = findAttribute(pingPong.Attributes(), semconv.ServerPortKey)
assert.True(t, ok)
assert.ElementsMatch(t, []attribute.KeyValue{
semconv.RPCMethod("FullDuplexCall"),
semconv.RPCService("grpc.testing.TestService"),
semconv.RPCSystemGRPC,
semconv.RPCGRPCStatusCodeOk,
semconv.ServerAddress("127.0.0.1"),
port,
}, pingPong.Attributes())
}

func assertEvents(t *testing.T, expected, actual []trace.Event) bool { //nolint:unparam
if !assert.Len(t, actual, len(expected)) {
return false
Expand Down
Loading
Loading