65 changes: 62 additions & 3 deletions pkg/schedule/checker/affinity_checker.go
@@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/utils/logutil"
)

@@ -44,6 +45,7 @@ const recentMergeTTL = time.Minute
type AffinityChecker struct {
PauseController
cluster sche.CheckerCluster
ruleManager *placement.RuleManager
affinityManager *affinity.Manager
conf config.CheckerConfigProvider
recentMergeCache *cache.TTLUint64
@@ -54,6 +56,7 @@ type AffinityChecker struct {
func NewAffinityChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *AffinityChecker {
return &AffinityChecker{
cluster: cluster,
ruleManager: cluster.GetRuleManager(),
affinityManager: cluster.GetAffinityManager(),
conf: conf,
recentMergeCache: cache.NewIDTTL(ctx, gcInterval, recentMergeTTL),
@@ -79,6 +82,10 @@ func (c *AffinityChecker) Check(region *core.RegionInfo) []*operator.Operator {
affinityCheckerPausedCounter.Inc()
return nil
}
if !c.cluster.GetSharedConfig().IsPlacementRulesEnabled() {
affinityCheckerPlacementRulesDisabledCounter.Inc()
return nil
}

// Check region state
if region.GetLeader() == nil {
@@ -89,7 +96,7 @@ func (c *AffinityChecker) Check(region *core.RegionInfo) []*operator.Operator {
affinityCheckerUnhealthyRegionCounter.Inc()
return nil
}
if !filter.IsRegionReplicated(c.cluster, region) {
if !c.isRegionPlacementRuleSatisfiedWithBestLocation(region, true /* isRealRegion */) {
affinityCheckerAbnormalReplicaCounter.Inc()
return nil
}
@@ -111,8 +118,9 @@ func (c *AffinityChecker) Check(region *core.RegionInfo) []*operator.Operator {
// so expire the group first, then provide the available Region information and fetch the Group state again.
if !isAffinity {
targetRegion := cloneRegionWithReplacePeerStores(region, group.LeaderStoreID, group.VoterStoreIDs...)
if targetRegion == nil || !filter.IsRegionReplicated(c.cluster, targetRegion) {
if targetRegion == nil || !c.isRegionPlacementRuleSatisfiedWithBestLocation(targetRegion, false /* isRealRegion */) {
c.affinityManager.ExpireAffinityGroup(group.ID)
group = c.affinityManager.GetAffinityGroupState(group.ID)
needRefetch = true
}
}
@@ -377,7 +385,7 @@ func (c *AffinityChecker) checkAffinityMergeTarget(region, adjacent *core.Region
return false
}

if !filter.IsRegionReplicated(c.cluster, adjacent) {
if !c.isRegionPlacementRuleSatisfiedWithBestLocation(adjacent, true /* isRealRegion */) {
affinityMergeCheckerAdjAbnormalReplicaCounter.Inc()
return false
}
@@ -487,3 +495,54 @@ func (c *AffinityChecker) RecordOpSuccess(op *operator.Operator) {
c.recentMergeCache.PutWithTTL(op.RegionID(), nil, recentMergeTTL)
c.recentMergeCache.PutWithTTL(relatedID, nil, recentMergeTTL)
}

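// isRegionPlacementRuleSatisfiedWithBestLocation reports whether the region satisfies its placement rules
// and every rule-covered peer is already at the best location: no better store is available (not even one
// that is only temporarily unschedulable) and the configured isolation level is met. Pass isRealRegion=false
// for a virtual target region so the fit is computed without the cache.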
func (c *AffinityChecker) isRegionPlacementRuleSatisfiedWithBestLocation(region *core.RegionInfo, isRealRegion bool) bool {
// Get the RegionFit for the given Region. If the Region is not a real existing Region but a virtual target state,
// use FitRegionWithoutCache to bypass the cache.
var fit *placement.RegionFit
if isRealRegion {
fit = c.ruleManager.FitRegion(c.cluster, region)
} else {
fit = c.ruleManager.FitRegionWithoutCache(c.cluster, region)
}

// Check whether the region satisfies its placement rules.
if fit == nil || !fit.IsSatisfied() {
return false
}

// Check whether all peers covered by the rules are at the best isolation level.
// This logic is based on `RuleChecker.fixBetterLocation`.
for _, rf := range fit.RuleFits {
if len(rf.Rule.LocationLabels) == 0 {
continue
}
isWitness := rf.Rule.IsWitness && isWitnessEnabled(c.cluster)
// If the peer to be moved is a witness, no snapshot is needed, so we reuse the fast-failover logic.
strategy := c.strategy(region, rf.Rule, isWitness)
Comment on lines +520 to +522

Contributor: maybe we need not to consider witness

Member Author: This is for compatibility reasons, as the original code was retained.

_, newStoreID, filterByTempState := strategy.getBetterLocation(c.cluster, region, fit, rf)
// filterByTempState being true means a better placement exists but is temporarily unschedulable.
// This is also considered not satisfied.
if newStoreID != 0 || filterByTempState {
return false
}
// If the isolation level does not meet the requirement, it is also considered not to be at the best location.
if !statistics.IsRegionLabelIsolationSatisfied(rf.Stores, rf.Rule.LocationLabels, rf.Rule.IsolationLevel) {
return false
}
}

return true
}

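// strategy builds the ReplicaStrategy used by getBetterLocation to probe for a better store under the given rule.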
func (c *AffinityChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
return &ReplicaStrategy{
checkerName: c.Name(),
cluster: c.cluster,
isolationLevel: rule.IsolationLevel,
locationLabels: rule.LocationLabels,
region: region,
extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.Name(), rule.LabelConstraints)},
fastFailover: fastFailover,
}
}
80 changes: 77 additions & 3 deletions pkg/schedule/checker/affinity_checker_test.go
@@ -2050,9 +2050,9 @@ func TestAffinityCheckerExpireGroupWhenPlacementRuleMismatch(t *testing.T) {

groupState := affinityManager.GetAffinityGroupState("test_group")
re.NotNil(groupState)
re.False(groupState.AffinitySchedulingAllowed, "Group should be expired when peer count violates placement rules")
re.Equal(affinity.PhasePending, groupState.Phase)
re.Equal([]uint64{1, 2, 3, 4}, groupState.VoterStoreIDs, "Peers remain as configured until a valid available region is observed")
re.True(groupState.AffinitySchedulingAllowed)
re.Equal(affinity.PhaseStable, groupState.Phase)
re.Equal([]uint64{1, 2, 3}, groupState.VoterStoreIDs)
}

// TestAffinityCheckerTargetStoreEvictLeader tests that operator is not created when target store has evict-leader.
@@ -2122,6 +2122,80 @@ func TestAffinityCheckerTargetStoreRejectLeader(t *testing.T) {
re.Nil(ops, "Should not create operator when target store has reject-leader label")
}

// TestAffinityCheckerRegionHasBetterLocation tests how the checker handles a Region where isRegionPlacementRuleSatisfiedWithBestLocation returns false.
func TestAffinityCheckerRegionHasBetterLocation(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opt := mockconfig.NewTestOptions()
opt.SetLocationLabels([]string{"region", "zone", "host"})
tc := mockcluster.NewCluster(ctx, opt)

tc.AddLabelsStore(1, 0, map[string]string{"region": "r1", "zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 0, map[string]string{"region": "r1", "zone": "z1", "host": "h1"})
tc.AddLabelsStore(3, 0, map[string]string{"region": "r1", "zone": "z1", "host": "h2"})
tc.AddLabelsStore(4, 0, map[string]string{"region": "r1", "zone": "z2", "host": "h1"})
tc.AddLabelsStore(5, 0, map[string]string{"region": "r1", "zone": "z3", "host": "h1"})
tc.AddLabelsStore(6, 0, map[string]string{"region": "r1", "zone": "z3", "host": "h2"})

affinityManager := tc.GetAffinityManager()
checker := newTestAffinityChecker(ctx, tc, opt)

// Create affinity group without best location
group := &affinity.Group{
ID: "test_group",
LeaderStoreID: 2,
VoterStoreIDs: []uint64{1, 2, 3},
}
err := createAffinityGroupForTest(affinityManager, group, []byte(""), []byte(""))
re.NoError(err)

// No scheduling is generated because the source Region is not at the best location.
tc.AddLeaderRegion(100, 1, 3, 4)
region := tc.GetRegion(100)
re.False(checker.isRegionPlacementRuleSatisfiedWithBestLocation(region, true))
ops := checker.Check(region)
re.Nil(ops)
groupState := affinityManager.GetAffinityGroupState("test_group")
re.NotNil(groupState)
re.Equal([]uint64{1, 2, 3}, groupState.VoterStoreIDs)

// Because the Group's currently specified Peers are not at the best location,
// they are cleared and replaced with the source Region's Peers.
// In this case, no scheduling is generated, but the Peers are updated.
tc.AddLeaderRegion(200, 1, 4, 5)
region = tc.GetRegion(200)
re.True(checker.isRegionPlacementRuleSatisfiedWithBestLocation(region, false))
ops = checker.Check(region)
re.Nil(ops)
groupState = affinityManager.GetAffinityGroupState("test_group")
re.NotNil(groupState)
re.Equal([]uint64{1, 4, 5}, groupState.VoterStoreIDs)

// When both the source Region and the target Region are at the best location, scheduling is generated.
tc.AddLeaderRegion(300, 1, 4, 6)
region = tc.GetRegion(300)
re.True(checker.isRegionPlacementRuleSatisfiedWithBestLocation(region, true))
ops = checker.Check(region)
re.NotNil(ops)
re.Len(ops, 1)

// No scheduling is generated when Placement Rules are disabled.
tc.SetEnablePlacementRules(false)
ops = checker.Check(region)
re.Nil(ops)

// If an IsolationLevel is configured, no scheduling is generated when the required isolation level cannot be met.
tc.SetEnablePlacementRules(true)
rule := tc.GetRuleManager().GetRule(placement.DefaultGroupID, placement.DefaultRuleID)
re.NotNil(rule)
rule.IsolationLevel = "region"
re.NoError(tc.GetRuleManager().SetRule(rule))
ops = checker.Check(region)
re.Nil(ops)
}

// TestAffinityMergeCheckPeerStoreMismatch tests that merge is rejected when peer stores don't match.
func TestAffinityMergeCheckPeerStoreMismatch(t *testing.T) {
re := require.New(t)
1 change: 1 addition & 0 deletions pkg/schedule/checker/metrics.go
@@ -245,6 +245,7 @@ var (

affinityCheckerCounter = checkerCounter.WithLabelValues(affinityChecker, "check")
affinityCheckerPausedCounter = checkerCounter.WithLabelValues(affinityChecker, "paused")
affinityCheckerPlacementRulesDisabledCounter = checkerCounter.WithLabelValues(affinityChecker, "placement-rules-disabled")
affinityCheckerRegionNoLeaderCounter = checkerCounter.WithLabelValues(affinityChecker, "region-no-leader")
affinityCheckerGroupSchedulingDisabledCounter = checkerCounter.WithLabelValues(affinityChecker, "group-scheduling-disabled")
affinityCheckerNewOpCounter = checkerCounter.WithLabelValues(affinityChecker, "new-operator")
47 changes: 47 additions & 0 deletions pkg/schedule/checker/replica_strategy.go
@@ -23,6 +23,8 @@ import (
"github.com/tikv/pd/pkg/core/constant"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/versioninfo"
)

// ReplicaStrategy collects some utilities to manipulate region peers. It
@@ -155,3 +157,48 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
}
return source.GetID()
}

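// getBetterLocation follows the approach of RuleChecker.fixBetterLocation: it picks the peer store the
// strategy would remove first, collects co-located candidate stores with the same engine and a rule of the
// same role, and asks the strategy whether any of them would be an improvement. A non-zero newStoreID (or
// filterByTempState set) indicates that a better placement exists.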
func (s *ReplicaStrategy) getBetterLocation(cluster sche.SharedCluster, region *core.RegionInfo, fit *placement.RegionFit, rf *placement.RuleFit) (oldStoreID, newStoreID uint64, filterByTempState bool) {
ruleStores := getRuleFitStores(cluster, rf)
oldStoreID = s.SelectStoreToRemove(ruleStores)
if oldStoreID == 0 {
return 0, 0, false
}
oldStore := cluster.GetStore(oldStoreID)
if oldStore == nil {
return 0, 0, false
}
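// Collect candidate stores that already host a peer of the region, share the engine label of the store
// to be replaced, and match the label constraints of a rule with the same role as rf.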
var coLocationStores []*core.StoreInfo
regionStores := cluster.GetRegionStores(region)
for _, store := range regionStores {
if store.GetLabelValue(core.EngineKey) != oldStore.GetLabelValue(core.EngineKey) {
continue
}
for _, r := range fit.GetRules() {
if r.Role != rf.Rule.Role {
continue
}
if placement.MatchLabelConstraints(store, r.LabelConstraints) {
coLocationStores = append(coLocationStores, store)
break
}
}
}
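// Check whether any co-located candidate would be a better location than the store selected for removal.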
newStoreID, filterByTempState = s.SelectStoreToImprove(coLocationStores, oldStoreID)
return
}

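// isWitnessEnabled returns true when the cluster version supports switching witnesses and the checker
// configuration allows witnesses.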
func isWitnessEnabled(cluster sche.CheckerCluster) bool {
config := cluster.GetCheckerConfig()
return versioninfo.IsFeatureSupported(config.GetClusterVersion(), versioninfo.SwitchWitness) && config.IsWitnessAllowed()
}

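// getRuleFitStores resolves the stores currently holding the peers covered by the rule fit, skipping any
// store that can no longer be found in the cluster.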
func getRuleFitStores(cluster sche.SharedCluster, rf *placement.RuleFit) []*core.StoreInfo {
var stores []*core.StoreInfo
for _, p := range rf.Peers {
if s := cluster.GetStore(p.GetStoreId()); s != nil {
stores = append(stores, s)
}
}
return stores
}