Commit 1e229ce

Add shuffle-sharding for the compactor (#4433)

Authored by ac1214, alvinlin123, roystchiang, and alanprot

* Add metrics for remaining planned compactions (Signed-off-by: Albert <[email protected]>)
* Fix unit tests (Signed-off-by: Albert <[email protected]>)
* Add shuffle sharding for compactor (Signed-off-by: Albert <[email protected]>)
* Update changelog (Signed-off-by: Albert <[email protected]>)
* Fix linting (Signed-off-by: Albert <[email protected]>)
* Fix build errors (Signed-off-by: Alvin Lin <[email protected]>)
* Fix up change log (Signed-off-by: Alvin Lin <[email protected]>)
* Fix linting error (Signed-off-by: Alvin Lin <[email protected]>)
* Remove use of nolint (Signed-off-by: Alvin Lin <[email protected]>)
* Compactor.ownUser now determines whether the user is owned by a compactor via the ring, instead of returning true whenever shuffle-sharding is enabled (Signed-off-by: Roy Chiang <[email protected]>)
* Fix a bug where multiple compactors try to clean up the same tenant at once, which results in a dangling bucket index (Signed-off-by: Roy Chiang <[email protected]>)
* Set all remaining compactions in one go, instead of slowly incrementing the count as plans get generated (Signed-off-by: Roy Chiang <[email protected]>)
* Rename the ownUser function for better readability (Signed-off-by: Roy Chiang <[email protected]>)
* Address comments (Signed-off-by: Roy Chiang <[email protected]>)
* Fix rebase issues (Signed-off-by: Roy Chiang <[email protected]>)
* Fix tests (Signed-off-by: Roy Chiang <[email protected]>)

Co-authored-by: Albert <[email protected]>
Co-authored-by: Alvin Lin <[email protected]>
Co-authored-by: Roy Chiang <[email protected]>
Co-authored-by: Alan Protasio <[email protected]>

1 parent 34a541f commit 1e229ce

10 files changed: +546 −88 lines changed

CHANGELOG.md

+1 −1

@@ -8,6 +8,7 @@
 * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels.
 * [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction
 * [FEATURE] Querier/Query-Frontend: Add `-querier.per-step-stats-enabled` and `-frontend.cache-queryable-samples-stats` configurations to enable query sample statistics
+* [FEATURE] Add shuffle sharding for the compactor #4433

 ## 1.12.0 in progress

@@ -16,7 +17,6 @@
 * [CHANGE] Compactor block deletion mark migration, needed when upgrading from v1.7, is now disabled by default. #4597
 * [CHANGE] The `status_code` label on gRPC client metrics has changed from '200' and '500' to '2xx', '5xx', '4xx', 'cancel' or 'error'. #4601
 * [CHANGE] Memberlist: changed probe interval from `1s` to `5s` and probe timeout from `500ms` to `2s`. #4601
-* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4624
 * [ENHANCEMENT] Update Go version to 1.17.8. #4602 #4604 #4658
 * [ENHANCEMENT] Keep track of discarded samples due to bad relabel configuration in `cortex_discarded_samples_total`. #4503
 * [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571

docs/configuration/config-file-reference.md

+6

@@ -4267,6 +4267,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -compactor.blocks-retention-period
 [compactor_blocks_retention_period: <duration> | default = 0s]

+# The default tenant's shard size when the shuffle-sharding strategy is used by
+# the compactor. When this setting is specified in the per-tenant overrides, a
+# value of 0 disables shuffle sharding for the tenant.
+# CLI flag: -compactor.tenant-shard-size
+[compactor_tenant_shard_size: <int> | default = 0]
+
 # S3 server-side encryption type. Required to enable server-side encryption
 # overrides for a specific tenant. If not set, the default S3 client settings
 # are used.
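The new limit above is consumed by the compactor through the `Limits` interface introduced in `pkg/compactor/compactor.go` further down, which only requires a `CompactorTenantShardSize(userID string) int` method. As a rough illustration only (the `staticLimits` type, its fields, and the tenant names are hypothetical; Cortex itself supplies this value through its per-tenant overrides), a provider satisfying that interface could look like this:

```go
package main

import "fmt"

// staticLimits is a hypothetical Limits provider used purely for illustration.
// Any type with a CompactorTenantShardSize(userID string) int method satisfies
// the Limits interface added in pkg/compactor/compactor.go below.
type staticLimits struct {
	// tenantShardSizes maps a tenant ID to its compactor shard size;
	// a value of 0 disables shuffle sharding for that tenant.
	tenantShardSizes map[string]int
	defaultShardSize int
}

// CompactorTenantShardSize returns the shard size for the given tenant,
// falling back to the default when no per-tenant override is set.
func (l staticLimits) CompactorTenantShardSize(userID string) int {
	if size, ok := l.tenantShardSizes[userID]; ok {
		return size
	}
	return l.defaultShardSize
}

func main() {
	limits := staticLimits{
		tenantShardSizes: map[string]int{"tenant-a": 3},
		defaultShardSize: 2,
	}
	fmt.Println(limits.CompactorTenantShardSize("tenant-a")) // 3 (per-tenant override)
	fmt.Println(limits.CompactorTenantShardSize("tenant-b")) // 2 (default)
}
```

Returning 0 for a tenant corresponds to disabling shuffle sharding for that tenant, matching the documented default above.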

docs/guides/shuffle-sharding.md

+13

@@ -54,6 +54,7 @@ Cortex currently supports shuffle sharding in the following services:
 - [Query-frontend / Query-scheduler](#query-frontend-and-query-scheduler-shuffle-sharding)
 - [Store-gateway](#store-gateway-shuffle-sharding)
 - [Ruler](#ruler-shuffle-sharding)
+- [Compactor](#compactor-shuffle-sharding)

 Shuffle sharding is **disabled by default** and needs to be explicitly enabled in the configuration.

@@ -154,6 +155,18 @@ Cortex ruler can run in three modes:

 Note that when using sharding strategy, each rule group is evaluated by single ruler only, there is no replication.

+### Compactor shuffle sharding
+
+Cortex compactor can run in three modes:
+
+1. **No sharding at all.** This is the most basic mode of the compactor. It is activated by using `-compactor.sharding-enabled=false` (default). In this mode every compactor will run every compaction.
+2. **Default sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=default` (default). In this mode compactors register themselves into the ring. One single tenant will belong to only 1 compactor.
+3. **Shuffle sharding**, activated by using `-compactor.sharding-enabled=true` and `-compactor.sharding-strategy=shuffle-sharding`. Similarly to default sharding, but compactions for each tenant can be carried out on multiple compactors (`-compactor.tenant-shard-size`, can also be set per tenant as `compactor_tenant_shard_size` in overrides).
+
+With shuffle sharding selected as the sharding strategy, a subset of the compactors will be used to handle a user based on the shard size.
+
+The idea behind using the shuffle sharding strategy for the compactor is to further enable horizontal scalability and build tolerance for compactions that may take longer than the compaction interval.
+
 ## FAQ

 ### Does shuffle sharding add additional overhead to the KV store?
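The behaviour described in this new doc section maps to the ownership check added in `pkg/compactor/compactor.go` below: the tenant's subring is built from its shard size, and a compactor owns the tenant for compaction only if its own ring address is part of that subring. A minimal stand-alone sketch of the idea, assuming an already-built `*ring.Ring`, the instance's own address, and a ring operation equivalent to the `RingOp` used in the diff (the function name and parameters here are illustrative, not part of the commit):

```go
package compactorsketch

import (
	"github.com/cortexproject/cortex/pkg/ring"
)

// ownsTenantForCompaction is a hypothetical, stand-alone version of the
// ownership check this commit adds to the compactor. Under shuffle sharding,
// the tenant's subring is derived from the configured shard size, and the
// compactor owns the tenant only if its own address is in that subring.
func ownsTenantForCompaction(r *ring.Ring, instanceAddr, userID string, shardSize int, op ring.Operation) (bool, error) {
	// Build the tenant's subring from the per-tenant shard size.
	subRing := r.ShuffleShard(userID, shardSize)

	// Collect the healthy instances registered in the subring for this operation.
	rs, err := subRing.GetAllHealthy(op)
	if err != nil {
		return false, err
	}

	// Owned if this instance's ring address appears among them.
	return rs.Includes(instanceAddr), nil
}
```

Clean-up, by contrast, stays on the single-owner hash path (see `ownUserForCleanUp` in the diff), so that only one compactor ever deletes a tenant's blocks.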

pkg/compactor/compactor.go

+67 −11

@@ -34,6 +34,7 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/flagext"
 	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/cortexproject/cortex/pkg/util/services"
+	"github.com/cortexproject/cortex/pkg/util/validation"
 )

 const (
@@ -50,8 +51,9 @@ var (

 	supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
 	errInvalidShardingStrategy = errors.New("invalid sharding strategy")
+	errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")

-	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
+	DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper {
 		return compact.NewDefaultGrouper(
 			logger,
 			bkt,
@@ -64,7 +66,7 @@ var (
 			metadata.NoneFunc)
 	}

-	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
+	ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper {
 		return NewShuffleShardingGrouper(
 			logger,
 			bkt,
@@ -74,8 +76,13 @@ var (
 			blocksMarkedForDeletion,
 			blocksMarkedForNoCompaction,
 			garbageCollectedBlocks,
+			remainingPlannedCompactions,
 			metadata.NoneFunc,
-			cfg)
+			cfg,
+			ring,
+			ringLifecycle.Addr,
+			limits,
+			userID)
 	}

 	DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) {
@@ -115,6 +122,11 @@ type BlocksGrouperFactory func(
 	blocksMarkedForDeletion prometheus.Counter,
 	blocksMarkedForNoCompact prometheus.Counter,
 	garbageCollectedBlocks prometheus.Counter,
+	remainingPlannedCompactions prometheus.Gauge,
+	ring *ring.Ring,
+	ringLifecycler *ring.Lifecycler,
+	limit Limits,
+	userID string,
 ) compact.Grouper

 // BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
@@ -131,6 +143,11 @@ type PlannerFactory func(
 	noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter,
 ) compact.Planner

+// Limits defines limits used by the Compactor.
+type Limits interface {
+	CompactorTenantShardSize(userID string) int
+}
+
 // Config holds the Compactor config.
 type Config struct {
 	BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"`
@@ -200,7 +217,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.")
 }

-func (cfg *Config) Validate() error {
+func (cfg *Config) Validate(limits validation.Limits) error {
 	// Each block range period should be divisible by the previous one.
 	for i := 1; i < len(cfg.BlockRanges); i++ {
 		if cfg.BlockRanges[i]%cfg.BlockRanges[i-1] != 0 {
@@ -213,6 +230,12 @@ func (cfg *Config) Validate() error {
 		return errInvalidShardingStrategy
 	}

+	if cfg.ShardingEnabled && cfg.ShardingStrategy == util.ShardingStrategyShuffle {
+		if limits.CompactorTenantShardSize <= 0 {
+			return errInvalidTenantShardSize
+		}
+	}
+
 	return nil
 }

@@ -233,6 +256,7 @@ type Compactor struct {
 	parentLogger log.Logger
 	registerer prometheus.Registerer
 	allowedTenants *util.AllowedTenants
+	limits Limits

 	// Functions that creates bucket client, grouper, planner and compactor using the context.
 	// Useful for injecting mock objects from tests.
@@ -273,13 +297,14 @@ type Compactor struct {
 	blocksMarkedForDeletion prometheus.Counter
 	blocksMarkedForNoCompaction prometheus.Counter
 	garbageCollectedBlocks prometheus.Counter
+	remainingPlannedCompactions prometheus.Gauge

 	// TSDB syncer metrics
 	syncerMetrics *syncerMetrics
 }

 // NewCompactor makes a new Compactor.
-func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer) (*Compactor, error) {
+func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider ConfigProvider, logger log.Logger, registerer prometheus.Registerer, limits Limits) (*Compactor, error) {
 	bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) {
 		return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
 	}
@@ -302,7 +327,7 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi
 		}
 	}

-	cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory)
+	cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
 	}
@@ -319,7 +344,15 @@ func newCompactor(
 	bucketClientFactory func(ctx context.Context) (objstore.Bucket, error),
 	blocksGrouperFactory BlocksGrouperFactory,
 	blocksCompactorFactory BlocksCompactorFactory,
+	limits Limits,
 ) (*Compactor, error) {
+	var remainingPlannedCompactions prometheus.Gauge
+	if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
+		remainingPlannedCompactions = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
+			Name: "cortex_compactor_remaining_planned_compactions",
+			Help: "Total number of plans that remain to be compacted. Only available with shuffle-sharding strategy",
+		})
+	}
 	c := &Compactor{
 		compactorCfg: compactorCfg,
 		storageCfg: storageCfg,
@@ -382,6 +415,8 @@ func newCompactor(
 			Name: "cortex_compactor_garbage_collected_blocks_total",
 			Help: "Total number of blocks marked for deletion by compactor.",
 		}),
+		remainingPlannedCompactions: remainingPlannedCompactions,
+		limits: limits,
 	}

 	if len(compactorCfg.EnabledTenants) > 0 {
@@ -419,7 +454,7 @@ func (c *Compactor) starting(ctx context.Context) error {
 	c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)

 	// Create the users scanner.
-	c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUser, c.parentLogger)
+	c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUserForCleanUp, c.parentLogger)

 	// Create the blocks cleaner (service).
 	c.blocksCleaner = NewBlocksCleaner(BlocksCleanerConfig{
@@ -573,7 +608,7 @@ func (c *Compactor) compactUsers(ctx context.Context) {
 		}

 		// Ensure the user ID belongs to our shard.
-		if owned, err := c.ownUser(userID); err != nil {
+		if owned, err := c.ownUserForCompaction(userID); err != nil {
 			c.compactionRunSkippedTenants.Inc()
 			level.Warn(c.logger).Log("msg", "unable to check if user is owned by this shard", "user", userID, "err", err)
 			continue
@@ -722,7 +757,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 	compactor, err := compact.NewBucketCompactor(
 		ulogger,
 		syncer,
-		c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks),
+		c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.ring, c.ringLifecycler, c.limits, userID),
 		c.blocksPlannerFactory(ulogger, c.compactorCfg, noCompactMarkerFilter),
 		c.blocksCompactor,
 		path.Join(c.compactorCfg.DataDir, "compact"),
@@ -775,16 +810,37 @@ func (c *Compactor) discoverUsers(ctx context.Context) ([]string, error) {
 	return users, err
 }

-func (c *Compactor) ownUser(userID string) (bool, error) {
+func (c *Compactor) ownUserForCompaction(userID string) (bool, error) {
+	return c.ownUser(userID, false)
+}
+
+func (c *Compactor) ownUserForCleanUp(userID string) (bool, error) {
+	return c.ownUser(userID, true)
+}
+
+func (c *Compactor) ownUser(userID string, isCleanUp bool) (bool, error) {
 	if !c.allowedTenants.IsAllowed(userID) {
 		return false, nil
 	}

-	// Always owned if sharding is disabled.
+	// Always owned if sharding is disabled
 	if !c.compactorCfg.ShardingEnabled {
 		return true, nil
 	}

+	// If we aren't cleaning up user blocks, and we are using shuffle-sharding, ownership is determined by a subring
+	// Cleanup should only be owned by a single compactor, as there could be race conditions during block deletion
+	if !isCleanUp && c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle {
+		subRing := c.ring.ShuffleShard(userID, c.limits.CompactorTenantShardSize(userID))
+
+		rs, err := subRing.GetAllHealthy(RingOp)
+		if err != nil {
+			return false, err
+		}
+
+		return rs.Includes(c.ringLifecycler.Addr), nil
+	}
+
 	// Hash the user ID.
 	hasher := fnv.New32a()
 	_, _ = hasher.Write([]byte(userID))
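A knock-on effect of the new `limits` argument is that callers of `NewCompactor` (and of `Config.Validate`) now have to pass a limits value. The sketch below shows what a call site might look like after this change; the wrapper function, variable names, and import paths are assumptions for illustration, and only the `NewCompactor` signature itself comes from the diff above:

```go
package compactorsketch

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/compactor"
	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

// buildCompactor is a hypothetical call site showing the extended constructor:
// the per-tenant Limits provider is now a required final argument, and the
// remaining-planned-compactions gauge is only registered when the shuffle
// sharding strategy is configured.
func buildCompactor(
	compactorCfg compactor.Config,
	storageCfg cortex_tsdb.BlocksStorageConfig,
	cfgProvider compactor.ConfigProvider,
	logger log.Logger,
	reg prometheus.Registerer,
	limits compactor.Limits,
) (*compactor.Compactor, error) {
	return compactor.NewCompactor(compactorCfg, storageCfg, cfgProvider, logger, reg, limits)
}
```

In Cortex the `limits` value comes from the per-tenant overrides, which also back the `compactor_tenant_shard_size` setting documented earlier.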
