From 72898646639a882132b000b05d93455621fd3a3c Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 24 Feb 2021 17:25:25 +0000 Subject: [PATCH 1/3] Refactor: pass userID in to Query functions Instead of extracting it from the context. This way is less surprising, and less code. Signed-off-by: Bryan Boreham --- pkg/distributor/distributor_test.go | 11 ++++++----- pkg/distributor/query.go | 17 ++++++----------- pkg/querier/distributor_queryable.go | 22 +++++++++++----------- pkg/querier/distributor_queryable_test.go | 12 +++++++----- pkg/querier/querier_test.go | 8 ++++---- 5 files changed, 34 insertions(+), 36 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 6b392d1b24f..09a43699e70 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -49,7 +49,8 @@ import ( var ( errFail = fmt.Errorf("Fail") emptyResponse = &cortexpb.WriteResponse{} - ctx = user.InjectOrgID(context.Background(), "user") + testUserID = "user" + ctx = user.InjectOrgID(context.Background(), testUserID) ) func TestConfig_Validate(t *testing.T) { @@ -710,12 +711,12 @@ func TestDistributor_PushQuery(t *testing.T) { assert.Equal(t, &cortexpb.WriteResponse{}, writeResponse) assert.Nil(t, err) - response, err := ds[0].Query(ctx, 0, 10, tc.matchers...) + response, err := ds[0].Query(ctx, testUserID, 0, 10, tc.matchers...) sort.Sort(response) assert.Equal(t, tc.expectedResponse, response) assert.Equal(t, tc.expectedError, err) - series, err := ds[0].QueryStream(ctx, 0, 10, tc.matchers...) + series, err := ds[0].QueryStream(ctx, testUserID, 0, 10, tc.matchers...) assert.Equal(t, tc.expectedError, err) if series == nil { @@ -1006,10 +1007,10 @@ func TestSlowQueries(t *testing.T) { }) defer stopAll(ds, r) - _, err := ds[0].Query(ctx, 0, 10, nameMatcher) + _, err := ds[0].Query(ctx, testUserID, 0, 10, nameMatcher) assert.Equal(t, expectedErr, err) - _, err = ds[0].QueryStream(ctx, 0, 10, nameMatcher) + _, err = ds[0].QueryStream(ctx, testUserID, 0, 10, nameMatcher) assert.Equal(t, expectedErr, err) }) } diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 32395a85606..984fb534b4d 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -19,7 +19,7 @@ import ( ) // Query multiple ingesters and returns a Matrix of samples. -func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { +func (d *Distributor) Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { var matrix model.Matrix err := instrument.CollectedRequest(ctx, "Distributor.Query", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error { req, err := ingester_client.ToQueryRequest(from, to, matchers) @@ -27,7 +27,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . return err } - replicationSet, err := d.GetIngestersForQuery(ctx, matchers...) + replicationSet, err := d.GetIngestersForQuery(ctx, userID, matchers...) if err != nil { return err } @@ -46,7 +46,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . } // QueryStream multiple ingesters via the streaming interface and returns big ol' set of chunks. -func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) { +func (d *Distributor) QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) { var result *ingester_client.QueryStreamResponse err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error { req, err := ingester_client.ToQueryRequest(from, to, matchers) @@ -54,12 +54,12 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc return err } - replicationSet, err := d.GetIngestersForQuery(ctx, matchers...) + replicationSet, err := d.GetIngestersForQuery(ctx, userID, matchers...) if err != nil { return err } - result, err = d.queryIngesterStream(ctx, replicationSet, req) + result, err = d.queryIngesterStream(ctx, userID, replicationSet, req) if err != nil { return err } @@ -74,12 +74,7 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc // GetIngestersForQuery returns a replication set including all ingesters that should be queried // to fetch series matching input label matchers. -func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error) { - userID, err := tenant.TenantID(ctx) - if err != nil { - return ring.ReplicationSet{}, err - } - +func (d *Distributor) GetIngestersForQuery(ctx context.Context, userID string, matchers ...*labels.Matcher) (ring.ReplicationSet, error) { // If shuffle sharding is enabled we should only query ingesters which are // part of the tenant's subring. if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle { diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index 8462ac674ef..bdd5af53d1d 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -24,8 +24,8 @@ import ( // Distributor is the read interface to the distributor, made an interface here // to reduce package coupling. type Distributor interface { - Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) - QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) + Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) + QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) LabelValuesForLabelName(ctx context.Context, from, to model.Time, label model.LabelName, matchers ...*labels.Matcher) ([]string, error) LabelNames(context.Context, model.Time, model.Time) ([]string, error) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) @@ -116,11 +116,16 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. } } + userID, err := tenant.TenantID(q.ctx) + if err != nil { + return storage.ErrSeriesSet(err) + } + if q.streaming { - return q.streamingSelect(ctx, minT, maxT, matchers) + return q.streamingSelect(ctx, userID, minT, maxT, matchers) } - matrix, err := q.distributor.Query(ctx, model.Time(minT), model.Time(maxT), matchers...) + matrix, err := q.distributor.Query(ctx, userID, model.Time(minT), model.Time(maxT), matchers...) if err != nil { return storage.ErrSeriesSet(err) } @@ -129,13 +134,8 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. return series.MatrixToSeriesSet(matrix) } -func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet { - userID, err := tenant.TenantID(ctx) - if err != nil { - return storage.ErrSeriesSet(err) - } - - results, err := q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), matchers...) +func (q *distributorQuerier) streamingSelect(ctx context.Context, userID string, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet { + results, err := q.distributor.QueryStream(ctx, userID, model.Time(minT), model.Time(maxT), matchers...) if err != nil { return storage.ErrSeriesSet(err) } diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index ab284d846e0..5ebb28ff93e 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -25,6 +25,7 @@ import ( const ( maxt, mint = 0, 10 + testUserID = "test" ) func TestDistributorQuerier(t *testing.T) { @@ -45,8 +46,9 @@ func TestDistributorQuerier(t *testing.T) { }, nil) + ctx := user.InjectOrgID(context.Background(), testUserID) queryable := newDistributorQueryable(d, false, nil, 0) - querier, err := queryable.Querier(context.Background(), mint, maxt) + querier, err := queryable.Querier(ctx, mint, maxt) require.NoError(t, err) seriesSet := querier.Select(true, &storage.SelectHints{Start: mint, End: maxt}) @@ -118,10 +120,10 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) { distributor := &mockDistributor{} distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) - distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) + distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil) - ctx := user.InjectOrgID(context.Background(), "test") + ctx := user.InjectOrgID(context.Background(), testUserID) queryable := newDistributorQueryable(distributor, streamingEnabled, nil, testData.queryIngestersWithin) querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT) require.NoError(t, err) @@ -330,11 +332,11 @@ type mockDistributor struct { mock.Mock } -func (m *mockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { +func (m *mockDistributor) Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { args := m.Called(ctx, from, to, matchers) return args.Get(0).(model.Matrix), args.Error(1) } -func (m *mockDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) { +func (m *mockDistributor) QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) { args := m.Called(ctx, from, to, matchers) return args.Get(0).(*client.QueryStreamResponse), args.Error(1) } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index c61301180a1..0c1699150ee 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -737,10 +737,10 @@ type errDistributor struct{} var errDistributorError = fmt.Errorf("errDistributorError") -func (m *errDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { +func (m *errDistributor) Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { return nil, errDistributorError } -func (m *errDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) { +func (m *errDistributor) QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) { return nil, errDistributorError } func (m *errDistributor) LabelValuesForLabelName(context.Context, model.Time, model.Time, model.LabelName, ...*labels.Matcher) ([]string, error) { @@ -777,11 +777,11 @@ func (c *emptyChunkStore) IsCalled() bool { type emptyDistributor struct{} -func (d *emptyDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { +func (d *emptyDistributor) Query(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { return nil, nil } -func (d *emptyDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) { +func (d *emptyDistributor) QueryStream(ctx context.Context, userID string, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) { return &client.QueryStreamResponse{}, nil } From 052d2656002295b91720e56a5d537a63aab1b0c3 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 24 Feb 2021 17:44:15 +0000 Subject: [PATCH 2/3] Check limits on series and samples in querier For streaming queries (which are the default) Signed-off-by: Bryan Boreham --- pkg/distributor/query.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 984fb534b4d..d0d59e2f40d 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -2,6 +2,7 @@ package distributor import ( "context" + "fmt" "io" "time" @@ -167,7 +168,10 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re } // queryIngesterStream queries the ingesters using the new streaming API. -func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { +func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) { + maxSeries := d.limits.MaxSeriesPerQuery(userID) + maxSamples := d.limits.MaxSamplesPerQuery(userID) + // Fetch samples from multiple ingesters results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) @@ -199,6 +203,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...) result.Timeseries = append(result.Timeseries, resp.Timeseries...) + + if len(result.Chunkseries) > maxSeries || len(result.Timeseries) > maxSeries { + return nil, fmt.Errorf("exceeded maximum number of series in a query (limit %d)", maxSeries) + } } return result, nil }) @@ -209,6 +217,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{} hashToTimeSeries := map[string]ingester_client.TimeSeries{} + sampleCount := 0 for _, result := range results { response := result.(*ingester_client.QueryStreamResponse) @@ -226,12 +235,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri key := ingester_client.LabelsToKeyString(ingester_client.FromLabelAdaptersToLabels(series.Labels)) existing := hashToTimeSeries[key] existing.Labels = series.Labels + previousCount := len(existing.Samples) if existing.Samples == nil { existing.Samples = series.Samples } else { existing.Samples = mergeSamples(existing.Samples, series.Samples) } hashToTimeSeries[key] = existing + sampleCount += len(existing.Samples) - previousCount + if sampleCount > maxSamples { + return nil, fmt.Errorf("exceeded maximum number of samples in a query (limit %d)", maxSamples) + } } } From 0a08d5a9b822f11ca7570e273201f407d7d50154 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 24 Feb 2021 18:22:02 +0000 Subject: [PATCH 3/3] Check series limit after combining all series from ingesters Signed-off-by: Bryan Boreham --- pkg/distributor/query.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index d0d59e2f40d..bc3f117b1d8 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -249,6 +249,10 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, re } } + if len(hashToChunkseries) > maxSeries || len(hashToTimeSeries) > maxSeries { + return nil, fmt.Errorf("exceeded maximum number of series in a query (limit %d)", maxSeries) + } + resp := &ingester_client.QueryStreamResponse{ Chunkseries: make([]ingester_client.TimeSeriesChunk, 0, len(hashToChunkseries)), Timeseries: make([]ingester_client.TimeSeries, 0, len(hashToTimeSeries)),