diff --git a/CHANGELOG.md b/CHANGELOG.md index 54d2b9fce8..3c4d8c35b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * [CHANGE] Compactor block deletion mark migration, needed when upgrading from v1.7, is now disabled by default. #4597 * [CHANGE] The `status_code` label on gRPC client metrics has changed from '200' and '500' to '2xx', '5xx', '4xx', 'cancel' or 'error'. 4601 * [CHANGE] Memberlist: changed probe interval from `1s` to `5s` and probe timeout from `500ms` to `2s`. #4601 +* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4621 * [ENHANCEMENT] Update Go version to 1.17.5. #4602 #4604 * [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503 * [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571 diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index ada0f9f41a..d52f88bb4e 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -173,6 +173,11 @@ compactor: # CLI flag: -compactor.sharding-enabled [sharding_enabled: | default = false] + # The sharding strategy to use. Supported values are: default, + # shuffle-sharding. + # CLI flag: -compactor.sharding-strategy + [sharding_strategy: | default = "default"] + sharding_ring: kvstore: # Backend storage to use for the ring. Supported values are: consul, etcd, diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a26fce199c..781148d8de 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5317,6 +5317,10 @@ The `compactor_config` configures the compactor for the blocks storage. # CLI flag: -compactor.sharding-enabled [sharding_enabled: | default = false] +# The sharding strategy to use. Supported values are: default, shuffle-sharding. +# CLI flag: -compactor.sharding-strategy +[sharding_strategy: | default = "default"] + sharding_ring: kvstore: # Backend storage to use for the ring. Supported values are: consul, etcd, diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 75b83fc1e1..f0e2464406 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -48,6 +48,9 @@ var ( errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) + supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { return compact.NewDefaultGrouper( logger, @@ -61,6 +64,20 @@ var ( metadata.NoneFunc) } + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + return NewShuffleShardingGrouper( + logger, + bkt, + false, // Do not accept malformed indexes + true, // Enable vertical compaction + reg, + blocksMarkedForDeletion, + prometheus.NewCounter(prometheus.CounterOpts{}), + garbageCollectedBlocks, + metadata.NoneFunc, + cfg) + } + DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) { compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) if err != nil { @@ -70,6 +87,16 @@ var ( planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds()) return compactor, planner, nil } + + ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) { + compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) + if err != nil { + return nil, nil, err + } + + planner := NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds()) + return compactor, planner, nil + } ) // BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks. @@ -113,8 +140,9 @@ type Config struct { DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` // Compactors sharding. - ShardingEnabled bool `yaml:"sharding_enabled"` - ShardingRing RingConfig `yaml:"sharding_ring"` + ShardingEnabled bool `yaml:"sharding_enabled"` + ShardingStrategy string `yaml:"sharding_strategy"` + ShardingRing RingConfig `yaml:"sharding_ring"` // No need to add options to customize the retry backoff, // given the defaults should be fine, but allow to override @@ -146,6 +174,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.CleanupInterval, "compactor.cleanup-interval", 15*time.Minute, "How frequently compactor should run blocks cleanup and maintenance, as well as update the bucket index.") f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.") f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.") + f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+ "If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+ "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.") @@ -164,6 +193,11 @@ func (cfg *Config) Validate() error { } } + // Make sure a valid sharding strategy is being used + if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) { + return errInvalidShardingStrategy + } + return nil } @@ -235,12 +269,20 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi blocksGrouperFactory := compactorCfg.BlocksGrouperFactory if blocksGrouperFactory == nil { - blocksGrouperFactory = DefaultBlocksGrouperFactory + if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + blocksGrouperFactory = ShuffleShardingGrouperFactory + } else { + blocksGrouperFactory = DefaultBlocksGrouperFactory + } } blocksCompactorFactory := compactorCfg.BlocksCompactorFactory if blocksCompactorFactory == nil { - blocksCompactorFactory = DefaultBlocksCompactorFactory + if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + blocksCompactorFactory = ShuffleShardingBlocksCompactorFactory + } else { + blocksCompactorFactory = DefaultBlocksCompactorFactory + } } cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go new file mode 100644 index 0000000000..464bc83ff0 --- /dev/null +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -0,0 +1,384 @@ +package compactor + +import ( + "fmt" + "hash/fnv" + "sort" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/objstore" +) + +type ShuffleShardingGrouper struct { + logger log.Logger + bkt objstore.Bucket + acceptMalformedIndex bool + enableVerticalCompaction bool + reg prometheus.Registerer + blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter + garbageCollectedBlocks prometheus.Counter + hashFunc metadata.HashFunc + compactions *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec + compactionFailures *prometheus.CounterVec + verticalCompactions *prometheus.CounterVec + compactorCfg Config +} + +func NewShuffleShardingGrouper( + logger log.Logger, + bkt objstore.Bucket, + acceptMalformedIndex bool, + enableVerticalCompaction bool, + reg prometheus.Registerer, + blocksMarkedForDeletion prometheus.Counter, + blocksMarkedForNoCompact prometheus.Counter, + garbageCollectedBlocks prometheus.Counter, + hashFunc metadata.HashFunc, + compactorCfg Config, +) *ShuffleShardingGrouper { + if logger == nil { + logger = log.NewNopLogger() + } + + return &ShuffleShardingGrouper{ + logger: logger, + bkt: bkt, + acceptMalformedIndex: acceptMalformedIndex, + enableVerticalCompaction: enableVerticalCompaction, + reg: reg, + blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, + garbageCollectedBlocks: garbageCollectedBlocks, + hashFunc: hashFunc, + // Metrics are copied from Thanos DefaultGrouper constructor + compactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compactions_total", + Help: "Total number of group compaction attempts that resulted in a new block.", + }, []string{"group"}), + compactionRunsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compaction_runs_started_total", + Help: "Total number of group compaction attempts.", + }, []string{"group"}), + compactionRunsCompleted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compaction_runs_completed_total", + Help: "Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.", + }, []string{"group"}), + compactionFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compactions_failures_total", + Help: "Total number of failed group compactions.", + }, []string{"group"}), + verticalCompactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_vertical_compactions_total", + Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", + }, []string{"group"}), + compactorCfg: compactorCfg, + } +} + +// Groups function modified from https://github.com/cortexproject/cortex/pull/2616 +func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) { + // First of all we have to group blocks using the Thanos default + // grouping (based on downsample resolution + external labels). + mainGroups := map[string][]*metadata.Meta{} + for _, b := range blocks { + key := compact.DefaultGroupKey(b.Thanos) + mainGroups[key] = append(mainGroups[key], b) + } + + // For each group, we have to further split it into set of blocks + // which we can parallelly compact. + var outGroups []*compact.Group + + i := 0 + for _, mainBlocks := range mainGroups { + for _, group := range groupBlocksByCompactableRanges(mainBlocks, g.compactorCfg.BlockRanges.ToMilliseconds()) { + // Nothing to do if we don't have at least 2 blocks. + if len(group.blocks) < 2 { + continue + } + + // TODO: Use the group's hash to determine whether a compactor should be responsible for compacting that group + groupHash := hashGroup(group.blocks[0].Thanos.Labels["__org_id__"], group.rangeStart, group.rangeEnd) + + groupKey := fmt.Sprintf("%v%d", groupHash, i) + i++ + + level.Debug(g.logger).Log("msg", "found compactable group for user", "user", group.blocks[0].Thanos.Labels["__org_id__"], "plan", group.String()) + + // All the blocks within the same group have the same downsample + // resolution and external labels. + resolution := group.blocks[0].Thanos.Downsample.Resolution + externalLabels := labels.FromMap(group.blocks[0].Thanos.Labels) + + thanosGroup, err := compact.NewGroup( + log.With(g.logger, "groupKey", groupKey, "rangeStart", group.rangeStartTime().String(), "rangeEnd", group.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution), + g.bkt, + groupKey, + externalLabels, + resolution, + false, // No malformed index. + true, // Enable vertical compaction. + g.compactions.WithLabelValues(groupKey), + g.compactionRunsStarted.WithLabelValues(groupKey), + g.compactionRunsCompleted.WithLabelValues(groupKey), + g.compactionFailures.WithLabelValues(groupKey), + g.verticalCompactions.WithLabelValues(groupKey), + g.garbageCollectedBlocks, + g.blocksMarkedForDeletion, + g.blocksMarkedForNoCompact, + g.hashFunc, + ) + if err != nil { + return nil, errors.Wrap(err, "create compaction group") + } + + for _, m := range group.blocks { + if err := thanosGroup.AppendMeta(m); err != nil { + return nil, errors.Wrap(err, "add block to compaction group") + } + } + + outGroups = append(outGroups, thanosGroup) + } + } + + // Ensure groups are sorted by smallest range, oldest min time first. The rationale + // is that we wanna favor smaller ranges first (ie. to deduplicate samples sooner + // than later) and older ones are more likely to be "complete" (no missing block still + // to be uploaded). + sort.SliceStable(outGroups, func(i, j int) bool { + iLength := outGroups[i].MaxTime() - outGroups[i].MinTime() + jLength := outGroups[j].MaxTime() - outGroups[j].MinTime() + + if iLength != jLength { + return iLength < jLength + } + if outGroups[i].MinTime() != outGroups[j].MinTime() { + return outGroups[i].MinTime() < outGroups[j].MinTime() + } + + // Guarantee stable sort for tests. + return outGroups[i].Key() < outGroups[j].Key() + }) + + return outGroups, nil +} + +// Get the hash of a group based on the UserID, and the starting and ending time of the group's range. +func hashGroup(userID string, rangeStart int64, rangeEnd int64) uint32 { + groupString := fmt.Sprintf("%v%v%v", userID, rangeStart, rangeEnd) + groupHasher := fnv.New32a() + // Hasher never returns err. + _, _ = groupHasher.Write([]byte(groupString)) + groupHash := groupHasher.Sum32() + + return groupHash +} + +// blocksGroup struct and functions copied and adjusted from https://github.com/cortexproject/cortex/pull/2616 +type blocksGroup struct { + rangeStart int64 // Included. + rangeEnd int64 // Excluded. + blocks []*metadata.Meta + key string +} + +// overlaps returns whether the group range overlaps with the input group. +func (g blocksGroup) overlaps(other blocksGroup) bool { + if g.rangeStart >= other.rangeEnd || other.rangeStart >= g.rangeEnd { + return false + } + + return true +} + +func (g blocksGroup) rangeStartTime() time.Time { + return time.Unix(0, g.rangeStart*int64(time.Millisecond)).UTC() +} + +func (g blocksGroup) rangeEndTime() time.Time { + return time.Unix(0, g.rangeEnd*int64(time.Millisecond)).UTC() +} + +func (g blocksGroup) String() string { + out := strings.Builder{} + out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, key %v, blocks: ", g.rangeStart, g.rangeEnd, g.key)) + + for i, b := range g.blocks { + if i > 0 { + out.WriteString(", ") + } + + minT := time.Unix(0, b.MinTime*int64(time.Millisecond)).UTC() + maxT := time.Unix(0, b.MaxTime*int64(time.Millisecond)).UTC() + out.WriteString(fmt.Sprintf("%s (min time: %s, max time: %s)", b.ULID.String(), minT.String(), maxT.String())) + } + + return out.String() +} + +func (g blocksGroup) rangeLength() int64 { + return g.rangeEnd - g.rangeStart +} + +// minTime returns the MinTime across all blocks in the group. +func (g blocksGroup) minTime() int64 { + // Blocks are expected to be sorted by MinTime. + return g.blocks[0].MinTime +} + +// maxTime returns the MaxTime across all blocks in the group. +func (g blocksGroup) maxTime() int64 { + max := g.blocks[0].MaxTime + + for _, b := range g.blocks[1:] { + if b.MaxTime > max { + max = b.MaxTime + } + } + + return max +} + +// groupBlocksByCompactableRanges groups input blocks by compactable ranges, giving preference +// to smaller ranges. If a smaller range contains more than 1 block (and thus it should +// be compacted), the larger range block group is not generated until each of its +// smaller ranges have 1 block each at most. +func groupBlocksByCompactableRanges(blocks []*metadata.Meta, ranges []int64) []blocksGroup { + if len(blocks) == 0 { + return nil + } + + // Sort blocks by min time. + sortMetasByMinTime(blocks) + + var groups []blocksGroup + + for _, tr := range ranges { + nextGroup: + for _, group := range groupBlocksByRange(blocks, tr) { + + // Exclude groups with a single block, because no compaction is required. + if len(group.blocks) < 2 { + continue + } + + // Ensure this group's range does not overlap with any group already scheduled + // for compaction by a smaller range, because we need to guarantee that smaller ranges + // are compacted first. + for _, c := range groups { + if group.overlaps(c) { + continue nextGroup + } + } + + groups = append(groups, group) + } + } + + // Ensure we don't compact the most recent blocks prematurely when another one of + // the same size still fits in the range. To do it, we consider valid a group only + // if it's before the most recent block or if it fully covers the range. + highestMinTime := blocks[len(blocks)-1].MinTime + + for idx := 0; idx < len(groups); { + group := groups[idx] + + // If the group covers a range before the most recent block, it's fine. + if group.rangeEnd <= highestMinTime { + idx++ + continue + } + + // If the group covers the full range, it's fine. + if group.maxTime()-group.minTime() == group.rangeLength() { + idx++ + continue + } + + // We hit into a group which would compact recent blocks prematurely, + // so we need to filter it out. + + groups = append(groups[:idx], groups[idx+1:]...) + } + + return groups +} + +// groupBlocksByRange splits the blocks by the time range. The range sequence starts at 0. +// Input blocks are expected to be sorted by MinTime. +// +// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30 +// it returns [0-10, 10-20], [50-60], [90-100]. +func groupBlocksByRange(blocks []*metadata.Meta, tr int64) []blocksGroup { + var ret []blocksGroup + + for i := 0; i < len(blocks); { + var ( + group blocksGroup + m = blocks[i] + ) + + group.rangeStart = getRangeStart(m, tr) + group.rangeEnd = group.rangeStart + tr + + // Skip blocks that don't fall into the range. This can happen via mis-alignment or + // by being the multiple of the intended range. + if m.MaxTime > group.rangeEnd { + i++ + continue + } + + // Add all blocks to the current group that are within [t0, t0+tr]. + for ; i < len(blocks); i++ { + // If the block does not start within this group, then we should break the iteration + // and move it to the next group. + if blocks[i].MinTime >= group.rangeEnd { + break + } + + // If the block doesn't fall into this group, but it started within this group then it + // means it spans across multiple ranges and we should skip it. + if blocks[i].MaxTime > group.rangeEnd { + continue + } + + group.blocks = append(group.blocks, blocks[i]) + } + + if len(group.blocks) > 0 { + ret = append(ret, group) + } + } + + return ret +} + +func getRangeStart(m *metadata.Meta, tr int64) int64 { + // Compute start of aligned time range of size tr closest to the current block's start. + // This code has been copied from TSDB. + if m.MinTime >= 0 { + return tr * (m.MinTime / tr) + } + + return tr * ((m.MinTime - tr + 1) / tr) +} + +func sortMetasByMinTime(metas []*metadata.Meta) { + sort.Slice(metas, func(i, j int) bool { + return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime + }) +} diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go new file mode 100644 index 0000000000..e54a849be5 --- /dev/null +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -0,0 +1,497 @@ +package compactor + +import ( + "testing" + "time" + + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +func TestShuffleShardingGrouper_Groups(t *testing.T) { + block1ulid := ulid.MustNew(1, nil) + block2ulid := ulid.MustNew(2, nil) + block3ulid := ulid.MustNew(3, nil) + block4ulid := ulid.MustNew(4, nil) + block5ulid := ulid.MustNew(5, nil) + block6ulid := ulid.MustNew(6, nil) + block7ulid := ulid.MustNew(7, nil) + block8ulid := ulid.MustNew(8, nil) + block9ulid := ulid.MustNew(9, nil) + block10ulid := ulid.MustNew(10, nil) + block11ulid := ulid.MustNew(11, nil) + block12ulid := ulid.MustNew(12, nil) + block13ulid := ulid.MustNew(13, nil) + block14ulid := ulid.MustNew(14, nil) + block15ulid := ulid.MustNew(15, nil) + + blocks := + map[ulid.ULID]*metadata.Meta{ + block1ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block2ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 3 * time.Hour.Milliseconds(), MaxTime: 4 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block3ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block4ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block4ulid, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block5ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block5ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block6ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block6ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block7ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block7ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block8ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block8ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block9ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block9ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "3"}}, + }, + block10ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block10ulid, MinTime: 4 * time.Hour.Milliseconds(), MaxTime: 6 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block11ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block11ulid, MinTime: 6 * time.Hour.Milliseconds(), MaxTime: 8 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block12ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block12ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block13ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block13ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 20 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block14ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block14ulid, MinTime: 21 * time.Hour.Milliseconds(), MaxTime: 40 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block15ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block15ulid, MinTime: 21 * time.Hour.Milliseconds(), MaxTime: 40 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + } + + tests := map[string]struct { + ranges []time.Duration + blocks map[ulid.ULID]*metadata.Meta + expected [][]ulid.ULID + }{ + "test basic grouping": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block5ulid: blocks[block5ulid], block6ulid: blocks[block6ulid]}, + expected: [][]ulid.ULID{ + {block5ulid, block6ulid}, + {block1ulid, block3ulid}, + {block2ulid, block4ulid}, + }, + }, + "test no compaction": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block7ulid: blocks[block7ulid], block8ulid: blocks[block8ulid], block9ulid: blocks[block9ulid]}, + expected: [][]ulid.ULID{}, + }, + "test smallest range first": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block10ulid: blocks[block10ulid], block11ulid: blocks[block11ulid]}, + expected: [][]ulid.ULID{ + {block1ulid, block3ulid}, + {block2ulid, block4ulid}, + {block10ulid, block11ulid}, + }, + }, + "test oldest min time first": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block12ulid: blocks[block12ulid]}, + expected: [][]ulid.ULID{ + {block1ulid, block3ulid, block12ulid}, + {block2ulid, block4ulid}, + }, + }, + "test overlapping blocks": { + ranges: []time.Duration{20 * time.Hour, 40 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block13ulid: blocks[block13ulid], block14ulid: blocks[block14ulid], block15ulid: blocks[block15ulid]}, + expected: [][]ulid.ULID{}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + compactorCfg := &Config{ + BlockRanges: testData.ranges, + } + + g := NewShuffleShardingGrouper(nil, + nil, + false, // Do not accept malformed indexes + true, // Enable vertical compaction + prometheus.NewRegistry(), + nil, + nil, + nil, + metadata.NoneFunc, + *compactorCfg) + actual, err := g.Groups(testData.blocks) + require.NoError(t, err) + require.Len(t, actual, len(testData.expected)) + + for idx, expectedIDs := range testData.expected { + assert.Equal(t, expectedIDs, actual[idx].IDs()) + } + }) + } +} + +func TestGroupBlocksByCompactableRanges(t *testing.T) { + tests := map[string]struct { + ranges []int64 + blocks []*metadata.Meta + expected []blocksGroup + }{ + "no input blocks": { + ranges: []int64{20}, + blocks: nil, + expected: nil, + }, + "only 1 block in input": { + ranges: []int64{20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }, + expected: nil, + }, + "only 1 block for each range (single range)": { + ranges: []int64{20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: nil, + }, + "only 1 block for each range (multiple ranges)": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: nil, + }, + "input blocks can be compacted on the 1st range only": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 25, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 25, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 60}}, + }}, + }, + }, + "input blocks can be compacted on the 2nd range only": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + {rangeStart: 60, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }}, + }, + }, + "input blocks can be compacted on a mix of 1st and 2nd ranges, guaranteeing no overlaps and giving preference to smaller ranges": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + }}, + {rangeStart: 70, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }}, + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + }, + }, + "input blocks have already been compacted with the largest range": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: nil, + }, + "input blocks match the largest range but can be compacted because overlapping": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: []blocksGroup{ + {rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }}, + }, + }, + "a block with time range crossing two 1st level ranges should be NOT considered for 1st level compaction": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block spans across two 1st level ranges. + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + }, + }, + "a block with time range crossing two 1st level ranges should BE considered for 2nd level compaction": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block spans across two 1st level ranges. + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}}, + }}, + }, + }, + "a block with time range larger then the largest compaction range should NOT be considered for compaction": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 150}}, // This block is larger then the largest compaction range. + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: []blocksGroup{ + {rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }}, + }, + }, + "a range containing the most recent block shouldn't be prematurely compacted if doesn't cover the full range": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 12}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 13, MaxTime: 15}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}}, + }}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, groupBlocksByCompactableRanges(testData.blocks, testData.ranges)) + }) + } +} + +func TestGroupBlocksByRange(t *testing.T) { + tests := map[string]struct { + timeRange int64 + blocks []*metadata.Meta + expected []blocksGroup + }{ + "no input blocks": { + timeRange: 20, + blocks: nil, + expected: nil, + }, + "only 1 block in input": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + }, + }, + "only 1 block per range": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }}, + }, + }, + "multiple blocks per range": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}}, + }}, + }, + }, + "a block with time range larger then the range should be excluded": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, // This block is larger then the range. + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }}, + }, + }, + "blocks with different time ranges but all fitting within the input range": { + timeRange: 40, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, groupBlocksByRange(testData.blocks, testData.timeRange)) + }) + } +} + +func TestBlocksGroup_overlaps(t *testing.T) { + tests := []struct { + first blocksGroup + second blocksGroup + expected bool + }{ + { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 20, rangeEnd: 30}, + expected: false, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 19, rangeEnd: 30}, + expected: true, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 21}, + second: blocksGroup{rangeStart: 20, rangeEnd: 30}, + expected: true, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 12, rangeEnd: 18}, + expected: true, + }, + } + + for _, tc := range tests { + assert.Equal(t, tc.expected, tc.first.overlaps(tc.second)) + assert.Equal(t, tc.expected, tc.second.overlaps(tc.first)) + } +} diff --git a/pkg/compactor/shuffle_sharding_planner.go b/pkg/compactor/shuffle_sharding_planner.go new file mode 100644 index 0000000000..f5c97c1266 --- /dev/null +++ b/pkg/compactor/shuffle_sharding_planner.go @@ -0,0 +1,39 @@ +package compactor + +import ( + "context" + "fmt" + + "github.com/go-kit/log" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +type ShuffleShardingPlanner struct { + logger log.Logger + ranges []int64 +} + +func NewShuffleShardingPlanner(logger log.Logger, ranges []int64) *ShuffleShardingPlanner { + return &ShuffleShardingPlanner{ + logger: logger, + ranges: ranges, + } +} + +func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { + // Ensure all blocks fits within the largest range. This is a double check + // to ensure there's no bug in the previous blocks grouping, given this Plan() + // is just a pass-through. + // Modifed from https://github.com/cortexproject/cortex/pull/2616/files#diff-e3051fc530c48bb276ba958dd8fadc684e546bd7964e6bc75cef9a86ef8df344R28-R63 + largestRange := p.ranges[len(p.ranges)-1] + rangeStart := getRangeStart(metasByMinTime[0], largestRange) + rangeEnd := rangeStart + largestRange + + for _, b := range metasByMinTime { + if b.MinTime < rangeStart || b.MaxTime > rangeEnd { + return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", b.ULID.String(), b.MinTime, b.MaxTime, rangeStart, rangeEnd) + } + } + + return metasByMinTime, nil +} diff --git a/pkg/compactor/shuffle_sharding_planner_test.go b/pkg/compactor/shuffle_sharding_planner_test.go new file mode 100644 index 0000000000..fdf63c9207 --- /dev/null +++ b/pkg/compactor/shuffle_sharding_planner_test.go @@ -0,0 +1,142 @@ +package compactor + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +func TestShuffleShardingPlanner_Plan(t *testing.T) { + block1ulid := ulid.MustNew(1, nil) + block2ulid := ulid.MustNew(2, nil) + + tests := map[string]struct { + ranges []int64 + blocks []*metadata.Meta + expected []*metadata.Meta + expectedErr error + }{ + "test basic plan": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + expected: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + }, + "test blocks outside largest range smaller min time after": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 2 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block2ulid.String(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds()), + }, + "test blocks outside largest range 1": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + }, + expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block1ulid.String(), 0*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds()), + }, + "test blocks outside largest range 2": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + }, + expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block2ulid.String(), 0*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds()), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + p := NewShuffleShardingPlanner(nil, + testData.ranges) + actual, err := p.Plan(context.Background(), testData.blocks) + + if testData.expectedErr != nil { + assert.Equal(t, err, testData.expectedErr) + } else { + require.NoError(t, err) + } + + require.Len(t, actual, len(testData.expected)) + + for idx, expectedMeta := range testData.expected { + assert.Equal(t, expectedMeta.ULID, actual[idx].ULID) + } + }) + } +}