Skip to content
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -846,6 +846,11 @@ error = '''
invalid group settings, please check the group name, priority and the number of resources
'''

["PD:resourcemanager:ErrKeyspaceNotExists"]
error = '''
the keyspace does not exist with id %d
'''

["PD:scatter:ErrEmptyRegion"]
error = '''
empty region
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ var (

// Resource Manager errors
var (
ErrKeyspaceNotExists = errors.Normalize("the keyspace does not exist with id %d", errors.RFCCodeText("PD:resourcemanager:ErrKeyspaceNotExists"))
ErrResourceGroupNotExists = errors.Normalize("the %s resource group does not exist", errors.RFCCodeText("PD:resourcemanager:ErrGroupNotExists"))
ErrDeleteReservedGroup = errors.Normalize("cannot delete reserved group", errors.RFCCodeText("PD:resourcemanager:ErrDeleteReservedGroup"))
ErrInvalidGroup = errors.Normalize("invalid group settings, please check the group name, priority and the number of resources", errors.RFCCodeText("PD:resourcemanager:ErrInvalidGroup"))
Expand Down
7 changes: 4 additions & 3 deletions pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

rmserver "github.com/tikv/pd/pkg/mcs/resourcemanager/server"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/reflectutil"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (s *Service) putResourceGroup(c *gin.Context) {
// @Router /config/group/{name} [get]
func (s *Service) getResourceGroup(c *gin.Context) {
withStats := strings.EqualFold(c.Query("with_stats"), "true")
group := s.manager.GetResourceGroup(c.Param("name"), withStats)
group := s.manager.GetResourceGroup(constant.NullKeyspaceID, c.Param("name"), withStats)
if group == nil {
c.String(http.StatusNotFound, errors.New("resource group not found").Error())
}
Expand All @@ -178,7 +179,7 @@ func (s *Service) getResourceGroup(c *gin.Context) {
// @Router /config/groups [get]
func (s *Service) getResourceGroupList(c *gin.Context) {
withStats := strings.EqualFold(c.Query("with_stats"), "true")
groups := s.manager.GetResourceGroupList(withStats)
groups := s.manager.GetResourceGroupList(constant.NullKeyspaceID, withStats)
c.IndentedJSON(http.StatusOK, groups)
}

Expand All @@ -191,7 +192,7 @@ func (s *Service) getResourceGroupList(c *gin.Context) {
// @Failure 404 {string} error
// @Router /config/group/{name} [delete]
func (s *Service) deleteResourceGroup(c *gin.Context) {
if err := s.manager.DeleteResourceGroup(c.Param("name")); err != nil {
if err := s.manager.DeleteResourceGroup(constant.NullKeyspaceID, c.Param("name")); err != nil {
c.String(http.StatusNotFound, err.Error())
}
c.String(http.StatusOK, "Success!")
Expand Down
24 changes: 9 additions & 15 deletions pkg/mcs/resourcemanager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/utils/apiutil"
)

Expand Down Expand Up @@ -93,7 +94,7 @@
if err := s.checkServing(); err != nil {
return nil, err
}
rg := s.manager.GetResourceGroup(req.ResourceGroupName, req.WithRuStats)
rg := s.manager.GetResourceGroup(constant.NullKeyspaceID, req.ResourceGroupName, req.WithRuStats)
if rg == nil {
return nil, errors.New("resource group not found")
}
Expand All @@ -107,7 +108,7 @@
if err := s.checkServing(); err != nil {
return nil, err
}
groups := s.manager.GetResourceGroupList(req.WithRuStats)
groups := s.manager.GetResourceGroupList(constant.NullKeyspaceID, req.WithRuStats)
resp := &rmpb.ListResourceGroupsResponse{
Groups: make([]*rmpb.ResourceGroup, 0, len(groups)),
}
Expand All @@ -134,7 +135,7 @@
if err := s.checkServing(); err != nil {
return nil, err
}
err := s.manager.DeleteResourceGroup(req.ResourceGroupName)
err := s.manager.DeleteResourceGroup(constant.NullKeyspaceID, req.ResourceGroupName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -180,24 +181,17 @@
for _, req := range request.Requests {
resourceGroupName := req.GetResourceGroupName()
// Get the resource group from manager to acquire token buckets.
rg := s.manager.GetMutableResourceGroup(resourceGroupName)
rg := s.manager.GetMutableResourceGroup(constant.NullKeyspaceID, resourceGroupName)
if rg == nil {
log.Warn("resource group not found", zap.String("resource-group", resourceGroupName))
continue
}
// Send the consumption to update the metrics.
isBackground := req.GetIsBackground()
isTiFlash := req.GetIsTiflash()
if isBackground && isTiFlash {
return errors.New("background and tiflash cannot be true at the same time")
err = s.manager.dispatchConsumption(req)
if err != nil {
return err

Check warning on line 192 in pkg/mcs/resourcemanager/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/grpc_service.go#L192

Added line #L192 was not covered by tests
}
s.manager.consumptionDispatcher <- struct {
resourceGroupName string
*rmpb.Consumption
isBackground bool
isTiFlash bool
}{resourceGroupName, req.GetConsumptionSinceLastRequest(), isBackground, isTiFlash}
if isBackground {
if req.GetIsBackground() {
continue
}
now := time.Now()
Expand Down
239 changes: 239 additions & 0 deletions pkg/mcs/resourcemanager/server/keyspace_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"encoding/json"
"math"
"sort"

"github.com/gogo/protobuf/proto"
"go.uber.org/zap"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"

"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
)

const (
defaultConsumptionChanSize = 1024

reservedDefaultGroupName = "default"
maxGroupNameLength = 32
middlePriority = 8
maxPriority = 16
unlimitedRate = math.MaxInt32
unlimitedBurstLimit = -1
)

// consumptionItem is used to send the consumption info to the background metrics flusher.
type consumptionItem struct {
keyspaceID uint32
resourceGroupName string
*rmpb.Consumption
isBackground bool
isTiFlash bool
}

type keyspaceResourceGroupManager struct {
syncutil.RWMutex
groups map[string]*ResourceGroup

keyspaceID uint32
storage endpoint.ResourceGroupStorage
}

func newKeyspaceResourceGroupManager(keyspaceID uint32, storage endpoint.ResourceGroupStorage) *keyspaceResourceGroupManager {
return &keyspaceResourceGroupManager{
groups: make(map[string]*ResourceGroup),
keyspaceID: keyspaceID,
storage: storage,
}
}

func (krgm *keyspaceResourceGroupManager) addResourceGroupFromRaw(name string, rawValue string) error {
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal([]byte(rawValue), group); err != nil {
log.Error("failed to parse the keyspace resource group meta info",
zap.Uint32("keyspace-id", krgm.keyspaceID), zap.String("name", name), zap.String("raw-value", rawValue), zap.Error(err))
return err
}
krgm.Lock()
krgm.groups[group.Name] = FromProtoResourceGroup(group)
krgm.Unlock()
return nil
}

func (krgm *keyspaceResourceGroupManager) setRawStatesIntoResourceGroup(name string, rawValue string) error {
tokens := &GroupStates{}
if err := json.Unmarshal([]byte(rawValue), tokens); err != nil {
log.Error("failed to parse the keyspace resource group state",
zap.Uint32("keyspace-id", krgm.keyspaceID), zap.String("name", name), zap.String("raw-value", rawValue), zap.Error(err))
return err
}
krgm.Lock()
if group, ok := krgm.groups[name]; ok {
group.SetStatesIntoResourceGroup(tokens)
}
krgm.Unlock()
return nil
}

func (krgm *keyspaceResourceGroupManager) initDefaultResourceGroup() {
krgm.RLock()
if _, ok := krgm.groups[reservedDefaultGroupName]; ok {
krgm.RUnlock()
return
}
krgm.RUnlock()
defaultGroup := &ResourceGroup{
Name: reservedDefaultGroupName,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &RequestUnitSettings{
RU: &GroupTokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: unlimitedRate,
BurstLimit: unlimitedBurstLimit,
},
},
},
Priority: middlePriority,
}
if err := krgm.addResourceGroup(defaultGroup.IntoProtoResourceGroup()); err != nil {
log.Warn("init default group failed", zap.Uint32("keyspace-id", krgm.keyspaceID), zap.Error(err))
}

Check warning on line 119 in pkg/mcs/resourcemanager/server/keyspace_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/keyspace_manager.go#L118-L119

Added lines #L118 - L119 were not covered by tests
}

func (krgm *keyspaceResourceGroupManager) addResourceGroup(grouppb *rmpb.ResourceGroup) error {
if len(grouppb.Name) == 0 || len(grouppb.Name) > maxGroupNameLength {
return errs.ErrInvalidGroup
}
// Check the Priority.
if grouppb.GetPriority() > maxPriority {
return errs.ErrInvalidGroup
}

Check warning on line 129 in pkg/mcs/resourcemanager/server/keyspace_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/keyspace_manager.go#L128-L129

Added lines #L128 - L129 were not covered by tests
group := FromProtoResourceGroup(grouppb)
krgm.Lock()
defer krgm.Unlock()
if err := group.persistSettings(krgm.keyspaceID, krgm.storage); err != nil {
return err
}

Check warning on line 135 in pkg/mcs/resourcemanager/server/keyspace_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/keyspace_manager.go#L134-L135

Added lines #L134 - L135 were not covered by tests
if err := group.persistStates(krgm.keyspaceID, krgm.storage); err != nil {
return err
}

Check warning on line 138 in pkg/mcs/resourcemanager/server/keyspace_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/keyspace_manager.go#L137-L138

Added lines #L137 - L138 were not covered by tests
krgm.groups[group.Name] = group
return nil
}

func (krgm *keyspaceResourceGroupManager) modifyResourceGroup(group *rmpb.ResourceGroup) error {
if group == nil || group.Name == "" {
return errs.ErrInvalidGroup
}

Check warning on line 146 in pkg/mcs/resourcemanager/server/keyspace_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/keyspace_manager.go#L145-L146

Added lines #L145 - L146 were not covered by tests
krgm.RLock()
curGroup, ok := krgm.groups[group.Name]
krgm.RUnlock()
if !ok {
return errs.ErrResourceGroupNotExists.FastGenByArgs(group.Name)
}

err := curGroup.PatchSettings(group)
if err != nil {
return err
}

Check warning on line 157 in pkg/mcs/resourcemanager/server/keyspace_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/keyspace_manager.go#L156-L157

Added lines #L156 - L157 were not covered by tests
return curGroup.persistSettings(krgm.keyspaceID, krgm.storage)
}

func (krgm *keyspaceResourceGroupManager) deleteResourceGroup(name string) error {
if name == reservedDefaultGroupName {
return errs.ErrDeleteReservedGroup
}
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
return err
}

Check warning on line 167 in pkg/mcs/resourcemanager/server/keyspace_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/keyspace_manager.go#L166-L167

Added lines #L166 - L167 were not covered by tests
krgm.Lock()
delete(krgm.groups, name)
krgm.Unlock()
return nil
}

func (krgm *keyspaceResourceGroupManager) getResourceGroup(name string, withStats bool) *ResourceGroup {
krgm.RLock()
defer krgm.RUnlock()
if group, ok := krgm.groups[name]; ok {
return group.Clone(withStats)
}
return nil
}

func (krgm *keyspaceResourceGroupManager) getMutableResourceGroup(name string) *ResourceGroup {
krgm.Lock()
defer krgm.Unlock()
return krgm.groups[name]
}

func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includeDefault bool) []*ResourceGroup {
krgm.RLock()
res := make([]*ResourceGroup, 0, len(krgm.groups))
for _, group := range krgm.groups {
if !includeDefault && group.Name == reservedDefaultGroupName {
continue
}
res = append(res, group.Clone(withStats))
}
krgm.RUnlock()
sort.Slice(res, func(i, j int) bool {
return res[i].Name < res[j].Name
})
return res
}

func (krgm *keyspaceResourceGroupManager) getResourceGroupNames(includeDefault bool) []string {
krgm.RLock()
defer krgm.RUnlock()
res := make([]string, 0, len(krgm.groups))
for name := range krgm.groups {
if !includeDefault && name == reservedDefaultGroupName {
continue
}
res = append(res, name)
}
return res
}

func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() {
krgm.RLock()
keys := make([]string, 0, len(krgm.groups))
for k := range krgm.groups {
keys = append(keys, k)
}
krgm.RUnlock()
for idx := range keys {
krgm.RLock()
group, ok := krgm.groups[keys[idx]]
if ok {
if err := group.persistStates(krgm.keyspaceID, krgm.storage); err != nil {
log.Error("persist keyspace resource group state failed",
zap.Uint32("keyspace-id", krgm.keyspaceID),
zap.String("group-name", group.Name),
zap.Int("index", idx),
zap.Error(err))
}

Check warning on line 235 in pkg/mcs/resourcemanager/server/keyspace_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/keyspace_manager.go#L230-L235

Added lines #L230 - L235 were not covered by tests
}
krgm.RUnlock()
}
}
Loading