diff --git a/src/bootstrap/cloud/terraform/service-account.tf b/src/bootstrap/cloud/terraform/service-account.tf index 0c538e74f..b5cd88164 100644 --- a/src/bootstrap/cloud/terraform/service-account.tf +++ b/src/bootstrap/cloud/terraform/service-account.tf @@ -60,7 +60,7 @@ resource "google_project_iam_member" "robot-service-roles" { project = data.google_project.project.project_id member = "serviceAccount:${google_service_account.robot-service.email}" for_each = toset([ - "roles/cloudtrace.agent", # Upload cloud traces + "roles/cloudtrace.agent", # Upload cloud traces "roles/logging.logWriter", # Upload text logs to Cloud logging # Required to use robot-service@ for GKE clusters that simulate robots "roles/monitoring.viewer", diff --git a/src/go/cmd/http-relay-client/client/client.go b/src/go/cmd/http-relay-client/client/client.go index 221bb22c4..f29bef878 100644 --- a/src/go/cmd/http-relay-client/client/client.go +++ b/src/go/cmd/http-relay-client/client/client.go @@ -91,18 +91,6 @@ type ClientConfig struct { ForceHttp2 bool } -type RelayServerError struct { - msg string -} - -func NewRelayServerError(msg string) error { - return &RelayServerError{msg} -} - -func (e *RelayServerError) Error() string { - return e.msg -} - func DefaultClientConfig() ClientConfig { return ClientConfig{ RemoteRequestTimeout: 60 * time.Second, @@ -396,9 +384,9 @@ func (c *Client) postResponse(remote *http.Client, br *pb.HttpResponse) error { return fmt.Errorf("couldn't read relay server's response body: %v", err) } if resp.StatusCode != http.StatusOK { - err := NewRelayServerError(fmt.Sprintf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body)) + err := fmt.Errorf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body) if resp.StatusCode == http.StatusBadRequest { - // http-relay-server may have restarted or the client cancelled the request. + // http-relay-server may have restarted during the request. return backoff.Permanent(err) } return err @@ -576,7 +564,6 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p defer span.End() resp, hresp, err := makeBackendRequest(ctx, local, req, id) - defer hresp.Body.Close() if err != nil { // Even if we couldn't handle the backend request, send an // answer to the relay that signals the error. @@ -656,11 +643,8 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p log.Printf("[%s] Failed to post response to relay: %v", *resp.Id, err) }, ) - // Any error suggests the request should be aborted. - // A missing chunk will cause clients to receive corrupted data, in most cases it is better - // to close the connection to avoid that. - if err != nil { - log.Printf("[%s] Closing backend connection: %v", *resp.Id, err) + if _, ok := err.(*backoff.PermanentError); ok { + // A permanent error suggests the request should be aborted. break } } diff --git a/src/go/cmd/http-relay-client/client/client_test.go b/src/go/cmd/http-relay-client/client/client_test.go index 26980024e..0353c0caf 100644 --- a/src/go/cmd/http-relay-client/client/client_test.go +++ b/src/go/cmd/http-relay-client/client/client_test.go @@ -106,7 +106,7 @@ func TestLocalProxy(t *testing.T) { client := NewClient(config) err := client.localProxy(&http.Client{}, &http.Client{}) if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Errorf("Unexpected error: %s", err) } assertMocksDoneWithin(t, 10*time.Second) } @@ -175,7 +175,7 @@ func TestBackendError(t *testing.T) { // 3. retrieves the response from the backend and sends it to the relay-server err := client.localProxy(&http.Client{}, &http.Client{}) if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Errorf("Unexpected error: %s", err) } assertMocksDoneWithin(t, 10*time.Second) } @@ -206,7 +206,7 @@ func TestServerTimeout(t *testing.T) { client := NewClient(config) err := client.localProxy(&http.Client{}, &http.Client{}) if err != ErrTimeout { - t.Errorf("Unexpected error: %v", err) + t.Errorf("Unexpected error: %s", err) } assertMocksDoneWithin(t, 10*time.Second) } diff --git a/src/go/cmd/http-relay-server/server/broker.go b/src/go/cmd/http-relay-server/server/broker.go index c12687407..94c20f0dd 100644 --- a/src/go/cmd/http-relay-server/server/broker.go +++ b/src/go/cmd/http-relay-server/server/broker.go @@ -171,14 +171,6 @@ func (r *broker) RelayRequest(server string, request *pb.HttpRequest) (<-chan *p } } -// StopRelayRequest forgets a relaying request, this causes the next chunk from the backend -// with the relay id to not be recognized, resulting in the relay server returning an error. -func (r *broker) StopRelayRequest(requestId string) { - r.m.Lock() - defer r.m.Unlock() - delete(r.resp, requestId) -} - // GetRequest obtains a client's request for the server identifier. It blocks // until a client makes a request. func (r *broker) GetRequest(ctx context.Context, server, path string) (*pb.HttpRequest, error) { diff --git a/src/go/cmd/http-relay-server/server/broker_test.go b/src/go/cmd/http-relay-server/server/broker_test.go index b6e9c659f..c38945268 100644 --- a/src/go/cmd/http-relay-server/server/broker_test.go +++ b/src/go/cmd/http-relay-server/server/broker_test.go @@ -54,11 +54,11 @@ func runSender(t *testing.T, b *broker, s string, m string, wg *sync.WaitGroup) func runReceiver(t *testing.T, b *broker, s string, wg *sync.WaitGroup) { req, err := b.GetRequest(context.Background(), s, "/") if err != nil { - t.Errorf("Error when getting request: %v", err) + t.Errorf("Error when getting request: %s", err) } err = b.SendResponse(&pb.HttpResponse{Id: req.Id, Body: []byte(*req.Id), Eof: proto.Bool(true)}) if err != nil { - t.Errorf("Error when sending response: %v", err) + t.Errorf("Error when sending response: %s", err) } wg.Done() } @@ -89,17 +89,17 @@ func runSenderStream(t *testing.T, b *broker, s string, m string, wg *sync.WaitG func runReceiverStream(t *testing.T, b *broker, s string, wg *sync.WaitGroup, done <-chan bool) { req, err := b.GetRequest(context.Background(), s, "/") if err != nil { - t.Errorf("Error when getting request: %v", err) + t.Errorf("Error when getting request: %s", err) } err = b.SendResponse(&pb.HttpResponse{Id: req.Id, Body: []byte(*req.Id), Eof: proto.Bool(false)}) if err != nil { - t.Errorf("Error when sending response: %v", err) + t.Errorf("Error when sending response: %s", err) } go func() { <-done err = b.SendResponse(&pb.HttpResponse{Id: req.Id, Body: []byte(*req.Id), Eof: proto.Bool(true)}) if err != nil { - t.Errorf("Error when sending response: %v", err) + t.Errorf("Error when sending response: %s", err) } wg.Done() }() diff --git a/src/go/cmd/http-relay-server/server/server.go b/src/go/cmd/http-relay-server/server/server.go index dcc263123..913a37f2a 100644 --- a/src/go/cmd/http-relay-server/server/server.go +++ b/src/go/cmd/http-relay-server/server/server.go @@ -257,10 +257,8 @@ func (s *Server) bidirectionalStream(backendCtx backendContext, w http.ResponseW numBytes := 0 for responseChunk := range responseChunks { - if _, err = w.Write(responseChunk.Body); err != nil { - log.Printf("[%s] Error writing response to bidi-stream: %v", backendCtx.Id, err) - return - } + // TODO(b/130706300): detect dropped connection and end request in broker + _, _ = bufrw.Write(responseChunk.Body) bufrw.Flush() numBytes += len(responseChunk.Body) } @@ -380,7 +378,6 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - defer s.b.StopRelayRequest(backendCtx.Id) header, responseChunksChan, done := s.waitForFirstResponseAndHandleSwitching(ctx, *backendCtx, w, backendRespChan) if done { @@ -395,10 +392,8 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) { // i.e. this will block until numBytes := 0 for responseChunk := range responseChunksChan { - if _, err = w.Write(responseChunk.Body); err != nil { - log.Printf("[%s] Error writing response to user-client: %v", backendCtx.Id, err) - return - } + // TODO(b/130706300): detect dropped connection and end request in broker + _, _ = w.Write(responseChunk.Body) if flush, ok := w.(http.Flusher); ok { flush.Flush() } @@ -558,6 +553,6 @@ func (s *Server) Start(port int, blockSize int) { // update) or a failed liveness check (eg broker deadlock), we can't // easily tell. We panic to help debugging: if the environment sets // GOTRACEBACK=all they will see stacktraces after the panic. - log.Panicf("Server terminated abnormally: %v", err) + log.Panicf("Server terminated abnormally: %s", err) } } diff --git a/src/go/cmd/http-relay-server/server/server_test.go b/src/go/cmd/http-relay-server/server/server_test.go index fb276b89b..f80b6242e 100644 --- a/src/go/cmd/http-relay-server/server/server_test.go +++ b/src/go/cmd/http-relay-server/server/server_test.go @@ -223,7 +223,7 @@ func TestClientBadRequest(t *testing.T) { if tc.wantMsg != "" { body, err := ioutil.ReadAll(resp.Body) if err != nil { - t.Errorf("Failed to read body stream: %v", err) + t.Errorf("Failed to read body stream: %s", err) } if !strings.Contains(string(body), tc.wantMsg) { t.Errorf("Wrong response body; want %q; got %q", tc.wantMsg, body) @@ -326,7 +326,7 @@ func TestServerRequestResponseHandler(t *testing.T) { } backendRespBody, err := proto.Marshal(backendResp) if err != nil { - t.Errorf("Failed to marshal test response: %v", err) + t.Errorf("Failed to marshal test response: %s", err) } req := httptest.NewRequest("GET", "/server/request?server=b", strings.NewReader("")) @@ -354,7 +354,7 @@ func TestServerRequestResponseHandler(t *testing.T) { } body, err := ioutil.ReadAll(reqRecorder.Result().Body) if err != nil { - t.Errorf("Failed to read body stream: %v", err) + t.Errorf("Failed to read body stream: %s", err) } if !strings.Contains(string(body), "/my/url") { t.Errorf("Serialize request didn't contain URL: %s", string(body)) @@ -385,7 +385,7 @@ func TestServerResponseHandlerWithInvalidRequestID(t *testing.T) { } backendRespBody, err := proto.Marshal(backendResp) if err != nil { - t.Errorf("Failed to marshal test response: %v", err) + t.Errorf("Failed to marshal test response: %s", err) } resp := httptest.NewRequest("POST", "/server/response", bytes.NewReader(backendRespBody)) diff --git a/src/go/tests/relay/nok8s_relay_test.go b/src/go/tests/relay/nok8s_relay_test.go index 481cf6687..ef6cf68fe 100644 --- a/src/go/tests/relay/nok8s_relay_test.go +++ b/src/go/tests/relay/nok8s_relay_test.go @@ -15,7 +15,6 @@ package main import ( - "bufio" "bytes" "context" "fmt" @@ -129,7 +128,7 @@ func (r *relay) stop() error { } // TestHttpRelay launches a local http relay (client + server) and connects a -// test-http-server as a backend. The test is then interacting with the backend +// test-hhtp-server as a backend. The test is then interacting with the backend // through the local relay. func TestHttpRelay(t *testing.T) { tests := []struct { @@ -202,63 +201,6 @@ func TestHttpRelay(t *testing.T) { } } -// TestDroppedUserClientFreesRelayChannel checks that when the user client closes a connection, -// it is propagated to the relay server and client, closing the backend connection as well. -func TestDroppedUserClientFreesRelayChannel(t *testing.T) { - // setup http test server - connClosed := make(chan error) - defer close(connClosed) - finishServer := make(chan bool) - defer close(finishServer) - - // mock a long running backend that uses chunking to send periodic updates - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - for { - select { - case <-finishServer: - return - default: - if _, err := fmt.Fprintln(w, "DEADBEEF"); err != nil { - connClosed <- err - return - } - if flusher, ok := w.(http.Flusher); ok { - flusher.Flush() - } else { - t.Fatal("cannot flush") - } - time.Sleep(time.Second) - } - } - })) - defer ts.Close() - - backendAddress := strings.TrimPrefix(ts.URL, "http://") - r := &relay{} - if err := r.start(backendAddress); err != nil { - t.Fatal("failed to start relay: ", err) - } - defer r.stop() - relayAddress := "http://127.0.0.1:" + r.rsPort - - res, err := http.Get(relayAddress + "/client/remote1/") - if err != nil { - t.Fatal(err) - } - // receive the first chunk then terminates the connection - if _, err := bufio.NewReader(res.Body).ReadString('\n'); err != nil { - t.Fatal(err) - } - res.Body.Close() - - // wait for up to 30s for backend connection to be closed - select { - case <-connClosed: - case <-time.After(30 * time.Second): - t.Error("Server did not close connection") - } -} - type testServer struct { testpb.UnimplementedTestServiceServer responsePayload []byte