Skip to content

Commit 3aec4af

Browse files
authored
Merge pull request #49 from pohly/csi-driver-metrics
metrics: support usage inside CSI driver
2 parents 771facd + 0f04b64 commit 3aec4af

File tree

4 files changed

+628
-63
lines changed

4 files changed

+628
-63
lines changed

connection/connection.go

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ func connect(
109109
grpc.WithBlock(), // Block until connection succeeds.
110110
grpc.WithChainUnaryInterceptor(
111111
LogGRPC, // Log all messages.
112-
extendedCSIMetricsManager{metricsManager}.recordMetricsInterceptor, // Record metrics for each gRPC call.
112+
ExtendedCSIMetricsManager{metricsManager}.RecordMetricsClientInterceptor, // Record metrics for each gRPC call.
113113
),
114114
)
115115
unixPrefix := "unix://"
@@ -187,12 +187,13 @@ func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
187187
return err
188188
}
189189

190-
type extendedCSIMetricsManager struct {
190+
type ExtendedCSIMetricsManager struct {
191191
metrics.CSIMetricsManager
192192
}
193193

194-
// recordMetricsInterceptor is a gPRC unary interceptor for recording metrics for CSI operations.
195-
func (cmm extendedCSIMetricsManager) recordMetricsInterceptor(
194+
// RecordMetricsClientInterceptor is a gPRC unary interceptor for recording metrics for CSI operations
195+
// in a gRPC client.
196+
func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor(
196197
ctx context.Context,
197198
method string,
198199
req, reply interface{},
@@ -209,3 +210,17 @@ func (cmm extendedCSIMetricsManager) recordMetricsInterceptor(
209210
)
210211
return err
211212
}
213+
214+
// RecordMetricsServerInterceptor is a gPRC unary interceptor for recording metrics for CSI operations
215+
// in a gRCP server.
216+
func (cmm ExtendedCSIMetricsManager) RecordMetricsServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
217+
start := time.Now()
218+
resp, err := handler(ctx, req)
219+
duration := time.Since(start)
220+
cmm.RecordMetrics(
221+
info.FullMethod, /* operationName */
222+
err, /* operationErr */
223+
duration, /* operationDuration */
224+
)
225+
return resp, err
226+
}

connection/connection_test.go

Lines changed: 64 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,34 @@ const (
5151
serverSock = "server.sock"
5252
)
5353

54+
type identityServer struct{}
55+
56+
func (ids *identityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
57+
return nil, status.Error(codes.Unimplemented, "Unimplemented")
58+
}
59+
60+
func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
61+
return nil, status.Error(codes.Unimplemented, "Unimplemented")
62+
}
63+
64+
func (ids *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
65+
return nil, status.Error(codes.Unimplemented, "Unimplemented")
66+
}
67+
5468
// startServer creates a gRPC server without any registered services.
5569
// The returned address can be used to connect to it. The cleanup
5670
// function stops it. It can be called multiple times.
57-
func startServer(t *testing.T, tmp string, identity csi.IdentityServer, controller csi.ControllerServer) (string, func()) {
71+
func startServer(t *testing.T, tmp string, identity csi.IdentityServer, controller csi.ControllerServer, cmm metrics.CSIMetricsManager) (string, func()) {
5872
addr := path.Join(tmp, serverSock)
5973
listener, err := net.Listen("unix", addr)
6074
require.NoError(t, err, "listening on %s", addr)
61-
server := grpc.NewServer()
75+
var opts []grpc.ServerOption
76+
if cmm != nil {
77+
opts = append(opts,
78+
grpc.UnaryInterceptor(ExtendedCSIMetricsManager{cmm}.RecordMetricsServerInterceptor),
79+
)
80+
}
81+
server := grpc.NewServer(opts...)
6282
if identity != nil {
6383
csi.RegisterIdentityServer(server, identity)
6484
}
@@ -85,7 +105,7 @@ func startServer(t *testing.T, tmp string, identity csi.IdentityServer, controll
85105
func TestConnect(t *testing.T) {
86106
tmp := tmpDir(t)
87107
defer os.RemoveAll(tmp)
88-
addr, stopServer := startServer(t, tmp, nil, nil)
108+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
89109
defer stopServer()
90110

91111
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
@@ -100,7 +120,7 @@ func TestConnect(t *testing.T) {
100120
func TestConnectUnix(t *testing.T) {
101121
tmp := tmpDir(t)
102122
defer os.RemoveAll(tmp)
103-
addr, stopServer := startServer(t, tmp, nil, nil)
123+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
104124
defer stopServer()
105125

106126
conn, err := Connect("unix:///"+addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
@@ -141,7 +161,7 @@ func TestWaitForServer(t *testing.T) {
141161
t.Logf("sleeping %s before starting server", delay)
142162
time.Sleep(delay)
143163
startTimeServer = time.Now()
144-
_, stopServer = startServer(t, tmp, nil, nil)
164+
_, stopServer = startServer(t, tmp, nil, nil, nil)
145165
}()
146166
conn, err := Connect(path.Join(tmp, serverSock), metrics.NewCSIMetricsManager("fake.csi.driver.io"))
147167
if assert.NoError(t, err, "connect via absolute path") {
@@ -175,7 +195,7 @@ func TestTimout(t *testing.T) {
175195
func TestReconnect(t *testing.T) {
176196
tmp := tmpDir(t)
177197
defer os.RemoveAll(tmp)
178-
addr, stopServer := startServer(t, tmp, nil, nil)
198+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
179199
defer func() {
180200
stopServer()
181201
}()
@@ -202,7 +222,7 @@ func TestReconnect(t *testing.T) {
202222
}
203223

204224
// No reconnection either when the server comes back.
205-
_, stopServer = startServer(t, tmp, nil, nil)
225+
_, stopServer = startServer(t, tmp, nil, nil, nil)
206226
// We need to give gRPC some time. It does not attempt to reconnect
207227
// immediately. If we send the method call too soon, the test passes
208228
// even though a later method call will go through again.
@@ -220,7 +240,7 @@ func TestReconnect(t *testing.T) {
220240
func TestDisconnect(t *testing.T) {
221241
tmp := tmpDir(t)
222242
defer os.RemoveAll(tmp)
223-
addr, stopServer := startServer(t, tmp, nil, nil)
243+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
224244
defer func() {
225245
stopServer()
226246
}()
@@ -251,7 +271,7 @@ func TestDisconnect(t *testing.T) {
251271
}
252272

253273
// No reconnection either when the server comes back.
254-
_, stopServer = startServer(t, tmp, nil, nil)
274+
_, stopServer = startServer(t, tmp, nil, nil, nil)
255275
// We need to give gRPC some time. It does not attempt to reconnect
256276
// immediately. If we send the method call too soon, the test passes
257277
// even though a later method call will go through again.
@@ -271,7 +291,7 @@ func TestDisconnect(t *testing.T) {
271291
func TestExplicitReconnect(t *testing.T) {
272292
tmp := tmpDir(t)
273293
defer os.RemoveAll(tmp)
274-
addr, stopServer := startServer(t, tmp, nil, nil)
294+
addr, stopServer := startServer(t, tmp, nil, nil, nil)
275295
defer func() {
276296
stopServer()
277297
}()
@@ -302,7 +322,7 @@ func TestExplicitReconnect(t *testing.T) {
302322
}
303323

304324
// No reconnection either when the server comes back.
305-
_, stopServer = startServer(t, tmp, nil, nil)
325+
_, stopServer = startServer(t, tmp, nil, nil, nil)
306326
// We need to give gRPC some time. It does not attempt to reconnect
307327
// immediately. If we send the method call too soon, the test passes
308328
// even though a later method call will go through again.
@@ -322,7 +342,10 @@ func TestExplicitReconnect(t *testing.T) {
322342
func TestConnectMetrics(t *testing.T) {
323343
tmp := tmpDir(t)
324344
defer os.RemoveAll(tmp)
325-
addr, stopServer := startServer(t, tmp, nil, nil)
345+
cmmServer := metrics.NewCSIMetricsManagerForPlugin("fake.csi.driver.io")
346+
// We have to have a real implementation of the gRPC call, otherwise the metrics
347+
// interceptor is not called. The CSI identity service is used because it's simple.
348+
addr, stopServer := startServer(t, tmp, &identityServer{}, nil, cmmServer)
326349
defer stopServer()
327350

328351
cmm := metrics.NewCSIMetricsManager("fake.csi.driver.io")
@@ -332,38 +355,49 @@ func TestConnectMetrics(t *testing.T) {
332355
defer conn.Close()
333356
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
334357

335-
if err := conn.Invoke(context.Background(), "/csi.v1.Controller/ControllerGetCapabilities", nil, nil); assert.Error(t, err) {
358+
identityClient := csi.NewIdentityClient(conn)
359+
if _, err := identityClient.GetPluginInfo(context.Background(), &csi.GetPluginInfoRequest{}); assert.Error(t, err) {
336360
errStatus, _ := status.FromError(err)
337361
assert.Equal(t, codes.Unimplemented, errStatus.Code(), "not implemented")
338362
}
339363
}
340364

341365
expectedMetrics := `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
342366
# TYPE csi_sidecar_operations_seconds histogram
343-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.1"} 1
344-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.25"} 1
345-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.5"} 1
346-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="1"} 1
347-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="2.5"} 1
348-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="5"} 1
349-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="10"} 1
350-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="15"} 1
351-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="25"} 1
352-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="50"} 1
353-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="120"} 1
354-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="300"} 1
355-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="600"} 1
356-
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="+Inf"} 1
357-
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities"} 0
358-
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities"} 1
367+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.1"} 1
368+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.25"} 1
369+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="0.5"} 1
370+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="1"} 1
371+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="2.5"} 1
372+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="5"} 1
373+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="10"} 1
374+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="15"} 1
375+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="25"} 1
376+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="50"} 1
377+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="120"} 1
378+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="300"} 1
379+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="600"} 1
380+
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo",le="+Inf"} 1
381+
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 0
382+
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Identity/GetPluginInfo"} 1
359383
`
360384

361385
if err := testutil.GatherAndCompare(
362386
cmm.GetRegistry(), strings.NewReader(expectedMetrics)); err != nil {
363387
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
364388
err = verifyMetricsError(t, err, "csi_sidecar_operations_seconds_sum")
365389
if err != nil {
366-
t.Errorf("Expected metrics not found -- %v", err)
390+
t.Errorf("Expected client metrics not found -- %v", err)
391+
}
392+
}
393+
394+
expectedMetrics = strings.Replace(expectedMetrics, "csi_sidecar", metrics.SubsystemPlugin, -1)
395+
if err := testutil.GatherAndCompare(
396+
cmmServer.GetRegistry(), strings.NewReader(expectedMetrics)); err != nil {
397+
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
398+
err = verifyMetricsError(t, err, metrics.SubsystemPlugin+"_operations_seconds_sum")
399+
if err != nil {
400+
t.Errorf("Expected server metrics not found -- %v", err)
367401
}
368402
}
369403
}

0 commit comments

Comments
 (0)