From a8f11bffa5e876f702370dee5ab8c96071a7a538 Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" <35939863+aallawala@users.noreply.github.com> Date: Mon, 9 Aug 2021 17:54:36 -0700 Subject: [PATCH 01/10] [querier] honor querier mint,maxt if no SelectHints are passed to Select Signed-off-by: Abdurrahman J. Allawala --- pkg/querier/querier.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f260aa77cf..83bf9c3a21 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -300,16 +300,17 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat level.Debug(log).Log("start", util.TimeFromMillis(sp.Start).UTC().String(), "end", util.TimeFromMillis(sp.End).UTC().String(), "step", sp.Step, "matchers", matchers) } - // Kludge: Prometheus passes nil SelectHints if it is doing a 'series' operation, - // which needs only metadata. Here we expect that metadataQuerier querier will handle that. - // In Cortex it is not feasible to query entire history (with no mint/maxt), so we only ask ingesters and skip - // querying the long-term storage. + // If the querier receives a 'series' query, it means only metadata is needed. + // Here we expect that metadataQuerier querier will handle that. // Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series". // See: https://github.com/prometheus/prometheus/pull/8050 - if (sp == nil || sp.Func == "series") && !q.queryStoreForLabels { + if sp != nil && sp.Func == "series" && !q.queryStoreForLabels { // In this case, the query time range has already been validated when the querier has been // created. return q.metadataQuerier.Select(true, sp, matchers...) 
+ } else if sp == nil { + // If SelectHints is nil, fall back to the querier's mint and maxt to scope the time range for the Select. + sp = &storage.SelectHints{Start: q.mint, End: q.maxt} } userID, err := tenant.TenantID(ctx) From 6ff18de7d95f5690c48914f57ac9d47e19366164 Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" Date: Wed, 11 Aug 2021 18:10:22 -0700 Subject: [PATCH 02/10] export mockDistributor for later use in the ruler tests Signed-off-by: Abdurrahman J. Allawala --- pkg/querier/distributor_queryable_test.go | 49 ++++------------------- pkg/querier/metadata_handler_test.go | 6 ++- pkg/querier/querier_test.go | 18 +++++---- pkg/querier/testutils/mock_distributor.go | 45 +++++++++++++++++++++ 4 files changed, 67 insertions(+), 51 deletions(-) create mode 100644 pkg/querier/testutils/mock_distributor.go diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 4c88aca64a..4f0b910609 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -6,9 +6,10 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/querier/testutils" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/scrape" "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -29,7 +30,7 @@ const ( ) func TestDistributorQuerier(t *testing.T) { - d := &mockDistributor{} + d := &testutils.MockDistributor{} d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( model.Matrix{ // Matrixes are unsorted, so this tests that the labels get sorted.
@@ -117,7 +118,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) for _, streamingEnabled := range []bool{false, true} { for testName, testData := range tests { t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) { - distributor := &mockDistributor{} + distributor := &testutils.MockDistributor{} distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil) @@ -149,7 +150,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) } func TestDistributorQueryableFilter(t *testing.T) { - d := &mockDistributor{} + d := &testutils.MockDistributor{} dq := newDistributorQueryable(d, false, nil, 1*time.Hour) now := time.Now() @@ -175,7 +176,7 @@ func TestIngesterStreaming(t *testing.T) { }) require.NoError(t, err) - d := &mockDistributor{} + d := &testutils.MockDistributor{} d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( &client.QueryStreamResponse{ Chunkseries: []client.TimeSeriesChunk{ @@ -244,7 +245,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) { {Value: 5.5, TimestampMs: 5500}, } - d := &mockDistributor{} + d := &testutils.MockDistributor{} d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( &client.QueryStreamResponse{ Chunkseries: []client.TimeSeriesChunk{ @@ -316,7 +317,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) { {Metric: model.Metric{"job": "baz"}}, {Metric: model.Metric{"job": "baz", "foo": "boom"}}, } - d := &mockDistributor{} + d := &testutils.MockDistributor{} d.On("MetricsForLabelMatchers", mock.Anything, model.Time(mint), model.Time(maxt), 
someMatchers). Return(metrics, nil) @@ -350,37 +351,3 @@ func convertToChunks(t *testing.T, samples []cortexpb.Sample) []client.Chunk { return clientChunks } - -type mockDistributor struct { - mock.Mock -} - -func (m *mockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { - args := m.Called(ctx, from, to, matchers) - return args.Get(0).(model.Matrix), args.Error(1) -} -func (m *mockDistributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*client.ExemplarQueryResponse, error) { - args := m.Called(ctx, from, to, matchers) - return args.Get(0).(*client.ExemplarQueryResponse), args.Error(1) -} -func (m *mockDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) { - args := m.Called(ctx, from, to, matchers) - return args.Get(0).(*client.QueryStreamResponse), args.Error(1) -} -func (m *mockDistributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, lbl model.LabelName, matchers ...*labels.Matcher) ([]string, error) { - args := m.Called(ctx, from, to, lbl, matchers) - return args.Get(0).([]string), args.Error(1) -} -func (m *mockDistributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) { - args := m.Called(ctx, from, to) - return args.Get(0).([]string), args.Error(1) -} -func (m *mockDistributor) MetricsForLabelMatchers(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) { - args := m.Called(ctx, from, to, matchers) - return args.Get(0).([]metric.Metric), args.Error(1) -} - -func (m *mockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { - args := m.Called(ctx) - return args.Get(0).([]scrape.MetricMetadata), args.Error(1) -} diff --git a/pkg/querier/metadata_handler_test.go b/pkg/querier/metadata_handler_test.go index 56c92641a1..e33b1d751f 100644 --- 
a/pkg/querier/metadata_handler_test.go +++ b/pkg/querier/metadata_handler_test.go @@ -7,13 +7,15 @@ import ( "net/http/httptest" "testing" + "github.com/cortexproject/cortex/pkg/querier/testutils" + "github.com/prometheus/prometheus/scrape" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) func TestMetadataHandler_Success(t *testing.T) { - d := &mockDistributor{} + d := &testutils.MockDistributor{} d.On("MetricsMetadata", mock.Anything).Return( []scrape.MetricMetadata{ {Metric: "alertmanager_dispatcher_aggregation_groups", Help: "Number of active aggregation groups", Type: "gauge", Unit: ""}, @@ -51,7 +53,7 @@ func TestMetadataHandler_Success(t *testing.T) { } func TestMetadataHandler_Error(t *testing.T) { - d := &mockDistributor{} + d := &testutils.MockDistributor{} d.On("MetricsMetadata", mock.Anything).Return([]scrape.MetricMetadata{}, fmt.Errorf("no user id")) handler := MetadataHandler(d) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 7e68f701b8..d259418dd6 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/querier/testutils" + "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb" @@ -364,7 +366,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { t.Run(fmt.Sprintf("%s (ingester streaming enabled = %t)", name, cfg.IngesterStreaming), func(t *testing.T) { // We don't need to query any data for this test, so an empty store is fine. 
chunkStore := &emptyChunkStore{} - distributor := &mockDistributor{} + distributor := &testutils.MockDistributor{} distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) @@ -570,7 +572,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))} t.Run("query range", func(t *testing.T) { - distributor := &mockDistributor{} + distributor := &testutils.MockDistributor{} distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) @@ -599,7 +601,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { }) t.Run("series", func(t *testing.T) { - distributor := &mockDistributor{} + distributor := &testutils.MockDistributor{} distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil) queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -631,7 +633,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { }) t.Run("label names", func(t *testing.T) { - distributor := &mockDistributor{} + distributor := &testutils.MockDistributor{} distributor.On("LabelNames", mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -658,7 +660,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { matchers := []*labels.Matcher{ 
labels.MustNewMatcher(labels.MatchNotEqual, "route", "get_user"), } - distributor := &mockDistributor{} + distributor := &testutils.MockDistributor{} distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, matchers).Return([]metric.Metric{}, nil) queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -684,7 +686,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { }) t.Run("label values", func(t *testing.T) { - distributor := &mockDistributor{} + distributor := &testutils.MockDistributor{} distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -713,7 +715,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { // mockDistibutorFor duplicates the chunks in the mockChunkStore into the mockDistributor // so we can test everything is dedupe correctly. 
-func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *mockDistributor { +func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *testutils.MockDistributor { chunks, err := chunkcompat.ToChunks(cs.chunks) require.NoError(t, err) @@ -724,7 +726,7 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *moc matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through) require.NoError(t, err) - result := &mockDistributor{} + result := &testutils.MockDistributor{} result.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(matrix, nil) result.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}}, nil) return result diff --git a/pkg/querier/testutils/mock_distributor.go b/pkg/querier/testutils/mock_distributor.go new file mode 100644 index 0000000000..0b0cf25626 --- /dev/null +++ b/pkg/querier/testutils/mock_distributor.go @@ -0,0 +1,45 @@ +package testutils + +import ( + "context" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/prom1/storage/metric" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/scrape" + "github.com/stretchr/testify/mock" +) + +type MockDistributor struct { + mock.Mock +} + +func (m *MockDistributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) { + args := m.Called(ctx, from, to, matchers) + return args.Get(0).(model.Matrix), args.Error(1) +} +func (m *MockDistributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*client.ExemplarQueryResponse, error) { + args := m.Called(ctx, from, to, matchers) + return 
args.Get(0).(*client.ExemplarQueryResponse), args.Error(1) +} +func (m *MockDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) { + args := m.Called(ctx, from, to, matchers) + return args.Get(0).(*client.QueryStreamResponse), args.Error(1) +} +func (m *MockDistributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, lbl model.LabelName, matchers ...*labels.Matcher) ([]string, error) { + args := m.Called(ctx, from, to, lbl, matchers) + return args.Get(0).([]string), args.Error(1) +} +func (m *MockDistributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) { + args := m.Called(ctx, from, to) + return args.Get(0).([]string), args.Error(1) +} +func (m *MockDistributor) MetricsForLabelMatchers(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) { + args := m.Called(ctx, from, to, matchers) + return args.Get(0).([]metric.Metric), args.Error(1) +} + +func (m *MockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { + args := m.Called(ctx) + return args.Get(0).([]scrape.MetricMetadata), args.Error(1) +} From f669abe07a849d0a34a7cb602441af8d7f561831 Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" Date: Sun, 15 Aug 2021 00:28:25 -0700 Subject: [PATCH 03/10] add test for restoring FOR state in the ruler via distributors Signed-off-by: Abdurrahman J. 
Allawala --- pkg/querier/querier_test.go | 18 +- .../{mock_distributor.go => testutils.go} | 21 ++ pkg/ruler/api_test.go | 19 +- pkg/ruler/lifecycle_test.go | 4 +- pkg/ruler/ruler_test.go | 188 ++++++++++++++++-- 5 files changed, 207 insertions(+), 43 deletions(-) rename pkg/querier/testutils/{mock_distributor.go => testutils.go} (78%) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index d259418dd6..92c6e51879 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -159,7 +159,7 @@ func TestQuerier(t *testing.T) { chunkStore, through := makeMockChunkStore(t, chunks, encoding.e) distributor := mockDistibutorFor(t, chunkStore, through) - overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil) + overrides, err := validation.NewOverrides(testutils.DefaultLimitsConfig(), nil) require.NoError(t, err) queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)} @@ -282,7 +282,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { chunkStore, _ := makeMockChunkStore(t, 24, encodings[0].e) distributor := &errDistributor{} - overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil) + overrides, err := validation.NewOverrides(testutils.DefaultLimitsConfig(), nil) require.NoError(t, err) queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -370,7 +370,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) - overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil) + overrides, err := 
validation.NewOverrides(testutils.DefaultLimitsConfig(), nil) require.NoError(t, err) queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))} @@ -440,7 +440,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { var cfg Config flagext.DefaultValues(&cfg) - limits := defaultLimitsConfig() + limits := testutils.DefaultLimitsConfig() limits.MaxQueryLength = model.Duration(maxQueryLength) overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) @@ -562,7 +562,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { flagext.DefaultValues(&cfg) cfg.IngesterStreaming = ingesterStreaming - limits := defaultLimitsConfig() + limits := testutils.DefaultLimitsConfig() limits.MaxQueryLookback = testData.maxQueryLookback overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) @@ -904,7 +904,7 @@ func TestShortTermQueryToLTS(t *testing.T) { chunkStore := &emptyChunkStore{} distributor := &errDistributor{} - overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil) + overrides, err := validation.NewOverrides(testutils.DefaultLimitsConfig(), nil) require.NoError(t, err) queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -1022,9 +1022,3 @@ func (m *mockQueryableWithFilter) UseQueryable(_ time.Time, _, _ int64) bool { m.useQueryableCalled = true return true } - -func defaultLimitsConfig() validation.Limits { - limits := validation.Limits{} - flagext.DefaultValues(&limits) - return limits -} diff --git a/pkg/querier/testutils/mock_distributor.go b/pkg/querier/testutils/testutils.go similarity index 78% rename from pkg/querier/testutils/mock_distributor.go rename to pkg/querier/testutils/testutils.go index 0b0cf25626..55e98c95db 100644 --- a/pkg/querier/testutils/mock_distributor.go +++ 
b/pkg/querier/testutils/testutils.go @@ -4,6 +4,9 @@ import ( "context" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" + "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/cortexproject/cortex/pkg/util/validation" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/scrape" @@ -43,3 +46,21 @@ func (m *MockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricM args := m.Called(ctx) return args.Get(0).([]scrape.MetricMetadata), args.Error(1) } + +type TestConfig struct { + Cfg querier.Config + Distributor querier.Distributor + Stores []querier.QueryableWithFilter +} + +func DefaultQuerierConfig() querier.Config { + querierCfg := querier.Config{} + flagext.DefaultValues(&querierCfg) + return querierCfg +} + +func DefaultLimitsConfig() validation.Limits { + limits := validation.Limits{} + flagext.DefaultValues(&limits) + return limits +} diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go index 2fc6038682..14cb47f8f7 100644 --- a/pkg/ruler/api_test.go +++ b/pkg/ruler/api_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "io" "io/ioutil" "net/http" @@ -24,7 +25,7 @@ func TestRuler_rules(t *testing.T) { cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules)) defer cleanup() - r, rcleanup := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg, nil) defer rcleanup() defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -81,7 +82,7 @@ func TestRuler_rules_special_characters(t *testing.T) { cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(mockSpecialCharRules)) defer cleanup() - r, rcleanup := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, 
cfg, nil) defer rcleanup() defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -138,7 +139,7 @@ func TestRuler_alerts(t *testing.T) { cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules)) defer cleanup() - r, rcleanup := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg, nil) defer rcleanup() defer r.StopAsync() @@ -174,7 +175,7 @@ func TestRuler_Create(t *testing.T) { cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(make(map[string]rulespb.RuleGroupList))) defer cleanup() - r, rcleanup := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg, nil) defer rcleanup() defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -265,7 +266,7 @@ func TestRuler_DeleteNamespace(t *testing.T) { cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRulesNamespaces)) defer cleanup() - r, rcleanup := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg, nil) defer rcleanup() defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -304,11 +305,11 @@ func TestRuler_LimitsPerGroup(t *testing.T) { cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(make(map[string]rulespb.RuleGroupList))) defer cleanup() - r, rcleanup := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg, nil) defer rcleanup() defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck - r.limits = &ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1} + r.limits = ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1} a := NewAPI(r, r.store, log.NewNopLogger()) @@ -359,11 +360,11 @@ func TestRuler_RulerGroupLimits(t *testing.T) { cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(make(map[string]rulespb.RuleGroupList))) defer cleanup() - r, rcleanup := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg, nil) defer rcleanup() defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck - r.limits = &ruleLimits{maxRuleGroups: 1, 
maxRulesPerRuleGroup: 1} + r.limits = ruleLimits{maxRuleGroups: 1, maxRulesPerRuleGroup: 1} a := NewAPI(r, r.store, log.NewNopLogger()) diff --git a/pkg/ruler/lifecycle_test.go b/pkg/ruler/lifecycle_test.go index 7b14e548b0..143a9de47d 100644 --- a/pkg/ruler/lifecycle_test.go +++ b/pkg/ruler/lifecycle_test.go @@ -24,7 +24,7 @@ func TestRulerShutdown(t *testing.T) { config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules)) defer cleanup() - r, rcleanup := buildRuler(t, config, nil) + r, rcleanup := buildRuler(t, config, nil, nil) defer rcleanup() r.cfg.EnableSharding = true @@ -59,7 +59,7 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) { ctx := context.Background() config, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules)) defer cleanup() - r, rcleanup := buildRuler(t, config, nil) + r, rcleanup := buildRuler(t, config, nil, nil) defer rcleanup() r.cfg.EnableSharding = true r.cfg.Ring.HeartbeatPeriod = 100 * time.Millisecond diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 04d8d3a2c4..4503287e22 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -9,11 +9,19 @@ import ( "net/http/httptest" "os" "path/filepath" + "reflect" "sort" "strings" "sync" "testing" "time" + "unsafe" + + "github.com/cortexproject/cortex/pkg/chunk/purger" + "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/mock" "go.uber.org/atomic" @@ -41,6 +49,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/cortexpb" + querier_testutils "github.com/cortexproject/cortex/pkg/querier/testutils" "github.com/cortexproject/cortex/pkg/ring" 
"github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" @@ -102,7 +111,24 @@ func (r ruleLimits) RulerMaxRulesPerRuleGroup(_ string) int { return r.maxRulesPerRuleGroup } -func testSetup(t *testing.T, cfg Config) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, func()) { +func testQueryableFunc(querierTestConfig *querier_testutils.TestConfig, reg prometheus.Registerer, logger log.Logger) storage.QueryableFunc { + if querierTestConfig != nil { + // disable active query tracking for test + querierTestConfig.Cfg.ActiveQueryTrackerDir = "" + + overrides, _ := validation.NewOverrides(querier_testutils.DefaultLimitsConfig(), nil) + q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, purger.NewTombstonesLoader(nil, nil), reg, logger) + return func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return q.Querier(ctx, mint, maxt) + } + } else { + return func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return storage.NoopQuerier(), nil + } + } +} + +func testSetup(t *testing.T, querierTestConfig *querier_testutils.TestConfig) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, prometheus.Registerer, func()) { dir, err := ioutil.TempDir("", filepath.Base(t.Name())) assert.NoError(t, err) cleanup := func() { @@ -117,10 +143,6 @@ func testSetup(t *testing.T, cfg Config) (*promql.Engine, storage.QueryableFunc, Timeout: 2 * time.Minute, }) - noopQueryable := storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return storage.NoopQuerier(), nil - }) - // Mock the pusher pusher := newPusherMock() pusher.MockPush(&cortexpb.WriteResponse{}, nil) @@ -128,12 +150,15 @@ func testSetup(t *testing.T, cfg Config) (*promql.Engine, storage.QueryableFunc, l := log.NewLogfmtLogger(os.Stdout) l = level.NewFilter(l, 
level.AllowInfo()) - return engine, noopQueryable, pusher, l, ruleLimits{evalDelay: 0, maxRuleGroups: 20, maxRulesPerRuleGroup: 15}, cleanup + reg := prometheus.NewRegistry() + queryable := testQueryableFunc(querierTestConfig, reg, l) + + return engine, queryable, pusher, l, ruleLimits{evalDelay: 0, maxRuleGroups: 20, maxRulesPerRuleGroup: 15}, reg, cleanup } func newManager(t *testing.T, cfg Config) (*DefaultMultiTenantManager, func()) { - engine, noopQueryable, pusher, logger, overrides, cleanup := testSetup(t, cfg) - manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, noopQueryable, engine, overrides, nil), prometheus.NewRegistry(), logger) + engine, queryable, pusher, logger, overrides, reg, cleanup := testSetup(t, nil) + manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, nil), reg, logger) require.NoError(t, err) return manager, cleanup @@ -177,18 +202,17 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer } } -func buildRuler(t *testing.T, cfg Config, rulerAddrMap map[string]*Ruler) (*Ruler, func()) { - engine, noopQueryable, pusher, logger, overrides, cleanup := testSetup(t, cfg) - storage, err := NewLegacyRuleStore(cfg.StoreConfig, promRules.FileLoader{}, log.NewNopLogger()) +func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier_testutils.TestConfig, rulerAddrMap map[string]*Ruler) (*Ruler, func()) { + engine, queryable, pusher, logger, overrides, reg, cleanup := testSetup(t, querierTestConfig) + storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, promRules.FileLoader{}, log.NewNopLogger()) require.NoError(t, err) - reg := prometheus.NewRegistry() - managerFactory := DefaultTenantManagerFactory(cfg, pusher, noopQueryable, engine, overrides, reg) - manager, err := NewDefaultMultiTenantManager(cfg, managerFactory, reg, log.NewNopLogger()) + managerFactory := 
DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg) + manager, err := NewDefaultMultiTenantManager(rulerConfig, managerFactory, reg, log.NewNopLogger()) require.NoError(t, err) ruler, err := newRuler( - cfg, + rulerConfig, manager, reg, logger, @@ -200,8 +224,8 @@ func buildRuler(t *testing.T, cfg Config, rulerAddrMap map[string]*Ruler) (*Rule return ruler, cleanup } -func newTestRuler(t *testing.T, cfg Config) (*Ruler, func()) { - ruler, cleanup := buildRuler(t, cfg, nil) +func newTestRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier_testutils.TestConfig) (*Ruler, func()) { + ruler, cleanup := buildRuler(t, rulerConfig, querierTestConfig, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler)) // Ensure all rules are loaded before usage @@ -261,7 +285,7 @@ func TestRuler_Rules(t *testing.T) { cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules)) defer cleanup() - r, rcleanup := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg, nil) defer rcleanup() defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -876,7 +900,7 @@ func TestSharding(t *testing.T) { DisabledTenants: tc.disabledUsers, } - r, cleanup := buildRuler(t, cfg, nil) + r, cleanup := buildRuler(t, cfg, nil, nil) r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize} t.Cleanup(cleanup) @@ -1076,7 +1100,7 @@ func TestRuler_ListAllRules(t *testing.T) { cfg, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules)) defer cleanup() - r, rcleanup := newTestRuler(t, cfg) + r, rcleanup := newTestRuler(t, cfg, nil) defer rcleanup() defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck @@ -1172,3 +1196,127 @@ func TestSendAlerts(t *testing.T) { }) } } + +// Tests for whether the Ruler is able to recover ALERTS_FOR_STATE state from Distributors only +func TestRecoverAlertsPostOutageFromDistributors(t *testing.T) { + // Test Setup + // alert FOR 
30m, already ran for 10m, outage down at 15m prior to now(), outage tolerance set to 1hr + // EXPECTATION: for state for alert restores to 10m+(now-15m) + + // FIRST set up 1 Alert rule with 30m FOR duration + alertForDuration, _ := time.ParseDuration("30m") + mockRules := map[string]rulespb.RuleGroupList{ + "user1": { + &rulespb.RuleGroupDesc{ + Name: "group1", + Namespace: "namespace1", + User: "user1", + Rules: []*rulespb.RuleDesc{ + { + Alert: "UP_ALERT", + Expr: "UP < 1", + For: alertForDuration, + }, + }, + Interval: interval, + }, + }, + } + + // NEXT, set up ruler config with outage tolerance = 1hr + rulerCfg, cleanup := defaultRulerConfig(newMockRuleStore(mockRules)) + rulerCfg.OutageTolerance, _ = time.ParseDuration("1h") + defer cleanup() + + // NEXT, set up mock distributor containing sample, + // metric: ALERTS_FOR_STATE{alertname="UP_ALERT"}, ts: time.now()-15m, value: time.now()-25m + currentTime := time.Now().UTC() + downAtTime := currentTime.Add(time.Minute * -15) + downAtTimeMs := downAtTime.UnixNano() / int64(time.Millisecond) + downAtActiveAtTime := currentTime.Add(time.Minute * -25) + downAtActiveSec := downAtActiveAtTime.Unix() + d := &querier_testutils.MockDistributor{} + d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + model.Matrix{ + &model.SampleStream{ + Metric: model.Metric{ + labels.MetricName: "ALERTS_FOR_STATE", + // user1's only alert rule + labels.AlertName: model.LabelValue(mockRules["user1"][0].GetRules()[0].Alert), + }, + Values: []model.SamplePair{{Timestamp: model.Time(downAtTimeMs), Value: model.SampleValue(downAtActiveSec)}}, + }, + }, + nil) + querierConfig := querier_testutils.DefaultQuerierConfig() + querierConfig.IngesterStreaming = false + + // create a ruler but don't start it. instead, we'll evaluate the rule groups manually. 
+ r, rcleanup := newRuler(t, rulerCfg, &querier_testutils.TestConfig{Cfg: querierConfig, Distributor: d}) + r.syncRules(context.Background(), rulerSyncReasonInitial) + defer rcleanup() + + // assert initial state of rule group + ruleGroup := r.manager.GetRules("user1")[0] + require.Equal(t, time.Time{}, ruleGroup.GetLastEvaluation()) + require.Equal(t, "group1", ruleGroup.Name()) + require.Equal(t, 1, len(ruleGroup.Rules())) + + // assert initial state of rule within rule group + alertRule := ruleGroup.Rules()[0] + require.Equal(t, time.Time{}, alertRule.GetEvaluationTimestamp()) + require.Equal(t, "UP_ALERT", alertRule.Name()) + require.Equal(t, promRules.HealthUnknown, alertRule.Health()) + + // NEXT, evaluate the rule group the first time + ctx := user.InjectOrgID(context.Background(), "user1") + ruleGroup.Eval(ctx, currentTime) + + // assert alert state after first eval + // since the eval is done at the current timestamp, the activeAt timestamp of alert should equal current timestamp + require.Equal(t, "UP_ALERT", alertRule.Name()) + require.Equal(t, promRules.HealthGood, alertRule.Health()) + activeMap := reflect.ValueOf(alertRule).Elem().FieldByName("active").MapRange() + for activeMap.Next() { + alertRuleActive := activeMap.Value().Elem() + + activeAtRaw := alertRuleActive.FieldByName("ActiveAt") + activeAtTime := reflect.NewAt(activeAtRaw.Type(), unsafe.Pointer(activeAtRaw.UnsafeAddr())).Elem().Interface().(time.Time) + require.Equal(t, activeAtTime, currentTime) + + require.Equal(t, promRules.StatePending, promRules.AlertState(alertRuleActive.FieldByName("State").Int())) + } + + // NEXT, restore the FOR state + ruleGroup.RestoreForState(currentTime) + + // assert alert state after RestoreForState + require.Equal(t, "UP_ALERT", alertRule.Name()) + require.Equal(t, promRules.HealthGood, alertRule.Health()) + activeMap = reflect.ValueOf(alertRule).Elem().FieldByName("active").MapRange() + for activeMap.Next() { + alertRuleActive := activeMap.Value().Elem() + 
+ activeAtRaw := alertRuleActive.FieldByName("ActiveAt") + activeAtTime := reflect.NewAt(activeAtRaw.Type(), unsafe.Pointer(activeAtRaw.UnsafeAddr())).Elem().Interface().(time.Time) + require.Equal(t, activeAtTime, downAtActiveAtTime.Add(currentTime.Sub(downAtTime))) + + require.Equal(t, promRules.StatePending, promRules.AlertState(alertRuleActive.FieldByName("State").Int())) + } + + // NEXT, 20 minutes is expected to be left, eval timestamp at currentTimestamp +20m + currentTime = currentTime.Add(time.Minute * 20) + ruleGroup.Eval(ctx, currentTime) + + // assert alert state after alert is firing + activeMap = reflect.ValueOf(alertRule).Elem().FieldByName("active").MapRange() + for activeMap.Next() { + alertRuleActive := activeMap.Value().Elem() + + firedAtRaw := alertRuleActive.FieldByName("FiredAt") + firedAtTime := reflect.NewAt(firedAtRaw.Type(), unsafe.Pointer(firedAtRaw.UnsafeAddr())).Elem().Interface().(time.Time) + require.Equal(t, firedAtTime, currentTime) + + require.Equal(t, promRules.StateFiring, promRules.AlertState(alertRuleActive.FieldByName("State").Int())) + } +} From a8afda9b18138d96a7c098cc791cec02dc7a1420 Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" Date: Mon, 16 Aug 2021 15:17:12 -0700 Subject: [PATCH 04/10] test for always active alert Signed-off-by: Abdurrahman J. 
Allawala --- pkg/ruler/ruler_test.go | 50 +++++++++++++++-------------------------- 1 file changed, 18 insertions(+), 32 deletions(-) diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 4503287e22..74c1d9b01e 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -1214,7 +1214,7 @@ func TestRecoverAlertsPostOutageFromDistributors(t *testing.T) { Rules: []*rulespb.RuleDesc{ { Alert: "UP_ALERT", - Expr: "UP < 1", + Expr: "1", // always fire for this test For: alertForDuration, }, }, @@ -1248,6 +1248,7 @@ func TestRecoverAlertsPostOutageFromDistributors(t *testing.T) { }, }, nil) + d.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Panic("This should not be called for the ruler use-cases.") querierConfig := querier_testutils.DefaultQuerierConfig() querierConfig.IngesterStreaming = false @@ -1268,55 +1269,40 @@ func TestRecoverAlertsPostOutageFromDistributors(t *testing.T) { require.Equal(t, "UP_ALERT", alertRule.Name()) require.Equal(t, promRules.HealthUnknown, alertRule.Health()) - // NEXT, evaluate the rule group the first time + // NEXT, evaluate the rule group the first time and assert ctx := user.InjectOrgID(context.Background(), "user1") ruleGroup.Eval(ctx, currentTime) - // assert alert state after first eval // since the eval is done at the current timestamp, the activeAt timestamp of alert should equal current timestamp require.Equal(t, "UP_ALERT", alertRule.Name()) require.Equal(t, promRules.HealthGood, alertRule.Health()) - activeMap := reflect.ValueOf(alertRule).Elem().FieldByName("active").MapRange() - for activeMap.Next() { - alertRuleActive := activeMap.Value().Elem() - activeAtRaw := alertRuleActive.FieldByName("ActiveAt") - activeAtTime := reflect.NewAt(activeAtRaw.Type(), unsafe.Pointer(activeAtRaw.UnsafeAddr())).Elem().Interface().(time.Time) - require.Equal(t, activeAtTime, currentTime) + activeMapRaw := reflect.ValueOf(alertRule).Elem().FieldByName("active") + activeMapKeys 
:= activeMapRaw.MapKeys() + require.True(t, len(activeMapKeys) == 1) - require.Equal(t, promRules.StatePending, promRules.AlertState(alertRuleActive.FieldByName("State").Int())) - } + activeAlertRuleRaw := activeMapRaw.MapIndex(activeMapKeys[0]).Elem() + activeAtTimeRaw := activeAlertRuleRaw.FieldByName("ActiveAt") + + require.Equal(t, promRules.StatePending, promRules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) + require.Equal(t, reflect.NewAt(activeAtTimeRaw.Type(), unsafe.Pointer(activeAtTimeRaw.UnsafeAddr())).Elem().Interface().(time.Time), currentTime) - // NEXT, restore the FOR state + // NEXT, restore the FOR state and assert ruleGroup.RestoreForState(currentTime) - // assert alert state after RestoreForState require.Equal(t, "UP_ALERT", alertRule.Name()) require.Equal(t, promRules.HealthGood, alertRule.Health()) - activeMap = reflect.ValueOf(alertRule).Elem().FieldByName("active").MapRange() - for activeMap.Next() { - alertRuleActive := activeMap.Value().Elem() - - activeAtRaw := alertRuleActive.FieldByName("ActiveAt") - activeAtTime := reflect.NewAt(activeAtRaw.Type(), unsafe.Pointer(activeAtRaw.UnsafeAddr())).Elem().Interface().(time.Time) - require.Equal(t, activeAtTime, downAtActiveAtTime.Add(currentTime.Sub(downAtTime))) - - require.Equal(t, promRules.StatePending, promRules.AlertState(alertRuleActive.FieldByName("State").Int())) - } + require.Equal(t, promRules.StatePending, promRules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) + require.Equal(t, reflect.NewAt(activeAtTimeRaw.Type(), unsafe.Pointer(activeAtTimeRaw.UnsafeAddr())).Elem().Interface().(time.Time), downAtActiveAtTime.Add(currentTime.Sub(downAtTime))) // NEXT, 20 minutes is expected to be left, eval timestamp at currentTimestamp +20m currentTime = currentTime.Add(time.Minute * 20) ruleGroup.Eval(ctx, currentTime) // assert alert state after alert is firing - activeMap = reflect.ValueOf(alertRule).Elem().FieldByName("active").MapRange() - for activeMap.Next() { 
- alertRuleActive := activeMap.Value().Elem() - - firedAtRaw := alertRuleActive.FieldByName("FiredAt") - firedAtTime := reflect.NewAt(firedAtRaw.Type(), unsafe.Pointer(firedAtRaw.UnsafeAddr())).Elem().Interface().(time.Time) - require.Equal(t, firedAtTime, currentTime) + firedAtRaw := activeAlertRuleRaw.FieldByName("FiredAt") + firedAtTime := reflect.NewAt(firedAtRaw.Type(), unsafe.Pointer(firedAtRaw.UnsafeAddr())).Elem().Interface().(time.Time) + require.Equal(t, firedAtTime, currentTime) - require.Equal(t, promRules.StateFiring, promRules.AlertState(alertRuleActive.FieldByName("State").Int())) - } + require.Equal(t, promRules.StateFiring, promRules.AlertState(activeAlertRuleRaw.FieldByName("State").Int())) } From 90130adc12ea6d2f212be96579ff984fac29780f Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" Date: Tue, 17 Aug 2021 11:01:46 -0700 Subject: [PATCH 05/10] move querier/testutils to querier pkg Signed-off-by: Abdurrahman J. Allawala --- pkg/querier/distributor_queryable_test.go | 14 +++++------ pkg/querier/metadata_handler_test.go | 6 ++--- pkg/querier/querier_test.go | 30 +++++++++++------------ pkg/querier/{testutils => }/testutils.go | 13 +++++----- pkg/ruler/ruler_test.go | 21 ++++++++-------- 5 files changed, 38 insertions(+), 46 deletions(-) rename pkg/querier/{testutils => }/testutils.go (90%) diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 4f0b910609..dbe6fc9ebe 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/querier/testutils" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" @@ -30,7 +28,7 @@ const ( ) func TestDistributorQuerier(t *testing.T) { - d := &testutils.MockDistributor{} + d := 
&MockDistributor{} d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( model.Matrix{ // Matrixes are unsorted, so this tests that the labels get sorted. @@ -118,7 +116,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) for _, streamingEnabled := range []bool{false, true} { for testName, testData := range tests { t.Run(fmt.Sprintf("%s (streaming enabled: %t)", testName, streamingEnabled), func(t *testing.T) { - distributor := &testutils.MockDistributor{} + distributor := &MockDistributor{} distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil) @@ -150,7 +148,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) } func TestDistributorQueryableFilter(t *testing.T) { - d := &testutils.MockDistributor{} + d := &MockDistributor{} dq := newDistributorQueryable(d, false, nil, 1*time.Hour) now := time.Now() @@ -176,7 +174,7 @@ func TestIngesterStreaming(t *testing.T) { }) require.NoError(t, err) - d := &testutils.MockDistributor{} + d := &MockDistributor{} d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( &client.QueryStreamResponse{ Chunkseries: []client.TimeSeriesChunk{ @@ -245,7 +243,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) { {Value: 5.5, TimestampMs: 5500}, } - d := &testutils.MockDistributor{} + d := &MockDistributor{} d.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( &client.QueryStreamResponse{ Chunkseries: []client.TimeSeriesChunk{ @@ -317,7 +315,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) { {Metric: model.Metric{"job": "baz"}}, {Metric: 
model.Metric{"job": "baz", "foo": "boom"}}, } - d := &testutils.MockDistributor{} + d := &MockDistributor{} d.On("MetricsForLabelMatchers", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers). Return(metrics, nil) diff --git a/pkg/querier/metadata_handler_test.go b/pkg/querier/metadata_handler_test.go index e33b1d751f..bf5f53bc25 100644 --- a/pkg/querier/metadata_handler_test.go +++ b/pkg/querier/metadata_handler_test.go @@ -7,15 +7,13 @@ import ( "net/http/httptest" "testing" - "github.com/cortexproject/cortex/pkg/querier/testutils" - "github.com/prometheus/prometheus/scrape" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) func TestMetadataHandler_Success(t *testing.T) { - d := &testutils.MockDistributor{} + d := &MockDistributor{} d.On("MetricsMetadata", mock.Anything).Return( []scrape.MetricMetadata{ {Metric: "alertmanager_dispatcher_aggregation_groups", Help: "Number of active aggregation groups", Type: "gauge", Unit: ""}, @@ -53,7 +51,7 @@ func TestMetadataHandler_Success(t *testing.T) { } func TestMetadataHandler_Error(t *testing.T) { - d := &testutils.MockDistributor{} + d := &MockDistributor{} d.On("MetricsMetadata", mock.Anything).Return([]scrape.MetricMetadata{}, fmt.Errorf("no user id")) handler := MetadataHandler(d) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 92c6e51879..38b0a36f25 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -10,8 +10,6 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/querier/testutils" - "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb" @@ -159,7 +157,7 @@ func TestQuerier(t *testing.T) { chunkStore, through := makeMockChunkStore(t, chunks, encoding.e) distributor := mockDistibutorFor(t, chunkStore, through) - overrides, 
err := validation.NewOverrides(testutils.DefaultLimitsConfig(), nil) + overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) require.NoError(t, err) queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)} @@ -282,7 +280,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) { chunkStore, _ := makeMockChunkStore(t, 24, encodings[0].e) distributor := &errDistributor{} - overrides, err := validation.NewOverrides(testutils.DefaultLimitsConfig(), nil) + overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) require.NoError(t, err) queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -366,11 +364,11 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryIntoFuture(t *testing.T) { t.Run(fmt.Sprintf("%s (ingester streaming enabled = %t)", name, cfg.IngesterStreaming), func(t *testing.T) { // We don't need to query any data for this test, so an empty store is fine. 
chunkStore := &emptyChunkStore{} - distributor := &testutils.MockDistributor{} + distributor := &MockDistributor{} distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) - overrides, err := validation.NewOverrides(testutils.DefaultLimitsConfig(), nil) + overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) require.NoError(t, err) queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))} @@ -440,7 +438,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLength(t *testing.T) { var cfg Config flagext.DefaultValues(&cfg) - limits := testutils.DefaultLimitsConfig() + limits := DefaultLimitsConfig() limits.MaxQueryLength = model.Duration(maxQueryLength) overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) @@ -562,7 +560,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { flagext.DefaultValues(&cfg) cfg.IngesterStreaming = ingesterStreaming - limits := testutils.DefaultLimitsConfig() + limits := DefaultLimitsConfig() limits.MaxQueryLookback = testData.maxQueryLookback overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) @@ -572,7 +570,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { queryables := []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))} t.Run("query range", func(t *testing.T) { - distributor := &testutils.MockDistributor{} + distributor := &MockDistributor{} distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(model.Matrix{}, nil) distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{}, nil) @@ -601,7 +599,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t 
*testing.T) { }) t.Run("series", func(t *testing.T) { - distributor := &testutils.MockDistributor{} + distributor := &MockDistributor{} distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil) queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -633,7 +631,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { }) t.Run("label names", func(t *testing.T) { - distributor := &testutils.MockDistributor{} + distributor := &MockDistributor{} distributor.On("LabelNames", mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -660,7 +658,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { matchers := []*labels.Matcher{ labels.MustNewMatcher(labels.MatchNotEqual, "route", "get_user"), } - distributor := &testutils.MockDistributor{} + distributor := &MockDistributor{} distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, matchers).Return([]metric.Metric{}, nil) queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -686,7 +684,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { }) t.Run("label values", func(t *testing.T) { - distributor := &testutils.MockDistributor{} + distributor := &MockDistributor{} distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil) queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) @@ -715,7 +713,7 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) { // mockDistibutorFor duplicates the chunks in 
the mockChunkStore into the mockDistributor // so we can test everything is dedupe correctly. -func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *testutils.MockDistributor { +func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *MockDistributor { chunks, err := chunkcompat.ToChunks(cs.chunks) require.NoError(t, err) @@ -726,7 +724,7 @@ func mockDistibutorFor(t *testing.T, cs mockChunkStore, through model.Time) *tes matrix, err := chunk.ChunksToMatrix(context.Background(), cs.chunks, 0, through) require.NoError(t, err) - result := &testutils.MockDistributor{} + result := &MockDistributor{} result.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(matrix, nil) result.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&client.QueryStreamResponse{Chunkseries: []client.TimeSeriesChunk{tsc}}, nil) return result @@ -904,7 +902,7 @@ func TestShortTermQueryToLTS(t *testing.T) { chunkStore := &emptyChunkStore{} distributor := &errDistributor{} - overrides, err := validation.NewOverrides(testutils.DefaultLimitsConfig(), nil) + overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) require.NoError(t, err) queryable, _, _ := New(cfg, overrides, distributor, []QueryableWithFilter{UseAlwaysQueryable(NewChunkStoreQueryable(cfg, chunkStore))}, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger()) diff --git a/pkg/querier/testutils/testutils.go b/pkg/querier/testutils.go similarity index 90% rename from pkg/querier/testutils/testutils.go rename to pkg/querier/testutils.go index 55e98c95db..f25cea403c 100644 --- a/pkg/querier/testutils/testutils.go +++ b/pkg/querier/testutils.go @@ -1,10 +1,9 @@ -package testutils +package querier import ( "context" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" - 
"github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/prometheus/common/model" @@ -48,13 +47,13 @@ func (m *MockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricM } type TestConfig struct { - Cfg querier.Config - Distributor querier.Distributor - Stores []querier.QueryableWithFilter + Cfg Config + Distributor Distributor + Stores []QueryableWithFilter } -func DefaultQuerierConfig() querier.Config { - querierCfg := querier.Config{} +func DefaultQuerierConfig() Config { + querierCfg := Config{} flagext.DefaultValues(&querierCfg) return querierCfg } diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 74c1d9b01e..bcb2e77096 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -49,7 +49,6 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/cortexpb" - querier_testutils "github.com/cortexproject/cortex/pkg/querier/testutils" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ruler/rulespb" "github.com/cortexproject/cortex/pkg/ruler/rulestore" @@ -111,12 +110,12 @@ func (r ruleLimits) RulerMaxRulesPerRuleGroup(_ string) int { return r.maxRulesPerRuleGroup } -func testQueryableFunc(querierTestConfig *querier_testutils.TestConfig, reg prometheus.Registerer, logger log.Logger) storage.QueryableFunc { +func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Registerer, logger log.Logger) storage.QueryableFunc { if querierTestConfig != nil { // disable active query tracking for test querierTestConfig.Cfg.ActiveQueryTrackerDir = "" - overrides, _ := validation.NewOverrides(querier_testutils.DefaultLimitsConfig(), nil) + overrides, _ := 
validation.NewOverrides(querier.DefaultLimitsConfig(), nil) q, _, _ := querier.New(querierTestConfig.Cfg, overrides, querierTestConfig.Distributor, querierTestConfig.Stores, purger.NewTombstonesLoader(nil, nil), reg, logger) return func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { return q.Querier(ctx, mint, maxt) @@ -128,7 +127,7 @@ func testQueryableFunc(querierTestConfig *querier_testutils.TestConfig, reg prom } } -func testSetup(t *testing.T, querierTestConfig *querier_testutils.TestConfig) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, prometheus.Registerer, func()) { +func testSetup(t *testing.T, querierTestConfig *querier.TestConfig) (*promql.Engine, storage.QueryableFunc, Pusher, log.Logger, RulesLimits, prometheus.Registerer, func()) { dir, err := ioutil.TempDir("", filepath.Base(t.Name())) assert.NoError(t, err) cleanup := func() { @@ -202,7 +201,7 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer } } -func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier_testutils.TestConfig, rulerAddrMap map[string]*Ruler) (*Ruler, func()) { +func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, rulerAddrMap map[string]*Ruler) (*Ruler, func()) { engine, queryable, pusher, logger, overrides, reg, cleanup := testSetup(t, querierTestConfig) storage, err := NewLegacyRuleStore(rulerConfig.StoreConfig, promRules.FileLoader{}, log.NewNopLogger()) require.NoError(t, err) @@ -218,13 +217,13 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier_tes logger, storage, overrides, - newMockClientsPool(cfg, logger, reg, rulerAddrMap), + newMockClientsPool(rulerConfig, logger, reg, rulerAddrMap), ) require.NoError(t, err) return ruler, cleanup } -func newTestRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier_testutils.TestConfig) (*Ruler, func()) { +func newTestRuler(t *testing.T, rulerConfig Config, 
querierTestConfig *querier.TestConfig) (*Ruler, func()) { ruler, cleanup := buildRuler(t, rulerConfig, querierTestConfig, nil) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler)) @@ -397,7 +396,7 @@ func TestGetRules(t *testing.T) { }, } - r, cleanUp := buildRuler(t, cfg, rulerAddrMap) + r, cleanUp := buildRuler(t, cfg, nil, rulerAddrMap) r.limits = ruleLimits{evalDelay: 0, tenantShard: tc.shuffleShardSize} t.Cleanup(cleanUp) rulerAddrMap[id] = r @@ -1235,7 +1234,7 @@ func TestRecoverAlertsPostOutageFromDistributors(t *testing.T) { downAtTimeMs := downAtTime.UnixNano() / int64(time.Millisecond) downAtActiveAtTime := currentTime.Add(time.Minute * -25) downAtActiveSec := downAtActiveAtTime.Unix() - d := &querier_testutils.MockDistributor{} + d := &querier.MockDistributor{} d.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( model.Matrix{ &model.SampleStream{ @@ -1249,11 +1248,11 @@ func TestRecoverAlertsPostOutageFromDistributors(t *testing.T) { }, nil) d.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Panic("This should not be called for the ruler use-cases.") - querierConfig := querier_testutils.DefaultQuerierConfig() + querierConfig := querier.DefaultQuerierConfig() querierConfig.IngesterStreaming = false // create a ruler but don't start it. instead, we'll evaluate the rule groups manually. - r, rcleanup := newRuler(t, rulerCfg, &querier_testutils.TestConfig{Cfg: querierConfig, Distributor: d}) + r, rcleanup := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d}, nil) r.syncRules(context.Background(), rulerSyncReasonInitial) defer rcleanup() From 9e97424a3de2a55f0e40f86afe67e1ed140641cd Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" Date: Wed, 18 Aug 2021 13:57:57 -0700 Subject: [PATCH 06/10] add empty chunk store to ruler test Signed-off-by: Abdurrahman J. 
Allawala --- pkg/ruler/ruler_test.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index bcb2e77096..b99c20a463 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -110,6 +110,24 @@ func (r ruleLimits) RulerMaxRulesPerRuleGroup(_ string) int { return r.maxRulesPerRuleGroup } +type emptyChunkStore struct { + sync.Mutex + called bool +} + +func (c *emptyChunkStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) { + c.Lock() + defer c.Unlock() + c.called = true + return nil, nil +} + +func (c *emptyChunkStore) IsCalled() bool { + c.Lock() + defer c.Unlock() + return c.called +} + func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Registerer, logger log.Logger) storage.QueryableFunc { if querierTestConfig != nil { // disable active query tracking for test @@ -1196,8 +1214,8 @@ func TestSendAlerts(t *testing.T) { } } -// Tests for whether the Ruler is able to recover ALERTS_FOR_STATE state from Distributors only -func TestRecoverAlertsPostOutageFromDistributors(t *testing.T) { +// Tests for whether the Ruler is able to recover ALERTS_FOR_STATE state +func TestRecoverAlertsPostOutage(t *testing.T) { // Test Setup // alert FOR 30m, already ran for 10m, outage down at 15m prior to now(), outage tolerance set to 1hr // EXPECTATION: for state for alert restores to 10m+(now-15m) @@ -1251,6 +1269,11 @@ func TestRecoverAlertsPostOutageFromDistributors(t *testing.T) { querierConfig := querier.DefaultQuerierConfig() querierConfig.IngesterStreaming = false + // set up an empty store + queryables := []querier.QueryableWithFilter{ + querier.UseAlwaysQueryable(querier.NewChunkStoreQueryable(querierConfig, &emptyChunkStore{})), + } + // create a ruler but don't start it. instead, we'll evaluate the rule groups manually. 
r, rcleanup := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d}, nil) r.syncRules(context.Background(), rulerSyncReasonInitial) From 2e40a58205b16eb90342259b04c82c443a5d0cc4 Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" Date: Wed, 18 Aug 2021 14:00:35 -0700 Subject: [PATCH 07/10] add changelog entry Signed-off-by: Abdurrahman J. Allawala --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 33be0acda9..4f5194ade1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ * [BUGFIX] Memberlist: forward only changes, not entire original message. #4419 * [BUGFIX] Memberlist: don't accept old tombstones as incoming change, and don't forward such messages to other gossip members. #4420 * [BUGFIX] Querier: fixed panic when querying exemplars and using `-distributor.shard-by-all-labels=false`. #4473 +* [BUGFIX] Querier: honor querier minT,maxT if `nil` SelectHints are passed to Select(). #4413 ## 1.10.0 / 2021-08-03 From 87d72f190e0cddc2cccbfa904391be80f7140017 Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" Date: Sun, 29 Aug 2021 17:47:54 -0700 Subject: [PATCH 08/10] lint Signed-off-by: Abdurrahman J. 
Allawala --- pkg/querier/testutils.go | 10 ++++++---- pkg/ruler/ruler_test.go | 15 ++++++++------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pkg/querier/testutils.go b/pkg/querier/testutils.go index f25cea403c..ca0f79fe7d 100644 --- a/pkg/querier/testutils.go +++ b/pkg/querier/testutils.go @@ -2,14 +2,16 @@ package querier import ( "context" - "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/prom1/storage/metric" - "github.com/cortexproject/cortex/pkg/util/flagext" - "github.com/cortexproject/cortex/pkg/util/validation" + + "github.com/grafana/dskit/flagext" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/scrape" "github.com/stretchr/testify/mock" + + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/prom1/storage/metric" + "github.com/cortexproject/cortex/pkg/util/validation" ) type MockDistributor struct { diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index b99c20a463..5896b98161 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -17,11 +17,12 @@ import ( "time" "unsafe" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/mock" + "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/mock" "go.uber.org/atomic" @@ -138,10 +139,10 @@ func testQueryableFunc(querierTestConfig *querier.TestConfig, reg prometheus.Reg return 
func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { return q.Querier(ctx, mint, maxt) } - } else { - return func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return storage.NoopQuerier(), nil - } + } + + return func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return storage.NoopQuerier(), nil } } @@ -1241,7 +1242,7 @@ func TestRecoverAlertsPostOutage(t *testing.T) { } // NEXT, set up ruler config with outage tolerance = 1hr - rulerCfg, cleanup := defaultRulerConfig(newMockRuleStore(mockRules)) + rulerCfg, cleanup := defaultRulerConfig(t, newMockRuleStore(mockRules)) rulerCfg.OutageTolerance, _ = time.ParseDuration("1h") defer cleanup() From 9cdcd16d1f0cd678bce03714e53321653d95635d Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" Date: Sun, 29 Aug 2021 20:40:30 -0700 Subject: [PATCH 09/10] honor mint,maxt if sp is null for other querier implementations Signed-off-by: Abdurrahman J. Allawala --- pkg/querier/chunk_store_queryable.go | 9 +++++++-- pkg/querier/distributor_queryable.go | 13 ++++++++----- pkg/querier/distributor_queryable_test.go | 6 +++--- pkg/querier/querier.go | 17 +++++++++-------- 4 files changed, 27 insertions(+), 18 deletions(-) diff --git a/pkg/querier/chunk_store_queryable.go b/pkg/querier/chunk_store_queryable.go index ba32321587..9e0f79da3d 100644 --- a/pkg/querier/chunk_store_queryable.go +++ b/pkg/querier/chunk_store_queryable.go @@ -44,14 +44,19 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ... return storage.ErrSeriesSet(err) } + minT, maxT := q.mint, q.maxt + if sp != nil { + minT, maxT = sp.Start, sp.End + } + // We will hit this for /series lookup when -querier.query-store-for-labels-enabled is set. // If we don't skip here, it'll make /series lookups extremely slow as all the chunks will be loaded. // That flag is only to be set with blocks storage engine, and this is a protective measure. 
- if sp == nil || sp.Func == "series" { + if sp != nil && sp.Func == "series" { return storage.EmptySeriesSet() } - chunks, err := q.store.Get(q.ctx, userID, model.Time(sp.Start), model.Time(sp.End), matchers...) + chunks, err := q.store.Get(q.ctx, userID, model.Time(minT), model.Time(maxT), matchers...) if err != nil { return storage.ErrSeriesSet(err) } diff --git a/pkg/querier/distributor_queryable.go b/pkg/querier/distributor_queryable.go index e93bf85acd..29dd2792ac 100644 --- a/pkg/querier/distributor_queryable.go +++ b/pkg/querier/distributor_queryable.go @@ -83,14 +83,19 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. log, ctx := spanlogger.New(q.ctx, "distributorQuerier.Select") defer log.Span.Finish() - // Kludge: Prometheus passes nil SelectParams if it is doing a 'series' operation, - // which needs only metadata. For this specific case we shouldn't apply the queryIngestersWithin + minT, maxT := q.mint, q.maxt + if sp != nil { + minT, maxT = sp.Start, sp.End + } + + // If the querier receives a 'series' query, it means only metadata is needed. + // For this specific case we shouldn't apply the queryIngestersWithin // time range manipulation, otherwise we'll end up returning no series at all for // older time ranges (while in Cortex we do ignore the start/end and always return // series in ingesters). // Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series". // See: https://github.com/prometheus/prometheus/pull/8050 - if sp == nil || sp.Func == "series" { + if sp != nil && sp.Func == "series" { ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...) if err != nil { return storage.ErrSeriesSet(err) @@ -98,8 +103,6 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers .. 
return series.MetricsToSeriesSet(ms) } - minT, maxT := sp.Start, sp.End - // If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until // now - queryIngestersWithin, because older time ranges are covered by the storage. This // optimization is particularly important for the blocks storage where the blocks retention in the diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index dbe6fc9ebe..9eba3b8f3a 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -126,10 +126,10 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T) querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT) require.NoError(t, err) - // Select hints are not passed by Prometheus when querying /series. + // Select hints are passed by Prometheus when querying /series. var hints *storage.SelectHints - if !testData.querySeries { - hints = &storage.SelectHints{Start: testData.queryMinT, End: testData.queryMaxT} + if testData.querySeries { + hints = &storage.SelectHints{Func: "series"} } seriesSet := querier.Select(true, hints) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 83bf9c3a21..93481d1358 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -300,17 +300,18 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat level.Debug(log).Log("start", util.TimeFromMillis(sp.Start).UTC().String(), "end", util.TimeFromMillis(sp.End).UTC().String(), "step", sp.Step, "matchers", matchers) } - // If the querier receives a 'series' query, it means only metadata is needed. - // Here we expect that metadataQuerier querier will handle that. - // Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series". 
- // See: https://github.com/prometheus/prometheus/pull/8050 - if sp != nil && sp.Func == "series" && !q.queryStoreForLabels { + if sp == nil { + // if SelectHints is null, rely on minT, maxT of querier to scope in range for Select stmt + sp = &storage.SelectHints{Start: q.mint, End: q.maxt} + } else if sp.Func == "series" && !q.queryStoreForLabels { + // Else if the querier receives a 'series' query, it means only metadata is needed. + // Here we expect that metadataQuerier querier will handle that. + // Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series". + // See: https://github.com/prometheus/prometheus/pull/8050 + // In this case, the query time range has already been validated when the querier has been // created. return q.metadataQuerier.Select(true, sp, matchers...) - } else if sp == nil { - // if SelectHints is null, rely on minT, maxT of querier to scope in range for Select stmt - sp = &storage.SelectHints{Start: q.mint, End: q.maxt} } userID, err := tenant.TenantID(ctx) From 347ba9a1c7036f3cbdfd239eeb5d19d4af13a381 Mon Sep 17 00:00:00 2001 From: "Abdurrahman J. Allawala" Date: Thu, 2 Sep 2021 10:01:31 -0700 Subject: [PATCH 10/10] missed addition in rebase Signed-off-by: Abdurrahman J. Allawala --- pkg/ruler/ruler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 5896b98161..85c4c361e9 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -1276,7 +1276,7 @@ func TestRecoverAlertsPostOutage(t *testing.T) { } // create a ruler but don't start it. instead, we'll evaluate the rule groups manually. 
- r, rcleanup := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d}, nil) + r, rcleanup := buildRuler(t, rulerCfg, &querier.TestConfig{Cfg: querierConfig, Distributor: d, Stores: queryables}, nil) r.syncRules(context.Background(), rulerSyncReasonInitial) defer rcleanup()