Commit d382e1d

Enforce "max chunks per query" limit in ingesters too (#4125)
* Deprecated -store.query-chunk-limit in favour of the new config -querier.max-fetched-chunks-per-query which is applied both to ingesters and long-term storage
* Fixed PR number in CHANGELOG
* Moved CHANGELOG entry to unreleased
* Addressed review feedback

Signed-off-by: Marco Pracucci <[email protected]>
1 parent ceaefc8 commit d382e1d

21 files changed: +281 −68 lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -2,6 +2,8 @@

 ## master / unreleased

+* [CHANGE] Querier / ruler: deprecated `-store.query-chunk-limit` CLI flag (and its respective YAML config option `max_chunks_per_query`) in favour of `-querier.max-fetched-chunks-per-query` (and its respective YAML config option `max_fetched_chunks_per_query`). The new limit specifies the maximum number of chunks that can be fetched in a single query from ingesters and long-term storage: the total number of actual fetched chunks could be 2x the limit, being independently applied when querying ingesters and long-term storage. #4125
+
 ## 1.9.0 in progress

 * [CHANGE] Fix for CVE-2021-31232: Local file disclosure vulnerability when `-experimental.alertmanager.enable-api` is used. The HTTP basic auth `password_file` can be used as an attack vector to send any file content via a webhook. The alertmanager templates can be used as an attack vector to send any file content because the alertmanager can load any text file specified in the templates list. #4129
Lines changed: 2 additions & 1 deletion
@@ -1 +1,2 @@
-# This file is left empty. It can be configured with overrides or other runtime config.
+# This file can be used to set overrides or other runtime config.
+ingester_stream_chunks_when_using_blocks: true
Lines changed: 2 additions & 1 deletion
@@ -1 +1,2 @@
-# This file is left empty. It can be configured with overrides or other runtime config.
+# This file can be used to set overrides or other runtime config.
+ingester_stream_chunks_when_using_blocks: true
Lines changed: 2 additions & 1 deletion
@@ -1 +1,2 @@
-# This file is left empty. It can be configured with overrides or other runtime config.
+# This file can be used to set overrides or other runtime config.
+ingester_stream_chunks_when_using_blocks: true

docs/configuration/config-file-reference.md

Lines changed: 16 additions & 5 deletions
@@ -4007,14 +4007,25 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -ingester.max-global-metadata-per-metric
 [max_global_metadata_per_metric: <int> | default = 0]

-# Maximum number of chunks that can be fetched in a single query. This limit is
-# enforced when fetching chunks from the long-term storage. When running the
-# Cortex chunks storage, this limit is enforced in the querier, while when
-# running the Cortex blocks storage this limit is both enforced in the querier
-# and store-gateway. 0 to disable.
+# Deprecated. Use -querier.max-fetched-chunks-per-query CLI flag and its
+# respective YAML config option instead. Maximum number of chunks that can be
+# fetched in a single query. This limit is enforced when fetching chunks from
+# the long-term storage only. When running the Cortex chunks storage, this limit
+# is enforced in the querier and ruler, while when running the Cortex blocks
+# storage this limit is enforced in the querier, ruler and store-gateway. 0 to
+# disable.
 # CLI flag: -store.query-chunk-limit
 [max_chunks_per_query: <int> | default = 2000000]

+# Maximum number of chunks that can be fetched in a single query from ingesters
+# and long-term storage: the total number of actual fetched chunks could be 2x
+# the limit, being independently applied when querying ingesters and long-term
+# storage. This limit is enforced in the ingester (if chunks streaming is
+# enabled), querier, ruler and store-gateway. Takes precedence over the
+# deprecated -store.query-chunk-limit. 0 to disable.
+# CLI flag: -querier.max-fetched-chunks-per-query
+[max_fetched_chunks_per_query: <int> | default = 0]
+
 # Limit how long back data (series and metadata) can be queried, up until
 # <lookback> duration ago. This limit is enforced in the query-frontend, querier
 # and ruler. If the requested time range is outside the allowed range, the
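
The limits code that resolves the two options against each other is not part of this excerpt. As a minimal sketch of the "takes precedence over" rule described above (function and parameter names are illustrative, not the real Cortex overrides API):

```go
package main

import "fmt"

// maxChunksPerQueryFromStore resolves the deprecated and the new limit as
// documented above: the new -querier.max-fetched-chunks-per-query value wins
// when set (non-zero), otherwise the deprecated -store.query-chunk-limit
// value still applies.
func maxChunksPerQueryFromStore(maxFetchedChunksPerQuery, maxChunksPerQuery int) int {
	if maxFetchedChunksPerQuery > 0 {
		return maxFetchedChunksPerQuery
	}
	return maxChunksPerQuery
}

func main() {
	fmt.Println(maxChunksPerQueryFromStore(0, 2000000))       // 2000000: deprecated limit still applies
	fmt.Println(maxChunksPerQueryFromStore(1000000, 2000000)) // 1000000: new limit takes precedence
}
```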

pkg/chunk/chunk_store.go

Lines changed: 1 addition & 1 deletion
@@ -355,7 +355,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th
 	filtered := filterChunksByTime(from, through, chunks)
 	level.Debug(log).Log("Chunks post filtering", len(chunks))

-	maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
+	maxChunksPerQuery := c.limits.MaxChunksPerQueryFromStore(userID)
 	if maxChunksPerQuery > 0 && len(filtered) > maxChunksPerQuery {
 		err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), maxChunksPerQuery))
 		level.Error(log).Log("err", err)

pkg/chunk/composite_store.go

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ import (

 // StoreLimits helps get Limits specific to Queries for Stores
 type StoreLimits interface {
-	MaxChunksPerQuery(userID string) int
+	MaxChunksPerQueryFromStore(userID string) int
 	MaxQueryLength(userID string) time.Duration
 }

pkg/chunk/series_store.go

Lines changed: 1 addition & 1 deletion
@@ -112,7 +112,7 @@ func (c *seriesStore) Get(ctx context.Context, userID string, from, through mode
 	chunks := chks[0]
 	fetcher := fetchers[0]
 	// Protect ourselves against OOMing.
-	maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
+	maxChunksPerQuery := c.limits.MaxChunksPerQueryFromStore(userID)
 	if maxChunksPerQuery > 0 && len(chunks) > maxChunksPerQuery {
 		err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunks), maxChunksPerQuery))
 		level.Error(log).Log("err", err)

pkg/chunk/storage/factory.go

Lines changed: 1 addition & 1 deletion
@@ -73,7 +73,7 @@ func RegisterIndexStore(name string, indexClientFactory IndexClientFactoryFunc,
 // StoreLimits helps get Limits specific to Queries for Stores
 type StoreLimits interface {
 	CardinalityLimit(userID string) int
-	MaxChunksPerQuery(userID string) int
+	MaxChunksPerQueryFromStore(userID string) int
 	MaxQueryLength(userID string) time.Duration
 }

pkg/distributor/distributor_test.go

Lines changed: 74 additions & 16 deletions
@@ -870,6 +870,60 @@ func TestDistributor_PushQuery(t *testing.T) {
 	}
 }

+func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReached(t *testing.T) {
+	const maxChunksLimit = 30 // Chunks are duplicated due to replication factor.
+
+	limits := &validation.Limits{}
+	flagext.DefaultValues(limits)
+	limits.MaxChunksPerQuery = maxChunksLimit
+
+	// Prepare distributors.
+	ds, _, r, _ := prepare(t, prepConfig{
+		numIngesters:     3,
+		happyIngesters:   3,
+		numDistributors:  1,
+		shardByAllLabels: true,
+		limits:           limits,
+	})
+	defer stopAll(ds, r)
+
+	// Push a number of series below the max chunks limit. Each series has 1 sample,
+	// so expect 1 chunk per series when querying back.
+	initialSeries := maxChunksLimit / 3
+	writeReq := makeWriteRequest(0, initialSeries, 0)
+	writeRes, err := ds[0].Push(ctx, writeReq)
+	assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
+	assert.Nil(t, err)
+
+	allSeriesMatchers := []*labels.Matcher{
+		labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
+	}
+
+	// Since the number of series (and thus chunks) is equal to the limit (but doesn't
+	// exceed it), we expect a query running on all series to succeed.
+	queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
+	require.NoError(t, err)
+	assert.Len(t, queryRes.Chunkseries, initialSeries)
+
+	// Push more series so that the limit is exceeded once we query back all series.
+	writeReq = &cortexpb.WriteRequest{}
+	for i := 0; i < maxChunksLimit; i++ {
+		writeReq.Timeseries = append(writeReq.Timeseries,
+			makeWriteRequestTimeseries([]cortexpb.LabelAdapter{{Name: model.MetricNameLabel, Value: fmt.Sprintf("another_series_%d", i)}}, 0, 0),
+		)
+	}
+
+	writeRes, err = ds[0].Push(ctx, writeReq)
+	assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
+	assert.Nil(t, err)
+
+	// Since the number of series (and thus chunks) now exceeds the limit, we expect
+	// a query running on all series to fail.
+	_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "the query hit the max number of chunks limit")
+}
+
 func TestDistributor_Push_LabelRemoval(t *testing.T) {
 	ctx = user.InjectOrgID(context.Background(), "user")

@@ -1754,22 +1808,12 @@ func stopAll(ds []*Distributor, r *ring.Ring) {
 func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *cortexpb.WriteRequest {
 	request := &cortexpb.WriteRequest{}
 	for i := 0; i < samples; i++ {
-		ts := cortexpb.PreallocTimeseries{
-			TimeSeries: &cortexpb.TimeSeries{
-				Labels: []cortexpb.LabelAdapter{
-					{Name: model.MetricNameLabel, Value: "foo"},
-					{Name: "bar", Value: "baz"},
-					{Name: "sample", Value: fmt.Sprintf("%d", i)},
-				},
-			},
-		}
-		ts.Samples = []cortexpb.Sample{
-			{
-				Value:       float64(i),
-				TimestampMs: startTimestampMs + int64(i),
-			},
-		}
-		request.Timeseries = append(request.Timeseries, ts)
+		request.Timeseries = append(request.Timeseries, makeWriteRequestTimeseries(
+			[]cortexpb.LabelAdapter{
+				{Name: model.MetricNameLabel, Value: "foo"},
+				{Name: "bar", Value: "baz"},
+				{Name: "sample", Value: fmt.Sprintf("%d", i)},
+			}, startTimestampMs+int64(i), float64(i)))
 	}

 	for i := 0; i < metadata; i++ {

@@ -1784,6 +1828,20 @@ func makeWriteRequest(startTimestampMs int64, samples int, metadata int) *cortex
 	return request
 }

+func makeWriteRequestTimeseries(labels []cortexpb.LabelAdapter, ts int64, value float64) cortexpb.PreallocTimeseries {
+	return cortexpb.PreallocTimeseries{
+		TimeSeries: &cortexpb.TimeSeries{
+			Labels: labels,
+			Samples: []cortexpb.Sample{
+				{
+					Value:       value,
+					TimestampMs: ts,
+				},
+			},
+		},
+	}
+}
+
 func makeWriteRequestHA(samples int, replica, cluster string) *cortexpb.WriteRequest {
 	request := &cortexpb.WriteRequest{}
 	for i := 0; i < samples; i++ {

pkg/distributor/query.go

Lines changed: 30 additions & 2 deletions
@@ -2,13 +2,15 @@ package distributor

 import (
 	"context"
+	"fmt"
 	"io"
 	"time"

 	"github.com/opentracing/opentracing-go"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/weaveworks/common/instrument"
+	"go.uber.org/atomic"

 	"github.com/cortexproject/cortex/pkg/cortexpb"
 	ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
@@ -17,6 +19,11 @@ import (
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/extract"
 	grpc_util "github.com/cortexproject/cortex/pkg/util/grpc"
+	"github.com/cortexproject/cortex/pkg/util/validation"
+)
+
+var (
+	errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from ingesters for %s (limit: %d)"
 )

 // Query multiple ingesters and returns a Matrix of samples.
@@ -50,6 +57,11 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
 func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) {
 	var result *ingester_client.QueryStreamResponse
 	err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
+		userID, err := tenant.TenantID(ctx)
+		if err != nil {
+			return err
+		}
+
 		req, err := ingester_client.ToQueryRequest(from, to, matchers)
 		if err != nil {
 			return err
@@ -60,7 +72,7 @@ func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matc
 			return err
 		}

-		result, err = d.queryIngesterStream(ctx, replicationSet, req)
+		result, err = d.queryIngesterStream(ctx, userID, replicationSet, req)
 		if err != nil {
 			return err
 		}
@@ -173,7 +185,12 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re
 }

 // queryIngesterStream queries the ingesters using the new streaming API.
-func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
+func (d *Distributor) queryIngesterStream(ctx context.Context, userID string, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
+	var (
+		chunksLimit = d.limits.MaxChunksPerQueryFromIngesters(userID)
+		chunksCount = atomic.Int32{}
+	)
+
 	// Fetch samples from multiple ingesters
 	results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
 		client, err := d.ingesterPool.GetClientFor(ing.Addr)
@@ -203,6 +220,17 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
 			return nil, err
 		}

+		// Enforce the max chunks limit.
+		if chunksLimit > 0 {
+			if count := int(chunksCount.Add(int32(resp.ChunksCount()))); count > chunksLimit {
+				// We expect to always be able to convert the label matchers back to Prometheus ones.
+				// In case we fail (unexpected), the error will not include the matchers, but the core
+				// logic doesn't break.
+				matchers, _ := ingester_client.FromLabelMatchers(req.Matchers)
+				return nil, validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), chunksLimit))
+			}
+		}
+
 		result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
 		result.Timeseries = append(result.Timeseries, resp.Timeseries...)
 	}
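
The pattern in queryIngesterStream above is worth isolating: one atomic counter is shared by the per-ingester callbacks that replicationSet.Do runs concurrently, so the limit applies to the running total across all ingesters rather than to each response. A runnable sketch of just that mechanism, with illustrative names and plain goroutines standing in for the ring's Do helper:

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

// queryWithChunkLimit mimics the enforcement above: each "ingester response"
// adds its chunk count to a shared atomic counter, and the query fails as
// soon as the running total crosses the limit. Add-then-compare means the
// total may briefly overshoot, but a breach of the limit is always detected.
func queryWithChunkLimit(chunksPerIngester []int, chunksLimit int) error {
	var (
		chunksCount atomic.Int32
		wg          sync.WaitGroup
		errOnce     sync.Once
		limitErr    error
	)

	for _, n := range chunksPerIngester {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			if count := int(chunksCount.Add(int32(n))); chunksLimit > 0 && count > chunksLimit {
				errOnce.Do(func() {
					limitErr = fmt.Errorf("the query hit the max number of chunks limit (limit: %d)", chunksLimit)
				})
			}
		}(n)
	}

	wg.Wait()
	return limitErr
}

func main() {
	fmt.Println(queryWithChunkLimit([]int{10, 10, 10}, 30)) // <nil>: exactly at the limit succeeds
	fmt.Println(queryWithChunkLimit([]int{10, 10, 15}, 30)) // limit exceeded
}
```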

pkg/ingester/client/compat.go

Lines changed: 4 additions & 4 deletions
@@ -25,7 +25,7 @@ func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequ

 // FromQueryRequest unpacks a QueryRequest proto.
 func FromQueryRequest(req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) {
-	matchers, err := fromLabelMatchers(req.Matchers)
+	matchers, err := FromLabelMatchers(req.Matchers)
 	if err != nil {
 		return 0, 0, nil, err
 	}
@@ -90,7 +90,7 @@ func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Ma
 func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) {
 	matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
 	for _, matchers := range req.MatchersSet {
-		matchers, err := fromLabelMatchers(matchers.Matchers)
+		matchers, err := FromLabelMatchers(matchers.Matchers)
 		if err != nil {
 			return 0, 0, nil, err
 		}
@@ -131,7 +131,7 @@ func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, []*l
 	var matchers []*labels.Matcher

 	if req.Matchers != nil {
-		matchers, err = fromLabelMatchers(req.Matchers.Matchers)
+		matchers, err = FromLabelMatchers(req.Matchers.Matchers)
 		if err != nil {
 			return "", 0, 0, nil, err
 		}
@@ -165,7 +165,7 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
 	return result, nil
 }

-func fromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) {
+func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) {
 	result := make([]*labels.Matcher, 0, len(matchers))
 	for _, matcher := range matchers {
 		var mtype labels.MatchType

pkg/ingester/client/custom.go

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+package client
+
+// ChunksCount returns the number of chunks in the response.
+func (m *QueryStreamResponse) ChunksCount() int {
+	if len(m.Chunkseries) == 0 {
+		return 0
+	}
+
+	count := 0
+	for _, entry := range m.Chunkseries {
+		count += len(entry.Chunks)
+	}
+	return count
+}

pkg/querier/blocks_store_queryable.go

Lines changed: 4 additions & 20 deletions
@@ -53,7 +53,7 @@ const (

 var (
 	errNoStoreGatewayAddress  = errors.New("no store-gateway address configured")
-	errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks for %s (limit: %d)"
+	errMaxChunksPerQueryLimit = "the query hit the max number of chunks limit while fetching chunks from store-gateways for %s (limit: %d)"
 )

 // BlocksStoreSet is the interface used to get the clients to query series on a set of blocks.
@@ -89,7 +89,7 @@ type BlocksStoreClient interface {
 type BlocksStoreLimits interface {
 	bucket.TenantConfigProvider

-	MaxChunksPerQuery(userID string) int
+	MaxChunksPerQueryFromStore(userID string) int
 	StoreGatewayTenantShardSize(userID string) int
 }

@@ -401,7 +401,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
 		resSeriesSets = []storage.SeriesSet(nil)
 		resWarnings   = storage.Warnings(nil)

-		maxChunksLimit  = q.limits.MaxChunksPerQuery(q.userID)
+		maxChunksLimit  = q.limits.MaxChunksPerQueryFromStore(q.userID)
 		leftChunksLimit = maxChunksLimit

 		resultMtx sync.Mutex
@@ -615,7 +615,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
 			if maxChunksLimit > 0 {
 				actual := numChunks.Add(int32(len(s.Chunks)))
 				if actual > int32(leftChunksLimit) {
-					return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, convertMatchersToString(matchers), maxChunksLimit))
+					return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit))
 				}
 			}
@@ -937,19 +937,3 @@ func countSeriesBytes(series []*storepb.Series) (count uint64) {

 	return count
 }
-
-func convertMatchersToString(matchers []*labels.Matcher) string {
-	out := strings.Builder{}
-	out.WriteRune('{')
-
-	for idx, m := range matchers {
-		if idx > 0 {
-			out.WriteRune(',')
-		}
-
-		out.WriteString(m.String())
-	}
-
-	out.WriteRune('}')
-	return out.String()
-}
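
util.LabelMatchersToString itself is not shown in this diff; presumably it renders matchers much as the helper deleted above did, which is why the local copy could be dropped. A self-contained sketch that reuses the removed implementation:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/pkg/labels"
)

// labelMatchersToString renders a matcher set in PromQL-like form, e.g.
// {__name__=~".+"}; the body is the implementation removed above.
func labelMatchersToString(matchers []*labels.Matcher) string {
	out := strings.Builder{}
	out.WriteRune('{')

	for idx, m := range matchers {
		if idx > 0 {
			out.WriteRune(',')
		}

		out.WriteString(m.String())
	}

	out.WriteRune('}')
	return out.String()
}

func main() {
	m := labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+")
	fmt.Println(labelMatchersToString([]*labels.Matcher{m})) // {__name__=~".+"}
}
```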

pkg/querier/blocks_store_queryable_test.go

Lines changed: 1 addition & 1 deletion
@@ -1300,7 +1300,7 @@ type blocksStoreLimitsMock struct {
 	storeGatewayTenantShardSize int
 }

-func (m *blocksStoreLimitsMock) MaxChunksPerQuery(_ string) int {
+func (m *blocksStoreLimitsMock) MaxChunksPerQueryFromStore(_ string) int {
 	return m.maxChunksPerQuery
 }
