Skip to content

Commit f7b2e37

Browse files
committed
[supervisor] support tunneled ports
1 parent 53e708f commit f7b2e37

25 files changed

+3504
-374
lines changed

components/gitpod-cli/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ require (
1515
github.com/pkg/errors v0.9.1
1616
github.com/sirupsen/logrus v1.7.0
1717
github.com/spf13/cobra v0.0.5
18-
golang.org/x/sys v0.0.0-20201112073958-5cba982894dd
18+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
1919
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
2020
google.golang.org/grpc v1.37.0
2121
gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20191105091915-95d230a53780 // indirect

components/gitpod-cli/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U
215215
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
216216
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
217217
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
218+
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
218219
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
219220
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
220221
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -273,6 +274,7 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
273274
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
274275
golang.org/x/net v0.0.0-20200822124328-c89045814202 h1:VvcQYSHwXgi7W+TpUR6A9g6Up98WAHf3f/ulnJ62IyA=
275276
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
277+
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
276278
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
277279
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
278280
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -318,6 +320,8 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w
318320
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
319321
golang.org/x/sys v0.0.0-20201112073958-5cba982894dd h1:5CtCZbICpIOFdgO940moixOPjc0178IU44m4EjOO5IY=
320322
golang.org/x/sys v0.0.0-20201112073958-5cba982894dd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
323+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
324+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
321325
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
322326
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
323327
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

components/gitpod-protocol/go/gitpod-service.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,10 @@ var errNotConnected = errors.New("not connected to Gitpod server")
218218

219219
// ConnectToServerOpts configures the server connection
220220
type ConnectToServerOpts struct {
221-
Context context.Context
222-
Token string
223-
Log *logrus.Entry
221+
Context context.Context
222+
Token string
223+
Log *logrus.Entry
224+
ReconnectionHandler func()
224225
}
225226

226227
// ConnectToServer establishes a new websocket connection to the server
@@ -248,6 +249,7 @@ func ConnectToServer(endpoint string, opts ConnectToServerOpts) (*APIoverJSONRPC
248249
reqHeader.Set("Authorization", "Bearer "+opts.Token)
249250
}
250251
ws := NewReconnectingWebsocket(endpoint, reqHeader, opts.Log)
252+
ws.ReconnectionHandler = opts.ReconnectionHandler
251253
go ws.Dial()
252254

253255
var res APIoverJSONRPC

components/gitpod-protocol/go/reconnecting-ws.go

Lines changed: 77 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
package protocol
66

77
import (
8+
"context"
9+
"encoding/json"
810
"errors"
911
"net/http"
12+
"sync"
1013
"time"
1114

1215
"github.com/gorilla/websocket"
@@ -23,11 +26,14 @@ type ReconnectingWebsocket struct {
2326
maxReconnectionDelay time.Duration
2427
reconnectionDelayGrowFactor float64
2528

29+
once sync.Once
2630
closedCh chan struct{}
27-
connCh chan chan *websocket.Conn
31+
connCh chan chan *WebsocketConnection
2832
errCh chan error
2933

3034
log *logrus.Entry
35+
36+
ReconnectionHandler func()
3137
}
3238

3339
// NewReconnectingWebsocket creates a new instance of ReconnectingWebsocket
@@ -39,7 +45,7 @@ func NewReconnectingWebsocket(url string, reqHeader http.Header, log *logrus.Ent
3945
maxReconnectionDelay: 30 * time.Second,
4046
reconnectionDelayGrowFactor: 1.5,
4147
handshakeTimeout: 2 * time.Second,
42-
connCh: make(chan chan *websocket.Conn),
48+
connCh: make(chan chan *WebsocketConnection),
4349
closedCh: make(chan struct{}),
4450
errCh: make(chan error),
4551
log: log,
@@ -48,26 +54,27 @@ func NewReconnectingWebsocket(url string, reqHeader http.Header, log *logrus.Ent
4854

4955
// Close closes the underlying webscoket connection.
5056
func (rc *ReconnectingWebsocket) Close() error {
51-
close(rc.closedCh)
57+
rc.once.Do(func() {
58+
close(rc.closedCh)
59+
})
5260
return nil
5361
}
5462

55-
// WriteObject writes the JSON encoding of v as a message.
56-
// See the documentation for encoding/json Marshal for details about the conversion of Go values to JSON.
57-
func (rc *ReconnectingWebsocket) WriteObject(v interface{}) error {
63+
// EnsureConnection ensures ws connections
64+
// Returns only if connection is permanently failed
65+
// If the passed handler returns false as closed then err is returned to the client,
66+
// otherwise err is treated as a connection error, and new conneciton is provided.
67+
func (rc *ReconnectingWebsocket) EnsureConnection(handler func(conn *WebsocketConnection) (closed bool, err error)) error {
5868
for {
59-
connCh := make(chan *websocket.Conn, 1)
69+
connCh := make(chan *WebsocketConnection, 1)
6070
select {
6171
case <-rc.closedCh:
6272
return errors.New("closed")
6373
case rc.connCh <- connCh:
6474
}
6575
conn := <-connCh
66-
err := conn.WriteJSON(v)
67-
if err == nil {
68-
return nil
69-
}
70-
if !websocket.IsUnexpectedCloseError(err) {
76+
closed, err := handler(conn)
77+
if !closed {
7178
return err
7279
}
7380
select {
@@ -78,35 +85,62 @@ func (rc *ReconnectingWebsocket) WriteObject(v interface{}) error {
7885
}
7986
}
8087

88+
func isJSONError(err error) bool {
89+
_, isJsonErr := err.(*json.InvalidUTF8Error)
90+
if isJsonErr {
91+
return true
92+
}
93+
_, isJsonErr = err.(*json.InvalidUnmarshalError)
94+
if isJsonErr {
95+
return true
96+
}
97+
_, isJsonErr = err.(*json.MarshalerError)
98+
if isJsonErr {
99+
return true
100+
}
101+
_, isJsonErr = err.(*json.SyntaxError)
102+
if isJsonErr {
103+
return true
104+
}
105+
_, isJsonErr = err.(*json.UnmarshalFieldError)
106+
if isJsonErr {
107+
return true
108+
}
109+
_, isJsonErr = err.(*json.UnmarshalTypeError)
110+
if isJsonErr {
111+
return true
112+
}
113+
_, isJsonErr = err.(*json.UnsupportedTypeError)
114+
if isJsonErr {
115+
return true
116+
}
117+
_, isJsonErr = err.(*json.UnsupportedValueError)
118+
return isJsonErr
119+
}
120+
121+
// WriteObject writes the JSON encoding of v as a message.
122+
// See the documentation for encoding/json Marshal for details about the conversion of Go values to JSON.
123+
func (rc *ReconnectingWebsocket) WriteObject(v interface{}) error {
124+
return rc.EnsureConnection(func(conn *WebsocketConnection) (bool, error) {
125+
err := conn.WriteJSON(v)
126+
closed := err != nil && !isJSONError(err)
127+
return closed, err
128+
})
129+
}
130+
81131
// ReadObject reads the next JSON-encoded message from the connection and stores it in the value pointed to by v.
82132
// See the documentation for the encoding/json Unmarshal function for details about the conversion of JSON to a Go value.
83133
func (rc *ReconnectingWebsocket) ReadObject(v interface{}) error {
84-
for {
85-
connCh := make(chan *websocket.Conn, 1)
86-
select {
87-
case <-rc.closedCh:
88-
return errors.New("closed")
89-
case rc.connCh <- connCh:
90-
}
91-
conn := <-connCh
134+
return rc.EnsureConnection(func(conn *WebsocketConnection) (bool, error) {
92135
err := conn.ReadJSON(v)
93-
if err == nil {
94-
return nil
95-
}
96-
if !websocket.IsUnexpectedCloseError(err) {
97-
return err
98-
}
99-
select {
100-
case <-rc.closedCh:
101-
return errors.New("closed")
102-
case rc.errCh <- err:
103-
}
104-
}
136+
closed := err != nil && !isJSONError(err)
137+
return closed, err
138+
})
105139
}
106140

107141
// Dial creates a new client connection.
108142
func (rc *ReconnectingWebsocket) Dial() {
109-
var conn *websocket.Conn
143+
var conn *WebsocketConnection
110144
defer func() {
111145
if conn == nil {
112146
return
@@ -129,19 +163,26 @@ func (rc *ReconnectingWebsocket) Dial() {
129163

130164
time.Sleep(1 * time.Second)
131165
conn = rc.connect()
166+
if conn != nil && rc.ReconnectionHandler != nil {
167+
go rc.ReconnectionHandler()
168+
}
132169
}
133170
}
134171
}
135172

136-
func (rc *ReconnectingWebsocket) connect() *websocket.Conn {
173+
func (rc *ReconnectingWebsocket) connect() *WebsocketConnection {
137174
delay := rc.minReconnectionDelay
138175
for {
139176
dialer := websocket.Dialer{HandshakeTimeout: rc.handshakeTimeout}
140177
conn, _, err := dialer.Dial(rc.url, rc.reqHeader)
141178
if err == nil {
142179
rc.log.WithField("url", rc.url).Info("connection was successfully established")
143-
144-
return conn
180+
ws, err := NewWebsocketConnection(context.Background(), conn, func(staleErr error) {
181+
rc.errCh <- staleErr
182+
})
183+
if err == nil {
184+
return ws
185+
}
145186
}
146187

147188
rc.log.WithError(err).WithField("url", rc.url).Errorf("failed to connect, trying again in %d seconds...", uint32(delay.Seconds()))
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Copyright (c) 2021 Gitpod GmbH. All rights reserved.
2+
// Licensed under the GNU Affero General Public License (AGPL).
3+
// See License-AGPL.txt in the project root for license information.
4+
/*---------------------------------------------------------------------------------------------
5+
* Copyright (c) 2020 Jaime Pillora <[email protected]>. All rights reserved.
6+
* Licensed under the MIT License. See https://github.com/jpillora/chisel/blob/7aa0da95db178b8bc4f20ab49128368348fd4410/LICENSE for license information.
7+
*--------------------------------------------------------------------------------------------*/
8+
// copied and modified from https://github.com/jpillora/chisel/blob/33fa2010abd42ec76ed9011995f5240642b1a3c5/share/cnet/conn_ws.go
9+
package protocol
10+
11+
import (
12+
"context"
13+
"sync"
14+
"time"
15+
16+
"github.com/gorilla/websocket"
17+
)
18+
19+
type WebsocketConnection struct {
20+
*websocket.Conn
21+
buff []byte
22+
23+
Ctx context.Context
24+
cancel func()
25+
26+
once sync.Once
27+
closeErr error
28+
waitDone chan struct{}
29+
}
30+
31+
var (
32+
// Time allowed to write a message to the peer.
33+
writeWait = 10 * time.Second
34+
35+
// Time allowed to read the next pong message from the peer.
36+
pongWait = 15 * time.Second
37+
38+
// Send pings to peer with this period. Must be less than pongWait.
39+
pingPeriod = (pongWait * 9) / 10
40+
)
41+
42+
//NewWebsocketConnection converts a websocket.Conn into a net.Conn
43+
func NewWebsocketConnection(ctx context.Context, websocketConn *websocket.Conn, onStale func(staleErr error)) (*WebsocketConnection, error) {
44+
ctx, cancel := context.WithCancel(ctx)
45+
c := &WebsocketConnection{
46+
Conn: websocketConn,
47+
waitDone: make(chan struct{}),
48+
Ctx: ctx,
49+
cancel: cancel,
50+
}
51+
err := c.SetReadDeadline(time.Now().Add(pongWait))
52+
if err != nil {
53+
return nil, err
54+
}
55+
c.SetPongHandler(func(string) error { c.SetReadDeadline(time.Now().Add(pongWait)); return nil })
56+
57+
go func() {
58+
defer c.Close()
59+
ticker := time.NewTicker(pingPeriod)
60+
defer ticker.Stop()
61+
for {
62+
select {
63+
case <-ctx.Done():
64+
return
65+
case <-ticker.C:
66+
staleErr := c.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait))
67+
if staleErr != nil {
68+
onStale(staleErr)
69+
return
70+
}
71+
}
72+
}
73+
}()
74+
return c, nil
75+
}
76+
77+
// Close closes the connection
78+
func (c *WebsocketConnection) Close() error {
79+
c.once.Do(func() {
80+
c.cancel()
81+
c.closeErr = c.Conn.Close()
82+
close(c.waitDone)
83+
})
84+
return c.closeErr
85+
}
86+
87+
// Wait waits till the connection is closed.
88+
func (c *WebsocketConnection) Wait() error {
89+
<-c.waitDone
90+
return c.closeErr
91+
}
92+
93+
//Read is not threadsafe though thats okay since there
94+
//should never be more than one reader
95+
func (c *WebsocketConnection) Read(dst []byte) (int, error) {
96+
ldst := len(dst)
97+
//use buffer or read new message
98+
var src []byte
99+
if len(c.buff) > 0 {
100+
src = c.buff
101+
c.buff = nil
102+
} else if _, msg, err := c.Conn.ReadMessage(); err == nil {
103+
src = msg
104+
} else {
105+
return 0, err
106+
}
107+
//copy src->dest
108+
var n int
109+
if len(src) > ldst {
110+
//copy as much as possible of src into dst
111+
n = copy(dst, src[:ldst])
112+
//copy remainder into buffer
113+
r := src[ldst:]
114+
lr := len(r)
115+
c.buff = make([]byte, lr)
116+
copy(c.buff, r)
117+
} else {
118+
//copy all of src into dst
119+
n = copy(dst, src)
120+
}
121+
//return bytes copied
122+
return n, nil
123+
}
124+
125+
func (c *WebsocketConnection) Write(b []byte) (int, error) {
126+
err := c.Conn.WriteMessage(websocket.BinaryMessage, b)
127+
if err != nil {
128+
return 0, err
129+
}
130+
n := len(b)
131+
return n, nil
132+
}
133+
134+
func (c *WebsocketConnection) SetDeadline(t time.Time) error {
135+
if err := c.Conn.SetReadDeadline(t); err != nil {
136+
return err
137+
}
138+
return c.Conn.SetWriteDeadline(t)
139+
}

0 commit comments

Comments
 (0)