diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 106535ff2f..3b5f0600b3 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -23,6 +23,7 @@ import ( cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -54,6 +55,9 @@ type Config struct { ShardingEnabled bool `yaml:"sharding_enabled"` ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration. This option is required only if blocks sharding is enabled."` ShardingStrategy string `yaml:"sharding_strategy"` + + EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"` + DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` } // RegisterFlags registers the Config flags. @@ -62,6 +66,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ShardingEnabled, "store-gateway.sharding-enabled", false, "Shard blocks across multiple store gateway instances."+sharedOptionWithQuerier) f.StringVar(&cfg.ShardingStrategy, "store-gateway.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) + f.Var(&cfg.EnabledTenants, "store-gateway.enabled_tenants", "Comma separated list of tenants whose store metrics this storegateway can process. If specified, only these tenants will be handled by storegateway, otherwise this storegateway will be enabled for all the tenants in the store-gateway cluster.") + f.Var(&cfg.DisabledTenants, "store-gateway.disabled_tenants", "Comma separated list of tenants whose store metrics this storegateway cannot process. If specified, a storegateway that would normally pick the specified tenant(s) for processing will ignore them instead.") } // Validate the Config. @@ -99,6 +105,8 @@ type StoreGateway struct { subservicesWatcher *services.FailureWatcher bucketSync *prometheus.CounterVec + + allowedTenants *util.AllowedTenants } func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) { @@ -135,6 +143,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf Name: "cortex_storegateway_bucket_sync_total", Help: "Total number of times the bucket sync operation triggered.", }, []string{"reason"}), + allowedTenants: util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants), } // Init metrics. @@ -142,6 +151,13 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf g.bucketSync.WithLabelValues(syncReasonPeriodic) g.bucketSync.WithLabelValues(syncReasonRingChange) + if len(gatewayCfg.EnabledTenants) > 0 { + level.Info(g.logger).Log("msg", "storegateway using enabled users", "enabled", strings.Join(gatewayCfg.EnabledTenants, ", ")) + } + if len(gatewayCfg.DisabledTenants) > 0 { + level.Info(g.logger).Log("msg", "storegateway using disabled users", "disabled", strings.Join(gatewayCfg.DisabledTenants, ", ")) + } + // Init sharding strategy. var shardingStrategy ShardingStrategy diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index b7a2852ea7..e9081282fe 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -54,9 +54,10 @@ func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[uli // DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways. // Not go-routine safe. type DefaultShardingStrategy struct { - r *ring.Ring - instanceAddr string - logger log.Logger + r *ring.Ring + instanceAddr string + logger log.Logger + allowedTenants *util.AllowedTenants } // NewDefaultShardingStrategy creates DefaultShardingStrategy. @@ -70,7 +71,17 @@ func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Lo // FilterUsers implements ShardingStrategy. func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { - return userIDs + filteredUserIDs := []string{} + for _, userID := range userIDs { + if !s.allowedTenants.IsAllowed(userID) { + level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID) + continue + } + + filteredUserIDs = append(filteredUserIDs, userID) + } + + return filteredUserIDs } // FilterBlocks implements ShardingStrategy. @@ -82,11 +93,12 @@ func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, meta // ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, // where each tenant blocks are sharded across a subset of store-gateway instances. type ShuffleShardingStrategy struct { - r *ring.Ring - instanceID string - instanceAddr string - limits ShardingLimits - logger log.Logger + r *ring.Ring + instanceID string + instanceAddr string + limits ShardingLimits + logger log.Logger + allowedTenants *util.AllowedTenants } // NewShuffleShardingStrategy makes a new ShuffleShardingStrategy. @@ -107,6 +119,12 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin for _, userID := range userIDs { subRing := GetShuffleShardingSubring(s.r, userID, s.limits) + //filter out users not owned by this shard. + if !s.allowedTenants.IsAllowed(userID) { + level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID) + continue + } + // Include the user only if it belongs to this store-gateway shard. if subRing.HasInstance(s.instanceID) { filteredIDs = append(filteredIDs, userID) diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index 869f15350c..03cd291695 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -42,6 +42,9 @@ func TestDefaultShardingStrategy(t *testing.T) { zoneAwarenessEnabled bool setupRing func(*ring.Desc) expectedBlocks map[string][]ulid.ULID + enableTenants []string + disableTenants []string + allowedTenants func(*DefaultShardingStrategy) }{ "one ACTIVE instance in the ring with replication factor = 1": { replicationFactor: 1, @@ -64,6 +67,21 @@ func TestDefaultShardingStrategy(t *testing.T) { "127.0.0.2": {block2, block4}, }, }, + "two ACTIVE instances in the ring with replication factor = 1 and one tenant disabled": { + replicationFactor: 1, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt) + }, + allowedTenants: func(r *DefaultShardingStrategy) { + userIds:= []string{"u-1","u-2"} + r.FilterUsers(context.Background(), userIds) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block3}, // Tenant 1 blocks expected + "127.0.0.2": {}, // Tenant 2 blocks disabled + }, + }, "one ACTIVE instance in the ring with replication factor = 2": { replicationFactor: 2, setupRing: func(r *ring.Desc) {