Skip to content

Commit 116d82b

Browse files
committed
csi-host-path: add csi driver proxy support
This is a code lift from mock driver in favour of replacing mock driver with hostpath driver in Kubernetes e2e testing. kubernetes-csi/csi-test@ef07baf
1 parent b2e6494 commit 116d82b

File tree

5 files changed

+347
-33
lines changed

5 files changed

+347
-33
lines changed

cmd/hostpathplugin/main.go

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
"flag"
2122
"fmt"
2223
"os"
24+
"os/signal"
2325
"path"
26+
"syscall"
2427

28+
"github.com/golang/glog"
29+
"github.com/kubernetes-csi/csi-driver-host-path/internal/proxy"
2530
"github.com/kubernetes-csi/csi-driver-host-path/pkg/hostpath"
2631
)
2732

@@ -30,7 +35,7 @@ func init() {
3035
}
3136

3237
var (
33-
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
38+
csiEndpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
3439
driverName = flag.String("drivername", "hostpath.csi.k8s.io", "name of the driver")
3540
nodeID = flag.String("nodeid", "", "node id")
3641
ephemeral = flag.Bool("ephemeral", false, "publish volumes in ephemeral mode even if kubelet did not ask for it (only needed for Kubernetes 1.15)")
@@ -41,7 +46,8 @@ var (
4146
flag.Var(c, "capacity", "Simulate storage capacity. The parameter is <kind>=<quantity> where <kind> is the value of a 'kind' storage class parameter and <quantity> is the total amount of bytes for that kind. The flag may be used multiple times to configure different kinds.")
4247
return c
4348
}()
44-
enableAttach = flag.Bool("enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.")
49+
enableAttach = flag.Bool("enable-attach", false, "Enables RPC_PUBLISH_UNPUBLISH_VOLUME capability.")
50+
proxyEndpoint = flag.String("proxy-endpoint", "", "Instead of running the CSI driver code, just proxy connections from csiEndpoint to the given listening socket.")
4551
// Set by the build process
4652
version = ""
4753
)
@@ -59,7 +65,30 @@ func main() {
5965
fmt.Fprintln(os.Stderr, "Deprecation warning: The ephemeral flag is deprecated and should only be used when deploying on Kubernetes 1.15. It will be removed in the future.")
6066
}
6167

62-
driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *endpoint, *ephemeral, *maxVolumesPerNode, version, capacity, *enableAttach)
68+
if *proxyEndpoint != "" {
69+
ctx, cancel := context.WithCancel(context.Background())
70+
defer cancel()
71+
closer, err := proxy.Run(ctx, *csiEndpoint, *proxyEndpoint)
72+
if err != nil {
73+
glog.Fatalf("failed to run proxy: %v", err)
74+
}
75+
defer closer.Close()
76+
77+
// Wait for signal
78+
sigc := make(chan os.Signal, 1)
79+
sigs := []os.Signal{
80+
syscall.SIGTERM,
81+
syscall.SIGHUP,
82+
syscall.SIGINT,
83+
syscall.SIGQUIT,
84+
}
85+
signal.Notify(sigc, sigs...)
86+
87+
<-sigc
88+
return
89+
}
90+
91+
driver, err := hostpath.NewHostPathDriver(*driverName, *nodeID, *csiEndpoint, *ephemeral, *maxVolumesPerNode, version, capacity, *enableAttach)
6392
if err != nil {
6493
fmt.Printf("Failed to initialize driver: %s", err.Error())
6594
os.Exit(1)

internal/endpoint/endpoint.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoint
18+
19+
import (
20+
"fmt"
21+
"net"
22+
"os"
23+
"strings"
24+
)
25+
26+
func Parse(ep string) (string, string, error) {
27+
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
28+
s := strings.SplitN(ep, "://", 2)
29+
if s[1] != "" {
30+
return s[0], s[1], nil
31+
}
32+
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
33+
}
34+
// Assume everything else is a file path for a Unix Domain Socket.
35+
return "unix", ep, nil
36+
}
37+
38+
func Listen(endpoint string) (net.Listener, func(), error) {
39+
proto, addr, err := Parse(endpoint)
40+
if err != nil {
41+
return nil, nil, err
42+
}
43+
44+
cleanup := func() {}
45+
if proto == "unix" {
46+
addr = "/" + addr
47+
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { //nolint: vetshadow
48+
return nil, nil, fmt.Errorf("%s: %q", addr, err)
49+
}
50+
cleanup = func() {
51+
os.Remove(addr)
52+
}
53+
}
54+
55+
l, err := net.Listen(proto, addr)
56+
return l, cleanup, err
57+
}

internal/proxy/proxy.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
// Package proxy makes it possible to forward a listening socket in
18+
// situations where the proxy cannot connect to some other address.
19+
// Instead, it creates two listening sockets, pairs two incoming
20+
// connections and then moves data back and forth. This matches
21+
// the behavior of the following socat command:
22+
// socat -d -d -d UNIX-LISTEN:/tmp/socat,fork TCP-LISTEN:9000,reuseport
23+
//
24+
// The advantage over that command is that both listening
25+
// sockets are always open, in contrast to the socat solution
26+
// where the TCP port is only open when there actually is a connection
27+
// available.
28+
//
29+
// To establish a connection, someone has to poll the proxy with a dialer.
30+
package proxy
31+
32+
import (
33+
"context"
34+
"fmt"
35+
"io"
36+
"net"
37+
38+
"github.com/golang/glog"
39+
40+
"github.com/kubernetes-csi/csi-driver-host-path/internal/endpoint"
41+
)
42+
43+
// New listens on both endpoints and starts accepting connections
44+
// until closed or the context is done.
45+
func Run(ctx context.Context, endpoint1, endpoint2 string) (io.Closer, error) {
46+
proxy := &proxy{}
47+
failedProxy := proxy
48+
defer func() {
49+
if failedProxy != nil {
50+
failedProxy.Close()
51+
}
52+
}()
53+
54+
proxy.ctx, proxy.cancel = context.WithCancel(ctx)
55+
56+
var err error
57+
proxy.s1, proxy.cleanup1, err = endpoint.Listen(endpoint1)
58+
if err != nil {
59+
return nil, fmt.Errorf("listen %s: %v", endpoint1, err)
60+
}
61+
proxy.s2, proxy.cleanup2, err = endpoint.Listen(endpoint2)
62+
if err != nil {
63+
return nil, fmt.Errorf("listen %s: %v", endpoint2, err)
64+
}
65+
66+
glog.V(3).Infof("proxy listening on %s and %s", endpoint1, endpoint2)
67+
68+
go func() {
69+
for {
70+
// We block on the first listening socket.
71+
// The Linux kernel proactively accepts connections
72+
// on the second one which we will take over below.
73+
conn1 := accept(proxy.ctx, proxy.s1, endpoint1)
74+
if conn1 == nil {
75+
// Done, shut down.
76+
glog.V(5).Infof("proxy endpoint %s closed, shutting down", endpoint1)
77+
return
78+
}
79+
conn2 := accept(proxy.ctx, proxy.s2, endpoint2)
80+
if conn2 == nil {
81+
// Done, shut down. The already accepted
82+
// connection gets closed.
83+
glog.V(5).Infof("proxy endpoint %s closed, shutting down and close established connection", endpoint2)
84+
conn1.Close()
85+
return
86+
}
87+
88+
glog.V(3).Infof("proxy established a new connection between %s and %s", endpoint1, endpoint2)
89+
go copy(conn1, conn2, endpoint1, endpoint2)
90+
go copy(conn2, conn1, endpoint2, endpoint1)
91+
}
92+
}()
93+
94+
failedProxy = nil
95+
return proxy, nil
96+
}
97+
98+
type proxy struct {
99+
ctx context.Context
100+
cancel func()
101+
s1, s2 net.Listener
102+
cleanup1, cleanup2 func()
103+
}
104+
105+
func (p *proxy) Close() error {
106+
if p.cancel != nil {
107+
p.cancel()
108+
}
109+
if p.s1 != nil {
110+
p.s1.Close()
111+
}
112+
if p.s2 != nil {
113+
p.s2.Close()
114+
}
115+
if p.cleanup1 != nil {
116+
p.cleanup1()
117+
}
118+
if p.cleanup2 != nil {
119+
p.cleanup2()
120+
}
121+
return nil
122+
}
123+
124+
func copy(from, to net.Conn, fromEndpoint, toEndpoint string) {
125+
glog.V(5).Infof("starting to copy %s -> %s", fromEndpoint, toEndpoint)
126+
// Signal recipient that no more data is going to come.
127+
// This also stops reading from it.
128+
defer to.Close()
129+
// Copy data until EOF.
130+
cnt, err := io.Copy(to, from)
131+
glog.V(5).Infof("done copying %s -> %s: %d bytes, %v", fromEndpoint, toEndpoint, cnt, err)
132+
}
133+
134+
func accept(ctx context.Context, s net.Listener, endpoint string) net.Conn {
135+
for {
136+
c, err := s.Accept()
137+
if err == nil {
138+
return c
139+
}
140+
// Ignore error if we are shutting down.
141+
if ctx.Err() != nil {
142+
return nil
143+
}
144+
glog.V(3).Infof("accept on %s failed: %v", endpoint, err)
145+
}
146+
}

internal/proxy/proxy_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package proxy
18+
19+
import (
20+
"bytes"
21+
"context"
22+
"io"
23+
"net"
24+
"testing"
25+
)
26+
27+
func TestProxy(t *testing.T) {
28+
tmpdir := t.TempDir()
29+
ctx, cancel := context.WithCancel(context.Background())
30+
defer cancel()
31+
32+
endpoint1 := tmpdir + "/a.sock"
33+
endpoint2 := tmpdir + "/b.sock"
34+
35+
closer, err := Run(ctx, endpoint1, endpoint2)
36+
if err != nil {
37+
t.Fatalf("proxy error: %v", err)
38+
}
39+
defer closer.Close()
40+
41+
t.Run("a-to-b", func(t *testing.T) {
42+
sendReceive(t, endpoint1, endpoint2)
43+
})
44+
t.Run("b-to-a", func(t *testing.T) {
45+
sendReceive(t, endpoint2, endpoint1)
46+
})
47+
}
48+
49+
func sendReceive(t *testing.T, endpoint1, endpoint2 string) {
50+
conn1, err := net.Dial("unix", endpoint1)
51+
if err != nil {
52+
t.Fatalf("error connecting to first endpoint %s: %v", endpoint1, err)
53+
}
54+
defer conn1.Close()
55+
conn2, err := net.Dial("unix", endpoint2)
56+
if err != nil {
57+
t.Fatalf("error connecting to second endpoint %s: %v", endpoint2, err)
58+
}
59+
defer conn2.Close()
60+
61+
req1 := "ping"
62+
if _, err := conn1.Write([]byte(req1)); err != nil {
63+
t.Fatalf("error writing %q: %v", req1, err)
64+
}
65+
buffer := make([]byte, 100)
66+
len, err := conn2.Read(buffer)
67+
if err != nil {
68+
t.Fatalf("error reading %q: %v", req1, err)
69+
}
70+
if string(buffer[:len]) != req1 {
71+
t.Fatalf("expected %q, got %q", req1, string(buffer[:len]))
72+
}
73+
74+
resp1 := "pong-pong"
75+
if _, err := conn2.Write([]byte(resp1)); err != nil {
76+
t.Fatalf("error writing %q: %v", resp1, err)
77+
}
78+
buffer = make([]byte, 100)
79+
len, err = conn1.Read(buffer)
80+
if err != nil {
81+
t.Fatalf("error reading %q: %v", resp1, err)
82+
}
83+
if string(buffer[:len]) != resp1 {
84+
t.Fatalf("expected %q, got %q", resp1, string(buffer[:len]))
85+
}
86+
87+
// Closing one side should be noticed at the other end.
88+
err = conn1.Close()
89+
if err != nil {
90+
t.Fatalf("error closing connection to %s: %v", endpoint1, err)
91+
}
92+
len2, err := io.Copy(&bytes.Buffer{}, conn2)
93+
if err != nil {
94+
t.Fatalf("error reading from %s: %v", endpoint2, err)
95+
}
96+
if len2 != 0 {
97+
t.Fatalf("unexpected data via %s: %d", endpoint2, len2)
98+
}
99+
err = conn2.Close()
100+
if err != nil {
101+
t.Fatalf("error closing connection to %s: %v", endpoint2, err)
102+
}
103+
}

0 commit comments

Comments
 (0)