diff --git a/CHANGELOG.md b/CHANGELOG.md index cb6919a574..2d266b0ed2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ * [ENHANCEMENT] AlertManager: Retrying AlertManager Delete Silence on error #5794 * [ENHANCEMENT] Ingester: Add new ingester metric `cortex_ingester_max_inflight_query_requests`. #5798 * [ENHANCEMENT] Query: Added `query_storage_wall_time` to Query Frontend and Ruler query stats log for wall time spent on fetching data from storage. Query evaluation is not included. #5799 +* [ENHANCEMENT] Query: Added additional max query length check at Query Frontend and Ruler. Added `-querier.ignore-max-query-length` flag to disable max query length check at Querier. #5808 * [BUGFIX] Distributor: Do not use label with empty values for sharding #5717 * [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719 * [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 36ae8fa5d6..7e72d635c8 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -233,6 +233,12 @@ querier: # engine. # CLI flag: -querier.thanos-engine [thanos_engine: | default = false] + + # If enabled, ignore max query length check at Querier select method. Users + # can choose to ignore it since the validation can be done before Querier + # evaluation like at Query Frontend or Ruler. + # CLI flag: -querier.ignore-max-query-length + [ignore_max_query_length: | default = false] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 47da41edbd..89e9a26703 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3714,6 +3714,12 @@ store_gateway_client: # engine. # CLI flag: -querier.thanos-engine [thanos_engine: | default = false] + +# If enabled, ignore max query length check at Querier select method. Users can +# choose to ignore it since the validation can be done before Querier evaluation +# like at Query Frontend or Ruler. +# CLI flag: -querier.ignore-max-query-length +[ignore_max_query_length: | default = false] ``` ### `query_frontend_config` diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 291791ac51..3b39646310 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -457,12 +457,13 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryAnalyzer, prometheusCodec, shardedPrometheusCodec, + t.Cfg.Querier.LookbackDelta, ) if err != nil { return nil, err } - instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer) + instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, queryAnalyzer, t.Cfg.Querier.LookbackDelta) if err != nil { return nil, err } @@ -548,6 +549,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { return nil, nil } + t.Cfg.Ruler.LookbackDelta = t.Cfg.Querier.LookbackDelta t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 29247e3cf7..d453330b7a 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -85,6 +85,9 @@ type Config struct { // Experimental. Use https://github.com/thanos-io/promql-engine rather than // the Prometheus query engine. ThanosEngine bool `yaml:"thanos_engine"` + + // Ignore max query length check at Querier. + IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"` } var ( @@ -119,6 +122,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).") f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.") + f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.") } // Validate the config @@ -256,16 +260,17 @@ type limiterHolder struct { func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides) storage.Queryable { return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { q := querier{ - now: time.Now(), - mint: mint, - maxt: maxt, - chunkIterFn: chunkIterFn, - limits: limits, - maxQueryIntoFuture: cfg.MaxQueryIntoFuture, - queryStoreForLabels: cfg.QueryStoreForLabels, - distributor: distributor, - stores: stores, - limiterHolder: &limiterHolder{}, + now: time.Now(), + mint: mint, + maxt: maxt, + chunkIterFn: chunkIterFn, + limits: limits, + maxQueryIntoFuture: cfg.MaxQueryIntoFuture, + queryStoreForLabels: cfg.QueryStoreForLabels, + ignoreMaxQueryLength: cfg.IgnoreMaxQueryLength, + distributor: distributor, + stores: stores, + limiterHolder: &limiterHolder{}, } return q, nil @@ -283,6 +288,8 @@ type querier struct { distributor QueryableWithFilter stores []QueryableWithFilter limiterHolder *limiterHolder + + ignoreMaxQueryLength bool } func (q querier) setupFromCtx(ctx context.Context) (context.Context, *querier_stats.QueryStats, string, int64, int64, storage.Querier, []storage.Querier, error) { @@ -397,9 +404,11 @@ func (q querier) Select(ctx context.Context, sortSeries bool, sp *storage.Select // Validate query time range. This validation should be done only for instant / range queries and // NOT for metadata queries (series, labels) because the query-frontend doesn't support splitting // of such queries. - if maxQueryLength := q.limits.MaxQueryLength(userID); maxQueryLength > 0 && endTime.Sub(startTime) > maxQueryLength { - limitErr := validation.LimitError(fmt.Sprintf(validation.ErrQueryTooLong, endTime.Sub(startTime), maxQueryLength)) - return storage.ErrSeriesSet(limitErr) + if !q.ignoreMaxQueryLength { + if maxQueryLength := q.limits.MaxQueryLength(userID); maxQueryLength > 0 && endTime.Sub(startTime) > maxQueryLength { + limitErr := validation.LimitError(fmt.Sprintf(validation.ErrQueryTooLong, endTime.Sub(startTime), maxQueryLength)) + return storage.ErrSeriesSet(limitErr) + } } if len(queriers) == 1 { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 002569bff5..4e54f4d176 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -833,6 +833,9 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { queryStartTime time.Time queryEndTime time.Time expected error + + // If enabled, skip max query length check at Querier. + ignoreMaxQueryLength bool }{ "should allow query on short time range and rate time window close to the limit": { query: "rate(foo[29d])", @@ -858,6 +861,13 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { queryEndTime: time.Now(), expected: errors.New("expanding series: the query time range exceeds the limit (query length: 721h1m0s, limit: 720h0m0s)"), }, + "max query length check ignored, invalid query is still allowed": { + query: "rate(foo[1m])", + queryStartTime: time.Now().Add(-maxQueryLength).Add(-time.Hour), + queryEndTime: time.Now(), + expected: nil, + ignoreMaxQueryLength: true, + }, } opts := promql.EngineOpts{ @@ -873,6 +883,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { flagext.DefaultValues(&cfg) // Disable active query tracker to avoid mmap error. cfg.ActiveQueryTrackerDir = "" + cfg.IgnoreMaxQueryLength = testData.ignoreMaxQueryLength limits := DefaultLimitsConfig() limits.MaxQueryLength = model.Duration(maxQueryLength) diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go index b88515e6be..5e4698c81a 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go @@ -1,6 +1,8 @@ package instantquery import ( + "time" + "github.com/go-kit/log" "github.com/thanos-io/thanos/pkg/querysharding" @@ -11,9 +13,11 @@ func Middlewares( log log.Logger, limits tripperware.Limits, queryAnalyzer querysharding.Analyzer, + lookbackDelta time.Duration, ) ([]tripperware.Middleware, error) { - var m []tripperware.Middleware - - m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer)) + m := []tripperware.Middleware{ + NewLimitsMiddleware(limits, lookbackDelta), + tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, queryAnalyzer), + } return m, nil } diff --git a/pkg/querier/tripperware/instantquery/limits.go b/pkg/querier/tripperware/instantquery/limits.go new file mode 100644 index 0000000000..64c5f4e443 --- /dev/null +++ b/pkg/querier/tripperware/instantquery/limits.go @@ -0,0 +1,64 @@ +package instantquery + +import ( + "context" + "net/http" + "time" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/weaveworks/common/httpgrpc" + + "github.com/cortexproject/cortex/pkg/querier/tripperware" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +type limitsMiddleware struct { + tripperware.Limits + next tripperware.Handler + + lookbackDelta time.Duration +} + +// NewLimitsMiddleware creates a new Middleware that enforces query limits. +func NewLimitsMiddleware(l tripperware.Limits, lookbackDelta time.Duration) tripperware.Middleware { + return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler { + return limitsMiddleware{ + next: next, + Limits: l, + + lookbackDelta: lookbackDelta, + } + }) +} + +func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) { + log, ctx := spanlogger.New(ctx, "limits") + defer log.Finish() + + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + // Enforce the max query length. + if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 { + expr, err := parser.ParseExpr(r.GetQuery()) + if err != nil { + // Let Querier propagates the parsing error. + return l.next.Do(ctx, r) + } + + // Enforce query length across all selectors in the query. + min, max := promql.FindMinMaxTime(&parser.EvalStmt{Expr: expr, Start: util.TimeFromMillis(0), End: util.TimeFromMillis(0), LookbackDelta: l.lookbackDelta}) + diff := util.TimeFromMillis(max).Sub(util.TimeFromMillis(min)) + if diff > maxQueryLength { + return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, diff, maxQueryLength) + } + } + + return l.next.Do(ctx, r) +} diff --git a/pkg/querier/tripperware/instantquery/limits_test.go b/pkg/querier/tripperware/instantquery/limits_test.go new file mode 100644 index 0000000000..1900831388 --- /dev/null +++ b/pkg/querier/tripperware/instantquery/limits_test.go @@ -0,0 +1,109 @@ +package instantquery + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/querier/tripperware" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { + t.Parallel() + const ( + thirtyDays = 30 * 24 * time.Hour + ) + + tests := map[string]struct { + maxQueryLength time.Duration + query string + expectedErr string + }{ + "should skip validation if max length is disabled": { + maxQueryLength: 0, + }, + "even though failed to parse expression, should return no error since request will pass to next middleware": { + query: `up[`, + maxQueryLength: thirtyDays, + }, + "should succeed on a query not exceeding time range": { + query: `up`, + maxQueryLength: thirtyDays, + }, + "should succeed on a query not exceeding time range2": { + query: `up[29d]`, + maxQueryLength: thirtyDays, + }, + "should succeed on a query not exceeding time range3": { + query: `rate(up[29d]) + rate(test[29d])`, + maxQueryLength: thirtyDays, + }, + "should fail on a query exceeding time range": { + query: `rate(up[31d])`, + maxQueryLength: thirtyDays, + expectedErr: "the query time range exceeds the limit", + }, + "should fail on a query exceeding time range, work for multiple selects": { + query: `rate(up[20d]) + rate(up[20d] offset 20d)`, + maxQueryLength: thirtyDays, + expectedErr: "the query time range exceeds the limit", + }, + } + + for testName, testData := range tests { + testData := testData + t.Run(testName, func(t *testing.T) { + t.Parallel() + req := &PrometheusRequest{Query: testData.query} + + limits := &mockLimits{maxQueryLength: testData.maxQueryLength} + middleware := NewLimitsMiddleware(limits, 5*time.Minute) + + innerRes := NewEmptyPrometheusInstantQueryResponse() + inner := &mockHandler{} + inner.On("Do", mock.Anything, mock.Anything).Return(innerRes, nil) + + ctx := user.InjectOrgID(context.Background(), "test") + outer := middleware.Wrap(inner) + res, err := outer.Do(ctx, req) + + if testData.expectedErr != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), testData.expectedErr) + assert.Nil(t, res) + assert.Len(t, inner.Calls, 0) + } else { + // We expect the response returned by the inner handler. + require.NoError(t, err) + assert.Same(t, innerRes, res) + + // The time range of the request passed to the inner handler should have not been manipulated. + require.Len(t, inner.Calls, 1) + } + }) + } +} + +type mockLimits struct { + validation.Overrides + maxQueryLength time.Duration +} + +func (m mockLimits) MaxQueryLength(string) time.Duration { + return m.maxQueryLength +} + +type mockHandler struct { + mock.Mock +} + +func (m *mockHandler) Do(ctx context.Context, req tripperware.Request) (tripperware.Response, error) { + args := m.Called(ctx, req) + return args.Get(0).(tripperware.Response), args.Error(1) +} diff --git a/pkg/querier/tripperware/queryrange/limits.go b/pkg/querier/tripperware/queryrange/limits.go index 878e33d860..5931501cd6 100644 --- a/pkg/querier/tripperware/queryrange/limits.go +++ b/pkg/querier/tripperware/queryrange/limits.go @@ -7,6 +7,8 @@ import ( "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/querier/tripperware" @@ -19,14 +21,18 @@ import ( type limitsMiddleware struct { tripperware.Limits next tripperware.Handler + + lookbackDelta time.Duration } // NewLimitsMiddleware creates a new Middleware that enforces query limits. -func NewLimitsMiddleware(l tripperware.Limits) tripperware.Middleware { +func NewLimitsMiddleware(l tripperware.Limits, lookbackDelta time.Duration) tripperware.Middleware { return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler { return limitsMiddleware{ next: next, Limits: l, + + lookbackDelta: lookbackDelta, } }) } @@ -69,11 +75,25 @@ func (l limitsMiddleware) Do(ctx context.Context, r tripperware.Request) (trippe } // Enforce the max query length. - if maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength); maxQueryLength > 0 { + maxQueryLength := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, l.MaxQueryLength) + if maxQueryLength > 0 { queryLen := timestamp.Time(r.GetEnd()).Sub(timestamp.Time(r.GetStart())) if queryLen > maxQueryLength { return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, queryLen, maxQueryLength) } + + expr, err := parser.ParseExpr(r.GetQuery()) + if err != nil { + // Let Querier propagates the parsing error. + return l.next.Do(ctx, r) + } + + // Enforce query length across all selectors in the query. + min, max := promql.FindMinMaxTime(&parser.EvalStmt{Expr: expr, Start: util.TimeFromMillis(0), End: util.TimeFromMillis(0), LookbackDelta: l.lookbackDelta}) + diff := util.TimeFromMillis(max).Sub(util.TimeFromMillis(min)) + if diff > maxQueryLength { + return nil, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, diff, maxQueryLength) + } } return l.next.Do(ctx, r) diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index d5d4a6b230..6d01883186 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -77,7 +77,7 @@ func TestLimitsMiddleware_MaxQueryLookback(t *testing.T) { } limits := mockLimits{maxQueryLookback: testData.maxQueryLookback} - middleware := NewLimitsMiddleware(limits) + middleware := NewLimitsMiddleware(limits, 5*time.Minute) innerRes := NewEmptyPrometheusResponse() inner := &mockHandler{} @@ -117,6 +117,7 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { tests := map[string]struct { maxQueryLength time.Duration + query string reqStartTime time.Time reqEndTime time.Time expectedErr string @@ -126,11 +127,31 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { reqStartTime: time.Unix(0, 0), reqEndTime: now, }, + "even though failed to parse expression, should return no error since request will pass to next middleware": { + query: `up[`, + reqStartTime: now.Add(-time.Hour), + reqEndTime: now, + maxQueryLength: thirtyDays, + }, "should succeed on a query on short time range, ending now": { maxQueryLength: thirtyDays, reqStartTime: now.Add(-time.Hour), reqEndTime: now, }, + "should fail on query with time window > max query length": { + query: "up[31d]", + maxQueryLength: thirtyDays, + reqStartTime: now.Add(-time.Hour), + reqEndTime: now, + expectedErr: "the query time range exceeds the limit", + }, + "should fail on query with time window > max query length, considering multiple selects": { + query: "rate(up[20d]) + rate(up[20d] offset 20d)", + maxQueryLength: thirtyDays, + reqStartTime: now.Add(-time.Hour), + reqEndTime: now, + expectedErr: "the query time range exceeds the limit", + }, "should succeed on a query on short time range, ending in the past": { maxQueryLength: thirtyDays, reqStartTime: now.Add(-2 * thirtyDays).Add(-time.Hour), @@ -160,12 +181,16 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() req := &PrometheusRequest{ + Query: testData.query, Start: util.TimeToMillis(testData.reqStartTime), End: util.TimeToMillis(testData.reqEndTime), } + if req.Query == "" { + req.Query = "up" + } limits := mockLimits{maxQueryLength: testData.maxQueryLength} - middleware := NewLimitsMiddleware(limits) + middleware := NewLimitsMiddleware(limits, 5*time.Minute) innerRes := NewEmptyPrometheusResponse() inner := &mockHandler{} diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index 0201856179..b84d0f83ed 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -79,11 +79,12 @@ func Middlewares( queryAnalyzer querysharding.Analyzer, prometheusCodec tripperware.Codec, shardedPrometheusCodec tripperware.Codec, + lookbackDelta time.Duration, ) ([]tripperware.Middleware, cache.Cache, error) { // Metric used to keep track of each middleware execution duration. metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer) - queryRangeMiddleware := []tripperware.Middleware{NewLimitsMiddleware(limits)} + queryRangeMiddleware := []tripperware.Middleware{NewLimitsMiddleware(limits, lookbackDelta)} if cfg.AlignQueriesWithStep { queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware) } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index 027dddb446..619a65c210 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -60,6 +60,7 @@ func TestRoundTrip(t *testing.T) { qa, PrometheusCodec, ShardedPrometheusCodec, + 5*time.Minute, ) require.NoError(t, err) diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index 564263216a..5004c60a80 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -309,7 +309,7 @@ func TestSplitByDay(t *testing.T) { roundtripper := tripperware.NewRoundTripper(singleHostRoundTripper{ host: u.Host, next: http.DefaultTransport, - }, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}), SplitByIntervalMiddleware(interval, mockLimits{}, PrometheusCodec, nil)) + }, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}, 5*time.Minute), SplitByIntervalMiddleware(interval, mockLimits{}, PrometheusCodec, nil)) req, err := http.NewRequest("GET", tc.path, http.NoBody) require.NoError(t, err) diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index bfc64dbb8b..e5a535cf69 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -3,6 +3,7 @@ package ruler import ( "context" "errors" + "fmt" "time" "github.com/go-kit/log" @@ -15,6 +16,7 @@ import ( "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/storage" v1 "github.com/prometheus/prometheus/web/api/v1" @@ -24,6 +26,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -145,6 +148,7 @@ func (t *PusherAppendable) Appender(ctx context.Context) storage.Appender { // RulesLimits defines limits used by Ruler. type RulesLimits interface { EvaluationDelay(userID string) time.Duration + MaxQueryLength(userID string) time.Duration RulerTenantShardSize(userID string) int RulerMaxRuleGroupsPerTenant(userID string) int RulerMaxRulesPerRuleGroup(userID string) int @@ -154,8 +158,24 @@ type RulesLimits interface { // EngineQueryFunc returns a new engine query function by passing an altered timestamp. // Modified from Prometheus rules.EngineQueryFunc // https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189. -func EngineQueryFunc(engine v1.QueryEngine, q storage.Queryable, overrides RulesLimits, userID string) rules.QueryFunc { +func EngineQueryFunc(engine v1.QueryEngine, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { + // Enforce the max query length. + maxQueryLength := overrides.MaxQueryLength(userID) + if maxQueryLength > 0 { + expr, err := parser.ParseExpr(qs) + // If failed to parse expression, skip checking select range. + // Fail the query in the engine. + if err == nil { + // Enforce query length across all selectors in the query. + min, max := promql.FindMinMaxTime(&parser.EvalStmt{Expr: expr, Start: util.TimeFromMillis(0), End: util.TimeFromMillis(0), LookbackDelta: lookbackDelta}) + diff := util.TimeFromMillis(max).Sub(util.TimeFromMillis(min)) + if diff > maxQueryLength { + return nil, validation.LimitError(fmt.Sprintf(validation.ErrQueryTooLong, diff, maxQueryLength)) + } + } + } + evaluationDelay := overrides.EvaluationDelay(userID) q, err := engine.NewInstantQuery(ctx, q, nil, qs, t.Add(-evaluationDelay)) if err != nil { @@ -296,7 +316,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID) failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID) - engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID) + engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID, cfg.LookbackDelta) metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries) return rules.NewManager(&rules.ManagerOptions{ diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index 33fd464ccf..71db759d64 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -3,6 +3,7 @@ package ruler import ( "context" "errors" + "fmt" "math" "net/http" "testing" @@ -19,6 +20,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/util/validation" ) type fakePusher struct { @@ -173,9 +175,10 @@ func TestPusherErrors(t *testing.T) { func TestMetricsQueryFuncErrors(t *testing.T) { for name, tc := range map[string]struct { - returnedError error - expectedQueries int - expectedFailedQueries int + returnedError error + expectedQueries int + expectedFailedQueries int + notWrapQueryableErrors bool }{ "no error": { expectedQueries: 1, @@ -223,13 +226,24 @@ func TestMetricsQueryFuncErrors(t *testing.T) { expectedQueries: 1, expectedFailedQueries: 1, // unknown errors are not 400, so they are reported. }, + + "max query length validation error": { + returnedError: validation.LimitError(fmt.Sprintf(validation.ErrQueryTooLong, "10000", "1000")), + expectedQueries: 1, + expectedFailedQueries: 0, + notWrapQueryableErrors: true, + }, } { t.Run(name, func(t *testing.T) { queries := prometheus.NewCounter(prometheus.CounterOpts{}) failures := prometheus.NewCounter(prometheus.CounterOpts{}) mockFunc := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { - return promql.Vector{}, WrapQueryableErrors(tc.returnedError) + err := tc.returnedError + if !tc.notWrapQueryableErrors { + err = WrapQueryableErrors(err) + } + return promql.Vector{}, err } qf := MetricsQueryFunc(mockFunc, queries, failures) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 42941bf59c..b846515526 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -136,6 +136,9 @@ type Config struct { RingCheckPeriod time.Duration `yaml:"-"` + // Field will be populated during runtime. + LookbackDelta time.Duration `yaml:"-"` + EnableQueryStats bool `yaml:"query_stats_enabled"` DisableRuleGroupLabel bool `yaml:"disable_rule_group_label"` } diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index b046bc7be9..91863a22d3 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -87,6 +87,7 @@ type ruleLimits struct { maxRulesPerRuleGroup int maxRuleGroups int disabledRuleGroups validation.DisabledRuleGroups + maxQueryLength time.Duration } func (r ruleLimits) EvaluationDelay(_ string) time.Duration { @@ -109,6 +110,8 @@ func (r ruleLimits) DisabledRuleGroups(userID string) validation.DisabledRuleGro return r.disabledRuleGroups } +func (r ruleLimits) MaxQueryLength(_ string) time.Duration { return r.maxQueryLength } + func newEmptyQueryable() storage.Queryable { return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) { return emptyQuerier{}, nil