Reuse write request from distributor to Ingesters #5193

Merged (10 commits, Mar 7, 2023)

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.17`. #5132
* [ENHANCEMENT] Add retry logic to S3 bucket client. #5135
* [ENHANCEMENT] Update Go version to 1.20.1. #5159
* [ENHANCEMENT] Distributor: Reuse byte slices when serializing requests from distributors to ingesters. #5193
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk. #5000
61 changes: 61 additions & 0 deletions pkg/cortexpb/slicesPool.go
@@ -0,0 +1,61 @@
package cortexpb

import (
    "math"
    "sync"
)

const (
    minPoolSizePower = 5
)

type byteSlicePools struct {
    pools []sync.Pool
}

func newSlicePool(pools int) *byteSlicePools {
    sp := byteSlicePools{}
    sp.init(pools)
    return &sp
}

func (sp *byteSlicePools) init(pools int) {
    sp.pools = make([]sync.Pool, pools)
    for i := 0; i < pools; i++ {
        size := int(math.Pow(2, float64(i+minPoolSizePower)))
        sp.pools[i] = sync.Pool{
            New: func() interface{} {
                buf := make([]byte, 0, size)
                return &buf
            },
        }
    }
}

func (sp *byteSlicePools) getSlice(size int) *[]byte {
    index := int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower

    if index >= len(sp.pools) {
        buf := make([]byte, size)
        return &buf
    }

    // if the size is smaller than the smallest pool size, return a slice from the first pool
    if index < 0 {
        index = 0
    }

    s := sp.pools[index].Get().(*[]byte)
    *s = (*s)[:size]
    return s
}

func (sp *byteSlicePools) reuseSlice(s *[]byte) {
    index := int(math.Floor(math.Log2(float64(cap(*s))))) - minPoolSizePower

    if index >= len(sp.pools) || index < 0 {
        return
    }

    sp.pools[index].Put(s)
}
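
For orientation, a small standalone sketch (not part of this PR) of how the index arithmetic above maps a requested size onto a pool bucket. bucketFor is a hypothetical helper that mirrors getSlice; requests larger than the biggest bucket simply fall back to a plain make([]byte, size).

package main

import (
    "fmt"
    "math"
)

const minPoolSizePower = 5

// bucketFor mirrors the index arithmetic in getSlice: sizes are rounded up to
// the next power of two, and anything below 2^5 = 32 bytes uses the first bucket.
func bucketFor(size int) int {
    index := int(math.Ceil(math.Log2(float64(size)))) - minPoolSizePower
    if index < 0 {
        index = 0
    }
    return index
}

func main() {
    for _, size := range []int{10, 32, 100, 4096} {
        bucket := bucketFor(size)
        fmt.Printf("size %4d -> bucket %d (slice capacity %d)\n", size, bucket, 1<<(bucket+minPoolSizePower))
    }
}
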
30 changes: 30 additions & 0 deletions pkg/cortexpb/slicesPool_test.go
@@ -0,0 +1,30 @@
package cortexpb

import (
    "math"
    "math/rand"
    "testing"

    "github.com/stretchr/testify/assert"
)

func TestFuzzyByteSlicePools(t *testing.T) {
    sut := newSlicePool(20)
    maxByteSize := int(math.Pow(2, 20+minPoolSizePower-1))

    for i := 0; i < 1000; i++ {
        size := rand.Int() % maxByteSize
        s := sut.getSlice(size)
        assert.Equal(t, len(*s), size)
        sut.reuseSlice(s)
    }
}

func TestReturnSliceSmallerThanMin(t *testing.T) {
    sut := newSlicePool(20)
    size := 3
    buff := make([]byte, 0, size)
    sut.reuseSlice(&buff)
    buff2 := sut.getSlice(size * 2)
    assert.Equal(t, len(*buff2), size*2)
}
36 changes: 36 additions & 0 deletions pkg/cortexpb/timeseries.go
@@ -37,6 +37,15 @@ var (
            }
        },
    }

    writeRequestPool = sync.Pool{
        New: func() interface{} {
            return &PreallocWriteRequest{
                WriteRequest: WriteRequest{},
            }
        },
    }
    bytePool = newSlicePool(20)
)

// PreallocConfig configures how structures will be preallocated to optimise
@@ -53,6 +62,7 @@ func (PreallocConfig) RegisterFlags(f *flag.FlagSet) {
// PreallocWriteRequest is a WriteRequest which preallocs slices on Unmarshal.
type PreallocWriteRequest struct {
    WriteRequest
    data *[]byte
}

// Unmarshal implements proto.Message.
@@ -72,6 +82,32 @@ func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
    return p.TimeSeries.Unmarshal(dAtA)
}

func (p *PreallocWriteRequest) Marshal() (dAtA []byte, err error) {
    size := p.Size()
    p.data = bytePool.getSlice(size)
    dAtA = *p.data
    n, err := p.MarshalToSizedBuffer(dAtA[:size])
    if err != nil {
        return nil, err
    }
    return dAtA[:n], nil
}

func ReuseWriteRequest(req *PreallocWriteRequest) {
    if req.data != nil {
        bytePool.reuseSlice(req.data)
        req.data = nil
    }
    req.Source = 0
    req.Metadata = nil
    req.Timeseries = nil
    writeRequestPool.Put(req)
}

func PreallocWriteRequestFromPool() *PreallocWriteRequest {
    return writeRequestPool.Get().(*PreallocWriteRequest)
}

// LabelAdapter is a labels.Label that can be marshalled to/from protos.
type LabelAdapter labels.Label

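
A minimal caller-side sketch (not part of the diff) of the intended lifecycle: take a request from the pool, marshal it into a pooled buffer, then hand both back. The real caller is the Distributor.send change further down; the series values below are made up for illustration.

package main

import (
    "log"

    "github.com/cortexproject/cortex/pkg/cortexpb"
)

func main() {
    // Take a request from writeRequestPool and fill it in, as the distributor does.
    req := cortexpb.PreallocWriteRequestFromPool()
    req.Timeseries = []cortexpb.PreallocTimeseries{
        {TimeSeries: &cortexpb.TimeSeries{
            Labels:  []cortexpb.LabelAdapter{{Name: "__name__", Value: "up"}},
            Samples: []cortexpb.Sample{{Value: 1, TimestampMs: 1234}},
        }},
    }

    // Marshal serializes into a byte slice borrowed from bytePool and keeps a
    // reference to it in req.data.
    buf, err := req.Marshal()
    if err != nil {
        log.Fatal(err)
    }
    _ = buf // a real caller hands the bytes to the transport before recycling

    // Return the borrowed byte slice and the reset request to their pools.
    cortexpb.ReuseWriteRequest(req)
}
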
66 changes: 66 additions & 0 deletions pkg/cortexpb/timeseries_test.go
@@ -1,8 +1,10 @@
package cortexpb

import (
    "fmt"
    "testing"

    "github.com/gogo/protobuf/proto"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)
@@ -64,3 +66,67 @@ func TestTimeseriesFromPool(t *testing.T) {
        assert.Len(t, reused.Samples, 0)
    })
}

func BenchmarkMarshallWriteRequest(b *testing.B) {
    ts := PreallocTimeseriesSliceFromPool()

    for i := 0; i < 100; i++ {
        ts = append(ts, PreallocTimeseries{TimeSeries: TimeseriesFromPool()})
        ts[i].Labels = []LabelAdapter{
            {Name: "foo", Value: "bar"},
            {Name: "very long label name", Value: "very long label value"},
            {Name: "very long label name 2", Value: "very long label value 2"},
            {Name: "very long label name 3", Value: "very long label value 3"},
            {Name: "int", Value: fmt.Sprint(i)},
        }
        ts[i].Samples = []Sample{{Value: 1, TimestampMs: 2}}
    }

    tests := []struct {
        name                string
        writeRequestFactory func() proto.Marshaler
        clean               func(in interface{})
    }{
        {
            name: "no-pool",
            writeRequestFactory: func() proto.Marshaler {
                return &WriteRequest{Timeseries: ts}
            },
            clean: func(in interface{}) {},
        },
        {
            name: "byte pool",
            writeRequestFactory: func() proto.Marshaler {
                w := &PreallocWriteRequest{}
                w.Timeseries = ts
                return w
            },
            clean: func(in interface{}) {
                ReuseWriteRequest(in.(*PreallocWriteRequest))
            },
        },
        {
            name: "byte and write pool",
            writeRequestFactory: func() proto.Marshaler {
                w := PreallocWriteRequestFromPool()
                w.Timeseries = ts
                return w
            },
            clean: func(in interface{}) {
                ReuseWriteRequest(in.(*PreallocWriteRequest))
            },
        },
    }

    for _, tc := range tests {
        b.Run(tc.name, func(b *testing.B) {
            for i := 0; i < b.N; i++ {
                w := tc.writeRequestFactory()
                _, err := w.Marshal()
                require.NoError(b, err)
                tc.clean(w)
            }
            b.ReportAllocs()
        })
    }
}
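
To compare the three variants locally, the benchmark above can be run with the standard Go tooling (a sketch; exact numbers depend on the machine):

go test -run='^$' -bench=BenchmarkMarshallWriteRequest -benchmem ./pkg/cortexpb/
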
15 changes: 8 additions & 7 deletions pkg/distributor/distributor.go
@@ -835,14 +835,15 @@ func (d *Distributor) send(ctx context.Context, ingester ring.InstanceDesc, time
    if err != nil {
        return err
    }
-   c := h.(ingester_client.IngesterClient)
+   c := h.(ingester_client.HealthAndIngesterClient)

-   req := cortexpb.WriteRequest{
-       Timeseries: timeseries,
-       Metadata:   metadata,
-       Source:     source,
-   }
-   _, err = c.Push(ctx, &req)
+   req := cortexpb.PreallocWriteRequestFromPool()
+   req.Timeseries = timeseries
+   req.Metadata = metadata
+   req.Source = source
+
+   _, err = c.PushPreAlloc(ctx, req)
+   cortexpb.ReuseWriteRequest(req)

    if len(metadata) > 0 {
        d.ingesterAppends.WithLabelValues(ingester.Addr, typeMetadata).Inc()
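
Note that cortexpb.ReuseWriteRequest is called as soon as PushPreAlloc returns. Presumably this is safe because the unary gRPC call does not return until the request has been serialized and a response (or error) received, so neither the pooled request nor its marshal buffer is still referenced by the transport at that point.
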
4 changes: 4 additions & 0 deletions pkg/distributor/distributor_test.go
@@ -2688,6 +2688,10 @@ func (i *mockIngester) Close() error {
    return nil
}

func (i *mockIngester) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
    return i.Push(ctx, &in.WriteRequest, opts...)
}

func (i *mockIngester) Push(ctx context.Context, req *cortexpb.WriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
    i.Lock()
    defer i.Unlock()
16 changes: 14 additions & 2 deletions pkg/ingester/client/client.go
@@ -1,15 +1,17 @@
package client

import (
+   "context"
    "flag"

+   "github.com/cortexproject/cortex/pkg/cortexpb"
+   "github.com/cortexproject/cortex/pkg/util/grpcclient"
+
    "github.com/go-kit/log"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "google.golang.org/grpc"
    "google.golang.org/grpc/health/grpc_health_v1"
-
-   "github.com/cortexproject/cortex/pkg/util/grpcclient"
)

var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
@@ -24,6 +26,7 @@ type HealthAndIngesterClient interface {
    IngesterClient
    grpc_health_v1.HealthClient
    Close() error
    PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error)
}

type closableHealthAndIngesterClient struct {
@@ -32,6 +35,15 @@ type closableHealthAndIngesterClient struct {
    conn *grpc.ClientConn
}

func (c *closableHealthAndIngesterClient) PushPreAlloc(ctx context.Context, in *cortexpb.PreallocWriteRequest, opts ...grpc.CallOption) (*cortexpb.WriteResponse, error) {
    out := new(cortexpb.WriteResponse)
    err := c.conn.Invoke(ctx, "/cortex.Ingester/Push", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

// MakeIngesterClient makes a new IngesterClient
func MakeIngesterClient(addr string, cfg Config) (HealthAndIngesterClient, error) {
    dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(ingesterClientRequestDuration))
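
A note on the client change: PushPreAlloc invokes the existing /cortex.Ingester/Push method directly on the connection rather than going through the generated IngesterClient.Push, presumably so that the gRPC codec marshals the *PreallocWriteRequest itself and therefore hits the pooled Marshal added in pkg/cortexpb/timeseries.go. Since PreallocWriteRequest embeds WriteRequest, it encodes to the same wire format, and no proto or ingester-side changes are needed.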