From 4d98a0ef98d02958788add00ae5d1b2d5fe25f7d Mon Sep 17 00:00:00 2001 From: Albert Date: Fri, 9 Jul 2021 19:27:18 -0700 Subject: [PATCH 01/11] add shuffle sharding grouper/planner Signed-off-by: Albert --- docs/blocks-storage/compactor.md | 5 + docs/configuration/config-file-reference.md | 4 + pkg/compactor/compactor.go | 49 +- pkg/compactor/shuffle_sharding_grouper.go | 379 +++++++++++++ .../shuffle_sharding_grouper_test.go | 496 ++++++++++++++++++ pkg/compactor/shuffle_sharding_planner.go | 39 ++ 6 files changed, 968 insertions(+), 4 deletions(-) create mode 100644 pkg/compactor/shuffle_sharding_grouper.go create mode 100644 pkg/compactor/shuffle_sharding_grouper_test.go create mode 100644 pkg/compactor/shuffle_sharding_planner.go diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index ada0f9f41a..d52f88bb4e 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -173,6 +173,11 @@ compactor: # CLI flag: -compactor.sharding-enabled [sharding_enabled: | default = false] + # The sharding strategy to use. Supported values are: default, + # shuffle-sharding. + # CLI flag: -compactor.sharding-strategy + [sharding_strategy: | default = "default"] + sharding_ring: kvstore: # Backend storage to use for the ring. Supported values are: consul, etcd, diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index a26fce199c..781148d8de 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -5317,6 +5317,10 @@ The `compactor_config` configures the compactor for the blocks storage. # CLI flag: -compactor.sharding-enabled [sharding_enabled: | default = false] +# The sharding strategy to use. Supported values are: default, shuffle-sharding. +# CLI flag: -compactor.sharding-strategy +[sharding_strategy: | default = "default"] + sharding_ring: kvstore: # Backend storage to use for the ring. 
Supported values are: consul, etcd, diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 75b83fc1e1..bf3ec36baa 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -48,6 +48,9 @@ var ( errInvalidBlockRanges = "compactor block range periods should be divisible by the previous one, but %s is not divisible by %s" RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) + supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle} + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + DefaultBlocksGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { return compact.NewDefaultGrouper( logger, @@ -61,6 +64,19 @@ var ( metadata.NoneFunc) } + ShuffleShardingGrouperFactory = func(ctx context.Context, cfg Config, bkt objstore.Bucket, logger log.Logger, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, garbageCollectedBlocks prometheus.Counter) compact.Grouper { + return NewShuffleShardingGrouper( + logger, + bkt, + false, // Do not accept malformed indexes + true, // Enable vertical compaction + reg, + blocksMarkedForDeletion, + garbageCollectedBlocks, + metadata.NoneFunc, + cfg) + } + DefaultBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) { compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) if err != nil { @@ -70,6 +86,16 @@ var ( planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds()) return compactor, planner, nil } + + ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) { + compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool()) + if err != nil { + return nil, nil, err + } + + planner := NewShuffleShardingPlanner(logger, cfg.BlockRanges.ToMilliseconds()) + return compactor, planner, nil + } ) // BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks. @@ -113,8 +139,9 @@ type Config struct { DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"` // Compactors sharding. - ShardingEnabled bool `yaml:"sharding_enabled"` - ShardingRing RingConfig `yaml:"sharding_ring"` + ShardingEnabled bool `yaml:"sharding_enabled"` + ShardingStrategy string `yaml:"sharding_strategy"` + ShardingRing RingConfig `yaml:"sharding_ring"` // No need to add options to customize the retry backoff, // given the defaults should be fine, but allow to override @@ -146,6 +173,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.CleanupInterval, "compactor.cleanup-interval", 15*time.Minute, "How frequently compactor should run blocks cleanup and maintenance, as well as update the bucket index.") f.IntVar(&cfg.CleanupConcurrency, "compactor.cleanup-concurrency", 20, "Max number of tenants for which blocks cleanup and maintenance should run concurrently.") f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. 
Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.") + f.StringVar(&cfg.ShardingStrategy, "compactor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+ "If not 0, blocks will be marked for deletion and compactor component will permanently delete blocks marked for deletion from the bucket. "+ "If 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures.") @@ -164,6 +192,11 @@ func (cfg *Config) Validate() error { } } + // Make sure a valid sharding strategy is being used + if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) { + return errInvalidShardingStrategy + } + return nil } @@ -235,12 +268,20 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfi blocksGrouperFactory := compactorCfg.BlocksGrouperFactory if blocksGrouperFactory == nil { - blocksGrouperFactory = DefaultBlocksGrouperFactory + if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + blocksGrouperFactory = ShuffleShardingGrouperFactory + } else { + blocksGrouperFactory = DefaultBlocksGrouperFactory + } } blocksCompactorFactory := compactorCfg.BlocksCompactorFactory if blocksCompactorFactory == nil { - blocksCompactorFactory = DefaultBlocksCompactorFactory + if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle { + blocksCompactorFactory = ShuffleShardingBlocksCompactorFactory + } else { + blocksCompactorFactory = DefaultBlocksCompactorFactory + } } cortexCompactor, err := newCompactor(compactorCfg, storageCfg, cfgProvider, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go new file mode 100644 index 0000000000..82e14dcfc0 --- /dev/null +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -0,0 +1,379 @@ +package compactor + +import ( + "fmt" + "hash/fnv" + "sort" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/objstore" +) + +type ShuffleShardingGrouper struct { + logger log.Logger + bkt objstore.Bucket + acceptMalformedIndex bool + enableVerticalCompaction bool + reg prometheus.Registerer + blocksMarkedForDeletion prometheus.Counter + garbageCollectedBlocks prometheus.Counter + hashFunc metadata.HashFunc + compactions *prometheus.CounterVec + compactionRunsStarted *prometheus.CounterVec + compactionRunsCompleted *prometheus.CounterVec + compactionFailures *prometheus.CounterVec + verticalCompactions *prometheus.CounterVec + compactorCfg Config +} + +func NewShuffleShardingGrouper( + logger log.Logger, 
+ bkt objstore.Bucket, + acceptMalformedIndex bool, + enableVerticalCompaction bool, + reg prometheus.Registerer, + blocksMarkedForDeletion prometheus.Counter, + garbageCollectedBlocks prometheus.Counter, + hashFunc metadata.HashFunc, + compactorCfg Config, +) *ShuffleShardingGrouper { + if logger == nil { + logger = log.NewNopLogger() + } + + return &ShuffleShardingGrouper{ + logger: logger, + bkt: bkt, + acceptMalformedIndex: acceptMalformedIndex, + enableVerticalCompaction: enableVerticalCompaction, + reg: reg, + blocksMarkedForDeletion: blocksMarkedForDeletion, + garbageCollectedBlocks: garbageCollectedBlocks, + hashFunc: hashFunc, + compactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compactions_total", + Help: "Total number of group compaction attempts that resulted in a new block.", + }, []string{"group"}), + compactionRunsStarted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compaction_runs_started_total", + Help: "Total number of group compaction attempts.", + }, []string{"group"}), + compactionRunsCompleted: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compaction_runs_completed_total", + Help: "Total number of group completed compaction runs. This also includes compactor group runs that resulted with no compaction.", + }, []string{"group"}), + compactionFailures: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_compactions_failures_total", + Help: "Total number of failed group compactions.", + }, []string{"group"}), + verticalCompactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_compact_group_vertical_compactions_total", + Help: "Total number of group compaction attempts that resulted in a new block based on overlapping blocks.", + }, []string{"group"}), + compactorCfg: compactorCfg, + } +} + +// Groups function modified from https://github.com/cortexproject/cortex/pull/2616 +func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*compact.Group, err error) { + // First of all we have to group blocks using the Thanos default + // grouping (based on downsample resolution + external labels). + mainGroups := map[string][]*metadata.Meta{} + for _, b := range blocks { + key := compact.DefaultGroupKey(b.Thanos) + mainGroups[key] = append(mainGroups[key], b) + } + + // For each group, we have to further split it into set of blocks + // which we can parallelly compact. + var outGroups []*compact.Group + + i := 0 + for _, mainBlocks := range mainGroups { + for _, group := range groupBlocksByCompactableRanges(mainBlocks, g.compactorCfg.BlockRanges.ToMilliseconds()) { + // Nothing to do if we don't have at least 2 blocks. + if len(group.blocks) < 2 { + continue + } + + // TODO: Use the group's hash to determine whether a compactor should be responsible for compacting that group + groupHash := hashGroup(group.blocks[0].Thanos.Labels["__org_id__"], group.rangeStart, group.rangeEnd) + + groupKey := fmt.Sprintf("%v%d", groupHash, i) + i++ + + level.Debug(g.logger).Log("msg", "found compactable group for user", "user", group.blocks[0].Thanos.Labels["__org_id__"], "plan", group.String()) + + // All the blocks within the same group have the same downsample + // resolution and external labels. 
+ resolution := group.blocks[0].Thanos.Downsample.Resolution + externalLabels := labels.FromMap(group.blocks[0].Thanos.Labels) + + thanosGroup, err := compact.NewGroup( + log.With(g.logger, "groupKey", groupKey, "rangeStart", group.rangeStartTime().String(), "rangeEnd", group.rangeEndTime().String(), "externalLabels", externalLabels, "downsampleResolution", resolution), + g.bkt, + groupKey, + externalLabels, + resolution, + false, // No malformed index. + true, // Enable vertical compaction. + g.compactions.WithLabelValues(groupKey), + g.compactionRunsStarted.WithLabelValues(groupKey), + g.compactionRunsCompleted.WithLabelValues(groupKey), + g.compactionFailures.WithLabelValues(groupKey), + g.verticalCompactions.WithLabelValues(groupKey), + g.garbageCollectedBlocks, + g.blocksMarkedForDeletion, + g.hashFunc, + ) + if err != nil { + return nil, errors.Wrap(err, "create compaction group") + } + + for _, m := range group.blocks { + if err := thanosGroup.AppendMeta(m); err != nil { + return nil, errors.Wrap(err, "add block to compaction group") + } + } + + outGroups = append(outGroups, thanosGroup) + } + } + + // Ensure groups are sorted by smallest range, oldest min time first. The rationale + // is that we wanna favor smaller ranges first (ie. to deduplicate samples sooner + // than later) and older ones are more likely to be "complete" (no missing block still + // to be uploaded). + sort.SliceStable(outGroups, func(i, j int) bool { + iLength := outGroups[i].MaxTime() - outGroups[i].MinTime() + jLength := outGroups[j].MaxTime() - outGroups[j].MinTime() + + if iLength != jLength { + return iLength < jLength + } + if outGroups[i].MinTime() != outGroups[j].MinTime() { + return outGroups[i].MinTime() < outGroups[j].MinTime() + } + + // Guarantee stable sort for tests. + return outGroups[i].Key() < outGroups[j].Key() + }) + + return outGroups, nil +} + +// Get the hash of a group based on the UserID, and the starting and ending time of the group's range. +func hashGroup(userID string, rangeStart int64, rangeEnd int64) uint32 { + groupString := fmt.Sprintf("%v%v%v", userID, rangeStart, rangeEnd) + groupHasher := fnv.New32a() + // Hasher never returns err. + _, _ = groupHasher.Write([]byte(groupString)) + groupHash := groupHasher.Sum32() + + return groupHash +} + +// blocksGroup struct and functions copied and adjusted from https://github.com/cortexproject/cortex/pull/2616 +type blocksGroup struct { + rangeStart int64 // Included. + rangeEnd int64 // Excluded. + blocks []*metadata.Meta + key string +} + +// overlaps returns whether the group range overlaps with the input group. 
+func (g blocksGroup) overlaps(other blocksGroup) bool { + if g.rangeStart >= other.rangeEnd || other.rangeStart >= g.rangeEnd { + return false + } + + return true +} + +func (g blocksGroup) rangeStartTime() time.Time { + return time.Unix(0, g.rangeStart*int64(time.Millisecond)).UTC() +} + +func (g blocksGroup) rangeEndTime() time.Time { + return time.Unix(0, g.rangeEnd*int64(time.Millisecond)).UTC() +} + +func (g blocksGroup) String() string { + out := strings.Builder{} + out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, key %v, blocks: ", g.rangeStart, g.rangeEnd, g.key)) + + for i, b := range g.blocks { + if i > 0 { + out.WriteString(", ") + } + + minT := time.Unix(0, b.MinTime*int64(time.Millisecond)).UTC() + maxT := time.Unix(0, b.MaxTime*int64(time.Millisecond)).UTC() + out.WriteString(fmt.Sprintf("%s (min time: %s, max time: %s)", b.ULID.String(), minT.String(), maxT.String())) + } + + return out.String() +} + +func (g blocksGroup) rangeLength() int64 { + return g.rangeEnd - g.rangeStart +} + +// minTime returns the MinTime across all blocks in the group. +func (g blocksGroup) minTime() int64 { + // Blocks are expected to be sorted by MinTime. + return g.blocks[0].MinTime +} + +// maxTime returns the MaxTime across all blocks in the group. +func (g blocksGroup) maxTime() int64 { + max := g.blocks[0].MaxTime + + for _, b := range g.blocks[1:] { + if b.MaxTime > max { + max = b.MaxTime + } + } + + return max +} + +// groupBlocksByCompactableRanges groups input blocks by compactable ranges, giving preference +// to smaller ranges. If a smaller range contains more than 1 block (and thus it should +// be compacted), the larger range block group is not generated until each of its +// smaller ranges have 1 block each at most. +func groupBlocksByCompactableRanges(blocks []*metadata.Meta, ranges []int64) []blocksGroup { + if len(blocks) == 0 { + return nil + } + + // Sort blocks by min time. + sortMetasByMinTime(blocks) + + var groups []blocksGroup + + for _, tr := range ranges { + nextGroup: + for _, group := range groupBlocksByRange(blocks, tr) { + + // Exclude groups with a single block, because no compaction is required. + if len(group.blocks) < 2 { + continue + } + + // Ensure this group's range does not overlap with any group already scheduled + // for compaction by a smaller range, because we need to guarantee that smaller ranges + // are compacted first. + for _, c := range groups { + if group.overlaps(c) { + continue nextGroup + } + } + + groups = append(groups, group) + } + } + + // Ensure we don't compact the most recent blocks prematurely when another one of + // the same size still fits in the range. To do it, we consider valid a group only + // if it's before the most recent block or if it fully covers the range. + highestMinTime := blocks[len(blocks)-1].MinTime + + for idx := 0; idx < len(groups); { + group := groups[idx] + + // If the group covers a range before the most recent block, it's fine. + if group.rangeEnd <= highestMinTime { + idx++ + continue + } + + // If the group covers the full range, it's fine. + if group.maxTime()-group.minTime() == group.rangeLength() { + idx++ + continue + } + + // We hit into a group which would compact recent blocks prematurely, + // so we need to filter it out. + + groups = append(groups[:idx], groups[idx+1:]...) + } + + return groups +} + +// groupBlocksByRange splits the blocks by the time range. The range sequence starts at 0. +// Input blocks are expected to be sorted by MinTime. 
+// +// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30 +// it returns [0-10, 10-20], [50-60], [90-100]. +func groupBlocksByRange(blocks []*metadata.Meta, tr int64) []blocksGroup { + var ret []blocksGroup + + for i := 0; i < len(blocks); { + var ( + group blocksGroup + m = blocks[i] + ) + + group.rangeStart = getRangeStart(m, tr) + group.rangeEnd = group.rangeStart + tr + + // Skip blocks that don't fall into the range. This can happen via mis-alignment or + // by being the multiple of the intended range. + if m.MaxTime > group.rangeEnd { + i++ + continue + } + + // Add all blocks to the current group that are within [t0, t0+tr]. + for ; i < len(blocks); i++ { + // If the block does not start within this group, then we should break the iteration + // and move it to the next group. + if blocks[i].MinTime >= group.rangeEnd { + break + } + + // If the block doesn't fall into this group, but it started within this group then it + // means it spans across multiple ranges and we should skip it. + if blocks[i].MaxTime > group.rangeEnd { + continue + } + + group.blocks = append(group.blocks, blocks[i]) + } + + if len(group.blocks) > 0 { + ret = append(ret, group) + } + } + + return ret +} + +func getRangeStart(m *metadata.Meta, tr int64) int64 { + // Compute start of aligned time range of size tr closest to the current block's start. + // This code has been copied from TSDB. + if m.MinTime >= 0 { + return tr * (m.MinTime / tr) + } + + return tr * ((m.MinTime - tr + 1) / tr) +} + +func sortMetasByMinTime(metas []*metadata.Meta) { + sort.Slice(metas, func(i, j int) bool { + return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime + }) +} diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go new file mode 100644 index 0000000000..5f6429dbb7 --- /dev/null +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -0,0 +1,496 @@ +package compactor + +import ( + "testing" + "time" + + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +func TestTimeShardingGrouper_Groups(t *testing.T) { + block1ulid := ulid.MustNew(1, nil) + block2ulid := ulid.MustNew(2, nil) + block3ulid := ulid.MustNew(3, nil) + block4ulid := ulid.MustNew(4, nil) + block5ulid := ulid.MustNew(5, nil) + block6ulid := ulid.MustNew(6, nil) + block7ulid := ulid.MustNew(7, nil) + block8ulid := ulid.MustNew(8, nil) + block9ulid := ulid.MustNew(9, nil) + block10ulid := ulid.MustNew(10, nil) + block11ulid := ulid.MustNew(11, nil) + block12ulid := ulid.MustNew(12, nil) + block13ulid := ulid.MustNew(13, nil) + block14ulid := ulid.MustNew(14, nil) + block15ulid := ulid.MustNew(15, nil) + + blocks := + map[ulid.ULID]*metadata.Meta{ + block1ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block2ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 3 * time.Hour.Milliseconds(), MaxTime: 4 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block3ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 0 * 
time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block4ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block4ulid, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block5ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block5ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block6ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block6ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block7ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block7ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block8ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block8ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block9ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block9ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "3"}}, + }, + block10ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block10ulid, MinTime: 4 * time.Hour.Milliseconds(), MaxTime: 6 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block11ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block11ulid, MinTime: 6 * time.Hour.Milliseconds(), MaxTime: 8 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "2"}}, + }, + block12ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block12ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block13ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block13ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 20 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block14ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block14ulid, MinTime: 21 * time.Hour.Milliseconds(), MaxTime: 40 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + block15ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block15ulid, MinTime: 21 * time.Hour.Milliseconds(), MaxTime: 40 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"__org_id__": "1"}}, + }, + } + + tests := map[string]struct { + ranges []time.Duration + blocks map[ulid.ULID]*metadata.Meta + expected [][]ulid.ULID + }{ + "test basic grouping": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block5ulid: blocks[block5ulid], block6ulid: blocks[block6ulid]}, + expected: [][]ulid.ULID{ + {block5ulid, block6ulid}, + {block1ulid, block3ulid}, + {block2ulid, block4ulid}, + }, + }, + "test no compaction": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block7ulid: blocks[block7ulid], block8ulid: blocks[block8ulid], block9ulid: 
blocks[block9ulid]}, + expected: [][]ulid.ULID{}, + }, + "test smallest range first": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block10ulid: blocks[block10ulid], block11ulid: blocks[block11ulid]}, + expected: [][]ulid.ULID{ + {block1ulid, block3ulid}, + {block2ulid, block4ulid}, + {block10ulid, block11ulid}, + }, + }, + "test oldest min time first": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block12ulid: blocks[block12ulid]}, + expected: [][]ulid.ULID{ + {block1ulid, block3ulid, block12ulid}, + {block2ulid, block4ulid}, + }, + }, + "test overlapping blocks": { + ranges: []time.Duration{20 * time.Hour, 40 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block13ulid: blocks[block13ulid], block14ulid: blocks[block14ulid], block15ulid: blocks[block15ulid]}, + expected: [][]ulid.ULID{}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + compactorCfg := &Config{ + BlockRanges: testData.ranges, + } + + g := NewShuffleShardingGrouper(nil, + nil, + false, // Do not accept malformed indexes + true, // Enable vertical compaction + prometheus.NewRegistry(), + nil, + nil, + metadata.NoneFunc, + *compactorCfg) + actual, err := g.Groups(testData.blocks) + require.NoError(t, err) + require.Len(t, actual, len(testData.expected)) + + for idx, expectedIDs := range testData.expected { + assert.Equal(t, expectedIDs, actual[idx].IDs()) + } + }) + } +} + +func TestGroupBlocksByCompactableRanges(t *testing.T) { + tests := map[string]struct { + ranges []int64 + blocks []*metadata.Meta + expected []blocksGroup + }{ + "no input blocks": { + ranges: []int64{20}, + blocks: nil, + expected: nil, + }, + "only 1 block in input": { + ranges: []int64{20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }, + expected: nil, + }, + "only 1 block for each range (single range)": { + ranges: []int64{20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: nil, + }, + "only 1 block for each range (multiple ranges)": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: nil, + }, + "input blocks can be compacted on the 1st range only": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 25, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 25, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 
40}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 60}}, + }}, + }, + }, + "input blocks can be compacted on the 2nd range only": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + {rangeStart: 60, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }}, + }, + }, + "input blocks can be compacted on a mix of 1st and 2nd ranges, guaranteeing no overlaps and giving preference to smaller ranges": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + }}, + {rangeStart: 70, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }}, + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + }, + }, + "input blocks have already been compacted with the largest range": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: nil, + }, + "input blocks match the largest range but can be compacted because overlapping": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: []blocksGroup{ + {rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }}, + }, + }, + "a block with time range crossing two 1st level ranges should be NOT considered for 1st level compaction": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block 
spans across two 1st level ranges. + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + }, + }, + "a block with time range crossing two 1st level ranges should BE considered for 2nd level compaction": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block spans across two 1st level ranges. + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}}, + }}, + }, + }, + "a block with time range larger then the largest compaction range should NOT be considered for compaction": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 150}}, // This block is larger then the largest compaction range. + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: []blocksGroup{ + {rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }}, + }, + }, + "a range containing the most recent block shouldn't be prematurely compacted if doesn't cover the full range": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 12}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 13, MaxTime: 15}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}}, + }}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, groupBlocksByCompactableRanges(testData.blocks, testData.ranges)) + }) + } +} + +func TestGroupBlocksByRange(t *testing.T) { + tests := map[string]struct { + timeRange int64 + blocks []*metadata.Meta + expected []blocksGroup + }{ + "no input blocks": { + timeRange: 20, + blocks: nil, + expected: nil, + }, + "only 1 block in input": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + }, + }, + "only 1 block per range": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, 
MaxTime: 60}}, + }}, + }, + }, + "multiple blocks per range": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}}, + }}, + }, + }, + "a block with time range larger then the range should be excluded": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, // This block is larger then the range. + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }}, + }, + }, + "blocks with different time ranges but all fitting within the input range": { + timeRange: 40, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, groupBlocksByRange(testData.blocks, testData.timeRange)) + }) + } +} + +func TestBlocksGroup_overlaps(t *testing.T) { + tests := []struct { + first blocksGroup + second blocksGroup + expected bool + }{ + { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 20, rangeEnd: 30}, + expected: false, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 19, rangeEnd: 30}, + expected: true, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 21}, + second: blocksGroup{rangeStart: 20, rangeEnd: 30}, + expected: true, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 12, rangeEnd: 18}, + expected: true, + }, + } + + for _, tc := range tests { + assert.Equal(t, tc.expected, tc.first.overlaps(tc.second)) + assert.Equal(t, tc.expected, tc.second.overlaps(tc.first)) + } +} diff --git a/pkg/compactor/shuffle_sharding_planner.go b/pkg/compactor/shuffle_sharding_planner.go new file mode 100644 index 0000000000..96e70f59a5 --- /dev/null +++ b/pkg/compactor/shuffle_sharding_planner.go @@ -0,0 +1,39 @@ +package compactor + +import ( + "context" + "fmt" + + "github.com/go-kit/kit/log" + 
"github.com/thanos-io/thanos/pkg/block/metadata" +) + +type ShuffleShardingPlanner struct { + logger log.Logger + ranges []int64 +} + +func NewShuffleShardingPlanner(logger log.Logger, ranges []int64) *ShuffleShardingPlanner { + return &ShuffleShardingPlanner{ + logger: logger, + ranges: ranges, + } +} + +func (p *ShuffleShardingPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { + // Ensure all blocks fits within the largest range. This is a double check + // to ensure there's no bug in the previous blocks grouping, given this Plan() + // is just a pass-through. + // Modifed from https://github.com/cortexproject/cortex/pull/2616/files#diff-e3051fc530c48bb276ba958dd8fadc684e546bd7964e6bc75cef9a86ef8df344R28-R63 + largestRange := p.ranges[len(p.ranges)-1] + rangeStart := getRangeStart(metasByMinTime[0], largestRange) + rangeEnd := rangeStart + largestRange + + for _, b := range metasByMinTime { + if b.MinTime < rangeStart || b.MaxTime > rangeEnd { + return nil, fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", b.ULID.String(), b.MinTime, b.MaxTime, rangeStart, rangeEnd) + } + } + + return metasByMinTime, nil +} From 0a0e1f8546fa866137963bfcb9661890c0b06a85 Mon Sep 17 00:00:00 2001 From: Albert Date: Fri, 9 Jul 2021 19:30:53 -0700 Subject: [PATCH 02/11] update CHANGELOG.md Signed-off-by: Albert --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 54d2b9fce8..3c4d8c35b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * [CHANGE] Compactor block deletion mark migration, needed when upgrading from v1.7, is now disabled by default. #4597 * [CHANGE] The `status_code` label on gRPC client metrics has changed from '200' and '500' to '2xx', '5xx', '4xx', 'cancel' or 'error'. 4601 * [CHANGE] Memberlist: changed probe interval from `1s` to `5s` and probe timeout from `500ms` to `2s`. #4601 +* [FEATURE] Add shuffle sharding grouper and planner within compactor to allow further work towards parallelizing compaction #4621 * [ENHANCEMENT] Update Go version to 1.17.5. #4602 #4604 * [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503 * [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571 From 0c916fd1f4aa8f3818293c5401ea90ce40489e54 Mon Sep 17 00:00:00 2001 From: Albert Date: Mon, 12 Jul 2021 10:21:05 -0700 Subject: [PATCH 03/11] update changelog Signed-off-by: Albert --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c4d8c35b3..b4e72b66c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,9 @@ * [CHANGE] Prevent path traversal attack from users able to control the HTTP header `X-Scope-OrgID`. #4375 (CVE-2021-36157) * Users only have control of the HTTP header when Cortex is not frontend by an auth proxy validating the tenant IDs + +## 1.10.0-rc.0 / 2021-06-28 + * [CHANGE] Enable strict JSON unmarshal for `pkg/util/validation.Limits` struct. The custom `UnmarshalJSON()` will now fail if the input has unknown fields. #4298 * [CHANGE] Cortex chunks storage has been deprecated and it's now in maintenance mode: all Cortex users are encouraged to migrate to the blocks storage. No new features will be added to the chunks storage. 
The default Cortex configuration still runs the chunks engine; please check out the [blocks storage doc](https://cortexmetrics.io/docs/blocks-storage/) on how to configure Cortex to run with the blocks storage. #4268 * [CHANGE] The example Kubernetes manifests (stored at `k8s/`) have been removed due to a lack of proper support and maintenance. #4268 From 464bb2732a77d5aec75d12ca261a85043e1d1ad1 Mon Sep 17 00:00:00 2001 From: Albert Date: Tue, 13 Jul 2021 10:23:03 -0700 Subject: [PATCH 04/11] Add unit tests for shuffle sharding planner Signed-off-by: Albert --- .../shuffle_sharding_grouper_test.go | 2 +- .../shuffle_sharding_planner_test.go | 142 ++++++++++++++++++ 2 files changed, 143 insertions(+), 1 deletion(-) create mode 100644 pkg/compactor/shuffle_sharding_planner_test.go diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index 5f6429dbb7..7e553fbc6e 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -12,7 +12,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" ) -func TestTimeShardingGrouper_Groups(t *testing.T) { +func TestShuffleShardingGrouper_Groups(t *testing.T) { block1ulid := ulid.MustNew(1, nil) block2ulid := ulid.MustNew(2, nil) block3ulid := ulid.MustNew(3, nil) diff --git a/pkg/compactor/shuffle_sharding_planner_test.go b/pkg/compactor/shuffle_sharding_planner_test.go new file mode 100644 index 0000000000..fdf63c9207 --- /dev/null +++ b/pkg/compactor/shuffle_sharding_planner_test.go @@ -0,0 +1,142 @@ +package compactor + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +func TestShuffleShardingPlanner_Plan(t *testing.T) { + block1ulid := ulid.MustNew(1, nil) + block2ulid := ulid.MustNew(2, nil) + + tests := map[string]struct { + ranges []int64 + blocks []*metadata.Meta + expected []*metadata.Meta + expectedErr error + }{ + "test basic plan": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + expected: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 1 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + }, + "test blocks outside largest range smaller min time after": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 2 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + }, + expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block2ulid.String(), 0*time.Hour.Milliseconds(), 
2*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds()), + }, + "test blocks outside largest range 1": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + }, + expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block1ulid.String(), 0*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds()), + }, + "test blocks outside largest range 2": { + ranges: []int64{2 * time.Hour.Milliseconds()}, + blocks: []*metadata.Meta{ + { + BlockMeta: tsdb.BlockMeta{ + ULID: block1ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 2 * time.Hour.Milliseconds(), + }, + }, + { + BlockMeta: tsdb.BlockMeta{ + ULID: block2ulid, + MinTime: 0 * time.Hour.Milliseconds(), + MaxTime: 4 * time.Hour.Milliseconds(), + }, + }, + }, + expectedErr: fmt.Errorf("block %s with time range %d:%d is outside the largest expected range %d:%d", block2ulid.String(), 0*time.Hour.Milliseconds(), 4*time.Hour.Milliseconds(), 0*time.Hour.Milliseconds(), 2*time.Hour.Milliseconds()), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + p := NewShuffleShardingPlanner(nil, + testData.ranges) + actual, err := p.Plan(context.Background(), testData.blocks) + + if testData.expectedErr != nil { + assert.Equal(t, err, testData.expectedErr) + } else { + require.NoError(t, err) + } + + require.Len(t, actual, len(testData.expected)) + + for idx, expectedMeta := range testData.expected { + assert.Equal(t, expectedMeta.ULID, actual[idx].ULID) + } + }) + } +} From bb398053aaa1d017a5fc046cfbcbe68034321653 Mon Sep 17 00:00:00 2001 From: Albert Date: Thu, 12 Aug 2021 04:59:24 -0700 Subject: [PATCH 05/11] fix compator unit test Signed-off-by: Albert --- pkg/compactor/compactor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index bf3ec36baa..a4ee314f3c 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -88,7 +88,7 @@ var ( } ShuffleShardingBlocksCompactorFactory = func(ctx context.Context, cfg Config, logger log.Logger, reg prometheus.Registerer) (compact.Compactor, compact.Planner, error) { - compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool()) + compactor, err := tsdb.NewLeveledCompactor(ctx, reg, logger, cfg.BlockRanges.ToMilliseconds(), downsample.NewPool(), nil) if err != nil { return nil, nil, err } From 9fe24e2f0f09ec2a41f258ea405d0e8fc5f3a8db Mon Sep 17 00:00:00 2001 From: ac1214 Date: Thu, 23 Sep 2021 14:00:28 -0600 Subject: [PATCH 06/11] add note regarding thanos metrics Signed-off-by: ac1214 --- pkg/compactor/shuffle_sharding_grouper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 82e14dcfc0..15c2b1c94e 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -60,6 +60,7 @@ func NewShuffleShardingGrouper( blocksMarkedForDeletion: blocksMarkedForDeletion, garbageCollectedBlocks: garbageCollectedBlocks, hashFunc: hashFunc, + // Metrics are copied from Thanos 
DefaultGrouper constructor compactions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "thanos_compact_group_compactions_total", Help: "Total number of group compaction attempts that resulted in a new block.", From a85ef000563c465b03745ab0c7f8e6d00f312ff5 Mon Sep 17 00:00:00 2001 From: ac1214 Date: Thu, 23 Sep 2021 14:01:47 -0600 Subject: [PATCH 07/11] fix spacing in changelog Signed-off-by: ac1214 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b4e72b66c7..36349bf08c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,7 @@ * [BUGFIX] Compactor: fixed panic while collecting Prometheus metrics. #4483 * [BUGFIX] Update go-kit package to fix spurious log messages #4544 + ## 1.10.0 / 2021-08-03 * [CHANGE] Prevent path traversal attack from users able to control the HTTP header `X-Scope-OrgID`. #4375 (CVE-2021-36157) From 568d2568e467df400e77f36645ac8645a7e25613 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Fri, 14 Jan 2022 13:29:19 -0800 Subject: [PATCH 08/11] Make code compile again Signed-off-by: Alvin Lin --- go.mod | 1 + pkg/compactor/compactor.go | 1 + pkg/compactor/shuffle_sharding_grouper.go | 6 +++++- pkg/compactor/shuffle_sharding_grouper_test.go | 1 + vendor/modules.txt | 1 + 5 files changed, 9 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index cde7daf471..b0b9793f6b 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/felixge/fgprof v0.9.1 github.com/fsouza/fake-gcs-server v1.7.0 + github.com/go-kit/kit v0.12.0 github.com/go-kit/log v0.2.0 github.com/go-openapi/strfmt v0.21.0 github.com/go-openapi/swag v0.19.15 diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index a4ee314f3c..f0e2464406 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -72,6 +72,7 @@ var ( true, // Enable vertical compaction reg, blocksMarkedForDeletion, + prometheus.NewCounter(prometheus.CounterOpts{}), garbageCollectedBlocks, metadata.NoneFunc, cfg) diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 15c2b1c94e..434ba5e017 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -13,7 +13,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/objstore" @@ -26,6 +26,7 @@ type ShuffleShardingGrouper struct { enableVerticalCompaction bool reg prometheus.Registerer blocksMarkedForDeletion prometheus.Counter + blocksMarkedForNoCompact prometheus.Counter garbageCollectedBlocks prometheus.Counter hashFunc metadata.HashFunc compactions *prometheus.CounterVec @@ -43,6 +44,7 @@ func NewShuffleShardingGrouper( enableVerticalCompaction bool, reg prometheus.Registerer, blocksMarkedForDeletion prometheus.Counter, + blocksMarkedForNoCompact prometheus.Counter, garbageCollectedBlocks prometheus.Counter, hashFunc metadata.HashFunc, 
compactorCfg Config, @@ -58,6 +60,7 @@ func NewShuffleShardingGrouper( enableVerticalCompaction: enableVerticalCompaction, reg: reg, blocksMarkedForDeletion: blocksMarkedForDeletion, + blocksMarkedForNoCompact: blocksMarkedForNoCompact, garbageCollectedBlocks: garbageCollectedBlocks, hashFunc: hashFunc, // Metrics are copied from Thanos DefaultGrouper constructor @@ -135,6 +138,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re g.verticalCompactions.WithLabelValues(groupKey), g.garbageCollectedBlocks, g.blocksMarkedForDeletion, + g.blocksMarkedForNoCompact, g.hashFunc, ) if err != nil { diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index 7e553fbc6e..e54a849be5 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -149,6 +149,7 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) { prometheus.NewRegistry(), nil, nil, + nil, metadata.NoneFunc, *compactorCfg) actual, err := g.Groups(testData.blocks) diff --git a/vendor/modules.txt b/vendor/modules.txt index f2b2f62ce0..bd61c343f4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -238,6 +238,7 @@ github.com/fsnotify/fsnotify github.com/fsouza/fake-gcs-server/fakestorage github.com/fsouza/fake-gcs-server/internal/backend # github.com/go-kit/kit v0.12.0 +## explicit github.com/go-kit/kit/log github.com/go-kit/kit/log/level # github.com/go-kit/log v0.2.0 From ac51bb1ffec76f7a0d231f3bc547fce607574a69 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Fri, 14 Jan 2022 14:20:14 -0800 Subject: [PATCH 09/11] Remove blocklisted pkg Signed-off-by: Alvin Lin --- go.mod | 1 - pkg/compactor/shuffle_sharding_grouper.go | 4 ++-- pkg/compactor/shuffle_sharding_planner.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index b0b9793f6b..cde7daf471 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,6 @@ require ( github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/felixge/fgprof v0.9.1 github.com/fsouza/fake-gcs-server v1.7.0 - github.com/go-kit/kit v0.12.0 github.com/go-kit/log v0.2.0 github.com/go-openapi/strfmt v0.21.0 github.com/go-openapi/swag v0.19.15 diff --git a/pkg/compactor/shuffle_sharding_grouper.go b/pkg/compactor/shuffle_sharding_grouper.go index 434ba5e017..464bc83ff0 100644 --- a/pkg/compactor/shuffle_sharding_grouper.go +++ b/pkg/compactor/shuffle_sharding_grouper.go @@ -7,8 +7,8 @@ import ( "strings" "time" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/compactor/shuffle_sharding_planner.go b/pkg/compactor/shuffle_sharding_planner.go index 96e70f59a5..f5c97c1266 100644 --- a/pkg/compactor/shuffle_sharding_planner.go +++ b/pkg/compactor/shuffle_sharding_planner.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/go-kit/kit/log" + "github.com/go-kit/log" 
"github.com/thanos-io/thanos/pkg/block/metadata" ) From 46dea189d18b8a4e0eb2f10e196d77f76480b2ae Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Fri, 14 Jan 2022 14:23:56 -0800 Subject: [PATCH 10/11] Run go mod vendor Signed-off-by: Alvin Lin --- vendor/modules.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/vendor/modules.txt b/vendor/modules.txt index bd61c343f4..f2b2f62ce0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -238,7 +238,6 @@ github.com/fsnotify/fsnotify github.com/fsouza/fake-gcs-server/fakestorage github.com/fsouza/fake-gcs-server/internal/backend # github.com/go-kit/kit v0.12.0 -## explicit github.com/go-kit/kit/log github.com/go-kit/kit/log/level # github.com/go-kit/log v0.2.0 From 8dd63018393f4121ab3ab7386330ccd78f335ad6 Mon Sep 17 00:00:00 2001 From: Alvin Lin Date: Fri, 14 Jan 2022 14:48:23 -0800 Subject: [PATCH 11/11] Remove uneeded change from CHANGELOG Signed-off-by: Alvin Lin --- CHANGELOG.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 36349bf08c..3c4d8c35b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,14 +85,10 @@ * [BUGFIX] Compactor: fixed panic while collecting Prometheus metrics. #4483 * [BUGFIX] Update go-kit package to fix spurious log messages #4544 - ## 1.10.0 / 2021-08-03 * [CHANGE] Prevent path traversal attack from users able to control the HTTP header `X-Scope-OrgID`. #4375 (CVE-2021-36157) * Users only have control of the HTTP header when Cortex is not frontend by an auth proxy validating the tenant IDs - -## 1.10.0-rc.0 / 2021-06-28 - * [CHANGE] Enable strict JSON unmarshal for `pkg/util/validation.Limits` struct. The custom `UnmarshalJSON()` will now fail if the input has unknown fields. #4298 * [CHANGE] Cortex chunks storage has been deprecated and it's now in maintenance mode: all Cortex users are encouraged to migrate to the blocks storage. No new features will be added to the chunks storage. The default Cortex configuration still runs the chunks engine; please check out the [blocks storage doc](https://cortexmetrics.io/docs/blocks-storage/) on how to configure Cortex to run with the blocks storage. #4268 * [CHANGE] The example Kubernetes manifests (stored at `k8s/`) have been removed due to a lack of proper support and maintenance. #4268