Skip to content

Introduce a CSI Metrics Library #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
38 changes: 34 additions & 4 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strings"
"time"

"github.com/kubernetes-csi/csi-lib-utils/metrics"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc"
"k8s.io/klog"
Expand Down Expand Up @@ -58,8 +59,8 @@ const terminationLogPath = "/dev/termination-log"
//
// For other connections, the default behavior from gRPC is used and
// loss of connection is not detected reliably.
func Connect(address string, options ...Option) (*grpc.ClientConn, error) {
return connect(address, []grpc.DialOption{}, options)
func Connect(address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the creation of the /metrics endpoint still reside in the caller?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. But I'll add a helper function here to make that easier.

return connect(address, metricsManager, []grpc.DialOption{}, options)
}

// Option is the type of all optional parameters for Connect.
Expand Down Expand Up @@ -93,7 +94,10 @@ type options struct {
}

// connect is the internal implementation of Connect. It has more options to enable testing.
func connect(address string, dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
func connect(
address string,
metricsManager metrics.CSIMetricsManager,
dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) {
var o options
for _, option := range connectOptions {
option(&o)
Expand All @@ -103,7 +107,10 @@ func connect(address string, dialOptions []grpc.DialOption, connectOptions []Opt
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
grpc.WithBlock(), // Block until connection succeeds.
grpc.WithUnaryInterceptor(LogGRPC), // Log all messages.
grpc.WithChainUnaryInterceptor(
LogGRPC, // Log all messages.
extendedCSIMetricsManager{metricsManager}.recordMetricsInterceptor, // Record metrics for each gRPC call.
),
)
unixPrefix := "unix://"
if strings.HasPrefix(address, "/") {
Expand Down Expand Up @@ -179,3 +186,26 @@ func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
klog.V(5).Infof("GRPC error: %v", err)
return err
}

type extendedCSIMetricsManager struct {
metrics.CSIMetricsManager
}

// recordMetricsInterceptor is a gPRC unary interceptor for recording metrics for CSI operations.
func (cmm extendedCSIMetricsManager) recordMetricsInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption) error {
start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
duration := time.Since(start)
cmm.RecordMetrics(
method, /* operationName */
err, /* operationErr */
duration, /* operationDuration */
)
return err
}
96 changes: 89 additions & 7 deletions connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net"
"os"
"path"
"strings"
"sync"
"testing"
"time"
Expand All @@ -35,6 +36,9 @@ import (
"github.com/stretchr/testify/require"

"github.com/container-storage-interface/spec/lib/go/csi"

"github.com/kubernetes-csi/csi-lib-utils/metrics"
"k8s.io/component-base/metrics/testutil"
)

func tmpDir(t *testing.T) string {
Expand Down Expand Up @@ -84,7 +88,7 @@ func TestConnect(t *testing.T) {
addr, stopServer := startServer(t, tmp, nil, nil)
defer stopServer()

conn, err := Connect(addr)
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
if assert.NoError(t, err, "connect via absolute path") &&
assert.NotNil(t, conn, "got a connection") {
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
Expand All @@ -99,7 +103,7 @@ func TestConnectUnix(t *testing.T) {
addr, stopServer := startServer(t, tmp, nil, nil)
defer stopServer()

conn, err := Connect("unix:///" + addr)
conn, err := Connect("unix:///"+addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
if assert.NoError(t, err, "connect with unix:/// prefix") &&
assert.NotNil(t, conn, "got a connection") {
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")
Expand Down Expand Up @@ -139,7 +143,7 @@ func TestWaitForServer(t *testing.T) {
startTimeServer = time.Now()
_, stopServer = startServer(t, tmp, nil, nil)
}()
conn, err := Connect(path.Join(tmp, serverSock))
conn, err := Connect(path.Join(tmp, serverSock), metrics.NewCSIMetricsManager("fake.csi.driver.io"))
if assert.NoError(t, err, "connect via absolute path") {
endTime := time.Now()
assert.NotNil(t, conn, "got a connection")
Expand All @@ -158,7 +162,7 @@ func TestTimout(t *testing.T) {

startTime := time.Now()
timeout := 5 * time.Second
conn, err := connect(path.Join(tmp, "no-such.sock"), []grpc.DialOption{grpc.WithTimeout(timeout)}, nil)
conn, err := connect(path.Join(tmp, "no-such.sock"), metrics.NewCSIMetricsManager("fake.csi.driver.io"), []grpc.DialOption{grpc.WithTimeout(timeout)}, nil)
endTime := time.Now()
if assert.Error(t, err, "connection should fail") {
assert.InEpsilon(t, timeout, endTime.Sub(startTime), 1, "connection timeout")
Expand All @@ -177,7 +181,7 @@ func TestReconnect(t *testing.T) {
}()

// Allow reconnection (the default).
conn, err := Connect(addr)
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"))
if assert.NoError(t, err, "connect via absolute path") &&
assert.NotNil(t, conn, "got a connection") {
defer conn.Close()
Expand Down Expand Up @@ -222,7 +226,7 @@ func TestDisconnect(t *testing.T) {
}()

reconnectCount := 0
conn, err := Connect(addr, OnConnectionLoss(func() bool {
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"), OnConnectionLoss(func() bool {
reconnectCount++
// Don't reconnect.
return false
Expand Down Expand Up @@ -273,7 +277,7 @@ func TestExplicitReconnect(t *testing.T) {
}()

reconnectCount := 0
conn, err := Connect(addr, OnConnectionLoss(func() bool {
conn, err := Connect(addr, metrics.NewCSIMetricsManager("fake.csi.driver.io"), OnConnectionLoss(func() bool {
reconnectCount++
// Reconnect.
return true
Expand Down Expand Up @@ -314,3 +318,81 @@ func TestExplicitReconnect(t *testing.T) {
assert.Equal(t, 1, reconnectCount, "connection loss callback should be called once")
}
}

func TestConnectMetrics(t *testing.T) {
tmp := tmpDir(t)
defer os.RemoveAll(tmp)
addr, stopServer := startServer(t, tmp, nil, nil)
defer stopServer()

cmm := metrics.NewCSIMetricsManager("fake.csi.driver.io")
conn, err := Connect(addr, cmm)
if assert.NoError(t, err, "connect via absolute path") &&
assert.NotNil(t, conn, "got a connection") {
defer conn.Close()
assert.Equal(t, connectivity.Ready, conn.GetState(), "connection ready")

if err := conn.Invoke(context.Background(), "/csi.v1.Controller/ControllerGetCapabilities", nil, nil); assert.Error(t, err) {
errStatus, _ := status.FromError(err)
assert.Equal(t, codes.Unimplemented, errStatus.Code(), "not implemented")
}
}

expectedMetrics := `# HELP csi_sidecar_operations_seconds [ALPHA] Container Storage Interface operation duration with gRPC error code status total
# TYPE csi_sidecar_operations_seconds histogram
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="0.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="1"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="2.5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="5"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="10"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="15"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="25"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="50"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="120"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="300"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="600"} 1
csi_sidecar_operations_seconds_bucket{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities",le="+Inf"} 1
csi_sidecar_operations_seconds_sum{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities"} 0
csi_sidecar_operations_seconds_count{driver_name="fake.csi.driver.io",grpc_status_code="Unimplemented",method_name="/csi.v1.Controller/ControllerGetCapabilities"} 1
`

if err := testutil.GatherAndCompare(
cmm.GetRegistry(), strings.NewReader(expectedMetrics)); err != nil {
// Ignore mismatches on csi_sidecar_operations_seconds_sum metric because execution time will vary from test to test.
err = verifyMetricsError(t, err, "csi_sidecar_operations_seconds_sum")
if err != nil {
t.Errorf("Expected metrics not found -- %v", err)
}
}
}

func verifyMetricsError(t *testing.T, err error, metricToIgnore string) error {
errStringArr := strings.Split(err.Error(), "got:")

if len(errStringArr) != 2 {
return err
}

want := errStringArr[0]
got := strings.TrimSpace(errStringArr[1])

if want == "" || got == "" {
return err
}

wantArr := strings.Split(err.Error(), "want:")
if len(wantArr) != 2 {
return err
}

want = strings.TrimSpace(wantArr[1])

if matchErr := metrics.VerifyMetricsMatch(want, got, metricToIgnore); matchErr != nil {
t.Errorf("%v", matchErr)
return err
}

return nil
}
86 changes: 59 additions & 27 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,68 @@ go 1.12

require (
github.com/container-storage-interface/spec v1.1.0
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.5.0+incompatible // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/protobuf v1.3.1
github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect
github.com/golang/protobuf v1.3.2
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/json-iterator/go v1.1.6 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 // indirect
github.com/onsi/ginkgo v1.10.2 // indirect
github.com/onsi/gomega v1.7.0 // indirect
github.com/pkg/errors v0.8.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.3.0
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c // indirect
golang.org/x/net v0.0.0-20190328230028-74de082e2cca
golang.org/x/oauth2 v0.0.0-20190319182350-c85d3e98c914 // indirect
golang.org/x/sys v0.0.0-20190329044733-9eb1bfa1ce65 // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
google.golang.org/appengine v1.5.0 // indirect
google.golang.org/genproto v0.0.0-20190327125643-d831d65fe17d // indirect
google.golang.org/grpc v1.19.1
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
k8s.io/api v0.0.0-20190313235455-40a48860b5ab
k8s.io/apimachinery v0.0.0-20190313205120-d7deff9243b1 // indirect
k8s.io/client-go v11.0.0+incompatible
k8s.io/klog v0.2.0
k8s.io/kube-openapi v0.0.0-20190320154901-5e45bb682580 // indirect
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7 // indirect
sigs.k8s.io/yaml v1.1.0 // indirect
github.com/stretchr/testify v1.4.0
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553
golang.org/x/sys v0.0.0-20191220220014-0732a990476f // indirect
google.golang.org/genproto v0.0.0-20191220175831-5c49e3ecc1c1 // indirect
google.golang.org/grpc v1.26.0
k8s.io/api v0.17.0
k8s.io/apimachinery v0.17.1-beta.0 // indirect
k8s.io/client-go v0.17.0
k8s.io/component-base v0.17.0
k8s.io/klog v1.0.0
)

replace k8s.io/api => k8s.io/api v0.17.0

replace k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.17.0

replace k8s.io/apimachinery => k8s.io/apimachinery v0.17.1-beta.0

replace k8s.io/apiserver => k8s.io/apiserver v0.17.0

replace k8s.io/cli-runtime => k8s.io/cli-runtime v0.17.0

replace k8s.io/client-go => k8s.io/client-go v0.17.0

replace k8s.io/cloud-provider => k8s.io/cloud-provider v0.17.0

replace k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.17.0

replace k8s.io/code-generator => k8s.io/code-generator v0.17.1-beta.0

replace k8s.io/component-base => k8s.io/component-base v0.17.0

replace k8s.io/cri-api => k8s.io/cri-api v0.17.1-beta.0

replace k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.17.0

replace k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.17.0

replace k8s.io/kube-controller-manager => k8s.io/kube-controller-manager v0.17.0

replace k8s.io/kube-proxy => k8s.io/kube-proxy v0.17.0

replace k8s.io/kube-scheduler => k8s.io/kube-scheduler v0.17.0

replace k8s.io/kubectl => k8s.io/kubectl v0.17.0

replace k8s.io/kubelet => k8s.io/kubelet v0.17.0

replace k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.17.0

replace k8s.io/metrics => k8s.io/metrics v0.17.0

replace k8s.io/node-api => k8s.io/node-api v0.17.0

replace k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.17.0

replace k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.17.0

replace k8s.io/sample-controller => k8s.io/sample-controller v0.17.0
Loading