Skip to content

add/support enabled_tenants and disabled_tenants feature in storegateway #5186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
)
Expand Down Expand Up @@ -54,6 +55,9 @@ type Config struct {
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration. This option is required only if blocks sharding is enabled."`
ShardingStrategy string `yaml:"sharding_strategy"`

EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
}

// RegisterFlags registers the Config flags.
Expand All @@ -62,6 +66,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

f.BoolVar(&cfg.ShardingEnabled, "store-gateway.sharding-enabled", false, "Shard blocks across multiple store gateway instances."+sharedOptionWithQuerier)
f.StringVar(&cfg.ShardingStrategy, "store-gateway.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.Var(&cfg.EnabledTenants, "store-gateway.enabled_tenants", "Comma separated list of tenants whose store metrics this storegateway can process. If specified, only these tenants will be handled by storegateway, otherwise this storegateway will be enabled for all the tenants in the store-gateway cluster.")
f.Var(&cfg.DisabledTenants, "store-gateway.disabled_tenants", "Comma separated list of tenants whose store metrics this storegateway cannot process. If specified, a storegateway that would normally pick the specified tenant(s) for processing will ignore them instead.")
}

// Validate the Config.
Expand Down Expand Up @@ -99,6 +105,8 @@ type StoreGateway struct {
subservicesWatcher *services.FailureWatcher

bucketSync *prometheus.CounterVec

allowedTenants *util.AllowedTenants
}

func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) {
Expand Down Expand Up @@ -135,13 +143,21 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
Name: "cortex_storegateway_bucket_sync_total",
Help: "Total number of times the bucket sync operation triggered.",
}, []string{"reason"}),
allowedTenants: util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants),
}

// Init metrics.
g.bucketSync.WithLabelValues(syncReasonInitial)
g.bucketSync.WithLabelValues(syncReasonPeriodic)
g.bucketSync.WithLabelValues(syncReasonRingChange)

if len(gatewayCfg.EnabledTenants) > 0 {
level.Info(g.logger).Log("msg", "storegateway using enabled users", "enabled", strings.Join(gatewayCfg.EnabledTenants, ", "))
}
if len(gatewayCfg.DisabledTenants) > 0 {
level.Info(g.logger).Log("msg", "storegateway using disabled users", "disabled", strings.Join(gatewayCfg.DisabledTenants, ", "))
}

// Init sharding strategy.
var shardingStrategy ShardingStrategy

Expand Down
36 changes: 27 additions & 9 deletions pkg/storegateway/sharding_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[uli
// DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways.
// Not go-routine safe.
type DefaultShardingStrategy struct {
	r            *ring.Ring
	instanceAddr string
	logger       log.Logger

	// allowedTenants is consulted by FilterUsers to decide which users are kept.
	allowedTenants *util.AllowedTenants
}

// NewDefaultShardingStrategy creates DefaultShardingStrategy.
Expand All @@ -70,7 +71,17 @@ func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Lo

// FilterUsers implements ShardingStrategy.
//
// It returns, in input order, the subset of userIDs for which
// s.allowedTenants.IsAllowed reports true. Users that are rejected are logged
// at debug level. The returned slice is always non-nil.
//
// NOTE(review): this assumes allowedTenants has been set on the strategy (or
// that IsAllowed tolerates a nil receiver) — confirm against the constructor.
func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
	// At most len(userIDs) entries can survive the filter.
	filteredUserIDs := make([]string, 0, len(userIDs))
	for _, userID := range userIDs {
		if !s.allowedTenants.IsAllowed(userID) {
			level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID)
			continue
		}

		filteredUserIDs = append(filteredUserIDs, userID)
	}

	return filteredUserIDs
}

// FilterBlocks implements ShardingStrategy.
Expand All @@ -82,11 +93,12 @@ func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, meta
// ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways,
// where each tenant blocks are sharded across a subset of store-gateway instances.
type ShuffleShardingStrategy struct {
	r            *ring.Ring
	instanceID   string
	instanceAddr string
	limits       ShardingLimits
	logger       log.Logger

	// allowedTenants is consulted by FilterUsers to skip users that are not allowed.
	allowedTenants *util.AllowedTenants
}

// NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
Expand All @@ -107,6 +119,12 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin
for _, userID := range userIDs {
subRing := GetShuffleShardingSubring(s.r, userID, s.limits)

// Skip users that are not allowed by the enabled/disabled tenants configuration.
if !s.allowedTenants.IsAllowed(userID) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we need to call GetShuffleShardingSubring on the filtered users only; otherwise we might end up with store-gateways running without users.

level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID)
continue
}

// Include the user only if it belongs to this store-gateway shard.
if subRing.HasInstance(s.instanceID) {
filteredIDs = append(filteredIDs, userID)
Expand Down
18 changes: 18 additions & 0 deletions pkg/storegateway/sharding_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ func TestDefaultShardingStrategy(t *testing.T) {
zoneAwarenessEnabled bool
setupRing func(*ring.Desc)
expectedBlocks map[string][]ulid.ULID
enableTenants []string
disableTenants []string
allowedTenants func(*DefaultShardingStrategy)
}{
"one ACTIVE instance in the ring with replication factor = 1": {
replicationFactor: 1,
Expand All @@ -64,6 +67,21 @@ func TestDefaultShardingStrategy(t *testing.T) {
"127.0.0.2": {block2, block4},
},
},
"two ACTIVE instances in the ring with replication factor = 1 and one tenant disabled": {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, this function does not test FilterUsers; it tests FilterBlocks, so this is not the right place.

replicationFactor: 1,
setupRing: func(r *ring.Desc) {
r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE, registeredAt)
r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE, registeredAt)
},
allowedTenants: func(r *DefaultShardingStrategy) {
userIds:= []string{"u-1","u-2"}
r.FilterUsers(context.Background(), userIds)
},
expectedBlocks: map[string][]ulid.ULID{
"127.0.0.1": {block1, block3}, // Tenant 1 blocks expected
"127.0.0.2": {}, // Tenant 2 blocks disabled
},
},
"one ACTIVE instance in the ring with replication factor = 2": {
replicationFactor: 2,
setupRing: func(r *ring.Desc) {
Expand Down