const (
	// minPoolSizePower is the base-2 exponent of the smallest pooled capacity:
	// bucket i is created with slices of capacity 2^(i+minPoolSizePower).
	minPoolSizePower = 5
)

// byteSlicePools is a set of sync.Pools holding *[]byte values, bucketed by
// capacity in increasing powers of two starting at 2^minPoolSizePower.
type byteSlicePools struct {
	pools []sync.Pool
}

// newSlicePool returns a byteSlicePools with the given number of buckets.
func newSlicePool(pools int) *byteSlicePools {
	sp := byteSlicePools{}
	sp.init(pools)
	return &sp
}

// init (re)creates the buckets. Bucket i allocates new slices with length 0
// and capacity 2^(i+minPoolSizePower) when it has nothing to hand out.
func (sp *byteSlicePools) init(pools int) {
	sp.pools = make([]sync.Pool, pools)
	for i := 0; i < pools; i++ {
		// size is declared inside the loop body so every closure captures
		// its own capacity.
		size := int(math.Pow(2, float64(i+minPoolSizePower)))
		sp.pools[i].New = func() interface{} {
			buf := make([]byte, 0, size)
			return &buf
		}
	}
}

// getSlice returns a pointer to a byte slice of length size.
//
// Requests that fit in a bucket are served from the smallest bucket whose
// capacity is >= size; the bytes of a pooled slice are NOT cleared. Requests
// larger than the biggest bucket (or made against a pool set with no buckets)
// get a freshly allocated slice. A non-positive size yields an empty slice.
func (sp *byteSlicePools) getSlice(size int) *[]byte {
	if size < 0 {
		size = 0
	}

	// Everything up to the smallest pooled capacity is served by bucket 0.
	// Only larger sizes go through Log2, so the float->int conversion never
	// sees -Inf (size == 0) or NaN (size < 0), whose result is
	// implementation-dependent.
	index := 0
	if size > 1<<minPoolSizePower {
		index = int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower
	}

	// Too big for any bucket, or there are no buckets at all.
	if index >= len(sp.pools) {
		buf := make([]byte, size)
		return &buf
	}

	s := sp.pools[index].Get().(*[]byte)
	*s = (*s)[:size]
	return s
}

// reuseSlice hands s back to the bucket matching its capacity so a later
// getSlice can return it. The slice is silently dropped when it is nil, when
// its capacity is below the smallest pooled capacity, or when its capacity
// maps past the last bucket.
func (sp *byteSlicePools) reuseSlice(s *[]byte) {
	// The capacity guard also covers cap == 0, keeping Log2(0) = -Inf away
	// from the float->int conversion below.
	if s == nil || cap(*s) < 1<<minPoolSizePower {
		return
	}

	// Floor (not Ceil): a slice may only live in a bucket whose nominal
	// capacity it fully covers.
	index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower
	if index >= len(sp.pools) {
		return
	}

	sp.pools[index].Put(s)
}
@@ -72,6 +82,32 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error { return p.TimeSeries.Unmarshal(dAtA) } +func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) { + size := p.Size() + p.data = bytePool.getSlice(size) + dAtA = *p.data + n, err := p.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func ReuseWriteRequest(req *PreallocWriteRequest) { + if req.data != nil { + bytePool.reuseSlice(req.data) + req.data = nil + } + req.Source = 0 + req.Metadata = nil + req.Timeseries = nil + writeRequestPool.Put(req) +} + +func PreallocWriteRequestFromPool() *PreallocWriteRequest { + return writeRequestPool.Get().(*PreallocWriteRequest) +} + // LabelAdapter is a labels.Label that can be marshalled to/from protos. type LabelAdapter labels.Label diff --git a/pkg/cortexpb/timeseries_test.go b/pkg/cortexpb/timeseries_test.go index 7b322fdb8d..0932ef1741 100644 --- a/pkg/cortexpb/timeseries_test.go +++ b/pkg/cortexpb/timeseries_test.go @@ -1,8 +1,10 @@ package cortexpb import ( + "fmt" "testing" + "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -64,3 +66,67 @@ func TestTimeseriesFromPool(t *testing.T) { assert.Len(t, reused.Samples, 0) }) } + +func BenchmarkMarshallWriteRequest(b *testing.B) { + ts := PreallocTimeseriesSliceFromPool() + + for i := 0; i < 100; i++ { + ts = append(ts, PreallocTimeseries{TimeSeries: TimeseriesFromPool()}) + ts[i].Labels = []LabelAdapter{ + {Name: "foo", Value: "bar"}, + {Name: "very long label name", Value: "very long label value"}, + {Name: "very long label name 2", Value: "very long label value 2"}, + {Name: "very long label name 3", Value: "very long label value 3"}, + {Name: "int", Value: fmt.Sprint(i)}, + } + ts[i].Samples = []Sample{{Value: 1, TimestampMs: 2}} + } + + tests := []struct { + name string + writeRequestFactory func() proto.Marshaler + clean func(in interface{}) + }{ + { + name: "no-pool", + 
writeRequestFactory: func() proto.Marshaler { + return &WriteRequest{Timeseries: ts} + }, + clean: func(in interface{}) {}, + }, + { + name: "byte pool", + writeRequestFactory: func() proto.Marshaler { + w := &PreallocWriteRequest{} + w.Timeseries = ts + return w + }, + clean: func(in interface{}) { + ReuseWriteRequest(in.(*PreallocWriteRequest)) + }, + }, + { + name: "byte and write pool", + writeRequestFactory: func() proto.Marshaler { + w := PreallocWriteRequestFromPool() + w.Timeseries = ts + return w + }, + clean: func(in interface{}) { + ReuseWriteRequest(in.(*PreallocWriteRequest)) + }, + }, + } + + for _, tc := range tests { + b.Run(tc.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + w := tc.writeRequestFactory() + _, err := w.Marshal() + require.NoError(b, err) + tc.clean(w) + } + b.ReportAllocs() + }) + } +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 336550a840..92698d5009 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -835,14 +835,15 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time if err != nil { return err } - c := h.(ingester_client.IngesterClient) + c := h.(ingester_client.HealthAndIngesterClient) - req := cortexpb.WriteRequest{ - Timeseries: timeseries, - Metadata: metadata, - Source: source, - } - _, err = c.Push(ctx, &req) + req := cortexpb.PreallocWriteRequestFromPool() + req.Timeseries = timeseries + req.Metadata = metadata + req.Source = source + + _, err = c.PushPreAlloc(ctx, req) + cortexpb.ReuseWriteRequest(req) if len(metadata) > 0 { d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc() diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 2112b40c93..be5de572ed 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2688,6 +2688,10 @@ func (i *mockIngester) Close() error { return nil } +func (i *mockIngester) PushPreAlloc(ctx 
context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + return i.Push(ctx, &in.WriteRequest, opts...) +} + func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { i.Lock() defer i.Unlock() diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 6b017a20e2..e6c7e334e1 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -1,15 +1,17 @@ package client import ( + "context" "flag" + "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/util/grpcclient" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" - - "github.com/cortexproject/cortex/pkg/util/grpcclient" ) var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ @@ -24,6 +26,7 @@ type HealthAndIngesterClient interface { IngesterClient grpc_health_v1.HealthClient Close() error + PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) } type closableHealthAndIngesterClient struct { @@ -32,6 +35,15 @@ type closableHealthAndIngesterClient struct { conn *grpc.ClientConn } +func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) { + out := new(cortexpb.WriteResponse) + err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // MakeIngesterClient makes a new IngesterClient func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error) { dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(ingesterClientRequestDuration))