diff --git a/CHANGELOG.md b/CHANGELOG.md index 301cc89210..f27977a5ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,7 @@ * [ENHANCEMENT] Ring: DoBatch prioritize 4xx errors when failing. #4783 * [ENHANCEMENT] Cortex now built with Go 1.18. #4829 * [ENHANCEMENT] Ingester: Prevent ingesters to become unhealthy during wall replay. #4847 +* [ENHANCEMENT] Compactor: Introduced a visit marker file for blocks so that blocks under compaction will not be picked up by another compactor. #4805 * [FEATURE] Compactor: Added `-compactor.block-files-concurrency` allowing to configure number of go routines for download/upload block files during compaction. #4784 * [FEATURE] Compactor: Added `-compactor.blocks-fetch-concurrency` allowing to configure number of go routines for blocks during compaction. #4787 * [FEATURE] Compactor: Added configurations for Azure MSI in blocks-storage, ruler-storage and alertmanager-storage. #4818 diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index 6156b0edd1..2e7b40b8a2 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -254,4 +254,14 @@ compactor: # Timeout for waiting on compactor to become ACTIVE in the ring. # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: | default = 10m] + + # How long a block visit marker file should be considered expired and able to + # be picked up by a compactor again. + # CLI flag: -compactor.block-visit-marker-timeout + [block_visit_marker_timeout: | default = 5m] + + # How frequently the block visit marker file should be updated during + # compaction. + # CLI flag: -compactor.block-visit-marker-file-update-interval + [block_visit_marker_file_update_interval: | default = 1m] ``` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 908d85be0f..6fb232f54f 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3846,6 +3846,15 @@ sharding_ring: # Timeout for waiting on compactor to become ACTIVE in the ring. # CLI flag: -compactor.ring.wait-active-instance-timeout [wait_active_instance_timeout: | default = 10m] + +# How long a block visit marker file should be considered expired and able to +# be picked up by a compactor again. +# CLI flag: -compactor.block-visit-marker-timeout +[block_visit_marker_timeout: | default = 5m] + +# How frequently the block visit marker file should be updated during compaction.
+# CLI flag: -compactor.block-visit-marker-file-update-interval +[block_visit_marker_file_update_interval: | default = 1m] ``` ### `store_gateway_config` diff --git a/pkg/compactor/block_visit_marker.go b/pkg/compactor/block_visit_marker.go new file mode 100644 index 0000000000..155681dcad --- /dev/null +++ b/pkg/compactor/block_visit_marker.go @@ -0,0 +1,116 @@ +package compactor + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "path" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/objstore" +) + +const BlockVisitMarkerFile = "block.visit" + +var ( + ErrorBlockVisitMarkerNotFound = errors.New("block visit marker not found") + ErrorUnmarshalBlockVisitMarker = errors.New("unmarshal block visit marker JSON") +) + +type BlockVisitMarker struct { + CompactorID string `json:"compactorID"` + VisitTime time.Time `json:"visitTime"` +} + +func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration) bool { + return time.Now().Before(b.VisitTime.Add(blockVisitMarkerTimeout)) +} + +func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, compactorID string) bool { + return time.Now().Before(b.VisitTime.Add(blockVisitMarkerTimeout)) && b.CompactorID == compactorID +} + +func ReadBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) { + visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile) + visitMarkerFileReader, err := bkt.Get(ctx, visitMarkerFile) + if err != nil { + if bkt.IsObjNotFoundErr(err) { + return nil, errors.Wrapf(ErrorBlockVisitMarkerNotFound, "block visit marker file: %s", visitMarkerFile) + } + blockVisitMarkerReadFailed.Inc() + return nil, errors.Wrapf(err, "get block visit marker file: %s", visitMarkerFile) + } + b, err := ioutil.ReadAll(visitMarkerFileReader) + if err != nil { + blockVisitMarkerReadFailed.Inc() + return nil, errors.Wrapf(err, "read block visit marker file: %s", visitMarkerFile) + } + blockVisitMarker := BlockVisitMarker{} + err = json.Unmarshal(b, &blockVisitMarker) + if err != nil { + blockVisitMarkerReadFailed.Inc() + return nil, errors.Wrapf(ErrorUnmarshalBlockVisitMarker, "block visit marker file: %s, error: %v", visitMarkerFile, err.Error()) + } + return &blockVisitMarker, nil +} + +func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, compactorID string, blockVisitMarkerWriteFailed prometheus.Counter) error { + blockVisitMarkerFilePath := path.Join(blockID, BlockVisitMarkerFile) + blockVisitMarker := BlockVisitMarker{ + CompactorID: compactorID, + VisitTime: time.Now(), + } + visitMarkerFileContent, err := json.Marshal(blockVisitMarker) + if err != nil { + blockVisitMarkerWriteFailed.Inc() + return err + } + err = bkt.Upload(ctx, blockVisitMarkerFilePath, bytes.NewReader(visitMarkerFileContent)) + if err != nil { + blockVisitMarkerWriteFailed.Inc() + return err + } + return nil +} + +func markBlocksVisited(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerWriteFailed prometheus.Counter) { + for _, block := range blocks { + blockID := block.ULID.String() + err := 
UpdateBlockVisitMarker(ctx, bkt, blockID, compactorID, blockVisitMarkerWriteFailed) + if err != nil { + level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "blockID", blockID, "err", err) + } + } +} + +func markBlocksVisitedHeartBeat(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerFileUpdateInterval time.Duration, blockVisitMarkerWriteFailed prometheus.Counter) { + var blockIds []string + for _, block := range blocks { + blockIds = append(blockIds, block.ULID.String()) + } + blocksInfo := strings.Join(blockIds, ",") + level.Info(logger).Log("msg", fmt.Sprintf("start heart beat for blocks: %s", blocksInfo)) + ticker := time.NewTicker(blockVisitMarkerFileUpdateInterval) + defer ticker.Stop() +heartBeat: + for { + level.Debug(logger).Log("msg", fmt.Sprintf("heart beat for blocks: %s", blocksInfo)) + markBlocksVisited(ctx, bkt, logger, blocks, compactorID, blockVisitMarkerWriteFailed) + + select { + case <-ctx.Done(): + break heartBeat + case <-ticker.C: + continue + } + } + level.Info(logger).Log("msg", fmt.Sprintf("stop heart beat for blocks: %s", blocksInfo)) +} diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 22efca4e47..bf4b2b871a 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -53,7 +53,7 @@ var ( errInvalidShardingStrategy = errors.New("invalid sharding strategy") errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") - DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper { + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, _ prometheus.Gauge, _ prometheus.Counter, _ prometheus.Counter, _ *ring.Ring, _ *ring.Lifecycler, _ Limits, _ string) compact.Grouper { return compact.NewDefaultGrouper( logger, bkt, @@ -68,8 +68,9 @@ var ( cfg.BlocksFetchConcurrency) } - ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper { + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion, blocksMarkedForNoCompaction, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycle *ring.Lifecycler, limits Limits, userID string) compact.Grouper { return NewShuffleShardingGrouper( + ctx, logger, bkt, false, // Do not accept malformed indexes @@ -83,10 +84,15 @@ var ( cfg, ring, ringLifecycle.Addr, + ringLifecycle.ID, limits, userID, cfg.BlockFilesConcurrency, - cfg.BlocksFetchConcurrency) + cfg.BlocksFetchConcurrency, + cfg.CompactionConcurrency, + cfg.BlockVisitMarkerTimeout, + 
blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed) } DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { @@ -95,7 +101,7 @@ var ( return nil, nil, err } - plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner { + plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner { return compact.NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter) } @@ -108,9 +114,9 @@ var ( return nil, nil, err } - plannerFactory := func(logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner { + plannerFactory := func(ctx context.Context, bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, blockVisitMarkerReadFailed prometheus.Counter, blockVisitMarkerWriteFailed prometheus.Counter) compact.Planner { - return NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks) + return NewShuffleShardingPlanner(ctx, bkt, logger, cfg.BlockRanges.ToMilliseconds(), noCompactionMarkFilter.NoCompactMarkedBlocks, ringLifecycle.ID, cfg.BlockVisitMarkerTimeout, cfg.BlockVisitMarkerFileUpdateInterval, blockVisitMarkerReadFailed, blockVisitMarkerWriteFailed) } return compactor, plannerFactory, nil } @@ -127,6 +133,8 @@ type BlocksGrouperFactory func( blocksMarkedForNoCompact prometheus.Counter, garbageCollectedBlocks prometheus.Counter, remainingPlannedCompactions prometheus.Gauge, + blockVisitMarkerReadFailed prometheus.Counter, + blockVisitMarkerWriteFailed prometheus.Counter, ring *ring.Ring, ringLifecycler *ring.Lifecycler, limit Limits, @@ -142,9 +150,14 @@ type BlocksCompactorFactory func( ) (compact.Compactor, PlannerFactory, error) type PlannerFactory func( + ctx context.Context, + bkt objstore.Bucket, logger log.Logger, cfg Config, noCompactionMarkFilter *compact.GatherNoCompactionMarkFilter, + ringLifecycle *ring.Lifecycler, + blockVisitMarkerReadFailed prometheus.Counter, + blockVisitMarkerWriteFailed prometheus.Counter, ) compact.Planner // Limits defines limits used by the Compactor. @@ -190,6 +203,10 @@ type Config struct { // Allow downstream projects to customise the blocks compactor. BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"` BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"` + + // Block visit marker file config + BlockVisitMarkerTimeout time.Duration `yaml:"block_visit_marker_timeout"` + BlockVisitMarkerFileUpdateInterval time.Duration `yaml:"block_visit_marker_file_update_interval"` } // RegisterFlags registers the Compactor flags. @@ -223,6 +240,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.") f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. 
If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.") + + f.DurationVar(&cfg.BlockVisitMarkerTimeout, "compactor.block-visit-marker-timeout", 5*time.Minute, "How long a block visit marker file should be considered expired and able to be picked up by a compactor again.") + f.DurationVar(&cfg.BlockVisitMarkerFileUpdateInterval, "compactor.block-visit-marker-file-update-interval", 1*time.Minute, "How frequently the block visit marker file should be updated during compaction.") } func (cfg *Config) Validate(limits validation.Limits) error { @@ -306,6 +326,8 @@ type Compactor struct { blocksMarkedForNoCompaction prometheus.Counter garbageCollectedBlocks prometheus.Counter remainingPlannedCompactions prometheus.Gauge + blockVisitMarkerReadFailed prometheus.Counter + blockVisitMarkerWriteFailed prometheus.Counter // TSDB syncer metrics syncerMetrics *syncerMetrics @@ -423,6 +445,14 @@ func newCompactor( Name: "cortex_compactor_garbage_collected_blocks_total", Help: "Total number of blocks marked for deletion by compactor.", }), + blockVisitMarkerReadFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_read_failed", + Help: "Number of block visit marker file failed to be read.", + }), + blockVisitMarkerWriteFailed: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_write_failed", + Help: "Number of block visit marker file failed to be written.", + }), remainingPlannedCompactions: remainingPlannedCompactions, limits: limits, } @@ -760,11 +790,13 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { return errors.Wrap(err, "failed to create syncer") } + currentCtx, cancel := context.WithCancel(ctx) + defer cancel() compactor, err := compact.NewBucketCompactor( ulogger, syncer, - c.blocksGrouperFactory(ctx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.ring, c.ringLifecycler, c.limits, userID), - c.blocksPlannerFactory(ulogger, c.compactorCfg, noCompactMarkerFilter), + c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, reg, c.blocksMarkedForDeletion, c.blocksMarkedForNoCompaction, c.garbageCollectedBlocks, c.remainingPlannedCompactions, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.ring, c.ringLifecycler, c.limits, userID), + c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed), c.blocksCompactor, path.Join(c.compactorCfg.DataDir, "compact"), bucket, diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index ecf333d86d..884c88e31a 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -465,9 +465,13 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) + bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/block.visit", "", nil) + bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/block.visit", nil)
bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil) bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil) + bucketClient.MockGet(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/block.visit", "", nil) + bucketClient.MockUpload(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/block.visit", nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) @@ -516,15 +520,19 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/block.visit", "", nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/block.visit", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/block.visit", "", nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json", mockBlockMetaJSON("01FN3V83ABR9992RF8WRJZ76ZQ"), nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/block.visit", "", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) bucketClient.MockIter("user-1/markers/", nil, nil) @@ -767,16 +775,24 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", mockNoCompactBlockJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/block.visit", "", nil) + bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/block.visit", nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", mockNoCompactBlockJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/block.visit", "", nil) + bucketClient.MockUpload("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/block.visit", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", 
mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/block.visit", "", nil) + bucketClient.MockUpload("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/block.visit", nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json", mockBlockMetaJSON("01FN3V83ABR9992RF8WRJZ76ZQ"), nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/block.visit", "", nil) + bucketClient.MockUpload("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/block.visit", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) @@ -828,6 +844,8 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) bucketClient.MockIter("user-1/01DTVP434PA9VFXSW2JKB3392D", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTVP434PA9VFXSW2JKB3392D/index"}, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/index", "some index content", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/block.visit", "", nil) + bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/block.visit", nil) bucketClient.MockExists("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", false, nil) bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil) @@ -988,15 +1006,23 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/block.visit", "", nil) + bucketClient.MockUpload("user-1/01DTVP434PA9VFXSW2JKB3392D/block.visit", nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/meta.json", mockBlockMetaJSON("01FN6CDF3PNEWWRY5MPGJPE3EX"), nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/deletion-mark.json", "", nil) bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/block.visit", "", nil) + bucketClient.MockUpload("user-1/01FN6CDF3PNEWWRY5MPGJPE3EX/block.visit", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil) + bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/block.visit", "", nil) + bucketClient.MockUpload("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/block.visit", nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/meta.json", mockBlockMetaJSON("01FN3V83ABR9992RF8WRJZ76ZQ"), nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/no-compact-mark.json", "", nil) + 
bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/block.visit", "", nil) + bucketClient.MockUpload("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/block.visit", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) @@ -1078,6 +1104,8 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil) + bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/block.visit", "", nil) + bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/block.visit", nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) } @@ -1179,9 +1207,17 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit blockDirectory := []string{} for blockID, blockTimes := range blocks { + blockVisitMarker := BlockVisitMarker{ + CompactorID: "test-compactor", + VisitTime: time.Now(), + } + visitMarkerFileContent, _ := json.Marshal(blockVisitMarker) bucketClient.MockGet(userID+"/"+blockID+"/meta.json", mockBlockMetaJSONWithTime(blockID, userID, blockTimes["startTime"], blockTimes["endTime"]), nil) bucketClient.MockGet(userID+"/"+blockID+"/deletion-mark.json", "", nil) bucketClient.MockGet(userID+"/"+blockID+"/no-compact-mark.json", "", nil) + bucketClient.MockGetTimes(userID+"/"+blockID+"/block.visit", "", nil, 1) + bucketClient.MockGet(userID+"/"+blockID+"/block.visit", string(visitMarkerFileContent), nil) + bucketClient.MockUpload(userID+"/"+blockID+"/block.visit", nil) blockDirectory = append(blockDirectory, userID+"/"+blockID) // Get all of the unique group hashes so that they can be used to ensure all groups were compacted @@ -1503,7 +1539,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li blocksCompactorFactory := func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, PlannerFactory, error) { return tsdbCompactor, - func(_ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter) compact.Planner { + func(ctx context.Context, bkt objstore.Bucket, _ log.Logger, _ Config, noCompactMarkFilter *compact.GatherNoCompactionMarkFilter, ringLifecycle *ring.Lifecycler, _ prometheus.Counter, _ prometheus.Counter) compact.Planner { tsdbPlanner.noCompactMarkFilters = append(tsdbPlanner.noCompactMarkFilters, noCompactMarkFilter) return tsdbPlanner }, diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 146fd62721..57b01436cd 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -1,6 +1,7 @@ package compactor import ( + "context" "fmt" "hash/fnv" "sort" @@ -22,6 +23,7 @@ import ( ) type ShuffleShardingGrouper struct { + ctx context.Context logger log.Logger bkt objstore.Bucket acceptMalformedIndex bool @@ -42,12 +44,19 @@ type ShuffleShardingGrouper struct { userID string blockFilesConcurrency int blocksFetchConcurrency int + compactionConcurrency int ring ring.ReadRing ringLifecyclerAddr string + ringLifecyclerID string + + blockVisitMarkerTimeout time.Duration + blockVisitMarkerReadFailed 
prometheus.Counter + blockVisitMarkerWriteFailed prometheus.Counter } func NewShuffleShardingGrouper( + ctx context.Context, logger log.Logger, bkt objstore.Bucket, acceptMalformedIndex bool, @@ -61,16 +70,22 @@ func NewShuffleShardingGrouper( compactorCfg Config, ring ring.ReadRing, ringLifecyclerAddr string, + ringLifecyclerID string, limits Limits, userID string, blockFilesConcurrency int, blocksFetchConcurrency int, + compactionConcurrency int, + blockVisitMarkerTimeout time.Duration, + blockVisitMarkerReadFailed prometheus.Counter, + blockVisitMarkerWriteFailed prometheus.Counter, ) *ShuffleShardingGrouper { if logger == nil { logger = log.NewNopLogger() } return &ShuffleShardingGrouper{ + ctx: ctx, logger: logger, bkt: bkt, acceptMalformedIndex: acceptMalformedIndex, @@ -102,13 +117,18 @@ func NewShuffleShardingGrouper( Name: "thanos_compact_group_vertical_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", }, []string{"group"}), - compactorCfg: compactorCfg, - ring: ring, - ringLifecyclerAddr: ringLifecyclerAddr, - limits: limits, - userID: userID, - blockFilesConcurrency: blockFilesConcurrency, - blocksFetchConcurrency: blocksFetchConcurrency, + compactorCfg: compactorCfg, + ring: ring, + ringLifecyclerAddr: ringLifecyclerAddr, + ringLifecyclerID: ringLifecyclerID, + limits: limits, + userID: userID, + blockFilesConcurrency: blockFilesConcurrency, + blocksFetchConcurrency: blocksFetchConcurrency, + compactionConcurrency: compactionConcurrency, + blockVisitMarkerTimeout: blockVisitMarkerTimeout, + blockVisitMarkerReadFailed: blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed: blockVisitMarkerWriteFailed, } } @@ -142,103 +162,134 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re var remainingCompactions = 0. defer func() { g.remainingPlannedCompactions.Set(remainingCompactions) }() + var groups []blocksGroup for _, mainBlocks := range mainGroups { - for _, group := range groupBlocksByCompactableRanges(mainBlocks, g.compactorCfg.BlockRanges.ToMilliseconds()) { - // Nothing to do if we don't have at least 2 blocks. - if len(group.blocks) < 2 { - continue - } - - groupHash := hashGroup(g.userID, group.rangeStart, group.rangeEnd) - - if owned, err := g.ownGroup(groupHash); err != nil { - level.Warn(g.logger).Log("msg", "unable to check if user is owned by this shard", "group hash", groupHash, "err", err, "group", group.String()) - continue - } else if !owned { - level.Info(g.logger).Log("msg", "skipping group because it is not owned by this shard", "group_hash", groupHash) - continue - } - - remainingCompactions++ - groupKey := fmt.Sprintf("%v%s", groupHash, group.blocks[0].Thanos.GroupKey()) - - level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "group", group.String()) - - // All the blocks within the same group have the same downsample - // resolution and external labels. - resolution := group.blocks[0].Thanos.Downsample.Resolution - externalLabels := labels.FromMap(group.blocks[0].Thanos.Labels) - - thanosGroup, err := compact.NewGroup( - log.With(g.logger, "groupKey", groupKey, "rangeStart", group.rangeStartTime().String(), "rangeEnd", group.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution), - g.bkt, - groupKey, - externalLabels, - resolution, - false, // No malformed index. - true, // Enable vertical compaction. 
- g.compactions.WithLabelValues(groupKey), - g.compactionRunsStarted.WithLabelValues(groupKey), - g.compactionRunsCompleted.WithLabelValues(groupKey), - g.compactionFailures.WithLabelValues(groupKey), - g.verticalCompactions.WithLabelValues(groupKey), - g.garbageCollectedBlocks, - g.blocksMarkedForDeletion, - g.blocksMarkedForNoCompact, - g.hashFunc, - g.blockFilesConcurrency, - g.blocksFetchConcurrency, - ) - if err != nil { - return nil, errors.Wrap(err, "create compaction group") - } - - for _, m := range group.blocks { - if err := thanosGroup.AppendMeta(m); err != nil { - return nil, errors.Wrap(err, "add block to compaction group") - } - } - - outGroups = append(outGroups, thanosGroup) - } + groups = append(groups, groupBlocksByCompactableRanges(mainBlocks, g.compactorCfg.BlockRanges.ToMilliseconds())...) } // Ensure groups are sorted by smallest range, oldest min time first. The rationale // is that we wanna favor smaller ranges first (ie. to deduplicate samples sooner // than later) and older ones are more likely to be "complete" (no missing block still // to be uploaded). - sort.SliceStable(outGroups, func(i, j int) bool { - iLength := outGroups[i].MaxTime() - outGroups[i].MinTime() - jLength := outGroups[j].MaxTime() - outGroups[j].MinTime() + sort.SliceStable(groups, func(i, j int) bool { + iGroup := groups[i] + jGroup := groups[j] + iMinTime := iGroup.minTime() + iMaxTime := iGroup.maxTime() + jMinTime := jGroup.minTime() + jMaxTime := jGroup.maxTime() + iLength := iMaxTime - iMinTime + jLength := jMaxTime - jMinTime if iLength != jLength { return iLength < jLength } - if outGroups[i].MinTime() != outGroups[j].MinTime() { - return outGroups[i].MinTime() < outGroups[j].MinTime() + if iMinTime != jMinTime { + return iMinTime < jMinTime } + iGroupHash := hashGroup(g.userID, iGroup.rangeStart, iGroup.rangeEnd) + iGroupKey := createGroupKey(iGroupHash, iGroup) + jGroupHash := hashGroup(g.userID, jGroup.rangeStart, jGroup.rangeEnd) + jGroupKey := createGroupKey(jGroupHash, jGroup) // Guarantee stable sort for tests. - return outGroups[i].Key() < outGroups[j].Key() + return iGroupKey < jGroupKey }) - return outGroups, nil -} +mainLoop: + for _, group := range groups { + var blockIds []string + for _, block := range group.blocks { + blockIds = append(blockIds, block.ULID.String()) + } + blocksInfo := strings.Join(blockIds, ",") + level.Info(g.logger).Log("msg", "check group", "blocks", blocksInfo) -// Check whether this compactor instance owns the group. -func (g *ShuffleShardingGrouper) ownGroup(groupHash uint32) (bool, error) { - subRing := g.ring.ShuffleShard(g.userID, g.limits.CompactorTenantShardSize(g.userID)) + // Nothing to do if we don't have at least 2 blocks. 
+ if len(group.blocks) < 2 { + continue + } - rs, err := subRing.Get(groupHash, RingOp, nil, nil, nil) - if err != nil { - return false, err - } + groupHash := hashGroup(g.userID, group.rangeStart, group.rangeEnd) + + if isVisited, err := g.isGroupVisited(group.blocks, g.ringLifecyclerID); err != nil { + level.Warn(g.logger).Log("msg", "unable to check if blocks in group are visited", "group hash", groupHash, "err", err, "group", group.String()) + continue + } else if isVisited { + level.Info(g.logger).Log("msg", "skipping group because at least one block in group is visited", "group_hash", groupHash) + continue + } + + remainingCompactions++ + groupKey := createGroupKey(groupHash, group) + + level.Info(g.logger).Log("msg", "found compactable group for user", "group_hash", groupHash, "group", group.String()) + markBlocksVisited(g.ctx, g.bkt, g.logger, group.blocks, g.ringLifecyclerID, g.blockVisitMarkerWriteFailed) + + // All the blocks within the same group have the same downsample + // resolution and external labels. + resolution := group.blocks[0].Thanos.Downsample.Resolution + externalLabels := labels.FromMap(group.blocks[0].Thanos.Labels) + + thanosGroup, err := compact.NewGroup( + log.With(g.logger, "groupKey", groupKey, "rangeStart", group.rangeStartTime().String(), "rangeEnd", group.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution), + g.bkt, + groupKey, + externalLabels, + resolution, + false, // No malformed index. + true, // Enable vertical compaction. + g.compactions.WithLabelValues(groupKey), + g.compactionRunsStarted.WithLabelValues(groupKey), + g.compactionRunsCompleted.WithLabelValues(groupKey), + g.compactionFailures.WithLabelValues(groupKey), + g.verticalCompactions.WithLabelValues(groupKey), + g.garbageCollectedBlocks, + g.blocksMarkedForDeletion, + g.blocksMarkedForNoCompact, + g.hashFunc, + g.blockFilesConcurrency, + g.blocksFetchConcurrency, + ) + if err != nil { + return nil, errors.Wrap(err, "create compaction group") + } + + for _, m := range group.blocks { + if err := thanosGroup.AppendMeta(m); err != nil { + return nil, errors.Wrap(err, "add block to compaction group") + } + } - if len(rs.Instances) != 1 { - return false, fmt.Errorf("unexpected number of compactors in the shard (expected 1, got %d)", len(rs.Instances)) + outGroups = append(outGroups, thanosGroup) + if len(outGroups) == g.compactionConcurrency { + break mainLoop + } } - return rs.Instances[0].Addr == g.ringLifecyclerAddr, nil + level.Info(g.logger).Log("msg", fmt.Sprintf("total groups for compaction: %d", len(outGroups))) + + return outGroups, nil +} + +func (g *ShuffleShardingGrouper) isGroupVisited(blocks []*metadata.Meta, compactorID string) (bool, error) { + for _, block := range blocks { + blockID := block.ULID.String() + blockVisitMarker, err := ReadBlockVisitMarker(g.ctx, g.bkt, blockID, g.blockVisitMarkerReadFailed) + if err != nil { + if errors.Is(err, ErrorBlockVisitMarkerNotFound) { + level.Debug(g.logger).Log("msg", "no visit marker file for block", "blockID", blockID) + continue + } + level.Error(g.logger).Log("msg", "unable to read block visit marker file", "blockID", blockID, "err", err) + return true, err + } + if compactorID != blockVisitMarker.CompactorID && blockVisitMarker.isVisited(g.blockVisitMarkerTimeout) { + level.Debug(g.logger).Log("msg", fmt.Sprintf("visited block: %s", blockID)) + return true, nil + } + } + return false, nil } // Check whether this compactor exists on the subring based on user ID @@ -264,6 +315,10 @@ func 
hashGroup(userID string, rangeStart int64, rangeEnd int64) uint32 { return groupHash } +func createGroupKey(groupHash uint32, group blocksGroup) string { + return fmt.Sprintf("%v%s", groupHash, group.blocks[0].Thanos.GroupKey()) +} + // blocksGroup struct and functions copied and adjusted from https://github.com/cortexproject/cortex/pull/2616 type blocksGroup struct { rangeStart int64 // Included. diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index d1ae8d2a8e..d618fbd955 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -2,6 +2,9 @@ package compactor import ( "bytes" + "context" + "encoding/json" + "path" "testing" "time" @@ -17,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -111,15 +115,25 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { }, } + testCompactorID := "test-compactor" + otherCompactorID := "other-compactor" + tests := map[string]struct { - ranges []time.Duration - blocks map[ulid.ULID]*metadata.Meta + concurrency int + ranges []time.Duration + blocks map[ulid.ULID]*metadata.Meta + visitedBlocks []struct { + id ulid.ULID + compactorID string + isExpired bool + } expected [][]ulid.ULID metrics string }{ "test basic grouping": { - ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, - blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid]}, + concurrency: 3, + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid]}, expected: [][]ulid.ULID{ {block1hto2hExt2Ulid, block0hto1hExt2Ulid}, {block1hto2hExt1Ulid, block0hto1hExt1Ulid}, @@ -131,17 +145,19 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { `, }, "test no compaction": { - ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, - blocks: map[ulid.ULID]*metadata.Meta{block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid], block0to1hExt3Ulid: blocks[block0to1hExt3Ulid]}, - expected: [][]ulid.ULID{}, + concurrency: 1, + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid], block0to1hExt3Ulid: blocks[block0to1hExt3Ulid]}, + expected: [][]ulid.ULID{}, metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. 
# TYPE cortex_compactor_remaining_planned_compactions gauge cortex_compactor_remaining_planned_compactions 0 `, }, "test smallest range first": { - ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, - blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block4hto6hExt2Ulid: blocks[block4hto6hExt2Ulid], block6hto8hExt2Ulid: blocks[block6hto8hExt2Ulid]}, + concurrency: 3, + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block4hto6hExt2Ulid: blocks[block4hto6hExt2Ulid], block6hto8hExt2Ulid: blocks[block6hto8hExt2Ulid]}, expected: [][]ulid.ULID{ {block1hto2hExt1Ulid, block0hto1hExt1Ulid}, {block3hto4hExt1Ulid, block2hto3hExt1Ulid}, @@ -153,8 +169,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { `, }, "test oldest min time first": { - ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, - blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block1hto2hExt1UlidCopy: blocks[block1hto2hExt1UlidCopy]}, + concurrency: 2, + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block1hto2hExt1UlidCopy: blocks[block1hto2hExt1UlidCopy]}, expected: [][]ulid.ULID{ {block1hto2hExt1Ulid, block0hto1hExt1Ulid, block1hto2hExt1UlidCopy}, {block3hto4hExt1Ulid, block2hto3hExt1Ulid}, @@ -165,8 +182,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { `, }, "test overlapping blocks": { - ranges: []time.Duration{20 * time.Hour, 40 * time.Hour}, - blocks: map[ulid.ULID]*metadata.Meta{block0hto20hExt1Ulid: blocks[block0hto20hExt1Ulid], block21hto40hExt1Ulid: blocks[block21hto40hExt1Ulid], block21hto40hExt1UlidCopy: blocks[block21hto40hExt1UlidCopy]}, + concurrency: 1, + ranges: []time.Duration{20 * time.Hour, 40 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block0hto20hExt1Ulid: blocks[block0hto20hExt1Ulid], block21hto40hExt1Ulid: blocks[block21hto40hExt1Ulid], block21hto40hExt1UlidCopy: blocks[block21hto40hExt1UlidCopy]}, expected: [][]ulid.ULID{ {block21hto40hExt1Ulid, block21hto40hExt1UlidCopy}, }, @@ -176,8 +194,9 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { `, }, "test imperfect maxTime blocks": { - ranges: []time.Duration{2 * time.Hour}, - blocks: map[ulid.ULID]*metadata.Meta{block0hto1h30mExt1Ulid: blocks[block0hto1h30mExt1Ulid], block0hto45mExt1Ulid: blocks[block0hto45mExt1Ulid]}, + concurrency: 1, + ranges: []time.Duration{2 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block0hto1h30mExt1Ulid: blocks[block0hto1h30mExt1Ulid], block0hto45mExt1Ulid: blocks[block0hto45mExt1Ulid]}, expected: [][]ulid.ULID{ {block0hto45mExt1Ulid, block0hto1h30mExt1Ulid}, }, @@ -187,12 +206,104 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { `, }, "test prematurely created blocks": { - ranges: []time.Duration{2 * time.Hour}, - blocks: 
map[ulid.ULID]*metadata.Meta{blocklast1hExt1UlidCopy: blocks[blocklast1hExt1UlidCopy], blocklast1hExt1Ulid: blocks[blocklast1hExt1Ulid]}, - expected: [][]ulid.ULID{}, + concurrency: 1, + ranges: []time.Duration{2 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{blocklast1hExt1UlidCopy: blocks[blocklast1hExt1UlidCopy], blocklast1hExt1Ulid: blocks[blocklast1hExt1Ulid]}, + expected: [][]ulid.ULID{}, metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. # TYPE cortex_compactor_remaining_planned_compactions gauge cortex_compactor_remaining_planned_compactions 0 +`, + }, + "test group with all blocks visited": { + concurrency: 1, + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid]}, + expected: [][]ulid.ULID{ + {block1hto2hExt1Ulid, block0hto1hExt1Ulid}, + }, + visitedBlocks: []struct { + id ulid.ULID + compactorID string + isExpired bool + }{ + {id: block1hto2hExt2Ulid, compactorID: otherCompactorID, isExpired: false}, + {id: block0hto1hExt2Ulid, compactorID: otherCompactorID, isExpired: false}, + }, + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + # TYPE cortex_compactor_remaining_planned_compactions gauge + cortex_compactor_remaining_planned_compactions 1 +`, + }, + "test group with one block visited": { + concurrency: 1, + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid]}, + expected: [][]ulid.ULID{ + {block1hto2hExt1Ulid, block0hto1hExt1Ulid}, + }, + visitedBlocks: []struct { + id ulid.ULID + compactorID string + isExpired bool + }{ + {id: block1hto2hExt2Ulid, compactorID: otherCompactorID, isExpired: false}, + }, + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + # TYPE cortex_compactor_remaining_planned_compactions gauge + cortex_compactor_remaining_planned_compactions 1 +`, + }, + "test group block visit marker file expired": { + concurrency: 1, + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid]}, + expected: [][]ulid.ULID{ + {block1hto2hExt2Ulid, block0hto1hExt2Ulid}, + }, + visitedBlocks: []struct { + id ulid.ULID + compactorID string + isExpired bool + }{ + {id: block1hto2hExt2Ulid, compactorID: otherCompactorID, isExpired: true}, + {id: block0hto1hExt2Ulid, compactorID: otherCompactorID, isExpired: true}, + }, + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. 
+ # TYPE cortex_compactor_remaining_planned_compactions gauge + cortex_compactor_remaining_planned_compactions 1 +`, + }, + "test group with one block visited by current compactor": { + concurrency: 1, + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid]}, + expected: [][]ulid.ULID{ + {block1hto2hExt2Ulid, block0hto1hExt2Ulid}, + }, + visitedBlocks: []struct { + id ulid.ULID + compactorID string + isExpired bool + }{ + {id: block1hto2hExt2Ulid, compactorID: testCompactorID, isExpired: false}, + }, + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. + # TYPE cortex_compactor_remaining_planned_compactions gauge + cortex_compactor_remaining_planned_compactions 1 +`, + }, + "test basic grouping with concurrency 2": { + concurrency: 2, + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid], block1hto2hExt2Ulid: blocks[block1hto2hExt2Ulid], block0hto1hExt2Ulid: blocks[block0hto1hExt2Ulid]}, + expected: [][]ulid.ULID{ + {block1hto2hExt2Ulid, block0hto1hExt2Ulid}, + {block1hto2hExt1Ulid, block0hto1hExt1Ulid}, + }, + metrics: `# HELP cortex_compactor_remaining_planned_compactions Total number of plans that remain to be compacted. 
+ # TYPE cortex_compactor_remaining_planned_compactions gauge + cortex_compactor_remaining_planned_compactions 2 `, }, } @@ -225,9 +336,39 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { Name: "cortex_compactor_remaining_planned_compactions", Help: "Total number of plans that remain to be compacted.", }) + blockVisitMarkerReadFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_read_failed", + Help: "Number of block visit marker file failed to be read.", + }) + blockVisitMarkerWriteFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_write_failed", + Help: "Number of block visit marker file failed to be written.", + }) + + bkt := &bucket.ClientMock{} + blockVisitMarkerTimeout := 5 * time.Minute + for _, visitedBlock := range testData.visitedBlocks { + visitMarkerFile := path.Join(visitedBlock.id.String(), BlockVisitMarkerFile) + expireTime := time.Now() + if visitedBlock.isExpired { + expireTime = expireTime.Add(-1 * blockVisitMarkerTimeout) + } + blockVisitMarker := BlockVisitMarker{ + CompactorID: visitedBlock.compactorID, + VisitTime: expireTime, + } + visitMarkerFileContent, _ := json.Marshal(blockVisitMarker) + bkt.MockGet(visitMarkerFile, string(visitMarkerFileContent), nil) + } + bkt.MockUpload(mock.Anything, nil) + bkt.MockGet(mock.Anything, "", nil) - g := NewShuffleShardingGrouper(nil, + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + g := NewShuffleShardingGrouper( + ctx, nil, + bkt, false, // Do not accept malformed indexes true, // Enable vertical compaction registerer, @@ -239,10 +380,16 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { *compactorCfg, ring, "test-addr", + testCompactorID, overrides, "", 10, - 3) + 3, + testData.concurrency, + blockVisitMarkerTimeout, + blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed, + ) actual, err := g.Groups(testData.blocks) require.NoError(t, err) require.Len(t, actual, len(testData.expected)) diff --git a/pkg/compactor/shuffle_sharding_planner.go b/pkg/compactor/shuffle_sharding_planner.go index 4c38ff5982..9906a59e47 100644 --- a/pkg/compactor/shuffle_sharding_planner.go +++ b/pkg/compactor/shuffle_sharding_planner.go @@ -3,23 +3,51 @@ package compactor import ( "context" "fmt" + "time" "github.com/go-kit/log" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/objstore" ) type ShuffleShardingPlanner struct { - logger log.Logger - ranges []int64 - noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark + ctx context.Context + bkt objstore.Bucket + logger log.Logger + ranges []int64 + noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark + ringLifecyclerID string + blockVisitMarkerTimeout time.Duration + blockVisitMarkerFileUpdateInterval time.Duration + blockVisitMarkerReadFailed prometheus.Counter + blockVisitMarkerWriteFailed prometheus.Counter } -func NewShuffleShardingPlanner(logger log.Logger, ranges []int64, noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark) *ShuffleShardingPlanner { +func NewShuffleShardingPlanner( + ctx context.Context, + bkt objstore.Bucket, + logger log.Logger, + ranges []int64, + noCompBlocksFunc func() map[ulid.ULID]*metadata.NoCompactMark, + ringLifecyclerID string, + blockVisitMarkerTimeout time.Duration, 
+ blockVisitMarkerFileUpdateInterval time.Duration, + blockVisitMarkerReadFailed prometheus.Counter, + blockVisitMarkerWriteFailed prometheus.Counter, +) *ShuffleShardingPlanner { return &ShuffleShardingPlanner{ - logger: logger, - ranges: ranges, - noCompBlocksFunc: noCompBlocksFunc, + ctx: ctx, + bkt: bkt, + logger: logger, + ranges: ranges, + noCompBlocksFunc: noCompBlocksFunc, + ringLifecyclerID: ringLifecyclerID, + blockVisitMarkerTimeout: blockVisitMarkerTimeout, + blockVisitMarkerFileUpdateInterval: blockVisitMarkerFileUpdateInterval, + blockVisitMarkerReadFailed: blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed: blockVisitMarkerWriteFailed, } } @@ -35,12 +63,23 @@ func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metad resultMetas := make([]*metadata.Meta, 0, len(metasByMinTime)) for _, b := range metasByMinTime { + blockID := b.ULID.String() if _, excluded := noCompactMarked[b.ULID]; excluded { continue } if b.MinTime < rangeStart || b.MaxTime > rangeEnd { - return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", b.ULID.String(), b.MinTime, b.MaxTime, rangeStart, rangeEnd) + return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", blockID, b.MinTime, b.MaxTime, rangeStart, rangeEnd) + } + + blockVisitMarker, err := ReadBlockVisitMarker(p.ctx, p.bkt, blockID, p.blockVisitMarkerReadFailed) + if err != nil { + // shuffle_sharding_grouper should put visit marker file for blocks ready for + // compaction. So error should be returned if visit marker file does not exist. + return nil, fmt.Errorf("unable to get visit marker file for block %s: %s", blockID, err.Error()) + } + if !blockVisitMarker.isVisitedByCompactor(p.blockVisitMarkerTimeout, p.ringLifecyclerID) { + return nil, fmt.Errorf("block %s is not visited by current compactor %s", blockID, p.ringLifecyclerID) } resultMetas = append(resultMetas, b) @@ -50,5 +89,7 @@ func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metad return nil, nil } + go markBlocksVisitedHeartBeat(p.ctx, p.bkt, p.logger, resultMetas, p.ringLifecyclerID, p.blockVisitMarkerFileUpdateInterval, p.blockVisitMarkerWriteFailed) + return resultMetas, nil } diff --git a/pkg/compactor/shuffle_sharding_planner_test.go b/pkg/compactor/shuffle_sharding_planner_test.go index 8dbf1c9de4..b4d0a3f69e 100644 --- a/pkg/compactor/shuffle_sharding_planner_test.go +++ b/pkg/compactor/shuffle_sharding_planner_test.go @@ -2,18 +2,36 @@ package compactor import ( "context" + "encoding/json" "fmt" + "path" "testing" "time" + "github.com/go-kit/log" "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/util/concurrency" ) func TestShuffleShardingPlanner_Plan(t *testing.T) { + type VisitedBlock struct { + id ulid.ULID + isExpired bool + compactorID string + } + + currentCompactor := "test-compactor" + otherCompactor := "other-compactor" + block1ulid := 
ulid.MustNew(1, nil) block2ulid := ulid.MustNew(2, nil) block3ulid := ulid.MustNew(3, nil) @@ -24,6 +42,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { blocks []*metadata.Meta expected []*metadata.Meta expectedErr error + visitedBlocks []VisitedBlock }{ "test basic plan": { ranges: []int64{2 * time.Hour.Milliseconds()}, @@ -43,6 +62,18 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { }, }, }, + visitedBlocks: []VisitedBlock{ + { + id: block1ulid, + isExpired: false, + compactorID: currentCompactor, + }, + { + id: block2ulid, + isExpired: false, + compactorID: currentCompactor, + }, + }, expected: []*metadata.Meta{ { BlockMeta: tsdb.BlockMeta{ @@ -78,6 +109,18 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { }, }, }, + visitedBlocks: []VisitedBlock{ + { + id: block1ulid, + isExpired: false, + compactorID: currentCompactor, + }, + { + id: block2ulid, + isExpired: false, + compactorID: currentCompactor, + }, + }, expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block2ulid.String(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds()), }, "test blocks outside largest range 1": { @@ -98,6 +141,18 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { }, }, }, + visitedBlocks: []VisitedBlock{ + { + id: block1ulid, + isExpired: false, + compactorID: currentCompactor, + }, + { + id: block2ulid, + isExpired: false, + compactorID: currentCompactor, + }, + }, expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block1ulid.String(), 0*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds()), }, "test blocks outside largest range 2": { @@ -118,6 +173,18 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { }, }, }, + visitedBlocks: []VisitedBlock{ + { + id: block1ulid, + isExpired: false, + compactorID: currentCompactor, + }, + { + id: block2ulid, + isExpired: false, + compactorID: currentCompactor, + }, + }, expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block2ulid.String(), 0*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds()), }, "test should skip blocks marked for no compact": { @@ -146,6 +213,23 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { }, }, }, + visitedBlocks: []VisitedBlock{ + { + id: block1ulid, + isExpired: false, + compactorID: currentCompactor, + }, + { + id: block2ulid, + isExpired: false, + compactorID: currentCompactor, + }, + { + id: block3ulid, + isExpired: false, + compactorID: currentCompactor, + }, + }, expected: []*metadata.Meta{ { BlockMeta: tsdb.BlockMeta{ @@ -182,17 +266,120 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) { }, }, }, + visitedBlocks: []VisitedBlock{ + { + id: block1ulid, + isExpired: false, + compactorID: currentCompactor, + }, + { + id: block2ulid, + isExpired: false, + compactorID: currentCompactor, + }, + }, expected: []*metadata.Meta{}, }, + "test should not compact if visit marker file is not expired and visited by other compactor": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + 
}, + }, + visitedBlocks: []VisitedBlock{ + { + id: block1ulid, + isExpired: false, + compactorID: otherCompactor, + }, + }, + expectedErr: fmt.Errorf("block %s is not visited by current compactor %s", block1ulid.String(), currentCompactor), + }, + "test should not compact if visit marker file is expired": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + visitedBlocks: []VisitedBlock{ + { + id: block1ulid, + isExpired: true, + compactorID: currentCompactor, + }, + }, + expectedErr: fmt.Errorf("block %s is not visited by current compactor %s", block1ulid.String(), currentCompactor), + }, } + blockVisitMarkerTimeout := 5 * time.Minute for testName, testData := range tests { t.Run(testName, func(t *testing.T) { - p := NewShuffleShardingPlanner(nil, + bkt := &bucket.ClientMock{} + for _, visitedBlock := range testData.visitedBlocks { + visitMarkerFile := path.Join(visitedBlock.id.String(), BlockVisitMarkerFile) + expireTime := time.Now() + if visitedBlock.isExpired { + expireTime = expireTime.Add(-1 * blockVisitMarkerTimeout) + } + blockVisitMarker := BlockVisitMarker{ + CompactorID: visitedBlock.compactorID, + VisitTime: expireTime, + } + visitMarkerFileContent, _ := json.Marshal(blockVisitMarker) + bkt.MockGet(visitMarkerFile, string(visitMarkerFileContent), nil) + } + bkt.MockUpload(mock.Anything, nil) + + registerer := prometheus.NewPedanticRegistry() + blockVisitMarkerReadFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_read_failed", + Help: "Number of block visit marker file failed to be read.", + }) + blockVisitMarkerWriteFailed := promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Name: "cortex_compactor_block_visit_marker_write_failed", + Help: "Number of block visit marker file failed to be written.", + }) + + logs := &concurrency.SyncBuffer{} + logger := log.NewLogfmtLogger(logs) + p := NewShuffleShardingPlanner( + context.Background(), + bkt, + logger, testData.ranges, func() map[ulid.ULID]*metadata.NoCompactMark { return testData.noCompactBlocks }, + currentCompactor, + blockVisitMarkerTimeout, + time.Minute, + blockVisitMarkerReadFailed, + blockVisitMarkerWriteFailed, ) actual, err := p.Plan(context.Background(), testData.blocks) diff --git a/pkg/storage/bucket/client_mock.go b/pkg/storage/bucket/client_mock.go index 5f6784a68c..82b82f4afb 100644 --- a/pkg/storage/bucket/client_mock.go +++ b/pkg/storage/bucket/client_mock.go @@ -102,6 +102,29 @@ func (m *ClientMock) MockGet(name, content string, err error) { } } +// MockGetTimes is a convenient method to mock Get() and Exists() to run x time +func (m *ClientMock) MockGetTimes(name, content string, err error, times int) { + if content != "" { + m.On("Exists", mock.Anything, name).Return(true, err).Times(times) + m.On("Attributes", mock.Anything, name).Return(objstore.ObjectAttributes{ + Size: int64(len(content)), + LastModified: time.Now(), + }, nil).Times(times) + + // Since we return an ReadCloser and it can be consumed only once, + // each time the mocked Get() is called we do create a new one, so + // that getting the same mocked object twice works as expected. 
+ mockedGet := m.On("Get", mock.Anything, name).Times(times) + mockedGet.Run(func(args mock.Arguments) { + mockedGet.Return(ioutil.NopCloser(bytes.NewReader([]byte(content))), err) + }) + } else { + m.On("Exists", mock.Anything, name).Return(false, err).Times(times) + m.On("Get", mock.Anything, name).Return(nil, errObjectDoesNotExist).Times(times) + m.On("Attributes", mock.Anything, name).Return(nil, errObjectDoesNotExist).Times(times) + } +} + func (m *ClientMock) MockDelete(name string, err error) { m.On("Delete", mock.Anything, name).Return(err) }
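
To make the flow in this diff concrete, here is a minimal sketch (not part of the PR) of how the visit marker helpers added in `pkg/compactor/block_visit_marker.go` interact: one compactor claims a block by uploading `<blockID>/block.visit`, and a second compactor reads the marker and skips the block while the claim is still fresh. The standalone `main` package, the in-memory bucket, and the example block ULID are assumptions made only for illustration; in the real code these calls are wired through the shuffle-sharding grouper and planner shown above.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/objstore"

	"github.com/cortexproject/cortex/pkg/compactor"
)

func main() {
	ctx := context.Background()
	// In-memory stand-in for the real blocks-storage bucket.
	bkt := objstore.NewInMemBucket()

	readFailed := prometheus.NewCounter(prometheus.CounterOpts{Name: "block_visit_marker_read_failed", Help: "Read failures."})
	writeFailed := prometheus.NewCounter(prometheus.CounterOpts{Name: "block_visit_marker_write_failed", Help: "Write failures."})

	blockID := "01DTVP434PA9VFXSW2JKB3392D" // hypothetical block ULID
	timeout := 5 * time.Minute              // -compactor.block-visit-marker-timeout

	// Compactor A claims the block: uploads <blockID>/block.visit with its ID and the current time.
	if err := compactor.UpdateBlockVisitMarker(ctx, bkt, blockID, "compactor-a", writeFailed); err != nil {
		panic(err)
	}

	// Compactor B reads the marker before grouping the block.
	marker, err := compactor.ReadBlockVisitMarker(ctx, bkt, blockID, readFailed)
	if err != nil {
		panic(err)
	}

	// Same condition as BlockVisitMarker.isVisited()/isVisitedByCompactor(): the block is
	// skipped while the marker is younger than the timeout and owned by another compactor.
	claimedByOther := marker.CompactorID != "compactor-b" &&
		time.Now().Before(marker.VisitTime.Add(timeout))
	fmt.Println("compactor-b skips block:", claimedByOther) // true until the marker expires
}
```

While compaction of a claimed group is running, the planner's `markBlocksVisitedHeartBeat` goroutine re-uploads the marker every `-compactor.block-visit-marker-file-update-interval`, so the claim only lapses (after `-compactor.block-visit-marker-timeout`) if the owning compactor dies or stops making progress.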