Skip to content

Revert #222 #226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/bootstrap/cloud/terraform/service-account.tf
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ resource "google_project_iam_member" "robot-service-roles" {
project = data.google_project.project.project_id
member = "serviceAccount:${google_service_account.robot-service.email}"
for_each = toset([
"roles/cloudtrace.agent", # Upload cloud traces
"roles/cloudtrace.agent", # Upload cloud traces
"roles/logging.logWriter", # Upload text logs to Cloud logging
# Required to use robot-service@ for GKE clusters that simulate robots
"roles/monitoring.viewer",
Expand Down
24 changes: 4 additions & 20 deletions src/go/cmd/http-relay-client/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,6 @@ type ClientConfig struct {
ForceHttp2 bool
}

type RelayServerError struct {
msg string
}

func NewRelayServerError(msg string) error {
return &RelayServerError{msg}
}

func (e *RelayServerError) Error() string {
return e.msg
}

func DefaultClientConfig() ClientConfig {
return ClientConfig{
RemoteRequestTimeout: 60 * time.Second,
Expand Down Expand Up @@ -396,9 +384,9 @@ func (c *Client) postResponse(remote *http.Client, br *pb.HttpResponse) error {
return fmt.Errorf("couldn't read relay server's response body: %v", err)
}
if resp.StatusCode != http.StatusOK {
err := NewRelayServerError(fmt.Sprintf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body))
err := fmt.Errorf("relay server responded %s: %s", http.StatusText(resp.StatusCode), body)
if resp.StatusCode == http.StatusBadRequest {
// http-relay-server may have restarted or the client cancelled the request.
// http-relay-server may have restarted during the request.
return backoff.Permanent(err)
}
return err
Expand Down Expand Up @@ -576,7 +564,6 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p
defer span.End()

resp, hresp, err := makeBackendRequest(ctx, local, req, id)
defer hresp.Body.Close()
if err != nil {
// Even if we couldn't handle the backend request, send an
// answer to the relay that signals the error.
Expand Down Expand Up @@ -656,11 +643,8 @@ func (c *Client) handleRequest(remote *http.Client, local *http.Client, pbreq *p
log.Printf("[%s] Failed to post response to relay: %v", *resp.Id, err)
},
)
// Any error suggests the request should be aborted.
// A missing chunk will cause clients to receive corrupted data, in most cases it is better
// to close the connection to avoid that.
if err != nil {
log.Printf("[%s] Closing backend connection: %v", *resp.Id, err)
if _, ok := err.(*backoff.PermanentError); ok {
// A permanent error suggests the request should be aborted.
break
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/go/cmd/http-relay-client/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestLocalProxy(t *testing.T) {
client := NewClient(config)
err := client.localProxy(&http.Client{}, &http.Client{})
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Errorf("Unexpected error: %s", err)
}
assertMocksDoneWithin(t, 10*time.Second)
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestBackendError(t *testing.T) {
// 3. retrieves the response from the backend and sends it to the relay-server
err := client.localProxy(&http.Client{}, &http.Client{})
if err != nil {
t.Errorf("Unexpected error: %v", err)
t.Errorf("Unexpected error: %s", err)
}
assertMocksDoneWithin(t, 10*time.Second)
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestServerTimeout(t *testing.T) {
client := NewClient(config)
err := client.localProxy(&http.Client{}, &http.Client{})
if err != ErrTimeout {
t.Errorf("Unexpected error: %v", err)
t.Errorf("Unexpected error: %s", err)
}
assertMocksDoneWithin(t, 10*time.Second)
}
Expand Down
8 changes: 0 additions & 8 deletions src/go/cmd/http-relay-server/server/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,6 @@ func (r *broker) RelayRequest(server string, request *pb.HttpRequest) (<-chan *p
}
}

// StopRelayRequest forgets a relaying request, this causes the next chunk from the backend
// with the relay id to not be recognized, resulting in the relay server returning an error.
func (r *broker) StopRelayRequest(requestId string) {
r.m.Lock()
defer r.m.Unlock()
delete(r.resp, requestId)
}

// GetRequest obtains a client's request for the server identifier. It blocks
// until a client makes a request.
func (r *broker) GetRequest(ctx context.Context, server, path string) (*pb.HttpRequest, error) {
Expand Down
10 changes: 5 additions & 5 deletions src/go/cmd/http-relay-server/server/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ func runSender(t *testing.T, b *broker, s string, m string, wg *sync.WaitGroup)
func runReceiver(t *testing.T, b *broker, s string, wg *sync.WaitGroup) {
req, err := b.GetRequest(context.Background(), s, "/")
if err != nil {
t.Errorf("Error when getting request: %v", err)
t.Errorf("Error when getting request: %s", err)
}
err = b.SendResponse(&pb.HttpResponse{Id: req.Id, Body: []byte(*req.Id), Eof: proto.Bool(true)})
if err != nil {
t.Errorf("Error when sending response: %v", err)
t.Errorf("Error when sending response: %s", err)
}
wg.Done()
}
Expand Down Expand Up @@ -89,17 +89,17 @@ func runSenderStream(t *testing.T, b *broker, s string, m string, wg *sync.WaitG
func runReceiverStream(t *testing.T, b *broker, s string, wg *sync.WaitGroup, done <-chan bool) {
req, err := b.GetRequest(context.Background(), s, "/")
if err != nil {
t.Errorf("Error when getting request: %v", err)
t.Errorf("Error when getting request: %s", err)
}
err = b.SendResponse(&pb.HttpResponse{Id: req.Id, Body: []byte(*req.Id), Eof: proto.Bool(false)})
if err != nil {
t.Errorf("Error when sending response: %v", err)
t.Errorf("Error when sending response: %s", err)
}
go func() {
<-done
err = b.SendResponse(&pb.HttpResponse{Id: req.Id, Body: []byte(*req.Id), Eof: proto.Bool(true)})
if err != nil {
t.Errorf("Error when sending response: %v", err)
t.Errorf("Error when sending response: %s", err)
}
wg.Done()
}()
Expand Down
15 changes: 5 additions & 10 deletions src/go/cmd/http-relay-server/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,8 @@ func (s *Server) bidirectionalStream(backendCtx backendContext, w http.ResponseW

numBytes := 0
for responseChunk := range responseChunks {
if _, err = w.Write(responseChunk.Body); err != nil {
log.Printf("[%s] Error writing response to bidi-stream: %v", backendCtx.Id, err)
return
}
// TODO(b/130706300): detect dropped connection and end request in broker
_, _ = bufrw.Write(responseChunk.Body)
bufrw.Flush()
numBytes += len(responseChunk.Body)
}
Expand Down Expand Up @@ -380,7 +378,6 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer s.b.StopRelayRequest(backendCtx.Id)

header, responseChunksChan, done := s.waitForFirstResponseAndHandleSwitching(ctx, *backendCtx, w, backendRespChan)
if done {
Expand All @@ -395,10 +392,8 @@ func (s *Server) userClientRequest(w http.ResponseWriter, r *http.Request) {
// i.e. this will block until
numBytes := 0
for responseChunk := range responseChunksChan {
if _, err = w.Write(responseChunk.Body); err != nil {
log.Printf("[%s] Error writing response to user-client: %v", backendCtx.Id, err)
return
}
// TODO(b/130706300): detect dropped connection and end request in broker
_, _ = w.Write(responseChunk.Body)
if flush, ok := w.(http.Flusher); ok {
flush.Flush()
}
Expand Down Expand Up @@ -558,6 +553,6 @@ func (s *Server) Start(port int, blockSize int) {
// update) or a failed liveness check (eg broker deadlock), we can't
// easily tell. We panic to help debugging: if the environment sets
// GOTRACEBACK=all they will see stacktraces after the panic.
log.Panicf("Server terminated abnormally: %v", err)
log.Panicf("Server terminated abnormally: %s", err)
}
}
8 changes: 4 additions & 4 deletions src/go/cmd/http-relay-server/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func TestClientBadRequest(t *testing.T) {
if tc.wantMsg != "" {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("Failed to read body stream: %v", err)
t.Errorf("Failed to read body stream: %s", err)
}
if !strings.Contains(string(body), tc.wantMsg) {
t.Errorf("Wrong response body; want %q; got %q", tc.wantMsg, body)
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestServerRequestResponseHandler(t *testing.T) {
}
backendRespBody, err := proto.Marshal(backendResp)
if err != nil {
t.Errorf("Failed to marshal test response: %v", err)
t.Errorf("Failed to marshal test response: %s", err)
}

req := httptest.NewRequest("GET", "/server/request?server=b", strings.NewReader(""))
Expand Down Expand Up @@ -354,7 +354,7 @@ func TestServerRequestResponseHandler(t *testing.T) {
}
body, err := ioutil.ReadAll(reqRecorder.Result().Body)
if err != nil {
t.Errorf("Failed to read body stream: %v", err)
t.Errorf("Failed to read body stream: %s", err)
}
if !strings.Contains(string(body), "/my/url") {
t.Errorf("Serialize request didn't contain URL: %s", string(body))
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestServerResponseHandlerWithInvalidRequestID(t *testing.T) {
}
backendRespBody, err := proto.Marshal(backendResp)
if err != nil {
t.Errorf("Failed to marshal test response: %v", err)
t.Errorf("Failed to marshal test response: %s", err)
}

resp := httptest.NewRequest("POST", "/server/response", bytes.NewReader(backendRespBody))
Expand Down
60 changes: 1 addition & 59 deletions src/go/tests/relay/nok8s_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package main

import (
"bufio"
"bytes"
"context"
"fmt"
Expand Down Expand Up @@ -129,7 +128,7 @@ func (r *relay) stop() error {
}

// TestHttpRelay launches a local http relay (client + server) and connects a
// test-http-server as a backend. The test is then interacting with the backend
// test-hhtp-server as a backend. The test is then interacting with the backend
// through the local relay.
func TestHttpRelay(t *testing.T) {
tests := []struct {
Expand Down Expand Up @@ -202,63 +201,6 @@ func TestHttpRelay(t *testing.T) {
}
}

// TestDroppedUserClientFreesRelayChannel checks that when the user client closes a connection,
// it is propagated to the relay server and client, closing the backend connection as well.
func TestDroppedUserClientFreesRelayChannel(t *testing.T) {
// setup http test server
connClosed := make(chan error)
defer close(connClosed)
finishServer := make(chan bool)
defer close(finishServer)

// mock a long running backend that uses chunking to send periodic updates
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
for {
select {
case <-finishServer:
return
default:
if _, err := fmt.Fprintln(w, "DEADBEEF"); err != nil {
connClosed <- err
return
}
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
} else {
t.Fatal("cannot flush")
}
time.Sleep(time.Second)
}
}
}))
defer ts.Close()

backendAddress := strings.TrimPrefix(ts.URL, "http://")
r := &relay{}
if err := r.start(backendAddress); err != nil {
t.Fatal("failed to start relay: ", err)
}
defer r.stop()
relayAddress := "http://127.0.0.1:" + r.rsPort

res, err := http.Get(relayAddress + "/client/remote1/")
if err != nil {
t.Fatal(err)
}
// receive the first chunk then terminates the connection
if _, err := bufio.NewReader(res.Body).ReadString('\n'); err != nil {
t.Fatal(err)
}
res.Body.Close()

// wait for up to 30s for backend connection to be closed
select {
case <-connClosed:
case <-time.After(30 * time.Second):
t.Error("Server did not close connection")
}
}

type testServer struct {
testpb.UnimplementedTestServiceServer
responsePayload []byte
Expand Down