Skip to content
3 changes: 0 additions & 3 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ func HandleOverlaps(ctx context.Context, c Cluster, overlaps []*core.RegionInfo)
if regionStats != nil {
regionStats.ClearDefunctRegion(id)
}
// TODO: after https://github.com/tikv/pd/pull/10040 merged,
// We will update new region stats in AffinityChecker.Check,
// AffinityChecker.Check will call GetRegionAffinityGroupState, if region is not in cache, we will add it
if affinityManager != nil {
affinityManager.InvalidCache(id)
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/schedule/affinity/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,6 @@ import (
"github.com/tikv/pd/pkg/storage"
)

func TestStoreCondition(t *testing.T) {
re := require.New(t)
re.Equal(groupDegraded, storeDisconnected.groupAvailability())
re.Equal(groupDegraded, storeEvictLeader.groupAvailability())
re.Equal(groupDegraded, storeLowSpace.groupAvailability())
re.Equal(groupExpired, storeDown.groupAvailability())
re.Equal(groupExpired, storeRemoving.groupAvailability())
re.Equal(groupExpired, storeRemoved.groupAvailability())

re.True(storeEvictLeader.affectsLeaderOnly())
re.False(storeDisconnected.affectsLeaderOnly())
re.False(storeDown.affectsLeaderOnly())
}

func TestGroupState(t *testing.T) {
re := require.New(t)
group := &GroupState{
Expand Down
39 changes: 26 additions & 13 deletions pkg/schedule/affinity/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,20 +319,13 @@ func (m *Manager) deleteCacheLocked(regionID uint64) {
delete(cache.groupInfo.Regions, regionID)
}

func (m *Manager) saveCache(region *core.RegionInfo, group *GroupState) *regionCache {
regionID := region.GetID()
cache := &regionCache{
region: region,
groupInfo: group.groupInfoPtr,
affinityVer: group.affinityVer,
isAffinity: group.isRegionAffinity(region),
}
// Save cache
func (m *Manager) saveCache(cache *regionCache) {
regionID := cache.region.GetID()
m.Lock()
defer m.Unlock()
// If the Group has changed, update it but do not save it afterward.
groupInfo, ok := m.groups[group.ID]
if ok && groupInfo == group.groupInfoPtr && groupInfo.AffinityVer == group.affinityVer {
groupInfo, ok := m.groups[cache.groupInfo.ID]
if ok && groupInfo == cache.groupInfo && groupInfo.AffinityVer == cache.affinityVer {
m.deleteCacheLocked(regionID)
m.regions[regionID] = *cache
groupInfo.Regions[regionID] = *cache
Expand All @@ -341,10 +334,10 @@ func (m *Manager) saveCache(region *core.RegionInfo, group *GroupState) *regionC
groupInfo.AffinityRegionCount++
}
}
return cache
}

// InvalidCache invalidates the cache of the corresponding Region in the manager by its Region ID.
// Since cache misses for Region are more likely when InvalidCache is called, check for existence under a read lock first.
func (m *Manager) InvalidCache(regionID uint64) {
m.RLock()
_, ok := m.regions[regionID]
Expand All @@ -369,7 +362,19 @@ func (m *Manager) getCache(region *core.RegionInfo) (*regionCache, *GroupState)
}

// GetRegionAffinityGroupState returns the affinity group state and isAffinity for a region.
// It is strictly read-only: the result is computed without being saved to the cache,
// which makes it suitable for temporary checks.
func (m *Manager) GetRegionAffinityGroupState(region *core.RegionInfo) (group *GroupState, isAffinity bool) {
	group, isAffinity = m.calcRegionAffinityGroupState(region, false)
	return group, isAffinity
}

// GetAndCacheRegionAffinityGroupState returns the affinity group state for a region and
// persists the computed result to the cache.
// Callers are responsible for invoking InvalidCache() once the region is deleted or merged.
// Currently only used by AffinityChecker.Check.
func (m *Manager) GetAndCacheRegionAffinityGroupState(region *core.RegionInfo) (group *GroupState, isAffinity bool) {
	group, isAffinity = m.calcRegionAffinityGroupState(region, true)
	return group, isAffinity
}

func (m *Manager) calcRegionAffinityGroupState(region *core.RegionInfo, saveCache bool) (group *GroupState, isAffinity bool) {
if region == nil || !m.IsAvailable() {
return nil, false
}
Expand All @@ -381,7 +386,15 @@ func (m *Manager) GetRegionAffinityGroupState(region *core.RegionInfo) (group *G
if group == nil {
return nil, false
}
cache = m.saveCache(region, group)
cache = &regionCache{
region: region,
groupInfo: group.groupInfoPtr,
affinityVer: group.affinityVer,
isAffinity: group.isRegionAffinity(region),
}
if saveCache {
m.saveCache(cache)
}
}

return group, cache.isAffinity
Expand Down
21 changes: 15 additions & 6 deletions pkg/schedule/affinity/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,9 +196,18 @@ func TestRegionCountStaleCache(t *testing.T) {
_, err = manager.UpdateAffinityGroupPeers("g", 1, []uint64{1, 2, 3})
re.NoError(err)
region := generateRegionForTest(100, []uint64{1, 2, 3}, ranges[0])

// test GetRegionAffinityGroupState (read-only, no cache)
_, isAffinity := manager.GetRegionAffinityGroupState(region)
re.True(isAffinity)
groupInfo := getGroupForTest(re, manager, "g")
re.Zero(groupInfo.AffinityRegionCount)
re.Empty(groupInfo.Regions)

// test GetAndCacheRegionAffinityGroupState (saves cache)
_, isAffinity = manager.GetAndCacheRegionAffinityGroupState(region)
re.True(isAffinity)
groupInfo = getGroupForTest(re, manager, "g")
re.Equal(1, groupInfo.AffinityRegionCount)
re.Len(groupInfo.Regions, 1)

Expand All @@ -212,7 +221,7 @@ func TestRegionCountStaleCache(t *testing.T) {

// Remove key ranges, which bumps AffinityVer and invalidates affinity for the cached region.
region = generateRegionForTest(200, []uint64{4, 5, 6}, ranges[0])
_, isAffinity = manager.GetRegionAffinityGroupState(region)
_, isAffinity = manager.GetAndCacheRegionAffinityGroupState(region)
re.True(isAffinity)
groupInfo = getGroupForTest(re, manager, "g")
re.Equal(1, groupInfo.AffinityRegionCount)
Expand All @@ -223,7 +232,7 @@ func TestRegionCountStaleCache(t *testing.T) {
re.Empty(groupInfo.Regions)

// Add key ranges, which bumps AffinityVer and invalidates affinity for the cached region.
_, isAffinity = manager.GetRegionAffinityGroupState(region)
_, isAffinity = manager.GetAndCacheRegionAffinityGroupState(region)
re.True(isAffinity)
groupInfo = getGroupForTest(re, manager, "g")
re.Equal(1, groupInfo.AffinityRegionCount)
Expand Down Expand Up @@ -262,9 +271,9 @@ func TestDeleteGroupClearsCache(t *testing.T) {
region2 := generateRegionForTest(200, []uint64{1, 2, 3}, ranges[0])

// Trigger cache population
_, isAffinity1 := manager.GetRegionAffinityGroupState(region1)
_, isAffinity1 := manager.GetAndCacheRegionAffinityGroupState(region1)
re.True(isAffinity1)
_, isAffinity2 := manager.GetRegionAffinityGroupState(region2)
_, isAffinity2 := manager.GetAndCacheRegionAffinityGroupState(region2)
re.True(isAffinity2)

// Verify regions are in cache
Expand Down Expand Up @@ -324,7 +333,7 @@ func TestAvailabilityChangeRegionCount(t *testing.T) {

// Add regions to cache
region := generateRegionForTest(100, []uint64{1, 2, 3}, ranges[0])
_, isAffinity := manager.GetRegionAffinityGroupState(region)
_, isAffinity := manager.GetAndCacheRegionAffinityGroupState(region)
re.True(isAffinity)

// Verify region is cached
Expand Down Expand Up @@ -384,7 +393,7 @@ func TestInvalidCacheMultipleTimes(t *testing.T) {

// Add region
region := generateRegionForTest(100, []uint64{1, 2, 3}, ranges[0])
_, isAffinity := manager.GetRegionAffinityGroupState(region)
_, isAffinity := manager.GetAndCacheRegionAffinityGroupState(region)
re.True(isAffinity)

// Verify region is in cache
Expand Down
17 changes: 12 additions & 5 deletions pkg/schedule/affinity/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ const (
// storeCondition is an enum for store conditions. Valid values are the store-prefixed enum constants,
// which are split into three groups separated by degradedBoundary.
// Roughly, larger values indicate a more severe degree of unavailability.
// This follows the leaderTarget and regionTarget logic in StoreStateFilter.anyConditionMatch.
type storeCondition int

const (
storeAvailable storeCondition = iota

// All values greater than storeAvailable and less than degradedBoundary will trigger groupDegraded.
storeEvictLeader
storeLeaderEvicted
storeBusy
storeDisconnected
storePreparing
storeLowSpace
Expand All @@ -60,8 +62,10 @@ func (c storeCondition) String() string {
switch c {
case storeAvailable:
return "available"
case storeEvictLeader:
case storeLeaderEvicted:
return "evicted"
case storeBusy:
return "busy"
case storeDisconnected:
return "disconnected"
case storePreparing:
Expand Down Expand Up @@ -92,7 +96,7 @@ func (c storeCondition) groupAvailability() groupAvailability {

func (c storeCondition) affectsLeaderOnly() bool {
switch c {
case storeEvictLeader:
case storeLeaderEvicted, storeBusy:
return true
default:
return false
Expand Down Expand Up @@ -193,8 +197,11 @@ func (m *Manager) collectUnavailableStores() map[uint64]storeCondition {
unavailableStores[store.GetID()] = storeDown

// Then the conditions that will mark the group as degraded
case !store.AllowLeaderTransferIn() || m.conf.CheckLabelProperty(config.RejectLeader, store.GetLabels()):
unavailableStores[store.GetID()] = storeEvictLeader
case !store.AllowLeaderTransferIn() || m.conf.CheckLabelProperty(config.RejectLeader, store.GetLabels()) ||
store.EvictedAsSlowStore() || store.EvictedAsStoppingStore() || store.IsEvictedAsSlowTrend():
unavailableStores[store.GetID()] = storeLeaderEvicted
case store.IsBusy():
unavailableStores[store.GetID()] = storeBusy
case store.IsDisconnected():
unavailableStores[store.GetID()] = storeDisconnected
case store.IsLowSpace(lowSpaceRatio):
Expand Down
89 changes: 87 additions & 2 deletions pkg/schedule/affinity/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,104 @@ package affinity

import (
"context"
"fmt"
"maps"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
"github.com/tikv/pd/pkg/mock/mockconfig"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/storage"
)

func TestStoreCondition(t *testing.T) {
	re := require.New(t)

	// Conditions between storeAvailable and degradedBoundary only degrade the group.
	for _, cond := range []storeCondition{storeLeaderEvicted, storeBusy, storeDisconnected, storePreparing, storeLowSpace} {
		re.Equal(groupDegraded, cond.groupAvailability())
	}
	// Conditions beyond degradedBoundary expire the group.
	for _, cond := range []storeCondition{storeDown, storeRemoving, storeRemoved} {
		re.Equal(groupExpired, cond.groupAvailability())
	}

	// Only leader-eviction-like conditions are restricted to the leader.
	affectsLeaderOnlyCases := map[storeCondition]bool{
		storeLeaderEvicted: true,
		storeBusy:          true,
		storeDisconnected:  false,
		storeDown:          false,
	}
	for cond, want := range affectsLeaderOnlyCases {
		re.Equal(want, cond.affectsLeaderOnly())
	}
}

// TestCollectUnavailableStores verifies that collectUnavailableStores classifies each
// store into the expected storeCondition, covering every condition branch: the five
// leader-eviction triggers (paused transfer, reject-leader label, slow store, stopping
// store, slow trend), busy, disconnected, preparing, down, removing, and removed.
func TestCollectUnavailableStores(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Build a manager backed by in-memory storage with a reject-leader label property,
// so store 2 below is treated as leader-evicted via its label.
memoryStorage := storage.NewStorageWithMemoryBackend()
storeInfos := core.NewStoresInfo()
conf := mockconfig.NewTestOptions()
conf.SetLabelProperty("reject-leader", "reject", "leader")
regionLabeler, err := labeler.NewRegionLabeler(ctx, memoryStorage, time.Second*5)
re.NoError(err)
manager, err := NewManager(ctx, memoryStorage, storeInfos, conf, regionLabeler)
re.NoError(err)

// Start with 12 healthy, serving stores; each is then mutated into one condition.
stores := make([]*core.StoreInfo, 12)
for i := range stores {
stores[i] = core.NewStoreInfo(&metapb.Store{
Id: uint64(i),
Address: fmt.Sprintf("s%d", i),
State: metapb.StoreState_Up,
NodeState: metapb.NodeState_Serving,
}, core.SetLastHeartbeatTS(time.Now()))
}
// stores[0]: storeAvailable (untouched; must be absent from the result map)
// stores[1..5]: storeLeaderEvicted
stores[1] = stores[1].Clone(core.PauseLeaderTransfer(constant.In))
stores[2] = stores[2].Clone(core.SetStoreLabels([]*metapb.StoreLabel{{Key: "reject", Value: "leader"}}))
stores[3] = stores[3].Clone(core.SlowStoreEvicted())
stores[4] = stores[4].Clone(core.StoppingStoreEvicted())
stores[5] = stores[5].Clone(core.SlowTrendEvicted())
// stores[6]: storeBusy
stores[6] = stores[6].Clone(core.SetStoreStats(&pdpb.StoreStats{IsBusy: true}))
// stores[7]: storeDisconnected
// NOTE(review): 2 minutes is presumably past the disconnect threshold but short of
// the down threshold — confirm against the store-state timeouts if this flakes.
stores[7] = stores[7].Clone(core.SetLastHeartbeatTS(time.Now().Add(-2 * time.Minute)))
// stores[8]: storePreparing
stores[8] = stores[8].Clone(core.SetNodeState(metapb.NodeState_Preparing))
// stores[9]: storeDown (heartbeat stale well past the down threshold)
stores[9] = stores[9].Clone(core.SetLastHeartbeatTS(time.Now().Add(-2 * time.Hour)))
// stores[10]: storeRemoving
stores[10] = stores[10].Clone(core.SetStoreState(metapb.StoreState_Offline, false))
// stores[11]: storeRemoved
stores[11] = stores[11].Clone(core.SetStoreState(metapb.StoreState_Tombstone))

for _, store := range stores {
storeInfos.PutStore(store)
}

// The available store (ID 0) is expected to be excluded from the result entirely.
actual := manager.collectUnavailableStores()
expected := map[uint64]storeCondition{
1: storeLeaderEvicted,
2: storeLeaderEvicted,
3: storeLeaderEvicted,
4: storeLeaderEvicted,
5: storeLeaderEvicted,
6: storeBusy,
7: storeDisconnected,
8: storePreparing,
9: storeDown,
10: storeRemoving,
11: storeRemoved,
}
re.True(maps.Equal(expected, actual))
}

func TestObserveAvailableRegion(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -339,15 +424,15 @@ func TestGroupAvailabilityPriority(t *testing.T) {
re.NoError(err)

// evict-leader on leader -> degraded
unavailable := map[uint64]storeCondition{1: storeEvictLeader}
unavailable := map[uint64]storeCondition{1: storeLeaderEvicted}
changed, changes := manager.getGroupAvailabilityChanges(unavailable)
re.True(changed)
manager.setGroupAvailabilityChanges(unavailable, changes)
groupInfo := getGroupForTest(re, manager, "leader-only")
re.Equal(groupDegraded, groupInfo.GetAvailability())

// evict-leader only on follower should not change availability
unavailable = map[uint64]storeCondition{2: storeEvictLeader}
unavailable = map[uint64]storeCondition{2: storeLeaderEvicted}
changed, changes = manager.getGroupAvailabilityChanges(unavailable)
re.True(changed)
manager.setGroupAvailabilityChanges(unavailable, changes)
Expand Down
12 changes: 10 additions & 2 deletions pkg/schedule/checker/affinity_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *AffinityChecker) Check(region *core.RegionInfo) []*operator.Operator {
}

// Get the affinity group for this region
group, isAffinity := c.affinityManager.GetRegionAffinityGroupState(region)
group, isAffinity := c.affinityManager.GetAndCacheRegionAffinityGroupState(region)
if group == nil {
// Region doesn't belong to any affinity group
return nil
Expand Down Expand Up @@ -115,8 +115,16 @@ func (c *AffinityChecker) Check(region *core.RegionInfo) []*operator.Operator {
// Recheck after refetching.
if needRefetch {
c.affinityManager.ObserveAvailableRegion(region, group)
group, isAffinity = c.affinityManager.GetRegionAffinityGroupState(region)
group, isAffinity = c.affinityManager.GetAndCacheRegionAffinityGroupState(region)
}

// A Region may no longer exist in the RegionTree due to a merge.
// In this case, clear the cache in affinity manager for that Region and skip processing it.
if c.cluster.GetRegion(region.GetID()) == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it be put in line 87?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot move the check before GetAndCacheRegionAffinityGroupState.

Problem with moving it earlier:

  • If the existence check passes and the region is then deleted, HandleOverlaps calls InvalidCache —
    but the cache entry doesn't exist yet, so the call is a no-op.
  • We then save the cache entry → stale cache leak (no cleanup path remains).

Current approach guarantees:

  • Save cache first, then check
  • Either HandleOverlaps (when region deleted) OR AffinityChecker (when check fails) will clean up
  • At least one cleanup point always executes

c.affinityManager.InvalidCache(region.GetID())
return nil
}

if group == nil || !group.AffinitySchedulingAllowed {
affinityCheckerGroupSchedulingDisabledCounter.Inc()
return nil
Expand Down
Loading