From 1c0b89dd8512c513369bcd540633023b7807c5dd Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Fri, 3 Mar 2023 17:02:51 -0800 Subject: [PATCH 01/10] wip Signed-off-by: Alan Protasio --- pkg/cortexpb/slicesPool.go | 49 +++++++++++++++++++++++++++++++++ pkg/cortexpb/slicesPool_test.go | 18 ++++++++++++ pkg/cortexpb/timeseries.go | 34 +++++++++++++++++++++++ pkg/distributor/distributor.go | 15 +++++----- pkg/ingester/client/client.go | 12 ++++++++ 5 files changed, 121 insertions(+), 7 deletions(-) create mode 100644 pkg/cortexpb/slicesPool.go create mode 100644 pkg/cortexpb/slicesPool_test.go diff --git a/pkg/cortexpb/slicesPool.go b/pkg/cortexpb/slicesPool.go new file mode 100644 index 0000000000..97c0398d8c --- /dev/null +++ b/pkg/cortexpb/slicesPool.go @@ -0,0 +1,49 @@ +package cortexpb + +import ( + "math" + "sync" +) + +type byteSlicePools struct { + pools []sync.Pool +} + +func NewSlicePool(pools int) *byteSlicePools { + sp := byteSlicePools{} + sp.init(pools) + return &sp +} + +func (sp *byteSlicePools) init(pools int) { + sp.pools = make([]sync.Pool, pools) + for i := 0; i < pools; i++ { + size := int(math.Pow(2, float64(i))) + sp.pools[i] = sync.Pool{ + New: func() interface{} { + return make([]byte, 0, size) + }, + } + } +} + +func (sp *byteSlicePools) getSlice(size int) []byte { + index := int(math.Ceil(math.Log2(float64(size)))) + + if index < 0 || index >= len(sp.pools) { + return make([]byte, size) + } + + s := sp.pools[index].Get().([]byte) + return s[:size] +} + +func (sp *byteSlicePools) reuseSlice(s []byte) { + index := int(math.Ceil(math.Log2(float64(cap(s))))) + + if index < 0 || index >= len(sp.pools) { + return + } + + sp.pools[index].Put(s) +} diff --git a/pkg/cortexpb/slicesPool_test.go b/pkg/cortexpb/slicesPool_test.go new file mode 100644 index 0000000000..52450e8ca2 --- /dev/null +++ b/pkg/cortexpb/slicesPool_test.go @@ -0,0 +1,18 @@ +package cortexpb + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestByteSlicePools(t *testing.T) { + sut := NewSlicePool(20) + + for i := 0; i < 1024*1024; i = i + 128 { + s := sut.getSlice(i) + assert.Equal(t, len(s), i) + sut.reuseSlice(s) + } + +} diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go index 5f2a9788bf..f5cac4d09b 100644 --- a/pkg/cortexpb/timeseries.go +++ b/pkg/cortexpb/timeseries.go @@ -37,6 +37,15 @@ var ( } }, } + + writeRequestPool = sync.Pool{ + New: func() interface{} { + return &PreallocWriteRequest{ + WriteRequest: WriteRequest{}, + } + }, + } + bytePool = NewSlicePool(20) ) // PreallocConfig configures how structures will be preallocated to optimise @@ -53,6 +62,7 @@ func (PreallocConfig) RegisterFlags(f *flag.FlagSet) { // PreallocWriteRequest is a WriteRequest which preallocs slices on Unmarshal. type PreallocWriteRequest struct { WriteRequest + data []byte } // Unmarshal implements proto.Message. @@ -72,6 +82,30 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { return p.TimeSeries.Unmarshal(dAtA) } +func (m *PreallocWriteRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = bytePool.getSlice(size) + m.data = dAtA + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func ReuseWriteRequest(req *PreallocWriteRequest) { + if req.data != nil { + bytePool.reuseSlice(req.data) + req.data = nil + } + + writeRequestPool.Put(req) +} + +func PreallocWriteRequestFromPool() *PreallocWriteRequest { + return writeRequestPool.Get().(*PreallocWriteRequest) +} + // LabelAdapter is a labels.Label that can be marshalled to/from protos. type LabelAdapter labels.Label diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 336550a840..92698d5009 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -835,14 +835,15 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time if err != nil { return err } - c := h.(ingester_client.IngesterClient) + c := h.(ingester_client.HealthAndIngesterClient) - req := cortexpb.WriteRequest{ - Timeseries: timeseries, - Metadata: metadata, - Source: source, - } - _, err = c.Push(ctx, &req) + req := cortexpb.PreallocWriteRequestFromPool() + req.Timeseries = timeseries + req.Metadata = metadata + req.Source = source + + _, err = c.PushPreAlloc(ctx, req) + cortexpb.ReuseWriteRequest(req) if len(metadata) > 0 { d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc() diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 6b017a20e2..163d460c7f 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -1,8 +1,10 @@ package client import ( + "context" "flag" + "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -24,6 +26,7 @@ type HealthAndIngesterClient interface { IngesterClient grpc_health_v1.HealthClient Close() error + PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) } type closableHealthAndIngesterClient struct { @@ -32,6 +35,15 @@ type closableHealthAndIngesterClient struct { conn *grpc.ClientConn } +func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + out := new(cortexpb.WriteResponse) + err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // MakeIngesterClient makes a new IngesterClient func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error) { dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(ingesterClientRequestDuration)) From 1a19229e7e708fa741eb6ba0dc428de95518c637 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Mon, 6 Mar 2023 17:53:39 -0800 Subject: [PATCH 02/10] branchmark Signed-off-by: Alan Protasio --- pkg/cortexpb/timeseries_test.go | 55 +++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/pkg/cortexpb/timeseries_test.go b/pkg/cortexpb/timeseries_test.go index 7b322fdb8d..103a8e4dfe 100644 --- a/pkg/cortexpb/timeseries_test.go +++ b/pkg/cortexpb/timeseries_test.go @@ -1,8 +1,10 @@ package cortexpb import ( + "fmt" "testing" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -64,3 +66,56 @@ func TestTimeseriesFromPool(t *testing.T) { assert.Len(t, reused.Samples, 0) }) } + +func BenchmarkMarshallWriteRequest(b *testing.B) { + ts := PreallocTimeseriesSliceFromPool() + + for i := 0; i < 100; i++ { + ts = append(ts, PreallocTimeseries{TimeSeries: TimeseriesFromPool()}) + ts[i].Labels = []LabelAdapter{ + {Name: "foo", Value: "bar"}, + {Name: "very long label name", Value: "very long label value"}, + {Name: "very long label name 2", Value: "very long label value 2"}, + {Name: "very long label name 3", Value: "very long label value 3"}, + {Name: "int", Value: fmt.Sprint(i)}, + } + ts[i].Samples = []Sample{{Value: 1, TimestampMs: 2}} + } + + tests := []struct { + name string + writeRequestFactory func() proto.Marshaler + clean func(in interface{}) + }{ + { + name: "no-pool", + writeRequestFactory: func() proto.Marshaler { + return &WriteRequest{Timeseries: ts} + }, + clean: func(in interface{}) {}, + }, + { + name: "pool", + writeRequestFactory: func() proto.Marshaler { + w := PreallocWriteRequestFromPool() + w.Timeseries = ts + return w + }, + clean: func(in interface{}) { + ReuseWriteRequest(in.(*PreallocWriteRequest)) + }, + }, + } + + for _, tc := range tests { + b.Run(tc.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + w := tc.writeRequestFactory() + _, err := w.Marshal() + require.NoError(b, err) + tc.clean(w) + } + b.ReportAllocs() + }) + } +} From 2a33354501c6a7e62a77a925ac2b1fec6908d8fd Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 7 Mar 2023 09:37:33 -0800 Subject: [PATCH 03/10] fix some linting / test Signed-off-by: Alan Protasio --- pkg/cortexpb/slicesPool.go | 4 ++-- pkg/cortexpb/slicesPool_test.go | 4 +--- pkg/cortexpb/timeseries.go | 10 +++++----- pkg/distributor/distributor_test.go | 4 ++++ 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pkg/cortexpb/slicesPool.go b/pkg/cortexpb/slicesPool.go index 97c0398d8c..ac5e4fed1f 100644 --- a/pkg/cortexpb/slicesPool.go +++ b/pkg/cortexpb/slicesPool.go @@ -9,7 +9,7 @@ type byteSlicePools struct { pools []sync.Pool } -func NewSlicePool(pools int) *byteSlicePools { +func newSlicePool(pools int) *byteSlicePools { sp := byteSlicePools{} sp.init(pools) return &sp @@ -39,7 +39,7 @@ func (sp *byteSlicePools) getSlice(size int) []byte { } func (sp *byteSlicePools) reuseSlice(s []byte) { - index := int(math.Ceil(math.Log2(float64(cap(s))))) + index := int(math.Floor(math.Log2(float64(cap(s))))) if index < 0 || index >= len(sp.pools) { return diff --git a/pkg/cortexpb/slicesPool_test.go b/pkg/cortexpb/slicesPool_test.go index 52450e8ca2..b8305056d3 100644 --- a/pkg/cortexpb/slicesPool_test.go +++ b/pkg/cortexpb/slicesPool_test.go @@ -7,12 +7,10 @@ import ( ) func TestByteSlicePools(t *testing.T) { - sut := NewSlicePool(20) - + sut := newSlicePool(20) for i := 0; i < 1024*1024; i = i + 128 { s := sut.getSlice(i) assert.Equal(t, len(s), i) sut.reuseSlice(s) } - } diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go index f5cac4d09b..bb7244f237 100644 --- a/pkg/cortexpb/timeseries.go +++ b/pkg/cortexpb/timeseries.go @@ -45,7 +45,7 @@ var ( } }, } - bytePool = NewSlicePool(20) + bytePool = newSlicePool(20) ) // PreallocConfig configures how structures will be preallocated to optimise @@ -82,11 +82,11 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { return p.TimeSeries.Unmarshal(dAtA) } -func (m *PreallocWriteRequest) Marshal() (dAtA []byte, err error) { - size := m.Size() +func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) { + size := p.Size() dAtA = bytePool.getSlice(size) - m.data = dAtA - n, err := m.MarshalToSizedBuffer(dAtA[:size]) + p.data = dAtA + n, err := p.MarshalToSizedBuffer(dAtA[:size]) if err != nil { return nil, err } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2112b40c93..be5de572ed 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2688,6 +2688,10 @@ func (i *mockIngester) Close() error { return nil } +func (i *mockIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + return i.Push(ctx, &in.WriteRequest, opts...) +} + func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { i.Lock() defer i.Unlock() From bbd207f4668777ac9abf1dba38a638098db19cd6 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 7 Mar 2023 09:54:54 -0800 Subject: [PATCH 04/10] No allocation Signed-off-by: Alan Protasio --- pkg/cortexpb/slicesPool.go | 17 ++++++++++------- pkg/cortexpb/slicesPool_test.go | 2 +- pkg/cortexpb/timeseries.go | 6 +++--- pkg/ingester/client/client.go | 4 ++-- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/pkg/cortexpb/slicesPool.go b/pkg/cortexpb/slicesPool.go index ac5e4fed1f..dfaa0b85de 100644 --- a/pkg/cortexpb/slicesPool.go +++ b/pkg/cortexpb/slicesPool.go @@ -21,25 +21,28 @@ func (sp *byteSlicePools) init(pools int) { size := int(math.Pow(2, float64(i))) sp.pools[i] = sync.Pool{ New: func() interface{} { - return make([]byte, 0, size) + buf := make([]byte, 0, size) + return &buf }, } } } -func (sp *byteSlicePools) getSlice(size int) []byte { +func (sp *byteSlicePools) getSlice(size int) *[]byte { index := int(math.Ceil(math.Log2(float64(size)))) if index < 0 || index >= len(sp.pools) { - return make([]byte, size) + buf := make([]byte, size) + return &buf } - s := sp.pools[index].Get().([]byte) - return s[:size] + s := sp.pools[index].Get().(*[]byte) + *s = (*s)[:size] + return s } -func (sp *byteSlicePools) reuseSlice(s []byte) { - index := int(math.Floor(math.Log2(float64(cap(s))))) +func (sp *byteSlicePools) reuseSlice(s *[]byte) { + index := int(math.Floor(math.Log2(float64(cap(*s))))) if index < 0 || index >= len(sp.pools) { return diff --git a/pkg/cortexpb/slicesPool_test.go b/pkg/cortexpb/slicesPool_test.go index b8305056d3..021bb11bc3 100644 --- a/pkg/cortexpb/slicesPool_test.go +++ b/pkg/cortexpb/slicesPool_test.go @@ -10,7 +10,7 @@ func TestByteSlicePools(t *testing.T) { sut := newSlicePool(20) for i := 0; i < 1024*1024; i = i + 128 { s := sut.getSlice(i) - assert.Equal(t, len(s), i) + assert.Equal(t, len(*s), i) sut.reuseSlice(s) } } diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go index bb7244f237..4023f0e7d1 100644 --- a/pkg/cortexpb/timeseries.go +++ b/pkg/cortexpb/timeseries.go @@ -62,7 +62,7 @@ func (PreallocConfig) RegisterFlags(f *flag.FlagSet) { // PreallocWriteRequest is a WriteRequest which preallocs slices on Unmarshal. type PreallocWriteRequest struct { WriteRequest - data []byte + data *[]byte } // Unmarshal implements proto.Message. @@ -84,8 +84,8 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) { size := p.Size() - dAtA = bytePool.getSlice(size) - p.data = dAtA + p.data = bytePool.getSlice(size) + dAtA = *p.data n, err := p.MarshalToSizedBuffer(dAtA[:size]) if err != nil { return nil, err diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 163d460c7f..ec3d3310f3 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -5,13 +5,13 @@ import ( "flag" "github.com/cortexproject/cortex/pkg/cortexpb" + + "github.com/cortexproject/cortex/pkg/util/grpcclient" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" - - "github.com/cortexproject/cortex/pkg/util/grpcclient" ) var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ From 2a191b7b43f613af3586c66ceab025fbbc4a5058 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 7 Mar 2023 10:17:02 -0800 Subject: [PATCH 05/10] min pool size Signed-off-by: Alan Protasio --- pkg/cortexpb/slicesPool.go | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/cortexpb/slicesPool.go b/pkg/cortexpb/slicesPool.go index dfaa0b85de..6f8470f703 100644 --- a/pkg/cortexpb/slicesPool.go +++ b/pkg/cortexpb/slicesPool.go @@ -5,6 +5,10 @@ import ( "sync" ) +const ( + minPoolSizePower = 5 +) + type byteSlicePools struct { pools []sync.Pool } @@ -18,7 +22,7 @@ func newSlicePool(pools int) *byteSlicePools { func (sp *byteSlicePools) init(pools int) { sp.pools = make([]sync.Pool, pools) for i := 0; i < pools; i++ { - size := int(math.Pow(2, float64(i))) + size := int(math.Pow(2, float64(i+minPoolSizePower))) sp.pools[i] = sync.Pool{ New: func() interface{} { buf := make([]byte, 0, size) @@ -29,24 +33,32 @@ func (sp *byteSlicePools) init(pools int) { } func (sp *byteSlicePools) getSlice(size int) *[]byte { - index := int(math.Ceil(math.Log2(float64(size)))) + index := int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower - if index < 0 || index >= len(sp.pools) { + if index >= len(sp.pools) { buf := make([]byte, size) return &buf } + if index < 0 { + index = 0 + } + s := sp.pools[index].Get().(*[]byte) *s = (*s)[:size] return s } func (sp *byteSlicePools) reuseSlice(s *[]byte) { - index := int(math.Floor(math.Log2(float64(cap(*s))))) + index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower - if index < 0 || index >= len(sp.pools) { + if index >= len(sp.pools) { return } + if index < 0 { + index = 0 + } + sp.pools[index].Put(s) } From 7fe42cef982eabf822e071127169651ba2fa028e Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 7 Mar 2023 10:17:14 -0800 Subject: [PATCH 06/10] min pool size Signed-off-by: Alan Protasio --- pkg/ingester/client/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index ec3d3310f3..e6c7e334e1 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -5,8 +5,8 @@ import ( "flag" "github.com/cortexproject/cortex/pkg/cortexpb" - "github.com/cortexproject/cortex/pkg/util/grpcclient" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" From f64ffef8caf6f9eb661c8f7c2df03d848fe0007f Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 7 Mar 2023 10:54:44 -0800 Subject: [PATCH 07/10] fuzzy test Signed-off-by: Alan Protasio --- pkg/cortexpb/slicesPool_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/pkg/cortexpb/slicesPool_test.go b/pkg/cortexpb/slicesPool_test.go index 021bb11bc3..1725ecf028 100644 --- a/pkg/cortexpb/slicesPool_test.go +++ b/pkg/cortexpb/slicesPool_test.go @@ -1,16 +1,21 @@ package cortexpb import ( + "math" + "math/rand" "testing" "github.com/stretchr/testify/assert" ) -func TestByteSlicePools(t *testing.T) { +func TestFuzzyByteSlicePools(t *testing.T) { sut := newSlicePool(20) - for i := 0; i < 1024*1024; i = i + 128 { - s := sut.getSlice(i) - assert.Equal(t, len(*s), i) + maxByteSize := int(math.Pow(2, 20+minPoolSizePower-1)) + + for i := 0; i < 1000; i++ { + size := rand.Int() % maxByteSize + s := sut.getSlice(size) + assert.Equal(t, len(*s), size) sut.reuseSlice(s) } } From efcd9b354bc5827945d5b6e6e4dd625f84e82f72 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 7 Mar 2023 11:03:53 -0800 Subject: [PATCH 08/10] changelog Signed-off-by: Alan Protasio --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 32dd61e7da..384fe24050 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.17`. #5132 * [ENHANCEMENT] Add retry logic to S3 bucket client. #5135 * [ENHANCEMENT] Update Go version to 1.20.1. #5159 +* [ENHANCEMENT] Distributor: Reuse byte slices when serializing requests from distributors to ingesters. #5193 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972 * [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000 From 02729b1a5e08ef640f8fbb0cababa207b79d9878 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 7 Mar 2023 12:33:05 -0800 Subject: [PATCH 09/10] more benchmark Signed-off-by: Alan Protasio --- pkg/cortexpb/timeseries_test.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/cortexpb/timeseries_test.go b/pkg/cortexpb/timeseries_test.go index 103a8e4dfe..0932ef1741 100644 --- a/pkg/cortexpb/timeseries_test.go +++ b/pkg/cortexpb/timeseries_test.go @@ -95,7 +95,18 @@ func BenchmarkMarshallWriteRequest(b *testing.B) { clean: func(in interface{}) {}, }, { - name: "pool", + name: "byte pool", + writeRequestFactory: func() proto.Marshaler { + w := &PreallocWriteRequest{} + w.Timeseries = ts + return w + }, + clean: func(in interface{}) { + ReuseWriteRequest(in.(*PreallocWriteRequest)) + }, + }, + { + name: "byte and write pool", writeRequestFactory: func() proto.Marshaler { w := PreallocWriteRequestFromPool() w.Timeseries = ts From 22a1a366481663bf0a7e4126f75c7f0b50ae8e8d Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 7 Mar 2023 13:48:55 -0800 Subject: [PATCH 10/10] fix bug on the reuse Signed-off-by: Alan Protasio --- pkg/cortexpb/slicesPool.go | 7 ++----- pkg/cortexpb/slicesPool_test.go | 9 +++++++++ pkg/cortexpb/timeseries.go | 4 +++- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/pkg/cortexpb/slicesPool.go b/pkg/cortexpb/slicesPool.go index 6f8470f703..e28d51d4f2 100644 --- a/pkg/cortexpb/slicesPool.go +++ b/pkg/cortexpb/slicesPool.go @@ -40,6 +40,7 @@ func (sp *byteSlicePools) getSlice(size int) *[]byte { return &buf } + // if the size is < than the minPoolSizePower we return an array from the first pool if index < 0 { index = 0 } @@ -52,13 +53,9 @@ func (sp *byteSlicePools) getSlice(size int) *[]byte { func (sp *byteSlicePools) reuseSlice(s *[]byte) { index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower - if index >= len(sp.pools) { + if index >= len(sp.pools) || index < 0 { return } - if index < 0 { - index = 0 - } - sp.pools[index].Put(s) } diff --git a/pkg/cortexpb/slicesPool_test.go b/pkg/cortexpb/slicesPool_test.go index 1725ecf028..9bc56cdec3 100644 --- a/pkg/cortexpb/slicesPool_test.go +++ b/pkg/cortexpb/slicesPool_test.go @@ -19,3 +19,12 @@ func TestFuzzyByteSlicePools(t *testing.T) { sut.reuseSlice(s) } } + +func TestReturnSliceSmallerThanMin(t *testing.T) { + sut := newSlicePool(20) + size := 3 + buff := make([]byte, 0, size) + sut.reuseSlice(&buff) + buff2 := sut.getSlice(size * 2) + assert.Equal(t, len(*buff2), size*2) +} diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go index 4023f0e7d1..5a2fcaf85b 100644 --- a/pkg/cortexpb/timeseries.go +++ b/pkg/cortexpb/timeseries.go @@ -98,7 +98,9 @@ func ReuseWriteRequest(req *PreallocWriteRequest) { bytePool.reuseSlice(req.data) req.data = nil } - + req.Source = 0 + req.Metadata = nil + req.Timeseries = nil writeRequestPool.Put(req) }