From 89fb6082a88d90c2f930855c1be2a4e99b51202e Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 28 Nov 2023 15:41:18 -0800 Subject: [PATCH 1/4] Add additional metrics to record compactor start and stop duration in seconds Signed-off-by: Alex Le --- pkg/compactor/compactor.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 5473c3d6a7..19f34aeccd 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -321,6 +321,8 @@ type Compactor struct { ringSubservicesWatcher *services.FailureWatcher // Metrics. + CompactorStartDurationSeconds prometheus.Gauge + CompactorStopDurationSeconds prometheus.Gauge compactionRunsStarted prometheus.Counter compactionRunsInterrupted prometheus.Counter compactionRunsCompleted prometheus.Counter @@ -403,6 +405,14 @@ func newCompactor( blocksCompactorFactory: blocksCompactorFactory, allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants), + CompactorStartDurationSeconds: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_compactor_start_duration_seconds", + Help: "Time in seconds spent by compactor running start function", + }), + CompactorStopDurationSeconds: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_compactor_stop_duration_seconds", + Help: "Time in seconds spent by compactor running stop function", + }), compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_started_total", Help: "Total number of compaction runs started.", @@ -485,6 +495,11 @@ func newCompactor( // Start the compactor. func (c *Compactor) starting(ctx context.Context) error { + begin := time.Now() + defer func() { + c.CompactorStartDurationSeconds.Set(time.Since(begin).Seconds()) + }() + var err error // Create bucket client. @@ -581,6 +596,11 @@ func (c *Compactor) starting(ctx context.Context) error { } func (c *Compactor) stopping(_ error) error { + begin := time.Now() + defer func() { + c.CompactorStopDurationSeconds.Set(time.Since(begin).Seconds()) + }() + ctx := context.Background() services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck From 4d081dbbc7f66bdb62f804d2e3014acc1134b55b Mon Sep 17 00:00:00 2001 From: Alex Le Date: Tue, 28 Nov 2023 15:45:42 -0800 Subject: [PATCH 2/4] update CHANGELOG Signed-off-by: Alex Le --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index efcf471713..c38cf7c12e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638 +* [ENHANCEMENT] Compactor: Add compactor start and stop duration in seconds metrics. #5683 ## 1.16.0 2023-11-20 From 07a1ede6dcac906d81b973e2cedb6512c821c55c Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 29 Nov 2023 13:46:00 -0800 Subject: [PATCH 3/4] removed stop duration metric replaced it by log Signed-off-by: Alex Le --- CHANGELOG.md | 2 +- pkg/compactor/compactor.go | 8 ++------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c38cf7c12e..dd788f6e01 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ * [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638 -* [ENHANCEMENT] Compactor: Add compactor start and stop duration in seconds metrics. #5683 +* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683 ## 1.16.0 2023-11-20 diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 19f34aeccd..06f242103a 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -322,7 +322,6 @@ type Compactor struct { // Metrics. CompactorStartDurationSeconds prometheus.Gauge - CompactorStopDurationSeconds prometheus.Gauge compactionRunsStarted prometheus.Counter compactionRunsInterrupted prometheus.Counter compactionRunsCompleted prometheus.Counter @@ -409,10 +408,6 @@ func newCompactor( Name: "cortex_compactor_start_duration_seconds", Help: "Time in seconds spent by compactor running start function", }), - CompactorStopDurationSeconds: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_compactor_stop_duration_seconds", - Help: "Time in seconds spent by compactor running stop function", - }), compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_started_total", Help: "Total number of compaction runs started.", @@ -498,6 +493,7 @@ func (c *Compactor) starting(ctx context.Context) error { begin := time.Now() defer func() { c.CompactorStartDurationSeconds.Set(time.Since(begin).Seconds()) + level.Info(c.logger).Log("msg", "compactor started", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) }() var err error @@ -598,7 +594,7 @@ func (c *Compactor) starting(ctx context.Context) error { func (c *Compactor) stopping(_ error) error { begin := time.Now() defer func() { - c.CompactorStopDurationSeconds.Set(time.Since(begin).Seconds()) + level.Info(c.logger).Log("msg", "compactor stopped", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) }() ctx := context.Background() From e16ff32611b15a37ac8c6b112d02d9aaadc6f158 Mon Sep 17 00:00:00 2001 From: Alex Le Date: Wed, 29 Nov 2023 15:07:13 -0800 Subject: [PATCH 4/4] fix test Signed-off-by: Alex Le --- pkg/compactor/compactor_test.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 36ae05bda9..f149f89a1d 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -213,9 +213,11 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { assert.Equal(t, []string{ `level=info component=cleaner msg="started blocks cleanup and maintenance"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=0`, - }, strings.Split(strings.TrimSpace(logs.String()), "\n")) + `level=info component=compactor msg="compactor stopped"`, + }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` # TYPE cortex_compactor_runs_started_total counter @@ -364,9 +366,11 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket assert.Equal(t, []string{ `level=info component=cleaner msg="started blocks cleanup and maintenance"`, `level=error component=cleaner msg="failed to run blocks cleanup and maintenance" err="failed to discover users from bucket: failed to iterate the bucket"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=error component=compactor msg="failed to discover users from bucket" err="failed to iterate the bucket"`, - }, strings.Split(strings.TrimSpace(logs.String()), "\n")) + `level=info component=compactor msg="compactor stopped"`, + }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` # TYPE cortex_compactor_runs_started_total counter @@ -656,6 +660,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { `level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`, `level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=2`, `level=info component=compactor msg="starting compaction of user blocks" user=user-1`, @@ -670,6 +675,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { `level=info component=compactor org_id=user-2 msg="start of compactions"`, `level=info component=compactor org_id=user-2 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-2`, + `level=info component=compactor msg="compactor stopped"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) // Instead of testing for shipper metrics, we only check our metrics here. @@ -788,6 +794,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { `level=info component=cleaner org_id=user-1 msg="deleted block marked for deletion" block=01DTW0ZCPDDNV4BV83Q2SV4QAZ`, `level=info component=cleaner org_id=user-1 msg="completed blocks cleanup and maintenance"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=1`, `level=info component=compactor msg="starting compaction of user blocks" user=user-1`, @@ -796,6 +803,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { `level=info component=compactor org_id=user-1 msg="start of compactions"`, `level=info component=compactor org_id=user-1 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-1`, + `level=info component=compactor msg="compactor stopped"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) // Instead of testing for shipper metrics, we only check our metrics here. @@ -978,9 +986,11 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) `level=info component=cleaner org_id=user-1 msg="deleted blocks for tenant marked for deletion" deletedBlocks=1`, `level=info component=cleaner org_id=user-1 msg="updating finished time in tenant deletion mark"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=1`, `level=debug component=compactor msg="skipping user because it is marked for deletion" user=user-1`, + `level=info component=compactor msg="compactor stopped"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) // Instead of testing for shipper metrics, we only check our metrics here. @@ -1168,6 +1178,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni `level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`, `level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=2`, `level=info component=compactor msg="starting compaction of user blocks" user=user-1`, @@ -1182,6 +1193,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni `level=info component=compactor org_id=user-2 msg="start of compactions"`, `level=info component=compactor org_id=user-2 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-2`, + `level=info component=compactor msg="compactor stopped"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) } @@ -1587,6 +1599,7 @@ func removeIgnoredLogs(input []string) []string { out := make([]string, 0, len(input)) durationRe := regexp.MustCompile(`\s?duration=\S+`) + durationMsRe := regexp.MustCompile(`\s?duration_ms=\S+`) for i := 0; i < len(input); i++ { log := input[i] @@ -1600,6 +1613,7 @@ func removeIgnoredLogs(input []string) []string { // Remove any duration from logs. log = durationRe.ReplaceAllString(log, "") + log = durationMsRe.ReplaceAllString(log, "") out = append(out, log) } @@ -1929,6 +1943,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { assert.Equal(t, context.DeadlineExceeded, err) assert.ElementsMatch(t, []string{ + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`, `level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))