
Add shuffle sharding grouper/planner (Clone of PR 4357) #4621

Closed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@
* [CHANGE] Compactor block deletion mark migration, needed when upgrading from v1.7, is now disabled by default. #4597
* [CHANGE] The `status_code` label on gRPC client metrics has changed from '200' and '500' to '2xx', '5xx', '4xx', 'cancel' or 'error'. #4601
* [CHANGE] Memberlist: changed probe interval from `1s` to `5s` and probe timeout from `500ms` to `2s`. #4601
* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4621
* [ENHANCEMENT] Update Go version to 1.17.5. #4602 #4604
* [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503
* [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571
5 changes: 5 additions & 0 deletions docs/blocks-storage/compactor.md
@@ -173,6 +173,11 @@ compactor:
# CLI flag: -compactor.sharding-enabled
[sharding_enabled: <boolean> | default = false]

# The sharding strategy to use. Supported values are: default,
# shuffle-sharding.
# CLI flag: -compactor.sharding-strategy
[sharding_strategy: <string> | default = "default"]

sharding_ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -5317,6 +5317,10 @@ The `compactor_config` configures the compactor for the blocks storage.
# CLI flag: -compactor.sharding-enabled
[sharding_enabled: <boolean> | default = false]

# The sharding strategy to use. Supported values are: default, shuffle-sharding.
# CLI flag: -compactor.sharding-strategy
[sharding_strategy: <string> | default = "default"]

sharding_ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
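
The `default` / `shuffle-sharding` values documented above map to constants in Cortex's `util` package, which the compactor changes below rely on (`util.ShardingStrategyDefault`, `util.ShardingStrategyShuffle`, `util.StringsContain`). Those helpers live outside this diff; a minimal sketch of what they are assumed to look like:

```go
// Sketch only: these helpers belong to the util package and are not part of this PR;
// the string values are taken from the flag help text above.
package util

const (
	ShardingStrategyDefault = "default"
	ShardingStrategyShuffle = "shuffle-sharding"
)

// StringsContain returns true if the search value appears in the given slice.
func StringsContain(values []string, search string) bool {
	for _, v := range values {
		if search == v {
			return true
		}
	}
	return false
}
```
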
50 changes: 46 additions & 4 deletions pkg/compactor/compactor.go
@@ -48,6 +48,9 @@
errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s"
RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
errInvalidShardingStrategy = errors.New("invalid sharding strategy")

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
return compact.NewDefaultGrouper(
logger,
@@ -61,6 +64,20 @@
metadata.NoneFunc)
}

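// ShuffleShardingGrouperFactory mirrors DefaultBlocksGrouperFactory above, but builds the new
// shuffle sharding grouper and additionally passes the compactor Config through, so grouping
// can take the sharding configuration into account.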
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
return NewShuffleShardingGrouper(
logger,
bkt,
false, // Do not accept malformed indexes
true, // Enable vertical compaction
reg,
blocksMarkedForDeletion,
prometheus.NewCounter(prometheus.CounterOpts{}),
garbageCollectedBlocks,
metadata.NoneFunc,
cfg)
}

DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) {
compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
if err != nil {
@@ -70,6 +87,16 @@ var (
planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds())
return compactor, planner, nil
}

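// ShuffleShardingBlocksCompactorFactory builds the same leveled TSDB compactor as
// DefaultBlocksCompactorFactory, but pairs it with the new shuffle sharding planner
// instead of the TSDB-based planner.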
ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) {
compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil)
if err != nil {
return nil, nil, err
}

planner := NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds())
return compactor, planner, nil
}
)

// BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
@@ -113,8 +140,9 @@ type Config struct {
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`

// Compactors sharding.
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingRing RingConfig `yaml:"sharding_ring"`
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingStrategy string `yaml:"sharding_strategy"`
ShardingRing RingConfig `yaml:"sharding_ring"`

// No need to add options to customize the retry backoff,
// given the defaults should be fine, but allow to override
@@ -146,6 +174,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.CleanupInterval, "compactor.cleanup-interval", 15*time.Minute, "How frequently compactor should run blocks cleanup and maintenance, as well as update the bucket index.")
f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.")
f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
"If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+
"If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.")
@@ -164,6 +193,11 @@ func (cfg *Config) Validate() error {
}
}

// Make sure a valid sharding strategy is being used
if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) {
return errInvalidShardingStrategy
}

return nil
}

@@ -235,12 +269,20 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi

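// Unless a custom grouper factory has been injected, fall back to the grouper matching the configured sharding strategy.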
blocksGrouperFactory := compactorCfg.BlocksGrouperFactory
if blocksGrouperFactory == nil {
blocksGrouperFactory = DefaultBlocksGrouperFactory
if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
blocksGrouperFactory = ShuffleShardingGrouperFactory
} else {
blocksGrouperFactory = DefaultBlocksGrouperFactory
}
}

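// Likewise for the compactor/planner factory: fall back to the one matching the configured sharding strategy.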
blocksCompactorFactory := compactorCfg.BlocksCompactorFactory
if blocksCompactorFactory == nil {
blocksCompactorFactory = DefaultBlocksCompactorFactory
if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
blocksCompactorFactory = ShuffleShardingBlocksCompactorFactory
} else {
blocksCompactorFactory = DefaultBlocksCompactorFactory
}
}

cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory)
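
Taken together, the changes add the `-compactor.sharding-strategy` flag, validate it, and swap in the shuffle sharding grouper and planner factories when `shuffle-sharding` is selected. A minimal sketch of how the new validation could be exercised from within the `compactor` package (the test below is illustrative only and not part of this PR):

```go
package compactor

import (
	"testing"

	"github.com/cortexproject/cortex/pkg/util"
)

func TestConfig_Validate_ShardingStrategy(t *testing.T) {
	// An unsupported value is rejected with errInvalidShardingStrategy.
	cfg := Config{ShardingStrategy: "unsupported"}
	if err := cfg.Validate(); err != errInvalidShardingStrategy {
		t.Fatalf("expected errInvalidShardingStrategy, got %v", err)
	}

	// Both supported strategies pass validation. BlockRanges is left empty here,
	// so the block range divisibility check is a no-op.
	for _, strategy := range []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} {
		cfg := Config{ShardingStrategy: strategy}
		if err := cfg.Validate(); err != nil {
			t.Fatalf("expected strategy %q to be accepted, got %v", strategy, err)
		}
	}
}
```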