Skip to content

Commit 94e109f

Browse files
committed
Add other common functions
1 parent bd468a0 commit 94e109f

File tree

2 files changed

+568
-19
lines changed

2 files changed

+568
-19
lines changed

connection/connection.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import (
2424
"strings"
2525
"time"
2626

27+
"google.golang.org/grpc/codes"
28+
"google.golang.org/grpc/status"
29+
2730
"github.com/container-storage-interface/spec/lib/go/csi"
2831
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
2932
"google.golang.org/grpc"
@@ -33,6 +36,9 @@ import (
3336
const (
3437
// Interval of logging connection errors
3538
connectionLoggingInterval = 10 * time.Second
39+
40+
// Interval of trying to call Probe() until it succeeds
41+
probeInterval = 1 * time.Second
3642
)
3743

3844
// Connect opens insecure gRPC connection to a CSI driver. Address must be either absolute path to UNIX domain socket
@@ -163,6 +169,7 @@ func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grp
163169
return err
164170
}
165171

172+
// GetDriverName returns name of CSI driver.
166173
func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
167174
client := csi.NewIdentityClient(conn)
168175

@@ -177,3 +184,104 @@ func GetDriverName(ctx context.Context, conn *grpc.ClientConn) (string, error) {
177184
}
178185
return name, nil
179186
}
187+
188+
// PluginCapabilitySet is set of CSI plugin capabilities. Only supported capabilities are in the map.
189+
type PluginCapabilitySet map[csi.PluginCapability_Service_Type]bool
190+
191+
// GetPluginCapabilities returns set of supported capabilities of CSI driver.
192+
func GetPluginCapabilities(ctx context.Context, conn *grpc.ClientConn) (PluginCapabilitySet, error) {
193+
client := csi.NewIdentityClient(conn)
194+
req := csi.GetPluginCapabilitiesRequest{}
195+
rsp, err := client.GetPluginCapabilities(ctx, &req)
196+
if err != nil {
197+
return nil, err
198+
}
199+
caps := PluginCapabilitySet{}
200+
for _, cap := range rsp.GetCapabilities() {
201+
if cap == nil {
202+
continue
203+
}
204+
srv := cap.GetService()
205+
if srv == nil {
206+
continue
207+
}
208+
t := srv.GetType()
209+
caps[t] = true
210+
}
211+
return caps, nil
212+
}
213+
214+
// ControllerCapabilitySet is set of CSI controller capabilities. Only supported capabilities are in the map.
215+
type ControllerCapabilitySet map[csi.ControllerServiceCapability_RPC_Type]bool
216+
217+
// GetControllerCapabilities returns set of supported controller capabilities of CSI driver.
218+
func GetControllerCapabilities(ctx context.Context, conn *grpc.ClientConn) (ControllerCapabilitySet, error) {
219+
client := csi.NewControllerClient(conn)
220+
req := csi.ControllerGetCapabilitiesRequest{}
221+
rsp, err := client.ControllerGetCapabilities(ctx, &req)
222+
if err != nil {
223+
return nil, err
224+
}
225+
226+
caps := ControllerCapabilitySet{}
227+
for _, cap := range rsp.GetCapabilities() {
228+
if cap == nil {
229+
continue
230+
}
231+
rpc := cap.GetRpc()
232+
if rpc == nil {
233+
continue
234+
}
235+
t := rpc.GetType()
236+
caps[t] = true
237+
}
238+
return caps, nil
239+
}
240+
241+
// Probe calls Probe() of a CSI driver and waits until the driver becomes ready.
242+
// Any error other than timeout is returned.
243+
func Probe(conn *grpc.ClientConn, singleProbeTimeout time.Duration) error {
244+
client := csi.NewIdentityClient(conn)
245+
246+
for {
247+
klog.Info("Probing CSI driver for readiness")
248+
ready, err := probeOnce(client, singleProbeTimeout)
249+
if err != nil {
250+
st, ok := status.FromError(err)
251+
if !ok {
252+
// This is not gRPC error. The probe must have failed before gRPC
253+
// method was called, otherwise we would get gRPC error.
254+
return fmt.Errorf("CSI driver probe failed: %s", err)
255+
}
256+
if st.Code() != codes.DeadlineExceeded {
257+
return fmt.Errorf("CSI driver probe failed: %s", err)
258+
}
259+
// Timeout -> driver is not ready. Fall through to sleep() below.
260+
klog.Warning("CSI driver probe timed out")
261+
} else {
262+
if ready {
263+
return nil
264+
}
265+
klog.Warning("CSI driver is not ready")
266+
}
267+
// Timeout was returned or driver is not ready.
268+
time.Sleep(probeInterval)
269+
}
270+
}
271+
272+
func probeOnce(client csi.IdentityClient, timeout time.Duration) (ready bool, err error) {
273+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
274+
defer cancel()
275+
req := csi.ProbeRequest{}
276+
rsp, err := client.Probe(ctx, &req)
277+
278+
if err != nil {
279+
return false, err
280+
}
281+
282+
r := rsp.GetReady()
283+
if r != nil && r.GetValue() {
284+
return true, nil
285+
}
286+
return false, nil
287+
}

0 commit comments

Comments
 (0)