diff --git a/CHANGELOG.md b/CHANGELOG.md
index efcf471713..7a8a00c5d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 * [CHANGE] Store Gateway: Add a new fastcache based inmemory index cache. #5619
 * [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661
 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
+* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
 
 ## 1.16.0 2023-11-20
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index c2c07da5e4..b5ab659830 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -3041,6 +3041,18 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -frontend.max-outstanding-requests-per-tenant
 [max_outstanding_requests_per_tenant: <int> | default = 100]
 
+# Configuration for query priority.
+query_priority:
+  # Whether queries are assigned with priorities.
+  [enabled: <boolean> | default = false]
+
+  # Priority assigned to all queries by default. Must be a unique value. Use
+  # this as a baseline to make certain queries higher/lower priority.
+  [default_priority: <int> | default = 0]
+
+  # List of priority definitions.
+  [priorities: <list of PriorityDef> | default = []]
+
 # Duration to delay the evaluation of rules to ensure the underlying metrics
 # have been pushed to Cortex.
 # CLI flag: -ruler.evaluation-delay-duration
@@ -5032,6 +5044,37 @@ otel:
   [tls_insecure_skip_verify: <boolean> | default = false]
 ```
 
+### `PriorityDef`
+
+```yaml
+# Priority level. Must be a unique value.
+[priority: <int> | default = 0]
+
+# Number of reserved queriers to handle priorities higher or equal to the
+# priority level. Value between 0 and 1 will be used as a percentage.
+[reserved_queriers: <float> | default = 0]
+
+# List of query attributes to assign the priority.
+[query_attributes: <list of QueryAttribute> | default = []]
+```
+
+### `QueryAttribute`
+
+```yaml
+# Regex that the query string should match. If not set, it won't be checked.
+[regex: <string> | default = ""]
+
+# Time window that the query should be within. If not set, it won't be checked.
+time_window:
+  # Start of the time window that the query should be within. If set to 0, it
+  # won't be checked.
+  [start: <duration> | default = 0]
+
+  # End of the time window that the query should be within. If set to 0, it
+  # won't be checked.
+ [end: | default = 0] +``` + ### `DisabledRuleGroup` ```yaml diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 7858483c68..4fc0969ee7 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -487,6 +487,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro queryAnalyzer, t.Cfg.Querier.DefaultEvaluationInterval, t.Cfg.Querier.MaxSubQuerySteps, + t.Cfg.Querier.LookbackDelta, ) return services.NewIdleService(nil, func(_ error) error { diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index 8ef8fa3603..03dff13980 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -59,7 +59,7 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i cfg.FrontendV2.Port = grpcListenPort } - fr, err := v2.NewFrontend(cfg.FrontendV2, log, reg, retry) + fr, err := v2.NewFrontend(cfg.FrontendV2, limits, log, reg, retry) return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), nil, fr, err default: diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index d323a138cd..700145312f 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -343,6 +343,9 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u if ua := r.Header.Get("User-Agent"); len(ua) > 0 { logMessage = append(logMessage, "user_agent", ua) } + if queryPriority := r.Header.Get(util.QueryPriorityHeaderKey); len(queryPriority) > 0 { + logMessage = append(logMessage, "priority", queryPriority) + } if error != nil { s, ok := status.FromError(error) diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go index 955323fa99..801bda6d81 100644 --- a/pkg/frontend/transport/handler_test.go +++ b/pkg/frontend/transport/handler_test.go @@ -21,6 +21,7 @@ import ( "github.com/weaveworks/common/user" querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/util" ) type roundTripperFunc func(*http.Request) (*http.Response, error) @@ -301,34 +302,63 @@ func TestReportQueryStatsFormat(t *testing.T) { outputBuf := bytes.NewBuffer(nil) logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf)) handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, http.DefaultTransport, logger, nil) - userID := "fake" - queryString := url.Values(map[string][]string{"query": {"up"}}) - req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil) - require.NoError(t, err) - req.Header = http.Header{ - "User-Agent": []string{"Grafana"}, - } - resp := &http.Response{ - ContentLength: 1000, + req, _ := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil) + resp := &http.Response{ContentLength: 1000} + responseTime := time.Second + statusCode := http.StatusOK + + type testCase struct { + queryString url.Values + queryStats *querier_stats.QueryStats + header http.Header + responseErr error + expectedLog string } - stats := &querier_stats.QueryStats{ - Stats: querier_stats.Stats{ - WallTime: 3 * time.Second, - FetchedSeriesCount: 100, - FetchedChunksCount: 200, - FetchedSamplesCount: 300, - FetchedChunkBytes: 1024, - FetchedDataBytes: 2048, + + tests := map[string]testCase{ + "should not include query and header details if empty": { + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s 
query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 status_code=200 response_size=1000`, + }, + "should include query length and string at the end": { + queryString: url.Values(map[string][]string{"query": {"up"}}), + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 status_code=200 response_size=1000 query_length=2 param_query=up`, + }, + "should include query stats": { + queryStats: &querier_stats.QueryStats{ + Stats: querier_stats.Stats{ + WallTime: 3 * time.Second, + FetchedSeriesCount: 100, + FetchedChunksCount: 200, + FetchedSamplesCount: 300, + FetchedChunkBytes: 1024, + FetchedDataBytes: 2048, + }, + }, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 status_code=200 response_size=1000`, + }, + "should include user agent": { + header: http.Header{"User-Agent": []string{"Grafana"}}, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 status_code=200 response_size=1000 user_agent=Grafana`, + }, + "should include response error": { + responseErr: errors.New("foo_err"), + expectedLog: `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 status_code=200 response_size=1000 error=foo_err`, + }, + "should include query priority": { + queryString: url.Values(map[string][]string{"query": {"up"}}), + header: http.Header{util.QueryPriorityHeaderKey: []string{"99"}}, + expectedLog: `level=info msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=0 fetched_series_count=0 fetched_chunks_count=0 fetched_samples_count=0 fetched_chunks_bytes=0 fetched_data_bytes=0 status_code=200 response_size=1000 query_length=2 priority=99 param_query=up`, }, } - responseErr := errors.New("foo_err") - handler.reportQueryStats(req, userID, queryString, time.Second, stats, responseErr, http.StatusOK, resp) - data, err := io.ReadAll(outputBuf) - require.NoError(t, err) - - expectedLog := `level=error msg="query stats" component=query-frontend method=GET path=/prometheus/api/v1/query response_time=1s query_wall_time_seconds=3 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 fetched_chunks_bytes=1024 fetched_data_bytes=2048 status_code=200 response_size=1000 query_length=2 user_agent=Grafana error=foo_err param_query=up -` - require.Equal(t, expectedLog, string(data)) + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + req.Header = testData.header + handler.reportQueryStats(req, userID, testData.queryString, responseTime, testData.queryStats, testData.responseErr, statusCode, resp) + data, err := io.ReadAll(outputBuf) + require.NoError(t, err) + 
require.Equal(t, testData.expectedLog+"\n", string(data)) + }) + } } diff --git a/pkg/frontend/v1/frontend.go b/pkg/frontend/v1/frontend.go index ac5074dd1c..024c1f961a 100644 --- a/pkg/frontend/v1/frontend.go +++ b/pkg/frontend/v1/frontend.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "net/http" + "strconv" "time" "github.com/go-kit/log" @@ -51,7 +52,8 @@ type Limits interface { // MockLimits implements the Limits interface. Used in tests only. type MockLimits struct { - Queriers float64 + Queriers float64 + queryPriority validation.QueryPriority queue.MockLimits } @@ -59,6 +61,10 @@ func (l MockLimits) MaxQueriersPerUser(_ string) float64 { return l.Queriers } +func (l MockLimits) QueryPriority(_ string) validation.QueryPriority { + return l.queryPriority +} + // Frontend queues HTTP requests, dispatches them to backends, and handles retries // for requests which failed. type Frontend struct { @@ -93,6 +99,15 @@ type request struct { response chan *httpgrpc.HTTPResponse } +func (r request) Priority() int64 { + priority, err := strconv.ParseInt(httpgrpcutil.GetHeader(*r.request, util.QueryPriorityHeaderKey), 10, 64) + if err != nil { + return 0 + } + + return priority +} + // New creates a new frontend. Frontend implements service, and must be started and stopped. func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { f := &Frontend{ @@ -103,11 +118,11 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist queueLength: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_query_frontend_queue_length", Help: "Number of queries in the queue.", - }, []string{"user"}), + }, []string{"user", "priority", "type"}), discardedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_frontend_discarded_requests_total", Help: "Total number of query requests discarded.", - }, []string{"user"}), + }, []string{"user", "priority"}), queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_query_frontend_queue_duration_seconds", Help: "Time spend by requests queued.", @@ -160,8 +175,12 @@ func (f *Frontend) stopping(_ error) error { } func (f *Frontend) cleanupInactiveUserMetrics(user string) { - f.queueLength.DeleteLabelValues(user) - f.discardedRequests.DeleteLabelValues(user) + f.queueLength.DeletePartialMatch(prometheus.Labels{ + "user": user, + }) + f.discardedRequests.DeletePartialMatch(prometheus.Labels{ + "user": user, + }) } // RoundTripGRPC round trips a proto (instead of a HTTP request). diff --git a/pkg/frontend/v1/frontend_test.go b/pkg/frontend/v1/frontend_test.go index 7951dd1d60..1206969a93 100644 --- a/pkg/frontend/v1/frontend_test.go +++ b/pkg/frontend/v1/frontend_test.go @@ -211,7 +211,7 @@ func TestFrontendMetricsCleanup(t *testing.T) { require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_query_frontend_queue_length Number of queries in the queue. 
# TYPE cortex_query_frontend_queue_length gauge - cortex_query_frontend_queue_length{user="1"} 0 + cortex_query_frontend_queue_length{priority="0",type="fifo",user="1"} 0 `), "cortex_query_frontend_queue_length")) fr.cleanupInactiveUserMetrics("1") diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index dea15faeaf..2df0f8f344 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -21,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/grpcclient" @@ -64,10 +65,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type Frontend struct { services.Service - cfg Config - log log.Logger - - retry *transport.Retry + cfg Config + log log.Logger + limits scheduler.Limits + retry *transport.Retry lastQueryID atomic.Uint64 @@ -112,7 +113,7 @@ type enqueueResult struct { } // NewFrontend creates a new frontend. -func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { +func NewFrontend(cfg Config, limits scheduler.Limits, log log.Logger, reg prometheus.Registerer, retry *transport.Retry) (*Frontend, error) { requestsCh := make(chan *frontendRequest) schedulerWorkers, err := newFrontendSchedulerWorkers(cfg, fmt.Sprintf("%s:%d", cfg.Addr, cfg.Port), requestsCh, log) @@ -122,6 +123,7 @@ func NewFrontend(cfg Config, log log.Logger, reg prometheus.Registerer, retry *t f := &Frontend{ cfg: cfg, + limits: limits, log: log, requestsCh: requestsCh, schedulerWorkers: schedulerWorkers, diff --git a/pkg/frontend/v2/frontend_test.go b/pkg/frontend/v2/frontend_test.go index 59729e1757..6b20926e89 100644 --- a/pkg/frontend/v2/frontend_test.go +++ b/pkg/frontend/v2/frontend_test.go @@ -19,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb" "github.com/cortexproject/cortex/pkg/querier/stats" + "github.com/cortexproject/cortex/pkg/scheduler/queue" "github.com/cortexproject/cortex/pkg/scheduler/schedulerpb" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" @@ -48,7 +49,7 @@ func setupFrontend(t *testing.T, schedulerReplyFunc func(f *Frontend, msg *sched //logger := log.NewLogfmtLogger(os.Stdout) logger := log.NewNopLogger() - f, err := NewFrontend(cfg, logger, nil, transport.NewRetry(maxRetries, nil)) + f, err := NewFrontend(cfg, queue.MockLimits{}, logger, nil, transport.NewRetry(maxRetries, nil)) require.NoError(t, err) frontendv2pb.RegisterFrontendForQuerierServer(server, f) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index a7350e65d5..252443f5a2 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -8,14 +8,12 @@ import ( "net/http" "net/url" "sort" - "strconv" "strings" "time" jsoniter "github.com/json-iterator/go" 
"github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" - "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" @@ -132,7 +130,7 @@ func (resp *PrometheusInstantQueryResponse) HTTPHeaders() map[string][]string { func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) { result := PrometheusRequest{Headers: map[string][]string{}} var err error - result.Time, err = parseTimeParam(r, "time", c.now().Unix()) + result.Time, err = util.ParseTimeParam(r, "time", c.now().Unix()) if err != nil { return nil, decorateWithParamName(err, "time") } @@ -630,15 +628,3 @@ func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) { return s.Result.GetRawBytes(), nil } } - -func parseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error) { - val := r.FormValue(paramName) - if val == "" { - val = strconv.FormatInt(defaultValue, 10) - } - result, err := util.ParseTime(val) - if err != nil { - return 0, errors.Wrapf(err, "Invalid time value for '%s'", paramName) - } - return result, nil -} diff --git a/pkg/querier/tripperware/limits.go b/pkg/querier/tripperware/limits.go index 15ce78592f..815693b3c1 100644 --- a/pkg/querier/tripperware/limits.go +++ b/pkg/querier/tripperware/limits.go @@ -1,6 +1,10 @@ package tripperware -import "time" +import ( + "time" + + "github.com/cortexproject/cortex/pkg/util/validation" +) // Limits allows us to specify per-tenant runtime limits on the behavior of // the query handling code. @@ -21,4 +25,7 @@ type Limits interface { // QueryVerticalShardSize returns the maximum number of queriers that can handle requests for this user. QueryVerticalShardSize(userID string) int + + // QueryPriority returns the query priority config for the tenant, including different priorities and their attributes. 
+ QueryPriority(userID string) validation.QueryPriority } diff --git a/pkg/querier/tripperware/priority.go b/pkg/querier/tripperware/priority.go new file mode 100644 index 0000000000..4a8d3ae07f --- /dev/null +++ b/pkg/querier/tripperware/priority.go @@ -0,0 +1,100 @@ +package tripperware + +import ( + "net/http" + "strings" + "time" + + "github.com/prometheus/prometheus/promql/parser" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func GetPriority(r *http.Request, userID string, limits Limits, now time.Time, lookbackDelta time.Duration) (int64, error) { + isQuery := strings.HasSuffix(r.URL.Path, "/query") + isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range") + queryPriority := limits.QueryPriority(userID) + query := r.FormValue("query") + + if (!isQuery && !isQueryRange) || !queryPriority.Enabled || query == "" { + return 0, nil + } + + expr, err := parser.ParseExpr(query) + if err != nil { + return 0, err + } + + if len(queryPriority.Priorities) == 0 { + return queryPriority.DefaultPriority, nil + } + + var startTime, endTime int64 + if isQuery { + if t, err := util.ParseTimeParam(r, "time", now.Unix()); err == nil { + startTime = t + endTime = t + } + } else if isQueryRange { + if st, err := util.ParseTime(r.FormValue("start")); err == nil { + if et, err := util.ParseTime(r.FormValue("end")); err == nil { + startTime = st + endTime = et + } + } + } + + es := &parser.EvalStmt{ + Expr: expr, + Start: util.TimeFromMillis(startTime), + End: util.TimeFromMillis(endTime), + LookbackDelta: lookbackDelta, + } + + minTime, maxTime := FindMinMaxTime(es) + + for _, priority := range queryPriority.Priorities { + for _, attribute := range priority.QueryAttributes { + if attribute.Regex != "" && attribute.Regex != ".*" && attribute.Regex != ".+" { + if attribute.CompiledRegex != nil && !attribute.CompiledRegex.MatchString(query) { + continue + } + } + + if isWithinTimeAttributes(attribute.TimeWindow, now, minTime, maxTime) { + return priority.Priority, nil + } + } + } + + return queryPriority.DefaultPriority, nil +} + +func isWithinTimeAttributes(timeWindow validation.TimeWindow, now time.Time, startTime, endTime int64) bool { + if timeWindow.Start == 0 && timeWindow.End == 0 { + return true + } + + if timeWindow.Start != 0 { + startTimeThreshold := now.Add(-1 * time.Duration(timeWindow.Start).Abs()).Truncate(time.Second).Unix() + if startTime < startTimeThreshold { + return false + } + } + + if timeWindow.End != 0 { + endTimeThreshold := now.Add(-1 * time.Duration(timeWindow.End).Abs()).Add(1 * time.Second).Truncate(time.Second).Unix() + if endTime > endTimeThreshold { + return false + } + } + + return true +} + +func FindMinMaxTime(s *parser.EvalStmt) (int64, int64) { + // Placeholder until Prometheus is updated to include + // https://github.com/prometheus/prometheus/commit/9e3df532d8294d4fe3284bde7bc96db336a33552 + return s.Start.Unix(), s.End.Unix() +} diff --git a/pkg/querier/tripperware/priority_test.go b/pkg/querier/tripperware/priority_test.go new file mode 100644 index 0000000000..e95d9170d9 --- /dev/null +++ b/pkg/querier/tripperware/priority_test.go @@ -0,0 +1,279 @@ +package tripperware + +import ( + "bytes" + "net/http" + "regexp" + "strconv" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + + 
"github.com/cortexproject/cortex/pkg/util/validation" +) + +func Test_GetPriorityShouldReturnDefaultPriorityIfNotEnabledOrEmptyQueryString(t *testing.T) { + now := time.Now() + limits := mockLimits{queryPriority: validation.QueryPriority{ + Priorities: []validation.PriorityDef{ + { + Priority: 1, + QueryAttributes: []validation.QueryAttribute{ + { + Regex: ".*", + CompiledRegex: regexp.MustCompile(".*"), + }, + }, + }, + }, + }} + + type testCase struct { + url string + queryPriorityEnabled bool + } + + tests := map[string]testCase{ + "should miss if query priority not enabled": { + url: "/query?query=up", + }, + "should miss if query string empty": { + url: "/query?query=", + queryPriorityEnabled: true, + }, + "should miss if query string empty - range query": { + url: "/query_range?query=", + queryPriorityEnabled: true, + }, + "should miss if neither instant nor range query": { + url: "/series", + queryPriorityEnabled: true, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + limits.queryPriority.Enabled = testData.queryPriorityEnabled + req, _ := http.NewRequest(http.MethodPost, testData.url, bytes.NewReader([]byte{})) + priority, err := GetPriority(req, "", limits, now, 0) + assert.NoError(t, err) + assert.Equal(t, int64(0), priority) + }) + } +} + +func Test_GetPriorityShouldConsiderRegex(t *testing.T) { + now := time.Now() + limits := mockLimits{queryPriority: validation.QueryPriority{ + Enabled: true, + Priorities: []validation.PriorityDef{ + { + Priority: 1, + QueryAttributes: []validation.QueryAttribute{ + {}, + }, + }, + }, + }} + + type testCase struct { + regex string + query string + expectedPriority int + } + + tests := map[string]testCase{ + "should hit if regex matches": { + regex: "(^sum|c(.+)t)", + query: "sum(up)", + expectedPriority: 1, + }, + "should miss if regex doesn't match": { + regex: "(^sum|c(.+)t)", + query: "min(up)", + expectedPriority: 0, + }, + "should hit if regex matches - .*": { + regex: ".*", + query: "count(sum(up))", + expectedPriority: 1, + }, + "should hit if regex matches - .+": { + regex: ".+", + query: "count(sum(up))", + expectedPriority: 1, + }, + "should hit if regex is an empty string": { + regex: "", + query: "sum(up)", + expectedPriority: 1, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + limits.queryPriority.Priorities[0].QueryAttributes[0].Regex = testData.regex + limits.queryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = regexp.MustCompile(testData.regex) + req, _ := http.NewRequest(http.MethodPost, "/query?query="+testData.query, bytes.NewReader([]byte{})) + priority, err := GetPriority(req, "", limits, now, 0) + assert.NoError(t, err) + assert.Equal(t, int64(testData.expectedPriority), priority) + }) + } +} + +func Test_GetPriorityShouldConsiderStartAndEndTime(t *testing.T) { + now := time.Now() + limits := mockLimits{queryPriority: validation.QueryPriority{ + Enabled: true, + Priorities: []validation.PriorityDef{ + { + Priority: 1, + QueryAttributes: []validation.QueryAttribute{ + { + Regex: ".*", + CompiledRegex: regexp.MustCompile(".*"), + TimeWindow: validation.TimeWindow{ + Start: model.Duration(45 * time.Minute), + End: model.Duration(15 * time.Minute), + }, + }, + }, + }, + }, + }} + + type testCase struct { + time time.Time + start time.Time + end time.Time + expectedPriority int + } + + tests := map[string]testCase{ + "should hit instant query between start and end time": { + time: now.Add(-30 * time.Minute), + 
expectedPriority: 1, + }, + "should hit instant query equal to start time": { + time: now.Add(-45 * time.Minute), + expectedPriority: 1, + }, + "should hit instant query equal to end time": { + time: now.Add(-15 * time.Minute), + expectedPriority: 1, + }, + "should miss instant query outside of end time": { + expectedPriority: 0, + }, + "should miss instant query outside of start time": { + time: now.Add(-60 * time.Minute), + expectedPriority: 0, + }, + "should hit range query between start and end time": { + start: now.Add(-40 * time.Minute), + end: now.Add(-20 * time.Minute), + expectedPriority: 1, + }, + "should hit range query equal to start and end time": { + start: now.Add(-45 * time.Minute), + end: now.Add(-15 * time.Minute), + expectedPriority: 1, + }, + "should miss range query outside of start time": { + start: now.Add(-50 * time.Minute), + end: now.Add(-15 * time.Minute), + expectedPriority: 0, + }, + "should miss range query completely outside of start time": { + start: now.Add(-50 * time.Minute), + end: now.Add(-45 * time.Minute), + expectedPriority: 0, + }, + "should miss range query outside of end time": { + start: now.Add(-45 * time.Minute), + end: now.Add(-10 * time.Minute), + expectedPriority: 0, + }, + "should miss range query completely outside of end time": { + start: now.Add(-15 * time.Minute), + end: now.Add(-10 * time.Minute), + expectedPriority: 0, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + var url string + if !testData.time.IsZero() { + url = "/query?query=sum(up)&time=" + strconv.FormatInt(testData.time.Unix(), 10) + } else if !testData.start.IsZero() { + url = "/query_range?query=sum(up)&start=" + strconv.FormatInt(testData.start.Unix(), 10) + url += "&end=" + strconv.FormatInt(testData.end.Unix(), 10) + } else { + url = "/query?query=sum(up)" + } + req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader([]byte{})) + priority, err := GetPriority(req, "", limits, now, 0) + assert.NoError(t, err) + assert.Equal(t, int64(testData.expectedPriority), priority) + }) + } +} + +func Test_GetPriorityShouldNotConsiderStartAndEndTimeIfEmpty(t *testing.T) { + now := time.Now() + limits := mockLimits{queryPriority: validation.QueryPriority{ + Enabled: true, + Priorities: []validation.PriorityDef{ + { + Priority: 1, + QueryAttributes: []validation.QueryAttribute{ + { + Regex: "^sum\\(up\\)$", + }, + }, + }, + }, + }} + + type testCase struct { + time time.Time + start time.Time + end time.Time + } + + tests := map[string]testCase{ + "should hit instant query with no time": {}, + "should hit instant query with future time": { + time: now.Add(1000000 * time.Hour), + }, + "should hit instant query with very old time": { + time: now.Add(-1000000 * time.Hour), + }, + "should hit range query with very wide time window": { + start: now.Add(-1000000 * time.Hour), + end: now.Add(1000000 * time.Hour), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + var url string + if !testData.time.IsZero() { + url = "/query?query=sum(up)&time=" + strconv.FormatInt(testData.time.Unix(), 10) + } else if !testData.start.IsZero() { + url = "/query_range?query=sum(up)&start=" + strconv.FormatInt(testData.start.Unix(), 10) + url += "&end=" + strconv.FormatInt(testData.end.Unix(), 10) + } else { + url = "/query?query=sum(up)" + } + req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader([]byte{})) + priority, err := GetPriority(req, "", limits, now, 0) + assert.NoError(t, err) + assert.Equal(t, 
int64(1), priority) + }) + } +} diff --git a/pkg/querier/tripperware/queryrange/limits_test.go b/pkg/querier/tripperware/queryrange/limits_test.go index 1569ea2e3a..d5d4a6b230 100644 --- a/pkg/querier/tripperware/queryrange/limits_test.go +++ b/pkg/querier/tripperware/queryrange/limits_test.go @@ -12,6 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" ) func TestLimitsMiddleware_MaxQueryLookback(t *testing.T) { @@ -219,6 +220,10 @@ func (m mockLimits) QueryVerticalShardSize(userID string) int { return 0 } +func (m mockLimits) QueryPriority(userID string) validation.QueryPriority { + return validation.QueryPriority{} +} + type mockHandler struct { mock.Mock } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index 6276be72bd..518cdce885 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -75,6 +75,7 @@ func TestRoundTrip(t *testing.T) { qa, time.Minute, 0, + 0, ) for i, tc := range []struct { diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index 6aefe4ccec..8ad3dd7fe0 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -19,6 +19,7 @@ import ( "context" "io" "net/http" + "strconv" "strings" "time" @@ -104,6 +105,7 @@ func NewQueryTripperware( queryAnalyzer querysharding.Analyzer, defaultSubQueryInterval time.Duration, maxSubQuerySteps int64, + lookbackDelta time.Duration, ) Tripperware { // Per tenant query metrics. queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ @@ -142,15 +144,27 @@ func NewQueryTripperware( if err != nil { return nil, err } + now := time.Now() userStr := tenant.JoinTenantIDs(tenantIDs) - activeUsers.UpdateUserTimestamp(userStr, time.Now()) + activeUsers.UpdateUserTimestamp(userStr, now) queriesPerTenant.WithLabelValues(op, userStr).Inc() - if maxSubQuerySteps > 0 && (isQuery || isQueryRange) { + if isQuery || isQueryRange { query := r.FormValue("query") - // Check subquery step size. - if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, maxSubQuerySteps); err != nil { - return nil, err + + if maxSubQuerySteps > 0 { + // Check subquery step size. 
+ if err := SubQueryStepSizeCheck(query, defaultSubQueryInterval, maxSubQuerySteps); err != nil { + return nil, err + } + } + + if limits != nil && limits.QueryPriority(userStr).Enabled { + priority, err := GetPriority(r, userStr, limits, now, lookbackDelta) + if err != nil { + return nil, err + } + r.Header.Set(util.QueryPriorityHeaderKey, strconv.FormatInt(priority, 10)) } } diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index e52514ee8d..2754c17baa 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -201,6 +201,7 @@ func TestRoundTrip(t *testing.T) { querysharding.NewQueryAnalyzer(), time.Minute, tc.maxSubQuerySteps, + 0, ) resp, err := tw(downstream).RoundTrip(req) if tc.expectedErr == nil { diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index 657d7daa3a..5cbad93ca8 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -21,6 +21,7 @@ import ( "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/querysharding" + "github.com/cortexproject/cortex/pkg/util/validation" ) func TestQueryShardQuery(t *testing.T, instantQueryCodec Codec, shardedPrometheusCodec Codec) { @@ -466,6 +467,7 @@ type mockLimits struct { maxQueryLength time.Duration maxCacheFreshness time.Duration shardSize int + queryPriority validation.QueryPriority } func (m mockLimits) MaxQueryLookback(string) time.Duration { @@ -488,6 +490,10 @@ func (m mockLimits) QueryVerticalShardSize(userID string) int { return m.shardSize } +func (m mockLimits) QueryPriority(userID string) validation.QueryPriority { + return m.queryPriority +} + type singleHostRoundTripper struct { host string next http.RoundTripper diff --git a/pkg/scheduler/queue/queue.go b/pkg/scheduler/queue/queue.go index 42b8238b56..0d634debed 100644 --- a/pkg/scheduler/queue/queue.go +++ b/pkg/scheduler/queue/queue.go @@ -2,6 +2,7 @@ package queue import ( "context" + "strconv" "sync" "time" @@ -44,7 +45,9 @@ func FirstUser() UserIndex { } // Request stored into the queue. -type Request interface{} +type Request interface { + util.PriorityOp +} // RequestQueue holds incoming requests in per-user queues. It also assigns each user specified number of queriers, // and when querier asks for next request to handle (using GetNextRequestForQuerier), it returns requests @@ -59,20 +62,18 @@ type RequestQueue struct { queues *queues stopped bool - queueLength *prometheus.GaugeVec // Per user and reason. - totalRequests *prometheus.CounterVec // Per user. - discardedRequests *prometheus.CounterVec // Per user. + totalRequests *prometheus.CounterVec // Per user and priority. + discardedRequests *prometheus.CounterVec // Per user and priority. 
} func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, queueLength *prometheus.GaugeVec, discardedRequests *prometheus.CounterVec, limits Limits, registerer prometheus.Registerer) *RequestQueue { q := &RequestQueue{ - queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits), + queues: newUserQueues(maxOutstandingPerTenant, forgetDelay, limits, queueLength), connectedQuerierWorkers: atomic.NewInt32(0), - queueLength: queueLength, totalRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_request_queue_requests_total", Help: "Total number of query requests going to the request queue.", - }, []string{"user"}), + }, []string{"user", "priority"}), discardedRequests: discardedRequests, } @@ -95,27 +96,30 @@ func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers fl return ErrStopped } - shardSize := util.DynamicShardSize(maxQueriers, len(q.queues.queriers)) - queue := q.queues.getOrAddQueue(userID, shardSize) + maxQuerierCount := util.DynamicShardSize(maxQueriers, len(q.queues.queriers)) + queue := q.queues.getOrAddQueue(userID, maxQuerierCount) + maxOutstandingRequests := q.queues.limits.MaxOutstandingPerTenant(userID) + priority := strconv.FormatInt(req.Priority(), 10) + if queue == nil { // This can only happen if userID is "". return errors.New("no queue found") } - q.totalRequests.WithLabelValues(userID).Inc() - select { - case queue <- req: - q.queueLength.WithLabelValues(userID).Inc() - q.cond.Broadcast() - // Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns. - if successFn != nil { - successFn() - } - return nil - default: - q.discardedRequests.WithLabelValues(userID).Inc() + q.totalRequests.WithLabelValues(userID, priority).Inc() + + if queue.length() >= maxOutstandingRequests { + q.discardedRequests.WithLabelValues(userID, priority).Inc() return ErrTooManyRequests } + + queue.enqueueRequest(req) + q.cond.Broadcast() + // Call this function while holding a lock. This guarantees that no querier can fetch the request before function returns. + if successFn != nil { + successFn() + } + return nil } // GetNextRequestForQuerier find next user queue and takes the next request off of it. Will block if there are no requests. @@ -151,12 +155,17 @@ FindQueue: // Pick next request from the queue. for { - request := <-queue - if len(queue) == 0 { - q.queues.deleteQueue(userID) + minPriority, matchMinPriority := q.getPriorityForQuerier(userID, querierID) + request := queue.dequeueRequest(minPriority, matchMinPriority) + if request == nil { + // The queue does not contain request with the priority, wait for more requests + querierWait = true + goto FindQueue } - q.queueLength.WithLabelValues(userID).Dec() + if queue.length() == 0 { + q.queues.deleteQueue(userID) + } // Tell close() we've processed a request. 
q.cond.Broadcast() @@ -171,6 +180,13 @@ FindQueue: goto FindQueue } +func (q *RequestQueue) getPriorityForQuerier(userID string, querierID string) (int64, bool) { + if priority, ok := q.queues.userQueues[userID].reservedQueriers[querierID]; ok { + return priority, true + } + return 0, false +} + func (q *RequestQueue) forgetDisconnectedQueriers(_ context.Context) error { q.mtx.Lock() defer q.mtx.Unlock() diff --git a/pkg/scheduler/queue/queue_test.go b/pkg/scheduler/queue/queue_test.go index 8e8f0a94d0..52abe26270 100644 --- a/pkg/scheduler/queue/queue_test.go +++ b/pkg/scheduler/queue/queue_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/validation" ) func BenchmarkGetNextRequest(b *testing.B) { @@ -24,8 +25,8 @@ func BenchmarkGetNextRequest(b *testing.B) { for n := 0; n < b.N; n++ { queue := NewRequestQueue(maxOutstandingPerTenant, 0, - prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}), MockLimits{MaxOutstanding: 100}, nil, ) @@ -39,7 +40,7 @@ func BenchmarkGetNextRequest(b *testing.B) { for j := 0; j < numTenants; j++ { userID := strconv.Itoa(j) - err := queue.EnqueueRequest(userID, "request", 0, nil) + err := queue.EnqueueRequest(userID, MockRequest{}, 0, nil) if err != nil { b.Fatal(err) } @@ -79,12 +80,12 @@ func BenchmarkQueueRequest(b *testing.B) { queues := make([]*RequestQueue, 0, b.N) users := make([]string, 0, numTenants) - requests := make([]string, 0, numTenants) + requests := make([]MockRequest, 0, numTenants) for n := 0; n < b.N; n++ { q := NewRequestQueue(maxOutstandingPerTenant, 0, - prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}), MockLimits{MaxOutstanding: 100}, nil, ) @@ -96,7 +97,106 @@ func BenchmarkQueueRequest(b *testing.B) { queues = append(queues, q) for j := 0; j < numTenants; j++ { - requests = append(requests, fmt.Sprintf("%d-%d", n, j)) + requests = append(requests, MockRequest{id: fmt.Sprintf("%d-%d", n, j)}) + users = append(users, strconv.Itoa(j)) + } + } + + b.ResetTimer() + for n := 0; n < b.N; n++ { + for i := 0; i < maxOutstandingPerTenant; i++ { + for j := 0; j < numTenants; j++ { + err := queues[n].EnqueueRequest(users[j], requests[j], 0, nil) + if err != nil { + b.Fatal(err) + } + } + } + } +} + +func BenchmarkGetNextRequestPriorityQueue(b *testing.B) { + const maxOutstandingPerTenant = 2 + const numTenants = 50 + const queriers = 5 + + queues := make([]*RequestQueue, 0, b.N) + + for n := 0; n < b.N; n++ { + queue := NewRequestQueue(maxOutstandingPerTenant, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}), + MockLimits{MaxOutstanding: 100, QueryPriorityVal: validation.QueryPriority{Enabled: true}}, + nil, + ) + queues = append(queues, queue) + + for ix := 0; ix < queriers; ix++ { + queue.RegisterQuerierConnection(fmt.Sprintf("querier-%d", 
ix)) + } + + for i := 0; i < maxOutstandingPerTenant; i++ { + for j := 0; j < numTenants; j++ { + userID := strconv.Itoa(j) + + err := queue.EnqueueRequest(userID, MockRequest{priority: int64(i)}, 0, nil) + if err != nil { + b.Fatal(err) + } + } + } + } + + ctx := context.Background() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + idx := FirstUser() + for j := 0; j < maxOutstandingPerTenant*numTenants; j++ { + querier := "" + b: + // Find querier with at least one request to avoid blocking in getNextRequestForQuerier. + for _, q := range queues[i].queues.userQueues { + for qid := range q.queriers { + querier = qid + break b + } + } + + _, nidx, err := queues[i].GetNextRequestForQuerier(ctx, idx, querier) + if err != nil { + b.Fatal(err) + } + idx = nidx + } + } +} + +func BenchmarkQueueRequestPriorityQueue(b *testing.B) { + const maxOutstandingPerTenant = 2 + const numTenants = 50 + const queriers = 5 + + queues := make([]*RequestQueue, 0, b.N) + users := make([]string, 0, numTenants) + requests := make([]MockRequest, 0, numTenants) + + for n := 0; n < b.N; n++ { + q := NewRequestQueue(maxOutstandingPerTenant, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}), + MockLimits{MaxOutstanding: 100, QueryPriorityVal: validation.QueryPriority{Enabled: true}}, + nil, + ) + + for ix := 0; ix < queriers; ix++ { + q.RegisterQuerierConnection(fmt.Sprintf("querier-%d", ix)) + } + + queues = append(queues, q) + + for j := 0; j < numTenants; j++ { + requests = append(requests, MockRequest{id: fmt.Sprintf("%d-%d", n, j), priority: int64(j)}) users = append(users, strconv.Itoa(j)) } } @@ -118,8 +218,8 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe const forgetDelay = 3 * time.Second queue := NewRequestQueue(1, forgetDelay, - prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}), - prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}), + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}), MockLimits{MaxOutstanding: 100}, nil, ) @@ -149,7 +249,7 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe // Enqueue a request from an user which would be assigned to querier-1. // NOTE: "user-1" hash falls in the querier-1 shard. - require.NoError(t, queue.EnqueueRequest("user-1", "request", 1, nil)) + require.NoError(t, queue.EnqueueRequest("user-1", MockRequest{}, 1, nil)) startTime := time.Now() querier2wg.Wait() @@ -158,3 +258,146 @@ func TestRequestQueue_GetNextRequestForQuerier_ShouldGetRequestAfterReshardingBe // We expect that querier-2 got the request only after querier-1 forget delay is passed. 
assert.GreaterOrEqual(t, waitTime.Milliseconds(), forgetDelay.Milliseconds()) } + +func TestQueriersShouldGetHighPriorityQueryFirst(t *testing.T) { + queue := NewRequestQueue(0, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}), + MockLimits{MaxOutstanding: 3, QueryPriorityVal: validation.QueryPriority{Enabled: true}}, + nil, + ) + ctx := context.Background() + queue.RegisterQuerierConnection("querier-1") + + normalRequest1 := MockRequest{ + id: "normal query 1", + } + normalRequest2 := MockRequest{ + id: "normal query 2", + } + highPriorityRequest := MockRequest{ + id: "high priority query", + priority: 1, + } + + assert.NoError(t, queue.EnqueueRequest("userID", normalRequest1, 1, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", normalRequest2, 1, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", highPriorityRequest, 1, func() {})) + + assert.Error(t, queue.EnqueueRequest("userID", highPriorityRequest, 1, func() {})) // should fail due to maxOutstandingPerTenant = 3 + nextRequest, _, _ := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1") + assert.Equal(t, highPriorityRequest, nextRequest) // high priority request returned, although it was enqueued the last +} + +func TestReservedQueriersShouldOnlyGetHighPriorityQueries(t *testing.T) { + queue := NewRequestQueue(0, 0, + prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}), + MockLimits{ + MaxOutstanding: 3, + QueryPriorityVal: validation.QueryPriority{ + Enabled: true, + Priorities: []validation.PriorityDef{ + { + Priority: 1, + ReservedQueriers: 1, + }, + }, + }, + }, + nil, + ) + ctx := context.Background() + + queue.RegisterQuerierConnection("querier-1") + + normalRequest := MockRequest{ + id: "normal query", + } + priority1Request := MockRequest{ + id: "priority 1", + priority: 1, + } + + assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", priority1Request, 1, func() {})) + + nextRequest, _, _ := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1") + assert.Equal(t, priority1Request, nextRequest) + + nextRequest, _, _ = queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1") + assert.Equal(t, priority1Request, nextRequest) + + ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + time.AfterFunc(2*time.Second, func() { + queue.cond.Broadcast() + }) + nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1") + assert.Nil(t, nextRequest) + assert.Equal(t, 1, queue.queues.userQueues["userID"].queue.length()) + + assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {})) + + ctxTimeout, cancel = context.WithTimeout(ctx, 1*time.Second) + defer cancel() + + time.AfterFunc(2*time.Second, func() { + queue.cond.Broadcast() + }) + nextRequest, _, _ = queue.GetNextRequestForQuerier(ctxTimeout, FirstUser(), "querier-1") + assert.Nil(t, nextRequest) + assert.Equal(t, 2, queue.queues.userQueues["userID"].queue.length()) +} + +func TestExitingRequestsShouldPersistEvenIfTheConfigHasChanged(t *testing.T) { + limits := MockLimits{ + MaxOutstanding: 3, + } + queue := NewRequestQueue(0, 0, + 
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user", "priority", "type"}), + prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user", "priority"}), + limits, + nil, + ) + + ctx := context.Background() + + queue.RegisterQuerierConnection("querier-1") + + normalRequest := MockRequest{ + id: "normal query", + } + highPriorityRequest := MockRequest{ + id: "high priority query", + priority: 1, + } + + assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", highPriorityRequest, 1, func() {})) + assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {})) + + limits.MaxOutstanding = 4 + limits.QueryPriorityVal = validation.QueryPriority{Enabled: true} + queue.queues.limits = limits + + assert.NoError(t, queue.EnqueueRequest("userID", normalRequest, 1, func() {})) + + nextRequest, _, _ := queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1") + assert.Equal(t, highPriorityRequest, nextRequest) + + nextRequest, _, _ = queue.GetNextRequestForQuerier(ctx, FirstUser(), "querier-1") + assert.Equal(t, normalRequest, nextRequest) + assert.Equal(t, 2, queue.queues.userQueues["userID"].queue.length()) +} + +type MockRequest struct { + id string + priority int64 +} + +func (r MockRequest) Priority() int64 { + return r.priority +} diff --git a/pkg/scheduler/queue/user_queues.go b/pkg/scheduler/queue/user_queues.go index 0f9cd1a081..25f562ee02 100644 --- a/pkg/scheduler/queue/user_queues.go +++ b/pkg/scheduler/queue/user_queues.go @@ -5,7 +5,10 @@ import ( "sort" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/validation" ) // Limits needed for the Query Scheduler - interface used for decoupling. @@ -13,6 +16,10 @@ type Limits interface { // MaxOutstandingPerTenant returns the limit to the maximum number // of outstanding requests per tenant per request queue. MaxOutstandingPerTenant(user string) int + + // QueryPriority returns query priority config for the tenant, including priority level, + // their attributes, and how many reserved queriers each priority has. + QueryPriority(user string) validation.QueryPriority } // querier holds information about a querier registered in the queue. @@ -50,15 +57,25 @@ type queues struct { sortedQueriers []string limits Limits + + queueLength *prometheus.GaugeVec // Per user, type and priority. } type userQueue struct { - ch chan Request + queue userRequestQueue // If not nil, only these queriers can handle user requests. If nil, all queriers can. // We set this to nil if number of available queriers <= maxQueriers. - queriers map[string]struct{} - maxQueriers int + queriers map[string]struct{} + + // Contains assigned priority for querier ID + reservedQueriers map[string]int64 + + // Stores last limit config for the user. When changed, re-populate queriers and reservedQueriers + maxQueriers int + maxOutstanding int + priorityList []int64 + priorityEnabled bool // Seed for shuffle sharding of queriers. This seed is based on userID only and is therefore consistent // between different frontends. 
@@ -68,7 +85,7 @@ type userQueue struct { index int } -func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits) *queues { +func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limits, queueLength *prometheus.GaugeVec) *queues { return &queues{ userQueues: map[string]*userQueue{}, users: nil, @@ -77,6 +94,7 @@ func newUserQueues(maxUserQueueSize int, forgetDelay time.Duration, limits Limit queriers: map[string]*querier{}, sortedQueriers: nil, limits: limits, + queueLength: queueLength, } } @@ -103,7 +121,8 @@ func (q *queues) deleteQueue(userID string) { // MaxQueriers is used to compute which queriers should handle requests for this user. // If maxQueriers is <= 0, all queriers can handle this user's requests. // If maxQueriers has changed since the last call, queriers for this are recomputed. -func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request { +// It's also responsible to store user configs and update the attributes if related configs have changed. +func (q *queues) getOrAddQueue(userID string, maxQueriers int) userRequestQueue { // Empty user is not allowed, as that would break our users list ("" is used for free spot). if userID == "" { return nil @@ -114,19 +133,18 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request { } uq := q.userQueues[userID] + priorityEnabled := q.limits.QueryPriority(userID).Enabled + maxOutstanding := q.limits.MaxOutstandingPerTenant(userID) + priorityList := getPriorityList(q.limits.QueryPriority(userID), maxQueriers) if uq == nil { - queueSize := q.limits.MaxOutstandingPerTenant(userID) - // 0 is the default value of the flag. If the old flag is set - // then we use its value for compatibility reason. - if q.maxUserQueueSize != 0 { - queueSize = q.maxUserQueueSize - } uq = &userQueue{ - ch: make(chan Request, queueSize), seed: util.ShuffleShardSeed(userID, ""), index: -1, } + + uq.queue = q.createUserRequestQueue(userID) + uq.maxOutstanding = q.limits.MaxOutstandingPerTenant(userID) q.userQueues[userID] = uq // Add user to the list of users... find first free spot, and put it there. 
@@ -143,6 +161,17 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request { uq.index = len(q.users) q.users = append(q.users, userID) } + } else if (uq.priorityEnabled != priorityEnabled) || (!priorityEnabled && uq.maxOutstanding != maxOutstanding) { + tmpQueue := q.createUserRequestQueue(userID) + + // flush to new queue + for uq.queue.length() > 0 { + tmpQueue.enqueueRequest(uq.queue.dequeueRequest(0, false)) + } + + uq.queue = tmpQueue + uq.maxOutstanding = q.limits.MaxOutstandingPerTenant(userID) + uq.priorityEnabled = priorityEnabled } if uq.maxQueriers != maxQueriers { @@ -150,13 +179,48 @@ func (q *queues) getOrAddQueue(userID string, maxQueriers int) chan Request { uq.queriers = shuffleQueriersForUser(uq.seed, maxQueriers, q.sortedQueriers, nil) } - return uq.ch + if priorityEnabled && hasPriorityListChanged(uq.priorityList, priorityList) { + reservedQueriers := make(map[string]int64) + + i := 0 + for _, querierID := range q.sortedQueriers { + if i == len(priorityList) { + break + } + if _, ok := uq.queriers[querierID]; ok || uq.queriers == nil { + reservedQueriers[querierID] = priorityList[i] + i++ + } + } + + uq.reservedQueriers = reservedQueriers + uq.priorityList = priorityList + uq.priorityEnabled = priorityEnabled + } + + return uq.queue +} + +func (q *queues) createUserRequestQueue(userID string) userRequestQueue { + if q.limits.QueryPriority(userID).Enabled { + return NewPriorityRequestQueue(util.NewPriorityQueue(nil), userID, q.queueLength) + } + + queueSize := q.limits.MaxOutstandingPerTenant(userID) + + // 0 is the default value of the flag. If the old flag is set + // then we use its value for compatibility reason. + if q.maxUserQueueSize != 0 { + queueSize = q.maxUserQueueSize + } + + return NewFIFORequestQueue(make(chan Request, queueSize), userID, q.queueLength) } // Finds next queue for the querier. To support fair scheduling between users, client is expected // to pass last user index returned by this function as argument. Is there was no previous // last user index, use -1. -func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (chan Request, string, int) { +func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (userRequestQueue, string, int) { uid := lastUserIndex for iters := 0; iters < len(q.users); iters++ { @@ -173,16 +237,16 @@ func (q *queues) getNextQueueForQuerier(lastUserIndex int, querierID string) (ch continue } - q := q.userQueues[u] + uq := q.userQueues[u] - if q.queriers != nil { - if _, ok := q.queriers[querierID]; !ok { + if uq.queriers != nil { + if _, ok := uq.queriers[querierID]; !ok { // This querier is not handling the user. continue } } - return q.ch, u, uid + return uq.queue, u, uid } return nil, "", uid } @@ -302,7 +366,7 @@ func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueri return nil } - result := make(map[string]struct{}, queriersToSelect) + queriers := make(map[string]struct{}, queriersToSelect) rnd := rand.New(rand.NewSource(userSeed)) scratchpad = scratchpad[:0] @@ -311,20 +375,63 @@ func shuffleQueriersForUser(userSeed int64, queriersToSelect int, allSortedQueri last := len(scratchpad) - 1 for i := 0; i < queriersToSelect; i++ { r := rnd.Intn(last + 1) - result[scratchpad[r]] = struct{}{} - // move selected item to the end, it won't be selected anymore. 
+ queriers[scratchpad[r]] = struct{}{} scratchpad[r], scratchpad[last] = scratchpad[last], scratchpad[r] last-- } - return result + return queriers +} + +// getPriorityList returns a list of priorities, each priority repeated as much as number of reserved queriers. +// This is used when creating map of reserved queriers. +func getPriorityList(queryPriority validation.QueryPriority, totalQuerierCount int) []int64 { + var priorityList []int64 + + if queryPriority.Enabled { + for _, priority := range queryPriority.Priorities { + reservedQuerierShardSize := util.DynamicShardSize(priority.ReservedQueriers, totalQuerierCount) + + for i := 0; i < reservedQuerierShardSize; i++ { + priorityList = append(priorityList, priority.Priority) + } + } + } + + if len(priorityList) > totalQuerierCount { + return []int64{} + } + + return priorityList +} + +func hasPriorityListChanged(old, new []int64) bool { + if len(old) != len(new) { + return true + } + for i := range old { + if old[i] != new[i] { + return true + } + } + return false } // MockLimits implements the Limits interface. Used in tests only. type MockLimits struct { - MaxOutstanding int + MaxOutstanding int + MaxQueriersPerUserVal float64 + QueryPriorityVal validation.QueryPriority +} + +func (l MockLimits) MaxQueriersPerUser(_ string) float64 { + return l.MaxQueriersPerUserVal } func (l MockLimits) MaxOutstandingPerTenant(_ string) int { return l.MaxOutstanding } + +func (l MockLimits) QueryPriority(_ string) validation.QueryPriority { + return l.QueryPriorityVal +} diff --git a/pkg/scheduler/queue/user_queues_test.go b/pkg/scheduler/queue/user_queues_test.go index cc986cac41..ded597baa0 100644 --- a/pkg/scheduler/queue/user_queues_test.go +++ b/pkg/scheduler/queue/user_queues_test.go @@ -10,10 +10,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util/validation" ) func TestQueues(t *testing.T) { - uq := newUserQueues(0, 0, MockLimits{}) + uq := newUserQueues(0, 0, MockLimits{}, nil) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -68,7 +70,7 @@ func TestQueues(t *testing.T) { } func TestQueuesWithQueriers(t *testing.T) { - uq := newUserQueues(0, 0, MockLimits{}) + uq := newUserQueues(0, 0, MockLimits{}, nil) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -145,7 +147,7 @@ func TestQueuesConsistency(t *testing.T) { for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - uq := newUserQueues(0, testData.forgetDelay, MockLimits{}) + uq := newUserQueues(0, testData.forgetDelay, MockLimits{}, nil) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -194,7 +196,7 @@ func TestQueues_ForgetDelay(t *testing.T) { ) now := time.Now() - uq := newUserQueues(0, forgetDelay, MockLimits{}) + uq := newUserQueues(0, forgetDelay, MockLimits{}, nil) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -286,7 +288,7 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget ) now := time.Now() - uq := newUserQueues(0, forgetDelay, MockLimits{}) + uq := newUserQueues(0, forgetDelay, MockLimits{}, nil) assert.NotNil(t, uq) assert.NoError(t, isConsistent(uq)) @@ -351,6 +353,110 @@ func TestQueues_ForgetDelay_ShouldCorrectlyHandleQuerierReconnectingBeforeForget } } +func TestGetOrAddQueueShouldUpdateProperties(t *testing.T) { + limits := MockLimits{ + MaxOutstanding: 3, + } + q := newUserQueues(0, 0, limits, nil) + q.addQuerierConnection("q-1") + 
q.addQuerierConnection("q-2") + q.addQuerierConnection("q-3") + q.addQuerierConnection("q-4") + q.addQuerierConnection("q-5") + + queue := q.getOrAddQueue("userID", 2) + queue.enqueueRequest(MockRequest{}) + + assert.NotNil(t, q.userQueues["userID"]) + assert.Equal(t, 3, q.userQueues["userID"].maxOutstanding) + assert.Equal(t, 2, q.userQueues["userID"].maxQueriers) + assert.Equal(t, 5, len(q.queriers)) + assert.Equal(t, 2, len(q.userQueues["userID"].queriers)) + assert.Subset(t, getKeys(q.queriers), getKeys(q.userQueues["userID"].queriers)) + assert.IsType(t, &FIFORequestQueue{}, queue) + assert.Equal(t, 1, queue.length()) + + limits.MaxOutstanding = 10 + q.limits = limits + queue = q.getOrAddQueue("userID", 0) + + assert.Equal(t, 10, q.userQueues["userID"].maxOutstanding) + assert.Equal(t, 0, q.userQueues["userID"].maxQueriers) + assert.Nil(t, q.userQueues["userID"].queriers) + assert.IsType(t, &FIFORequestQueue{}, queue) + assert.Equal(t, 1, queue.length()) + + limits.QueryPriorityVal = validation.QueryPriority{Enabled: true, Priorities: []validation.PriorityDef{ + { + Priority: 1, + ReservedQueriers: 2, + }, + }} + q.limits = limits + queue = q.getOrAddQueue("userID", 3) + + assert.Equal(t, 10, q.userQueues["userID"].maxOutstanding) + assert.Equal(t, 3, q.userQueues["userID"].maxQueriers) + assert.Equal(t, 5, len(q.queriers)) + assert.Equal(t, 3, len(q.userQueues["userID"].queriers)) + assert.Equal(t, 2, len(q.userQueues["userID"].reservedQueriers)) + assert.IsType(t, &PriorityRequestQueue{}, queue) + assert.Equal(t, 1, queue.length()) + assert.ElementsMatch(t, []int64{1, 1}, q.userQueues["userID"].priorityList) + assert.True(t, q.userQueues["userID"].priorityEnabled) + assert.Subset(t, getKeys(q.queriers), getKeys(q.userQueues["userID"].queriers)) + assert.Subset(t, getKeys(q.userQueues["userID"].queriers), getKeys(q.userQueues["userID"].reservedQueriers)) + + limits.QueryPriorityVal = validation.QueryPriority{Enabled: true, Priorities: []validation.PriorityDef{ + { + Priority: 1, + ReservedQueriers: 0.5, + }, + }} + q.limits = limits + _ = q.getOrAddQueue("userID", 3) + + assert.Equal(t, 10, q.userQueues["userID"].maxOutstanding) + assert.Equal(t, 3, q.userQueues["userID"].maxQueriers) + assert.Equal(t, 5, len(q.queriers)) + assert.Equal(t, 3, len(q.userQueues["userID"].queriers)) + assert.Equal(t, 2, len(q.userQueues["userID"].reservedQueriers)) + assert.ElementsMatch(t, []int64{1, 1}, q.userQueues["userID"].priorityList) + + limits.QueryPriorityVal = validation.QueryPriority{Enabled: true, Priorities: []validation.PriorityDef{ + { + Priority: 1, + ReservedQueriers: 10, + }, + }} + q.limits = limits + _ = q.getOrAddQueue("userID", 3) + + assert.Equal(t, 10, q.userQueues["userID"].maxOutstanding) + assert.Equal(t, 3, q.userQueues["userID"].maxQueriers) + assert.Equal(t, 5, len(q.queriers)) + assert.Equal(t, 3, len(q.userQueues["userID"].queriers)) + assert.Equal(t, 0, len(q.userQueues["userID"].reservedQueriers)) + assert.ElementsMatch(t, []int64{}, q.userQueues["userID"].priorityList) + + limits.QueryPriorityVal.Enabled = false + q.limits = limits + queue = q.getOrAddQueue("userID", 3) + assert.IsType(t, &FIFORequestQueue{}, queue) + + // check the queriers and reservedQueriers map are consistent + for i := 0; i < 100; i++ { + queriers := q.userQueues["userID"].queriers + reservedQueriers := q.userQueues["userID"].reservedQueriers + q.userQueues["userID"].maxQueriers = 0 // reset to trigger querier assignment + q.userQueues["userID"].priorityList = []int64{} // reset to trigger 
reserved querier assignment + _ = q.getOrAddQueue("userID", 3) + + assert.Equal(t, queriers, q.userQueues["userID"].queriers) + assert.Equal(t, reservedQueriers, q.userQueues["userID"].reservedQueriers) + } +} + func generateTenant(r *rand.Rand) string { return fmt.Sprint("tenant-", r.Int()%5) } @@ -359,7 +465,7 @@ func generateQuerier(r *rand.Rand) string { return fmt.Sprint("querier-", r.Int()%5) } -func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) chan Request { +func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) userRequestQueue { q := uq.getOrAddQueue(tenant, maxQueriers) assert.NotNil(t, q) assert.NoError(t, isConsistent(uq)) @@ -367,8 +473,8 @@ func getOrAdd(t *testing.T, uq *queues, tenant string, maxQueriers int) chan Req return q } -func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIndex int, qs ...chan Request) int { - var n chan Request +func confirmOrderForQuerier(t *testing.T, uq *queues, querier string, lastUserIndex int, qs ...userRequestQueue) int { + var n userRequestQueue for _, q := range qs { n, _, lastUserIndex = uq.getNextQueueForQuerier(lastUserIndex, querier) assert.Equal(t, q, n) @@ -437,11 +543,35 @@ func getUsersByQuerier(queues *queues, querierID string) []string { return userIDs } +func getKeys(x interface{}) []string { + var keys []string + + switch i := x.(type) { + case map[string]struct{}: + for k := range i { + keys = append(keys, k) + } + case map[string]*querier: + for k := range i { + keys = append(keys, k) + } + case map[string]int64: + for k := range i { + keys = append(keys, k) + } + } + + return keys +} + func TestShuffleQueriers(t *testing.T) { allQueriers := []string{"a", "b", "c", "d", "e"} - require.Nil(t, shuffleQueriersForUser(12345, 10, allQueriers, nil)) - require.Nil(t, shuffleQueriersForUser(12345, len(allQueriers), allQueriers, nil)) + queriers := shuffleQueriersForUser(12345, 10, allQueriers, nil) + require.Nil(t, queriers) + + queriers = shuffleQueriersForUser(12345, len(allQueriers), allQueriers, nil) + require.Nil(t, queriers) r1 := shuffleQueriersForUser(12345, 3, allQueriers, nil) require.Equal(t, 3, len(r1)) @@ -484,3 +614,36 @@ func TestShuffleQueriersCorrectness(t *testing.T) { } } } + +func TestHasPriorityListChanged(t *testing.T) { + require.True(t, hasPriorityListChanged([]int64{1, 2}, []int64{1, 3})) + require.False(t, hasPriorityListChanged([]int64{1, 2}, []int64{1, 2})) + require.True(t, hasPriorityListChanged([]int64{1, 2}, []int64{1})) + require.False(t, hasPriorityListChanged([]int64{}, []int64{})) +} + +func TestGetPriorityList(t *testing.T) { + queryPriority := validation.QueryPriority{ + Enabled: true, + Priorities: []validation.PriorityDef{ + { + Priority: 1, + ReservedQueriers: 2, + }, + { + Priority: 2, + ReservedQueriers: 3, + }, + }, + } + + assert.EqualValues(t, []int64{1, 1, 2, 2, 2}, getPriorityList(queryPriority, 10)) + assert.EqualValues(t, []int64{}, getPriorityList(queryPriority, 1)) + + queryPriority.Priorities[0].ReservedQueriers = 0.4 + queryPriority.Priorities[1].ReservedQueriers = 0.6 + assert.EqualValues(t, []int64{1, 1, 1, 1, 2, 2, 2, 2, 2, 2}, getPriorityList(queryPriority, 10)) + + queryPriority.Enabled = false + assert.Nil(t, getPriorityList(queryPriority, 10)) +} diff --git a/pkg/scheduler/queue/user_request_queue.go b/pkg/scheduler/queue/user_request_queue.go new file mode 100644 index 0000000000..20588988c2 --- /dev/null +++ b/pkg/scheduler/queue/user_request_queue.go @@ -0,0 +1,76 @@ +package queue + +import ( + "strconv" 
+ + "github.com/prometheus/client_golang/prometheus" + + "github.com/cortexproject/cortex/pkg/util" +) + +type userRequestQueue interface { + enqueueRequest(Request) + dequeueRequest(int64, bool) Request + length() int +} + +type FIFORequestQueue struct { + queue chan Request + userID string + queueLength *prometheus.GaugeVec +} + +func NewFIFORequestQueue(queue chan Request, userID string, queueLength *prometheus.GaugeVec) *FIFORequestQueue { + return &FIFORequestQueue{queue: queue, userID: userID, queueLength: queueLength} +} + +func (f *FIFORequestQueue) enqueueRequest(r Request) { + f.queue <- r + if f.queueLength != nil { + f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "fifo").Inc() + } +} + +func (f *FIFORequestQueue) dequeueRequest(_ int64, _ bool) Request { + r := <-f.queue + if f.queueLength != nil { + f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "fifo").Dec() + } + return r +} + +func (f *FIFORequestQueue) length() int { + return len(f.queue) +} + +type PriorityRequestQueue struct { + queue *util.PriorityQueue + userID string + queueLength *prometheus.GaugeVec +} + +func NewPriorityRequestQueue(queue *util.PriorityQueue, userID string, queueLength *prometheus.GaugeVec) *PriorityRequestQueue { + return &PriorityRequestQueue{queue: queue, userID: userID, queueLength: queueLength} +} + +func (f *PriorityRequestQueue) enqueueRequest(r Request) { + f.queue.Enqueue(r) + if f.queueLength != nil { + f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "priority").Inc() + } +} + +func (f *PriorityRequestQueue) dequeueRequest(minPriority int64, checkMinPriority bool) Request { + if checkMinPriority && f.queue.Peek().Priority() < minPriority { + return nil + } + r := f.queue.Dequeue() + if f.queueLength != nil { + f.queueLength.WithLabelValues(f.userID, strconv.FormatInt(r.Priority(), 10), "priority").Dec() + } + return r +} + +func (f *PriorityRequestQueue) length() int { + return f.queue.Length() +} diff --git a/pkg/scheduler/queue/user_request_queue_test.go b/pkg/scheduler/queue/user_request_queue_test.go new file mode 100644 index 0000000000..f1f2314585 --- /dev/null +++ b/pkg/scheduler/queue/user_request_queue_test.go @@ -0,0 +1,146 @@ +package queue + +import ( + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + + "github.com/cortexproject/cortex/pkg/util" +) + +func TestFIFORequestQueue(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + queueLength := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_query_scheduler_queue_length", + Help: "Number of queries in the queue.", + }, []string{"user", "priority", "type"}) + + queue := NewFIFORequestQueue(make(chan Request, 2), "userID", queueLength) + request1 := MockRequest{ + id: "request 1", + priority: 1, + } + request2 := MockRequest{ + id: "request 2", + priority: 2, + } + + queue.enqueueRequest(request1) + queue.enqueueRequest(request2) + + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. 
+ # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="fifo",user="userID"} 1 + cortex_query_scheduler_queue_length{priority="2",type="fifo",user="userID"} 1 + `), "cortex_query_scheduler_queue_length")) + assert.Equal(t, 2, queue.length()) + assert.Equal(t, request1, queue.dequeueRequest(0, false)) + assert.Equal(t, 1, queue.length()) + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. + # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="fifo",user="userID"} 0 + cortex_query_scheduler_queue_length{priority="2",type="fifo",user="userID"} 1 + `), "cortex_query_scheduler_queue_length")) + assert.Equal(t, request2, queue.dequeueRequest(0, false)) + assert.Equal(t, 0, queue.length()) + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. + # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="fifo",user="userID"} 0 + cortex_query_scheduler_queue_length{priority="2",type="fifo",user="userID"} 0 + `), "cortex_query_scheduler_queue_length")) +} + +func TestPriorityRequestQueue(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + queueLength := promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_query_scheduler_queue_length", + Help: "Number of queries in the queue.", + }, []string{"user", "priority", "type"}) + + queue := NewPriorityRequestQueue(util.NewPriorityQueue(nil), "userID", queueLength) + request1 := MockRequest{ + id: "request 1", + priority: 1, + } + request2 := MockRequest{ + id: "request 2", + priority: 2, + } + request3 := MockRequest{ + id: "request 3", + priority: 3, + } + + queue.enqueueRequest(request1) + queue.enqueueRequest(request2) + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. + # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="priority",user="userID"} 1 + cortex_query_scheduler_queue_length{priority="2",type="priority",user="userID"} 1 + `), "cortex_query_scheduler_queue_length")) + assert.Equal(t, 2, queue.length()) + assert.Equal(t, request2, queue.dequeueRequest(0, false)) + assert.Equal(t, 1, queue.length()) + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. + # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="priority",user="userID"} 1 + cortex_query_scheduler_queue_length{priority="2",type="priority",user="userID"} 0 + `), "cortex_query_scheduler_queue_length")) + assert.Equal(t, request1, queue.dequeueRequest(0, false)) + assert.Equal(t, 0, queue.length()) + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. 
+ # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="priority",user="userID"} 0 + cortex_query_scheduler_queue_length{priority="2",type="priority",user="userID"} 0 + `), "cortex_query_scheduler_queue_length")) + + queue.enqueueRequest(request1) + queue.enqueueRequest(request2) + queue.enqueueRequest(request3) + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. + # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="priority",user="userID"} 1 + cortex_query_scheduler_queue_length{priority="2",type="priority",user="userID"} 1 + cortex_query_scheduler_queue_length{priority="3",type="priority",user="userID"} 1 + `), "cortex_query_scheduler_queue_length")) + assert.Equal(t, 3, queue.length()) + assert.Equal(t, request3, queue.dequeueRequest(2, true)) + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. + # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="priority",user="userID"} 1 + cortex_query_scheduler_queue_length{priority="2",type="priority",user="userID"} 1 + cortex_query_scheduler_queue_length{priority="3",type="priority",user="userID"} 0 + `), "cortex_query_scheduler_queue_length")) + assert.Equal(t, 2, queue.length()) + assert.Equal(t, request2, queue.dequeueRequest(2, true)) + assert.Equal(t, 1, queue.length()) + assert.Nil(t, queue.dequeueRequest(2, true)) + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. + # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="priority",user="userID"} 1 + cortex_query_scheduler_queue_length{priority="2",type="priority",user="userID"} 0 + cortex_query_scheduler_queue_length{priority="3",type="priority",user="userID"} 0 + `), "cortex_query_scheduler_queue_length")) + assert.Equal(t, request1, queue.dequeueRequest(2, false)) + assert.Equal(t, 0, queue.length()) + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_query_scheduler_queue_length Number of queries in the queue. 
+ # TYPE cortex_query_scheduler_queue_length gauge + cortex_query_scheduler_queue_length{priority="1",type="priority",user="userID"} 0 + cortex_query_scheduler_queue_length{priority="2",type="priority",user="userID"} 0 + cortex_query_scheduler_queue_length{priority="3",type="priority",user="userID"} 0 + `), "cortex_query_scheduler_queue_length")) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f03def5ea3..fa28485298 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -5,6 +5,7 @@ import ( "flag" "io" "net/http" + "strconv" "sync" "time" @@ -105,12 +106,12 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe s.queueLength = promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ Name: "cortex_query_scheduler_queue_length", Help: "Number of queries in the queue.", - }, []string{"user"}) + }, []string{"user", "priority", "type"}) s.discardedRequests = promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_scheduler_discarded_requests_total", Help: "Total number of query requests discarded.", - }, []string{"user"}) + }, []string{"user", "priority"}) s.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, s.queueLength, s.discardedRequests, s.limits, registerer) @@ -165,6 +166,15 @@ type schedulerRequest struct { parentSpanContext opentracing.SpanContext } +func (s schedulerRequest) Priority() int64 { + priority, err := strconv.ParseInt(httpgrpcutil.GetHeader(*s.request, util.QueryPriorityHeaderKey), 10, 64) + if err != nil { + return 0 + } + + return priority +} + // FrontendLoop handles connection from frontend. func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error { frontendAddress, frontendCtx, err := s.frontendConnected(frontend) @@ -514,8 +524,12 @@ func (s *Scheduler) stopping(_ error) error { } func (s *Scheduler) cleanupMetricsForInactiveUser(user string) { - s.queueLength.DeleteLabelValues(user) - s.discardedRequests.DeleteLabelValues(user) + s.queueLength.DeletePartialMatch(prometheus.Labels{ + "user": user, + }) + s.discardedRequests.DeletePartialMatch(prometheus.Labels{ + "user": user, + }) } func (s *Scheduler) getConnectedFrontendClientsMetric() float64 { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index b2ef66cc65..798843284e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -37,7 +37,7 @@ func setupScheduler(t *testing.T, reg prometheus.Registerer) (*Scheduler, schedu flagext.DefaultValues(&cfg) cfg.MaxOutstandingPerTenant = testMaxOutstandingPerTenant - s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: 100}}, log.NewNopLogger(), reg) + s, err := NewScheduler(cfg, frontendv1.MockLimits{Queriers: 2, MockLimits: queue.MockLimits{MaxOutstanding: testMaxOutstandingPerTenant}}, log.NewNopLogger(), reg) require.NoError(t, err) server := grpc.NewServer() @@ -430,8 +430,8 @@ func TestSchedulerMetrics(t *testing.T) { require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_query_scheduler_queue_length Number of queries in the queue. 
# TYPE cortex_query_scheduler_queue_length gauge - cortex_query_scheduler_queue_length{user="another"} 1 - cortex_query_scheduler_queue_length{user="test"} 1 + cortex_query_scheduler_queue_length{priority="0",type="fifo",user="another"} 1 + cortex_query_scheduler_queue_length{priority="0",type="fifo",user="test"} 1 `), "cortex_query_scheduler_queue_length")) scheduler.cleanupMetricsForInactiveUser("test") @@ -439,7 +439,7 @@ func TestSchedulerMetrics(t *testing.T) { require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_query_scheduler_queue_length Number of queries in the queue. # TYPE cortex_query_scheduler_queue_length gauge - cortex_query_scheduler_queue_length{user="another"} 1 + cortex_query_scheduler_queue_length{priority="0",type="fifo",user="another"} 1 `), "cortex_query_scheduler_queue_length")) } diff --git a/pkg/util/http.go b/pkg/util/http.go index 09fb3df38c..41daae0fc6 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -21,6 +21,7 @@ import ( yamlv3 "gopkg.in/yaml.v3" ) +const QueryPriorityHeaderKey = "X-Cortex-Query-Priority" const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)" // IsRequestBodyTooLarge returns true if the error is "http: request body too large". diff --git a/pkg/util/httpgrpcutil/header.go b/pkg/util/httpgrpcutil/header.go new file mode 100644 index 0000000000..ed453f01fa --- /dev/null +++ b/pkg/util/httpgrpcutil/header.go @@ -0,0 +1,27 @@ +package httpgrpcutil + +import ( + "github.com/weaveworks/common/httpgrpc" +) + +// GetHeader is similar to http.Header.Get, which gets the first value associated with the given key. +// If there are no values associated with the key, it returns "". +func GetHeader(r httpgrpc.HTTPRequest, key string) string { + values := GetHeaderValues(r, key) + if len(values) == 0 { + return "" + } + + return values[0] +} + +// GetHeaderValues is similar to http.Header.Values, which returns all values associated with the given key. +func GetHeaderValues(r httpgrpc.HTTPRequest, key string) []string { + for _, header := range r.Headers { + if header.GetKey() == key { + return header.GetValues() + } + } + + return []string{} +} diff --git a/pkg/util/priority_queue.go b/pkg/util/priority_queue.go index 1c2fbadd3b..9937c231c9 100644 --- a/pkg/util/priority_queue.go +++ b/pkg/util/priority_queue.go @@ -13,18 +13,16 @@ type PriorityQueue struct { cond *sync.Cond closing bool closed bool - hit map[string]struct{} queue queue lengthGauge prometheus.Gauge } -// Op is an operation on the priority queue. -type Op interface { - Key() string +// PriorityOp is an operation on the priority queue. +type PriorityOp interface { Priority() int64 // The larger the number the higher the priority. } -type queue []Op +type queue []PriorityOp func (q queue) Len() int { return len(q) } func (q queue) Less(i, j int) bool { return q[i].Priority() > q[j].Priority() } @@ -33,7 +31,7 @@ func (q queue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } // Push and Pop use pointer receivers because they modify the slice's length, // not just its contents. func (q *queue) Push(x interface{}) { - *q = append(*q, x.(Op)) + *q = append(*q, x.(PriorityOp)) } func (q *queue) Pop() interface{} { @@ -47,7 +45,6 @@ func (q *queue) Pop() interface{} { // NewPriorityQueue makes a new priority queue. 
func NewPriorityQueue(lengthGauge prometheus.Gauge) *PriorityQueue { pq := &PriorityQueue{ - hit: map[string]struct{}{}, lengthGauge: lengthGauge, } pq.cond = sync.NewCond(&pq.lock) @@ -67,7 +64,11 @@ func (pq *PriorityQueue) Length() int { func (pq *PriorityQueue) Close() { pq.lock.Lock() defer pq.lock.Unlock() - pq.closing = true + if len(pq.queue) > 0 { + pq.closing = true + } else { + pq.closed = true + } pq.cond.Broadcast() } @@ -77,37 +78,28 @@ func (pq *PriorityQueue) DiscardAndClose() { defer pq.lock.Unlock() pq.closed = true pq.queue = nil - pq.hit = map[string]struct{}{} pq.cond.Broadcast() } -// Enqueue adds an operation to the queue in priority order. Returns -// true if added; false if the operation was already on the queue. -func (pq *PriorityQueue) Enqueue(op Op) bool { +// Enqueue adds an operation to the queue in priority order. +func (pq *PriorityQueue) Enqueue(op PriorityOp) { pq.lock.Lock() defer pq.lock.Unlock() - if pq.closed { + if pq.closing || pq.closed { panic("enqueue on closed queue") } - _, enqueued := pq.hit[op.Key()] - if enqueued { - return false - } - - pq.hit[op.Key()] = struct{}{} heap.Push(&pq.queue, op) pq.cond.Broadcast() if pq.lengthGauge != nil { pq.lengthGauge.Inc() } - return true } -// Dequeue will return the op with the highest priority; block if queue is +// Dequeue will remove and return the op with the highest priority; block if queue is // empty; returns nil if queue is closed. -func (pq *PriorityQueue) Dequeue() Op { +func (pq *PriorityQueue) Dequeue() PriorityOp { pq.lock.Lock() defer pq.lock.Unlock() @@ -120,10 +112,15 @@ func (pq *PriorityQueue) Dequeue() Op { return nil } - op := heap.Pop(&pq.queue).(Op) - delete(pq.hit, op.Key()) + op := heap.Pop(&pq.queue).(PriorityOp) if pq.lengthGauge != nil { pq.lengthGauge.Dec() } return op } + +// Peek will return the op with the highest priority without removing it from the queue +func (pq *PriorityQueue) Peek() PriorityOp { + op := pq.queue[0] + return op +} diff --git a/pkg/util/priority_queue_test.go b/pkg/util/priority_queue_test.go index 79eaf2784f..168ed96137 100644 --- a/pkg/util/priority_queue_test.go +++ b/pkg/util/priority_queue_test.go @@ -2,7 +2,6 @@ package util import ( "runtime" - "strconv" "testing" "time" @@ -15,10 +14,6 @@ func (i simpleItem) Priority() int64 { return int64(i) } -func (i simpleItem) Key() string { - return strconv.FormatInt(int64(i), 10) -} - func TestPriorityQueueBasic(t *testing.T) { queue := NewPriorityQueue(nil) assert.Equal(t, 0, queue.Length(), "Expected length = 0") @@ -38,7 +33,10 @@ func TestPriorityQueuePriorities(t *testing.T) { queue := NewPriorityQueue(nil) queue.Enqueue(simpleItem(1)) queue.Enqueue(simpleItem(2)) + queue.Enqueue(simpleItem(2)) + assert.Equal(t, simpleItem(2), queue.Peek().(simpleItem), "Expected to peek simpleItem(2)") + assert.Equal(t, simpleItem(2), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(2)") assert.Equal(t, simpleItem(2), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(2)") assert.Equal(t, simpleItem(1), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(1)") @@ -50,7 +48,10 @@ func TestPriorityQueuePriorities2(t *testing.T) { queue := NewPriorityQueue(nil) queue.Enqueue(simpleItem(2)) queue.Enqueue(simpleItem(1)) + queue.Enqueue(simpleItem(2)) + assert.Equal(t, simpleItem(2), queue.Peek().(simpleItem), "Expected to peek simpleItem(2)") + assert.Equal(t, simpleItem(2), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(2)") assert.Equal(t, simpleItem(2), 
queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(2)") assert.Equal(t, simpleItem(1), queue.Dequeue().(simpleItem), "Expected to dequeue simpleItem(1)") @@ -75,3 +76,17 @@ func TestPriorityQueueWait(t *testing.T) { t.Fatal("Close didn't unblock Dequeue.") } } + +func TestPriorityQueueClose(t *testing.T) { + queue := NewPriorityQueue(nil) + queue.Enqueue(simpleItem(1)) + queue.Close() + assert.Panics(t, func() { queue.Enqueue(simpleItem(2)) }) + assert.Equal(t, simpleItem(1), queue.Dequeue().(simpleItem)) + + queue = NewPriorityQueue(nil) + queue.Enqueue(simpleItem(1)) + queue.DiscardAndClose() + assert.Panics(t, func() { queue.Enqueue(simpleItem(2)) }) + assert.Nil(t, queue.Dequeue()) +} diff --git a/pkg/util/time.go b/pkg/util/time.go index 8816b1d7d2..a28c84b046 100644 --- a/pkg/util/time.go +++ b/pkg/util/time.go @@ -7,6 +7,7 @@ import ( "strconv" "time" + "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" ) @@ -48,6 +49,19 @@ func ParseTime(s string) (int64, error) { return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s) } +// ParseTimeParam parses the time request parameter into an int64, milliseconds since epoch. +func ParseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error) { + val := r.FormValue(paramName) + if val == "" { + val = strconv.FormatInt(defaultValue, 10) + } + result, err := ParseTime(val) + if err != nil { + return 0, errors.Wrapf(err, "Invalid time value for '%s'", paramName) + } + return result, nil +} + // DurationWithJitter returns random duration from "input - input*variance" to "input + input*variance" interval. func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration { // No duration? No jitter. diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index d60b2ca8e6..a10e3f1f78 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -6,9 +6,11 @@ import ( "errors" "flag" "math" + "regexp" "strings" "time" + "github.com/cespare/xxhash/v2" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" "golang.org/x/time/rate" @@ -17,6 +19,8 @@ import ( ) var errMaxGlobalSeriesPerUserValidation = errors.New("The ingester.max-global-series-per-user limit is unsupported if distributor.shard-by-all-labels is disabled") +var errDuplicateQueryPriorities = errors.New("duplicate entry of priorities found. Make sure they are all unique, including the default priority") +var errCompilingQueryPriorityRegex = errors.New("error compiling query priority regex") // Supported values for enum limits const ( @@ -46,6 +50,29 @@ type DisabledRuleGroup struct { type DisabledRuleGroups []DisabledRuleGroup +type QueryPriority struct { + Enabled bool `yaml:"enabled" json:"enabled" doc:"nocli|description=Whether queries are assigned with priorities.|default=false"` + DefaultPriority int64 `yaml:"default_priority" json:"default_priority" doc:"nocli|description=Priority assigned to all queries by default. Must be a unique value. Use this as a baseline to make certain queries higher/lower priority.|default=0"` + Priorities []PriorityDef `yaml:"priorities" json:"priorities" doc:"nocli|description=List of priority definitions."` +} + +type PriorityDef struct { + Priority int64 `yaml:"priority" json:"priority" doc:"nocli|description=Priority level. 
Must be a unique value.|default=0"` + ReservedQueriers float64 `yaml:"reserved_queriers" json:"reserved_queriers" doc:"nocli|description=Number of reserved queriers to handle priorities higher or equal to the priority level. Value between 0 and 1 will be used as a percentage.|default=0"` + QueryAttributes []QueryAttribute `yaml:"query_attributes" json:"query_attributes" doc:"nocli|description=List of query attributes to assign the priority."` +} + +type QueryAttribute struct { + Regex string `yaml:"regex" json:"regex" doc:"nocli|description=Regex that the query string should match. If not set, it won't be checked."` + TimeWindow TimeWindow `yaml:"time_window" json:"time_window" doc:"nocli|description=Time window that the query should be within. If not set, it won't be checked."` + CompiledRegex *regexp.Regexp +} + +type TimeWindow struct { + Start model.Duration `yaml:"start" json:"start" doc:"nocli|description=Start of the time window that the query should be within. If set to 0, it won't be checked.|default=0"` + End model.Duration `yaml:"end" json:"end" doc:"nocli|description=End of the time window that the query should be within. If set to 0, it won't be checked.|default=0"` +} + // Limits describe all the limits for users; can be used to describe global default // limits via flags, or per-user limits via yaml config. type Limits struct { @@ -101,7 +128,10 @@ type Limits struct { QueryVerticalShardSize int `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size" doc:"hidden"` // Query Frontend / Scheduler enforced limits. - MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant" json:"max_outstanding_requests_per_tenant"` + MaxOutstandingPerTenant int `yaml:"max_outstanding_requests_per_tenant" json:"max_outstanding_requests_per_tenant"` + QueryPriority QueryPriority `yaml:"query_priority" json:"query_priority" doc:"nocli|description=Configuration for query priority."` + queryPriorityRegexHash uint64 + queryPriorityCompiledRegex map[string]*regexp.Regexp // Ruler defaults and limits. RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` @@ -244,7 +274,15 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { l.copyNotificationIntegrationLimits(defaultLimits.NotificationRateLimitPerIntegration) } type plain Limits - return unmarshal((*plain)(l)) + if err := unmarshal((*plain)(l)); err != nil { + return err + } + + if err := l.compileQueryPriorityRegex(); err != nil { + return err + } + + return nil } // UnmarshalJSON implements the json.Unmarshaler interface. 
@@ -262,7 +300,15 @@ func (l *Limits) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) dec.DisallowUnknownFields() - return dec.Decode((*plain)(l)) + if err := dec.Decode((*plain)(l)); err != nil { + return err + } + + if err := l.compileQueryPriorityRegex(); err != nil { + return err + } + + return nil } func (l *Limits) copyNotificationIntegrationLimits(defaults NotificationRateLimitMap) { @@ -272,6 +318,61 @@ func (l *Limits) copyNotificationIntegrationLimits(defaults NotificationRateLimi } } +func (l *Limits) hasQueryPriorityRegexChanged() bool { + var newHash uint64 + + var seps = []byte{'\xff'} + h := xxhash.New() + for _, priority := range l.QueryPriority.Priorities { + for _, attribute := range priority.QueryAttributes { + _, _ = h.WriteString(attribute.Regex) + _, _ = h.Write(seps) + } + } + newHash = h.Sum64() + + if newHash != l.queryPriorityRegexHash { + l.queryPriorityRegexHash = newHash + return true + } + return false +} + +func (l *Limits) compileQueryPriorityRegex() error { + if l.QueryPriority.Enabled { + hasQueryPriorityRegexChanged := l.hasQueryPriorityRegexChanged() + prioritySet := map[int64]struct{}{} + newQueryPriorityCompiledRegex := map[string]*regexp.Regexp{} + + for i, priority := range l.QueryPriority.Priorities { + // Check for duplicate priority entry + if _, exists := prioritySet[priority.Priority]; exists { + return errDuplicateQueryPriorities + } + prioritySet[priority.Priority] = struct{}{} + + for j, attribute := range priority.QueryAttributes { + if hasQueryPriorityRegexChanged { + compiledRegex, err := regexp.Compile(attribute.Regex) + if err != nil { + return errors.Join(errCompilingQueryPriorityRegex, err) + } + newQueryPriorityCompiledRegex[attribute.Regex] = compiledRegex + l.QueryPriority.Priorities[i].QueryAttributes[j].CompiledRegex = compiledRegex + } else { + l.QueryPriority.Priorities[i].QueryAttributes[j].CompiledRegex = l.queryPriorityCompiledRegex[attribute.Regex] + } + } + } + + if hasQueryPriorityRegexChanged { + l.queryPriorityCompiledRegex = newQueryPriorityCompiledRegex + } + } + + return nil +} + // When we load YAML from disk, we want the various per-customer limits // to default to any values specified on the command line, not default // command line values. This global contains those values. I (Tom) cannot @@ -492,6 +593,11 @@ func (o *Overrides) MaxOutstandingPerTenant(userID string) int { return o.GetOverridesForUser(userID).MaxOutstandingPerTenant } +// QueryPriority returns the query priority config for the tenant, including different priorities and their attributes +func (o *Overrides) QueryPriority(userID string) QueryPriority { + return o.GetOverridesForUser(userID).QueryPriority +} + // EnforceMetricName whether to enforce the presence of a metric name. 
func (o *Overrides) EnforceMetricName(userID string) bool { return o.GetOverridesForUser(userID).EnforceMetricName diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index 2631354893..e3b8c6d3ff 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -3,6 +3,7 @@ package validation import ( "encoding/json" "reflect" + "regexp" "strings" "testing" "time" @@ -596,3 +597,81 @@ tenant2: require.Equal(t, 3, ov.MaxDownloadedBytesPerRequest("tenant2")) require.Equal(t, 5, ov.MaxDownloadedBytesPerRequest("tenant3")) } + +func TestHasQueryPriorityRegexChanged(t *testing.T) { + l := Limits{ + QueryPriority: QueryPriority{ + Priorities: []PriorityDef{ + { + Priority: 1, + QueryAttributes: []QueryAttribute{ + { + Regex: "test", + }, + }, + }, + }, + }, + } + + require.True(t, l.hasQueryPriorityRegexChanged()) + + l.QueryPriority.Priorities[0].QueryAttributes[0].Regex = "new" + + require.True(t, l.hasQueryPriorityRegexChanged()) + + l.QueryPriority.Priorities[0].QueryAttributes[0].TimeWindow.Start = model.Duration(2 * time.Hour) + + require.False(t, l.hasQueryPriorityRegexChanged()) + + l.QueryPriority.Priorities[0].QueryAttributes = append(l.QueryPriority.Priorities[0].QueryAttributes, QueryAttribute{Regex: "hi"}) + + require.True(t, l.hasQueryPriorityRegexChanged()) + + l.QueryPriority.Priorities[0].QueryAttributes = l.QueryPriority.Priorities[0].QueryAttributes[:1] + + require.True(t, l.hasQueryPriorityRegexChanged()) +} + +func TestCompileQueryPriorityRegex(t *testing.T) { + l := Limits{ + QueryPriority: QueryPriority{ + Enabled: true, + Priorities: []PriorityDef{ + { + Priority: 1, + QueryAttributes: []QueryAttribute{ + { + Regex: "test", + }, + }, + }, + }, + }, + } + + require.Nil(t, l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) + + err := l.compileQueryPriorityRegex() + require.NoError(t, err) + require.Equal(t, regexp.MustCompile("test"), l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) + + l.QueryPriority.Priorities[0].QueryAttributes[0].Regex = "new" + + err = l.compileQueryPriorityRegex() + require.NoError(t, err) + require.Equal(t, regexp.MustCompile("new"), l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) + + l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = nil + + err = l.compileQueryPriorityRegex() + require.NoError(t, err) + require.Equal(t, regexp.MustCompile("new"), l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) + + l.QueryPriority.Enabled = false + l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex = nil + + err = l.compileQueryPriorityRegex() + require.NoError(t, err) + require.Nil(t, l.QueryPriority.Priorities[0].QueryAttributes[0].CompiledRegex) +} diff --git a/tools/doc-generator/parser.go b/tools/doc-generator/parser.go index 3ebb313368..28ee31ba4f 100644 --- a/tools/doc-generator/parser.go +++ b/tools/doc-generator/parser.go @@ -227,7 +227,7 @@ func parseConfig(block *configBlock, cfg interface{}, flags map[uintptr]*flag.Fl required: isFieldRequired(field), fieldDesc: getFieldDescription(field, ""), fieldType: fieldType, - fieldDefault: fieldDefault, + fieldDefault: getFieldDefault(field, fieldDefault), }) continue } @@ -409,6 +409,10 @@ func getCustomFieldEntry(parent reflect.Type, field reflect.StructField, fieldVa return nil, err } + if fieldFlag == nil { + return nil, nil + } + return &configEntry{ kind: "field", name: getFieldName(field), @@ -470,6 +474,14 @@ func getFieldDescription(f reflect.StructField, 
fallback string) string { return fallback } +func getFieldDefault(f reflect.StructField, fieldDefault string) string { + if defaultValue := getDocTagValue(f, "default"); defaultValue != "" { + return defaultValue + } + + return fieldDefault +} + func isRootBlock(t reflect.Type) (string, string, bool) { for _, rootBlock := range rootBlocks { if t == rootBlock.structType {
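The scheduling behaviour introduced above is easiest to see in isolation. The following standalone Go sketch mirrors two pieces of this change: expanding `reserved_queriers` values (an absolute count, or a fraction of the total querier count) into a priority list, as `getPriorityList` does via `util.DynamicShardSize`, and the minimum-priority guard that `PriorityRequestQueue.dequeueRequest` applies when a reserved querier asks for work. The helper names, the max-heap stand-in for the request queue, and the rounding in `dynamicShardSize` are illustrative assumptions inferred from the tests above, not part of the patch.

```go
package main

import (
	"container/heap"
	"fmt"
	"math"
)

// priorityDef mirrors the shape of validation.PriorityDef for illustration only.
type priorityDef struct {
	Priority         int64
	ReservedQueriers float64 // absolute count, or a fraction in (0, 1)
}

// dynamicShardSize mimics how a fractional reserved_queriers value is treated as a
// percentage of the total querier count (an assumption based on the tests above:
// 0.4 of 10 queriers reserves 4).
func dynamicShardSize(value float64, total int) int {
	if value > 0 && value < 1 {
		return int(math.Ceil(value * float64(total)))
	}
	return int(value)
}

// expandPriorityList repeats each priority once per reserved querier, mirroring
// getPriorityList; if more queriers would be reserved than exist, nothing is reserved.
func expandPriorityList(defs []priorityDef, totalQueriers int) []int64 {
	var list []int64
	for _, d := range defs {
		for i := 0; i < dynamicShardSize(d.ReservedQueriers, totalQueriers); i++ {
			list = append(list, d.Priority)
		}
	}
	if len(list) > totalQueriers {
		return []int64{}
	}
	return list
}

// int64Heap is a max-heap of priorities standing in for the queued requests.
type int64Heap []int64

func (h int64Heap) Len() int            { return len(h) }
func (h int64Heap) Less(i, j int) bool  { return h[i] > h[j] }
func (h int64Heap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *int64Heap) Push(x interface{}) { *h = append(*h, x.(int64)) }
func (h *int64Heap) Pop() interface{} {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// dequeue mirrors PriorityRequestQueue.dequeueRequest: a reserved querier passes
// checkMinPriority=true and only receives requests at or above its reserved level.
func dequeue(h *int64Heap, minPriority int64, checkMinPriority bool) (int64, bool) {
	if h.Len() == 0 {
		return 0, false
	}
	if checkMinPriority && (*h)[0] < minPriority {
		return 0, false
	}
	return heap.Pop(h).(int64), true
}

func main() {
	fmt.Println(expandPriorityList([]priorityDef{{1, 0.4}, {2, 0.6}}, 10)) // [1 1 1 1 2 2 2 2 2 2]

	h := &int64Heap{}
	heap.Init(h)
	for _, p := range []int64{1, 3, 2} {
		heap.Push(h, p)
	}
	fmt.Println(dequeue(h, 2, true))  // 3 true: high-priority work is available
	fmt.Println(dequeue(h, 2, true))  // 2 true
	fmt.Println(dequeue(h, 2, true))  // 0 false: only priority 1 left, below the reserved level
	fmt.Println(dequeue(h, 2, false)) // 1 true: an unreserved querier still drains it
}
```

Run as-is, the sketch prints the same expansions the new unit tests assert (for example `[1 1 1 1 2 2 2 2 2 2]` for fractions 0.4 and 0.6 over 10 queriers) and shows that a reserved querier never dequeues work below its reserved priority, while unreserved queriers keep draining lower-priority requests.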