Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions pkg/mcs/resourcemanager/server/keyspace_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,16 @@ func (krgm *keyspaceResourceGroupManager) getMutableResourceGroup(name string) *
return krgm.groups[name]
}

func (krgm *keyspaceResourceGroupManager) getMutableResourceGroupList() []*ResourceGroup {
krgm.Lock()
defer krgm.Unlock()
res := make([]*ResourceGroup, 0, len(krgm.groups))
for _, group := range krgm.groups {
res = append(res, group)
}
return res
}

func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includeDefault bool) []*ResourceGroup {
krgm.RLock()
res := make([]*ResourceGroup, 0, len(krgm.groups))
Expand Down Expand Up @@ -254,6 +264,8 @@ func (krgm *keyspaceResourceGroupManager) setServiceLimit(serviceLimit float64)
// Cleanup the overrides if the service limit is set to 0.
if serviceLimit <= 0 {
krgm.cleanupOverrides()
} else {
krgm.invalidateBurstability(serviceLimit)
}
}

Expand Down Expand Up @@ -439,7 +451,12 @@ func (krgm *keyspaceResourceGroupManager) conciliateFillRates() {
// just set the override fill rate to -1 to allow the resource group to consume as many RUs as they originally
// need according to its fill rate setting.
for _, group := range queue {
group.overrideFillRateAndBurstLimit(-1, -1)
if group.getBurstLimit(true) >= 0 {
group.overrideFillRateAndBurstLimit(-1, -1)
} else {
// If the original burst limit is not set, set the override burst limit to the remaining service limit.
group.overrideFillRateAndBurstLimit(-1, int64(remainingServiceLimit))
}
}
// Although this priority level does not require resource limiting, it still needs to deduct the actual
// RU consumption from `remainingServiceLimit` to reflect the concept of priority, so that the
Expand Down Expand Up @@ -609,15 +626,21 @@ func (di *demandInfo) allocateBurstRUDemand(
return remainingServiceLimit
}

// Cleanup the overrides for all the resource groups.
func (krgm *keyspaceResourceGroupManager) cleanupOverrides() {
krgm.RLock()
groups := make([]*ResourceGroup, 0, len(krgm.groups))
for _, group := range krgm.groups {
groups = append(groups, group)
}
krgm.RUnlock()
// Cleanup the overrides for all the resource groups without holding the lock.
for _, group := range groups {
for _, group := range krgm.getMutableResourceGroupList() {
group.overrideFillRateAndBurstLimit(-1, -1)
}
}

// Since the burstable resource groups won't require tokens from the server anymore,
// we have to override the burst limit of all the resource groups to the service limit.
// This ensures the burstability of the resource groups can be properly invalidated.
func (krgm *keyspaceResourceGroupManager) invalidateBurstability(serviceLimit float64) {
for _, group := range krgm.getMutableResourceGroupList() {
if group.getBurstLimit() >= 0 {
continue
}
group.overrideBurstLimit(int64(serviceLimit))
}
}
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/keyspace_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func TestConciliateFillRate(t *testing.T) {
// Priority 2: basic 30+40=70, burst 0+5=5, total 75 < service limit 100, so gets full allocation (remaining: 25)
// Priority 1: basic 20+30=50, burst 2+0=2, total 52 > service limit 25, so gets basic demand proportionally
expectedOverrideFillRateList: []float64{-1, -1, 10, 15},
expectedOverrideBurstLimitList: []int64{-1, -1, 10, 15},
expectedOverrideBurstLimitList: []int64{-1, 100, 10, 15},
},
{
name: "Unlimited burst limit with service limit constraint",
Expand All @@ -796,7 +796,7 @@ func TestConciliateFillRate(t *testing.T) {
// Priority 2: demand 30, gets full allocation (remaining: 70)
// Priority 1: demand 110 > remaining 70, basic demand 70 = remaining 70, proportional allocation: 70*(30/70)=30, 70*(40/70)=40
expectedOverrideFillRateList: []float64{-1, 30, 40},
expectedOverrideBurstLimitList: []int64{-1, 30, 40},
expectedOverrideBurstLimitList: []int64{100, 30, 40},
},
{
name: "Partial burst demand satisfied with unlimited burst limit",
Expand Down
11 changes: 4 additions & 7 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,12 @@ func (m *Manager) loadKeyspaceResourceGroups() error {
}); err != nil {
return err
}
// Load service limits from the storage.
if err := m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit)
}); err != nil {
return err
}
// Initialize the reserved keyspace resource group manager and default resource groups.
m.initReserved()
return nil
// Load service limits from the storage after all resource groups are loaded.
return m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit)
})
}

func (m *Manager) initReserved() {
Expand Down
12 changes: 11 additions & 1 deletion pkg/mcs/resourcemanager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (rg *ResourceGroup) overrideFillRateLocked(new float64) {
func (rg *ResourceGroup) getBurstLimit(ignoreOverride ...bool) int64 {
rg.RLock()
defer rg.RUnlock()
return rg.getBurstLimitLocked(ignoreOverride...)
}

func (rg *ResourceGroup) getBurstLimitLocked(ignoreOverride ...bool) int64 {
if len(ignoreOverride) > 0 && ignoreOverride[0] {
return rg.RUSettings.RU.getBurstLimitSetting()
}
Expand All @@ -177,6 +181,12 @@ func (rg *ResourceGroup) getOverrideBurstLimit() int64 {
return rg.RUSettings.RU.overrideBurstLimit
}

func (rg *ResourceGroup) overrideBurstLimit(new int64) {
rg.Lock()
defer rg.Unlock()
rg.overrideBurstLimitLocked(new)
}

func (rg *ResourceGroup) overrideBurstLimitLocked(new int64) {
rg.RUSettings.RU.overrideBurstLimit = new
}
Expand Down Expand Up @@ -265,7 +275,7 @@ func (rg *ResourceGroup) RequestRU(
if limitedTokens < grantedTokens {
tb.Tokens = limitedTokens
// Retain the unused tokens for the later requests if it has a burst limit.
if rg.getBurstLimit() > 0 {
if rg.getBurstLimitLocked() > 0 {
rg.RUSettings.RU.lastLimitedTokens += grantedTokens - limitedTokens
}
}
Expand Down