diff --git a/pkg/mcs/resourcemanager/server/keyspace_manager.go b/pkg/mcs/resourcemanager/server/keyspace_manager.go index f0f459355e8..9ca39d0cb6d 100644 --- a/pkg/mcs/resourcemanager/server/keyspace_manager.go +++ b/pkg/mcs/resourcemanager/server/keyspace_manager.go @@ -206,6 +206,16 @@ func (krgm *keyspaceResourceGroupManager) getMutableResourceGroup(name string) * return krgm.groups[name] } +func (krgm *keyspaceResourceGroupManager) getMutableResourceGroupList() []*ResourceGroup { + krgm.Lock() + defer krgm.Unlock() + res := make([]*ResourceGroup, 0, len(krgm.groups)) + for _, group := range krgm.groups { + res = append(res, group) + } + return res +} + func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includeDefault bool) []*ResourceGroup { krgm.RLock() res := make([]*ResourceGroup, 0, len(krgm.groups)) @@ -254,6 +264,8 @@ func (krgm *keyspaceResourceGroupManager) setServiceLimit(serviceLimit float64) // Cleanup the overrides if the service limit is set to 0. if serviceLimit <= 0 { krgm.cleanupOverrides() + } else { + krgm.invalidateBurstability(serviceLimit) } } @@ -439,7 +451,12 @@ func (krgm *keyspaceResourceGroupManager) conciliateFillRates() { // just set the override fill rate to -1 to allow the resource group to consume as many RUs as they originally // need according to its fill rate setting. for _, group := range queue { - group.overrideFillRateAndBurstLimit(-1, -1) + if group.getBurstLimit(true) >= 0 { + group.overrideFillRateAndBurstLimit(-1, -1) + } else { + // If the original burst limit is not set, set the override burst limit to the remaining service limit. + group.overrideFillRateAndBurstLimit(-1, int64(remainingServiceLimit)) + } } // Although this priority level does not require resource limiting, it still needs to deduct the actual // RU consumption from `remainingServiceLimit` to reflect the concept of priority, so that the @@ -609,15 +626,21 @@ func (di *demandInfo) allocateBurstRUDemand( return remainingServiceLimit } +// Cleanup the overrides for all the resource groups. func (krgm *keyspaceResourceGroupManager) cleanupOverrides() { - krgm.RLock() - groups := make([]*ResourceGroup, 0, len(krgm.groups)) - for _, group := range krgm.groups { - groups = append(groups, group) - } - krgm.RUnlock() - // Cleanup the overrides for all the resource groups without holding the lock. - for _, group := range groups { + for _, group := range krgm.getMutableResourceGroupList() { group.overrideFillRateAndBurstLimit(-1, -1) } } + +// Since the burstable resource groups won't require tokens from the server anymore, +// we have to override the burst limit of all the resource groups to the service limit. +// This ensures the burstability of the resource groups can be properly invalidated. +func (krgm *keyspaceResourceGroupManager) invalidateBurstability(serviceLimit float64) { + for _, group := range krgm.getMutableResourceGroupList() { + if group.getBurstLimit() >= 0 { + continue + } + group.overrideBurstLimit(int64(serviceLimit)) + } +} diff --git a/pkg/mcs/resourcemanager/server/keyspace_manager_test.go b/pkg/mcs/resourcemanager/server/keyspace_manager_test.go index 56efd244471..fda492c3ac7 100644 --- a/pkg/mcs/resourcemanager/server/keyspace_manager_test.go +++ b/pkg/mcs/resourcemanager/server/keyspace_manager_test.go @@ -784,7 +784,7 @@ func TestConciliateFillRate(t *testing.T) { // Priority 2: basic 30+40=70, burst 0+5=5, total 75 < service limit 100, so gets full allocation (remaining: 25) // Priority 1: basic 20+30=50, burst 2+0=2, total 52 > service limit 25, so gets basic demand proportionally expectedOverrideFillRateList: []float64{-1, -1, 10, 15}, - expectedOverrideBurstLimitList: []int64{-1, -1, 10, 15}, + expectedOverrideBurstLimitList: []int64{-1, 100, 10, 15}, }, { name: "Unlimited burst limit with service limit constraint", @@ -796,7 +796,7 @@ func TestConciliateFillRate(t *testing.T) { // Priority 2: demand 30, gets full allocation (remaining: 70) // Priority 1: demand 110 > remaining 70, basic demand 70 = remaining 70, proportional allocation: 70*(30/70)=30, 70*(40/70)=40 expectedOverrideFillRateList: []float64{-1, 30, 40}, - expectedOverrideBurstLimitList: []int64{-1, 30, 40}, + expectedOverrideBurstLimitList: []int64{100, 30, 40}, }, { name: "Partial burst demand satisfied with unlimited burst limit", diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 8d49994e829..567d4a04052 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -239,15 +239,12 @@ func (m *Manager) loadKeyspaceResourceGroups() error { }); err != nil { return err } - // Load service limits from the storage. - if err := m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) { - m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit) - }); err != nil { - return err - } // Initialize the reserved keyspace resource group manager and default resource groups. m.initReserved() - return nil + // Load service limits from the storage after all resource groups are loaded. + return m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) { + m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit) + }) } func (m *Manager) initReserved() { diff --git a/pkg/mcs/resourcemanager/server/resource_group.go b/pkg/mcs/resourcemanager/server/resource_group.go index 0bb6b3c5a21..b81c0e4e8f1 100644 --- a/pkg/mcs/resourcemanager/server/resource_group.go +++ b/pkg/mcs/resourcemanager/server/resource_group.go @@ -165,6 +165,10 @@ func (rg *ResourceGroup) overrideFillRateLocked(new float64) { func (rg *ResourceGroup) getBurstLimit(ignoreOverride ...bool) int64 { rg.RLock() defer rg.RUnlock() + return rg.getBurstLimitLocked(ignoreOverride...) +} + +func (rg *ResourceGroup) getBurstLimitLocked(ignoreOverride ...bool) int64 { if len(ignoreOverride) > 0 && ignoreOverride[0] { return rg.RUSettings.RU.getBurstLimitSetting() } @@ -177,6 +181,12 @@ func (rg *ResourceGroup) getOverrideBurstLimit() int64 { return rg.RUSettings.RU.overrideBurstLimit } +func (rg *ResourceGroup) overrideBurstLimit(new int64) { + rg.Lock() + defer rg.Unlock() + rg.overrideBurstLimitLocked(new) +} + func (rg *ResourceGroup) overrideBurstLimitLocked(new int64) { rg.RUSettings.RU.overrideBurstLimit = new } @@ -265,7 +275,7 @@ func (rg *ResourceGroup) RequestRU( if limitedTokens < grantedTokens { tb.Tokens = limitedTokens // Retain the unused tokens for the later requests if it has a burst limit. - if rg.getBurstLimit() > 0 { + if rg.getBurstLimitLocked() > 0 { rg.RUSettings.RU.lastLimitedTokens += grantedTokens - limitedTokens } }