Skip to content

Commit e2cf3f7

Browse files
lhy1024ti-chi-bot
andauthored
add affinity merge cache (#105)
* affinity: add stale region cache to avoid cascading merges (tikv#10103) (tikv#10106) ref tikv#9764, close tikv#10104 Signed-off-by: lhy1024 <admin@liudos.us> Co-authored-by: lhy1024 <admin@liudos.us> * fix lint Signed-off-by: lhy1024 <admin@liudos.us> --------- Signed-off-by: lhy1024 <admin@liudos.us> Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
1 parent 258ec09 commit e2cf3f7

File tree

8 files changed

+264
-59
lines changed

8 files changed

+264
-59
lines changed

pkg/schedule/checker/affinity_checker.go

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,15 @@ package checker
1616

1717
import (
1818
"bytes"
19+
"context"
1920
"slices"
21+
"time"
2022

2123
"go.uber.org/zap"
2224

2325
"github.com/pingcap/log"
2426

27+
"github.com/tikv/pd/pkg/cache"
2528
"github.com/tikv/pd/pkg/core"
2629
"github.com/tikv/pd/pkg/errs"
2730
"github.com/tikv/pd/pkg/schedule/affinity"
@@ -34,21 +37,27 @@ import (
3437
"github.com/tikv/pd/pkg/utils/logutil"
3538
)
3639

40+
const recentMergeTTL = time.Minute
41+
3742
// AffinityChecker groups regions with affinity labels together by affinity group.
3843
// It ensures regions adhere to affinity group constraints by creating operators.
3944
type AffinityChecker struct {
4045
PauseController
41-
cluster sche.CheckerCluster
42-
affinityManager *affinity.Manager
43-
conf config.CheckerConfigProvider
46+
cluster sche.CheckerCluster
47+
affinityManager *affinity.Manager
48+
conf config.CheckerConfigProvider
49+
recentMergeCache *cache.TTLUint64
50+
startTime time.Time
4451
}
4552

4653
// NewAffinityChecker create an affinity checker.
47-
func NewAffinityChecker(cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *AffinityChecker {
54+
func NewAffinityChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *AffinityChecker {
4855
return &AffinityChecker{
49-
cluster: cluster,
50-
affinityManager: cluster.GetAffinityManager(),
51-
conf: conf,
56+
cluster: cluster,
57+
affinityManager: cluster.GetAffinityManager(),
58+
conf: conf,
59+
recentMergeCache: cache.NewIDTTL(ctx, gcInterval, recentMergeTTL),
60+
startTime: time.Now(),
5261
}
5362
}
5463

@@ -247,10 +256,25 @@ func (c *AffinityChecker) createAffinityOperator(region *core.RegionInfo, group
247256

248257
// mergeCheck verifies if a region can be merged with its adjacent regions within the same affinity group.
249258
// It follows similar logic to merge_checker but with affinity-specific constraints:
250-
// - Does NOT skip recently split or recently started regions
259+
// - Does NOT skip recently split regions
251260
// - Does NOT skip hot spots regions
252261
// - Only merges regions within the same affinity group
262+
// - Skips regions that are recently merged
253263
func (c *AffinityChecker) mergeCheck(region *core.RegionInfo, group *affinity.GroupState) []*operator.Operator {
264+
// Skip merge during startup TTL period (1 minute after leader switch)
265+
// After a leader switch, wait for recentMergeTTL before processing affinity merge
266+
// to reduce issues caused by cache invalidation after transfer leader
267+
if time.Since(c.startTime) < recentMergeTTL {
268+
affinityMergeCheckerSkipStartupCounter.Inc()
269+
return nil
270+
}
271+
272+
// Check if region is in cache
273+
if c.recentMergeCache.Exists(region.GetID()) {
274+
affinityMergeCheckerSkipCachedCounter.Inc()
275+
return nil
276+
}
277+
254278
maxAffinityMergeRegionSize := c.conf.GetMaxAffinityMergeRegionSize()
255279

256280
if maxAffinityMergeRegionSize == 0 {
@@ -290,6 +314,12 @@ func (c *AffinityChecker) mergeCheck(region *core.RegionInfo, group *affinity.Gr
290314
return nil
291315
}
292316

317+
// Check if target region is in cache
318+
if c.recentMergeCache.Exists(target.GetID()) {
319+
affinityMergeCheckerSkipCachedCounter.Inc()
320+
return nil
321+
}
322+
293323
if region.GetApproximateSize()+target.GetApproximateSize() > maxSize ||
294324
region.GetApproximateKeys()+target.GetApproximateKeys() > maxKeys {
295325
affinityMergeCheckerTargetTooBigCounter.Inc()
@@ -437,3 +467,25 @@ func cloneRegionWithReplacePeerStores(region *core.RegionInfo, leaderStoreID uin
437467

438468
return region.Clone(options...)
439469
}
470+
471+
// RecordOpSuccess is called when an operator completes successfully.
472+
// It caches merged regions to prevent immediate re-merging.
473+
//
474+
// Merge completes quickly (e.g., 21ms) but TiKV needs time to update approximate_size via split-check.
475+
// During this gap, PD may use stale size data to schedule more merges. This 1-minute cache prevents that.
476+
func (c *AffinityChecker) RecordOpSuccess(op *operator.Operator) {
477+
// if schedule limit is 0, disable schedule, so we don't need to cache it.
478+
if c.conf.GetAffinityScheduleLimit() == 0 {
479+
return
480+
}
481+
// Process both merge and affinity merge operators
482+
if !op.HasRelatedMergeRegion() {
483+
return
484+
}
485+
relatedID := op.GetRelatedMergeRegion()
486+
if relatedID == 0 {
487+
return
488+
}
489+
c.recentMergeCache.PutWithTTL(op.RegionID(), nil, recentMergeTTL)
490+
c.recentMergeCache.PutWithTTL(relatedID, nil, recentMergeTTL)
491+
}

0 commit comments

Comments
 (0)