From ae784219a0c5fb0c280a05dd23731c06506eae31 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 5 Apr 2022 18:29:32 -0700 Subject: [PATCH 1/4] Introduce SkipBlocksWithOutOfOrderChunksEnabled feature --- docs/blocks-storage/compactor.md | 5 ++++ docs/configuration/config-file-reference.md | 5 ++++ pkg/compactor/compactor.go | 28 +++++++++++---------- 3 files changed, 25 insertions(+), 13 deletions(-) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index d52f88bb4e..3d3bb0159f 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -147,6 +147,11 @@ compactor: # CLI flag: -compactor.tenant-cleanup-delay [tenant_cleanup_delay: | default = 6h] + # When enabled, mark blocks containing index with out-of-order chunks for no + # compact instead of halting the compaction. + # CLI flag: -compactor.skip-blocks-with-out-of-order-chunks-enabled + [skip_blocks_with_out_of_order_chunks_enabled: | default = false] + # When enabled, at compactor startup the bucket will be scanned and all found # deletion marks inside the block location will be copied to the markers # global location too. This option can (and should) be safely disabled as soon diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 7407c7c922..e6f3c5ea7a 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5294,6 +5294,11 @@ The `compactor_config` configures the compactor for the blocks storage. # CLI flag: -compactor.tenant-cleanup-delay [tenant_cleanup_delay: | default = 6h] +# When enabled, mark blocks containing index with out-of-order chunks for no +# compact instead of halting the compaction. +# CLI flag: -compactor.skip-blocks-with-out-of-order-chunks-enabled +[skip_blocks_with_out_of_order_chunks_enabled: | default = false] + # When enabled, at compactor startup the bucket will be scanned and all found # deletion marks inside the block location will be copied to the markers global # location too. This option can (and should) be safely disabled as soon as the diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 2661123fa1..99879d7230 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -120,18 +120,19 @@ type BlocksCompactorFactory func( // Config holds the Compactor config. type Config struct { - BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"` - BlockSyncConcurrency int `yaml:"block_sync_concurrency"` - MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` - ConsistencyDelay time.Duration `yaml:"consistency_delay"` - DataDir string `yaml:"data_dir"` - CompactionInterval time.Duration `yaml:"compaction_interval"` - CompactionRetries int `yaml:"compaction_retries"` - CompactionConcurrency int `yaml:"compaction_concurrency"` - CleanupInterval time.Duration `yaml:"cleanup_interval"` - CleanupConcurrency int `yaml:"cleanup_concurrency"` - DeletionDelay time.Duration `yaml:"deletion_delay"` - TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"` + BlockRanges cortex_tsdb.DurationList `yaml:"block_ranges"` + BlockSyncConcurrency int `yaml:"block_sync_concurrency"` + MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` + ConsistencyDelay time.Duration `yaml:"consistency_delay"` + DataDir string `yaml:"data_dir"` + CompactionInterval time.Duration `yaml:"compaction_interval"` + CompactionRetries int `yaml:"compaction_retries"` + CompactionConcurrency int `yaml:"compaction_concurrency"` + CleanupInterval time.Duration `yaml:"cleanup_interval"` + CleanupConcurrency int `yaml:"cleanup_concurrency"` + DeletionDelay time.Duration `yaml:"deletion_delay"` + TenantCleanupDelay time.Duration `yaml:"tenant_cleanup_delay"` + SkipBlocksWithOutOfOrderChunksEnabled bool `yaml:"skip_blocks_with_out_of_order_chunks_enabled"` // Whether the migration of block deletion marks to the global markers location is enabled. BlockDeletionMarksMigrationEnabled bool `yaml:"block_deletion_marks_migration_enabled"` @@ -180,6 +181,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.") f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is time between deleting of last block, and doing final cleanup (marker files, debug files) of the tenant.") f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", false, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.") + f.BoolVar(&cfg.SkipBlocksWithOutOfOrderChunksEnabled, "compactor.skip-blocks-with-out-of-order-chunks-enabled", false, "When enabled, mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction.") f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.") f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.") @@ -702,7 +704,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { path.Join(c.compactorCfg.DataDir, "compact"), bucket, c.compactorCfg.CompactionConcurrency, - false, + c.compactorCfg.SkipBlocksWithOutOfOrderChunksEnabled, ) if err != nil { return errors.Wrap(err, "failed to create bucket compactor") From f4d8b971897ceb657956601042072d8c114756c5 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 5 Apr 2022 18:32:55 -0700 Subject: [PATCH 2/4] Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 470aefe522..b00fd3a0c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels. * [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686 * [CHANGE] Enable Thanos series limiter in store-gateway. #4702 +* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction ## 1.12.0 in progress From ad865e0ef901ed088a503505931c06b83b98b816 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Tue, 5 Apr 2022 20:39:28 -0700 Subject: [PATCH 3/4] Adding test --- pkg/compactor/compactor.go | 16 ++++++--- pkg/compactor/compactor_test.go | 57 ++++++++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 99879d7230..7084c39e95 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -51,7 +51,7 @@ var ( supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} errInvalidShardingStrategy = errors.New("invalid sharding strategy") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -60,11 +60,11 @@ var ( reg, blocksMarkedForDeletion, garbageCollectedBlocks, - prometheus.NewCounter(prometheus.CounterOpts{}), + blocksMarkedForNoCompact, metadata.NoneFunc) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, blocksMarkedForNoCompact prometheus.Counter) compact.Grouper { return NewShuffleShardingGrouper( logger, bkt, @@ -72,7 +72,7 @@ var ( true, // Enable vertical compaction reg, blocksMarkedForDeletion, - prometheus.NewCounter(prometheus.CounterOpts{}), + blocksMarkedForNoCompact, garbageCollectedBlocks, metadata.NoneFunc, cfg) @@ -108,6 +108,7 @@ type BlocksGrouperFactory func( reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter, + blocksMarkedForNoCompact prometheus.Counter, ) compact.Grouper // BlocksCompactorFactory builds and returns the compactor and planner to use to compact a tenant's blocks. @@ -258,6 +259,7 @@ type Compactor struct { compactionRunInterval prometheus.Gauge blocksMarkedForDeletion prometheus.Counter garbageCollectedBlocks prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter // TSDB syncer metrics syncerMetrics *syncerMetrics @@ -363,6 +365,10 @@ func newCompactor( Name: "cortex_compactor_garbage_collected_blocks_total", Help: "Total number of blocks marked for deletion by compactor.", }), + blocksMarkedForNoCompact: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_blocks_marked_for_no_compact_total", + Help: "Total number of blocks marked for no compact.", + }), } if len(compactorCfg.EnabledTenants) > 0 { @@ -698,7 +704,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { compactor, err := compact.NewBucketCompactor( ulogger, syncer, - c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks), + c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.garbageCollectedBlocks, c.blocksMarkedForNoCompact), c.blocksPlanner, c.blocksCompactor, path.Join(c.compactorCfg.DataDir, "compact"), diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 2f8c6c7964..77eea11abd 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -17,6 +17,8 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" "github.com/go-kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -30,16 +32,17 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/thanos-io/thanos/pkg/testutil" "gopkg.in/yaml.v2" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + cortex_storage_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/concurrency" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" - cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -800,6 +803,58 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) `), testedMetrics...)) } +func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) { + bucketClient, tmpDir := cortex_storage_testutil.PrepareFilesystemBucket(t) + bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient) + + b1 := createTSDBBlock(t, bucketClient, "user-1", 10, 20, map[string]string{"__name__": "Teste"}) + b2 := createTSDBBlock(t, bucketClient, "user-1", 20, 30, map[string]string{"__name__": "Teste"}) + + err := testutil.PutOutOfOrderIndex(path.Join(tmpDir, "user-1", b1.String()), 10, 20) + require.NoError(t, err) + + cfg := prepareConfig() + cfg.SkipBlocksWithOutOfOrderChunksEnabled = true + c, tsdbCompac, tsdbPlanner, _, registry := prepare(t, cfg, bucketClient) + + tsdbCompac.On("Compact", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(b1, nil) + + tsdbPlanner.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: b1, + MinTime: 10, + MaxTime: 20, + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: b2, + MinTime: 20, + MaxTime: 30, + }, + }, + }, nil) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), c)) + + defer services.StopAndAwaitTerminated(context.Background(), c) //nolint:errcheck + + // Wait until a run has completed. + cortex_testutil.Poll(t, 5*time.Second, true, func() interface{} { + if _, err := os.Stat(path.Join(tmpDir, "user-1", b1.String(), "no-compact-mark.json")); err == nil { + return true + } + return false + }) + + assert.NoError(t, prom_testutil.GatherAndCompare(registry, strings.NewReader(` + # HELP cortex_compactor_blocks_marked_for_no_compact_total Total number of blocks marked for no compact. + # TYPE cortex_compactor_blocks_marked_for_no_compact_total counter + cortex_compactor_blocks_marked_for_no_compact_total 1 + `), "cortex_compactor_blocks_marked_for_no_compact_total")) +} + func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunning(t *testing.T) { t.Parallel() From 97db09c4bd37795de61217da226e3b36b05292b1 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Wed, 6 Apr 2022 03:41:01 +0000 Subject: [PATCH 4/4] Running make clean-white-noise --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b00fd3a0c3..725e90c234 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ * [FEATURE] Ruler: Add `external_labels` option to tag all alerts with a given set of labels. * [CHANGE] Fix incorrectly named `cortex_cache_fetched_keys` and `cortex_cache_hits` metrics. Renamed to `cortex_cache_fetched_keys_total` and `cortex_cache_hits_total` respectively. #4686 * [CHANGE] Enable Thanos series limiter in store-gateway. #4702 -* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction +* [FEATURE] Compactor: Add `-compactor.skip-blocks-with-out-of-order-chunks-enabled` configuration to mark blocks containing index with out-of-order chunks for no compact instead of halting the compaction ## 1.12.0 in progress