Skip to content

Commit 1348aad

Browse files
committed
*: add drpc client and server metrics
1 parent f9ed115 commit 1348aad

File tree

4 files changed

+35
-53
lines changed

4 files changed

+35
-53
lines changed

drpcconn/conn.go

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,7 @@ func (c *Conn) Close() (err error) { return c.man.Close() }
111111

112112
// Invoke issues the rpc on the transport serializing in, waits for a response, and
113113
// deserializes it into out. Only one Invoke or Stream may be open at a time.
114-
func (c *Conn) Invoke(
115-
ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message,
116-
) (err error) {
114+
func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) {
117115
defer func() { err = drpc.ToRPCErr(err) }()
118116

119117
var metadata []byte
@@ -145,14 +143,7 @@ func (c *Conn) Invoke(
145143
return nil
146144
}
147145

148-
func (c *Conn) doInvoke(
149-
stream *drpcstream.Stream,
150-
enc drpc.Encoding,
151-
rpc string,
152-
data []byte,
153-
metadata []byte,
154-
out drpc.Message,
155-
) (err error) {
146+
func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string, data []byte, metadata []byte, out drpc.Message) (err error) {
156147
if len(metadata) > 0 {
157148
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
158149
return err

drpcmanager/manager_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/zeebo/assert"
1616
grpcmetadata "google.golang.org/grpc/metadata"
1717
"storj.io/drpc/drpcmetadata"
18+
1819
"storj.io/drpc/drpctest"
1920
"storj.io/drpc/drpcwire"
2021
)

drpcmetrics/metrics.go

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -100,37 +100,34 @@ func StatusCodeLabel(err error) string {
100100
return status.Code(err).String()
101101
}
102102

103-
// WithStatusCode returns a copy of labels with the error_code label added.
104-
func WithStatusCode(labels map[string]string, err error) map[string]string {
105-
out := make(map[string]string, len(labels)+1)
106-
for k, v := range labels {
107-
out[k] = v
108-
}
103+
// WithStatusCode returns a label map with the RPC method name and status code.
104+
func WithStatusCode(rpc string, err error) map[string]string {
109105
statusCode := drpc.ToRPCErr(err)
110-
out[LabelStatusCode] = StatusCodeLabel(statusCode)
111-
return out
106+
labels := map[string]string{
107+
LabelRPCMethodName: rpc,
108+
LabelStatusCode: StatusCodeLabel(statusCode),
109+
}
110+
return labels
112111
}
113112

114113
// WithState returns a copy of labels with the state label added.
115-
func WithState(labels map[string]string, state string) map[string]string {
116-
out := make(map[string]string, len(labels)+1)
117-
for k, v := range labels {
118-
out[k] = v
114+
func WithState(rpc string, state string) map[string]string {
115+
labels := map[string]string{
116+
LabelRPCMethodName: rpc,
117+
LabelState: state,
119118
}
120-
out[LabelState] = state
121-
return out
119+
return labels
122120
}
123121

124122
// WithStateAndStatusCode returns a copy of labels with the state and status code labels added.
125-
func WithStateAndStatusCode(labels map[string]string,
123+
func WithStateAndStatusCode(rpc string,
126124
state string, err error) map[string]string {
127-
out := make(map[string]string, len(labels)+2)
128-
for k, v := range labels {
129-
out[k] = v
125+
labels := map[string]string{
126+
LabelRPCMethodName: rpc,
127+
LabelState: state,
128+
LabelStatusCode: StatusCodeLabel(drpc.ToRPCErr(err)),
130129
}
131-
out[LabelState] = state
132-
out[LabelStatusCode] = StatusCodeLabel(drpc.ToRPCErr(err))
133-
return out
130+
return labels
134131
}
135132

136133
// ClientMetrics holds optional metrics that the client connection will

drpcserver/server.go

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ type Options struct {
3232
// handling individual clients. It is not called if nil.
3333
Log func(error)
3434

35+
// CollectStats controls whether the server should collect stats on the
36+
// rpcs it serves.
37+
CollectStats bool
38+
3539
// TLSConfig, if non-nil, is used to wrap the listener with tls.NewListener
3640
// in Serve(). The TLS handshake is performed explicitly in ServeOne before
3741
// processing requests.
@@ -43,10 +47,6 @@ type Options struct {
4347
// restrictions. If it returns a non-nil error the connection is rejected.
4448
TLSCipherRestrict func(conn net.Conn) error
4549

46-
// CollectStats controls whether the server should collect stats on the
47-
// rpcs it serves.
48-
CollectStats bool
49-
5050
// Metrics holds optional metrics the server will populate. If nil, no
5151
// metrics are recorded.
5252
Metrics *ServerMetrics
@@ -86,32 +86,27 @@ func (m *ServerMetrics) wrapTransport(tr drpc.Transport) drpc.Transport {
8686
return &drpcmetrics.MeteredTransport{Transport: tr, BytesSent: m.BytesSent, BytesRecv: m.BytesRecv}
8787
}
8888

89-
// rpcLabels builds a label map for the given RPC string.
90-
func (s *Server) rpcLabels(rpc string) map[string]string {
91-
return map[string]string{
92-
drpcmetrics.LabelRPCMethodName: rpc,
93-
}
94-
}
95-
9689
// recordStreamStart records stream start metrics.
97-
func (s *Server) recordStreamStart(labels map[string]string) {
90+
func (s *Server) recordStreamStart(rpc string) {
9891
sm := s.opts.Metrics
9992
if sm != nil && sm.Stream != nil && sm.Stream.StreamsTotal != nil {
100-
sm.Stream.StreamsTotal.Inc(drpcmetrics.WithState(labels,
93+
sm.Stream.StreamsTotal.Inc(drpcmetrics.WithState(rpc,
10194
drpcmetrics.StateStarted), 1)
10295
}
10396
}
10497

10598
// recordStreamEnd records stream end metrics.
106-
func (s *Server) recordStreamEnd(labels map[string]string, err error, duration time.Duration) {
99+
func (s *Server) recordStreamEnd(rpc string, err error,
100+
duration time.Duration) {
107101
sm := s.opts.Metrics
108102
if sm != nil && sm.Stream != nil {
109103
if sm.Stream.StreamsTotal != nil {
110-
sm.Stream.StreamsTotal.Inc(drpcmetrics.WithStateAndStatusCode(labels,
104+
sm.Stream.StreamsTotal.Inc(drpcmetrics.WithStateAndStatusCode(rpc,
111105
drpcmetrics.StateCompleted, err), 1)
112106
}
113107
if sm.Stream.StreamDuration != nil {
114-
sm.Stream.StreamDuration.Observe(drpcmetrics.WithStatusCode(labels, err),
108+
sm.Stream.StreamDuration.Observe(drpcmetrics.WithStatusCode(rpc,
109+
err),
115110
duration.Seconds())
116111
}
117112
}
@@ -293,26 +288,24 @@ func (s *Server) Serve(ctx context.Context, lis net.Listener) (err error) {
293288

294289
// handleRPC handles the rpc that has been requested by the stream.
295290
func (s *Server) handleRPC(stream *drpcstream.Stream, rpc string) (err error) {
296-
var labels map[string]string
297291
var start time.Time
298292
if s.opts.Metrics != nil {
299-
labels = s.rpcLabels(rpc)
300-
s.recordStreamStart(labels)
293+
s.recordStreamStart(rpc)
301294
start = time.Now()
302295
}
303296

304297
handlerErr := s.handler.HandleRPC(stream, rpc)
305298

306299
if handlerErr != nil {
307300
sendErr := errs.Wrap(stream.SendError(handlerErr))
308-
s.recordStreamEnd(labels, handlerErr, time.Since(start))
301+
s.recordStreamEnd(rpc, handlerErr, time.Since(start))
309302
return sendErr
310303
}
311304

312305
closeErr := errs.Wrap(stream.CloseSend())
313306
// Record success: the handler returned nil, so the RPC itself
314307
// succeeded. A CloseSend failure is a transport-level issue and
315308
// should not be attributed as the RPC's error code.
316-
s.recordStreamEnd(labels, nil, time.Since(start))
309+
s.recordStreamEnd(rpc, nil, time.Since(start))
317310
return closeErr
318311
}

0 commit comments

Comments
 (0)