diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index bcea40d0e30..df985f09e08 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -166,6 +166,11 @@ compactor: # CLI flag: -compactor.disabled-tenants [disabled_tenants: | default = ""] + # Enable planner filter which will filter groups of blocks within the Cortex + # compactor instead of using the Thanos to group blocks. + # CLI flag: -compactor.planner-filter-enabled + [planner_filter_enabled: | default = false] + # Shard tenants across multiple compactor instances. Sharding is required if # you run multiple compactor instances, in order to coordinate compactions and # avoid race conditions leading to the same tenant blocks simultaneously diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 51952688cb4..1b41ff64552 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -120,6 +120,9 @@ type Config struct { // Allow downstream projects to customise the blocks compactor. BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"` BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"` + + // Flag to enable planner filter + PlannerFilterEnabled bool `yaml:"planner_filter_enabled"` } // RegisterFlags registers the Compactor flags. @@ -146,6 +149,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.") f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is time between deleting of last block, and doing final cleanup (marker files, debug files) of the tenant.") f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", true, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.") + f.BoolVar(&cfg.PlannerFilterEnabled, "compactor.planner-filter-enabled", false, "Filter and plan blocks within PlannerFilter instead of through Thanos planner and grouper.") f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.") f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.") @@ -606,6 +610,43 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { time.Duration(c.compactorCfg.DeletionDelay.Seconds()/2)*time.Second, c.compactorCfg.MetaSyncConcurrency) + fetcherFilters := []block.MetadataFilter{ + // Remove the ingester ID because we don't shard blocks anymore, while still + // honoring the shard ID if sharding was done in the past. + NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}), + block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), + ignoreDeletionMarkFilter, + deduplicateBlocksFilter, + } + + // If config is set to use planner filter then generate plans and append it to the fetcherFilters + if c.compactorCfg.PlannerFilterEnabled { + level.Info(c.logger).Log("msg", "Compactor using planner filter") + + // Create a new planner filter + f, err := NewPlannerFilter( + ctx, + userID, + ulogger, + bucket, + fetcherFilters, + c.compactorCfg, + c.metaSyncDirForUser(userID), + ) + if err != nil { + return err + } + + // Generate all parallelizable plans + err = f.fetchBlocksAndGeneratePlans(ctx) + if err != nil { + return err + } + + // Add the planner filter to the fetcher's filters + fetcherFilters = append(fetcherFilters, f) + } + fetcher, err := block.NewMetaFetcher( ulogger, c.compactorCfg.MetaSyncConcurrency, @@ -613,14 +654,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { c.metaSyncDirForUser(userID), reg, // List of filters to apply (order matters). - []block.MetadataFilter{ - // Remove the ingester ID because we don't shard blocks anymore, while still - // honoring the shard ID if sharding was done in the past. - NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}), - block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), - ignoreDeletionMarkFilter, - deduplicateBlocksFilter, - }, + fetcherFilters, nil, ) if err != nil { diff --git a/pkg/compactor/planner_filter.go b/pkg/compactor/planner_filter.go new file mode 100644 index 00000000000..08ba071b58e --- /dev/null +++ b/pkg/compactor/planner_filter.go @@ -0,0 +1,333 @@ +package compactor + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" +) + +type PlannerFilter struct { + userID string + ulogger log.Logger + bucket objstore.InstrumentedBucket + fetcherFilters []block.MetadataFilter + compactorCfg Config + metaSyncDir string + + plans []blocksGroup +} + +func NewPlannerFilter(ctx context.Context, userID string, ulogger log.Logger, bucket objstore.InstrumentedBucket, fetcherFilters []block.MetadataFilter, compactorCfg Config, metaSyncDir string) (*PlannerFilter, error) { + f := &PlannerFilter{ + userID: userID, + ulogger: ulogger, + bucket: bucket, + fetcherFilters: fetcherFilters, + compactorCfg: compactorCfg, + metaSyncDir: metaSyncDir, + } + + return f, nil +} + +// Gets the blocks of the user. +func (f *PlannerFilter) getUserBlocks(ctx context.Context) (map[ulid.ULID]*metadata.Meta, error) { + fetcher, err := block.NewMetaFetcher( + f.ulogger, + f.compactorCfg.MetaSyncConcurrency, + f.bucket, + f.metaSyncDir, + prometheus.NewRegistry(), + // List of filters to apply (order matters). + f.fetcherFilters, + nil, + ) + if err != nil { + return nil, err + } + + metas, _, err := fetcher.Fetch(ctx) + if err != nil { + return nil, err + } + + return metas, nil +} + +// Fetches blocks and generates plans that can be parallized and saves them in the PlannerFilter struct. +func (f *PlannerFilter) fetchBlocksAndGeneratePlans(ctx context.Context) error { + // Get blocks + blocks, err := f.getUserBlocks(ctx) + if err != nil { + return err + } + + return f.generatePlans(ctx, blocks) +} + +// Generates plans that can be parallized and saves them +func (f *PlannerFilter) generatePlans(ctx context.Context, blocks map[ulid.ULID]*metadata.Meta) error { + // First of all we have to group blocks using the Thanos default + // grouping (based on downsample resolution + external labels). + mainGroups := map[string][]*metadata.Meta{} + for _, b := range blocks { + key := compact.DefaultGroupKey(b.Thanos) + mainGroups[key] = append(mainGroups[key], b) + } + + var plans []blocksGroup + + for k, mainBlocks := range mainGroups { + for i, plan := range groupBlocksByCompactableRanges(mainBlocks, f.compactorCfg.BlockRanges.ToMilliseconds(), f.ulogger) { + // Nothing to do if we don't have at least 2 blocks. + if len(plan.blocks) < 2 { + continue + } + + level.Info(f.ulogger).Log("msg", "Found plan for user", "user", f.userID, "plan", plan.String()) + + plan.key = fmt.Sprintf("%v_%v", k, i) + + plans = append(plans, plan) + } + } + + // Ensure groups are sorted by smallest range, oldest min time first. The rationale + // is that we wanna favor smaller ranges first (ie. to deduplicate samples sooner + // than later) and older ones are more likely to be "complete" (no missing block still + // to be uploaded). + sort.SliceStable(plans, func(i, j int) bool { + iLength := plans[i].maxTime() - plans[i].minTime() + jLength := plans[j].maxTime() - plans[j].minTime() + + if iLength != jLength { + return iLength < jLength + } + if plans[i].minTime() != plans[j].minTime() { + return plans[i].minTime() < plans[j].minTime() + } + + // Guarantee stable sort for tests. + return plans[i].key < plans[j].key + }) + + f.plans = plans + + return nil +} + +// Filter removes the blocks for every single plan except one. +// Currently we are just using the first plan every single time. +// TODO: Filter plans by putting each plan on the ring and having the compactor select a plan (filter the rest) +func (f *PlannerFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ *extprom.TxGaugeVec) error { + // Plans have to exist to be filtered, if no blocks need to be compacted then nothing needs to be filtered. + if len(f.plans) < 1 { + return nil + } + + // Delete blocks for each plan except the first one. + for _, p := range f.plans[1:] { + for _, b := range p.blocks { + delete(metas, b.BlockMeta.ULID) // check what happens if it tries to delete a key that doesn't exist + } + } + + return nil +} + +// blocksGroup struct and functions copied and adjusted from https://github.com/cortexproject/cortex/pull/2616 +type blocksGroup struct { + rangeStart int64 // Included. + rangeEnd int64 // Excluded. + blocks []*metadata.Meta + key string +} + +// overlaps returns whether the group range overlaps with the input group. +func (g blocksGroup) overlaps(other blocksGroup) bool { + if g.rangeStart >= other.rangeEnd || other.rangeStart >= g.rangeEnd { + return false + } + + return true +} + +func (g blocksGroup) String() string { + out := strings.Builder{} + out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, key %v, blocks: ", g.rangeStart, g.rangeEnd, g.key)) + + for i, b := range g.blocks { + if i > 0 { + out.WriteString(", ") + } + + minT := time.Unix(0, b.MinTime*int64(time.Millisecond)).UTC() + maxT := time.Unix(0, b.MaxTime*int64(time.Millisecond)).UTC() + out.WriteString(fmt.Sprintf("%s (min time: %s, max time: %s)", b.ULID.String(), minT.String(), maxT.String())) + } + + return out.String() +} + +func (g blocksGroup) rangeLength() int64 { + return g.rangeEnd - g.rangeStart +} + +// minTime returns the MinTime across all blocks in the group. +func (g blocksGroup) minTime() int64 { + // Blocks are expected to be sorted by MinTime. + return g.blocks[0].MinTime +} + +// maxTime returns the MaxTime across all blocks in the group. +func (g blocksGroup) maxTime() int64 { + max := g.blocks[0].MaxTime + + for _, b := range g.blocks[1:] { + if b.MaxTime > max { + max = b.MaxTime + } + } + + return max +} + +// groupBlocksByCompactableRanges groups input blocks by compactable ranges, giving preference +// to smaller ranges. If a smaller range contains more than 1 block (and thus it should +// be compacted), the larger range block group is not generated until each of its +// smaller ranges have 1 block each at most. +func groupBlocksByCompactableRanges(blocks []*metadata.Meta, ranges []int64) []blocksGroup { + if len(blocks) == 0 { + return nil + } + + // Sort blocks by min time. + sortMetasByMinTime(blocks) + + var groups []blocksGroup + + for _, tr := range ranges { + nextGroup: + for _, group := range groupBlocksByRange(blocks, tr) { + // Exclude groups with a single block, because no compaction is required. + if len(group.blocks) < 2 { + continue + } + + // Ensure this group's range does not overlap with any group already scheduled + // for compaction by a smaller range, because we need to guarantee that smaller ranges + // are compacted first. + for _, c := range groups { + if group.overlaps(c) { + continue nextGroup + } + } + + groups = append(groups, group) + } + } + + // Ensure we don't compact the most recent blocks prematurely when another one of + // the same size still fits in the range. To do it, we consider valid a group only + // if it's before the most recent block or if it fully covers the range. + highestMinTime := blocks[len(blocks)-1].MinTime + + for idx := 0; idx < len(groups); { + group := groups[idx] + + // If the group covers a range before the most recent block, it's fine. + if group.rangeEnd <= highestMinTime { + idx++ + continue + } + + // If the group covers the full range, it's fine. + if group.maxTime()-group.minTime() == group.rangeLength() { + idx++ + continue + } + + // We hit into a group which would compact recent blocks prematurely, + // so we need to filter it out. + groups = append(groups[:idx], groups[idx+1:]...) + } + + return groups +} + +// groupBlocksByRange splits the blocks by the time range. The range sequence starts at 0. +// Input blocks are expected to be sorted by MinTime. +// +// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30 +// it returns [0-10, 10-20], [50-60], [90-100]. +func groupBlocksByRange(blocks []*metadata.Meta, tr int64) []blocksGroup { + var ret []blocksGroup + + for i := 0; i < len(blocks); { + var ( + group blocksGroup + m = blocks[i] + ) + + group.rangeStart = getRangeStart(m, tr) + group.rangeEnd = group.rangeStart + tr + + // Skip blocks that don't fall into the range. This can happen via mis-alignment or + // by being the multiple of the intended range. + if m.MaxTime > group.rangeEnd { + i++ + continue + } + + // Add all blocks to the current group that are within [t0, t0+tr]. + for ; i < len(blocks); i++ { + // If the block does not start within this group, then we should break the iteration + // and move it to the next group. + if blocks[i].MinTime >= group.rangeEnd { + break + } + + // If the block doesn't fall into this group, but it started within this group then it + // means it spans across multiple ranges and we should skip it. + if blocks[i].MaxTime > group.rangeEnd { + continue + } + + group.blocks = append(group.blocks, blocks[i]) + } + + if len(group.blocks) > 0 { + ret = append(ret, group) + } + } + + return ret +} + +func getRangeStart(m *metadata.Meta, tr int64) int64 { + // Compute start of aligned time range of size tr closest to the current block's start. + // This code has been copied from TSDB. + if m.MinTime >= 0 { + return tr * (m.MinTime / tr) + } else { + return tr * ((m.MinTime - tr + 1) / tr) + } +} + +func sortMetasByMinTime(metas []*metadata.Meta) { + sort.Slice(metas, func(i, j int) bool { + return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime + }) +} diff --git a/pkg/compactor/planner_filter_test.go b/pkg/compactor/planner_filter_test.go new file mode 100644 index 00000000000..abcb828a01a --- /dev/null +++ b/pkg/compactor/planner_filter_test.go @@ -0,0 +1,461 @@ +package compactor + +import ( + "context" + "testing" + "time" + + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +func TestPlannerFilterPlanGeneration(t *testing.T) { + block1ulid := ulid.MustNew(1, nil) + block2ulid := ulid.MustNew(2, nil) + block3ulid := ulid.MustNew(3, nil) + block4ulid := ulid.MustNew(4, nil) + block5ulid := ulid.MustNew(5, nil) + block6ulid := ulid.MustNew(6, nil) + block7ulid := ulid.MustNew(7, nil) + block8ulid := ulid.MustNew(8, nil) + block9ulid := ulid.MustNew(9, nil) + block10ulid := ulid.MustNew(10, nil) + block11ulid := ulid.MustNew(11, nil) + + blocks := + map[ulid.ULID]*metadata.Meta{ + block1ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block2ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 3 * time.Hour.Milliseconds(), MaxTime: 4 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block3ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block4ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block4ulid, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block5ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block5ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block6ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block6ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block7ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block7ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block8ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block8ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block9ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block9ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "3"}}, + }, + block10ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block10ulid, MinTime: 4 * time.Hour.Milliseconds(), MaxTime: 6 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block11ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block11ulid, MinTime: 6 * time.Hour.Milliseconds(), MaxTime: 8 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + } + + tests := map[string]struct { + ranges cortex_tsdb.DurationList + blocks map[ulid.ULID]*metadata.Meta + expectedPlans []blocksGroup + }{ + "test basic planning": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block5ulid: blocks[block5ulid], block6ulid: blocks[block6ulid]}, + expectedPlans: []blocksGroup{ + {rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block6ulid], blocks[block5ulid]}, key: "0@14088339200549387484_0"}, + {rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block3ulid], blocks[block1ulid]}, key: "0@6043952821095826047_0"}, + {rangeStart: 7200000, rangeEnd: 14400000, blocks: []*metadata.Meta{blocks[block4ulid], blocks[block2ulid]}, key: "0@6043952821095826047_1"}, + }, + }, + "test no compaction": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block7ulid: blocks[block7ulid], block8ulid: blocks[block8ulid], block9ulid: blocks[block9ulid]}, + expectedPlans: []blocksGroup{}, + }, + "test smallest range first": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block10ulid: blocks[block10ulid], block11ulid: blocks[block11ulid]}, + expectedPlans: []blocksGroup{ + {rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block3ulid], blocks[block1ulid]}, key: "0@6043952821095826047_0"}, + {rangeStart: 7200000, rangeEnd: 14400000, blocks: []*metadata.Meta{blocks[block4ulid], blocks[block2ulid]}, key: "0@6043952821095826047_1"}, + {rangeStart: 14400000, rangeEnd: 28800000, blocks: []*metadata.Meta{blocks[block10ulid], blocks[block11ulid]}, key: "0@14088339200549387484_0"}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + compactorCfg := Config{} + flagext.DefaultValues(&compactorCfg) + compactorCfg.BlockRanges = testData.ranges + f := &PlannerFilter{ + userID: "test-user", + compactorCfg: compactorCfg, + ulogger: log.NewNopLogger(), + } + err := f.generatePlans(context.Background(), testData.blocks) + require.NoError(t, err) + actualPlans := f.plans + require.Len(t, actualPlans, len(testData.expectedPlans)) + for i, expectedPlan := range testData.expectedPlans { + assert.Equal(t, expectedPlan, actualPlans[i]) + } + }) + } +} + +func TestGroupBlocksByCompactableRanges(t *testing.T) { + tests := map[string]struct { + ranges []int64 + blocks []*metadata.Meta + expected []blocksGroup + }{ + "no input blocks": { + ranges: []int64{20}, + blocks: nil, + expected: nil, + }, + "only 1 block in input": { + ranges: []int64{20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }, + expected: nil, + }, + "only 1 block for each range (single range)": { + ranges: []int64{20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: nil, + }, + "only 1 block for each range (multiple ranges)": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: nil, + }, + "input blocks can be compacted on the 1st range only": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 25, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 25, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 60}}, + }}, + }, + }, + "input blocks can be compacted on the 2nd range only": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + {rangeStart: 60, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }}, + }, + }, + "input blocks can be compacted on a mix of 1st and 2nd ranges, guaranteeing no overlaps and giving preference to smaller ranges": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + }}, + {rangeStart: 70, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }}, + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + }, + }, + "input blocks have already been compacted with the largest range": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: nil, + }, + "input blocks match the largest range but can be compacted because overlapping": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: []blocksGroup{ + {rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }}, + }, + }, + "a block with time range crossing two 1st level ranges should be NOT considered for 1st level compaction": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block spans across two 1st level ranges. + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + }, + }, + "a block with time range crossing two 1st level ranges should BE considered for 2nd level compaction": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block spans across two 1st level ranges. + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}}, + }}, + }, + }, + "a block with time range larger then the largest compaction range should NOT be considered for compaction": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 150}}, // This block is larger then the largest compaction range. + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: []blocksGroup{ + {rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }}, + }, + }, + "a range containg the most recent block shouldn't be prematurely compacted if doesn't cover the full range": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 12}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 13, MaxTime: 15}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}}, + }}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, groupBlocksByCompactableRanges(testData.blocks, testData.ranges)) + }) + } +} + +func TestGroupBlocksByRange(t *testing.T) { + tests := map[string]struct { + timeRange int64 + blocks []*metadata.Meta + expected []blocksGroup + }{ + "no input blocks": { + timeRange: 20, + blocks: nil, + expected: nil, + }, + "only 1 block in input": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + }, + }, + "only 1 block per range": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }}, + }, + }, + "multiple blocks per range": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}}, + }}, + }, + }, + "a block with time range larger then the range should be excluded": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, // This block is larger then the range. + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }}, + }, + }, + "blocks with different time ranges but all fitting within the input range": { + timeRange: 40, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, groupBlocksByRange(testData.blocks, testData.timeRange)) + }) + } +} + +func TestBlocksGroup_overlaps(t *testing.T) { + tests := []struct { + first blocksGroup + second blocksGroup + expected bool + }{ + { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 20, rangeEnd: 30}, + expected: false, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 19, rangeEnd: 30}, + expected: true, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 21}, + second: blocksGroup{rangeStart: 20, rangeEnd: 30}, + expected: true, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 12, rangeEnd: 18}, + expected: true, + }, + } + + for _, tc := range tests { + assert.Equal(t, tc.expected, tc.first.overlaps(tc.second)) + assert.Equal(t, tc.expected, tc.second.overlaps(tc.first)) + } +}