
Commit 26a1153

fix leaking connections when user client closes connection (#222)

Same as #220, but ported to this repo.

---

When the user client closes the connection, the close is not propagated to the backend. For simple "one shot" requests there is no huge impact, as the response would just be dropped. For long-running requests (chunked encoding or websockets), however, the backend does not know that the user client has closed the connection, so it keeps sending new updates to the relay client, which also doesn't know and keeps forwarding them to the relay server.

The fix applied is:

* Add `StopRelayRequest` to the broker to "forget" a relaying request. This causes the relay server to respond with a permanent error the next time the relay client tries to send a response for that id, which in turn causes the relay client to close the connection to the backend.
* Fix a bug in the relay client's check for backoff permanent errors. When the backoff operation encounters a permanent error, it unwraps it and returns the underlying error, but the client was still checking for `backoff.Permanent`, so the check always failed and the error was never handled.

---

Signed-off-by: Teo Koon Peng <[email protected]>
1 parent 2ba4efe commit 26a1153
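For context on the second fix: in current releases of github.com/cenkalti/backoff/v4, `Retry` unwraps a `*backoff.PermanentError` and returns the underlying error, so a type assertion on the returned value can never match. A minimal runnable sketch of the failure mode (the `operation` body here is illustrative, not from this repo):

package main

import (
	"errors"
	"fmt"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	operation := func() error {
		// Permanent wraps the error so that Retry stops immediately.
		return backoff.Permanent(errors.New("relay server responded Bad Request"))
	}

	err := backoff.Retry(operation, backoff.NewExponentialBackOff())

	// Buggy check: Retry unwraps *backoff.PermanentError and returns the
	// inner error, so this assertion never matches.
	if _, ok := err.(*backoff.PermanentError); ok {
		fmt.Println("never reached")
	}

	// Fixed check (what the commit does): any remaining error means the
	// request should be aborted.
	if err != nil {
		fmt.Println("abort request:", err)
	}
}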

File tree

5 files changed: +96 -9 lines changed

src/bootstrap/cloud/terraform/service-account.tf
src/go/cmd/http-relay-client/client/client.go
src/go/cmd/http-relay-server/server/broker.go
src/go/cmd/http-relay-server/server/server.go
src/go/tests/relay/nok8s_relay_test.go

src/bootstrap/cloud/terraform/service-account.tf

Lines changed: 1 addition & 1 deletion

@@ -60,7 +60,7 @@ resource "google_project_iam_member" "robot-service-roles" {
   project = data.google_project.project.project_id
   member = "serviceAccount:${google_service_account.robot-service.email}"
   for_each = toset([
-    "roles/cloudtrace.agent", # Upload cloud traces
+    "roles/cloudtrace.agent",  # Upload cloud traces
     "roles/logging.logWriter", # Upload text logs to Cloud logging
     # Required to use robot-service@ for GKE clusters that simulate robots
     "roles/monitoring.viewer",

src/go/cmd/http-relay-client/client/client.go

Lines changed: 19 additions & 3 deletions

@@ -91,6 +91,18 @@ type ClientConfig struct {
 	ForceHttp2 bool
 }
 
+type RelayServerError struct {
+	msg string
+}
+
+func NewRelayServerError(msg string) error {
+	return &RelayServerError{msg}
+}
+
+func (e *RelayServerError) Error() string {
+	return e.msg
+}
+
 func DefaultClientConfig() ClientConfig {
 	return ClientConfig{
 		RemoteRequestTimeout: 60 * time.Second,
@@ -384,7 +396,7 @@ func (c *Client) postResponse(remote *http.Client, br *pb.HttpResponse) error {
 		return fmt.Errorf("couldn't read relay server's response body: %v", err)
 	}
 	if resp.StatusCode != http.StatusOK {
-		err := fmt.Errorf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body)
+		err := NewRelayServerError(fmt.Sprintf("relay server responded %s or the client cancelled the request: %s", http.StatusText(resp.StatusCode), body))
 		if resp.StatusCode == http.StatusBadRequest {
 			// http-relay-server may have restarted during the request.
 			return backoff.Permanent(err)
@@ -564,6 +576,7 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p
 	defer span.End()
 
 	resp, hresp, err := makeBackendRequest(ctx, local, req, id)
+	defer hresp.Body.Close()
 	if err != nil {
 		// Even if we couldn't handle the backend request, send an
 		// answer to the relay that signals the error.
@@ -643,8 +656,11 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p
 			log.Printf("[%s] Failed to post response to relay: %v", *resp.Id, err)
 		},
 	)
-	if _, ok := err.(*backoff.PermanentError); ok {
-		// A permanent error suggests the request should be aborted.
+	// Any error suggests the request should be aborted.
+	// A missing chunk will cause clients to receive corrupted data, in most cases it is better
+	// to close the connection to avoid that.
+	if err != nil {
+		log.Printf("[%s] Closing backend connection (%s)", *resp.Id, err)
 		break
 	}
 }
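Because `RelayServerError` implements `error` on a pointer receiver, callers can distinguish relay-server rejections from ordinary transport errors with the standard library's `errors.As`. A self-contained sketch (the type is re-declared here for illustration; the commit itself adds no such call site):

package main

import (
	"errors"
	"fmt"
)

// Re-declaration of the type added in client.go, for illustration only.
type RelayServerError struct {
	msg string
}

func (e *RelayServerError) Error() string {
	return e.msg
}

func main() {
	var err error = fmt.Errorf("posting response: %w",
		&RelayServerError{"relay server responded Bad Request"})

	// errors.As finds the *RelayServerError even through wrapping.
	var rse *RelayServerError
	if errors.As(err, &rse) {
		fmt.Println("relay server rejected the response:", rse.msg)
	}
}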

src/go/cmd/http-relay-server/server/broker.go

Lines changed: 8 additions & 0 deletions

@@ -171,6 +171,14 @@ func (r *broker) RelayRequest(server string, request *pb.HttpRequest) (<-chan *p
 	}
 }
 
+// StopRelayRequest forgets a relaying request. This causes the next chunk from the backend
+// with the relay id to not be recognized, resulting in the relay server returning an error.
+func (r *broker) StopRelayRequest(requestId string) {
+	r.m.Lock()
+	defer r.m.Unlock()
+	delete(r.resp, requestId)
+}
+
 // GetRequest obtains a client's request for the server identifier. It blocks
 // until a client makes a request.
 func (r *broker) GetRequest(ctx context.Context, server, path string) (*pb.HttpRequest, error) {
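To see why deleting the map entry is enough: the broker routes each response chunk to a channel keyed by request id, so once `StopRelayRequest` removes the id, the next lookup fails and the chunk is rejected. A runnable sketch of that mechanism, assuming a map of response channels like the `r.resp` in the diff (all other names are illustrative, not the actual broker code):

package main

import (
	"fmt"
	"sync"
)

// Sketch of the broker's bookkeeping; everything except StopRelayRequest
// and the resp map is an assumption for illustration.
type broker struct {
	m    sync.Mutex
	resp map[string]chan []byte
}

// StopRelayRequest forgets a relaying request (as in the diff above).
func (r *broker) StopRelayRequest(requestId string) {
	r.m.Lock()
	defer r.m.Unlock()
	delete(r.resp, requestId)
}

// relayResponse delivers a backend chunk to the waiting user request.
func (r *broker) relayResponse(id string, chunk []byte) error {
	r.m.Lock()
	ch, ok := r.resp[id]
	r.m.Unlock()
	if !ok {
		// The id was forgotten: rejecting the chunk is what makes the
		// relay client give up and close the backend connection.
		return fmt.Errorf("unknown request id %s", id)
	}
	ch <- chunk
	return nil
}

func main() {
	b := &broker{resp: map[string]chan []byte{"42": make(chan []byte, 1)}}
	fmt.Println(b.relayResponse("42", []byte("chunk"))) // <nil>
	b.StopRelayRequest("42")
	fmt.Println(b.relayResponse("42", []byte("late"))) // unknown request id 42
}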

src/go/cmd/http-relay-server/server/server.go

Lines changed: 9 additions & 4 deletions

@@ -257,8 +257,10 @@ func (s *Server) bidirectionalStream(backendCtx backendContext, w http.ResponseW
 
 	numBytes := 0
 	for responseChunk := range responseChunks {
-		// TODO(b/130706300): detect dropped connection and end request in broker
-		_, _ = bufrw.Write(responseChunk.Body)
+		if _, err = w.Write(responseChunk.Body); err != nil {
+			log.Printf("[%s] %s", backendCtx.Id, err)
+			return
+		}
 		bufrw.Flush()
 		numBytes += len(responseChunk.Body)
 	}
@@ -378,6 +380,7 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
 		return
 	}
+	defer s.b.StopRelayRequest(backendCtx.Id)
 
 	header, responseChunksChan, done := s.waitForFirstResponseAndHandleSwitching(ctx, *backendCtx, w, backendRespChan)
 	if done {
@@ -392,8 +395,10 @@
 	// i.e. this will block until
 	numBytes := 0
 	for responseChunk := range responseChunksChan {
-		// TODO(b/130706300): detect dropped connection and end request in broker
-		_, _ = w.Write(responseChunk.Body)
+		if _, err = w.Write(responseChunk.Body); err != nil {
+			log.Printf("[%s] %s", backendCtx.Id, err)
+			return
+		}
 		if flush, ok := w.(http.Flusher); ok {
 			flush.Flush()
 		}
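The server change leans on a standard net/http behavior: once the user client disconnects, a subsequent `ResponseWriter.Write` fails, and the handler can stop streaming (here, the `defer`red `StopRelayRequest` then tells the broker to forget the request). A minimal standalone sketch of that pattern, independent of the relay code:

package main

import (
	"log"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) {
		for {
			// Once the client has gone away, a later Write returns an error.
			if _, err := w.Write([]byte("tick\n")); err != nil {
				log.Printf("client disconnected: %v", err)
				return
			}
			// Flush so each chunk reaches the client immediately.
			if f, ok := w.(http.Flusher); ok {
				f.Flush()
			}
			time.Sleep(time.Second)
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}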

src/go/tests/relay/nok8s_relay_test.go

Lines changed: 59 additions & 1 deletion

@@ -15,6 +15,7 @@
 package main
 
 import (
+	"bufio"
 	"bytes"
 	"context"
 	"fmt"
@@ -128,7 +129,7 @@ func (r *relay) stop() error {
 }
 
 // TestHttpRelay launches a local http relay (client + server) and connects a
-// test-hhtp-server as a backend. The test is then interacting with the backend
+// test-http-server as a backend. The test is then interacting with the backend
 // through the local relay.
 func TestHttpRelay(t *testing.T) {
 	tests := []struct {
@@ -201,6 +202,63 @@ func TestHttpRelay(t *testing.T) {
 	}
 }
 
+// TestDroppedUserClientFreesRelayChannel checks that when the user client closes a connection,
+// it is propagated to the relay server and client, closing the backend connection as well.
+func TestDroppedUserClientFreesRelayChannel(t *testing.T) {
+	// setup http test server
+	connClosed := make(chan error)
+	defer close(connClosed)
+	finishServer := make(chan bool)
+	defer close(finishServer)
+
+	// mock a long running backend that uses chunking to send periodic updates
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		for {
+			select {
+			case <-finishServer:
+				return
+			default:
+				if _, err := fmt.Fprintln(w, "DEADBEEF"); err != nil {
+					connClosed <- err
+					return
+				}
+				if flusher, ok := w.(http.Flusher); ok {
+					flusher.Flush()
+				} else {
+					t.Fatal("cannot flush")
+				}
+				time.Sleep(time.Second)
+			}
+		}
+	}))
+	defer ts.Close()
+
+	backendAddress := strings.TrimPrefix(ts.URL, "http://")
+	r := &relay{}
+	if err := r.start(backendAddress); err != nil {
+		t.Fatal("failed to start relay: ", err)
+	}
+	defer r.stop()
+	relayAddress := "http://127.0.0.1:" + r.rsPort
+
+	res, err := http.Get(relayAddress + "/client/remote1/")
+	if err != nil {
+		t.Fatal(err)
+	}
+	// receive the first chunk then terminate the connection
+	if _, err := bufio.NewReader(res.Body).ReadString('\n'); err != nil {
+		t.Fatal(err)
+	}
+	res.Body.Close()
+
+	// wait for up to 30s for the backend connection to be closed
+	select {
+	case <-connClosed:
+	case <-time.After(30 * time.Second):
+		t.Error("Server did not close connection")
+	}
+}
+
 type testServer struct {
 	testpb.UnimplementedTestServiceServer
 	responsePayload []byte
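To run just the new test locally, something like the following should work from the repository root (the exact flags and module layout are assumptions):

go test -v -run TestDroppedUserClientFreesRelayChannel ./src/go/tests/relay/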
