Skip to content

Add metrics for shuffle sharding #4432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* [ENHANCEMENT] Query Frontend: Add setting `-frontend.forward-headers-list` in frontend to configure the set of headers from the requests to be forwarded to downstream requests. #4486
* [ENHANCEMENT] Blocks storage: Add `-blocks-storage.azure.http.*`, `-alertmanager-storage.azure.http.*`, and `-ruler-storage.azure.http.*` to configure the Azure storage client. #4581
* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #4601
* [ENHANCEMENT] Add a metric `cortex_compactor_remaining_planned_compactions` which will track the remaining planned compactions. #4432
* [BUGFIX] AlertManager: remove stale template files. #4495
* [BUGFIX] Distributor: fix bug in query-exemplar where some results would get dropped. #4582
* [BUGFIX] Update Thanos dependency: compactor tracing support, azure blocks storage memory fix. #4585
Expand Down
17 changes: 14 additions & 3 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}
errInvalidShardingStrategy = errors.New("invalid sharding strategy")

DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge) compact.Grouper {
return compact.NewDefaultGrouper(
logger,
bkt,
Expand All @@ -64,7 +64,7 @@ var (
metadata.NoneFunc)
}

ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper {
ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge) compact.Grouper {
return NewShuffleShardingGrouper(
logger,
bkt,
Expand All @@ -74,6 +74,7 @@ var (
blocksMarkedForDeletion,
prometheus.NewCounter(prometheus.CounterOpts{}),
garbageCollectedBlocks,
remainingPlannedCompactions,
metadata.NoneFunc,
cfg)
}
Expand Down Expand Up @@ -108,6 +109,7 @@ type BlocksGrouperFactory func(
reg prometheus.Registerer,
blocksMarkedForDeletion prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
remainingPlannedCompactions prometheus.Gauge,
) compact.Grouper

// BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks.
Expand Down Expand Up @@ -256,6 +258,7 @@ type Compactor struct {
compactionRunInterval prometheus.Gauge
blocksMarkedForDeletion prometheus.Counter
garbageCollectedBlocks prometheus.Counter
remainingPlannedCompactions prometheus.Gauge

// TSDB syncer metrics
syncerMetrics *syncerMetrics
Expand Down Expand Up @@ -303,6 +306,13 @@ func newCompactor(
blocksGrouperFactory BlocksGrouperFactory,
blocksCompactorFactory BlocksCompactorFactory,
) (*Compactor, error) {
var remainingPlannedCompactions prometheus.Gauge
if compactorCfg.ShardingStrategy == "shuffle-sharding" {
remainingPlannedCompactions = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Name: "cortex_compactor_remaining_planned_compactions",
Help: "Total number of plans that remain to be compacted.",
})
}
c := &Compactor{
compactorCfg: compactorCfg,
storageCfg: storageCfg,
Expand Down Expand Up @@ -361,6 +371,7 @@ func newCompactor(
Name: "cortex_compactor_garbage_collected_blocks_total",
Help: "Total number of blocks marked for deletion by compactor.",
}),
remainingPlannedCompactions: remainingPlannedCompactions,
}

if len(compactorCfg.EnabledTenants) > 0 {
Expand Down Expand Up @@ -696,7 +707,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
compactor, err := compact.NewBucketCompactor(
ulogger,
syncer,
c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks),
c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks, c.remainingPlannedCompactions),
c.blocksPlanner,
c.blocksCompactor,
path.Join(c.compactorCfg.DataDir, "compact"),
Expand Down
55 changes: 31 additions & 24 deletions pkg/compactor/shuffle_sharding_grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,22 @@ import (
)

type ShuffleShardingGrouper struct {
logger log.Logger
bkt objstore.Bucket
acceptMalformedIndex bool
enableVerticalCompaction bool
reg prometheus.Registerer
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
garbageCollectedBlocks prometheus.Counter
hashFunc metadata.HashFunc
compactions *prometheus.CounterVec
compactionRunsStarted *prometheus.CounterVec
compactionRunsCompleted *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
compactorCfg Config
logger log.Logger
bkt objstore.Bucket
acceptMalformedIndex bool
enableVerticalCompaction bool
reg prometheus.Registerer
blocksMarkedForDeletion prometheus.Counter
blocksMarkedForNoCompact prometheus.Counter
garbageCollectedBlocks prometheus.Counter
remainingPlannedCompactions prometheus.Gauge
hashFunc metadata.HashFunc
compactions *prometheus.CounterVec
compactionRunsStarted *prometheus.CounterVec
compactionRunsCompleted *prometheus.CounterVec
compactionFailures *prometheus.CounterVec
verticalCompactions *prometheus.CounterVec
compactorCfg Config
}

func NewShuffleShardingGrouper(
Expand All @@ -46,6 +47,7 @@ func NewShuffleShardingGrouper(
blocksMarkedForDeletion prometheus.Counter,
blocksMarkedForNoCompact prometheus.Counter,
garbageCollectedBlocks prometheus.Counter,
remainingPlannedCompactions prometheus.Gauge,
hashFunc metadata.HashFunc,
compactorCfg Config,
) *ShuffleShardingGrouper {
Expand All @@ -54,15 +56,16 @@ func NewShuffleShardingGrouper(
}

return &ShuffleShardingGrouper{
logger: logger,
bkt: bkt,
acceptMalformedIndex: acceptMalformedIndex,
enableVerticalCompaction: enableVerticalCompaction,
reg: reg,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
hashFunc: hashFunc,
logger: logger,
bkt: bkt,
acceptMalformedIndex: acceptMalformedIndex,
enableVerticalCompaction: enableVerticalCompaction,
reg: reg,
blocksMarkedForDeletion: blocksMarkedForDeletion,
blocksMarkedForNoCompact: blocksMarkedForNoCompact,
garbageCollectedBlocks: garbageCollectedBlocks,
remainingPlannedCompactions: remainingPlannedCompactions,
hashFunc: hashFunc,
// Metrics are copied from Thanos DefaultGrouper constructor
compactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "thanos_compact_group_compactions_total",
Expand Down Expand Up @@ -103,6 +106,9 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re
var outGroups []*compact.Group

i := 0
// Reset the gauge tracking remaining planned compactions before recounting the groups below.
g.remainingPlannedCompactions.Set(0)

for _, mainBlocks := range mainGroups {
for _, group := range groupBlocksByCompactableRanges(mainBlocks, g.compactorCfg.BlockRanges.ToMilliseconds()) {
// Nothing to do if we don't have at least 2 blocks.
Expand All @@ -113,6 +119,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re
// TODO: Use the group's hash to determine whether a compactor should be responsible for compacting that group
groupHash := hashGroup(group.blocks[0].Thanos.Labels["__org_id__"], group.rangeStart, group.rangeEnd)

g.remainingPlannedCompactions.Inc()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the metric is scraped in the middle of this function, it looks like it will have some value between 0 and the final value. Is that useful?
Would it be better to count up then Set() once?

groupKey := fmt.Sprintf("%v%d", groupHash, i)
i++

Expand Down
11 changes: 9 additions & 2 deletions pkg/compactor/shuffle_sharding_grouper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -142,14 +143,20 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
BlockRanges: testData.ranges,
}

registerer := prometheus.NewRegistry()
remainingPlannedCompactions := promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Name: "cortex_compactor_remaining_planned_compactions",
Help: "Total number of plans that remain to be compacted.",
})

g := NewShuffleShardingGrouper(nil,
nil,
false, // Do not accept malformed indexes
true, // Enable vertical compaction
prometheus.NewRegistry(),
nil,
registerer,
nil,
nil,
remainingPlannedCompactions,
metadata.NoneFunc,
*compactorCfg)
actual, err := g.Groups(testData.blocks)
Expand Down