Skip to content

Commit fc5c37e

Browse files
authored
feat(resourcemanager): invalidate the burstable groups by overriding to service limit (#9577)
ref #9296 By overriding the burst limit with the service limit for burstable groups to enhance the constraint imposed by the service limit. Signed-off-by: JmPotato <github@ipotato.me>
1 parent 3679ab1 commit fc5c37e

File tree

4 files changed

+49
-19
lines changed

4 files changed

+49
-19
lines changed

pkg/mcs/resourcemanager/server/keyspace_manager.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,16 @@ func (krgm *keyspaceResourceGroupManager) getMutableResourceGroup(name string) *
206206
return krgm.groups[name]
207207
}
208208

209+
func (krgm *keyspaceResourceGroupManager) getMutableResourceGroupList() []*ResourceGroup {
210+
krgm.Lock()
211+
defer krgm.Unlock()
212+
res := make([]*ResourceGroup, 0, len(krgm.groups))
213+
for _, group := range krgm.groups {
214+
res = append(res, group)
215+
}
216+
return res
217+
}
218+
209219
func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includeDefault bool) []*ResourceGroup {
210220
krgm.RLock()
211221
res := make([]*ResourceGroup, 0, len(krgm.groups))
@@ -254,6 +264,8 @@ func (krgm *keyspaceResourceGroupManager) setServiceLimit(serviceLimit float64)
254264
// Cleanup the overrides if the service limit is set to 0.
255265
if serviceLimit <= 0 {
256266
krgm.cleanupOverrides()
267+
} else {
268+
krgm.invalidateBurstability(serviceLimit)
257269
}
258270
}
259271

@@ -439,7 +451,12 @@ func (krgm *keyspaceResourceGroupManager) conciliateFillRates() {
439451
// just set the override fill rate to -1 to allow the resource group to consume as many RUs as they originally
440452
// need according to its fill rate setting.
441453
for _, group := range queue {
442-
group.overrideFillRateAndBurstLimit(-1, -1)
454+
if group.getBurstLimit(true) >= 0 {
455+
group.overrideFillRateAndBurstLimit(-1, -1)
456+
} else {
457+
// If the original burst limit is not set, set the override burst limit to the remaining service limit.
458+
group.overrideFillRateAndBurstLimit(-1, int64(remainingServiceLimit))
459+
}
443460
}
444461
// Although this priority level does not require resource limiting, it still needs to deduct the actual
445462
// RU consumption from `remainingServiceLimit` to reflect the concept of priority, so that the
@@ -609,15 +626,21 @@ func (di *demandInfo) allocateBurstRUDemand(
609626
return remainingServiceLimit
610627
}
611628

629+
// Cleanup the overrides for all the resource groups.
612630
func (krgm *keyspaceResourceGroupManager) cleanupOverrides() {
613-
krgm.RLock()
614-
groups := make([]*ResourceGroup, 0, len(krgm.groups))
615-
for _, group := range krgm.groups {
616-
groups = append(groups, group)
617-
}
618-
krgm.RUnlock()
619-
// Cleanup the overrides for all the resource groups without holding the lock.
620-
for _, group := range groups {
631+
for _, group := range krgm.getMutableResourceGroupList() {
621632
group.overrideFillRateAndBurstLimit(-1, -1)
622633
}
623634
}
635+
636+
// Since the burstable resource groups won't require tokens from the server anymore,
637+
// we have to override the burst limit of all the resource groups to the service limit.
638+
// This ensures the burstability of the resource groups can be properly invalidated.
639+
func (krgm *keyspaceResourceGroupManager) invalidateBurstability(serviceLimit float64) {
640+
for _, group := range krgm.getMutableResourceGroupList() {
641+
if group.getBurstLimit() >= 0 {
642+
continue
643+
}
644+
group.overrideBurstLimit(int64(serviceLimit))
645+
}
646+
}

pkg/mcs/resourcemanager/server/keyspace_manager_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,7 @@ func TestConciliateFillRate(t *testing.T) {
784784
// Priority 2: basic 30+40=70, burst 0+5=5, total 75 < service limit 100, so gets full allocation (remaining: 25)
785785
// Priority 1: basic 20+30=50, burst 2+0=2, total 52 > service limit 25, so gets basic demand proportionally
786786
expectedOverrideFillRateList: []float64{-1, -1, 10, 15},
787-
expectedOverrideBurstLimitList: []int64{-1, -1, 10, 15},
787+
expectedOverrideBurstLimitList: []int64{-1, 100, 10, 15},
788788
},
789789
{
790790
name: "Unlimited burst limit with service limit constraint",
@@ -796,7 +796,7 @@ func TestConciliateFillRate(t *testing.T) {
796796
// Priority 2: demand 30, gets full allocation (remaining: 70)
797797
// Priority 1: demand 110 > remaining 70, basic demand 70 = remaining 70, proportional allocation: 70*(30/70)=30, 70*(40/70)=40
798798
expectedOverrideFillRateList: []float64{-1, 30, 40},
799-
expectedOverrideBurstLimitList: []int64{-1, 30, 40},
799+
expectedOverrideBurstLimitList: []int64{100, 30, 40},
800800
},
801801
{
802802
name: "Partial burst demand satisfied with unlimited burst limit",

pkg/mcs/resourcemanager/server/manager.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -239,15 +239,12 @@ func (m *Manager) loadKeyspaceResourceGroups() error {
239239
}); err != nil {
240240
return err
241241
}
242-
// Load service limits from the storage.
243-
if err := m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
244-
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit)
245-
}); err != nil {
246-
return err
247-
}
248242
// Initialize the reserved keyspace resource group manager and default resource groups.
249243
m.initReserved()
250-
return nil
244+
// Load service limits from the storage after all resource groups are loaded.
245+
return m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
246+
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit)
247+
})
251248
}
252249

253250
func (m *Manager) initReserved() {

pkg/mcs/resourcemanager/server/resource_group.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@ func (rg *ResourceGroup) overrideFillRateLocked(new float64) {
165165
func (rg *ResourceGroup) getBurstLimit(ignoreOverride ...bool) int64 {
166166
rg.RLock()
167167
defer rg.RUnlock()
168+
return rg.getBurstLimitLocked(ignoreOverride...)
169+
}
170+
171+
func (rg *ResourceGroup) getBurstLimitLocked(ignoreOverride ...bool) int64 {
168172
if len(ignoreOverride) > 0 && ignoreOverride[0] {
169173
return rg.RUSettings.RU.getBurstLimitSetting()
170174
}
@@ -177,6 +181,12 @@ func (rg *ResourceGroup) getOverrideBurstLimit() int64 {
177181
return rg.RUSettings.RU.overrideBurstLimit
178182
}
179183

184+
func (rg *ResourceGroup) overrideBurstLimit(new int64) {
185+
rg.Lock()
186+
defer rg.Unlock()
187+
rg.overrideBurstLimitLocked(new)
188+
}
189+
180190
func (rg *ResourceGroup) overrideBurstLimitLocked(new int64) {
181191
rg.RUSettings.RU.overrideBurstLimit = new
182192
}
@@ -265,7 +275,7 @@ func (rg *ResourceGroup) RequestRU(
265275
if limitedTokens < grantedTokens {
266276
tb.Tokens = limitedTokens
267277
// Retain the unused tokens for the later requests if it has a burst limit.
268-
if rg.getBurstLimit() > 0 {
278+
if rg.getBurstLimitLocked() > 0 {
269279
rg.RUSettings.RU.lastLimitedTokens += grantedTokens - limitedTokens
270280
}
271281
}

0 commit comments

Comments
 (0)