From 43d61df1be4e40155c7b16e3e432c9850409f782 Mon Sep 17 00:00:00 2001 From: Shashank Date: Tue, 28 Feb 2023 21:45:50 +0530 Subject: [PATCH 1/6] add enabled and disabled tenants in storegateway Signed-off-by: Shashank --- pkg/storegateway/gateway.go | 15 +++++++++++++++ pkg/storegateway/sharding_strategy.go | 24 ++++++++++++++++++++---- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 536a7f2556..d23a07001c 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -23,6 +23,7 @@ import ( cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -58,6 +59,9 @@ type Config struct { ShardingEnabled bool `yaml:"sharding_enabled"` ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration. This option is required only if blocks sharding is enabled."` ShardingStrategy string `yaml:"sharding_strategy"` + + EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` + DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` } // RegisterFlags registers the Config flags. @@ -66,6 +70,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ShardingEnabled, "store-gateway.sharding-enabled", false, "Shard blocks across multiple store gateway instances."+sharedOptionWithQuerier) f.StringVar(&cfg.ShardingStrategy, "store-gateway.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) + f.Var(&cfg.EnabledTenants, "store-gateway.enabled_tenants", "Comma separated list of tenants whose store metrics this storegateway can process. 
If specified, only these tenants will be handled by storegateway, otherwise this storegateway will be enabled for all the tenants in the store-gateway cluster.") + f.Var(&cfg.DisabledTenants, "store-gateway.disabled_tenants", "Comma separated list of tenants whose store metrics this storegateway cannot process. If specified, a storegateway that would normally pick the specified tenant(s) for processing will ignore them instead.") } // Validate the Config. @@ -103,6 +109,8 @@ type StoreGateway struct { subservicesWatcher *services.FailureWatcher bucketSync *prometheus.CounterVec + + allowedTenants *util.AllowedTenants } func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) { @@ -139,6 +147,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf Name: "cortex_storegateway_bucket_sync_total", Help: "Total number of times the bucket sync operation triggered.", }, []string{"reason"}), + allowedTenants: util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants), } // Init metrics. @@ -161,6 +170,12 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf Help: instanceLimitsMetricHelp, ConstLabels: map[string]string{limitLabel: "max_chunk_pool_bytes"}, }).Set(float64(storageCfg.BucketStore.MaxChunkPoolBytes)) + if len(gatewayCfg.EnabledTenants) > 0 { + level.Info(g.logger).Log("msg", "storegateway using enabled users", "enabled", strings.Join(gatewayCfg.EnabledTenants, ", ")) + } + if len(gatewayCfg.DisabledTenants) > 0 { + level.Info(g.logger).Log("msg", "storegateway using disabled users", "disabled", strings.Join(gatewayCfg.DisabledTenants, ", ")) + } // Init sharding strategy. 
var shardingStrategy ShardingStrategy diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index 6c6493cfb1..2bc8438023 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -54,9 +54,10 @@ func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[uli // DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways. // Not go-routine safe. type DefaultShardingStrategy struct { - r *ring.Ring - instanceAddr string - logger log.Logger + r *ring.Ring + instanceAddr string + logger log.Logger + allowedTenants *util.AllowedTenants } // NewDefaultShardingStrategy creates DefaultShardingStrategy. @@ -70,7 +71,15 @@ func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Lo // FilterUsers implements ShardingStrategy. func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { - return userIDs + allUserIDs := userIDs + + for _, userID := range userIDs { + if !s.allowedTenants.IsAllowed(userID) { + level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID) + } + } + + return allUserIDs } // FilterBlocks implements ShardingStrategy. @@ -89,6 +98,7 @@ type ShuffleShardingStrategy struct { logger log.Logger zoneStableShuffleSharding bool + allowedTenants *util.AllowedTenants } // NewShuffleShardingStrategy makes a new ShuffleShardingStrategy. @@ -111,6 +121,12 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin for _, userID := range userIDs { subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding) + //filter out users not owned by this shard. + if !s.allowedTenants.IsAllowed(userID) { + level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID) + continue + } + // Include the user only if it belongs to this store-gateway shard. 
if subRing.HasInstance(s.instanceID) { filteredIDs = append(filteredIDs, userID) From 160c4bcb5912995838257a5bcda514afec3ff4b4 Mon Sep 17 00:00:00 2001 From: Shashank Date: Mon, 22 May 2023 19:33:20 +0530 Subject: [PATCH 2/6] added test cases Signed-off-by: Shashank --- pkg/storegateway/sharding_strategy.go | 8 +++++--- pkg/storegateway/sharding_strategy_test.go | 18 ++++++++++++++++++ 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index 2bc8438023..04482f36d0 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -71,15 +71,17 @@ func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Lo // FilterUsers implements ShardingStrategy. func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { - allUserIDs := userIDs - + filteredUserIDs := []string{} for _, userID := range userIDs { if !s.allowedTenants.IsAllowed(userID) { level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID) + continue } + + filteredUserIDs = append(filteredUserIDs, userID) } - return allUserIDs + return filteredUserIDs } // FilterBlocks implements ShardingStrategy. 
diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index 1dfd54d41e..adc2ee4690 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -44,6 +44,9 @@ func TestDefaultShardingStrategy(t *testing.T) { zoneAwarenessEnabled bool setupRing func(*ring.Desc) expectedBlocks map[string][]ulid.ULID + enableTenants []string + disableTenants []string + allowedTenants func(*DefaultShardingStrategy) }{ "one ACTIVE instance in the ring with replication factor = 1": { replicationFactor: 1, @@ -66,6 +69,21 @@ func TestDefaultShardingStrategy(t *testing.T) { "127.0.0.2": {block2, block4}, }, }, + "two ACTIVE instances in the ring with replication factor = 1 and one tenant disabled": { + replicationFactor: 1, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt) + }, + allowedTenants: func(r *DefaultShardingStrategy) { + userIds:= []string{"u-1","u-2"} + r.FilterUsers(context.Background(), userIds) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block3}, // Tenant 1 blocks expected + "127.0.0.2": {}, // Tenant 2 blocks disabled + }, + }, "one ACTIVE instance in the ring with replication factor = 2": { replicationFactor: 2, setupRing: func(r *ring.Desc) { From 2b13822a8a52f88921cb89eef353a5959223d1e7 Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Tue, 7 Nov 2023 05:22:02 +0100 Subject: [PATCH 3/6] Fix implementation and tests Signed-off-by: Friedrich Gonzalez --- .../bucket_index_metadata_fetcher_test.go | 8 +-- pkg/storegateway/bucket_stores_test.go | 21 ++++--- pkg/storegateway/gateway.go | 10 ++-- pkg/storegateway/gateway_test.go | 10 +++- pkg/storegateway/sharding_strategy.go | 56 ++++++++++--------- 
pkg/storegateway/sharding_strategy_test.go | 43 +++++++------- 6 files changed, 82 insertions(+), 66 deletions(-) diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go index 129bd0c39f..534e212dcd 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -55,7 +55,7 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) { NewIgnoreDeletionMarkFilter(logger, bucket.NewUserBucketClient(userID, bkt, nil), 2*time.Hour, 1), } - fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, filters) + fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(logger, nil), nil, logger, reg, filters) metas, partials, err := fetcher.Fetch(ctx) require.NoError(t, err) assert.Equal(t, map[ulid.ULID]*metadata.Meta{ @@ -109,7 +109,7 @@ func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) { bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucket.ErrCustomerManagedKeyAccessDenied) - fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil) + fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(log.NewNopLogger(), nil), nil, log.NewNopLogger(), reg, nil) metas, _, err := fetcher.Fetch(ctx) require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) assert.Empty(t, metas) @@ -157,7 +157,7 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) { logs := &concurrency.SyncBuffer{} logger := log.NewLogfmtLogger(logs) - fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, nil) + fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(logger, nil), nil, logger, reg, nil) metas, partials, err := fetcher.Fetch(ctx) require.NoError(t, err) assert.Empty(t, metas) @@ -212,7 +212,7 @@ func 
TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) { // Upload a corrupted bucket index. require.NoError(t, bkt.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid}!"))) - fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, nil) + fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(logger, nil), nil, logger, reg, nil) metas, partials, err := fetcher.Fetch(ctx) require.NoError(t, err) assert.Empty(t, metas) diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 9d57d52d42..da38ce2eaa 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -120,7 +120,7 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) if tc.mockInitialSync { @@ -200,7 +200,7 @@ func TestBucketStores_InitialSync(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Query series before the initial sync. 
@@ -276,7 +276,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) { bucket = &failFirstGetBucket{Bucket: bucket} reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Initial sync should succeed even if a transient error occurs. @@ -336,7 +336,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Run an initial sync to discover 1 block. 
@@ -397,11 +397,16 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) { tests := map[string]struct { shardingStrategy ShardingStrategy expectedStores int32 + allowedTenants *util.AllowedTenants }{ "when sharding is disabled all users should be synced": { - shardingStrategy: NewNoShardingStrategy(), + shardingStrategy: NewNoShardingStrategy(log.NewNopLogger(), nil), expectedStores: 3, }, + "sharding disabled, user-1 disabled": { + shardingStrategy: NewNoShardingStrategy(log.NewNopLogger(), util.NewAllowedTenants(nil, []string{"user-1"})), + expectedStores: 2, + }, "when sharding is enabled only stores for filtered users should be created": { shardingStrategy: func() ShardingStrategy { s := &mockShardingStrategy{} @@ -465,7 +470,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, stores.InitialSync(ctx)) @@ -521,7 +526,7 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) @@ -542,7 +547,7 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl bucket, err := 
filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) require.NoError(t, err) - stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index d23a07001c..7e6e3ffeba 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -109,8 +109,6 @@ type StoreGateway struct { subservicesWatcher *services.FailureWatcher bucketSync *prometheus.CounterVec - - allowedTenants *util.AllowedTenants } func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) { @@ -147,8 +145,8 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf Name: "cortex_storegateway_bucket_sync_total", Help: "Total number of times the bucket sync operation triggered.", }, []string{"reason"}), - allowedTenants: util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants), } + allowedTenants := util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants) // Init metrics. g.bucketSync.WithLabelValues(syncReasonInitial) @@ -207,14 +205,14 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf // Instance the right strategy. 
switch gatewayCfg.ShardingStrategy { case util.ShardingStrategyDefault: - shardingStrategy = NewDefaultShardingStrategy(g.ring, lifecyclerCfg.Addr, logger) + shardingStrategy = NewDefaultShardingStrategy(g.ring, lifecyclerCfg.Addr, logger, allowedTenants) case util.ShardingStrategyShuffle: - shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, g.gatewayCfg.ShardingRing.ZoneStableShuffleSharding) + shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, allowedTenants, g.gatewayCfg.ShardingRing.ZoneStableShuffleSharding) default: return nil, errInvalidShardingStrategy } } else { - shardingStrategy = NewNoShardingStrategy() + shardingStrategy = NewNoShardingStrategy(logger, allowedTenants) } g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg)) diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 291004314a..387f991cb6 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -133,6 +133,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { ctx := context.Background() gatewayCfg := mockGatewayConfig() gatewayCfg.ShardingEnabled = true + gatewayCfg.DisabledTenants = []string{"user-disabled"} storageCfg := mockStorageConfig(t) ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -153,7 +154,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck assert.False(t, g.ringLifecycler.IsRegistered()) - bucketClient.MockIterWithCallback("", []string{"user-1", "user-2"}, nil, func() { + bucketClient.MockIterWithCallback("", []string{"user-1", "user-2", "user-disabled"}, nil, 
func() { // During the initial sync, we expect the instance to always be in the JOINING // state within the ring. assert.True(t, g.ringLifecycler.IsRegistered()) @@ -163,6 +164,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { }) bucketClient.MockIter("user-1/", []string{}, nil) bucketClient.MockIter("user-2/", []string{}, nil) + bucketClient.MockIter("user-disabled/", []string{}, nil) // Once successfully started, the instance should be ACTIVE in the ring. require.NoError(t, services.StartAndAwaitRunning(ctx, g)) @@ -174,6 +176,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { assert.NotNil(t, g.stores.getStore("user-1")) assert.NotNil(t, g.stores.getStore("user-2")) + assert.Nil(t, g.stores.getStore("user-disabled")) assert.Nil(t, g.stores.getStore("user-unknown")) }) } @@ -184,6 +187,7 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) { ctx := context.Background() gatewayCfg := mockGatewayConfig() gatewayCfg.ShardingEnabled = false + gatewayCfg.DisabledTenants = []string{"user-disabled"} storageCfg := mockStorageConfig(t) bucketClient := &bucket.ClientMock{} @@ -191,13 +195,15 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) { require.NoError(t, err) defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck - bucketClient.MockIter("", []string{"user-1", "user-2"}, nil) + bucketClient.MockIter("", []string{"user-1", "user-2", "user-disabled"}, nil) bucketClient.MockIter("user-1/", []string{}, nil) bucketClient.MockIter("user-2/", []string{}, nil) + bucketClient.MockIter("user-disabled/", []string{}, nil) require.NoError(t, services.StartAndAwaitRunning(ctx, g)) assert.NotNil(t, g.stores.getStore("user-1")) assert.NotNil(t, g.stores.getStore("user-2")) + assert.Nil(t, g.stores.getStore("user-disabled")) assert.Nil(t, g.stores.getStore("user-unknown")) } diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index 
04482f36d0..cac1b63ba0 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -36,15 +36,35 @@ type ShardingLimits interface { StoreGatewayTenantShardSize(userID string) float64 } +func filterDisallowedTenants(userIDs []string, logger log.Logger, allowedTenants *util.AllowedTenants) []string { + filteredUserIDs := []string{} + for _, userID := range userIDs { + if !allowedTenants.IsAllowed(userID) { + level.Debug(logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID) + continue + } + + filteredUserIDs = append(filteredUserIDs, userID) + } + + return filteredUserIDs +} + // NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out. -type NoShardingStrategy struct{} +type NoShardingStrategy struct { + logger log.Logger + allowedTenants *util.AllowedTenants +} -func NewNoShardingStrategy() *NoShardingStrategy { - return &NoShardingStrategy{} +func NewNoShardingStrategy(logger log.Logger, allowedTenants *util.AllowedTenants) *NoShardingStrategy { + return &NoShardingStrategy{ + logger: logger, + allowedTenants: allowedTenants, + } } func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { - return userIDs + return filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) } func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]struct{}, _ block.GaugeVec) error { @@ -61,27 +81,19 @@ type DefaultShardingStrategy struct { } // NewDefaultShardingStrategy creates DefaultShardingStrategy. 
-func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger) *DefaultShardingStrategy { +func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger, allowedTenants *util.AllowedTenants) *DefaultShardingStrategy { return &DefaultShardingStrategy{ r: r, instanceAddr: instanceAddr, logger: logger, + + allowedTenants: allowedTenants, } } // FilterUsers implements ShardingStrategy. func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { - filteredUserIDs := []string{} - for _, userID := range userIDs { - if !s.allowedTenants.IsAllowed(userID) { - level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID) - continue - } - - filteredUserIDs = append(filteredUserIDs, userID) - } - - return filteredUserIDs + return filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) } // FilterBlocks implements ShardingStrategy. @@ -104,7 +116,7 @@ type ShuffleShardingStrategy struct { } // NewShuffleShardingStrategy makes a new ShuffleShardingStrategy. -func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger, zoneStableShuffleSharding bool) *ShuffleShardingStrategy { +func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger, allowedTenants *util.AllowedTenants, zoneStableShuffleSharding bool) *ShuffleShardingStrategy { return &ShuffleShardingStrategy{ r: r, instanceID: instanceID, @@ -113,22 +125,16 @@ func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, l logger: logger, zoneStableShuffleSharding: zoneStableShuffleSharding, + allowedTenants: allowedTenants, } } // FilterUsers implements ShardingStrategy. 
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { var filteredIDs []string - - for _, userID := range userIDs { + for _, userID := range filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) { subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding) - //filter out users not owned by this shard. - if !s.allowedTenants.IsAllowed(userID) { - level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID) - continue - } - // Include the user only if it belongs to this store-gateway shard. if subRing.HasInstance(s.instanceID) { filteredIDs = append(filteredIDs, userID) diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index adc2ee4690..f2ee50fb4c 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -19,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" ) @@ -44,9 +45,6 @@ func TestDefaultShardingStrategy(t *testing.T) { zoneAwarenessEnabled bool setupRing func(*ring.Desc) expectedBlocks map[string][]ulid.ULID - enableTenants []string - disableTenants []string - allowedTenants func(*DefaultShardingStrategy) }{ "one ACTIVE instance in the ring with replication factor = 1": { replicationFactor: 1, @@ -69,21 +67,6 @@ func TestDefaultShardingStrategy(t *testing.T) { "127.0.0.2": {block2, block4}, }, }, - "two ACTIVE instances in the ring with replication factor = 1 and one tenant disabled": { - replicationFactor: 1, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, 
block4Hash + 1}, ring.ACTIVE, registeredAt) - }, - allowedTenants: func(r *DefaultShardingStrategy) { - userIds:= []string{"u-1","u-2"} - r.FilterUsers(context.Background(), userIds) - }, - expectedBlocks: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block3}, // Tenant 1 blocks expected - "127.0.0.2": {}, // Tenant 2 blocks disabled - }, - }, "one ACTIVE instance in the ring with replication factor = 2": { replicationFactor: 2, setupRing: func(r *ring.Desc) { @@ -288,7 +271,7 @@ func TestDefaultShardingStrategy(t *testing.T) { require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) for instanceAddr, expectedBlocks := range testData.expectedBlocks { - filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger()) + filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger(), nil) synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) synced.WithLabelValues(shardExcludedMeta).Set(0) @@ -353,6 +336,7 @@ func TestShuffleShardingStrategy(t *testing.T) { setupRing func(*ring.Desc) expectedUsers []usersExpectation expectedBlocks []blocksExpectation + isTenantDisabled bool }{ "one ACTIVE instance in the ring with RF = 1 and SS = 1": { replicationFactor: 1, @@ -369,6 +353,18 @@ func TestShuffleShardingStrategy(t *testing.T) { {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{}}, }, }, + "one ACTIVE instance in the ring with RF = 1 SS = 1 and user disabled": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE, registeredAt) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: nil}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + }, + isTenantDisabled: true, + }, "one ACTIVE instance in the ring with RF = 2 and SS = 1 (should still sync blocks on the only available 
instance)": { replicationFactor: 1, limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, @@ -647,15 +643,20 @@ func TestShuffleShardingStrategy(t *testing.T) { // Wait until the ring client has synced. require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) + var allowedTenants *util.AllowedTenants + if testData.isTenantDisabled { + allowedTenants = util.NewAllowedTenants(nil, []string{userID}) + } + // Assert on filter users. for _, expected := range testData.expectedUsers { - filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), zoneStableShuffleSharding) //nolint:govet + filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), allowedTenants, zoneStableShuffleSharding) //nolint:govet assert.Equal(t, expected.users, filter.FilterUsers(ctx, []string{userID})) } // Assert on filter blocks. for _, expected := range testData.expectedBlocks { - filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), zoneStableShuffleSharding) //nolint:govet + filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), allowedTenants, zoneStableShuffleSharding) //nolint:govet synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) synced.WithLabelValues(shardExcludedMeta).Set(0) From 6abd800381c657979c46183b155fa5bc9ef9305c Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Tue, 7 Nov 2023 09:33:39 +0100 Subject: [PATCH 4/6] Update docs Signed-off-by: Friedrich Gonzalez --- docs/blocks-storage/store-gateway.md | 13 +++++++++++++ docs/configuration/config-file-reference.md | 13 +++++++++++++ 2 files changed, 26 insertions(+) diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 7162d34b93..7a4a5649de 100644 --- 
a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -331,6 +331,19 @@ store_gateway: # shuffle-sharding. # CLI flag: -store-gateway.sharding-strategy [sharding_strategy: | default = "default"] + + # Comma separated list of tenants whose store metrics this storegateway can + # process. If specified, only these tenants will be handled by storegateway, + # otherwise this storegateway will be enabled for all the tenants in the + # store-gateway cluster. + # CLI flag: -store-gateway.enabled_tenants + [enabled_tenants: | default = ""] + + # Comma separated list of tenants whose store metrics this storegateway cannot + # process. If specified, a storegateway that would normally pick the specified + # tenant(s) for processing will ignore them instead. + # CLI flag: -store-gateway.disabled_tenants + [disabled_tenants: | default = ""] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 5103df9a38..bba506e152 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4912,6 +4912,19 @@ sharding_ring: # The sharding strategy to use. Supported values are: default, shuffle-sharding. # CLI flag: -store-gateway.sharding-strategy [sharding_strategy: | default = "default"] + +# Comma separated list of tenants whose store metrics this storegateway can +# process. If specified, only these tenants will be handled by storegateway, +# otherwise this storegateway will be enabled for all the tenants in the +# store-gateway cluster. +# CLI flag: -store-gateway.enabled_tenants +[enabled_tenants: | default = ""] + +# Comma separated list of tenants whose store metrics this storegateway cannot +# process. If specified, a storegateway that would normally pick the specified +# tenant(s) for processing will ignore them instead. 
+# CLI flag: -store-gateway.disabled_tenants +[disabled_tenants: | default = ""] ``` ### `tracing_config` From 3f99d75e3751131545332d6dfd9716c6187d6cfa Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Tue, 7 Nov 2023 09:57:56 +0100 Subject: [PATCH 5/6] Update changelog Signed-off-by: Friedrich Gonzalez --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e25c9b55e..393747661d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog ## master / unreleased +* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638 ## 1.16.0 in progress From 06665e44f0782e2a84dbbb0d3fdea964d3198159 Mon Sep 17 00:00:00 2001 From: Friedrich Gonzalez Date: Tue, 7 Nov 2023 17:25:00 +0100 Subject: [PATCH 6/6] Replace underscores with dashes in flag names Signed-off-by: Friedrich Gonzalez --- docs/blocks-storage/store-gateway.md | 4 ++-- docs/configuration/config-file-reference.md | 4 ++-- pkg/storegateway/gateway.go | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 7a4a5649de..a0ad2b1d3c 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -336,13 +336,13 @@ store_gateway: # process. If specified, only these tenants will be handled by storegateway, # otherwise this storegateway will be enabled for all the tenants in the # store-gateway cluster. - # CLI flag: -store-gateway.enabled_tenants + # CLI flag: -store-gateway.enabled-tenants [enabled_tenants: | default = ""] # Comma separated list of tenants whose store metrics this storegateway cannot # process. If specified, a storegateway that would normally pick the specified # tenant(s) for processing will ignore them instead.
- # CLI flag: -store-gateway.disabled_tenants + # CLI flag: -store-gateway.disabled-tenants [disabled_tenants: | default = ""] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index bba506e152..fbcc159477 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4917,13 +4917,13 @@ sharding_ring: # process. If specified, only these tenants will be handled by storegateway, # otherwise this storegateway will be enabled for all the tenants in the # store-gateway cluster. -# CLI flag: -store-gateway.enabled_tenants +# CLI flag: -store-gateway.enabled-tenants [enabled_tenants: | default = ""] # Comma separated list of tenants whose store metrics this storegateway cannot # process. If specified, a storegateway that would normally pick the specified # tenant(s) for processing will ignore them instead. -# CLI flag: -store-gateway.disabled_tenants +# CLI flag: -store-gateway.disabled-tenants [disabled_tenants: | default = ""] ``` diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 7e6e3ffeba..5633974913 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -70,8 +70,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ShardingEnabled, "store-gateway.sharding-enabled", false, "Shard blocks across multiple store gateway instances."+sharedOptionWithQuerier) f.StringVar(&cfg.ShardingStrategy, "store-gateway.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) - f.Var(&cfg.EnabledTenants, "store-gateway.enabled_tenants", "Comma separated list of tenants whose store metrics this storegateway can process. 
If specified, only these tenants will be handled by storegateway, otherwise this storegateway will be enabled for all the tenants in the store-gateway cluster.") - f.Var(&cfg.DisabledTenants, "store-gateway.disabled_tenants", "Comma separated list of tenants whose store metrics this storegateway cannot process. If specified, a storegateway that would normally pick the specified tenant(s) for processing will ignore them instead.") + f.Var(&cfg.EnabledTenants, "store-gateway.enabled-tenants", "Comma separated list of tenants whose store metrics this storegateway can process. If specified, only these tenants will be handled by storegateway, otherwise this storegateway will be enabled for all the tenants in the store-gateway cluster.") + f.Var(&cfg.DisabledTenants, "store-gateway.disabled-tenants", "Comma separated list of tenants whose store metrics this storegateway cannot process. If specified, a storegateway that would normally pick the specified tenant(s) for processing will ignore them instead.") } // Validate the Config.