Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
ListResourceGroups(ctx context.Context, opts ...pd.GetResourceGroupOption) ([]*rmpb.ResourceGroup, error)
AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.DeleteResourceGroupOption) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)

Expand Down Expand Up @@ -140,6 +140,7 @@
provider ResourceGroupProvider
groupsController sync.Map
ruConfig *RUConfig
keyspaceID uint32

loopCtx context.Context
loopCancel func()
Expand Down Expand Up @@ -174,6 +175,7 @@
clientUniqueID uint64,
provider ResourceGroupProvider,
requestUnitConfig *RequestUnitConfig,
keyspaceID uint32,
opts ...ResourceControlCreateOption,
) (*ResourceGroupsController, error) {
config, err := loadServerConfig(ctx, provider)
Expand All @@ -188,6 +190,7 @@
controller := &ResourceGroupsController{
clientUniqueID: clientUniqueID,
provider: provider,
keyspaceID: keyspaceID,
ruConfig: ruConfig,
lowTokenNotifyChan: make(chan notifyMsg, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
Expand All @@ -197,7 +200,7 @@
for _, opt := range opts {
opt(controller)
}
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig))
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig), zap.Uint32("keyspace-id", keyspaceID))
controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)}
controller.safeRuConfig.Store(controller.ruConfig)
enableControllerTraceLog.Store(config.EnableControllerTraceLog)
Expand Down Expand Up @@ -272,7 +275,8 @@
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
prefix := pd.GroupSettingsPathPrefixBytes(c.keyspaceID)
watchMetaChannel, err = c.provider.Watch(ctx, prefix, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
}
Expand All @@ -299,7 +303,8 @@
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
prefix := pd.GroupSettingsPathPrefixBytes(c.keyspaceID)
watchMetaChannel, err = c.provider.Watch(ctx, prefix, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())

Check warning on line 307 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L306-L307

Added lines #L306 - L307 were not covered by tests
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
Expand Down
7 changes: 4 additions & 3 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"

pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/constants"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/opt"
)
Expand Down Expand Up @@ -210,7 +211,7 @@ func (m *MockResourceGroupProvider) ModifyResourceGroup(ctx context.Context, met
return args.String(0), args.Error(1)
}

func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string, _ ...pd.DeleteResourceGroupOption) (string, error) {
func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
args := m.Called(ctx, resourceGroupName)
return args.String(0), args.Error(1)
}
Expand Down Expand Up @@ -251,7 +252,7 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport")

mockProvider := newMockResourceGroupProvider()
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil, constants.NullKeyspaceID)
controller.Start(ctx)

defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
Expand Down Expand Up @@ -325,7 +326,7 @@ func TestTryGetController(t *testing.T) {
defer cancel()

mockProvider := newMockResourceGroupProvider()
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil, constants.NullKeyspaceID)
controller.Start(ctx)

defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
Expand Down
69 changes: 26 additions & 43 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pd

import (
"context"
"fmt"
"time"

"github.com/gogo/protobuf/proto"
Expand All @@ -34,14 +35,20 @@ import (
type actionType int

const (
add actionType = 0
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
controllerConfigPathPrefix = "resource_group/controller"
add actionType = 0
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
keyspaceResourceGroupSettingPathPrefix = "resource_group/keyspace/settings/%d"
controllerConfigPathPrefix = "resource_group/controller"
)

// GroupSettingsPathPrefixBytes is used to watch or get resource groups.
var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix)
func GroupSettingsPathPrefixBytes(keyspaceID uint32) []byte {
if keyspaceID == constants.NullKeyspaceID {
return []byte(groupSettingsPathPrefix)
}
return fmt.Appendf(nil, keyspaceResourceGroupSettingPathPrefix, keyspaceID)
}

// ControllerConfigPathPrefixBytes is used to watch or get controller config.
var ControllerConfigPathPrefixBytes = []byte(controllerConfigPathPrefix)
Expand All @@ -52,16 +59,15 @@ type ResourceManagerClient interface {
GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...GetResourceGroupOption) (*rmpb.ResourceGroup, error)
AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string, opts ...DeleteResourceGroupOption) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error)
}

// GetResourceGroupOp represents available options when getting resource group.
type GetResourceGroupOp struct {
withRUStats bool
withKeyspaceID *rmpb.KeyspaceIDValue
withRUStats bool
}

// GetResourceGroupOption configures GetResourceGroupOp.
Expand All @@ -72,32 +78,6 @@ func WithRUStats(op *GetResourceGroupOp) {
op.withRUStats = true
}

// WithKeyspaceID specifies to return resource group with keyspace id.
func WithKeyspaceID(keyspaceID uint32) GetResourceGroupOption {
return func(op *GetResourceGroupOp) {
op.withKeyspaceID = &rmpb.KeyspaceIDValue{
Value: keyspaceID,
}
}
}

// DeleteResourceGroupOp represents available options when deleting resource group.
type DeleteResourceGroupOp struct {
withKeyspaceID *rmpb.KeyspaceIDValue
}

// DeleteResourceGroupOption configures DeleteResourceGroupOp.
type DeleteResourceGroupOption func(*DeleteResourceGroupOp)

// DeleteWithKeyspaceID specifies to delete resource group with keyspace id.
func DeleteWithKeyspaceID(keyspaceID uint32) DeleteResourceGroupOption {
return func(op *DeleteResourceGroupOp) {
op.withKeyspaceID = &rmpb.KeyspaceIDValue{
Value: keyspaceID,
}
}
}

// resourceManagerClient gets the ResourceManager client of current PD leader.
func (c *innerClient) resourceManagerClient() (rmpb.ResourceManagerClient, error) {
cc, err := c.getOrCreateGRPCConn()
Expand All @@ -119,7 +99,9 @@ func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroup
}
req := &rmpb.ListResourceGroupsRequest{
WithRuStats: getOp.withRUStats,
KeyspaceId: getOp.withKeyspaceID,
KeyspaceId: &rmpb.KeyspaceIDValue{
Value: c.inner.keyspaceID,
},
}
resp, err := cc.ListResourceGroups(ctx, req)
if err != nil {
Expand All @@ -146,7 +128,9 @@ func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string,
req := &rmpb.GetResourceGroupRequest{
ResourceGroupName: resourceGroupName,
WithRuStats: getOp.withRUStats,
KeyspaceId: getOp.withKeyspaceID,
KeyspaceId: &rmpb.KeyspaceIDValue{
Value: c.inner.keyspaceID,
},
}
resp, err := cc.GetResourceGroup(ctx, req)
if err != nil {
Expand Down Expand Up @@ -197,18 +181,16 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG
}

// DeleteResourceGroup implements the ResourceManagerClient interface.
func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string, ops ...DeleteResourceGroupOption) (string, error) {
func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
cc, err := c.inner.resourceManagerClient()
if err != nil {
return "", err
}
deleteOp := &DeleteResourceGroupOp{}
for _, op := range ops {
op(deleteOp)
}
req := &rmpb.DeleteResourceGroupRequest{
ResourceGroupName: resourceGroupName,
KeyspaceId: deleteOp.withKeyspaceID,
KeyspaceId: &rmpb.KeyspaceIDValue{
Value: c.inner.keyspaceID,
},
}
resp, err := cc.DeleteResourceGroup(ctx, req)
if err != nil {
Expand All @@ -224,7 +206,8 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri

// LoadResourceGroups implements the ResourceManagerClient interface.
func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) {
resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, opt.WithPrefix())
prefix := GroupSettingsPathPrefixBytes(c.inner.keyspaceID)
resp, err := c.Get(ctx, prefix, opt.WithPrefix())
if err != nil {
return nil, 0, err
}
Expand Down
24 changes: 12 additions & 12 deletions pkg/mcs/resourcemanager/server/keyspace_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ import (

const (
defaultConsumptionChanSize = 1024

reservedDefaultGroupName = "default"
maxGroupNameLength = 32
middlePriority = 8
maxPriority = 16
unlimitedRate = math.MaxInt32
unlimitedBurstLimit = -1
maxGroupNameLength = 32
middlePriority = 8
maxPriority = 16
unlimitedRate = math.MaxInt32
unlimitedBurstLimit = -1
// DefaultResourceGroupName is the reserved default resource group name within each keyspace.
DefaultResourceGroupName = "default"
)

// consumptionItem is used to send the consumption info to the background metrics flusher.
Expand Down Expand Up @@ -96,13 +96,13 @@ func (krgm *keyspaceResourceGroupManager) setRawStatesIntoResourceGroup(name str

func (krgm *keyspaceResourceGroupManager) initDefaultResourceGroup() {
krgm.RLock()
if _, ok := krgm.groups[reservedDefaultGroupName]; ok {
if _, ok := krgm.groups[DefaultResourceGroupName]; ok {
krgm.RUnlock()
return
}
krgm.RUnlock()
defaultGroup := &ResourceGroup{
Name: reservedDefaultGroupName,
Name: DefaultResourceGroupName,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &RequestUnitSettings{
RU: &GroupTokenBucket{
Expand Down Expand Up @@ -159,7 +159,7 @@ func (krgm *keyspaceResourceGroupManager) modifyResourceGroup(group *rmpb.Resour
}

func (krgm *keyspaceResourceGroupManager) deleteResourceGroup(name string) error {
if name == reservedDefaultGroupName {
if name == DefaultResourceGroupName {
return errs.ErrDeleteReservedGroup
}
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
Expand Down Expand Up @@ -190,7 +190,7 @@ func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includ
krgm.RLock()
res := make([]*ResourceGroup, 0, len(krgm.groups))
for _, group := range krgm.groups {
if !includeDefault && group.Name == reservedDefaultGroupName {
if !includeDefault && group.Name == DefaultResourceGroupName {
continue
}
res = append(res, group.Clone(withStats))
Expand All @@ -207,7 +207,7 @@ func (krgm *keyspaceResourceGroupManager) getResourceGroupNames(includeDefault b
defer krgm.RUnlock()
res := make([]string, 0, len(krgm.groups))
for name := range krgm.groups {
if !includeDefault && name == reservedDefaultGroupName {
if !includeDefault && name == DefaultResourceGroupName {
continue
}
res = append(res, name)
Expand Down
10 changes: 5 additions & 5 deletions pkg/mcs/resourcemanager/server/keyspace_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ func TestInitDefaultResourceGroup(t *testing.T) {
re.Empty(krgm.groups)

// No default resource group initially.
_, exists := krgm.groups[reservedDefaultGroupName]
_, exists := krgm.groups[DefaultResourceGroupName]
re.False(exists)

// Initialize the default resource group.
krgm.initDefaultResourceGroup()

// Verify the default resource group is created.
defaultGroup, exists := krgm.groups[reservedDefaultGroupName]
defaultGroup, exists := krgm.groups[DefaultResourceGroupName]
re.True(exists)
re.Equal(reservedDefaultGroupName, defaultGroup.Name)
re.Equal(DefaultResourceGroupName, defaultGroup.Name)
re.Equal(rmpb.GroupMode_RUMode, defaultGroup.Mode)
re.Equal(uint32(middlePriority), defaultGroup.Priority)

Expand Down Expand Up @@ -183,11 +183,11 @@ func TestDeleteResourceGroup(t *testing.T) {

// Try to delete the default group.
krgm.initDefaultResourceGroup()
err = krgm.deleteResourceGroup(reservedDefaultGroupName)
err = krgm.deleteResourceGroup(DefaultResourceGroupName)
re.Error(err) // Should not be able to delete default group.

// Verify default group still exists.
re.NotNil(krgm.getResourceGroup(reservedDefaultGroupName, false))
re.NotNil(krgm.getResourceGroup(DefaultResourceGroupName, false))
}

func TestGetResourceGroup(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error {

// DeleteResourceGroup deletes a resource group.
func (m *Manager) DeleteResourceGroup(keyspaceID uint32, name string) error {
// TODO: should we allow to delete default resource group?
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
if krgm == nil {
return errs.ErrKeyspaceNotExists.FastGenByArgs(keyspaceID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/checker/rule_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2100,7 +2100,7 @@ func (suite *ruleCheckerTestSuite) TestRemoveOrphanPeer() {
},
}
suite.ruleManager.SetRule(rule)
suite.ruleManager.DeleteRule("pd", "default")
suite.ruleManager.DeleteRule(placement.DefaultGroupID, placement.DefaultRuleID)

// case1: regionA has 3 peers but not extra peer can be removed, so it needs to add peer first
suite.cluster.AddLeaderRegionWithRange(1, "200", "300", 1, 2, 3)
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b
tc.SetRule(&placement.Rule{
GroupID: "pd", ID: "voter", Role: placement.Follower, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}},
})
tc.RuleManager.DeleteRule("pd", "default")
tc.RuleManager.DeleteRule(placement.DefaultGroupID, placement.DefaultRuleID)

tc.UpdateStorageWrittenBytes(1, 10*units.MiB*utils.StoreHeartBeatReportInterval)
tc.UpdateStorageWrittenBytes(2, 0)
Expand Down
Loading