Skip to content

Commit 2d96cf1

Browse files
committed
add ignore max query length and additional max query length check for query
Signed-off-by: Ben Ye <[email protected]>
1 parent 9bc04ce commit 2d96cf1

18 files changed

+328
-29
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* [ENHANCEMENT] AlertManager: Retrying AlertManager Delete Silence on error #5794
2727
* [ENHANCEMENT] Ingester: Add new ingester metric `cortex_ingester_max_inflight_query_requests`. #5798
2828
* [ENHANCEMENT] Query: Added `query_storage_wall_time` to Query Frontend and Ruler query stats log for wall time spent on fetching data from storage. Query evaluation is not included. #5799
29+
* [ENHANCEMENT] Query: Added additional max query length check at Query Frontend and Ruler. Added `-querier.ignore-max-query-length` flag to disable max query length check at Querier. #5808
2930
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
3031
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
3132
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734

docs/blocks-storage/querier.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,12 @@ querier:
233233
# engine.
234234
# CLI flag: -querier.thanos-engine
235235
[thanos_engine: <boolean> | default = false]
236+
237+
# If enabled, ignore max query length check at Querier select method. Users
238+
# can choose to ignore it since the validation can be done before Querier
239+
# evaluation like at Query Frontend or Ruler.
240+
# CLI flag: -querier.ignore-max-query-length
241+
[ignore_max_query_length: <boolean> | default = false]
236242
```
237243
238244
### `blocks_storage_config`

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3714,6 +3714,12 @@ store_gateway_client:
37143714
# engine.
37153715
# CLI flag: -querier.thanos-engine
37163716
[thanos_engine: <boolean> | default = false]
3717+
3718+
# If enabled, ignore max query length check at Querier select method. Users can
3719+
# choose to ignore it since the validation can be done before Querier evaluation
3720+
# like at Query Frontend or Ruler.
3721+
# CLI flag: -querier.ignore-max-query-length
3722+
[ignore_max_query_length: <boolean> | default = false]
37173723
```
37183724
37193725
### `query_frontend_config`

pkg/cortex/modules.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,12 +457,13 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
457457
queryAnalyzer,
458458
prometheusCodec,
459459
shardedPrometheusCodec,
460+
t.Cfg.Querier.LookbackDelta,
460461
)
461462
if err != nil {
462463
return nil, err
463464
}
464465

465-
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer)
466+
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer, t.Cfg.Querier.LookbackDelta)
466467
if err != nil {
467468
return nil, err
468469
}
@@ -548,6 +549,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
548549
return nil, nil
549550
}
550551

552+
t.Cfg.Ruler.LookbackDelta = t.Cfg.Querier.LookbackDelta
551553
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
552554
metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer)
553555

pkg/querier/querier.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ type Config struct {
8585
// Experimental. Use https://github.com/thanos-io/promql-engine rather than
8686
// the Prometheus query engine.
8787
ThanosEngine bool `yaml:"thanos_engine"`
88+
89+
// Ignore max query length check at Querier.
90+
IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"`
8891
}
8992

9093
var (
@@ -119,6 +122,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
119122
f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).")
120123
f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
121124
f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.")
125+
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
122126
}
123127

124128
// Validate the config
@@ -256,16 +260,17 @@ type limiterHolder struct {
256260
func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides) storage.Queryable {
257261
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
258262
q := querier{
259-
now: time.Now(),
260-
mint: mint,
261-
maxt: maxt,
262-
chunkIterFn: chunkIterFn,
263-
limits: limits,
264-
maxQueryIntoFuture: cfg.MaxQueryIntoFuture,
265-
queryStoreForLabels: cfg.QueryStoreForLabels,
266-
distributor: distributor,
267-
stores: stores,
268-
limiterHolder: &limiterHolder{},
263+
now: time.Now(),
264+
mint: mint,
265+
maxt: maxt,
266+
chunkIterFn: chunkIterFn,
267+
limits: limits,
268+
maxQueryIntoFuture: cfg.MaxQueryIntoFuture,
269+
queryStoreForLabels: cfg.QueryStoreForLabels,
270+
ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength,
271+
distributor: distributor,
272+
stores: stores,
273+
limiterHolder: &limiterHolder{},
269274
}
270275

271276
return q, nil
@@ -283,6 +288,8 @@ type querier struct {
283288
distributor QueryableWithFilter
284289
stores []QueryableWithFilter
285290
limiterHolder *limiterHolder
291+
292+
ignoreMaxQueryLength bool
286293
}
287294

288295
func (q querier) setupFromCtx(ctx context.Context) (context.Context, *querier_stats.QueryStats, string, int64, int64, storage.Querier, []storage.Querier, error) {
@@ -397,9 +404,11 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select
397404
// Validate query time range. This validation should be done only for instant / range queries and
398405
// NOT for metadata queries (series, labels) because the query-frontend doesn't support splitting
399406
// of such queries.
400-
if maxQueryLength := q.limits.MaxQueryLength(userID); maxQueryLength > 0 && endTime.Sub(startTime) > maxQueryLength {
401-
limitErr := validation.LimitError(fmt.Sprintf(validation.ErrQueryTooLong, endTime.Sub(startTime), maxQueryLength))
402-
return storage.ErrSeriesSet(limitErr)
407+
if !q.ignoreMaxQueryLength {
408+
if maxQueryLength := q.limits.MaxQueryLength(userID); maxQueryLength > 0 && endTime.Sub(startTime) > maxQueryLength {
409+
limitErr := validation.LimitError(fmt.Sprintf(validation.ErrQueryTooLong, endTime.Sub(startTime), maxQueryLength))
410+
return storage.ErrSeriesSet(limitErr)
411+
}
403412
}
404413

405414
if len(queriers) == 1 {

pkg/querier/querier_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -833,6 +833,9 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) {
833833
queryStartTime time.Time
834834
queryEndTime time.Time
835835
expected error
836+
837+
// If enabled, skip max query length check at Querier.
838+
ignoreMaxQueryLength bool
836839
}{
837840
"should allow query on short time range and rate time window close to the limit": {
838841
query: "rate(foo[29d])",
@@ -858,6 +861,13 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) {
858861
queryEndTime: time.Now(),
859862
expected: errors.New("expanding series: the query time range exceeds the limit (query length: 721h1m0s, limit: 720h0m0s)"),
860863
},
864+
"max query length check ignored, invalid query is still allowed": {
865+
query: "rate(foo[1m])",
866+
queryStartTime: time.Now().Add(-maxQueryLength).Add(-time.Hour),
867+
queryEndTime: time.Now(),
868+
expected: nil,
869+
ignoreMaxQueryLength: true,
870+
},
861871
}
862872

863873
opts := promql.EngineOpts{
@@ -873,6 +883,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) {
873883
flagext.DefaultValues(&cfg)
874884
// Disable active query tracker to avoid mmap error.
875885
cfg.ActiveQueryTrackerDir = ""
886+
cfg.IgnoreMaxQueryLength = testData.ignoreMaxQueryLength
876887

877888
limits := DefaultLimitsConfig()
878889
limits.MaxQueryLength = model.Duration(maxQueryLength)
Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package instantquery
22

33
import (
4+
"time"
5+
46
"github.com/go-kit/log"
57
"github.com/thanos-io/thanos/pkg/querysharding"
68

@@ -11,9 +13,11 @@ func Middlewares(
1113
log log.Logger,
1214
limits tripperware.Limits,
1315
queryAnalyzer querysharding.Analyzer,
16+
lookbackDelta time.Duration,
1417
) ([]tripperware.Middleware, error) {
15-
var m []tripperware.Middleware
16-
17-
m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer))
18+
m := []tripperware.Middleware{
19+
NewLimitsMiddleware(limits, lookbackDelta),
20+
tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer),
21+
}
1822
return m, nil
1923
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package instantquery
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"time"
7+
8+
"github.com/prometheus/prometheus/promql"
9+
"github.com/prometheus/prometheus/promql/parser"
10+
"github.com/weaveworks/common/httpgrpc"
11+
12+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
13+
"github.com/cortexproject/cortex/pkg/tenant"
14+
"github.com/cortexproject/cortex/pkg/util"
15+
"github.com/cortexproject/cortex/pkg/util/spanlogger"
16+
"github.com/cortexproject/cortex/pkg/util/validation"
17+
)
18+
19+
type limitsMiddleware struct {
20+
tripperware.Limits
21+
next tripperware.Handler
22+
23+
lookbackDelta time.Duration
24+
}
25+
26+
// NewLimitsMiddleware creates a new Middleware that enforces query limits.
27+
func NewLimitsMiddleware(l tripperware.Limits, lookbackDelta time.Duration) tripperware.Middleware {
28+
return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
29+
return limitsMiddleware{
30+
next: next,
31+
Limits: l,
32+
33+
lookbackDelta: lookbackDelta,
34+
}
35+
})
36+
}
37+
38+
func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) {
39+
log, ctx := spanlogger.New(ctx, "limits")
40+
defer log.Finish()
41+
42+
tenantIDs, err := tenant.TenantIDs(ctx)
43+
if err != nil {
44+
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
45+
}
46+
47+
// Enforce the max query length.
48+
if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 {
49+
expr, err := parser.ParseExpr(r.GetQuery())
50+
if err != nil {
51+
// Let Querier propagates the parsing error.
52+
return l.next.Do(ctx, r)
53+
}
54+
55+
// Enforce query length across all selectors in the query.
56+
min, max := promql.FindMinMaxTime(&parser.EvalStmt{Expr: expr, Start: util.TimeFromMillis(0), End: util.TimeFromMillis(0), LookbackDelta: l.lookbackDelta})
57+
diff := util.TimeFromMillis(max).Sub(util.TimeFromMillis(min))
58+
if diff > maxQueryLength {
59+
return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, diff, maxQueryLength)
60+
}
61+
}
62+
63+
return l.next.Do(ctx, r)
64+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package instantquery
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/mock"
10+
"github.com/stretchr/testify/require"
11+
"github.com/weaveworks/common/user"
12+
13+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
14+
"github.com/cortexproject/cortex/pkg/util/validation"
15+
)
16+
17+
func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
18+
t.Parallel()
19+
const (
20+
thirtyDays = 30 * 24 * time.Hour
21+
)
22+
23+
tests := map[string]struct {
24+
maxQueryLength time.Duration
25+
query string
26+
expectedErr string
27+
}{
28+
"should skip validation if max length is disabled": {
29+
maxQueryLength: 0,
30+
},
31+
"even though failed to parse expression, should return no error since request will pass to next middleware": {
32+
query: `up[`,
33+
maxQueryLength: thirtyDays,
34+
},
35+
"should succeed on a query not exceeding time range": {
36+
query: `up`,
37+
maxQueryLength: thirtyDays,
38+
},
39+
"should succeed on a query not exceeding time range2": {
40+
query: `up[29d]`,
41+
maxQueryLength: thirtyDays,
42+
},
43+
"should succeed on a query not exceeding time range3": {
44+
query: `rate(up[29d]) + rate(test[29d])`,
45+
maxQueryLength: thirtyDays,
46+
},
47+
"should fail on a query exceeding time range": {
48+
query: `rate(up[31d])`,
49+
maxQueryLength: thirtyDays,
50+
expectedErr: "the query time range exceeds the limit",
51+
},
52+
"should fail on a query exceeding time range, work for multiple selects": {
53+
query: `rate(up[20d]) + rate(up[20d] offset 20d)`,
54+
maxQueryLength: thirtyDays,
55+
expectedErr: "the query time range exceeds the limit",
56+
},
57+
}
58+
59+
for testName, testData := range tests {
60+
testData := testData
61+
t.Run(testName, func(t *testing.T) {
62+
t.Parallel()
63+
req := &PrometheusRequest{Query: testData.query}
64+
65+
limits := &mockLimits{maxQueryLength: testData.maxQueryLength}
66+
middleware := NewLimitsMiddleware(limits, 5*time.Minute)
67+
68+
innerRes := NewEmptyPrometheusInstantQueryResponse()
69+
inner := &mockHandler{}
70+
inner.On("Do", mock.Anything, mock.Anything).Return(innerRes, nil)
71+
72+
ctx := user.InjectOrgID(context.Background(), "test")
73+
outer := middleware.Wrap(inner)
74+
res, err := outer.Do(ctx, req)
75+
76+
if testData.expectedErr != "" {
77+
require.Error(t, err)
78+
assert.Contains(t, err.Error(), testData.expectedErr)
79+
assert.Nil(t, res)
80+
assert.Len(t, inner.Calls, 0)
81+
} else {
82+
// We expect the response returned by the inner handler.
83+
require.NoError(t, err)
84+
assert.Same(t, innerRes, res)
85+
86+
// The time range of the request passed to the inner handler should have not been manipulated.
87+
require.Len(t, inner.Calls, 1)
88+
}
89+
})
90+
}
91+
}
92+
93+
type mockLimits struct {
94+
validation.Overrides
95+
maxQueryLength time.Duration
96+
}
97+
98+
func (m mockLimits) MaxQueryLength(string) time.Duration {
99+
return m.maxQueryLength
100+
}
101+
102+
type mockHandler struct {
103+
mock.Mock
104+
}
105+
106+
func (m *mockHandler) Do(ctx context.Context, req tripperware.Request) (tripperware.Response, error) {
107+
args := m.Called(ctx, req)
108+
return args.Get(0).(tripperware.Response), args.Error(1)
109+
}

0 commit comments

Comments
 (0)