diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2e25c9b55e..393747661d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 # Changelog
 
 ## master / unreleased
+* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable the store-gateway for specific tenants. #5638
 
 ## 1.16.0 in progress
 
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index 7162d34b93..a0ad2b1d3c 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -331,6 +331,19 @@ store_gateway:
   # shuffle-sharding.
   # CLI flag: -store-gateway.sharding-strategy
   [sharding_strategy: <string> | default = "default"]
+
+  # Comma-separated list of tenants whose blocks this store-gateway can
+  # process. If specified, only these tenants will be handled by this
+  # store-gateway, otherwise it will be enabled for all tenants in the
+  # store-gateway cluster.
+  # CLI flag: -store-gateway.enabled-tenants
+  [enabled_tenants: <string> | default = ""]
+
+  # Comma-separated list of tenants whose blocks this store-gateway cannot
+  # process. If specified, a store-gateway that would normally pick the
+  # specified tenant(s) for processing will ignore them instead.
+  # CLI flag: -store-gateway.disabled-tenants
+  [disabled_tenants: <string> | default = ""]
 ```
 
 ### `blocks_storage_config`
 
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 5103df9a38..fbcc159477 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -4912,6 +4912,19 @@ sharding_ring:
 # The sharding strategy to use. Supported values are: default, shuffle-sharding.
 # CLI flag: -store-gateway.sharding-strategy
 [sharding_strategy: <string> | default = "default"]
+
+# Comma-separated list of tenants whose blocks this store-gateway can
+# process. If specified, only these tenants will be handled by this
+# store-gateway, otherwise it will be enabled for all tenants in the
+# store-gateway cluster.
+# CLI flag: -store-gateway.enabled-tenants
+[enabled_tenants: <string> | default = ""]
+
+# Comma-separated list of tenants whose blocks this store-gateway cannot
+# process. If specified, a store-gateway that would normally pick the
+# specified tenant(s) for processing will ignore them instead.
+# CLI flag: -store-gateway.disabled-tenants
+[disabled_tenants: <string> | default = ""]
 ```
 
 ### `tracing_config`
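Reviewer note (not part of the diff): both new options are parsed by `flagext.StringSliceCSV`, so each value is a single comma-separated string. A minimal, self-contained sketch of how the flags would be parsed, assuming `StringSliceCSV` splits on commas; the tenant names are made up:

```go
package main

import (
	"flag"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/flagext"
)

func main() {
	fs := flag.NewFlagSet("store-gateway", flag.ExitOnError)

	var enabled, disabled flagext.StringSliceCSV
	fs.Var(&enabled, "store-gateway.enabled-tenants", "tenants this store-gateway can process")
	fs.Var(&disabled, "store-gateway.disabled-tenants", "tenants this store-gateway must ignore")

	// Each value is split on commas into one tenant ID per element.
	_ = fs.Parse([]string{
		"-store-gateway.enabled-tenants=user-1,user-2,user-3",
		"-store-gateway.disabled-tenants=user-3",
	})

	fmt.Println(enabled)  // [user-1 user-2 user-3]
	fmt.Println(disabled) // [user-3]
}
```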
diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go
index 129bd0c39f..534e212dcd 100644
--- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go
+++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go
@@ -55,7 +55,7 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
 		NewIgnoreDeletionMarkFilter(logger, bucket.NewUserBucketClient(userID, bkt, nil), 2*time.Hour, 1),
 	}
 
-	fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, filters)
+	fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(logger, nil), nil, logger, reg, filters)
 	metas, partials, err := fetcher.Fetch(ctx)
 	require.NoError(t, err)
 	assert.Equal(t, map[ulid.ULID]*metadata.Meta{
@@ -109,7 +109,7 @@ func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) {
 	bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucket.ErrCustomerManagedKeyAccessDenied)
 
-	fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil)
+	fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(log.NewNopLogger(), nil), nil, log.NewNopLogger(), reg, nil)
 	metas, _, err := fetcher.Fetch(ctx)
 	require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied))
 	assert.Empty(t, metas)
@@ -157,7 +157,7 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
 	logs := &concurrency.SyncBuffer{}
 	logger := log.NewLogfmtLogger(logs)
 
-	fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, nil)
+	fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(logger, nil), nil, logger, reg, nil)
 	metas, partials, err := fetcher.Fetch(ctx)
 	require.NoError(t, err)
 	assert.Empty(t, metas)
@@ -212,7 +212,7 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
 	// Upload a corrupted bucket index.
 	require.NoError(t, bkt.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid}!")))
 
-	fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, nil)
+	fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(logger, nil), nil, logger, reg, nil)
 	metas, partials, err := fetcher.Fetch(ctx)
 	require.NoError(t, err)
 	assert.Empty(t, metas)
diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go
index 9d57d52d42..da38ce2eaa 100644
--- a/pkg/storegateway/bucket_stores_test.go
+++ b/pkg/storegateway/bucket_stores_test.go
@@ -120,7 +120,7 @@ func TestBucketStores_CustomerKeyError(t *testing.T) {
 			require.NoError(t, err)
 
 			reg := prometheus.NewPedanticRegistry()
-			stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
+			stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
 			require.NoError(t, err)
 
 			if tc.mockInitialSync {
@@ -200,7 +200,7 @@ func TestBucketStores_InitialSync(t *testing.T) {
 	require.NoError(t, err)
 
 	reg := prometheus.NewPedanticRegistry()
-	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
+	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
 	require.NoError(t, err)
 
 	// Query series before the initial sync.
@@ -276,7 +276,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
 	bucket = &failFirstGetBucket{Bucket: bucket}
 
 	reg := prometheus.NewPedanticRegistry()
-	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
+	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
 	require.NoError(t, err)
 
 	// Initial sync should succeed even if a transient error occurs.
@@ -336,7 +336,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
 	require.NoError(t, err)
 
 	reg := prometheus.NewPedanticRegistry()
-	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
+	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
 	require.NoError(t, err)
 
 	// Run an initial sync to discover 1 block.
@@ -397,11 +397,16 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {
 	tests := map[string]struct {
 		shardingStrategy ShardingStrategy
 		expectedStores   int32
+		allowedTenants   *util.AllowedTenants
 	}{
 		"when sharding is disabled all users should be synced": {
-			shardingStrategy: NewNoShardingStrategy(),
+			shardingStrategy: NewNoShardingStrategy(log.NewNopLogger(), nil),
 			expectedStores:   3,
 		},
+		"sharding disabled, user-1 disabled": {
+			shardingStrategy: NewNoShardingStrategy(log.NewNopLogger(), util.NewAllowedTenants(nil, []string{"user-1"})),
+			expectedStores:   2,
+		},
 		"when sharding is enabled only stores for filtered users should be created": {
 			shardingStrategy: func() ShardingStrategy {
 				s := &mockShardingStrategy{}
@@ -465,7 +470,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
 	require.NoError(t, err)
 
 	reg := prometheus.NewPedanticRegistry()
-	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
+	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
 	require.NoError(t, err)
 	require.NoError(t, stores.InitialSync(ctx))
 
@@ -521,7 +526,7 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t
 	bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
 	require.NoError(t, err)
 
-	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
+	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
 	require.NoError(t, err)
 	require.NoError(t, stores.InitialSync(context.Background()))
 
@@ -542,7 +547,7 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl
 	bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
 	require.NoError(t, err)
 
-	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
+	stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
 	require.NoError(t, err)
 	require.NoError(t, stores.InitialSync(context.Background()))
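Reviewer note: the new `syncUsersBlocks` case depends on the deny list dropping exactly one of the three synthetic users. A sketch of the filtering it exercises, assuming the test's users are named user-1 through user-3 (illustrative only, runs inside the `storegateway` package):

```go
// Deny list only: a nil enabled list means "no allow list".
allowed := util.NewAllowedTenants(nil, []string{"user-1"})
strategy := NewNoShardingStrategy(log.NewNopLogger(), allowed)

// user-1 is filtered out before any store is created, leaving 2 stores.
users := strategy.FilterUsers(context.Background(), []string{"user-1", "user-2", "user-3"})
fmt.Println(users) // [user-2 user-3]
```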
diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go
index 536a7f2556..5633974913 100644
--- a/pkg/storegateway/gateway.go
+++ b/pkg/storegateway/gateway.go
@@ -23,6 +23,7 @@ import (
 	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
 	"github.com/cortexproject/cortex/pkg/util"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
 	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/cortexproject/cortex/pkg/util/validation"
 )
@@ -58,6 +59,9 @@ type Config struct {
 	ShardingEnabled  bool       `yaml:"sharding_enabled"`
 	ShardingRing     RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration. This option is required only if blocks sharding is enabled."`
 	ShardingStrategy string     `yaml:"sharding_strategy"`
+
+	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
+	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
 }
 
 // RegisterFlags registers the Config flags.
@@ -66,6 +70,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.ShardingEnabled, "store-gateway.sharding-enabled", false, "Shard blocks across multiple store gateway instances."+sharedOptionWithQuerier)
 	f.StringVar(&cfg.ShardingStrategy, "store-gateway.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
+	f.Var(&cfg.EnabledTenants, "store-gateway.enabled-tenants", "Comma-separated list of tenants whose blocks this store-gateway can process. If specified, only these tenants will be handled by this store-gateway, otherwise it will be enabled for all tenants in the store-gateway cluster.")
+	f.Var(&cfg.DisabledTenants, "store-gateway.disabled-tenants", "Comma-separated list of tenants whose blocks this store-gateway cannot process. If specified, a store-gateway that would normally pick the specified tenant(s) for processing will ignore them instead.")
 }
 
 // Validate the Config.
@@ -140,6 +146,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
 			Help: "Total number of times the bucket sync operation triggered.",
 		}, []string{"reason"}),
 	}
+	allowedTenants := util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants)
 
 	// Init metrics.
 	g.bucketSync.WithLabelValues(syncReasonInitial)
@@ -161,6 +168,12 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
 		Help:        instanceLimitsMetricHelp,
 		ConstLabels: map[string]string{limitLabel: "max_chunk_pool_bytes"},
 	}).Set(float64(storageCfg.BucketStore.MaxChunkPoolBytes))
+	if len(gatewayCfg.EnabledTenants) > 0 {
+		level.Info(g.logger).Log("msg", "store-gateway using enabled users", "enabled", strings.Join(gatewayCfg.EnabledTenants, ", "))
+	}
+	if len(gatewayCfg.DisabledTenants) > 0 {
+		level.Info(g.logger).Log("msg", "store-gateway using disabled users", "disabled", strings.Join(gatewayCfg.DisabledTenants, ", "))
+	}
 
 	// Init sharding strategy.
 	var shardingStrategy ShardingStrategy
@@ -192,14 +205,14 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
 		// Instance the right strategy.
 		switch gatewayCfg.ShardingStrategy {
 		case util.ShardingStrategyDefault:
-			shardingStrategy = NewDefaultShardingStrategy(g.ring, lifecyclerCfg.Addr, logger)
+			shardingStrategy = NewDefaultShardingStrategy(g.ring, lifecyclerCfg.Addr, logger, allowedTenants)
 		case util.ShardingStrategyShuffle:
-			shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, g.gatewayCfg.ShardingRing.ZoneStableShuffleSharding)
+			shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, allowedTenants, g.gatewayCfg.ShardingRing.ZoneStableShuffleSharding)
 		default:
 			return nil, errInvalidShardingStrategy
 		}
 	} else {
-		shardingStrategy = NewNoShardingStrategy()
+		shardingStrategy = NewNoShardingStrategy(logger, allowedTenants)
 	}
 
 	g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg))
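Reviewer note: with the logging added in `newStoreGateway`, an instance started with, for example, `-store-gateway.disabled-tenants=user-3,user-4` (hypothetical tenants) should emit a logfmt line roughly like the following; the exact formatting depends on the configured go-kit logger:

```
level=info msg="store-gateway using disabled users" disabled="user-3, user-4"
```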
diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go
index 291004314a..387f991cb6 100644
--- a/pkg/storegateway/gateway_test.go
+++ b/pkg/storegateway/gateway_test.go
@@ -133,6 +133,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
 			ctx := context.Background()
 			gatewayCfg := mockGatewayConfig()
 			gatewayCfg.ShardingEnabled = true
+			gatewayCfg.DisabledTenants = []string{"user-disabled"}
 			storageCfg := mockStorageConfig(t)
 
 			ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
 			t.Cleanup(func() { assert.NoError(t, closer.Close()) })
@@ -153,7 +154,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
 			defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck
 			assert.False(t, g.ringLifecycler.IsRegistered())
 
-			bucketClient.MockIterWithCallback("", []string{"user-1", "user-2"}, nil, func() {
+			bucketClient.MockIterWithCallback("", []string{"user-1", "user-2", "user-disabled"}, nil, func() {
 				// During the initial sync, we expect the instance to always be in the JOINING
 				// state within the ring.
 				assert.True(t, g.ringLifecycler.IsRegistered())
@@ -163,6 +164,7 @@
 			})
 			bucketClient.MockIter("user-1/", []string{}, nil)
 			bucketClient.MockIter("user-2/", []string{}, nil)
+			bucketClient.MockIter("user-disabled/", []string{}, nil)
 
 			// Once successfully started, the instance should be ACTIVE in the ring.
 			require.NoError(t, services.StartAndAwaitRunning(ctx, g))
@@ -174,6 +176,7 @@
 			assert.NotNil(t, g.stores.getStore("user-1"))
 			assert.NotNil(t, g.stores.getStore("user-2"))
+			assert.Nil(t, g.stores.getStore("user-disabled"))
 			assert.Nil(t, g.stores.getStore("user-unknown"))
 		})
 	}
@@ -184,6 +187,7 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) {
 	ctx := context.Background()
 	gatewayCfg := mockGatewayConfig()
 	gatewayCfg.ShardingEnabled = false
+	gatewayCfg.DisabledTenants = []string{"user-disabled"}
 	storageCfg := mockStorageConfig(t)
 
 	bucketClient := &bucket.ClientMock{}
@@ -191,13 +195,15 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) {
 	require.NoError(t, err)
 	defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck
 
-	bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
+	bucketClient.MockIter("", []string{"user-1", "user-2", "user-disabled"}, nil)
 	bucketClient.MockIter("user-1/", []string{}, nil)
 	bucketClient.MockIter("user-2/", []string{}, nil)
+	bucketClient.MockIter("user-disabled/", []string{}, nil)
 
 	require.NoError(t, services.StartAndAwaitRunning(ctx, g))
 
 	assert.NotNil(t, g.stores.getStore("user-1"))
 	assert.NotNil(t, g.stores.getStore("user-2"))
+	assert.Nil(t, g.stores.getStore("user-disabled"))
 	assert.Nil(t, g.stores.getStore("user-unknown"))
 }
diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go
index 6c6493cfb1..cac1b63ba0 100644
--- a/pkg/storegateway/sharding_strategy.go
+++ b/pkg/storegateway/sharding_strategy.go
@@ -36,15 +36,35 @@ type ShardingLimits interface {
 	StoreGatewayTenantShardSize(userID string) float64
 }
 
+func filterDisallowedTenants(userIDs []string, logger log.Logger, allowedTenants *util.AllowedTenants) []string {
+	filteredUserIDs := []string{}
+	for _, userID := range userIDs {
+		if !allowedTenants.IsAllowed(userID) {
+			level.Debug(logger).Log("msg", "ignoring tenant because it is not allowed on this store-gateway", "user", userID)
+			continue
+		}
+
+		filteredUserIDs = append(filteredUserIDs, userID)
+	}
+
+	return filteredUserIDs
+}
+
-// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.
-type NoShardingStrategy struct{}
+// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out
+// by sharding; only the enabled/disabled tenant lists are applied.
+type NoShardingStrategy struct {
+	logger         log.Logger
+	allowedTenants *util.AllowedTenants
+}
 
-func NewNoShardingStrategy() *NoShardingStrategy {
-	return &NoShardingStrategy{}
+func NewNoShardingStrategy(logger log.Logger, allowedTenants *util.AllowedTenants) *NoShardingStrategy {
+	return &NoShardingStrategy{
+		logger:         logger,
+		allowedTenants: allowedTenants,
+	}
 }
 
 func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
-	return userIDs
+	return filterDisallowedTenants(userIDs, s.logger, s.allowedTenants)
 }
 
 func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]struct{}, _ block.GaugeVec) error {
@@ -54,23 +74,26 @@ func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[uli
 // DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways.
 // Not go-routine safe.
 type DefaultShardingStrategy struct {
-	r            *ring.Ring
-	instanceAddr string
-	logger       log.Logger
+	r              *ring.Ring
+	instanceAddr   string
+	logger         log.Logger
+	allowedTenants *util.AllowedTenants
 }
 
 // NewDefaultShardingStrategy creates DefaultShardingStrategy.
-func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger) *DefaultShardingStrategy {
+func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger, allowedTenants *util.AllowedTenants) *DefaultShardingStrategy {
 	return &DefaultShardingStrategy{
 		r:            r,
 		instanceAddr: instanceAddr,
 		logger:       logger,
+
+		allowedTenants: allowedTenants,
 	}
 }
 
 // FilterUsers implements ShardingStrategy.
 func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
-	return userIDs
+	return filterDisallowedTenants(userIDs, s.logger, s.allowedTenants)
 }
 
 // FilterBlocks implements ShardingStrategy.
@@ -89,10 +112,11 @@ type ShuffleShardingStrategy struct {
 	logger                    log.Logger
 	zoneStableShuffleSharding bool
+	allowedTenants            *util.AllowedTenants
 }
 
 // NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
-func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger, zoneStableShuffleSharding bool) *ShuffleShardingStrategy {
+func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger, allowedTenants *util.AllowedTenants, zoneStableShuffleSharding bool) *ShuffleShardingStrategy {
 	return &ShuffleShardingStrategy{
 		r:            r,
 		instanceID:   instanceID,
@@ -101,14 +125,14 @@ func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, l
 		logger:                    logger,
 		zoneStableShuffleSharding: zoneStableShuffleSharding,
+		allowedTenants:            allowedTenants,
 	}
 }
 
 // FilterUsers implements ShardingStrategy.
 func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
 	var filteredIDs []string
-
-	for _, userID := range userIDs {
+	for _, userID := range filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) {
 		subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding)
 
 		// Include the user only if it belongs to this store-gateway shard.
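Reviewer note: all three strategies now funnel through `filterDisallowedTenants`, and a nil `*util.AllowedTenants` means "allow everyone", which is why the existing tests can pass `nil`. A minimal sketch of the semantics the helper relies on; the real type lives in `pkg/util`, and the field names here are illustrative:

```go
// AllowedTenants couples an optional allow list with a deny list.
type AllowedTenants struct {
	enabled  map[string]struct{} // if non-empty, only these tenants pass
	disabled map[string]struct{} // these tenants never pass
}

func NewAllowedTenants(enabled, disabled []string) *AllowedTenants {
	a := &AllowedTenants{}
	if len(enabled) > 0 {
		a.enabled = make(map[string]struct{}, len(enabled))
		for _, t := range enabled {
			a.enabled[t] = struct{}{}
		}
	}
	if len(disabled) > 0 {
		a.disabled = make(map[string]struct{}, len(disabled))
		for _, t := range disabled {
			a.disabled[t] = struct{}{}
		}
	}
	return a
}

func (a *AllowedTenants) IsAllowed(tenantID string) bool {
	if a == nil {
		return true // no restrictions configured
	}
	if len(a.enabled) > 0 {
		if _, ok := a.enabled[tenantID]; !ok {
			return false // allow list is set and the tenant is not on it
		}
	}
	if _, ok := a.disabled[tenantID]; ok {
		return false // deny list always wins
	}
	return true
}
```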
diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go
index 1dfd54d41e..f2ee50fb4c 100644
--- a/pkg/storegateway/sharding_strategy_test.go
+++ b/pkg/storegateway/sharding_strategy_test.go
@@ -19,6 +19,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
 	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
+	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/services"
 )
 
@@ -270,7 +271,7 @@ func TestDefaultShardingStrategy(t *testing.T) {
 			require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE))
 
 			for instanceAddr, expectedBlocks := range testData.expectedBlocks {
-				filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger())
+				filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger(), nil)
 				synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"})
 				synced.WithLabelValues(shardExcludedMeta).Set(0)
@@ -335,6 +336,7 @@ func TestShuffleShardingStrategy(t *testing.T) {
 		setupRing        func(*ring.Desc)
 		expectedUsers    []usersExpectation
 		expectedBlocks   []blocksExpectation
+		isTenantDisabled bool
 	}{
 		"one ACTIVE instance in the ring with RF = 1 and SS = 1": {
 			replicationFactor: 1,
@@ -351,6 +353,18 @@ func TestShuffleShardingStrategy(t *testing.T) {
 				{instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{}},
 			},
 		},
+		"one ACTIVE instance in the ring with RF = 1 SS = 1 and user disabled": {
+			replicationFactor: 1,
+			limits:            &shardingLimitsMock{storeGatewayTenantShardSize: 1},
+			setupRing: func(r *ring.Desc) {
+				r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE, registeredAt)
+			},
+			expectedUsers: []usersExpectation{
+				{instanceID: "instance-1", instanceAddr: "127.0.0.1", users: nil},
+				{instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil},
+			},
+			isTenantDisabled: true,
+		},
 		"one ACTIVE instance in the ring with RF = 2 and SS = 1 (should still sync blocks on the only available instance)": {
 			replicationFactor: 1,
 			limits:            &shardingLimitsMock{storeGatewayTenantShardSize: 1},
@@ -629,15 +643,20 @@ func TestShuffleShardingStrategy(t *testing.T) {
 			// Wait until the ring client has synced.
 			require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE))
 
+			var allowedTenants *util.AllowedTenants
+			if testData.isTenantDisabled {
+				allowedTenants = util.NewAllowedTenants(nil, []string{userID})
+			}
+
 			// Assert on filter users.
 			for _, expected := range testData.expectedUsers {
-				filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), zoneStableShuffleSharding) //nolint:govet
+				filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), allowedTenants, zoneStableShuffleSharding) //nolint:govet
 				assert.Equal(t, expected.users, filter.FilterUsers(ctx, []string{userID}))
 			}
 
 			// Assert on filter blocks.
 			for _, expected := range testData.expectedBlocks {
-				filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), zoneStableShuffleSharding) //nolint:govet
+				filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), allowedTenants, zoneStableShuffleSharding) //nolint:govet
 				synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"})
 				synced.WithLabelValues(shardExcludedMeta).Set(0)
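Reviewer note: taken together, a disallowed tenant is dropped before any ring or shuffle-shard logic runs, whichever strategy is configured. A hypothetical mini-test (not part of this diff) sketching the combined allow/deny behavior:

```go
func TestFilterDisallowedTenants_Sketch(t *testing.T) {
	// user-2 is on both lists, so the deny list wins; user-3 is excluded
	// implicitly because an allow list is set and it is not on it.
	allowed := util.NewAllowedTenants([]string{"user-1", "user-2"}, []string{"user-2"})

	got := filterDisallowedTenants([]string{"user-1", "user-2", "user-3"}, log.NewNopLogger(), allowed)
	assert.Equal(t, []string{"user-1"}, got)
}
```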