Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions pkg/schedule/affinity/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ func (g *Group) String() string {
// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it.
type GroupState struct {
Group
// RegularSchedulingEnabled indicates whether balance scheduling is allowed.
RegularSchedulingEnabled bool `json:"-"`
// AffinitySchedulingEnabled indicates whether affinity scheduling is allowed.
AffinitySchedulingEnabled bool `json:"-"`
// RegularSchedulingAllowed indicates whether balance scheduling is allowed.
RegularSchedulingAllowed bool `json:"-"`
// AffinitySchedulingAllowed indicates whether affinity scheduling is allowed.
AffinitySchedulingAllowed bool `json:"-"`
Comment on lines +111 to +114
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is RegularSchedulingAllowed == !AffinitySchedulingAllowed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, if the group is in degraded, they are both false.

// Phase is a status intended for API display. See the definition of Phase for details.
Phase Phase `json:"phase"`
// RangeCount indicates how many key ranges are associated with this group.
Expand All @@ -130,7 +130,7 @@ type GroupState struct {

// IsRegionAffinity checks whether the Region is in an affinity state.
func (g *GroupState) isRegionAffinity(region *core.RegionInfo) bool {
if region == nil || !g.AffinitySchedulingEnabled {
if region == nil || !g.AffinitySchedulingAllowed {
return false
}
// Compare the Leader
Expand Down Expand Up @@ -176,8 +176,8 @@ type runtimeGroupInfo struct {
// newGroupState creates a GroupState from the given runtimeGroupInfo.
func newGroupState(g *runtimeGroupInfo) *GroupState {
var phase Phase
affinitySchedulingEnabled := g.IsAffinitySchedulingEnabled()
if g.RangeCount != 0 && affinitySchedulingEnabled {
affinitySchedulingAllowed := g.IsAffinitySchedulingAllowed()
if g.RangeCount != 0 && affinitySchedulingAllowed {
if g.AffinityRegionCount > 0 && len(g.Regions) == g.AffinityRegionCount {
phase = PhaseStable
} else {
Expand All @@ -194,8 +194,8 @@ func newGroupState(g *runtimeGroupInfo) *GroupState {
LeaderStoreID: g.LeaderStoreID,
VoterStoreIDs: slices.Clone(g.VoterStoreIDs),
},
RegularSchedulingEnabled: g.IsRegularSchedulingEnabled(),
AffinitySchedulingEnabled: affinitySchedulingEnabled,
RegularSchedulingAllowed: g.IsRegularSchedulingAllowed(),
AffinitySchedulingAllowed: affinitySchedulingAllowed,
Phase: phase,
RangeCount: g.RangeCount,
RegionCount: len(g.Regions),
Expand Down Expand Up @@ -263,13 +263,13 @@ func newDegradedExpiredAtFromNow() uint64 {
return uint64(time.Now().Unix()) + defaultDegradedExpirationSeconds
}

// IsAffinitySchedulingEnabled indicates whether affinity scheduling is allowed.
func (g *runtimeGroupInfo) IsAffinitySchedulingEnabled() bool {
// IsAffinitySchedulingAllowed indicates whether affinity scheduling is allowed.
func (g *runtimeGroupInfo) IsAffinitySchedulingAllowed() bool {
return g.IsAvailable() && g.LeaderStoreID != 0 && len(g.VoterStoreIDs) != 0
}

// IsRegularSchedulingEnabled indicates whether balance scheduling is allowed.
func (g *runtimeGroupInfo) IsRegularSchedulingEnabled() bool {
// IsRegularSchedulingAllowed indicates whether balance scheduling is allowed.
func (g *runtimeGroupInfo) IsRegularSchedulingAllowed() bool {
return g.IsExpired() || g.LeaderStoreID == 0 || len(g.VoterStoreIDs) == 0
}

Expand Down
7 changes: 4 additions & 3 deletions pkg/schedule/affinity/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ func TestStoreCondition(t *testing.T) {
re.Equal(groupDegraded, storeEvictLeader.groupAvailability())
re.Equal(groupDegraded, storeLowSpace.groupAvailability())
re.Equal(groupExpired, storeDown.groupAvailability())
re.Equal(groupExpired, storeRemovingOrRemoved.groupAvailability())
re.Equal(groupExpired, storeRemoving.groupAvailability())
re.Equal(groupExpired, storeRemoved.groupAvailability())

re.True(storeEvictLeader.affectsLeaderOnly())
re.False(storeDisconnected.affectsLeaderOnly())
Expand All @@ -51,7 +52,7 @@ func TestGroupState(t *testing.T) {
LeaderStoreID: 1,
VoterStoreIDs: []uint64{1, 2, 3},
},
AffinitySchedulingEnabled: true,
AffinitySchedulingAllowed: true,
}
// keyRange is unused in this test.
region := generateRegionForTest(100, []uint64{1, 2, 3}, nonOverlappingRange)
Expand All @@ -61,7 +62,7 @@ func TestGroupState(t *testing.T) {
region = generateRegionForTest(100, []uint64{1, 2, 4}, nonOverlappingRange)
re.False(group.isRegionAffinity(region))

group.AffinitySchedulingEnabled = false
group.AffinitySchedulingAllowed = false
region = generateRegionForTest(100, []uint64{1, 2, 3}, nonOverlappingRange)
re.False(group.isRegionAffinity(region))
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/schedule/affinity/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,11 @@ func TestAvailabilityChangeRegionCount(t *testing.T) {
storeInfos.PutStore(store2Down)

// Trigger availability check
manager.checkStoresAvailability()
manager.checkGroupsAvailability()

// Verify group state changed
groupInfo := getGroupForTest(re, manager, "availability-test")
re.False(groupInfo.IsAffinitySchedulingEnabled())
re.False(groupInfo.IsAffinitySchedulingAllowed())

// Verify cache is cleared
groupState2 := manager.GetAffinityGroupState("availability-test")
Expand Down Expand Up @@ -510,7 +510,7 @@ func TestConcurrentOperations(t *testing.T) {
go func() {
defer wg.Done()
for range 20 {
manager.checkStoresAvailability()
manager.checkGroupsAvailability()
time.Sleep(2 * time.Millisecond)
}
}()
Expand Down Expand Up @@ -561,18 +561,18 @@ func TestDegradedExpiration(t *testing.T) {
// Verify group is available
groupState := manager.GetAffinityGroupState("expiration-test")
re.NotNil(groupState)
re.True(groupState.AffinitySchedulingEnabled)
re.True(groupState.AffinitySchedulingAllowed)

// Make store 2 unhealthy to trigger degraded status
store2 := storeInfos.GetStore(2)
store2Down := store2.Clone(core.SetLastHeartbeatTS(time.Now().Add(-2 * time.Minute)))
storeInfos.PutStore(store2Down)
manager.checkStoresAvailability()
manager.checkGroupsAvailability()

// Verify group became degraded
groupInfo := getGroupForTest(re, manager, "expiration-test")
re.Equal(groupDegraded, groupInfo.GetAvailability())
re.False(groupInfo.IsAffinitySchedulingEnabled())
re.False(groupInfo.IsAffinitySchedulingAllowed())

// Record the expiration time
manager.RLock()
Expand All @@ -590,7 +590,7 @@ func TestDegradedExpiration(t *testing.T) {
manager.Unlock()

// Run availability check again
manager.checkStoresAvailability()
manager.checkGroupsAvailability()

// Verify group is now expired
re.True(groupInfo.IsExpired())
Expand All @@ -599,5 +599,5 @@ func TestDegradedExpiration(t *testing.T) {
// Verify scheduling is still disallowed
groupState2 := manager.GetAffinityGroupState("expiration-test")
re.NotNil(groupState2)
re.False(groupState2.AffinitySchedulingEnabled)
re.False(groupState2.AffinitySchedulingAllowed)
}
7 changes: 4 additions & 3 deletions pkg/schedule/affinity/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ var (

var (
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
groupCount = affinityStatusGauge.WithLabelValues("group-count")
regionCount = affinityStatusGauge.WithLabelValues("region-count")
affinityRegionCount = affinityStatusGauge.WithLabelValues("affinity-region-count")
groupCount = affinityStatusGauge.WithLabelValues("group-count")
regionCount = affinityStatusGauge.WithLabelValues("region-count")
affinityRegionCount = affinityStatusGauge.WithLabelValues("affinity-region-count")
regionSplitDenyCount = affinityStatusGauge.WithLabelValues("split-deny-count")
)

func init() {
Expand Down
61 changes: 39 additions & 22 deletions pkg/schedule/affinity/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (

// storeCondition is an enum for store conditions. Valid values are the store-prefixed enum constants,
// which are split into three groups separated by degradedBoundary.
// Roughly, larger values indicate a more severe degree of unavailability.
type storeCondition int

const (
Expand All @@ -51,7 +52,8 @@ const (

// All values greater than degradedBoundary will trigger groupExpired.
storeDown
storeRemovingOrRemoved
storeRemoving
storeRemoved
)

func (c storeCondition) String() string {
Expand All @@ -68,8 +70,10 @@ func (c storeCondition) String() string {
return "low-space"
case storeDown:
return "down"
case storeRemovingOrRemoved:
return "removing-or-removed"
case storeRemoving:
return "removing"
case storeRemoved:
return "removed"
default:
return "unknown"
}
Expand All @@ -95,25 +99,27 @@ func (c storeCondition) affectsLeaderOnly() bool {
}
}

// GetNewAvailability uses the given unavailableStores to compute a new groupAvailability.
// Note that this function does not update runtimeGroupInfo.
func (g *runtimeGroupInfo) GetNewAvailability(unavailableStores map[uint64]storeCondition) groupAvailability {
maxCondition := storeAvailable
for _, storeID := range g.VoterStoreIDs {
if condition, ok := unavailableStores[storeID]; ok && (!condition.affectsLeaderOnly() || storeID == g.LeaderStoreID) {
if maxCondition == storeAvailable || condition > maxCondition {
maxCondition = condition
func calcGroupAvailability(
unavailableStores map[uint64]storeCondition,
leaderStoreID uint64,
voterStoreIDs []uint64,
) groupAvailability {
worstCondition := storeAvailable
for _, storeID := range voterStoreIDs {
if condition, ok := unavailableStores[storeID]; ok && (!condition.affectsLeaderOnly() || storeID == leaderStoreID) {
if worstCondition == storeAvailable || condition > worstCondition {
worstCondition = condition
}
}
}
return maxCondition.groupAvailability()
return worstCondition.groupAvailability()
}

// ObserveAvailableRegion observes available Region and collects information to update the Peer distribution within the Group.
func (m *Manager) ObserveAvailableRegion(region *core.RegionInfo, group *GroupState) {
// Use the peer distribution of the first observed available Region as the result.
// In the future, we may want to use a more sophisticated strategy rather than first-win.
if group == nil || group.AffinitySchedulingEnabled {
if group == nil || group.AffinitySchedulingAllowed {
return
}
leaderStoreID := region.GetLeader().GetStoreId()
Expand Down Expand Up @@ -141,19 +147,19 @@ func (m *Manager) startAvailabilityCheckLoop() {
log.Info("affinity manager availability check loop stopped")
return
case <-ticker.C:
m.checkStoresAvailability()
m.checkGroupsAvailability()
}
}
}()
log.Info("affinity manager availability check loop started", zap.Duration("interval", interval))
}

// checkStoresAvailability checks the availability status of stores and invalidates groups with unavailable stores.
func (m *Manager) checkStoresAvailability() {
// checkGroupsAvailability checks the condition of stores and invalidates groups with unavailable stores.
func (m *Manager) checkGroupsAvailability() {
if !m.IsAvailable() {
return
}
unavailableStores := m.generateUnavailableStores()
unavailableStores := m.collectUnavailableStores()
isUnavailableStoresChanged, groupAvailabilityChanges := m.getGroupAvailabilityChanges(unavailableStores)
if isUnavailableStoresChanged {
m.setGroupAvailabilityChanges(unavailableStores, groupAvailabilityChanges)
Expand All @@ -172,15 +178,17 @@ func (m *Manager) collectMetrics() {
affinityRegionCount.Set(float64(m.affinityRegionCount))
}

func (m *Manager) generateUnavailableStores() map[uint64]storeCondition {
func (m *Manager) collectUnavailableStores() map[uint64]storeCondition {
unavailableStores := make(map[uint64]storeCondition)
stores := m.storeSetInformer.GetStores()
lowSpaceRatio := m.conf.GetLowSpaceRatio()
for _, store := range stores {
switch {
// First the conditions that will mark the group as expired
case store.IsRemoved() || store.IsPhysicallyDestroyed() || store.IsRemoving():
unavailableStores[store.GetID()] = storeRemovingOrRemoved
case store.IsRemoved() || store.IsPhysicallyDestroyed():
unavailableStores[store.GetID()] = storeRemoved
case store.IsRemoving():
unavailableStores[store.GetID()] = storeRemoving
case store.IsUnhealthy():
unavailableStores[store.GetID()] = storeDown

Expand Down Expand Up @@ -216,8 +224,17 @@ func (m *Manager) getGroupAvailabilityChanges(unavailableStores map[uint64]store
// Analyze which Groups have changed availability
// Collect log messages to print after releasing lock
for _, groupInfo := range m.groups {
newAvailability := groupInfo.GetNewAvailability(unavailableStores)
if newAvailability != groupInfo.GetAvailability() {
availability := groupInfo.GetAvailability()

// A Group in the expired status cannot be restored.
if availability == groupExpired {
unavailableGroupCount++
continue
}

// Only Groups in the available or degraded status can be changed automatically.
newAvailability := calcGroupAvailability(unavailableStores, groupInfo.LeaderStoreID, groupInfo.VoterStoreIDs)
if availability != newAvailability {
groupAvailabilityChanges[groupInfo.ID] = newAvailability
}
if newAvailability == groupAvailable {
Expand Down
Loading