Skip to content

Commit da811ff

Browse files
committed
Add planner filter unit tests
Signed-off-by: Albert <[email protected]>
1 parent bab9b8c commit da811ff

File tree

3 files changed

+134
-10
lines changed

3 files changed

+134
-10
lines changed

docs/blocks-storage/compactor.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ compactor:
166166
# CLI flag: -compactor.disabled-tenants
167167
[disabled_tenants: <string> | default = ""]
168168

169+
# Enable planner filter which will filter groups of blocks within the Cortex
170+
# compactor instead of using the Thanos to group blocks.
171+
# CLI flag: -compactor.planner-filter-enabled
172+
[planner_filter_enabled: <boolean> | default = false]
173+
169174
# Shard tenants across multiple compactor instances. Sharding is required if
170175
# you run multiple compactor instances, in order to coordinate compactions and
171176
# avoid race conditions leading to the same tenant blocks simultaneously

pkg/compactor/planner_filter.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type PlannerFilter struct {
2626
compactorCfg Config
2727
metaSyncDir string
2828

29-
plans []*blocksGroup
29+
plans []blocksGroup
3030
}
3131

3232
func NewPlannerFilter(ctx context.Context, userID string, ulogger log.Logger, bucket objstore.InstrumentedBucket, fetcherFilters []block.MetadataFilter, compactorCfg Config, metaSyncDir string) (*PlannerFilter, error) {
@@ -66,7 +66,7 @@ func (f *PlannerFilter) getUserBlocks(ctx context.Context) (map[ulid.ULID]*metad
6666
return metas, nil
6767
}
6868

69-
// Fetches blocks and generates plans that can be parallized and saves them
69+
// Fetches blocks and generates plans that can be parallized and saves them in the PlannerFilter struct.
7070
func (f *PlannerFilter) fetchBlocksAndGeneratePlans(ctx context.Context) error {
7171
// Get blocks
7272
blocks, err := f.getUserBlocks(ctx)
@@ -87,20 +87,20 @@ func (f *PlannerFilter) generatePlans(ctx context.Context, blocks map[ulid.ULID]
8787
mainGroups[key] = append(mainGroups[key], b)
8888
}
8989

90-
var plans []*blocksGroup
90+
var plans []blocksGroup
9191

92-
for _, mainBlocks := range mainGroups {
93-
for i, plan := range groupBlocksByCompactableRanges(mainBlocks, f.compactorCfg.BlockRanges.ToMilliseconds()) {
92+
for k, mainBlocks := range mainGroups {
93+
for i, plan := range groupBlocksByCompactableRanges(mainBlocks, f.compactorCfg.BlockRanges.ToMilliseconds(), f.ulogger) {
9494
// Nothing to do if we don't have at least 2 blocks.
9595
if len(plan.blocks) < 2 {
9696
continue
9797
}
9898

99-
plan.key = i
100-
10199
level.Info(f.ulogger).Log("msg", "Found plan for user", "user", f.userID, "plan", plan.String())
102100

103-
plans = append(plans, &plan)
101+
plan.key = fmt.Sprintf("%v_%v", k, i)
102+
103+
plans = append(plans, plan)
104104
}
105105
}
106106

@@ -152,7 +152,7 @@ type blocksGroup struct {
152152
rangeStart int64 // Included.
153153
rangeEnd int64 // Excluded.
154154
blocks []*metadata.Meta
155-
key int
155+
key string
156156
}
157157

158158
// overlaps returns whether the group range overlaps with the input group.
@@ -166,7 +166,7 @@ func (g blocksGroup) overlaps(other blocksGroup) bool {
166166

167167
func (g blocksGroup) String() string {
168168
out := strings.Builder{}
169-
out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, blocks: ", g.rangeStart, g.rangeEnd))
169+
out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, key %v, blocks: ", g.rangeStart, g.rangeEnd, g.key))
170170

171171
for i, b := range g.blocks {
172172
if i > 0 {

pkg/compactor/planner_filter_test.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,132 @@
11
package compactor
22

33
import (
4+
"context"
45
"testing"
6+
"time"
57

8+
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
9+
"github.com/cortexproject/cortex/pkg/util/flagext"
10+
"github.com/go-kit/kit/log"
11+
"github.com/oklog/ulid"
612
"github.com/prometheus/prometheus/tsdb"
713
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
815
"github.com/thanos-io/thanos/pkg/block/metadata"
916
)
1017

18+
func TestPlannerFilterPlanGeneration(t *testing.T) {
19+
block1ulid := ulid.MustNew(1, nil)
20+
block2ulid := ulid.MustNew(2, nil)
21+
block3ulid := ulid.MustNew(3, nil)
22+
block4ulid := ulid.MustNew(4, nil)
23+
block5ulid := ulid.MustNew(5, nil)
24+
block6ulid := ulid.MustNew(6, nil)
25+
block7ulid := ulid.MustNew(7, nil)
26+
block8ulid := ulid.MustNew(8, nil)
27+
block9ulid := ulid.MustNew(9, nil)
28+
block10ulid := ulid.MustNew(10, nil)
29+
block11ulid := ulid.MustNew(11, nil)
30+
31+
blocks :=
32+
map[ulid.ULID]*metadata.Meta{
33+
block1ulid: {
34+
BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()},
35+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
36+
},
37+
block2ulid: {
38+
BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 3 * time.Hour.Milliseconds(), MaxTime: 4 * time.Hour.Milliseconds()},
39+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
40+
},
41+
block3ulid: {
42+
BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()},
43+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
44+
},
45+
block4ulid: {
46+
BlockMeta: tsdb.BlockMeta{ULID: block4ulid, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()},
47+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
48+
},
49+
block5ulid: {
50+
BlockMeta: tsdb.BlockMeta{ULID: block5ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()},
51+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}},
52+
},
53+
block6ulid: {
54+
BlockMeta: tsdb.BlockMeta{ULID: block6ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()},
55+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}},
56+
},
57+
block7ulid: {
58+
BlockMeta: tsdb.BlockMeta{ULID: block7ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()},
59+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}},
60+
},
61+
block8ulid: {
62+
BlockMeta: tsdb.BlockMeta{ULID: block8ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()},
63+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}},
64+
},
65+
block9ulid: {
66+
BlockMeta: tsdb.BlockMeta{ULID: block9ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()},
67+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "3"}},
68+
},
69+
block10ulid: {
70+
BlockMeta: tsdb.BlockMeta{ULID: block10ulid, MinTime: 4 * time.Hour.Milliseconds(), MaxTime: 6 * time.Hour.Milliseconds()},
71+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}},
72+
},
73+
block11ulid: {
74+
BlockMeta: tsdb.BlockMeta{ULID: block11ulid, MinTime: 6 * time.Hour.Milliseconds(), MaxTime: 8 * time.Hour.Milliseconds()},
75+
Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}},
76+
},
77+
}
78+
79+
tests := map[string]struct {
80+
ranges cortex_tsdb.DurationList
81+
blocks map[ulid.ULID]*metadata.Meta
82+
expectedPlans []blocksGroup
83+
}{
84+
"test basic planning": {
85+
ranges: []time.Duration{2 * time.Hour, 4 * time.Hour},
86+
blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block5ulid: blocks[block5ulid], block6ulid: blocks[block6ulid]},
87+
expectedPlans: []blocksGroup{
88+
{rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block6ulid], blocks[block5ulid]}, key: "0@14088339200549387484_0"},
89+
{rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block3ulid], blocks[block1ulid]}, key: "0@6043952821095826047_0"},
90+
{rangeStart: 7200000, rangeEnd: 14400000, blocks: []*metadata.Meta{blocks[block4ulid], blocks[block2ulid]}, key: "0@6043952821095826047_1"},
91+
},
92+
},
93+
"test no compaction": {
94+
ranges: []time.Duration{2 * time.Hour, 4 * time.Hour},
95+
blocks: map[ulid.ULID]*metadata.Meta{block7ulid: blocks[block7ulid], block8ulid: blocks[block8ulid], block9ulid: blocks[block9ulid]},
96+
expectedPlans: []blocksGroup{},
97+
},
98+
"test smallest range first": {
99+
ranges: []time.Duration{2 * time.Hour, 4 * time.Hour},
100+
blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block10ulid: blocks[block10ulid], block11ulid: blocks[block11ulid]},
101+
expectedPlans: []blocksGroup{
102+
{rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block3ulid], blocks[block1ulid]}, key: "0@6043952821095826047_0"},
103+
{rangeStart: 7200000, rangeEnd: 14400000, blocks: []*metadata.Meta{blocks[block4ulid], blocks[block2ulid]}, key: "0@6043952821095826047_1"},
104+
{rangeStart: 14400000, rangeEnd: 28800000, blocks: []*metadata.Meta{blocks[block10ulid], blocks[block11ulid]}, key: "0@14088339200549387484_0"},
105+
},
106+
},
107+
}
108+
109+
for testName, testData := range tests {
110+
t.Run(testName, func(t *testing.T) {
111+
compactorCfg := Config{}
112+
flagext.DefaultValues(&compactorCfg)
113+
compactorCfg.BlockRanges = testData.ranges
114+
f := &PlannerFilter{
115+
userID: "test-user",
116+
compactorCfg: compactorCfg,
117+
ulogger: log.NewNopLogger(),
118+
}
119+
err := f.generatePlans(context.Background(), testData.blocks)
120+
require.NoError(t, err)
121+
actualPlans := f.plans
122+
require.Len(t, actualPlans, len(testData.expectedPlans))
123+
for i, expectedPlan := range testData.expectedPlans {
124+
assert.Equal(t, expectedPlan, actualPlans[i])
125+
}
126+
})
127+
}
128+
}
129+
11130
func TestGroupBlocksByCompactableRanges(t *testing.T) {
12131
tests := map[string]struct {
13132
ranges []int64

0 commit comments

Comments
 (0)