From e5efa93451a30c626a6e9b4e687f3f23fd6e25f3 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 7 Feb 2017 18:52:25 +0000 Subject: [PATCH 1/2] Parallelise and short cut distributor queries. --- distributor/distributor.go | 106 +++++++++++++++----------- distributor/distributor_test.go | 127 +++++++++++++++++++++++++++++++- 2 files changed, 185 insertions(+), 48 deletions(-) diff --git a/distributor/distributor.go b/distributor/distributor.go index b159cf45f68..cd56f55c096 100644 --- a/distributor/distributor.go +++ b/distributor/distributor.go @@ -81,7 +81,6 @@ type ReadRing interface { // create a Distributor type Config struct { ReplicationFactor int - MinReadSuccesses int HeartbeatTimeout time.Duration RemoteTimeout time.Duration ClientCleanupPeriod time.Duration @@ -95,7 +94,6 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { flag.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") - flag.IntVar(&cfg.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.") flag.DurationVar(&cfg.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") flag.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.") flag.DurationVar(&cfg.ClientCleanupPeriod, "distributor.client-cleanup-period", 15*time.Second, "How frequently to clean up clients for ingesters that have gone away.") @@ -108,9 +106,6 @@ func New(cfg Config, ring ReadRing) (*Distributor, error) { if 0 > cfg.ReplicationFactor { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } - if cfg.MinReadSuccesses > cfg.ReplicationFactor { - return nil, fmt.Errorf("MinReadSuccesses > ReplicationFactor: %d > 
%d", cfg.MinReadSuccesses, cfg.ReplicationFactor) - } d := &Distributor{ cfg: cfg, ring: ring, @@ -424,14 +419,12 @@ func (d *Distributor) sendSamplesErr(ctx context.Context, ingester *ring.Ingeste func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) { var result model.Matrix err := instrument.TimeRequestHistogram(ctx, "Distributor.Query", d.queryDuration, func(ctx context.Context) error { - fpToSampleStream := map[model.Fingerprint]*model.SampleStream{} - - metricName, _, err := util.ExtractMetricNameFromMatchers(matchers) + userID, err := user.GetID(ctx) if err != nil { return err } - userID, err := user.GetID(ctx) + metricName, _, err := util.ExtractMetricNameFromMatchers(matchers) if err != nil { return err } @@ -441,61 +434,84 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . return err } - if len(ingesters) < d.cfg.MinReadSuccesses { - return fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), d.cfg.MinReadSuccesses) + // We need a response from a quorum of ingesters, which is n/2 + 1. + minSuccess := (len(ingesters) / 2) + 1 + maxErrs := len(ingesters) - minSuccess + if len(ingesters) < minSuccess { + return fmt.Errorf("could only find %d ingesters for query. Need at least %d", len(ingesters), minSuccess) } - // Fetch samples from multiple ingesters and group them by fingerprint (unsorted - // and with overlap). 
- successes := 0 - var lastErr error - for _, ing := range ingesters { - client, err := d.getClientFor(ing) - if err != nil { - return err - } + req, err := util.ToQueryRequest(from, to, matchers) + if err != nil { + return err + } - req, err := util.ToQueryRequest(from, to, matchers) - if err != nil { - return err - } + // Fetch samples from multiple ingesters + var numErrs int32 + errs := make(chan error) + results := make(chan model.Matrix, len(ingesters)) - resp, err := client.Query(ctx, req) - d.ingesterQueries.WithLabelValues(ing.Addr).Inc() - if err != nil { - lastErr = err - d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc() - continue - } - successes++ - - for _, ss := range util.FromQueryResponse(resp) { - fp := ss.Metric.Fingerprint() - if mss, ok := fpToSampleStream[fp]; !ok { - fpToSampleStream[fp] = &model.SampleStream{ - Metric: ss.Metric, - Values: ss.Values, + for _, ing := range ingesters { + go func(ing *ring.IngesterDesc) { + result, err := d.queryIngester(ctx, ing, req) + if err != nil { + if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) { + errs <- err } } else { - mss.Values = util.MergeSamples(fpToSampleStream[fp].Values, ss.Values) + results <- result } - } + }(ing) } - if successes < d.cfg.MinReadSuccesses { - return fmt.Errorf("too few successful reads, last error was: %v", lastErr) + // Only wait for minSuccess ingesters (or an error), and accumulate the samples + // by fingerprint, merging them into any existing samples. 
+ fpToSampleStream := map[model.Fingerprint]*model.SampleStream{} + for i := 0; i < minSuccess; i++ { + select { + case err := <-errs: + return err + + case result := <-results: + for _, ss := range result { + fp := ss.Metric.Fingerprint() + if mss, ok := fpToSampleStream[fp]; !ok { + fpToSampleStream[fp] = &model.SampleStream{ + Metric: ss.Metric, + Values: ss.Values, + } + } else { + mss.Values = util.MergeSamples(fpToSampleStream[fp].Values, ss.Values) + } + } + } } result = make(model.Matrix, 0, len(fpToSampleStream)) for _, ss := range fpToSampleStream { result = append(result, ss) } - return nil }) return result, err } +func (d *Distributor) queryIngester(ctx context.Context, ing *ring.IngesterDesc, req *cortex.QueryRequest) (model.Matrix, error) { + client, err := d.getClientFor(ing) + if err != nil { + return nil, err + } + + resp, err := client.Query(ctx, req) + d.ingesterQueries.WithLabelValues(ing.Addr).Inc() + if err != nil { + d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc() + return nil, err + } + + return util.FromQueryResponse(resp), nil +} + // forAllIngesters runs f, in parallel, for all ingesters func (d *Distributor) forAllIngesters(f func(cortex.IngesterClient) (interface{}, error)) ([]interface{}, error) { resps, errs := make(chan interface{}), make(chan error) @@ -527,7 +543,7 @@ func (d *Distributor) forAllIngesters(f func(cortex.IngesterClient) (interface{} numErrs++ } } - if numErrs > (d.cfg.ReplicationFactor - d.cfg.MinReadSuccesses) { + if numErrs > 1 { return nil, lastErr } return result, nil diff --git a/distributor/distributor_test.go b/distributor/distributor_test.go index ad908db28b2..7dfa7f9c922 100644 --- a/distributor/distributor_test.go +++ b/distributor/distributor_test.go @@ -6,6 +6,8 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/remote" 
"github.com/stretchr/testify/assert" "golang.org/x/net/context" @@ -50,7 +52,31 @@ func (i mockIngester) Push(ctx context.Context, in *remote.WriteRequest, opts .. } func (i mockIngester) Query(ctx context.Context, in *cortex.QueryRequest, opts ...grpc.CallOption) (*cortex.QueryResponse, error) { - return nil, nil + if !i.happy { + return nil, fmt.Errorf("Fail") + } + return &cortex.QueryResponse{ + Timeseries: []*remote.TimeSeries{ + { + Labels: []*remote.LabelPair{ + { + Name: "__name__", + Value: "foo", + }, + }, + Samples: []*remote.Sample{ + { + Value: 0, + TimestampMs: 0, + }, + { + Value: 1, + TimestampMs: 1, + }, + }, + }, + }, + }, nil } func (i mockIngester) LabelValues(ctx context.Context, in *cortex.LabelValuesRequest, opts ...grpc.CallOption) (*cortex.LabelValuesResponse, error) { @@ -65,7 +91,7 @@ func (i mockIngester) MetricsForLabelMatchers(ctx context.Context, in *cortex.Me return nil, nil } -func TestDistributor(t *testing.T) { +func TestDistributorPush(t *testing.T) { ctx := user.WithID(context.Background(), "user") for i, tc := range []struct { ingesters []mockIngester @@ -128,7 +154,6 @@ func TestDistributor(t *testing.T) { d, err := New(Config{ ReplicationFactor: 3, - MinReadSuccesses: 2, HeartbeatTimeout: 1 * time.Minute, RemoteTimeout: 1 * time.Minute, ClientCleanupPeriod: 1 * time.Minute, @@ -167,3 +192,99 @@ func TestDistributor(t *testing.T) { }) } } + +func TestDistributorQuery(t *testing.T) { + ctx := user.WithID(context.Background(), "user") + + expectedResponse := func(start, end int) model.Matrix { + result := model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{"__name__": "foo"}, + }, + } + for i := start; i < end; i++ { + result[0].Values = append(result[0].Values, + model.SamplePair{ + Value: model.SampleValue(i), + Timestamp: model.Time(i), + }, + ) + } + return result + } + + for i, tc := range []struct { + ingesters []mockIngester + expectedResponse model.Matrix + expectedError error + }{ + // A query to 3 happy 
ingesters should succeed + { + ingesters: []mockIngester{{true}, {true}, {true}}, + expectedResponse: expectedResponse(0, 2), + }, + + // A query to 2 happy ingesters should succeed + { + ingesters: []mockIngester{{}, {true}, {true}}, + expectedResponse: expectedResponse(0, 2), + }, + + // A query to 1 happy ingesters should fail + { + ingesters: []mockIngester{{}, {}, {true}}, + expectedError: fmt.Errorf("Fail"), + }, + + // A query to 0 happy ingesters should fail + { + ingesters: []mockIngester{{}, {}, {}}, + expectedError: fmt.Errorf("Fail"), + }, + } { + t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { + ingesterDescs := []*ring.IngesterDesc{} + ingesters := map[string]mockIngester{} + for i, ingester := range tc.ingesters { + addr := fmt.Sprintf("%d", i) + ingesterDescs = append(ingesterDescs, &ring.IngesterDesc{ + Addr: addr, + Timestamp: time.Now().Unix(), + }) + ingesters[addr] = ingester + } + + ring := mockRing{ + Counter: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "foo", + }), + ingesters: ingesterDescs, + } + + d, err := New(Config{ + ReplicationFactor: 3, + HeartbeatTimeout: 1 * time.Minute, + RemoteTimeout: 1 * time.Minute, + ClientCleanupPeriod: 1 * time.Minute, + IngestionRateLimit: 10000, + IngestionBurstSize: 10000, + + ingesterClientFactory: func(addr string) cortex.IngesterClient { + return ingesters[addr] + }, + }, ring) + if err != nil { + t.Fatal(err) + } + defer d.Stop() + + matcher, err := metric.NewLabelMatcher(metric.Equal, model.LabelName("__name__"), model.LabelValue("foo")) + if err != nil { + t.Fatal(err) + } + response, err := d.Query(ctx, 0, 10, matcher) + assert.Equal(t, tc.expectedResponse, response, "Wrong response") + assert.Equal(t, tc.expectedError, err, "Wrong error") + }) + } +} From 3e508ac4dd74744f7251ede304b59d86c1e74870 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 8 Feb 2017 11:11:59 +0000 Subject: [PATCH 2/2] Review feedback --- distributor/distributor.go | 8 ++++---- 1 file changed, 4 
insertions(+), 4 deletions(-) diff --git a/distributor/distributor.go b/distributor/distributor.go index cd56f55c096..bda8469d403 100644 --- a/distributor/distributor.go +++ b/distributor/distributor.go @@ -448,7 +448,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . // Fetch samples from multiple ingesters var numErrs int32 - errs := make(chan error) + errReceived := make(chan error) results := make(chan model.Matrix, len(ingesters)) for _, ing := range ingesters { @@ -456,7 +456,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . result, err := d.queryIngester(ctx, ing, req) if err != nil { if atomic.AddInt32(&numErrs, 1) == int32(maxErrs+1) { - errs <- err + errReceived <- err } } else { results <- result @@ -469,7 +469,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . fpToSampleStream := map[model.Fingerprint]*model.SampleStream{} for i := 0; i < minSuccess; i++ { select { - case err := <-errs: + case err := <-errReceived: return err case result := <-results: @@ -543,7 +543,7 @@ func (d *Distributor) forAllIngesters(f func(cortex.IngesterClient) (interface{} numErrs++ } } - if numErrs > 1 { + if numErrs > d.cfg.ReplicationFactor/2 { return nil, lastErr } return result, nil