Skip to content

Commit d024cc1

Browse files
committed
metrics: support usage inside CSI driver
A CSI driver is a gRPC server, which implies that the interceptor must use a slightly different API. The extended CSIMetricsManager gets exported with a suitable method for that. To avoid counting the same operation twice in the same metric, CSI sidecar and driver should use different subsystem names. Two new functions provide the CSIMetricsManager that they are expected to use for the sake of consistency. However, special cases need additional flexibility: constant labels (same for all samples), varying labels (same label names, but per-sample values), a configurable subsystem, and configurable stability.
1 parent 771facd commit d024cc1

File tree

4 files changed

+251
-60
lines changed

4 files changed

+251
-60
lines changed

connection/connection.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func connect(
109109
grpc.WithBlock(), // Block until connection succeeds.
110110
grpc.WithChainUnaryInterceptor(
111111
LogGRPC, // Log all messages.
112-
extendedCSIMetricsManager{metricsManager}.recordMetricsInterceptor, // Record metrics for each gRPC call.
112+
ExtendedCSIMetricsManager{metricsManager}.RecordMetricsClientInterceptor, // Record metrics for each gRPC call.
113113
),
114114
)
115115
unixPrefix := "unix://"
@@ -187,12 +187,13 @@ func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
187187
return err
188188
}
189189

190-
type extendedCSIMetricsManager struct {
190+
type ExtendedCSIMetricsManager struct {
191191
metrics.CSIMetricsManager
192192
}
193193

194-
// recordMetricsInterceptor is a gPRC unary interceptor for recording metrics for CSI operations.
195-
func (cmm extendedCSIMetricsManager) recordMetricsInterceptor(
194+
// RecordMetricsClientInterceptor is a gRPC unary interceptor for recording metrics for CSI operations
195+
// in a gRPC client.
196+
func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor(
196197
ctx context.Context,
197198
method string,
198199
req, reply interface{},
@@ -209,3 +210,17 @@ func (cmm extendedCSIMetricsManager) recordMetricsInterceptor(
209210
)
210211
return err
211212
}
213+
214+
// RecordMetricsServerInterceptor is a gRPC unary interceptor for recording metrics for CSI operations
215+
// in a gRPC server.
216+
func (cmm ExtendedCSIMetricsManager) RecordMetricsServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
217+
start := time.Now()
218+
resp, err := handler(ctx, req)
219+
duration := time.Since(start)
220+
cmm.RecordMetrics(
221+
info.FullMethod, /* operationName */
222+
err, /* operationErr */
223+
duration, /* operationDuration */
224+
)
225+
return resp, err
226+
}

connection/connection_test.go

Lines changed: 66 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,34 @@ const (
5151
serverSock = "server.sock"
5252
)
5353

54+
type identityServer struct{}
55+
56+
func (ids *identityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
57+
return nil, status.Error(codes.Unimplemented, "Unimplemented")
58+
}
59+
60+
func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
61+
return nil, status.Error(codes.Unimplemented, "Unimplemented")
62+
}
63+
64+
func (ids *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
65+
return nil, status.Error(codes.Unimplemented, "Unimplemented")
66+
}
67+
5468
// startServer creates a gRPC server without any registered services.
5569
// The returned address can be used to connect to it. The cleanup
5670
// function stops it. It can be called multiple times.
57-
func startServer(t *testing.T, tmp string, identity csi.IdentityServer, controller csi.ControllerServer) (string, func()) {
71+
func startServer(t *testing.T, tmp string, identity csi.IdentityServer, controller csi.ControllerServer, cmm metrics.CSIMetricsManager) (string, func()) {
5872
addr := path.Join(tmp, serverSock)
5973
listener, err := net.Listen("unix", addr)
6074
require.NoError(t, err, "listening on %s", addr)
61-
server := grpc.NewServer()
75+
var opts []grpc.ServerOption
76+
if cmm != nil {
77+
opts = append(opts,
78+
grpc.UnaryInterceptor(ExtendedCSIMetricsManager{cmm}.RecordMetricsServerInterceptor),
79+
)
80+
}
81+
server := grpc.NewServer(opts...)
6282
if identity != nil {
6383
csi.RegisterIdentityServer(server, identity)
6484
}
@@ -85,7 +105,7 @@ func startServer(t *testing.T, tmp string, identity csi.IdentityServer, controll
85105
func TestConnect(t *testing.T) {
86106
tmp := tmpDir(t)
87107
defer os.RemoveAll(tmp)
88-
addr, stopServer := startServer(t, tmp, nil, nil)
108+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
89109
defer stopServer()
90110

91111
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
@@ -100,7 +120,7 @@ func TestConnect(t *testing.T) {
100120
func TestConnectUnix(t *testing.T) {
101121
tmp := tmpDir(t)
102122
defer os.RemoveAll(tmp)
103-
addr, stopServer := startServer(t, tmp, nil, nil)
123+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
104124
defer stopServer()
105125

106126
conn, err := Connect("unix:///"+addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
@@ -141,7 +161,7 @@ func TestWaitForServer(t *testing.T) {
141161
t.Logf("sleeping %s before starting server", delay)
142162
time.Sleep(delay)
143163
startTimeServer = time.Now()
144-
_, stopServer = startServer(t, tmp, nil, nil)
164+
_, stopServer = startServer(t, tmp, nil, nil, nil)
145165
}()
146166
conn, err := Connect(path.Join(tmp, serverSock), metrics.NewCSIMetricsManager("fake.csi.driver.io"))
147167
if assert.NoError(t, err, "connect via absolute path") {
@@ -175,7 +195,7 @@ func TestTimout(t *testing.T) {
175195
func TestReconnect(t *testing.T) {
176196
tmp := tmpDir(t)
177197
defer os.RemoveAll(tmp)
178-
addr, stopServer := startServer(t, tmp, nil, nil)
198+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
179199
defer func() {
180200
stopServer()
181201
}()
@@ -202,7 +222,7 @@ func TestReconnect(t *testing.T) {
202222
}
203223

204224
// No reconnection either when the server comes back.
205-
_, stopServer = startServer(t, tmp, nil, nil)
225+
_, stopServer = startServer(t, tmp, nil, nil, nil)
206226
// We need to give gRPC some time. It does not attempt to reconnect
207227
// immediately. If we send the method call too soon, the test passes
208228
// even though a later method call will go through again.
@@ -220,7 +240,7 @@ func TestReconnect(t *testing.T) {
220240
func TestDisconnect(t *testing.T) {
221241
tmp := tmpDir(t)
222242
defer os.RemoveAll(tmp)
223-
addr, stopServer := startServer(t, tmp, nil, nil)
243+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
224244
defer func() {
225245
stopServer()
226246
}()
@@ -251,7 +271,7 @@ func TestDisconnect(t *testing.T) {
251271
}
252272

253273
// No reconnection either when the server comes back.
254-
_, stopServer = startServer(t, tmp, nil, nil)
274+
_, stopServer = startServer(t, tmp, nil, nil, nil)
255275
// We need to give gRPC some time. It does not attempt to reconnect
256276
// immediately. If we send the method call too soon, the test passes
257277
// even though a later method call will go through again.
@@ -271,7 +291,7 @@ func TestDisconnect(t *testing.T) {
271291
func TestExplicitReconnect(t *testing.T) {
272292
tmp := tmpDir(t)
273293
defer os.RemoveAll(tmp)
274-
addr, stopServer := startServer(t, tmp, nil, nil)
294+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
275295
defer func() {
276296
stopServer()
277297
}()
@@ -302,7 +322,7 @@ func TestExplicitReconnect(t *testing.T) {
302322
}
303323

304324
// No reconnection either when the server comes back.
305-
_, stopServer = startServer(t, tmp, nil, nil)
325+
_, stopServer = startServer(t, tmp, nil, nil, nil)
306326
// We need to give gRPC some time. It does not attempt to reconnect
307327
// immediately. If we send the method call too soon, the test passes
308328
// even though a later method call will go through again.
@@ -322,7 +342,12 @@ func TestExplicitReconnect(t *testing.T) {
322342
func TestConnectMetrics(t *testing.T) {
323343
tmp := tmpDir(t)
324344
defer os.RemoveAll(tmp)
325-
addr, stopServer := startServer(t, tmp, nil, nil)
345+
cmmServer := metrics.NewCSIMetricsManager("fake.csi.driver.io",
346+
metrics.WithSubsystem(metrics.SubsystemPlugin),
347+
)
348+
// We have to have a real implementation of the gRPC call, otherwise the metrics
349+
// interceptor is not called. The CSI identity service is used because it's simple.
350+
addr, stopServer := startServer(t, tmp, &identityServer{}, nil, cmmServer)
326351
defer stopServer()
327352

328353
cmm := metrics.NewCSIMetricsManager("fake.csi.driver.io")
@@ -332,38 +357,49 @@ func TestConnectMetrics(t *testing.T) {
332357
defer conn.Close()
333358
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
334359

335-
if err := conn.Invoke(context.Background(), "/csi.v1.Controller/ControllerGetCapabilities", nil, nil); assert.Error(t, err) {
360+
identityClient := csi.NewIdentityClient(conn)
361+
if _, err := identityClient.GetPluginInfo(context.Background(), &csi.GetPluginInfoRequest{}); assert.Error(t, err) {
336362
errStatus, _ := status.FromError(err)
337363
assert.Equal(t, codes.Unimplemented, errStatus.Code(), "not implemented")
338364
}
339365
}
340366

341367
expectedMetrics := `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
342368
# TYPE csi_sidecar_operations_seconds histogram
343-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.1"} 1
344-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.25"} 1
345-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.5"} 1
346-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="1"} 1
347-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="2.5"} 1
348-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="5"} 1
349-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="10"} 1
350-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="15"} 1
351-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="25"} 1
352-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="50"} 1
353-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="120"} 1
354-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="300"} 1
355-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="600"} 1
356-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="+Inf"} 1
357-
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities"} 0
358-
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities"} 1
369+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.1"} 1
370+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.25"} 1
371+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.5"} 1
372+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="1"} 1
373+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="2.5"} 1
374+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="5"} 1
375+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="10"} 1
376+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="15"} 1
377+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="25"} 1
378+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="50"} 1
379+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="120"} 1
380+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="300"} 1
381+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="600"} 1
382+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="+Inf"} 1
383+
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 0
384+
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 1
359385
`
360386

361387
if err := testutil.GatherAndCompare(
362388
cmm.GetRegistry(), strings.NewReader(expectedMetrics)); err != nil {
363389
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
364390
err = verifyMetricsError(t, err, "csi_sidecar_operations_seconds_sum")
365391
if err != nil {
366-
t.Errorf("Expected metrics not found -- %v", err)
392+
t.Errorf("Expected client metrics not found -- %v", err)
393+
}
394+
}
395+
396+
expectedMetrics = strings.Replace(expectedMetrics, "csi_sidecar", metrics.SubsystemPlugin, -1)
397+
if err := testutil.GatherAndCompare(
398+
cmmServer.GetRegistry(), strings.NewReader(expectedMetrics)); err != nil {
399+
// Ignore mismatches on the server's operations_seconds_sum metric (metrics.SubsystemPlugin prefix) because execution time will vary from test to test.
400+
err = verifyMetricsError(t, err, metrics.SubsystemPlugin+"_operations_seconds_sum")
401+
if err != nil {
402+
t.Errorf("Expected server metrics not found -- %v", err)
367403
}
368404
}
369405
}

0 commit comments

Comments
 (0)