Skip to content

Commit 46af5f9

Browse files
committed
Move conn begin conn end and in header to grpc layer
1 parent 32e3ef1 commit 46af5f9

File tree

9 files changed

+125
-151
lines changed

9 files changed

+125
-151
lines changed

internal/transport/handler_server.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ func (ht *serverHandlerTransport) Close(err error) {
167167

168168
func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
169169

170+
func (ht *serverHandlerTransport) LocalAddr() net.Addr { return nil } // Server Handler transport has no access to local addr (was simply not calling sh with local addr).
171+
172+
func (ht *serverHandlerTransport) Peer() *peer.Peer {
173+
return &peer.Peer{
174+
Addr: ht.RemoteAddr(),
175+
}
176+
}
177+
170178
// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
171179
// the empty string if unknown.
172180
type strAddr string
@@ -347,7 +355,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
347355
return err
348356
}
349357

350-
func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
358+
func (ht *serverHandlerTransport) HandleStreams(_ context.Context, startStream func(*Stream)) {
351359
// With this transport type there will be exactly 1 stream: this HTTP request.
352360

353361
ctx := ht.req.Context()
@@ -371,16 +379,16 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
371379
}()
372380

373381
req := ht.req
374-
375382
s := &Stream{
376-
id: 0, // irrelevant
377-
requestRead: func(int) {},
378-
cancel: cancel,
379-
buf: newRecvBuffer(),
380-
st: ht,
381-
method: req.URL.Path,
382-
recvCompress: req.Header.Get("grpc-encoding"),
383-
contentSubtype: ht.contentSubtype,
383+
id: 0, // irrelevant
384+
requestRead: func(int) {},
385+
cancel: cancel,
386+
buf: newRecvBuffer(),
387+
st: ht,
388+
method: req.URL.Path,
389+
recvCompress: req.Header.Get("grpc-encoding"),
390+
contentSubtype: ht.contentSubtype,
391+
headerWireLength: 0, // doesn't know header wire length, will call into stats handler as 0.
384392
}
385393
pr := &peer.Peer{
386394
Addr: ht.RemoteAddr(),
@@ -390,15 +398,6 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
390398
}
391399
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
392400
s.ctx = peer.NewContext(ctx, pr)
393-
for _, sh := range ht.stats {
394-
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
395-
inHeader := &stats.InHeader{
396-
FullMethod: s.method,
397-
RemoteAddr: ht.RemoteAddr(),
398-
Compression: s.recvCompress,
399-
}
400-
sh.HandleRPC(s.ctx, inHeader)
401-
}
402401
s.trReader = &transportReader{
403402
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
404403
windowHandler: func(int) {},

internal/transport/handler_server_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ func (s) TestHandlerTransport_HandleStreams(t *testing.T) {
314314
st.ht.WriteStatus(s, status.New(codes.OK, ""))
315315
}
316316
st.ht.HandleStreams(
317-
func(s *Stream) { go handleStream(s) },
317+
context.Background(), func(s *Stream) { go handleStream(s) },
318318
)
319319
wantHeader := http.Header{
320320
"Date": nil,
@@ -347,7 +347,7 @@ func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string)
347347
st.ht.WriteStatus(s, status.New(statusCode, msg))
348348
}
349349
st.ht.HandleStreams(
350-
func(s *Stream) { go handleStream(s) },
350+
context.Background(), func(s *Stream) { go handleStream(s) },
351351
)
352352
wantHeader := http.Header{
353353
"Date": nil,
@@ -396,7 +396,7 @@ func (s) TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
396396
ht.WriteStatus(s, status.New(codes.DeadlineExceeded, "too slow"))
397397
}
398398
ht.HandleStreams(
399-
func(s *Stream) { go runStream(s) },
399+
context.Background(), func(s *Stream) { go runStream(s) },
400400
)
401401
wantHeader := http.Header{
402402
"Date": nil,
@@ -448,7 +448,7 @@ func (s) TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
448448
func testHandlerTransportHandleStreams(t *testing.T, handleStream func(st *handleStreamTest, s *Stream)) {
449449
st := newHandleStreamTest(t)
450450
st.ht.HandleStreams(
451-
func(s *Stream) { go handleStream(st, s) },
451+
context.Background(), func(s *Stream) { go handleStream(st, s) },
452452
)
453453
}
454454

@@ -481,7 +481,7 @@ func (s) TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
481481
hst.ht.WriteStatus(s, st)
482482
}
483483
hst.ht.HandleStreams(
484-
func(s *Stream) { go handleStream(s) },
484+
context.Background(), func(s *Stream) { go handleStream(s) },
485485
)
486486
wantHeader := http.Header{
487487
"Date": nil,

internal/transport/http2_server.go

Lines changed: 16 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ var serverConnectionCounter uint64
6969
// http2Server implements the ServerTransport interface with HTTP2.
7070
type http2Server struct {
7171
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
72-
ctx context.Context
7372
done chan struct{}
7473
conn net.Conn
7574
loopy *loopyWriter
@@ -244,7 +243,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
244243

245244
done := make(chan struct{})
246245
t := &http2Server{
247-
ctx: setConnection(context.Background(), rawConn),
248246
done: done,
249247
conn: conn,
250248
remoteAddr: conn.RemoteAddr(),
@@ -267,8 +265,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
267265
bufferPool: newBufferPool(),
268266
}
269267
t.logger = prefixLoggerForServerTransport(t)
270-
// Add peer information to the http2server context.
271-
t.ctx = peer.NewContext(t.ctx, t.getPeer())
272268

273269
t.controlBuf = newControlBuffer(t.done)
274270
if dynamicWindow {
@@ -277,14 +273,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
277273
updateFlowControl: t.updateFlowControl,
278274
}
279275
}
280-
for _, sh := range t.stats {
281-
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
282-
RemoteAddr: t.remoteAddr,
283-
LocalAddr: t.localAddr,
284-
})
285-
connBegin := &stats.ConnBegin{}
286-
sh.HandleConn(t.ctx, connBegin)
287-
}
288276
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
289277
if err != nil {
290278
return nil, err
@@ -342,7 +330,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
342330

343331
// operateHeaders takes action on the decoded headers. Returns an error if fatal
344332
// error encountered and transport needs to close, otherwise returns nil.
345-
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
333+
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
346334
// Acquire max stream ID lock for entire duration
347335
t.maxStreamMu.Lock()
348336
defer t.maxStreamMu.Unlock()
@@ -369,10 +357,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
369357

370358
buf := newRecvBuffer()
371359
s := &Stream{
372-
id: streamID,
373-
st: t,
374-
buf: buf,
375-
fc: &inFlow{limit: uint32(t.initialWindowSize)},
360+
id: streamID,
361+
st: t,
362+
buf: buf,
363+
fc: &inFlow{limit: uint32(t.initialWindowSize)},
364+
headerWireLength: int(frame.Header().Length),
376365
}
377366
var (
378367
// if false, content-type was missing or invalid
@@ -511,9 +500,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
511500
s.state = streamReadDone
512501
}
513502
if timeoutSet {
514-
s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
503+
s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
515504
} else {
516-
s.ctx, s.cancel = context.WithCancel(t.ctx)
505+
s.ctx, s.cancel = context.WithCancel(ctx)
517506
}
518507

519508
// Attach the received metadata to the context.
@@ -592,18 +581,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
592581
s.requestRead = func(n int) {
593582
t.adjustWindow(s, uint32(n))
594583
}
595-
for _, sh := range t.stats {
596-
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
597-
inHeader := &stats.InHeader{
598-
FullMethod: s.method,
599-
RemoteAddr: t.remoteAddr,
600-
LocalAddr: t.localAddr,
601-
Compression: s.recvCompress,
602-
WireLength: int(frame.Header().Length),
603-
Header: mdata.Copy(),
604-
}
605-
sh.HandleRPC(s.ctx, inHeader)
606-
}
607584
s.ctxDone = s.ctx.Done()
608585
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
609586
s.trReader = &transportReader{
@@ -629,7 +606,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
629606
// HandleStreams receives incoming streams using the given handler. This is
630607
// typically run in a separate goroutine.
631608
// traceCtx attaches trace to ctx and returns the new context.
632-
func (t *http2Server) HandleStreams(handle func(*Stream)) {
609+
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
633610
defer close(t.readerDone)
634611
for {
635612
t.controlBuf.throttle()
@@ -664,7 +641,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
664641
}
665642
switch frame := frame.(type) {
666643
case *http2.MetaHeadersFrame:
667-
if err := t.operateHeaders(frame, handle); err != nil {
644+
if err := t.operateHeaders(ctx, frame, handle); err != nil {
668645
t.Close(err)
669646
break
670647
}
@@ -1242,10 +1219,6 @@ func (t *http2Server) Close(err error) {
12421219
for _, s := range streams {
12431220
s.cancel()
12441221
}
1245-
for _, sh := range t.stats {
1246-
connEnd := &stats.ConnEnd{}
1247-
sh.HandleConn(t.ctx, connEnd)
1248-
}
12491222
}
12501223

12511224
// deleteStream deletes the stream s from transport's active streams.
@@ -1311,6 +1284,10 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo
13111284
})
13121285
}
13131286

1287+
func (t *http2Server) LocalAddr() net.Addr {
1288+
return t.localAddr
1289+
}
1290+
13141291
func (t *http2Server) RemoteAddr() net.Addr {
13151292
return t.remoteAddr
13161293
}
@@ -1433,7 +1410,8 @@ func (t *http2Server) getOutFlowWindow() int64 {
14331410
}
14341411
}
14351412

1436-
func (t *http2Server) getPeer() *peer.Peer {
1413+
// Peer returns the peer of the transport.
1414+
func (t *http2Server) Peer() *peer.Peer {
14371415
return &peer.Peer{
14381416
Addr: t.remoteAddr,
14391417
AuthInfo: t.authInfo, // Can be nil
@@ -1449,18 +1427,3 @@ func getJitter(v time.Duration) time.Duration {
14491427
j := grpcrand.Int63n(2*r) - r
14501428
return time.Duration(j)
14511429
}
1452-
1453-
type connectionKey struct{}
1454-
1455-
// GetConnection gets the connection from the context.
1456-
func GetConnection(ctx context.Context) net.Conn {
1457-
conn, _ := ctx.Value(connectionKey{}).(net.Conn)
1458-
return conn
1459-
}
1460-
1461-
// SetConnection adds the connection to the context to be able to get
1462-
// information about the destination ip and port for an incoming RPC. This also
1463-
// allows any unary or streaming interceptors to see the connection.
1464-
func setConnection(ctx context.Context, conn net.Conn) context.Context {
1465-
return context.WithValue(ctx, connectionKey{}, conn)
1466-
}

internal/transport/transport.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"google.golang.org/grpc/internal/channelz"
3838
"google.golang.org/grpc/keepalive"
3939
"google.golang.org/grpc/metadata"
40+
"google.golang.org/grpc/peer"
4041
"google.golang.org/grpc/resolver"
4142
"google.golang.org/grpc/stats"
4243
"google.golang.org/grpc/status"
@@ -265,7 +266,8 @@ type Stream struct {
265266
// headerValid indicates whether a valid header was received. Only
266267
// meaningful after headerChan is closed (always call waitOnHeader() before
267268
// reading its value). Not valid on server side.
268-
headerValid bool
269+
headerValid bool
270+
headerWireLength int // Only set on server side.
269271

270272
// hdrMu protects header and trailer metadata on the server-side.
271273
hdrMu sync.Mutex
@@ -425,6 +427,12 @@ func (s *Stream) Context() context.Context {
425427
return s.ctx
426428
}
427429

430+
// SetContext sets the context of the stream. This will be deleted once the
431+
// stats handler callouts all move to gRPC layer.
432+
func (s *Stream) SetContext(ctx context.Context) {
433+
s.ctx = ctx
434+
}
435+
428436
// Method returns the method for the stream.
429437
func (s *Stream) Method() string {
430438
return s.method
@@ -437,6 +445,12 @@ func (s *Stream) Status() *status.Status {
437445
return s.status
438446
}
439447

448+
// HeaderWireLength returns the size of the headers of the stream as received
449+
// from the wire. Valid only on the server.
450+
func (s *Stream) HeaderWireLength() int {
451+
return s.headerWireLength
452+
}
453+
440454
// SetHeader sets the header metadata. This can be called multiple times.
441455
// Server side only.
442456
// This should not be called in parallel to other data writes.
@@ -698,7 +712,7 @@ type ClientTransport interface {
698712
// Write methods for a given Stream will be called serially.
699713
type ServerTransport interface {
700714
// HandleStreams receives incoming streams using the given handler.
701-
HandleStreams(func(*Stream))
715+
HandleStreams(context.Context, func(*Stream))
702716

703717
// WriteHeader sends the header metadata for the given stream.
704718
// WriteHeader may not be called on all streams.
@@ -717,9 +731,15 @@ type ServerTransport interface {
717731
// handlers will be terminated asynchronously.
718732
Close(err error)
719733

734+
// LocalAddr returns the local network address.
735+
LocalAddr() net.Addr
736+
720737
// RemoteAddr returns the remote network address.
721738
RemoteAddr() net.Addr
722739

740+
// Peer returns the peer of the server transport.
741+
Peer() *peer.Peer
742+
723743
// Drain notifies the client this ServerTransport stops accepting new RPCs.
724744
Drain(debugData string)
725745

0 commit comments

Comments
 (0)