Skip to content

Commit db7050c

Browse files
alexqyle authored and alanprot committed
Improve on error handling and visit marker status (#178)
Signed-off-by: Alex Le <[email protected]>
1 parent 2dcd647 commit db7050c

12 files changed

+177
-47
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,18 @@ func (c *BlocksCleaner) cleanUpActiveUsers(ctx context.Context, users []string,
301301
err := concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
302302
userLogger := util_log.WithUserID(userID, c.logger)
303303
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
304+
visitMarkerManager, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
305+
if err != nil {
306+
return err
307+
}
308+
if visitMarkerManager == nil {
309+
return nil
310+
}
311+
errChan := make(chan error, 1)
312+
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
313+
defer func() {
314+
errChan <- nil
315+
}()
304316
return errors.Wrapf(c.cleanUser(ctx, userLogger, userBucket, userID, firstRun), "failed to delete blocks for user: %s", userID)
305317
})
306318

@@ -334,6 +346,18 @@ func (c *BlocksCleaner) cleanDeletedUsers(ctx context.Context, users []string) e
334346
err := concurrency.ForEachUser(ctx, users, c.cfg.CleanupConcurrency, func(ctx context.Context, userID string) error {
335347
userLogger := util_log.WithUserID(userID, c.logger)
336348
userBucket := bucket.NewUserBucketClient(userID, c.bucketClient, c.cfgProvider)
349+
visitMarkerManager, err := c.obtainVisitMarkerManager(ctx, userLogger, userBucket)
350+
if err != nil {
351+
return err
352+
}
353+
if visitMarkerManager == nil {
354+
return nil
355+
}
356+
errChan := make(chan error, 1)
357+
go visitMarkerManager.HeartBeat(ctx, errChan, c.cleanerVisitMarkerFileUpdateInterval, true)
358+
defer func() {
359+
errChan <- nil
360+
}()
337361
return errors.Wrapf(c.deleteUserMarkedForDeletion(ctx, userLogger, userBucket, userID), "failed to delete user marked for deletion: %s", userID)
338362
})
339363

@@ -382,6 +406,21 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
382406
return users, deleted, nil
383407
}
384408

409+
// obtainVisitMarkerManager decides whether this cleaner may work on the tenant
// behind userBucket, based on the tenant's existing cleaner visit marker.
//
// It returns:
//   - (manager, nil) when no marker exists (ErrorVisitMarkerNotFound) or the
//     existing marker does not report IsVisited for c.cleanerVisitMarkerTimeout;
//   - (nil, nil) when the existing marker is still considered visited; callers
//     must check for a nil manager before using it;
//   - (nil, err) when reading the marker fails for any other reason.
//
// This function only reads the marker; it does not write one itself.
func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) (*VisitMarkerManager, error) {
410+
cleanerVisitMarker := NewCleanerVisitMarker(c.ringLifecyclerID)
411+
visitMarkerManager := NewVisitMarkerManager(userBucket, userLogger, c.ringLifecyclerID, cleanerVisitMarker, c.CleanerVisitMarkerReadFailed, c.CleanerVisitMarkerWriteFailed)
412+
413+
// Read into a separate marker value so the freshly built marker handed to
// the manager above is not the one being populated here.
existingCleanerVisitMarker := &CleanerVisitMarker{}
414+
err := visitMarkerManager.ReadVisitMarker(ctx, existingCleanerVisitMarker)
415+
// A missing marker is not a failure; every other read error is.
if err != nil && !errors.Is(err, ErrorVisitMarkerNotFound) {
416+
return nil, errors.Wrapf(err, "failed to read cleaner visit marker")
417+
}
418+
if errors.Is(err, ErrorVisitMarkerNotFound) || !existingCleanerVisitMarker.IsVisited(c.cleanerVisitMarkerTimeout) {
419+
return visitMarkerManager, nil
420+
}
421+
// The marker is still considered visited: signal "skip" with a nil manager.
return nil, nil
422+
}
423+
385424
// Remove blocks and remaining data for tenant marked for deletion.
386425
func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error {
387426

pkg/compactor/cleaner_visit_marker.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ func (b *CleanerVisitMarker) IsFailed() bool {
4545
return b.Status == Failed
4646
}
4747

48+
// IsInProgress reports whether the marker's Status equals InProgress.
func (b *CleanerVisitMarker) IsInProgress() bool {
49+
return b.Status == InProgress
50+
}
51+
4852
// IsPending reports whether the marker's Status equals Pending.
func (b *CleanerVisitMarker) IsPending() bool {
4953
return b.Status == Pending
5054
}
@@ -53,7 +57,13 @@ func (b *CleanerVisitMarker) GetVisitMarkerFilePath() string {
5357
return GetCleanerVisitMarkerFilePath()
5458
}
5559

56-
func (b *CleanerVisitMarker) MarkVisited(ownerIdentifier string) {
60+
// MarkInProgress updates the marker in memory: it records ownerIdentifier as
// the CompactorID, sets Status to InProgress, and stamps VisitTime with the
// current Unix time in seconds. It does not persist the marker.
func (b *CleanerVisitMarker) MarkInProgress(ownerIdentifier string) {
61+
b.CompactorID = ownerIdentifier
62+
b.Status = InProgress
63+
b.VisitTime = time.Now().Unix()
64+
}
65+
66+
func (b *CleanerVisitMarker) MarkPending(ownerIdentifier string) {
5767
b.CompactorID = ownerIdentifier
5868
b.Status = Pending
5969
b.VisitTime = time.Now().Unix()

pkg/compactor/compactor.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -897,13 +897,23 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
897897
return nil
898898
}
899899
if c.isCausedByPermissionDenied(lastErr) {
900-
level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "user", userID, "err", lastErr)
900+
level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "org_id", userID, "err", lastErr)
901901
return nil
902902
}
903-
903+
if compact.IsHaltError(lastErr) {
904+
level.Error(c.logger).Log("msg", "compactor returned critical error", "org_id", userID, "err", lastErr)
905+
c.compactorMetrics.compactionHaltErrors.WithLabelValues(userID).Inc()
906+
return lastErr
907+
}
908+
c.compactorMetrics.compactionRetryErrors.WithLabelValues(userID).Inc()
904909
retries.Wait()
905910
}
906911

912+
err := errors.Unwrap(errors.Cause(lastErr))
913+
if errors.Is(err, PlannerCompletedPartitionError) || errors.Is(err, PlannerVisitedPartitionError) {
914+
return nil
915+
}
916+
907917
return lastErr
908918
}
909919

pkg/compactor/compactor_metrics.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type compactorMetrics struct {
4141
compactionDuration *prometheus.GaugeVec
4242
partitionGroupDuration *prometheus.GaugeVec
4343
blockGroupDuration *prometheus.GaugeVec
44+
compactionRetryErrors *prometheus.CounterVec
45+
compactionHaltErrors *prometheus.CounterVec
4446
}
4547

4648
const (
@@ -179,6 +181,14 @@ func newCompactorMetricsWithLabels(reg prometheus.Registerer, commonLabels []str
179181
Name: "cortex_compact_block_group_duration_seconds",
180182
Help: "Duration of sharding grouper in seconds",
181183
}, commonLabels)
184+
m.compactionRetryErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
185+
Name: "cortex_compactor_compaction_retry_error_total",
186+
Help: "Total number of retry errors from compactions.",
187+
}, CommonLabels)
188+
m.compactionHaltErrors = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
189+
Name: "cortex_compactor_compaction_halt_error_total",
190+
Help: "Total number of halt errors from compactions.",
191+
}, CommonLabels)
182192

183193
return &m
184194
}

pkg/compactor/partition_compaction_grouper.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
658658
level.Info(partitionedGroupLogger).Log("msg", "found compactable group for user", "group", partitionedGroup.String())
659659
begin := time.Now()
660660

661-
visitMarkerManager.MarkVisited(g.ctx)
661+
visitMarkerManager.MarkPending(g.ctx)
662662
level.Info(partitionedGroupLogger).Log("msg", "marked partition visited in group", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "group", partitionedGroup.String())
663663

664664
resolution := partitionedGroup.blocks[0].Thanos.Downsample.Resolution
@@ -711,9 +711,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
711711

712712
outGroups = append(outGroups, thanosGroup)
713713
level.Debug(partitionedGroupLogger).Log("msg", "added partition to compaction groups")
714-
// Grouper holds additional groups for compaction. In case, it lost
715-
// competition for the first group it claimed.
716-
if len(outGroups) >= g.compactionConcurrency+int(math.Min(math.Ceil(float64(g.compactionConcurrency)*0.1), 5)) {
714+
if len(outGroups) >= g.compactionConcurrency {
717715
break
718716
}
719717
}

pkg/compactor/partition_visit_marker.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ func (b *PartitionVisitMarker) IsVisited(partitionVisitMarkerTimeout time.Durati
5151
return b.IsCompleted() || (partitionID == b.PartitionID && !b.IsExpired(partitionVisitMarkerTimeout))
5252
}
5353

54-
func (b *PartitionVisitMarker) IsVisitedByCompactor(partitionVisitMarkerTimeout time.Duration, partitionID int, compactorID string) bool {
55-
return b.CompactorID == compactorID && b.IsVisited(partitionVisitMarkerTimeout, partitionID)
54+
// IsPendingByCompactor reports whether this marker is a live Pending claim on
// partitionID held by compactorID. All of the following must hold: the
// marker's CompactorID equals compactorID, its PartitionID equals partitionID,
// IsPending() is true, and the marker is not expired with respect to
// partitionVisitMarkerTimeout.
func (b *PartitionVisitMarker) IsPendingByCompactor(partitionVisitMarkerTimeout time.Duration, partitionID int, compactorID string) bool {
55+
return b.CompactorID == compactorID && partitionID == b.PartitionID && b.IsPending() && !b.IsExpired(partitionVisitMarkerTimeout)
5656
}
5757

5858
func (b *PartitionVisitMarker) IsCompleted() bool {
@@ -75,12 +75,18 @@ func (b *PartitionVisitMarker) GetVisitMarkerFilePath() string {
7575
return GetPartitionVisitMarkerFilePath(b.PartitionedGroupID, b.PartitionID)
7676
}
7777

78-
func (b *PartitionVisitMarker) MarkVisited(ownerIdentifier string) {
78+
// MarkInProgress updates the marker in memory: it records ownerIdentifier as
// the CompactorID, sets Status to InProgress, and stamps VisitTime with the
// current Unix time in seconds. It does not persist the marker.
func (b *PartitionVisitMarker) MarkInProgress(ownerIdentifier string) {
7979
b.CompactorID = ownerIdentifier
8080
b.Status = InProgress
8181
b.VisitTime = time.Now().Unix()
8282
}
8383

84+
// MarkPending updates the marker in memory: it records ownerIdentifier as the
// CompactorID, sets Status to Pending, and stamps VisitTime with the current
// Unix time in seconds. It does not persist the marker.
func (b *PartitionVisitMarker) MarkPending(ownerIdentifier string) {
85+
b.CompactorID = ownerIdentifier
86+
b.Status = Pending
87+
b.VisitTime = time.Now().Unix()
88+
}
89+
8490
func (b *PartitionVisitMarker) MarkCompleted(ownerIdentifier string) {
8591
b.CompactorID = ownerIdentifier
8692
b.Status = Completed
@@ -95,15 +101,15 @@ func (b *PartitionVisitMarker) MarkFailed(ownerIdentifier string) {
95101

96102
// LogInfo returns the marker's fields as a flat slice of alternating key and
// value strings: partitioned group ID, partition ID, compactor ID, status and
// visit time (VisitTime interpreted as Unix seconds and formatted with
// time.Time.String). In the updated version every key carries a
// "visit_marker_" prefix.
//
// NOTE(review): the result is a single []string; a variadic key/value logger
// needs the elements passed as individual arguments — confirm call sites
// expand it rather than passing the slice as one value.
func (b *PartitionVisitMarker) LogInfo() []string {
97103
return []string{
98-
"partitioned_group_id",
104+
"visit_marker_partitioned_group_id",
99105
fmt.Sprintf("%d", b.PartitionedGroupID),
100-
"partition_id",
106+
"visit_marker_partition_id",
101107
fmt.Sprintf("%d", b.PartitionID),
102-
"compactor_id",
108+
"visit_marker_compactor_id",
103109
b.CompactorID,
104-
"status",
110+
"visit_marker_status",
105111
string(b.Status),
106-
"visit_time",
112+
"visit_marker_visit_time",
107113
time.Unix(b.VisitTime, 0).String(),
108114
}
109115
}

pkg/compactor/shuffle_sharding_grouper.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re
251251
level.Info(partitionedGroupLogger).Log("msg", "found compactable group for user", "group", partitionedGroup.String())
252252
begin := time.Now()
253253

254-
visitMarkerManager.MarkVisited(g.ctx)
254+
visitMarkerManager.MarkPending(g.ctx)
255255
level.Info(partitionedGroupLogger).Log("msg", "marked partition visited in group", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds(), "group", partitionedGroup.String())
256256

257257
resolution := partitionedGroup.blocks[0].Thanos.Downsample.Resolution
@@ -304,9 +304,7 @@ func (g *ShuffleShardingGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (re
304304

305305
outGroups = append(outGroups, thanosGroup)
306306
level.Debug(partitionedGroupLogger).Log("msg", "added partition to compaction groups")
307-
// Grouper holds additional groups for compaction. In case, it lost
308-
// competition for the first group it claimed.
309-
if len(outGroups) >= g.compactionConcurrency+int(math.Min(math.Ceil(float64(g.compactionConcurrency)*0.1), 5)) {
307+
if len(outGroups) >= g.compactionConcurrency {
310308
break
311309
}
312310
}

pkg/compactor/shuffle_sharding_grouper_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,6 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
207207
blocks: map[ulid.ULID]*metadata.Meta{block1hto2hExt1Ulid: blocks[block1hto2hExt1Ulid], block3hto4hExt1Ulid: blocks[block3hto4hExt1Ulid], block0hto1hExt1Ulid: blocks[block0hto1hExt1Ulid], block2hto3hExt1Ulid: blocks[block2hto3hExt1Ulid]},
208208
expected: [][]ulid.ULID{
209209
{block1hto2hExt1Ulid, block0hto1hExt1Ulid},
210-
{block3hto4hExt1Ulid, block2hto3hExt1Ulid},
211210
},
212211
visitedPartitions: []struct {
213212
partitionedGroupID uint32
@@ -239,7 +238,6 @@ func TestShuffleShardingGrouper_Groups(t *testing.T) {
239238
expected: [][]ulid.ULID{
240239
{block1hto2hExt2Ulid, block0hto1hExt2Ulid},
241240
{block1hto2hExt1Ulid, block0hto1hExt1Ulid},
242-
{block3hto4hExt1Ulid, block2hto3hExt1Ulid},
243241
},
244242
},
245243
"test should skip block with no compact marker": {

pkg/compactor/shuffle_sharding_planner.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ import (
1414
"github.com/thanos-io/thanos/pkg/block/metadata"
1515
)
1616

17+
// Sentinel errors returned by the shuffle-sharding planner when a partition
// must not be compacted by this compactor. Compare with errors.Is.
var (
18+
// PlannerCompletedPartitionError is returned when the partition's existing
// visit marker reports IsCompleted.
PlannerCompletedPartitionError = errors.New("got completed partition")
19+
// PlannerVisitedPartitionError is returned when the partition's existing
// visit marker is not a live Pending claim held by the current compactor.
// NOTE(review): despite the message, this presumably also covers the
// current compactor's own marker having expired or left Pending status —
// confirm against IsPendingByCompactor.
PlannerVisitedPartitionError = errors.New("got partition visited by other compactor")
20+
)
21+
1722
type ShuffleShardingPlanner struct {
1823
ctx context.Context
1924
bkt objstore.InstrumentedBucket
@@ -104,12 +109,12 @@ func (p *ShuffleShardingPlanner) PlanWithPartition(_ context.Context, metasByMin
104109
if existingPartitionVisitMarker.IsCompleted() {
105110
p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
106111
level.Warn(p.logger).Log("msg", "partition is in completed status", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.LogInfo())
107-
return nil, nil
112+
return nil, PlannerCompletedPartitionError
108113
}
109-
if !existingPartitionVisitMarker.IsVisitedByCompactor(p.partitionVisitMarkerTimeout, partitionID, p.ringLifecyclerID) {
114+
if !existingPartitionVisitMarker.IsPendingByCompactor(p.partitionVisitMarkerTimeout, partitionID, p.ringLifecyclerID) {
110115
p.compactorMetrics.compactionsNotPlanned.WithLabelValues(p.userID, cortexMetaExtensions.TimeRangeStr()).Inc()
111116
level.Warn(p.logger).Log("msg", "partition is not visited by current compactor", "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "compactor_id", p.ringLifecyclerID, existingPartitionVisitMarker.LogInfo())
112-
return nil, nil
117+
return nil, PlannerVisitedPartitionError
113118
}
114119
}
115120

pkg/compactor/shuffle_sharding_planner_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) {
241241
isExpired: false,
242242
compactorID: otherCompactor,
243243
},
244-
expected: []*metadata.Meta{},
244+
expectedErr: PlannerVisitedPartitionError,
245245
},
246246
"test should not compact if visit marker file is expired": {
247247
ranges: []int64{2 * time.Hour.Milliseconds()},
@@ -265,7 +265,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) {
265265
isExpired: true,
266266
compactorID: currentCompactor,
267267
},
268-
expected: []*metadata.Meta{},
268+
expectedErr: PlannerVisitedPartitionError,
269269
},
270270
}
271271

@@ -285,6 +285,7 @@ func TestShuffleShardingPlanner_Plan(t *testing.T) {
285285
PartitionedGroupID: partitionedGroupID,
286286
PartitionID: partitionID,
287287
VisitTime: expireTime.Unix(),
288+
Status: Pending,
288289
Version: PartitionVisitMarkerVersion1,
289290
}
290291
visitMarkerFileContent, _ := json.Marshal(visitMarker)

0 commit comments

Comments
 (0)