
Commit 2dcd647

alexqyle authored and alanprot committed

Partitioning Compactor

Signed-off-by: Alex Le <[email protected]>

1 parent f0b0fb3 · commit 2dcd647

36 files changed: +9662 −1569 lines changed
Lines changed: 60 additions & 0 deletions

@@ -0,0 +1,60 @@
+package compactor
+
+import (
+	"context"
+
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/util/annotations"
+)
+
+type backgrounChunkSeriesSet struct {
+	nextSet chan storage.ChunkSeries
+	actual  storage.ChunkSeries
+	cs      storage.ChunkSeriesSet
+}
+
+func (b *backgrounChunkSeriesSet) Next() bool {
+	s, ok := <-b.nextSet
+	b.actual = s
+	return ok
+}
+
+func (b *backgrounChunkSeriesSet) At() storage.ChunkSeries {
+	return b.actual
+}
+
+func (b *backgrounChunkSeriesSet) Err() error {
+	return b.cs.Err()
+}
+
+func (b *backgrounChunkSeriesSet) Warnings() annotations.Annotations {
+	return b.cs.Warnings()
+}
+
+func (b *backgrounChunkSeriesSet) run(ctx context.Context) {
+	for {
+		if !b.cs.Next() {
+			close(b.nextSet)
+			return
+		}
+
+		select {
+		case b.nextSet <- b.cs.At():
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func NewBackgroundChunkSeriesSet(ctx context.Context, cs storage.ChunkSeriesSet) storage.ChunkSeriesSet {
+	r := &backgrounChunkSeriesSet{
+		cs:      cs,
+		nextSet: make(chan storage.ChunkSeries, 1000),
+	}
+
+	go func() {
+		r.run(ctx)
+	}()
+
+	return r
+}
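
For context, a minimal usage sketch (not part of this commit, and assuming it lives in the same compactor package) of how the background set above could be consumed: NewBackgroundChunkSeriesSet starts a goroutine that reads ahead up to 1000 series from the wrapped set while the caller iterates with the usual Next/At/Err loop.

package compactor

import (
	"context"

	"github.com/prometheus/prometheus/storage"
)

// consumeInBackground is a hypothetical helper, not in this commit, used only
// to illustrate the iteration contract of NewBackgroundChunkSeriesSet.
func consumeInBackground(ctx context.Context, cs storage.ChunkSeriesSet) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // stops the prefetching goroutine once iteration is done

	set := NewBackgroundChunkSeriesSet(ctx, cs)
	for set.Next() {
		_ = set.At() // labels and chunk iterator of each prefetched series would be used here
	}
	// Err comes from the wrapped set, same as a plain ChunkSeriesSet.
	return set.Err()
}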

pkg/compactor/block_visit_marker.go

Lines changed: 124 additions & 35 deletions
@@ -16,13 +16,16 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"golang.org/x/sync/errgroup"

 	"github.com/cortexproject/cortex/pkg/util/runutil"
 )

 const (
-	// BlockVisitMarkerFile is the known json filename for representing the most recent compactor visit.
-	BlockVisitMarkerFile = "visit-mark.json"
+	// BlockVisitMarkerFileSuffix is the known suffix of json filename for representing the most recent compactor visit.
+	BlockVisitMarkerFileSuffix = "visit-mark.json"
+	// BlockVisitMarkerFilePrefix is the known prefix of json filename for representing the most recent compactor visit.
+	BlockVisitMarkerFilePrefix = "partition-"
 	// VisitMarkerVersion1 is the current supported version of visit-mark file.
 	VisitMarkerVersion1 = 1
 )
@@ -34,23 +37,34 @@ var (
 )

 type BlockVisitMarker struct {
-	CompactorID string `json:"compactorID"`
+	CompactorID        string      `json:"compactorID"`
+	Status             VisitStatus `json:"status"`
+	PartitionedGroupID uint32      `json:"partitionedGroupID"`
+	PartitionID        int         `json:"partitionID"`
 	// VisitTime is a unix timestamp of when the block was visited (mark updated).
 	VisitTime int64 `json:"visitTime"`
 	// Version of the file.
 	Version int `json:"version"`
 }

-func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration) bool {
-	return time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout))
+func (b *BlockVisitMarker) isVisited(blockVisitMarkerTimeout time.Duration, partitionID int) bool {
+	return b.isCompleted() || partitionID == b.PartitionID && time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout))
 }

-func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, compactorID string) bool {
-	return b.CompactorID == compactorID && time.Now().Before(time.Unix(b.VisitTime, 0).Add(blockVisitMarkerTimeout))
+func (b *BlockVisitMarker) isVisitedByCompactor(blockVisitMarkerTimeout time.Duration, partitionID int, compactorID string) bool {
+	return b.CompactorID == compactorID && b.isVisited(blockVisitMarkerTimeout, partitionID)
 }

-func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
-	visitMarkerFile := path.Join(blockID, BlockVisitMarkerFile)
+func (b *BlockVisitMarker) isCompleted() bool {
+	return b.Status == Completed
+}
+
+func GetBlockVisitMarkerFile(blockID string, partitionID int) string {
+	return path.Join(blockID, fmt.Sprintf("%s%d-%s", BlockVisitMarkerFilePrefix, partitionID, BlockVisitMarkerFileSuffix))
+}
+
+func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketReader, logger log.Logger, blockID string, partitionID int, blockVisitMarkerReadFailed prometheus.Counter) (*BlockVisitMarker, error) {
+	visitMarkerFile := GetBlockVisitMarkerFile(blockID, partitionID)
 	visitMarkerFileReader, err := bkt.ReaderWithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, visitMarkerFile)
 	if err != nil {
 		if bkt.IsObjNotFoundErr(err) {
@@ -76,15 +90,23 @@ func ReadBlockVisitMarker(ctx context.Context, bkt objstore.InstrumentedBucketRe
 	return &blockVisitMarker, nil
 }

-func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error {
-	blockVisitMarkerFilePath := path.Join(blockID, BlockVisitMarkerFile)
+func UpdateBlockVisitMarker(ctx context.Context, bkt objstore.Bucket, blockID string, partitionID int, reader io.Reader, blockVisitMarkerWriteFailed prometheus.Counter) error {
+	blockVisitMarkerFilePath := GetBlockVisitMarkerFile(blockID, partitionID)
 	if err := bkt.Upload(ctx, blockVisitMarkerFilePath, reader); err != nil {
 		blockVisitMarkerWriteFailed.Inc()
 		return err
 	}
 	return nil
 }

+func generateBlocksInfo(blocks []*metadata.Meta) string {
+	var blockIds []string
+	for _, block := range blocks {
+		blockIds = append(blockIds, block.ULID.String())
+	}
+	return strings.Join(blockIds, ",")
+}
+
 func markBlocksVisited(
 	ctx context.Context,
 	bkt objstore.Bucket,
@@ -98,54 +120,121 @@ func markBlocksVisited(
 		blockVisitMarkerWriteFailed.Inc()
 		return
 	}
-	reader := bytes.NewReader(visitMarkerFileContent)
+	g, _ := errgroup.WithContext(ctx)
+	g.SetLimit(32)
 	for _, block := range blocks {
-		select {
-		// Exit early if possible.
-		case <-ctx.Done():
-			return
-		default:
-		}
-
 		blockID := block.ULID.String()
-		if err := UpdateBlockVisitMarker(ctx, bkt, blockID, reader, blockVisitMarkerWriteFailed); err != nil {
-			level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "blockID", blockID, "err", err)
-		}
-		reader.Reset(visitMarkerFileContent)
+		g.Go(func() error {
+			select {
+			// Exit early if possible.
+			case <-ctx.Done():
+				return nil
+			default:
+			}
+
+			reader := bytes.NewReader(visitMarkerFileContent)
+			if err := UpdateBlockVisitMarker(ctx, bkt, blockID, marker.PartitionID, reader, blockVisitMarkerWriteFailed); err != nil {
+				level.Error(logger).Log("msg", "unable to upsert visit marker file content for block", "partition_id", marker.PartitionID, "block_id", blockID, "err", err)
+			}
+			reader.Reset(visitMarkerFileContent)
+			return nil
+		})
+	}
+	if err := g.Wait(); err != nil {
+		blockVisitMarkerWriteFailed.Inc()
+		return
 	}
+	level.Debug(logger).Log("msg", "marked blocks visited", "partition_id", marker.PartitionID, "blocks", generateBlocksInfo(blocks))
 }

-func markBlocksVisitedHeartBeat(ctx context.Context, bkt objstore.Bucket, logger log.Logger, blocks []*metadata.Meta, compactorID string, blockVisitMarkerFileUpdateInterval time.Duration, blockVisitMarkerWriteFailed prometheus.Counter) {
-	var blockIds []string
-	for _, block := range blocks {
-		blockIds = append(blockIds, block.ULID.String())
-	}
-	blocksInfo := strings.Join(blockIds, ",")
-	level.Info(logger).Log("msg", fmt.Sprintf("start heart beat for blocks: %s", blocksInfo))
+func markBlocksVisitedHeartBeat(
+	ctx context.Context,
+	bkt objstore.Bucket,
+	logger log.Logger,
+	blocks []*metadata.Meta,
+	partitionedGroupID uint32,
+	partitionID int,
+	compactorID string,
+	blockVisitMarkerFileUpdateInterval time.Duration,
+	blockVisitMarkerWriteFailed prometheus.Counter,
+	errChan chan error,
+) {
+	blocksInfo := generateBlocksInfo(blocks)
+	level.Info(logger).Log("msg", "start visit marker heart beat", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo)
 	ticker := time.NewTicker(blockVisitMarkerFileUpdateInterval)
 	defer ticker.Stop()
+	isComplete := false
 heartBeat:
 	for {
 		level.Debug(logger).Log("msg", fmt.Sprintf("heart beat for blocks: %s", blocksInfo))
 		blockVisitMarker := BlockVisitMarker{
-			VisitTime:   time.Now().Unix(),
-			CompactorID: compactorID,
-			Version:     VisitMarkerVersion1,
+			VisitTime:          time.Now().Unix(),
+			CompactorID:        compactorID,
+			Status:             Pending,
+			PartitionedGroupID: partitionedGroupID,
+			PartitionID:        partitionID,
+			Version:            VisitMarkerVersion1,
 		}
 		markBlocksVisited(ctx, bkt, logger, blocks, blockVisitMarker, blockVisitMarkerWriteFailed)

 		select {
 		case <-ctx.Done():
+			level.Warn(logger).Log("msg", "visit marker heart beat got cancelled", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo)
 			break heartBeat
 		case <-ticker.C:
 			continue
+		case err := <-errChan:
+			isComplete = err == nil
+			if err != nil {
+				level.Warn(logger).Log("msg", "stop visit marker heart beat due to error", "err", err, "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo)
+			}
+			break heartBeat
 		}
 	}
-	level.Info(logger).Log("msg", fmt.Sprintf("stop heart beat for blocks: %s", blocksInfo))
+	if isComplete {
+		level.Info(logger).Log("msg", "update visit marker to completed status", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo)
+		markBlocksVisitMarkerCompleted(context.Background(), bkt, logger, blocks, partitionedGroupID, partitionID, compactorID, blockVisitMarkerWriteFailed)
+	}
+	level.Info(logger).Log("msg", "stop visit marker heart beat", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "blocks", blocksInfo)
+}
+
+func markBlocksVisitMarkerCompleted(
+	ctx context.Context,
+	bkt objstore.Bucket,
+	logger log.Logger,
+	blocks []*metadata.Meta,
+	partitionedGroupID uint32,
+	partitionID int,
+	compactorID string,
+	blockVisitMarkerWriteFailed prometheus.Counter,
+) {
+	blockVisitMarker := BlockVisitMarker{
+		VisitTime:          time.Now().Unix(),
+		CompactorID:        compactorID,
+		Status:             Completed,
+		PartitionedGroupID: partitionedGroupID,
+		PartitionID:        partitionID,
+		Version:            VisitMarkerVersion1,
+	}
+	visitMarkerFileContent, err := json.Marshal(blockVisitMarker)
+	if err != nil {
+		blockVisitMarkerWriteFailed.Inc()
+		return
+	}
+	reader := bytes.NewReader(visitMarkerFileContent)
+	for _, block := range blocks {
+		blockID := block.ULID.String()
+		if err := UpdateBlockVisitMarker(ctx, bkt, blockID, blockVisitMarker.PartitionID, reader, blockVisitMarkerWriteFailed); err != nil {
+			level.Error(logger).Log("msg", "unable to upsert completed visit marker file content for block", "partitioned_group_id", blockVisitMarker.PartitionedGroupID, "partition_id", blockVisitMarker.PartitionID, "block_id", blockID, "err", err)
+		} else {
+			level.Info(logger).Log("msg", "block partition is completed", "partitioned_group_id", blockVisitMarker.PartitionedGroupID, "partition_id", blockVisitMarker.PartitionID, "block_id", blockID)
+		}
+		reader.Reset(visitMarkerFileContent)
+	}
 }

 func IsBlockVisitMarker(path string) bool {
-	return strings.HasSuffix(path, BlockVisitMarkerFile)
+	return strings.HasSuffix(path, BlockVisitMarkerFileSuffix)
 }

 func IsNotBlockVisitMarkerError(err error) bool {
pkg/compactor/block_visit_marker_test.go

Lines changed: 69 additions & 1 deletion
@@ -2,6 +2,7 @@ package compactor

 import (
 	"context"
+	"fmt"
 	"testing"
 	"time"

@@ -85,10 +86,77 @@ func TestMarkBlocksVisited(t *testing.T) {
 			logger := log.NewNopLogger()
 			markBlocksVisited(ctx, bkt, logger, tcase.blocks, tcase.visitMarker, dummyCounter)
 			for _, meta := range tcase.blocks {
-				res, err := ReadBlockVisitMarker(ctx, objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), dummyCounter)
+				res, err := ReadBlockVisitMarker(ctx, objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), tcase.visitMarker.PartitionID, dummyCounter)
 				require.NoError(t, err)
 				require.Equal(t, tcase.visitMarker, *res)
 			}
 		})
 	}
 }
+
+func TestMarkBlockVisitedHeartBeat(t *testing.T) {
+	partitionedGroupID := uint32(12345)
+	partitionID := 0
+	compactorID := "test-compactor"
+	for _, tcase := range []struct {
+		name           string
+		isCancelled    bool
+		compactionErr  error
+		expectedStatus VisitStatus
+	}{
+		{
+			name:           "heart beat got cancelled",
+			isCancelled:    true,
+			compactionErr:  nil,
+			expectedStatus: Pending,
+		},
+		{
+			name:           "heart beat complete without error",
+			isCancelled:    false,
+			compactionErr:  nil,
+			expectedStatus: Completed,
+		},
+		{
+			name:           "heart beat stopped due to compaction error",
+			isCancelled:    false,
+			compactionErr:  fmt.Errorf("some compaction failure"),
+			expectedStatus: Pending,
+		},
+	} {
+		t.Run(tcase.name, func(t *testing.T) {
+			ulid0 := ulid.MustNew(uint64(time.Now().UnixMilli()+0), nil)
+			ulid1 := ulid.MustNew(uint64(time.Now().UnixMilli()+1), nil)
+			blocks := []*metadata.Meta{
+				{
+					BlockMeta: tsdb.BlockMeta{
+						ULID: ulid0,
+					},
+				},
+				{
+					BlockMeta: tsdb.BlockMeta{
+						ULID: ulid1,
+					},
+				},
+			}
+			ctx, cancel := context.WithCancel(context.Background())
+			dummyCounter := prometheus.NewCounter(prometheus.CounterOpts{})
+			bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
+			logger := log.NewNopLogger()
+			errChan := make(chan error, 1)
+			go markBlocksVisitedHeartBeat(ctx, objstore.WithNoopInstr(bkt), logger, blocks, partitionedGroupID, partitionID, compactorID, time.Second, dummyCounter, errChan)
+			time.Sleep(2 * time.Second)
+			if tcase.isCancelled {
+				cancel()
+			} else {
+				errChan <- tcase.compactionErr
+				defer cancel()
+			}
+			time.Sleep(2 * time.Second)
+			for _, meta := range blocks {
+				res, err := ReadBlockVisitMarker(context.Background(), objstore.WithNoopInstr(bkt), logger, meta.ULID.String(), partitionID, dummyCounter)
+				require.NoError(t, err)
+				require.Equal(t, tcase.expectedStatus, res.Status)
+			}
+		})
+	}
+}
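
The test above drives the heart beat directly. As a rough sketch of the intended wiring (assumptions only, not code from this commit: runWithVisitMarkerHeartBeat and the compact callback are hypothetical), a compaction run would start markBlocksVisitedHeartBeat in the background and report its result on errChan; a nil result promotes the visit markers to Completed, an error leaves them Pending.

package compactor

import (
	"context"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"
	"github.com/thanos-io/thanos/pkg/block/metadata"
)

// runWithVisitMarkerHeartBeat is a hypothetical wrapper illustrating the errChan
// contract of markBlocksVisitedHeartBeat; it is not part of this commit.
func runWithVisitMarkerHeartBeat(
	ctx context.Context,
	bkt objstore.Bucket,
	logger log.Logger,
	blocks []*metadata.Meta,
	partitionedGroupID uint32,
	partitionID int,
	compactorID string,
	writeFailed prometheus.Counter,
	compact func(context.Context) error, // the actual compaction work, supplied by the caller
) {
	errChan := make(chan error, 1)
	go markBlocksVisitedHeartBeat(ctx, bkt, logger, blocks, partitionedGroupID, partitionID, compactorID, time.Minute, writeFailed, errChan)
	// Sending the compaction result stops the heart beat: nil marks the visit
	// markers Completed, a non-nil error leaves them Pending.
	errChan <- compact(ctx)
}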
