From de15f5dbf91164455fe219511ffb3cc99963d834 Mon Sep 17 00:00:00 2001
From: Ryan West
Date: Wed, 31 Jan 2024 18:58:39 -0800
Subject: [PATCH 1/4] Distributor: allow 0 value in shard size config

Signed-off-by: Ryan West
---
 CHANGELOG.md                          |  1 +
 integration/ingester_sharding_test.go | 16 +++++++-
 pkg/distributor/distributor.go        | 57 ++++++++++++++++++---------
 pkg/distributor/distributor_test.go   | 11 +++++-
 4 files changed, 63 insertions(+), 22 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 64fa3fd564..d5a38e90a0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@
 * [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
 * [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
 * [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
+* [BUGFIX] Ingester: When Shuffle-Sharding is enabled with `IngestionTenantShardSize == 0`, the default sharding strategy should be used. #5189
 
 ## 1.16.0 2023-11-20
 
diff --git a/integration/ingester_sharding_test.go b/integration/ingester_sharding_test.go
index ab8e2ca06d..aeffc1f0cc 100644
--- a/integration/ingester_sharding_test.go
+++ b/integration/ingester_sharding_test.go
@@ -27,16 +27,28 @@ func TestIngesterSharding(t *testing.T) {
 		tenantShardSize             int
 		expectedIngestersWithSeries int
 	}{
-		"default sharding strategy should spread series across all ingesters": {
+		// Default sharding strategy.
+		"default sharding strategy should ignore the tenant shard size and spread series across all ingesters": {
 			shardingStrategy:            "default",
 			tenantShardSize:             2, // Ignored by default strategy.
 			expectedIngestersWithSeries: 3,
 		},
+		"default sharding strategy should spread series across all ingesters": {
+			shardingStrategy:            "default",
+			tenantShardSize:             0, // Ignored by default strategy.
+			expectedIngestersWithSeries: 3,
+		},
+		// Shuffle-sharding strategy.
 		"shuffle-sharding strategy should spread series across the configured shard size number of ingesters": {
 			shardingStrategy:            "shuffle-sharding",
 			tenantShardSize:             2,
 			expectedIngestersWithSeries: 2,
 		},
+		"shuffle-sharding strategy with a tenant shard size of 0 should spread series across all ingesters": {
+			shardingStrategy:            "shuffle-sharding",
+			tenantShardSize:             0,
+			expectedIngestersWithSeries: 3,
+		},
 	}
 
 	for testName, testData := range tests {
@@ -125,7 +137,7 @@ func TestIngesterSharding(t *testing.T) {
 	// We expect that only ingesters belonging to tenant's shard have been queried if
 	// shuffle sharding is enabled.
 	expectedIngesters := ingesters.NumInstances()
-	if testData.shardingStrategy == "shuffle-sharding" {
+	if testData.shardingStrategy == "shuffle-sharding" && testData.tenantShardSize > 0 {
 		expectedIngesters = testData.tenantShardSize
 	}
 
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 49d9446d68..e8115e2a36 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -49,7 +49,7 @@ var (
 
 	// Validation errors.
 	errInvalidShardingStrategy = errors.New("invalid sharding strategy")
-	errInvalidTenantShardSize  = errors.New("invalid tenant shard size, the value must be greater than 0")
+	errInvalidTenantShardSize  = errors.New("invalid tenant shard size, the value must be a positive integer")
 
 	// Distributor instance limits errors.
 	errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
@@ -178,10 +178,14 @@ func (cfg *Config) Validate(limits validation.Limits) error {
 		return errInvalidShardingStrategy
 	}
 
-	if cfg.ShardingStrategy == util.ShardingStrategyShuffle && limits.IngestionTenantShardSize <= 0 {
+	if limits.IngestionTenantShardSize < 0 {
 		return errInvalidTenantShardSize
 	}
 
+	if limits.IngestionTenantShardSize == 0 && cfg.ShardingStrategy == util.ShardingStrategyShuffle {
+		level.Warn(util_log.Logger).Log("msg", "shuffle-sharding is enabled with a tenant shard size of 0; falling back to the default sharding strategy")
+	}
+
 	haHATrackerConfig := cfg.HATrackerConfig.ToHATrackerConfig()
 
 	return haHATrackerConfig.Validate()
@@ -583,9 +587,21 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	inflight := d.inflightPushRequests.Inc()
 	defer d.inflightPushRequests.Dec()
 
+	if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
+		return nil, errTooManyInflightPushRequests
+	}
+
+	if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
+		if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
+			return nil, errMaxSamplesPushRateLimitReached
+		}
+	}
+
 	now := time.Now()
 	d.activeUsers.UpdateUserTimestamp(userID, now)
 
+	removeReplica := false
+
 	numSamples := 0
 	numExemplars := 0
 	for _, ts := range req.Timeseries {
@@ -598,17 +614,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	// Count the total number of metadata in.
 	d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
 
-	if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
-		return nil, errTooManyInflightPushRequests
-	}
-
-	if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
-		if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
-			return nil, errMaxSamplesPushRateLimitReached
-		}
-	}
-
-	removeReplica := false
 	// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
 	limits := d.limits.GetOverridesForUser(userID)
 
@@ -673,11 +678,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
 	d.ingestionRate.Add(int64(totalN))
 
-	subRing := d.ingestersRing
-
-	// Obtain a subring if required.
-	if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
-		subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
+	// Obtain the subring for the tenant's sharding strategy.
+	subRing, err := d.GetShuffleShardingRing(ctx)
+	if err != nil {
+		return nil, err
 	}
 
 	keys := append(seriesKeys, metadataKeys...)
@@ -691,6 +695,23 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	return &cortexpb.WriteResponse{}, firstPartialErr
 }
 
+func (d *Distributor) GetShuffleShardingRing(ctx context.Context) (ring.ReadRing, error) {
+	userID, err := tenant.TenantID(ctx)
+	if err != nil {
+		return nil, err
+	}
+	// Get any limit overrides for the current tenant.
+	limits := d.limits.GetOverridesForUser(userID)
+	// Start from the full ring of ingesters.
+	subRing := d.ingestersRing
+	// Use a shuffle shard only when shuffle-sharding is enabled and the tenant's shard size is greater than 0.
+	if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle && limits.IngestionTenantShardSize > 0 {
+		subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
+	}
+
+	return subRing, nil
+}
+
 func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string) error {
 	span, _ := opentracing.StartSpanFromContext(ctx, "doBatch")
 	defer span.Finish()
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index fd7efa062b..1d7a6cac81 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -76,14 +76,14 @@ func TestConfig_Validate(t *testing.T) {
 			initLimits: func(_ *validation.Limits) {},
 			expected:   errInvalidShardingStrategy,
 		},
-		"should fail if the default shard size is 0 on when sharding strategy = shuffle-sharding": {
+		"should pass when sharding strategy = shuffle-sharding and IngestionTenantShardSize = 0": {
 			initConfig: func(cfg *Config) {
 				cfg.ShardingStrategy = "shuffle-sharding"
 			},
 			initLimits: func(limits *validation.Limits) {
 				limits.IngestionTenantShardSize = 0
 			},
-			expected: errInvalidTenantShardSize,
+			expected: nil,
 		},
 		"should pass if the default shard size > 0 on when sharding strategy = shuffle-sharding": {
 			initConfig: func(cfg *Config) {
@@ -94,6 +94,13 @@ func TestConfig_Validate(t *testing.T) {
 			},
 			expected: nil,
 		},
+		"should fail when IngestionTenantShardSize is a negative number": {
+			initConfig: func(_ *Config) {},
+			initLimits: func(limits *validation.Limits) {
+				limits.IngestionTenantShardSize = -1
+			},
+			expected: errInvalidTenantShardSize,
+		},
 	}
 
 	for testName, testData := range tests {

From 2c6c85ed58da1baa94affc913cdc807b926cdeae Mon Sep 17 00:00:00 2001
From: Ryan West
Date: Wed, 31 Jan 2024 19:01:01 -0800
Subject: [PATCH 2/4] Updating CHANGELOG

Signed-off-by: Ryan West
---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5a38e90a0..600a94d0c5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,7 +19,7 @@
 * [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
 * [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
 * [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
-* [BUGFIX] Ingester: When Shuffle-Sharding is enabled with `IngestionTenantShardSize == 0`, the default sharding strategy should be used. #5189
+* [BUGFIX] Distributor: When Shuffle-Sharding is enabled with `IngestionTenantShardSize == 0`, the default sharding strategy should be used. #5189
 
 ## 1.16.0 2023-11-20
 
From cf85d4677cb3fa4e2b9c656f50e4084176d06739 Mon Sep 17 00:00:00 2001
From: Ryan West
Date: Wed, 31 Jan 2024 19:59:45 -0800
Subject: [PATCH 3/4] Removing log line, specifying shuffle sharding on
 validate, and moving code back to L680

Signed-off-by: Ryan West
---
 pkg/distributor/distributor.go      | 57 +++++++++--------------------
 pkg/distributor/distributor_test.go |  4 +-
 2 files changed, 21 insertions(+), 40 deletions(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index e8115e2a36..131b97a7ea 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -49,7 +49,7 @@ var (
 
 	// Validation errors.
 	errInvalidShardingStrategy = errors.New("invalid sharding strategy")
-	errInvalidTenantShardSize  = errors.New("invalid tenant shard size, the value must be a positive integer")
+	errInvalidTenantShardSize  = errors.New("invalid tenant shard size, the value must be greater than 0")
 
 	// Distributor instance limits errors.
 	errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
@@ -178,14 +178,10 @@ func (cfg *Config) Validate(limits validation.Limits) error {
 		return errInvalidShardingStrategy
 	}
 
-	if limits.IngestionTenantShardSize < 0 {
+	if cfg.ShardingStrategy == util.ShardingStrategyShuffle && limits.IngestionTenantShardSize < 0 {
 		return errInvalidTenantShardSize
 	}
 
-	if limits.IngestionTenantShardSize == 0 && cfg.ShardingStrategy == util.ShardingStrategyShuffle {
-		level.Warn(util_log.Logger).Log("msg", "shuffle-sharding is enabled with a tenant shard size of 0; falling back to the default sharding strategy")
-	}
-
 	haHATrackerConfig := cfg.HATrackerConfig.ToHATrackerConfig()
 
 	return haHATrackerConfig.Validate()
@@ -587,21 +583,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	inflight := d.inflightPushRequests.Inc()
 	defer d.inflightPushRequests.Dec()
 
-	if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
-		return nil, errTooManyInflightPushRequests
-	}
-
-	if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
-		if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
-			return nil, errMaxSamplesPushRateLimitReached
-		}
-	}
-
 	now := time.Now()
 	d.activeUsers.UpdateUserTimestamp(userID, now)
 
-	removeReplica := false
-
 	numSamples := 0
 	numExemplars := 0
 	for _, ts := range req.Timeseries {
@@ -614,6 +598,17 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	// Count the total number of metadata in.
 	d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
 
+	if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
+		return nil, errTooManyInflightPushRequests
+	}
+
+	if d.cfg.InstanceLimits.MaxIngestionRate > 0 {
+		if rate := d.ingestionRate.Rate(); rate >= d.cfg.InstanceLimits.MaxIngestionRate {
+			return nil, errMaxSamplesPushRateLimitReached
+		}
+	}
+
+	removeReplica := false
 	// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
 	limits := d.limits.GetOverridesForUser(userID)
 
@@ -678,10 +673,11 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	// totalN included samples and metadata. Ingester follows this pattern when computing its ingestion rate.
 	d.ingestionRate.Add(int64(totalN))
 
-	// Obtain the subring for the tenant's sharding strategy.
-	subRing, err := d.GetShuffleShardingRing(ctx)
-	if err != nil {
-		return nil, err
+	subRing := d.ingestersRing
+
+	// Obtain a subring if required.
+	if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
+		subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
 	}
 
 	keys := append(seriesKeys, metadataKeys...)
@@ -695,23 +691,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
 	return &cortexpb.WriteResponse{}, firstPartialErr
 }
 
-func (d *Distributor) GetShuffleShardingRing(ctx context.Context) (ring.ReadRing, error) {
-	userID, err := tenant.TenantID(ctx)
-	if err != nil {
-		return nil, err
-	}
-	// Get any limit overrides for the current tenant.
-	limits := d.limits.GetOverridesForUser(userID)
-	// Start from the full ring of ingesters.
-	subRing := d.ingestersRing
-	// Use a shuffle shard only when shuffle-sharding is enabled and the tenant's shard size is greater than 0.
-	if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle && limits.IngestionTenantShardSize > 0 {
-		subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
-	}
-
-	return subRing, nil
-}
-
 func (d *Distributor) doBatch(ctx context.Context, req *cortexpb.WriteRequest, subRing ring.ReadRing, keys []uint32, initialMetadataIndex int, validatedMetadata []*cortexpb.MetricMetadata, validatedTimeseries []cortexpb.PreallocTimeseries, userID string) error {
 	span, _ := opentracing.StartSpanFromContext(ctx, "doBatch")
 	defer span.Finish()
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 1d7a6cac81..74e9080da2 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -95,7 +95,9 @@ func TestConfig_Validate(t *testing.T) {
 			expected: nil,
 		},
 		"should fail when IngestionTenantShardSize is a negative number": {
-			initConfig: func(_ *Config) {},
+			initConfig: func(cfg *Config) {
+				cfg.ShardingStrategy = "shuffle-sharding"
+			},
 			initLimits: func(limits *validation.Limits) {
 				limits.IngestionTenantShardSize = -1
 			},
 			expected: errInvalidTenantShardSize,

From a2e3cabeadb89445518bc44909e0a933261b1296 Mon Sep 17 00:00:00 2001
From: Ryan West
Date: Fri, 2 Feb 2024 09:51:49 -0800
Subject: [PATCH 4/4] Improving the error message readability

Signed-off-by: Ryan West
---
 pkg/distributor/distributor.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 131b97a7ea..4b4eea0bf9 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -49,7 +49,7 @@ var (
 
 	// Validation errors.
 	errInvalidShardingStrategy = errors.New("invalid sharding strategy")
-	errInvalidTenantShardSize  = errors.New("invalid tenant shard size, the value must be greater than 0")
+	errInvalidTenantShardSize  = errors.New("invalid tenant shard size. The value must be greater than or equal to 0")
 
 	// Distributor instance limits errors.
 	errTooManyInflightPushRequests = errors.New("too many inflight push requests in distributor")
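
For reference, a minimal standalone sketch of the end-state behavior of this series. The helper names below are simplified stand-ins rather than Cortex's actual types, and the assumption that ShuffleShard falls back to the full ring for a shard size of 0 matches the integration test expectations above:

package main

import (
	"errors"
	"fmt"
)

const shardingStrategyShuffle = "shuffle-sharding"

var errInvalidTenantShardSize = errors.New("invalid tenant shard size. The value must be greater than or equal to 0")

// validate mirrors Config.Validate after PATCH 3/4 and 4/4: only a negative
// shard size is rejected, and only when shuffle-sharding is configured;
// a shard size of 0 is now allowed.
func validate(strategy string, shardSize int) error {
	if strategy == shardingStrategyShuffle && shardSize < 0 {
		return errInvalidTenantShardSize
	}
	return nil
}

// effectiveIngesters models the subring selection in Push: with a shard size
// of 0 the full ring is used, so shuffle-sharding behaves like the default
// sharding strategy.
func effectiveIngesters(strategy string, shardSize, totalIngesters int) int {
	if strategy == shardingStrategyShuffle && shardSize > 0 && shardSize < totalIngesters {
		return shardSize
	}
	return totalIngesters
}

func main() {
	fmt.Println(validate(shardingStrategyShuffle, -1))             // invalid tenant shard size...
	fmt.Println(validate(shardingStrategyShuffle, 0))              // <nil>: 0 is now valid
	fmt.Println(effectiveIngesters(shardingStrategyShuffle, 2, 3)) // 2: shuffle shard of 2 out of 3 ingesters
	fmt.Println(effectiveIngesters(shardingStrategyShuffle, 0, 3)) // 3: falls back to all ingesters
}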