diff --git a/CHANGELOG.md b/CHANGELOG.md index 664aaace31..71ed903189 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638 +* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683 * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684 ## 1.16.0 2023-11-20 diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 5473c3d6a7..06f242103a 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -321,6 +321,7 @@ type Compactor struct { ringSubservicesWatcher *services.FailureWatcher // Metrics. + CompactorStartDurationSeconds prometheus.Gauge compactionRunsStarted prometheus.Counter compactionRunsInterrupted prometheus.Counter compactionRunsCompleted prometheus.Counter @@ -403,6 +404,10 @@ func newCompactor( blocksCompactorFactory: blocksCompactorFactory, allowedTenants: util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants), + CompactorStartDurationSeconds: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_compactor_start_duration_seconds", + Help: "Time in seconds spent by compactor running start function", + }), compactionRunsStarted: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_compactor_runs_started_total", Help: "Total number of compaction runs started.", @@ -485,6 +490,12 @@ func newCompactor( // Start the compactor. func (c *Compactor) starting(ctx context.Context) error { + begin := time.Now() + defer func() { + c.CompactorStartDurationSeconds.Set(time.Since(begin).Seconds()) + level.Info(c.logger).Log("msg", "compactor started", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + }() + var err error // Create bucket client. @@ -581,6 +592,11 @@ func (c *Compactor) starting(ctx context.Context) error { } func (c *Compactor) stopping(_ error) error { + begin := time.Now() + defer func() { + level.Info(c.logger).Log("msg", "compactor stopped", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds()) + }() + ctx := context.Background() services.StopAndAwaitTerminated(ctx, c.blocksCleaner) //nolint:errcheck diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index c2cf41df82..166826932e 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -214,9 +214,11 @@ func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) { assert.Equal(t, []string{ `level=info component=cleaner msg="started blocks cleanup and maintenance"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=0`, - }, strings.Split(strings.TrimSpace(logs.String()), "\n")) + `level=info component=compactor msg="compactor stopped"`, + }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` # TYPE cortex_compactor_runs_started_total counter @@ -365,9 +367,11 @@ func TestCompactor_ShouldRetryCompactionOnFailureWhileDiscoveringUsersFromBucket assert.Equal(t, []string{ `level=info component=cleaner msg="started blocks cleanup and maintenance"`, `level=error component=cleaner msg="failed to run blocks cleanup and maintenance" err="failed to discover users from bucket: failed to iterate the bucket"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=error component=compactor msg="failed to discover users from bucket" err="failed to iterate the bucket"`, - }, strings.Split(strings.TrimSpace(logs.String()), "\n")) + `level=info component=compactor msg="compactor stopped"`, + }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` # TYPE cortex_compactor_runs_started_total counter @@ -661,6 +665,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { `level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`, `level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=2`, `level=info component=compactor msg="starting compaction of user blocks" user=user-1`, @@ -675,6 +680,7 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { `level=info component=compactor org_id=user-2 msg="start of compactions"`, `level=info component=compactor org_id=user-2 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-2`, + `level=info component=compactor msg="compactor stopped"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) // Instead of testing for shipper metrics, we only check our metrics here. @@ -794,6 +800,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { `level=info component=cleaner org_id=user-1 msg="deleted block marked for deletion" block=01DTW0ZCPDDNV4BV83Q2SV4QAZ`, `level=info component=cleaner org_id=user-1 msg="completed blocks cleanup and maintenance"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=1`, `level=info component=compactor msg="starting compaction of user blocks" user=user-1`, @@ -802,6 +809,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { `level=info component=compactor org_id=user-1 msg="start of compactions"`, `level=info component=compactor org_id=user-1 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-1`, + `level=info component=compactor msg="compactor stopped"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) // Instead of testing for shipper metrics, we only check our metrics here. @@ -986,9 +994,11 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) `level=info component=cleaner org_id=user-1 msg="deleted blocks for tenant marked for deletion" deletedBlocks=1`, `level=info component=cleaner org_id=user-1 msg="updating finished time in tenant deletion mark"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=1`, `level=debug component=compactor msg="skipping user because it is marked for deletion" user=user-1`, + `level=info component=compactor msg="compactor stopped"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) // Instead of testing for shipper metrics, we only check our metrics here. @@ -1178,6 +1188,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni `level=info component=cleaner org_id=user-2 msg="started blocks cleanup and maintenance"`, `level=info component=cleaner org_id=user-2 msg="completed blocks cleanup and maintenance"`, `level=info component=cleaner msg="successfully completed blocks cleanup and maintenance"`, + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="discovering users from bucket"`, `level=info component=compactor msg="discovered users from bucket" users=2`, `level=info component=compactor msg="starting compaction of user blocks" user=user-1`, @@ -1192,6 +1203,7 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni `level=info component=compactor org_id=user-2 msg="start of compactions"`, `level=info component=compactor org_id=user-2 msg="compaction iterations done"`, `level=info component=compactor msg="successfully compacted user blocks" user=user-2`, + `level=info component=compactor msg="compactor stopped"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n"))) } @@ -1599,6 +1611,7 @@ func removeIgnoredLogs(input []string) []string { out := make([]string, 0, len(input)) durationRe := regexp.MustCompile(`\s?duration=\S+`) + durationMsRe := regexp.MustCompile(`\s?duration_ms=\S+`) for i := 0; i < len(input); i++ { log := input[i] @@ -1612,6 +1625,7 @@ func removeIgnoredLogs(input []string) []string { // Remove any duration from logs. log = durationRe.ReplaceAllString(log, "") + log = durationMsRe.ReplaceAllString(log, "") out = append(out, log) } @@ -1941,6 +1955,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) { assert.Equal(t, context.DeadlineExceeded, err) assert.ElementsMatch(t, []string{ + `level=info component=compactor msg="compactor started"`, `level=info component=compactor msg="waiting until compactor is ACTIVE in the ring"`, `level=error component=compactor msg="compactor failed to become ACTIVE in the ring" err="context deadline exceeded"`, }, removeIgnoredLogs(strings.Split(strings.TrimSpace(logs.String()), "\n")))