From 5ba1c6dd0614face9ffe908f8bb49bba567d5345 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 22 Jul 2021 05:07:42 +0200 Subject: [PATCH 1/3] Reuse the byte buffer from GRPC response in the frontend. This PR allow to reuse the GRPC byte buffer casted as a `io.Reader` in the http rountripper of the frontend. This way we don't have to copy the content from the reader into another buffer when reading the http response. I've been testing this with in Loki and this shows good memory saving (around 1GB). Signed-off-by: Cyril Tovena --- pkg/frontend/transport/roundtripper.go | 12 +++++++++- pkg/querier/queryrange/query_range.go | 31 +++++++++++++++++--------- pkg/querier/queryrange/util.go | 4 ++++ 3 files changed, 35 insertions(+), 12 deletions(-) diff --git a/pkg/frontend/transport/roundtripper.go b/pkg/frontend/transport/roundtripper.go index 45cfb22225..d9ba57ccba 100644 --- a/pkg/frontend/transport/roundtripper.go +++ b/pkg/frontend/transport/roundtripper.go @@ -3,6 +3,7 @@ package transport import ( "bytes" "context" + "io" "io/ioutil" "net/http" @@ -24,6 +25,15 @@ type grpcRoundTripperAdapter struct { roundTripper GrpcRoundTripper } +type buffer struct { + buff []byte + io.ReadCloser +} + +func (b *buffer) Bytes() []byte { + return b.buff +} + func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) { req, err := server.HTTPRequest(r) if err != nil { @@ -37,7 +47,7 @@ func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, er httpResp := &http.Response{ StatusCode: int(resp.Code), - Body: ioutil.NopCloser(bytes.NewReader(resp.Body)), + Body: &buffer{buff: resp.Body, ReadCloser: ioutil.NopCloser(bytes.NewReader(resp.Body))}, Header: http.Header{}, ContentLength: int64(len(resp.Body)), } diff --git a/pkg/querier/queryrange/query_range.go b/pkg/querier/queryrange/query_range.go index 7506d4f507..6da4899ddf 100644 --- a/pkg/querier/queryrange/query_range.go +++ b/pkg/querier/queryrange/query_range.go @@ -266,20 +266,15 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck defer log.Finish() - // Preallocate the buffer with the exact size so we don't waste allocations - // while progressively growing an initial small buffer. The buffer capacity - // is increased by MinRead to avoid extra allocations due to how ReadFrom() - // internally works. - buf := bytes.NewBuffer(make([]byte, 0, r.ContentLength+bytes.MinRead)) - if _, err := buf.ReadFrom(r.Body); err != nil { + buf, err := bodyBuffer(r) + if err != nil { log.Error(err) - return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + return nil, err } - - log.LogFields(otlog.Int("bytes", buf.Len())) + log.LogFields(otlog.Int("bytes", len(buf))) var resp PrometheusResponse - if err := json.Unmarshal(buf.Bytes(), &resp); err != nil { + if err := json.Unmarshal(buf, &resp); err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } @@ -289,6 +284,21 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R return &resp, nil } +func bodyBuffer(res *http.Response) ([]byte, error) { + if buffer := res.Body.(Buffer); buffer != nil { + return buffer.Bytes(), nil + } + // Preallocate the buffer with the exact size so we don't waste allocations + // while progressively growing an initial small buffer. The buffer capacity + // is increased by MinRead to avoid extra allocations due to how ReadFrom() + // internally works. + buf := bytes.NewBuffer(make([]byte, 0, res.ContentLength+bytes.MinRead)) + if _, err := buf.ReadFrom(res.Body); err != nil { + return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) + } + return buf.Bytes(), nil +} + func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.Response, error) { sp, _ := opentracing.StartSpanFromContext(ctx, "APIResponse.ToHTTPResponse") defer sp.Finish() @@ -392,7 +402,6 @@ func matrixMerge(resps []*PrometheusResponse) []SampleStream { // bigger than the given minTs. Empty slice is returned if minTs is bigger than all the // timestamps in samples. func sliceSamples(samples []cortexpb.Sample, minTs int64) []cortexpb.Sample { - if len(samples) <= 0 || minTs < samples[0].TimestampMs { return samples } diff --git a/pkg/querier/queryrange/util.go b/pkg/querier/queryrange/util.go index 2b82e8b3b0..fa30d6cf44 100644 --- a/pkg/querier/queryrange/util.go +++ b/pkg/querier/queryrange/util.go @@ -70,3 +70,7 @@ func DoRequests(ctx context.Context, downstream Handler, reqs []Request, limits return resps, firstErr } + +type Buffer interface { + Bytes() []byte +} From 185752bd7c0c66d4230e685684cb7fa7ddf98a3e Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 22 Jul 2021 06:08:36 +0200 Subject: [PATCH 2/3] Use safe cast Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/query_range.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/querier/queryrange/query_range.go b/pkg/querier/queryrange/query_range.go index 6da4899ddf..4a1e295a10 100644 --- a/pkg/querier/queryrange/query_range.go +++ b/pkg/querier/queryrange/query_range.go @@ -285,7 +285,7 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R } func bodyBuffer(res *http.Response) ([]byte, error) { - if buffer := res.Body.(Buffer); buffer != nil { + if buffer, ok := res.Body.(Buffer); ok { return buffer.Bytes(), nil } // Preallocate the buffer with the exact size so we don't waste allocations From 3e3b2804a31f7d557ac79685d32725d49a5fe77b Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 29 Jul 2021 10:44:06 +0200 Subject: [PATCH 3/3] Review feedback. Signed-off-by: Cyril Tovena --- pkg/querier/queryrange/query_range.go | 8 ++++++++ pkg/querier/queryrange/util.go | 4 ---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/querier/queryrange/query_range.go b/pkg/querier/queryrange/query_range.go index 4a1e295a10..32962db908 100644 --- a/pkg/querier/queryrange/query_range.go +++ b/pkg/querier/queryrange/query_range.go @@ -284,7 +284,15 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R return &resp, nil } +// Buffer can be used to read a response body. +// This allows to avoid reading the body multiple times from the `http.Response.Body`. +type Buffer interface { + Bytes() []byte +} + func bodyBuffer(res *http.Response) ([]byte, error) { + // Attempt to cast the response body to a Buffer and use it if possible. + // This is because the frontend may have already read the body and buffered it. if buffer, ok := res.Body.(Buffer); ok { return buffer.Bytes(), nil } diff --git a/pkg/querier/queryrange/util.go b/pkg/querier/queryrange/util.go index fa30d6cf44..2b82e8b3b0 100644 --- a/pkg/querier/queryrange/util.go +++ b/pkg/querier/queryrange/util.go @@ -70,7 +70,3 @@ func DoRequests(ctx context.Context, downstream Handler, reqs []Request, limits return resps, firstErr } - -type Buffer interface { - Bytes() []byte -}