
Commit 0ecd515

Add ingesters shuffle sharding support on the read path (#3252)
* Added ingesters shuffle sharding support on the read path
* Committed broken test to signal there's an issue
* Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added
* Added ShuffleShardingIngestersLookbackPeriod validation
* Tiny getIngestersForQuery() optimisation
* Added unit tests on distributor changes
* Added integration test
* Fixed race in unit test
* Added CHANGELOG entry
* Removed ring.ByZoneAndAddr because unused
* Addressed review comments
* Fixed TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy after wrong rebase
* Fixed tests after rebase
* Added unit test on ReplicationSet.GetAddresses()
* Improved TestRing_ShuffleShardWithLookback
* Fixed TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy
* Increased unit tests timeout

All commits signed-off-by: Marco Pracucci <[email protected]>
1 parent b135907 commit 0ecd515

17 files changed: +984 -178 lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion

@@ -48,7 +48,8 @@
 * [CHANGE] Renamed `-<prefix>.redis.enable-tls` CLI flag to `-<prefix>.redis.tls-enabled`, and its respective YAML config option from `enable_tls` to `tls_enabled`. #3298
 * [CHANGE] Increased default `-<prefix>.redis.timeout` from `100ms` to `500ms`. #3301
 * [CHANGE] `cortex_alertmanager_config_invalid` has been removed in favor of `cortex_alertmanager_config_last_reload_successful`. #3289
-* [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenant's requests will be handled by a different set of queriers. #3113 #3257
+* [FEATURE] Shuffle sharding: added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-tenant` globally, or using per-tenant limit `max_queriers_per_tenant`), each tenant's requests will be handled by a different set of queriers. #3113 #3257
+* [FEATURE] Shuffle sharding: added support for shuffle-sharding ingesters on the read path. When ingesters shuffle-sharding is enabled and `-querier.shuffle-sharding-ingesters-lookback-period` is set, queriers will fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. #3252
 * [FEATURE] Query-frontend: added `compression` config to support results cache with compression. #3217
 * [ENHANCEMENT] Allow to specify multiple comma-separated Cortex services to `-target` CLI option (or its respective YAML config option). For example, `-target=all,compactor` can be used to start Cortex single-binary with compactor as well. #3275
 * [ENHANCEMENT] Expose additional HTTP configs for the S3 backend client. New flags are listed below: #3244
@@ -92,6 +93,7 @@
 * [BUGFIX] Use a valid grpc header when logging IP addresses. #3307
 * [BUGFIX] Fixed the metric `cortex_prometheus_rule_group_duration_seconds` in the Ruler, it wouldn't report any values. #3310
 * [BUGFIX] Fixed gRPC connections leaking in rulers when rulers sharding is enabled and APIs called. #3314
+* [BUGFIX] Fixed shuffle sharding consistency when zone-awareness is enabled and the shard size is increased or instances in a new zone are added. #3299
 
 ## 1.4.0 / 2020-10-02
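To make the new read-path behaviour concrete: with the lookback enabled, a querier does not fan out to every ingester, but only to those that may hold the tenant's in-memory series, i.e. the tenant's current shuffle shard plus any ingester that joined the ring within the lookback window, since the shard may have been composed differently before that ingester registered. The snippet below is a deliberately simplified, self-contained sketch of that selection rule; the types and the `selectForQuery` function are illustrative and not the actual Cortex ring API, which extends the shard more precisely on a per-tenant basis.

```go
package main

import (
	"fmt"
	"time"
)

// ingester is a simplified stand-in for a ring instance descriptor.
type ingester struct {
	addr         string
	registeredAt time.Time
	inShard      bool // whether the instance belongs to the tenant's current shuffle shard
}

// selectForQuery returns the ingesters a querier fetches in-memory series from:
// the tenant's current shard, plus any instance registered within the lookback
// window, because it may still hold series received before the shard changed.
func selectForQuery(all []ingester, lookback time.Duration, now time.Time) []ingester {
	if lookback <= 0 {
		// Lookback disabled: query all ingesters (previous behaviour).
		return all
	}

	threshold := now.Add(-lookback)
	var selected []ingester
	for _, ing := range all {
		if ing.inShard || ing.registeredAt.After(threshold) {
			selected = append(selected, ing)
		}
	}
	return selected
}

func main() {
	now := time.Now()
	ingesters := []ingester{
		{addr: "ingester-1", registeredAt: now.Add(-24 * time.Hour), inShard: true},
		{addr: "ingester-2", registeredAt: now.Add(-24 * time.Hour), inShard: false},
		{addr: "ingester-3", registeredAt: now.Add(-5 * time.Minute), inShard: false}, // recently scaled up
	}

	for _, ing := range selectForQuery(ingesters, time.Hour, now) {
		fmt.Println("query", ing.addr) // ingester-1 (in shard) and ingester-3 (within lookback)
	}
}
```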

docs/blocks-storage/querier.md

Lines changed: 9 additions & 0 deletions

@@ -173,6 +173,15 @@ querier:
   # Default value 0 means secondary store is always queried.
   # CLI flag: -querier.use-second-store-before-time
   [use_second_store_before_time: <time> | default = 0]
+
+  # When distributor's sharding strategy is shuffle-sharding and this setting is
+  # > 0, queriers fetch in-memory series from the minimum set of required
+  # ingesters, selecting only ingesters which may have received series since
+  # 'now - lookback period'. The lookback period should be greater than or equal
+  # to the configured 'query store after'. If this setting is 0, queriers always
+  # query all ingesters (ingesters shuffle sharding on read path is disabled).
+  # CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
+  [shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
 ```
 
 ### `blocks_storage_config`
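The constraint that the lookback period be at least the configured 'query store after' exists because ingesters are the only source for samples newer than `-querier.query-store-after`; a shorter lookback could skip an ingester still holding samples that are not yet queryable from long-term storage. The commit message also mentions a validation for this setting; the snippet below is a plausible sketch of such a check, not necessarily the actual Cortex code (the struct, field and error names are illustrative).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// querierConfig holds only the two settings relevant to the constraint.
type querierConfig struct {
	QueryStoreAfter                        time.Duration
	ShuffleShardingIngestersLookbackPeriod time.Duration
}

var errInvalidLookback = errors.New(
	"the shuffle-sharding lookback period should be greater than or equal to the configured 'query store after'")

// validate enforces: if the lookback is enabled (> 0), it must cover at least
// the time window still served exclusively by ingesters.
func (c querierConfig) validate() error {
	if c.ShuffleShardingIngestersLookbackPeriod > 0 &&
		c.ShuffleShardingIngestersLookbackPeriod < c.QueryStoreAfter {
		return errInvalidLookback
	}
	return nil
}

func main() {
	cfg := querierConfig{
		QueryStoreAfter:                        12 * time.Hour,
		ShuffleShardingIngestersLookbackPeriod: 1 * time.Hour, // too short: violates the constraint
	}
	fmt.Println(cfg.validate())
}
```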

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 0 deletions

@@ -740,6 +740,15 @@ store_gateway_client:
 # Default value 0 means secondary store is always queried.
 # CLI flag: -querier.use-second-store-before-time
 [use_second_store_before_time: <time> | default = 0]
+
+# When distributor's sharding strategy is shuffle-sharding and this setting is >
+# 0, queriers fetch in-memory series from the minimum set of required ingesters,
+# selecting only ingesters which may have received series since 'now - lookback
+# period'. The lookback period should be greater than or equal to the configured
+# 'query store after'. If this setting is 0, queriers always query all ingesters
+# (ingesters shuffle sharding on read path is disabled).
+# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
+[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
 ```
 
 ### `query_frontend_config`

integration/e2e/metrics_options.go

Lines changed: 6 additions & 0 deletions

@@ -23,6 +23,7 @@ type MetricsOptions struct {
 	GetValue           GetMetricValueFunc
 	LabelMatchers      []*labels.Matcher
 	WaitMissingMetrics bool
+	SkipMissingMetrics bool
 }
 
 // WithMetricCount is an option to get the histogram/summary count as metric value.
@@ -43,6 +44,11 @@ func WaitMissingMetrics(opts *MetricsOptions) {
 	opts.WaitMissingMetrics = true
 }
 
+// SkipMissingMetrics is an option to skip/ignore whenever an expected metric is missing.
+func SkipMissingMetrics(opts *MetricsOptions) {
+	opts.SkipMissingMetrics = true
+}
+
 func buildMetricsOptions(opts []MetricsOption) MetricsOptions {
 	result := DefaultMetricsOptions
 	for _, opt := range opts {
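The new field follows the package's existing functional-options pattern: callers pass option functions (like `WaitMissingMetrics` above) that mutate a `MetricsOptions` value before the metrics are collected. Below is a minimal, self-contained sketch of that pattern, with names mirroring the diff but standing in for the real `e2e` package rather than reproducing it.

```go
package main

import "fmt"

// MetricsOptions mirrors the subset of fields relevant to this change.
type MetricsOptions struct {
	WaitMissingMetrics bool
	SkipMissingMetrics bool
}

// MetricsOption mutates a MetricsOptions value.
type MetricsOption func(*MetricsOptions)

// SkipMissingMetrics makes missing expected metrics non-fatal.
func SkipMissingMetrics(opts *MetricsOptions) {
	opts.SkipMissingMetrics = true
}

// buildMetricsOptions applies each option on top of the defaults.
func buildMetricsOptions(opts []MetricsOption) MetricsOptions {
	result := MetricsOptions{} // stand-in for DefaultMetricsOptions
	for _, opt := range opts {
		opt(&result)
	}
	return result
}

func main() {
	options := buildMetricsOptions([]MetricsOption{SkipMissingMetrics})
	fmt.Printf("skip missing metrics: %v\n", options.SkipMissingMetrics)
}
```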

integration/e2e/service.go

Lines changed: 8 additions & 0 deletions

@@ -577,12 +577,20 @@ func (s *HTTPService) SumMetrics(metricNames []string, opts ...MetricsOption) ([
 		// Get the metric family.
 		mf, ok := families[m]
 		if !ok {
+			if options.SkipMissingMetrics {
+				continue
+			}
+
 			return nil, errors.Wrapf(errMissingMetric, "metric=%s service=%s", m, s.name)
 		}
 
 		// Filter metrics.
 		metrics := filterMetrics(mf.GetMetric(), options)
 		if len(metrics) == 0 {
+			if options.SkipMissingMetrics {
+				continue
+			}
+
 			return nil, errors.Wrapf(errMissingMetric, "metric=%s service=%s", m, s.name)
 		}

integration/ingester_sharding_test.go

Lines changed: 39 additions & 2 deletions

@@ -8,7 +8,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"github.com/cortexproject/cortex/integration/e2e"
@@ -43,9 +45,16 @@ func TestIngesterSharding(t *testing.T) {
 			defer s.Close()
 
 			flags := BlocksStorageFlags
+			flags["-distributor.shard-by-all-labels"] = "true"
 			flags["-distributor.sharding-strategy"] = testData.shardingStrategy
 			flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)
 
+			if testData.shardingStrategy == "shuffle-sharding" {
+				// Enable shuffle sharding on the read path with a negligible lookback period,
+				// otherwise all ingesters would be queried because they have just registered.
+				flags["-querier.shuffle-sharding-ingesters-lookback-period"] = "1ns"
+			}
+
 			// Start dependencies.
 			consul := e2edb.NewConsul()
 			minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
@@ -56,6 +65,7 @@ func TestIngesterSharding(t *testing.T) {
 			ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags, "")
 			ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags, "")
 			ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags, "")
+			ingesters := e2ecortex.NewCompositeCortexService(ingester1, ingester2, ingester3)
 			querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
 			require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier))
 
@@ -70,15 +80,19 @@ func TestIngesterSharding(t *testing.T) {
 
 			// Push series.
 			now := time.Now()
+			expectedVectors := map[string]model.Vector{}
 
-			client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
+			client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
 			require.NoError(t, err)
 
 			for i := 1; i <= numSeriesToPush; i++ {
-				series, _ := generateSeries(fmt.Sprintf("series_%d", i), now)
+				metricName := fmt.Sprintf("series_%d", i)
+				series, expectedVector := generateSeries(metricName, now)
 				res, err := client.Push(series)
 				require.NoError(t, err)
 				require.Equal(t, 200, res.StatusCode)
+
+				expectedVectors[metricName] = expectedVector
 			}
 
 			// Extract metrics from ingesters.
@@ -99,6 +113,29 @@ func TestIngesterSharding(t *testing.T) {
 			require.Equal(t, testData.expectedIngestersWithSeries, numIngestersWithSeries)
 			require.Equal(t, numSeriesToPush, totalIngestedSeries)
 
+			// Query back series.
+			for metricName, expectedVector := range expectedVectors {
+				result, err := client.Query(metricName, now)
+				require.NoError(t, err)
+				require.Equal(t, model.ValVector, result.Type())
+				assert.Equal(t, expectedVector, result.(model.Vector))
+			}
+
+			// We expect that only ingesters belonging to the tenant's shard have been queried
+			// if shuffle sharding is enabled.
+			expectedIngesters := ingesters.NumInstances()
+			if testData.shardingStrategy == "shuffle-sharding" {
+				expectedIngesters = testData.tenantShardSize
+			}
+
+			expectedCalls := expectedIngesters * len(expectedVectors)
+			require.NoError(t, ingesters.WaitSumMetricsWithOptions(
+				e2e.Equals(float64(expectedCalls)),
+				[]string{"cortex_request_duration_seconds"},
+				e2e.WithMetricCount,
+				e2e.SkipMissingMetrics, // Some ingesters may have received no request at all.
+				e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/QueryStream"))))
+
 			// Ensure no service-specific metrics prefix is used by the wrong service.
 			assertServiceMetricsPrefixes(t, Distributor, distributor)
 			assertServiceMetricsPrefixes(t, Ingester, ingester1)
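To make the expected-calls arithmetic in the new assertion concrete: the test multiplies the number of ingesters expected to serve the tenant by the number of series queried back. For example, if shuffle sharding is enabled with a tenant shard size of 2 and 10 series were pushed, the check expects 2 × 10 = 20 `/cortex.Ingester/QueryStream` requests summed across all ingesters, whereas with the default sharding strategy every one of the 3 ingesters is hit per query (3 × 10 = 30). The actual `tenantShardSize` and `numSeriesToPush` values are defined outside this hunk, so these numbers are only illustrative.
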
pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions

@@ -169,6 +169,7 @@ func (t *Cortex) initOverrides() (serv services.Service, err error) {
 
 func (t *Cortex) initDistributorService() (serv services.Service, err error) {
 	t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort
+	t.Cfg.Distributor.ShuffleShardingLookbackPeriod = t.Cfg.Querier.ShuffleShardingIngestersLookbackPeriod
 
 	// Check whether the distributor can join the distributors ring, which is
 	// whenever it's not running as an internal dependency (ie. querier or

pkg/distributor/distributor.go

Lines changed: 39 additions & 16 deletions

@@ -170,6 +170,9 @@ type Config struct {
 	// when true the distributor does not validate the label name, Cortex doesn't directly use
 	// this (and should never use it) but this feature is used by other projects built on top of it
 	SkipLabelNameValidation bool `yaml:"-"`
+
+	// This config is dynamically injected because it is defined in the querier config.
+	ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -622,16 +625,8 @@ func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, time
 	return err
 }
 
-// ForAllIngesters runs f, in parallel, for all ingesters
-func (d *Distributor) ForAllIngesters(ctx context.Context, reallyAll bool, f func(context.Context, client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
-	replicationSet, err := d.ingestersRing.GetAll(ring.Read)
-	if err != nil {
-		return nil, err
-	}
-	if reallyAll {
-		replicationSet.MaxErrors = 0
-	}
-
+// ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
+func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
 	return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.IngesterDesc) (interface{}, error) {
 		client, err := d.ingesterPool.GetClientFor(ing.Addr)
 		if err != nil {
@@ -644,10 +639,15 @@
 
 // LabelValuesForLabelName returns all of the label values that are associated with a given label name.
 func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) ([]string, error) {
+	replicationSet, err := d.getIngestersForMetadata(ctx)
+	if err != nil {
+		return nil, err
+	}
+
 	req := &client.LabelValuesRequest{
 		LabelName: string(labelName),
 	}
-	resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
+	resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
 		return client.LabelValues(ctx, req)
 	})
 	if err != nil {
@@ -670,8 +670,13 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod
 
 // LabelNames returns all of the label names.
 func (d *Distributor) LabelNames(ctx context.Context) ([]string, error) {
+	replicationSet, err := d.getIngestersForMetadata(ctx)
+	if err != nil {
+		return nil, err
+	}
+
 	req := &client.LabelNamesRequest{}
-	resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
+	resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
 		return client.LabelNames(ctx, req)
 	})
 	if err != nil {
@@ -698,12 +703,17 @@ func (d *Distributor) LabelNames(ctx context.Context) ([]string, error) {
 
 // MetricsForLabelMatchers gets the metrics that match said matchers
 func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
+	replicationSet, err := d.getIngestersForMetadata(ctx)
+	if err != nil {
+		return nil, err
+	}
+
 	req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers)
 	if err != nil {
 		return nil, err
 	}
 
-	resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
+	resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
 		return client.MetricsForLabelMatchers(ctx, req)
 	})
 	if err != nil {
@@ -729,9 +739,14 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
 
 // MetricsMetadata returns all metric metadata of a user.
 func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
+	replicationSet, err := d.getIngestersForMetadata(ctx)
+	if err != nil {
+		return nil, err
+	}
+
 	req := &ingester_client.MetricsMetadataRequest{}
 	// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
-	resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
+	resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
 		return client.MetricsMetadata(ctx, req)
 	})
 	if err != nil {
@@ -764,8 +779,16 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad
 
 // UserStats returns statistics about the current user.
 func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
+	replicationSet, err := d.getIngestersForMetadata(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// Make sure we get a successful response from all of them.
+	replicationSet.MaxErrors = 0
+
 	req := &client.UserStatsRequest{}
-	resps, err := d.ForAllIngesters(ctx, true, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
+	resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
 		return client.UserStats(ctx, req)
 	})
 	if err != nil {
@@ -801,7 +824,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
 
 	req := &client.UserStatsRequest{}
 	ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
-	// Not using d.ForAllIngesters(), so we can fail after first error.
+	// Not using d.ForReplicationSet(), so we can fail after first error.
 	replicationSet, err := d.ingestersRing.GetAll(ring.Read)
 	if err != nil {
 		return nil, err

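The new call sites above depend on `getIngestersForMetadata()` (and, per the commit message, a `getIngestersForQuery()` counterpart), whose bodies sit outside this diff. Conceptually they replace the old unconditional `GetAll()` with a shard-aware lookup. The snippet below is a rough, self-contained sketch of that idea with simplified stand-in types; it is illustrative and not the actual Cortex implementation.

```go
package main

import "fmt"

// replicationSet is a simplified stand-in for ring.ReplicationSet.
type replicationSet struct {
	addrs     []string
	maxErrors int
}

// distributor is a simplified stand-in holding only what the sketch needs.
type distributor struct {
	shardingStrategy string
	allIngesters     []string
	tenantShards     map[string][]string // tenant -> ingesters in its shuffle shard
}

// getIngestersForMetadata sketches the shard-aware lookup the new call sites rely on:
// with shuffle sharding the replication set is limited to the tenant's shard,
// otherwise it spans the whole ring, preserving the previous behaviour.
func (d *distributor) getIngestersForMetadata(tenant string) replicationSet {
	if d.shardingStrategy == "shuffle-sharding" {
		if shard, ok := d.tenantShards[tenant]; ok {
			return replicationSet{addrs: shard, maxErrors: 1}
		}
	}
	return replicationSet{addrs: d.allIngesters, maxErrors: 1}
}

func main() {
	d := &distributor{
		shardingStrategy: "shuffle-sharding",
		allIngesters:     []string{"ingester-1", "ingester-2", "ingester-3"},
		tenantShards:     map[string][]string{"team-a": {"ingester-1", "ingester-3"}},
	}
	fmt.Println(d.getIngestersForMetadata("team-a").addrs) // [ingester-1 ingester-3]
}
```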