From b52735e89366376eb595d7221cdf9fdf1a5e57fd Mon Sep 17 00:00:00 2001 From: Albert Date: Fri, 9 Jul 2021 19:30:53 -0700 Subject: [PATCH 1/4] update CHANGELOG.md Signed-off-by: Albert --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c4d8c35b3..23eca54411 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * [ENHANCEMENT] Query Frontend: Add setting `-frontend.forward-headers-list` in frontend to configure the set of headers from the requests to be forwarded to downstream requests. #4486 * [ENHANCEMENT] Blocks storage: Add `-blocks-storage.azure.http.*`, `-alertmanager-storage.azure.http.*`, and `-ruler-storage.azure.http.*` to configure the Azure storage client. #4581 * [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #4601 +* [ENHANCEMENT] Add a metric `cortex_compactor_remaining_planned_compactions` which will track the remaining planned compactions #4432 * [BUGFIX] AlertManager: remove stale template files. #4495 * [BUGFIX] Distributor: fix bug in query-exemplar where some results would get dropped. #4582 * [BUGFIX] Update Thanos dependency: compactor tracing support, azure blocks storage memory fix. #4585 From 026c09a4fd5a44752dd8fa3a050951af4375422b Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 18 Aug 2021 16:46:48 -0700 Subject: [PATCH 2/4] Add metrics for remaining planned compactions Signed-off-by: Albert --- pkg/compactor/compactor.go | 17 ++++++++++++++--- pkg/compactor/shuffle_sharding_grouper.go | 5 +++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 2661123fa1..66d37f9698 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -51,7 +51,7 @@ var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} errInvalidShardingStrategy = errors.New("invalid sharding strategy") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -64,7 +64,7 @@ var ( metadata.NoneFunc) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge) compact.Grouper { return NewShuffleShardingGrouper( logger, bkt, @@ -74,6 +74,7 @@ var ( blocksMarkedForDeletion, prometheus.NewCounter(prometheus.CounterOpts{}), garbageCollectedBlocks, + remainingPlannedCompactions, metadata.NoneFunc, cfg) } @@ -108,6 +109,7 @@ type BlocksGrouperFactory func( reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, + remainingPlannedCompactions prometheus.Gauge, ) compact.Grouper // BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks. @@ -256,6 +258,7 @@ type Compactor struct { compactionRunInterval prometheus.Gauge blocksMarkedForDeletion prometheus.Counter garbageCollectedBlocks prometheus.Counter + remainingPlannedCompactions prometheus.Gauge // TSDB syncer metrics syncerMetrics *syncerMetrics @@ -303,6 +306,13 @@ func newCompactor( blocksGrouperFactory BlocksGrouperFactory, blocksCompactorFactory BlocksCompactorFactory, ) (*Compactor, error) { + var remainingPlannedCompactions prometheus.Gauge + if compactorCfg.ShardingStrategy == "shuffle-sharding" { + remainingPlannedCompactions = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_compactor_remaining_planned_compactions", + Help: "Total number of plans that remain to be compacted.", + }) + } c := &Compactor{ compactorCfg: compactorCfg, storageCfg: storageCfg, @@ -361,6 +371,7 @@ func newCompactor( Name: "cortex_compactor_garbage_collected_blocks_total", Help: "Total number of blocks marked for deletion by compactor.", }), + remainingPlannedCompactions: remainingPlannedCompactions, } if len(compactorCfg.EnabledTenants) > 0 { @@ -696,7 +707,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { compactor, err := compact.NewBucketCompactor( ulogger, syncer, - c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks), + c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks, c.remainingPlannedCompactions), c.blocksPlanner, c.blocksCompactor, path.Join(c.compactorCfg.DataDir, "compact"), diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 464bc83ff0..119113c2c2 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -46,6 +46,7 @@ func NewShuffleShardingGrouper( blocksMarkedForDeletion prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter, garbageCollectedBlocks prometheus.Counter, + remainingPlannedCompactions prometheus.Gauge, hashFunc metadata.HashFunc, compactorCfg Config, ) *ShuffleShardingGrouper { @@ -103,6 +104,9 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re var outGroups []*compact.Group i := 0 + // Metrics for the remaining planned compactions + g.remainingPlannedCompactions.Set(0) + for _, mainBlocks := range mainGroups { for _, group := range groupBlocksByCompactableRanges(mainBlocks, g.compactorCfg.BlockRanges.ToMilliseconds()) { // Nothing to do if we don't have at least 2 blocks. @@ -113,6 +117,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re // TODO: Use the group's hash to determine whether a compactor should be responsible for compacting that group groupHash := hashGroup(group.blocks[0].Thanos.Labels["__org_id__"], group.rangeStart, group.rangeEnd) + g.remainingPlannedCompactions.Inc() groupKey := fmt.Sprintf("%v%d", groupHash, i) i++ From d3697c8d1fa82204929ef43f6b8c56d8ac35c8ff Mon Sep 17 00:00:00 2001 From: Albert Date: Wed, 18 Aug 2021 18:33:15 -0700 Subject: [PATCH 3/4] fix unit tests Signed-off-by: Albert --- pkg/compactor/shuffle_sharding_grouper_test.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index e54a849be5..dfa1363970 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -6,6 +6,7 @@ import ( "github.com/oklog/ulid" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -142,14 +143,20 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { BlockRanges: testData.ranges, } + registerer := prometheus.NewRegistry() + remainingPlannedCompactions := promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_compactor_remaining_planned_compactions", + Help: "Total number of plans that remain to be compacted.", + }) + g := NewShuffleShardingGrouper(nil, nil, false, // Do not accept malformed indexes true, // Enable vertical compaction - prometheus.NewRegistry(), - nil, + registerer, nil, nil, + remainingPlannedCompactions, metadata.NoneFunc, *compactorCfg) actual, err := g.Groups(testData.blocks) From 0f9a910a5b5f7d85d528d3aceb817c1d885d3fc9 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Fri, 14 Jan 2022 17:29:01 -0800 Subject: [PATCH 4/4] fix merging error Signed-off-by: Alvin Lin --- pkg/compactor/shuffle_sharding_grouper.go | 50 ++++++++++++----------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 119113c2c2..eac021e2ee 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -20,21 +20,22 @@ import ( ) type ShuffleShardingGrouper struct { - logger log.Logger - bkt objstore.Bucket - acceptMalformedIndex bool - enableVerticalCompaction bool - reg prometheus.Registerer - blocksMarkedForDeletion prometheus.Counter - blocksMarkedForNoCompact prometheus.Counter - garbageCollectedBlocks prometheus.Counter - hashFunc metadata.HashFunc - compactions *prometheus.CounterVec - compactionRunsStarted *prometheus.CounterVec - compactionRunsCompleted *prometheus.CounterVec - compactionFailures *prometheus.CounterVec - verticalCompactions *prometheus.CounterVec - compactorCfg Config + logger log.Logger + bkt objstore.Bucket + acceptMalformedIndex bool + enableVerticalCompaction bool + reg prometheus.Registerer + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter + garbageCollectedBlocks prometheus.Counter + remainingPlannedCompactions prometheus.Gauge + hashFunc metadata.HashFunc + compactions *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec + compactionFailures *prometheus.CounterVec + verticalCompactions *prometheus.CounterVec + compactorCfg Config } func NewShuffleShardingGrouper( @@ -55,15 +56,16 @@ func NewShuffleShardingGrouper( } return &ShuffleShardingGrouper{ - logger: logger, - bkt: bkt, - acceptMalformedIndex: acceptMalformedIndex, - enableVerticalCompaction: enableVerticalCompaction, - reg: reg, - blocksMarkedForDeletion: blocksMarkedForDeletion, - blocksMarkedForNoCompact: blocksMarkedForNoCompact, - garbageCollectedBlocks: garbageCollectedBlocks, - hashFunc: hashFunc, + logger: logger, + bkt: bkt, + acceptMalformedIndex: acceptMalformedIndex, + enableVerticalCompaction: enableVerticalCompaction, + reg: reg, + blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, + garbageCollectedBlocks: garbageCollectedBlocks, + remainingPlannedCompactions: remainingPlannedCompactions, + hashFunc: hashFunc, // Metrics are copied from Thanos DefaultGrouper constructor compactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_compact_group_compactions_total",