Skip to content

Parallelise and short cut distributor queries. #278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 8, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 61 additions & 45 deletions distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ type ReadRing interface {
// create a Distributor
type Config struct {
ReplicationFactor int
MinReadSuccesses int
HeartbeatTimeout time.Duration
RemoteTimeout time.Duration
ClientCleanupPeriod time.Duration
Expand All @@ -95,7 +94,6 @@ type Config struct {
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.")
flag.IntVar(&cfg.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
flag.DurationVar(&cfg.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.")
Expand All @@ -108,9 +106,6 @@ func New(cfg Config, ring ReadRing) (*Distributor, error) {
if 0 > cfg.ReplicationFactor {
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
}
if cfg.MinReadSuccesses > cfg.ReplicationFactor {
return nil, fmt.Errorf("MinReadSuccesses > ReplicationFactor: %d > %d", cfg.MinReadSuccesses, cfg.ReplicationFactor)
}
d := &Distributor{
cfg: cfg,
ring: ring,
Expand Down Expand Up @@ -424,14 +419,12 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.Ingeste
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) {
var result model.Matrix
err := instrument.TimeRequestHistogram(ctx, "Distributor.Query", d.queryDuration, func(ctx context.Context) error {
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}

metricName, _, err := util.ExtractMetricNameFromMatchers(matchers)
userID, err := user.GetID(ctx)
if err != nil {
return err
}

userID, err := user.GetID(ctx)
metricName, _, err := util.ExtractMetricNameFromMatchers(matchers)
if err != nil {
return err
}
Expand All @@ -441,61 +434,84 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
return err
}

if len(ingesters) < d.cfg.MinReadSuccesses {
return fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), d.cfg.MinReadSuccesses)
// We need a response from a quorum of ingesters, which is n/2 + 1.
minSuccess := (len(ingesters) / 2) + 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By my understanding of the DynamoDB paper, the constraint isn't that quorum is n/2 + 1, but rather that r + w > n, where r is minimum read quorum and w minimum write.

Copy link
Contributor Author

@tomwilkie tomwilkie Feb 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/DynamoDB/Dynamo/

But yes, that is what the paper says. In our case, we set r = w = n/2 + 1 (although the +1 is due to integer arithmetic). So r + w > n → 2r > n → 2·(n/2) + 2 > n → n + 2 > n

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL: "DynamoDB exposes a similar data model and derives its name from Dynamo, but has a different underlying implementation"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, DynamoDB shares virtually no relation to Dynamo, save for name. Consistency and replication model is very different.

maxErrs := len(ingesters) - minSuccess
if len(ingesters) < minSuccess {
return fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess)
}

// Fetch samples from multiple ingesters and group them by fingerprint (unsorted
// and with overlap).
successes := 0
var lastErr error
for _, ing := range ingesters {
client, err := d.getClientFor(ing)
if err != nil {
return err
}
req, err := util.ToQueryRequest(from, to, matchers)
if err != nil {
return err
}

req, err := util.ToQueryRequest(from, to, matchers)
if err != nil {
return err
}
// Fetch samples from multiple ingesters
var numErrs int32
errs := make(chan error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The routine below is constructed such that this will only ever receive one error, so the name errs is misleading. tooManyErrs? errReceived? maxErr?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do.

results := make(chan model.Matrix, len(ingesters))

resp, err := client.Query(ctx, req)
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()
if err != nil {
lastErr = err
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
continue
}
successes++

for _, ss := range util.FromQueryResponse(resp) {
fp := ss.Metric.Fingerprint()
if mss, ok := fpToSampleStream[fp]; !ok {
fpToSampleStream[fp] = &model.SampleStream{
Metric: ss.Metric,
Values: ss.Values,
for _, ing := range ingesters {
go func(ing *ring.IngesterDesc) {
result, err := d.queryIngester(ctx, ing, req)
if err != nil {
if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) {
errs <- err
}
} else {
mss.Values = util.MergeSamples(fpToSampleStream[fp].Values, ss.Values)
results <- result
}
}
}(ing)
}

if successes < d.cfg.MinReadSuccesses {
return fmt.Errorf("too few successful reads, last error was: %v", lastErr)
// Only wait for minSuccess ingesters (or an error), and accumulate the samples
// by fingerprint, merging them into any existing samples.
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
for i := 0; i < minSuccess; i++ {
select {
case err := <-errs:
return err

case result := <-results:
for _, ss := range result {
fp := ss.Metric.Fingerprint()
if mss, ok := fpToSampleStream[fp]; !ok {
fpToSampleStream[fp] = &model.SampleStream{
Metric: ss.Metric,
Values: ss.Values,
}
} else {
mss.Values = util.MergeSamples(fpToSampleStream[fp].Values, ss.Values)
}
}
}
}

result = make(model.Matrix, 0, len(fpToSampleStream))
for _, ss := range fpToSampleStream {
result = append(result, ss)
}

return nil
})
return result, err
}

// queryIngester sends req to a single ingester, records query metrics
// for that ingester's address, and converts the response into a
// model.Matrix. A client-construction or RPC failure is returned to the
// caller (and counted as a query failure).
func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, req *cortex.QueryRequest) (model.Matrix, error) {
	c, err := d.getClientFor(ing)
	if err != nil {
		return nil, err
	}

	response, err := c.Query(ctx, req)
	// Count every attempted query, successful or not.
	d.ingesterQueries.WithLabelValues(ing.Addr).Inc()
	if err == nil {
		return util.FromQueryResponse(response), nil
	}
	d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
	return nil, err
}

// forAllIngesters runs f, in parallel, for all ingesters
func (d *Distributor) forAllIngesters(f func(cortex.IngesterClient) (interface{}, error)) ([]interface{}, error) {
resps, errs := make(chan interface{}), make(chan error)
Expand Down Expand Up @@ -527,7 +543,7 @@ func (d *Distributor) forAllIngesters(f func(cortex.IngesterClient) (interface{}
numErrs++
}
}
if numErrs > (d.cfg.ReplicationFactor - d.cfg.MinReadSuccesses) {
if numErrs > 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd have expected this to be zero. Why is one failure OK but two not OK?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I guess this should really be floor(d.cfg.ReplicationFactor / 2) to keep the invariant above... which for RF=3 is 1. Will make it dynamic.

return nil, lastErr
}
return result, nil
Expand Down
127 changes: 124 additions & 3 deletions distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/metric"
"github.com/prometheus/prometheus/storage/remote"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
Expand Down Expand Up @@ -50,7 +52,31 @@ func (i mockIngester) Push(ctx context.Context, in *remote.WriteRequest, opts ..
}

// Query implements the IngesterClient interface for the mock. An
// unhappy ingester always returns an error; a happy one returns a
// fixed series named "foo" with two samples (t=0,v=0 and t=1,v=1).
func (i mockIngester) Query(ctx context.Context, in *cortex.QueryRequest, opts ...grpc.CallOption) (*cortex.QueryResponse, error) {
	if !i.happy {
		return nil, fmt.Errorf("Fail")
	}

	series := &remote.TimeSeries{
		Labels: []*remote.LabelPair{
			{
				Name:  "__name__",
				Value: "foo",
			},
		},
		Samples: []*remote.Sample{
			{
				Value:       0,
				TimestampMs: 0,
			},
			{
				Value:       1,
				TimestampMs: 1,
			},
		},
	}
	return &cortex.QueryResponse{
		Timeseries: []*remote.TimeSeries{series},
	}, nil
}

func (i mockIngester) LabelValues(ctx context.Context, in *cortex.LabelValuesRequest, opts ...grpc.CallOption) (*cortex.LabelValuesResponse, error) {
Expand All @@ -65,7 +91,7 @@ func (i mockIngester) MetricsForLabelMatchers(ctx context.Context, in *cortex.Me
return nil, nil
}

func TestDistributor(t *testing.T) {
func TestDistributorPush(t *testing.T) {
ctx := user.WithID(context.Background(), "user")
for i, tc := range []struct {
ingesters []mockIngester
Expand Down Expand Up @@ -128,7 +154,6 @@ func TestDistributor(t *testing.T) {

d, err := New(Config{
ReplicationFactor: 3,
MinReadSuccesses: 2,
HeartbeatTimeout: 1 * time.Minute,
RemoteTimeout: 1 * time.Minute,
ClientCleanupPeriod: 1 * time.Minute,
Expand Down Expand Up @@ -167,3 +192,99 @@ func TestDistributor(t *testing.T) {
})
}
}

// TestDistributorQuery exercises Distributor.Query against rings of
// happy/unhappy mock ingesters. With a replication factor of 3 the
// distributor needs a read quorum of 2 successful responses, so 2 or 3
// happy ingesters succeed while 0 or 1 fail.
func TestDistributorQuery(t *testing.T) {
	ctx := user.WithID(context.Background(), "user")

	// expectedResponse builds the matrix the mock ingesters return: a
	// single "foo" series with one sample per timestamp in [start, end).
	expectedResponse := func(start, end int) model.Matrix {
		result := model.Matrix{
			&model.SampleStream{
				Metric: model.Metric{"__name__": "foo"},
			},
		}
		for i := start; i < end; i++ {
			result[0].Values = append(result[0].Values,
				model.SamplePair{
					Value:     model.SampleValue(i),
					Timestamp: model.Time(i),
				},
			)
		}
		return result
	}

	for i, tc := range []struct {
		ingesters        []mockIngester
		expectedResponse model.Matrix
		expectedError    error
	}{
		// A query to 3 happy ingesters should succeed
		{
			ingesters:        []mockIngester{{true}, {true}, {true}},
			expectedResponse: expectedResponse(0, 2),
		},

		// A query to 2 happy ingesters should succeed
		{
			ingesters:        []mockIngester{{}, {true}, {true}},
			expectedResponse: expectedResponse(0, 2),
		},

		// A query to 1 happy ingester should fail (below read quorum)
		{
			ingesters:     []mockIngester{{}, {}, {true}},
			expectedError: fmt.Errorf("Fail"),
		},

		// A query to 0 happy ingesters should fail
		{
			ingesters:     []mockIngester{{}, {}, {}},
			expectedError: fmt.Errorf("Fail"),
		},
	} {
		t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
			// Build the ring descriptors and the addr -> mock mapping the
			// client factory below resolves against.
			ingesterDescs := []*ring.IngesterDesc{}
			ingesters := map[string]mockIngester{}
			for i, ingester := range tc.ingesters {
				addr := fmt.Sprintf("%d", i)
				ingesterDescs = append(ingesterDescs, &ring.IngesterDesc{
					Addr:      addr,
					Timestamp: time.Now().Unix(),
				})
				ingesters[addr] = ingester
			}

			ring := mockRing{
				Counter: prometheus.NewCounter(prometheus.CounterOpts{
					Name: "foo",
				}),
				ingesters: ingesterDescs,
			}

			d, err := New(Config{
				ReplicationFactor:   3,
				HeartbeatTimeout:    1 * time.Minute,
				RemoteTimeout:       1 * time.Minute,
				ClientCleanupPeriod: 1 * time.Minute,
				IngestionRateLimit:  10000,
				IngestionBurstSize:  10000,

				ingesterClientFactory: func(addr string) cortex.IngesterClient {
					return ingesters[addr]
				},
			}, ring)
			if err != nil {
				t.Fatal(err)
			}
			defer d.Stop()

			matcher, err := metric.NewLabelMatcher(metric.Equal, model.LabelName("__name__"), model.LabelValue("foo"))
			if err != nil {
				t.Fatal(err)
			}
			response, err := d.Query(ctx, 0, 10, matcher)
			assert.Equal(t, tc.expectedResponse, response, "Wrong response")
			assert.Equal(t, tc.expectedError, err, "Wrong error")
		})
	}
}