Skip to content

Commit 3843677

Browse files
committed
Add tracing entry span with W3C propagation to EPP handler
Signed-off-by: sallyom <somalley@redhat.com>
1 parent 55f5e44 commit 3843677

File tree

2 files changed

+29
-5
lines changed

2 files changed

+29
-5
lines changed

pkg/epp/handlers/request.go

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@ limitations under the License.
1717
package handlers
1818

1919
import (
20+
"context"
2021
"strconv"
2122
"time"
2223

2324
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
2425
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
26+
"go.opentelemetry.io/otel"
27+
"go.opentelemetry.io/otel/propagation"
2528
"google.golang.org/protobuf/types/known/structpb"
2629

2730
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metadata"
@@ -51,7 +54,7 @@ func (s *StreamingServer) HandleRequestHeaders(reqCtx *RequestContext, req *extP
5154
}
5255
reqCtx.TargetEndpoint = pod.GetIPAddress() + ":" + pod.GetPort()
5356
reqCtx.RequestSize = 0
54-
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(reqCtx)
57+
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(ctx, reqCtx)
5558
return nil
5659
}
5760

@@ -90,7 +93,7 @@ func (s *StreamingServer) generateRequestBodyResponses(requestBodyBytes []byte)
9093
return responses
9194
}
9295

93-
func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext) *extProcPb.ProcessingResponse {
96+
func (s *StreamingServer) generateRequestHeaderResponse(ctx context.Context, reqCtx *RequestContext) *extProcPb.ProcessingResponse {
9497
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
9598
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
9699
// options for gateway providers.
@@ -100,7 +103,7 @@ func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext)
100103
Response: &extProcPb.CommonResponse{
101104
ClearRouteCache: true,
102105
HeaderMutation: &extProcPb.HeaderMutation{
103-
SetHeaders: s.generateHeaders(reqCtx),
106+
SetHeaders: s.generateHeaders(ctx, reqCtx),
104107
},
105108
},
106109
},
@@ -109,7 +112,7 @@ func (s *StreamingServer) generateRequestHeaderResponse(reqCtx *RequestContext)
109112
}
110113
}
111114

112-
func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.HeaderValueOption {
115+
func (s *StreamingServer) generateHeaders(ctx context.Context, reqCtx *RequestContext) []*configPb.HeaderValueOption {
113116
// can likely refactor these two bespoke headers to be updated in PostDispatch, to centralize logic.
114117
headers := []*configPb.HeaderValueOption{
115118
{
@@ -130,6 +133,19 @@ func (s *StreamingServer) generateHeaders(reqCtx *RequestContext) []*configPb.He
130133
})
131134
}
132135

136+
// Inject trace context headers for propagation to downstream services
137+
traceHeaders := make(map[string]string)
138+
propagator := otel.GetTextMapPropagator()
139+
propagator.Inject(ctx, propagation.MapCarrier(traceHeaders))
140+
for key, value := range traceHeaders {
141+
headers = append(headers, &configPb.HeaderValueOption{
142+
Header: &configPb.HeaderValue{
143+
Key: key,
144+
RawValue: []byte(value),
145+
},
146+
})
147+
}
148+
133149
// Include any non-system-owned headers.
134150
for key, value := range reqCtx.Request.Headers {
135151
if request.IsSystemOwnedHeader(key) {

pkg/epp/handlers/server.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
2828
"github.com/go-logr/logr"
2929
"github.com/google/uuid"
30+
"go.opentelemetry.io/otel"
31+
"go.opentelemetry.io/otel/trace"
3032
"google.golang.org/grpc/codes"
3133
"google.golang.org/grpc/status"
3234

@@ -131,6 +133,12 @@ const (
131133

132134
func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
133135
ctx := srv.Context()
136+
137+
// Start tracing span for the request
138+
tracer := otel.Tracer("gateway-api-inference-extension")
139+
ctx, span := tracer.Start(ctx, "gateway.request", trace.WithSpanKind(trace.SpanKindServer))
140+
defer span.End()
141+
134142
logger := log.FromContext(ctx)
135143
loggerTrace := logger.V(logutil.TRACE)
136144
loggerTrace.Info("Processing")
@@ -234,7 +242,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
234242
break
235243
}
236244
reqCtx.RequestSize = len(requestBodyBytes)
237-
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(reqCtx)
245+
reqCtx.reqHeaderResp = s.generateRequestHeaderResponse(ctx, reqCtx)
238246
reqCtx.reqBodyResp = s.generateRequestBodyResponses(requestBodyBytes)
239247

240248
metrics.RecordRequestCounter(reqCtx.IncomingModelName, reqCtx.TargetModelName)

0 commit comments

Comments
 (0)