Skip to content

grpc: Move some stats handler calls to gRPC layer, and add local address to peer.Peer #6716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var (
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials any // func (*grpc.Server) credentials.TransportCredentials
// GetConnection gets the connection from the context.
GetConnection any // func (context.Context) net.Conn
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be fine to give this a type to avoid the casts, since it's only things from the stdlib.

But, it should also be fine to implement it fully within the internal package. But then.. why not just leave it where it is and export the setter? Or find another place in internal for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought about this, decided to move back to transport. I don't really think any of the current internal directories make semantical sense, and as per offline discussion I think there's complications about trying to keep file paths consistent.

// CanonicalString returns the canonical string of the code defined here:
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md.
//
Expand Down
45 changes: 21 additions & 24 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,16 @@ func (ht *serverHandlerTransport) Close(err error) {
})
}

func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
func (ht *serverHandlerTransport) Peer() *peer.Peer {
var localAddr net.Addr
if la := ht.req.Context().Value(http.LocalAddrContextKey); la != nil {
localAddr = la.(net.Addr)
}
return &peer.Peer{
Addr: strAddr(ht.req.RemoteAddr),
LocalAddr: localAddr,
}
}

// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
// the empty string if unknown.
Expand Down Expand Up @@ -347,9 +356,8 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
return err
}

func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
func (ht *serverHandlerTransport) HandleStreams(_ context.Context, startStream func(*Stream)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't ignoring the context mess everything up?

Can grpc not pass the http.Request.Context() (with things like Peer added) so that it doesn't need to be ignored here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was being ignored in master already, and it uses the requests Context (ctx := ht.req.Context()). I think this is the same functionality, and I don't think I want to add something to gRPC layer using the http.Request since it's a server handler transport specific concept.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed offline; decided to move peer creation to construction time and read it in gRPC before calling this.

// With this transport type there will be exactly 1 stream: this HTTP request.

ctx := ht.req.Context()
var cancel context.CancelFunc
if ht.timeoutSet {
Expand All @@ -371,34 +379,23 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream)) {
}()

req := ht.req

s := &Stream{
id: 0, // irrelevant
requestRead: func(int) {},
cancel: cancel,
buf: newRecvBuffer(),
st: ht,
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
contentSubtype: ht.contentSubtype,
}
pr := &peer.Peer{
Addr: ht.RemoteAddr(),
id: 0, // irrelevant
requestRead: func(int) {},
cancel: cancel,
buf: newRecvBuffer(),
st: ht,
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
contentSubtype: ht.contentSubtype,
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
}
pr := ht.Peer()
if req.TLS != nil {
pr.AuthInfo = credentials.TLSInfo{State: *req.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
}
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
s.ctx = peer.NewContext(ctx, pr)
for _, sh := range ht.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: ht.RemoteAddr(),
Compression: s.recvCompress,
}
sh.HandleRPC(s.ctx, inHeader)
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
windowHandler: func(int) {},
Expand Down
10 changes: 5 additions & 5 deletions internal/transport/handler_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (s) TestHandlerTransport_HandleStreams(t *testing.T) {
st.ht.WriteStatus(s, status.New(codes.OK, ""))
}
st.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
context.Background(), func(s *Stream) { go handleStream(s) },
)
wantHeader := http.Header{
"Date": nil,
Expand Down Expand Up @@ -347,7 +347,7 @@ func handleStreamCloseBodyTest(t *testing.T, statusCode codes.Code, msg string)
st.ht.WriteStatus(s, status.New(statusCode, msg))
}
st.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
context.Background(), func(s *Stream) { go handleStream(s) },
)
wantHeader := http.Header{
"Date": nil,
Expand Down Expand Up @@ -396,7 +396,7 @@ func (s) TestHandlerTransport_HandleStreams_Timeout(t *testing.T) {
ht.WriteStatus(s, status.New(codes.DeadlineExceeded, "too slow"))
}
ht.HandleStreams(
func(s *Stream) { go runStream(s) },
context.Background(), func(s *Stream) { go runStream(s) },
)
wantHeader := http.Header{
"Date": nil,
Expand Down Expand Up @@ -448,7 +448,7 @@ func (s) TestHandlerTransport_HandleStreams_WriteStatusWrite(t *testing.T) {
func testHandlerTransportHandleStreams(t *testing.T, handleStream func(st *handleStreamTest, s *Stream)) {
st := newHandleStreamTest(t)
st.ht.HandleStreams(
func(s *Stream) { go handleStream(st, s) },
context.Background(), func(s *Stream) { go handleStream(st, s) },
)
}

Expand Down Expand Up @@ -481,7 +481,7 @@ func (s) TestHandlerTransport_HandleStreams_ErrDetails(t *testing.T) {
hst.ht.WriteStatus(s, st)
}
hst.ht.HandleStreams(
func(s *Stream) { go handleStream(s) },
context.Background(), func(s *Stream) { go handleStream(s) },
)
wantHeader := http.Header{
"Date": nil,
Expand Down
74 changes: 15 additions & 59 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ var serverConnectionCounter uint64
// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
ctx context.Context
done chan struct{}
conn net.Conn
loopy *loopyWriter
Expand Down Expand Up @@ -244,7 +243,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,

done := make(chan struct{})
t := &http2Server{
ctx: setConnection(context.Background(), rawConn),
done: done,
conn: conn,
remoteAddr: conn.RemoteAddr(),
Expand All @@ -267,8 +265,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
bufferPool: newBufferPool(),
}
t.logger = prefixLoggerForServerTransport(t)
// Add peer information to the http2server context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())

t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
Expand All @@ -277,14 +273,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
for _, sh := range t.stats {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{}
sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
if err != nil {
return nil, err
Expand Down Expand Up @@ -342,7 +330,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,

// operateHeaders takes action on the decoded headers. Returns an error if fatal
// error encountered and transport needs to close, otherwise returns nil.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
// Acquire max stream ID lock for entire duration
t.maxStreamMu.Lock()
defer t.maxStreamMu.Unlock()
Expand All @@ -369,10 +357,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(

buf := newRecvBuffer()
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
headerWireLength: int(frame.Header().Length),
}
var (
// if false, content-type was missing or invalid
Expand Down Expand Up @@ -511,9 +500,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.state = streamReadDone
}
if timeoutSet {
s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)
s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
} else {
s.ctx, s.cancel = context.WithCancel(t.ctx)
s.ctx, s.cancel = context.WithCancel(ctx)
}

// Attach the received metadata to the context.
Expand Down Expand Up @@ -592,18 +581,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
for _, sh := range t.stats {
s.ctx = sh.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
inHeader := &stats.InHeader{
FullMethod: s.method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: s.recvCompress,
WireLength: int(frame.Header().Length),
Header: mdata.Copy(),
}
sh.HandleRPC(s.ctx, inHeader)
}
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
Expand All @@ -629,7 +606,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream)) {
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
defer close(t.readerDone)
for {
t.controlBuf.throttle()
Expand Down Expand Up @@ -664,7 +641,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if err := t.operateHeaders(frame, handle); err != nil {
if err := t.operateHeaders(ctx, frame, handle); err != nil {
t.Close(err)
break
}
Expand Down Expand Up @@ -1242,10 +1219,6 @@ func (t *http2Server) Close(err error) {
for _, s := range streams {
s.cancel()
}
for _, sh := range t.stats {
connEnd := &stats.ConnEnd{}
sh.HandleConn(t.ctx, connEnd)
}
}

// deleteStream deletes the stream s from transport's active streams.
Expand Down Expand Up @@ -1311,10 +1284,6 @@ func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eo
})
}

func (t *http2Server) RemoteAddr() net.Addr {
return t.remoteAddr
}

func (t *http2Server) Drain(debugData string) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down Expand Up @@ -1433,10 +1402,12 @@ func (t *http2Server) getOutFlowWindow() int64 {
}
}

func (t *http2Server) getPeer() *peer.Peer {
// Peer returns the peer of the transport.
func (t *http2Server) Peer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo, // Can be nil
Addr: t.remoteAddr,
LocalAddr: t.localAddr,
AuthInfo: t.authInfo, // Can be nil
}
}

Expand All @@ -1449,18 +1420,3 @@ func getJitter(v time.Duration) time.Duration {
j := grpcrand.Int63n(2*r) - r
return time.Duration(j)
}

type connectionKey struct{}

// GetConnection gets the connection from the context.
func GetConnection(ctx context.Context) net.Conn {
conn, _ := ctx.Value(connectionKey{}).(net.Conn)
return conn
}

// SetConnection adds the connection to the context to be able to get
// information about the destination ip and port for an incoming RPC. This also
// allows any unary or streaming interceptors to see the connection.
func setConnection(ctx context.Context, conn net.Conn) context.Context {
return context.WithValue(ctx, connectionKey{}, conn)
}
22 changes: 18 additions & 4 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -265,7 +266,8 @@ type Stream struct {
// headerValid indicates whether a valid header was received. Only
// meaningful after headerChan is closed (always call waitOnHeader() before
// reading its value). Not valid on server side.
headerValid bool
headerValid bool
headerWireLength int // Only set on server side.

// hdrMu protects header and trailer metadata on the server-side.
hdrMu sync.Mutex
Expand Down Expand Up @@ -425,6 +427,12 @@ func (s *Stream) Context() context.Context {
return s.ctx
}

// SetContext sets the context of the stream. This will be deleted once the
// stats handler callouts all move to gRPC layer.
func (s *Stream) SetContext(ctx context.Context) {
s.ctx = ctx
}

// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
Expand All @@ -437,6 +445,12 @@ func (s *Stream) Status() *status.Status {
return s.status
}

// HeaderWireLength returns the size of the headers of the stream as received
// from the wire. Valid only on the server.
func (s *Stream) HeaderWireLength() int {
return s.headerWireLength
}

// SetHeader sets the header metadata. This can be called multiple times.
// Server side only.
// This should not be called in parallel to other data writes.
Expand Down Expand Up @@ -698,7 +712,7 @@ type ClientTransport interface {
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"the Context given is used as the base context for all streams started on this transport."?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per comment above, will wait until further discussion on the server handler transport to add this docstring.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to do this, added this comment.

HandleStreams(func(*Stream))
HandleStreams(context.Context, func(*Stream))

// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.
Expand All @@ -717,8 +731,8 @@ type ServerTransport interface {
// handlers will be terminated asynchronously.
Close(err error)

// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
// Peer returns the peer of the server transport.
Peer() *peer.Peer
Comment on lines +734 to +735
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we delete Local/RemoteAddr and only have this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done. There's an in flight PR for this but the contributor isn't getting back to my comments so I'll go ahead and just add it to this PR.


// Drain notifies the client this ServerTransport stops accepting new RPCs.
Drain(debugData string)
Expand Down
Loading