diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 4c9236bfd8a..ee100b218c9 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -87,7 +87,7 @@ type ResourceGroupProvider interface { ListResourceGroups(ctx context.Context, opts ...pd.GetResourceGroupOption) ([]*rmpb.ResourceGroup, error) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) - DeleteResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.DeleteResourceGroupOption) (string, error) + DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) @@ -140,6 +140,7 @@ type ResourceGroupsController struct { provider ResourceGroupProvider groupsController sync.Map ruConfig *RUConfig + keyspaceID uint32 loopCtx context.Context loopCancel func() @@ -174,6 +175,7 @@ func NewResourceGroupController( clientUniqueID uint64, provider ResourceGroupProvider, requestUnitConfig *RequestUnitConfig, + keyspaceID uint32, opts ...ResourceControlCreateOption, ) (*ResourceGroupsController, error) { config, err := loadServerConfig(ctx, provider) @@ -188,6 +190,7 @@ func NewResourceGroupController( controller := &ResourceGroupsController{ clientUniqueID: clientUniqueID, provider: provider, + keyspaceID: keyspaceID, ruConfig: ruConfig, lowTokenNotifyChan: make(chan notifyMsg, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), @@ -197,7 +200,7 @@ func NewResourceGroupController( for _, opt := range opts { opt(controller) } - log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig)) + log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig), zap.Uint32("keyspace-id", keyspaceID)) controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} controller.safeRuConfig.Store(controller.ruConfig) enableControllerTraceLog.Store(config.EnableControllerTraceLog) @@ -272,7 +275,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event if !c.ruConfig.isSingleGroupByKeyspace { // Use WithPrevKV() to get the previous key-value pair when get Delete Event. - watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV()) + prefix := pd.GroupSettingsPathPrefixBytes(c.keyspaceID) + watchMetaChannel, err = c.provider.Watch(ctx, prefix, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) } @@ -299,7 +303,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { case <-watchRetryTimer.C: if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { // Use WithPrevKV() to get the previous key-value pair when get Delete Event. - watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV()) + prefix := pd.GroupSettingsPathPrefixBytes(c.keyspaceID) + watchMetaChannel, err = c.provider.Watch(ctx, prefix, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV()) if err != nil { log.Warn("watch resource group meta failed", zap.Error(err)) watchRetryTimer.Reset(watchRetryInterval) diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index c940496215d..4aba1dd7659 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -32,6 +32,7 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/constants" "github.com/tikv/pd/client/errs" "github.com/tikv/pd/client/opt" ) @@ -210,7 +211,7 @@ func (m *MockResourceGroupProvider) ModifyResourceGroup(ctx context.Context, met return args.String(0), args.Error(1) } -func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string, _ ...pd.DeleteResourceGroupOption) (string, error) { +func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) { args := m.Called(ctx, resourceGroupName) return args.String(0), args.Error(1) } @@ -251,7 +252,7 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) { defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport") mockProvider := newMockResourceGroupProvider() - controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil) + controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil, constants.NullKeyspaceID) controller.Start(ctx) defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} @@ -325,7 +326,7 @@ func TestTryGetController(t *testing.T) { defer cancel() mockProvider := newMockResourceGroupProvider() - controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil) + controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil, constants.NullKeyspaceID) controller.Start(ctx) defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}} diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 596bbcdda92..a5b9f063a5a 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -16,6 +16,7 @@ package pd import ( "context" + "fmt" "time" "github.com/gogo/protobuf/proto" @@ -34,14 +35,20 @@ import ( type actionType int const ( - add actionType = 0 - modify actionType = 1 - groupSettingsPathPrefix = "resource_group/settings" - controllerConfigPathPrefix = "resource_group/controller" + add actionType = 0 + modify actionType = 1 + groupSettingsPathPrefix = "resource_group/settings" + keyspaceResourceGroupSettingPathPrefix = "resource_group/keyspace/settings/%d" + controllerConfigPathPrefix = "resource_group/controller" ) // GroupSettingsPathPrefixBytes is used to watch or get resource groups. -var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix) +func GroupSettingsPathPrefixBytes(keyspaceID uint32) []byte { + if keyspaceID == constants.NullKeyspaceID { + return []byte(groupSettingsPathPrefix) + } + return fmt.Appendf(nil, keyspaceResourceGroupSettingPathPrefix, keyspaceID) +} // ControllerConfigPathPrefixBytes is used to watch or get controller config. var ControllerConfigPathPrefixBytes = []byte(controllerConfigPathPrefix) @@ -52,7 +59,7 @@ type ResourceManagerClient interface { GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...GetResourceGroupOption) (*rmpb.ResourceGroup, error) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) - DeleteResourceGroup(ctx context.Context, resourceGroupName string, opts ...DeleteResourceGroupOption) (string, error) + DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error) @@ -60,8 +67,7 @@ type ResourceManagerClient interface { // GetResourceGroupOp represents available options when getting resource group. type GetResourceGroupOp struct { - withRUStats bool - withKeyspaceID *rmpb.KeyspaceIDValue + withRUStats bool } // GetResourceGroupOption configures GetResourceGroupOp. @@ -72,32 +78,6 @@ func WithRUStats(op *GetResourceGroupOp) { op.withRUStats = true } -// WithKeyspaceID specifies to return resource group with keyspace id. -func WithKeyspaceID(keyspaceID uint32) GetResourceGroupOption { - return func(op *GetResourceGroupOp) { - op.withKeyspaceID = &rmpb.KeyspaceIDValue{ - Value: keyspaceID, - } - } -} - -// DeleteResourceGroupOp represents available options when deleting resource group. -type DeleteResourceGroupOp struct { - withKeyspaceID *rmpb.KeyspaceIDValue -} - -// DeleteResourceGroupOption configures DeleteResourceGroupOp. -type DeleteResourceGroupOption func(*DeleteResourceGroupOp) - -// DeleteWithKeyspaceID specifies to delete resource group with keyspace id. -func DeleteWithKeyspaceID(keyspaceID uint32) DeleteResourceGroupOption { - return func(op *DeleteResourceGroupOp) { - op.withKeyspaceID = &rmpb.KeyspaceIDValue{ - Value: keyspaceID, - } - } -} - // resourceManagerClient gets the ResourceManager client of current PD leader. func (c *innerClient) resourceManagerClient() (rmpb.ResourceManagerClient, error) { cc, err := c.getOrCreateGRPCConn() @@ -119,7 +99,9 @@ func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroup } req := &rmpb.ListResourceGroupsRequest{ WithRuStats: getOp.withRUStats, - KeyspaceId: getOp.withKeyspaceID, + KeyspaceId: &rmpb.KeyspaceIDValue{ + Value: c.inner.keyspaceID, + }, } resp, err := cc.ListResourceGroups(ctx, req) if err != nil { @@ -146,7 +128,9 @@ func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string, req := &rmpb.GetResourceGroupRequest{ ResourceGroupName: resourceGroupName, WithRuStats: getOp.withRUStats, - KeyspaceId: getOp.withKeyspaceID, + KeyspaceId: &rmpb.KeyspaceIDValue{ + Value: c.inner.keyspaceID, + }, } resp, err := cc.GetResourceGroup(ctx, req) if err != nil { @@ -197,18 +181,16 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG } // DeleteResourceGroup implements the ResourceManagerClient interface. -func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string, ops ...DeleteResourceGroupOption) (string, error) { +func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) { cc, err := c.inner.resourceManagerClient() if err != nil { return "", err } - deleteOp := &DeleteResourceGroupOp{} - for _, op := range ops { - op(deleteOp) - } req := &rmpb.DeleteResourceGroupRequest{ ResourceGroupName: resourceGroupName, - KeyspaceId: deleteOp.withKeyspaceID, + KeyspaceId: &rmpb.KeyspaceIDValue{ + Value: c.inner.keyspaceID, + }, } resp, err := cc.DeleteResourceGroup(ctx, req) if err != nil { @@ -224,7 +206,8 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri // LoadResourceGroups implements the ResourceManagerClient interface. func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) { - resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, opt.WithPrefix()) + prefix := GroupSettingsPathPrefixBytes(c.inner.keyspaceID) + resp, err := c.Get(ctx, prefix, opt.WithPrefix()) if err != nil { return nil, 0, err } diff --git a/pkg/mcs/resourcemanager/server/keyspace_manager.go b/pkg/mcs/resourcemanager/server/keyspace_manager.go index 0fedd2e01af..0c5b7b707df 100644 --- a/pkg/mcs/resourcemanager/server/keyspace_manager.go +++ b/pkg/mcs/resourcemanager/server/keyspace_manager.go @@ -32,13 +32,13 @@ import ( const ( defaultConsumptionChanSize = 1024 - - reservedDefaultGroupName = "default" - maxGroupNameLength = 32 - middlePriority = 8 - maxPriority = 16 - unlimitedRate = math.MaxInt32 - unlimitedBurstLimit = -1 + maxGroupNameLength = 32 + middlePriority = 8 + maxPriority = 16 + unlimitedRate = math.MaxInt32 + unlimitedBurstLimit = -1 + // DefaultResourceGroupName is the reserved default resource group name within each keyspace. + DefaultResourceGroupName = "default" ) // consumptionItem is used to send the consumption info to the background metrics flusher. @@ -96,13 +96,13 @@ func (krgm *keyspaceResourceGroupManager) setRawStatesIntoResourceGroup(name str func (krgm *keyspaceResourceGroupManager) initDefaultResourceGroup() { krgm.RLock() - if _, ok := krgm.groups[reservedDefaultGroupName]; ok { + if _, ok := krgm.groups[DefaultResourceGroupName]; ok { krgm.RUnlock() return } krgm.RUnlock() defaultGroup := &ResourceGroup{ - Name: reservedDefaultGroupName, + Name: DefaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &RequestUnitSettings{ RU: &GroupTokenBucket{ @@ -159,7 +159,7 @@ func (krgm *keyspaceResourceGroupManager) modifyResourceGroup(group *rmpb.Resour } func (krgm *keyspaceResourceGroupManager) deleteResourceGroup(name string) error { - if name == reservedDefaultGroupName { + if name == DefaultResourceGroupName { return errs.ErrDeleteReservedGroup } if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil { @@ -190,7 +190,7 @@ func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includ krgm.RLock() res := make([]*ResourceGroup, 0, len(krgm.groups)) for _, group := range krgm.groups { - if !includeDefault && group.Name == reservedDefaultGroupName { + if !includeDefault && group.Name == DefaultResourceGroupName { continue } res = append(res, group.Clone(withStats)) @@ -207,7 +207,7 @@ func (krgm *keyspaceResourceGroupManager) getResourceGroupNames(includeDefault b defer krgm.RUnlock() res := make([]string, 0, len(krgm.groups)) for name := range krgm.groups { - if !includeDefault && name == reservedDefaultGroupName { + if !includeDefault && name == DefaultResourceGroupName { continue } res = append(res, name) diff --git a/pkg/mcs/resourcemanager/server/keyspace_manager_test.go b/pkg/mcs/resourcemanager/server/keyspace_manager_test.go index f3ddd755af2..5e4ea12fa15 100644 --- a/pkg/mcs/resourcemanager/server/keyspace_manager_test.go +++ b/pkg/mcs/resourcemanager/server/keyspace_manager_test.go @@ -36,16 +36,16 @@ func TestInitDefaultResourceGroup(t *testing.T) { re.Empty(krgm.groups) // No default resource group initially. - _, exists := krgm.groups[reservedDefaultGroupName] + _, exists := krgm.groups[DefaultResourceGroupName] re.False(exists) // Initialize the default resource group. krgm.initDefaultResourceGroup() // Verify the default resource group is created. - defaultGroup, exists := krgm.groups[reservedDefaultGroupName] + defaultGroup, exists := krgm.groups[DefaultResourceGroupName] re.True(exists) - re.Equal(reservedDefaultGroupName, defaultGroup.Name) + re.Equal(DefaultResourceGroupName, defaultGroup.Name) re.Equal(rmpb.GroupMode_RUMode, defaultGroup.Mode) re.Equal(uint32(middlePriority), defaultGroup.Priority) @@ -183,11 +183,11 @@ func TestDeleteResourceGroup(t *testing.T) { // Try to delete the default group. krgm.initDefaultResourceGroup() - err = krgm.deleteResourceGroup(reservedDefaultGroupName) + err = krgm.deleteResourceGroup(DefaultResourceGroupName) re.Error(err) // Should not be able to delete default group. // Verify default group still exists. - re.NotNil(krgm.getResourceGroup(reservedDefaultGroupName, false)) + re.NotNil(krgm.getResourceGroup(DefaultResourceGroupName, false)) } func TestGetResourceGroup(t *testing.T) { diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index fdd5ff55f65..0653980d51e 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -273,7 +273,6 @@ func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error { // DeleteResourceGroup deletes a resource group. func (m *Manager) DeleteResourceGroup(keyspaceID uint32, name string) error { - // TODO: should we allow to delete default resource group? krgm := m.getKeyspaceResourceGroupManager(keyspaceID) if krgm == nil { return errs.ErrKeyspaceNotExists.FastGenByArgs(keyspaceID) diff --git a/pkg/schedule/checker/rule_checker_test.go b/pkg/schedule/checker/rule_checker_test.go index cbe692f6452..305dc291f4a 100644 --- a/pkg/schedule/checker/rule_checker_test.go +++ b/pkg/schedule/checker/rule_checker_test.go @@ -2100,7 +2100,7 @@ func (suite *ruleCheckerTestSuite) TestRemoveOrphanPeer() { }, } suite.ruleManager.SetRule(rule) - suite.ruleManager.DeleteRule("pd", "default") + suite.ruleManager.DeleteRule(placement.DefaultGroupID, placement.DefaultRuleID) // case1: regionA has 3 peers but not extra peer can be removed, so it needs to add peer first suite.cluster.AddLeaderRegionWithRange(1, "200", "300", 1, 2, 3) diff --git a/pkg/schedule/schedulers/hot_region_test.go b/pkg/schedule/schedulers/hot_region_test.go index b12e5ccf244..a980350d08e 100644 --- a/pkg/schedule/schedulers/hot_region_test.go +++ b/pkg/schedule/schedulers/hot_region_test.go @@ -310,7 +310,7 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b tc.SetRule(&placement.Rule{ GroupID: "pd", ID: "voter", Role: placement.Follower, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}}, }) - tc.RuleManager.DeleteRule("pd", "default") + tc.RuleManager.DeleteRule(placement.DefaultGroupID, placement.DefaultRuleID) tc.UpdateStorageWrittenBytes(1, 10*units.MiB*utils.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenBytes(2, 0) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index fd1ab45066f..f4dc99edf04 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/log" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/constants" "github.com/tikv/pd/client/pkg/caller" "github.com/tikv/pd/client/resource_group/controller" sd "github.com/tikv/pd/client/servicediscovery" @@ -43,6 +44,7 @@ import ( "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/tests" + "github.com/tikv/pd/tests/integrations/mcs" // Register Service _ "github.com/tikv/pd/pkg/mcs/registry" @@ -173,7 +175,7 @@ func (suite *resourceManagerClientTestSuite) cleanupResourceGroups(re *require.A re.NoError(err) for _, group := range groups { deleteResp, err := cli.DeleteResourceGroup(suite.ctx, group.GetName()) - if group.Name == "default" { + if group.Name == server.DefaultResourceGroupName { re.Contains(err.Error(), "cannot delete reserved group") continue } @@ -204,7 +206,8 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { }, }, } - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + controller, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, constants.NullKeyspaceID) + re.NoError(err) controller.Start(suite.ctx) defer controller.Stop() @@ -279,7 +282,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { name := groupNamePrefix + strconv.Itoa(i) meta = controller.GetActiveResourceGroup(name) // The deleted resource group may not be immediately removed from the controller. - return meta == nil || meta.Name == "default" + return meta == nil || meta.Name == server.DefaultResourceGroupName }, testutil.WithTickInterval(50*time.Millisecond)) } } @@ -294,8 +297,10 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace( re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/disableWatch")) }() // Distinguish the controller with and without enabling `isSingleGroupByKeyspace`. - controllerKeySpace, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, controller.EnableSingleGroupByKeyspace()) - controller, _ := controller.NewResourceGroupController(suite.ctx, 2, cli, nil) + controllerKeySpace, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, constants.NullKeyspaceID, controller.EnableSingleGroupByKeyspace()) + re.NoError(err) + controller, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, constants.NullKeyspaceID) + re.NoError(err) controller.Start(suite.ctx) controllerKeySpace.Start(suite.ctx) defer controllerKeySpace.Stop() @@ -321,6 +326,7 @@ func (suite *resourceManagerClientTestSuite) TestWatchWithSingleGroupByKeyspace( tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100} controller.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest()) meta := controller.GetActiveResourceGroup(group.Name) + re.NotNil(meta) re.Equal(meta.RUSettings.RU, group.RUSettings.RU) controllerKeySpace.OnRequestWait(suite.ctx, group.Name, tcs.makeReadRequest()) @@ -411,7 +417,8 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { CPUMsCost: 1, } - rgsController, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) + rgsController, err := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, constants.NullKeyspaceID) + re.NoError(err) rgsController.Start(suite.ctx) defer rgsController.Stop() @@ -541,7 +548,8 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { }, }, } - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) + controller, err := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, constants.NullKeyspaceID, controller.EnableSingleGroupByKeyspace()) + re.NoError(err) controller.Start(suite.ctx) resourceGroupName := suite.initGroups[1].Name tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 2, times: 100, waitDuration: 0} @@ -609,7 +617,7 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() { resourceGroupName2 := suite.initGroups[2].Name tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0} wreq := tcs.makeWriteRequest() - _, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) + _, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq) re.NoError(err) re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)")) @@ -666,7 +674,8 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() { WriteCostPerByte: 1, CPUMsCost: 1, } - c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, controller.EnableSingleGroupByKeyspace()) + c, err := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, constants.NullKeyspaceID, controller.EnableSingleGroupByKeyspace()) + re.NoError(err) c.Start(suite.ctx) resourceGroupName := groupNames[0] @@ -955,7 +964,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { for _, g := range lresp { // Delete Resource Group dresp, err := cli.DeleteResourceGroup(suite.ctx, g.Name) - if g.Name == "default" { + if g.Name == server.DefaultResourceGroupName { re.Contains(err.Error(), "cannot delete reserved group") continue } @@ -1048,7 +1057,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() { respString, err := io.ReadAll(resp.Body) resp.Body.Close() re.NoError(err) - if g.Name == "default" { + if g.Name == server.DefaultResourceGroupName { re.Contains(string(respString), "cannot delete reserved group") continue } @@ -1233,7 +1242,8 @@ func (suite *resourceManagerClientTestSuite) TestResourceManagerClientDegradedMo } re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/resourcemanager/server/acquireFailed", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/degradedModeRU", "return(true)")) - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) + controller, err := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg, constants.NullKeyspaceID) + re.NoError(err) controller.Start(suite.ctx) tc := tokenConsumptionPerSecond{ rruTokensAtATime: 0, @@ -1265,7 +1275,7 @@ func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() { re := suite.Require() cli := suite.client // Test load from resource manager. - ctr, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + ctr, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, constants.NullKeyspaceID) re.NoError(err) config := ctr.GetConfig() re.NotNil(config) @@ -1283,7 +1293,7 @@ func (suite *resourceManagerClientTestSuite) TestLoadRequestUnitConfig() { WriteCostPerByte: 4, CPUMsCost: 5, } - ctr, err = controller.NewResourceGroupController(suite.ctx, 1, cli, ruConfig) + ctr, err = controller.NewResourceGroupController(suite.ctx, 1, cli, ruConfig, constants.NullKeyspaceID) re.NoError(err) config = ctr.GetConfig() re.NotNil(config) @@ -1321,7 +1331,8 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { re.Contains(resp, "Success!") re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/fastCleanup", `return(true)`)) - controller, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + controller, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, constants.NullKeyspaceID) + re.NoError(err) controller.Start(suite.ctx) testConfig := struct { @@ -1386,7 +1397,8 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { re.NoError(err) re.Contains(resp, "Success!") - c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + c, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, constants.NullKeyspaceID) + re.NoError(err) c.Start(suite.ctx) resourceGroupName := enableBackgroundGroup(false) @@ -1405,7 +1417,7 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { // modify `Default` to check fallback. resp, err = cli.ModifyResourceGroup(suite.ctx, &rmpb.ResourceGroup{ - Name: "default", + Name: server.DefaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{ RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{}}, @@ -1416,7 +1428,7 @@ func (suite *resourceManagerClientTestSuite) TestCheckBackgroundJobs() { re.Contains(resp, "Success!") // wait for watch event modify. testutil.Eventually(re, func() bool { - meta := c.GetActiveResourceGroup("default") + meta := c.GetActiveResourceGroup(server.DefaultResourceGroupName) if meta != nil && meta.BackgroundSettings != nil { return len(meta.BackgroundSettings.JobTypes) == 2 } @@ -1454,11 +1466,11 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh re.NoError(err) re.Contains(resp, "Success!") - c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil, constants.NullKeyspaceID) re.NoError(err) c1.Start(suite.ctx) // with client option - c2, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, controller.WithMaxWaitDuration(time.Hour)) + c2, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, constants.NullKeyspaceID, controller.WithMaxWaitDuration(time.Hour)) re.NoError(err) c2.Start(suite.ctx) // helper function for sending HTTP requests and checking responses @@ -1559,7 +1571,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh } // restart c1 c1.Stop() - c1, err = controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + c1, err = controller.NewResourceGroupController(suite.ctx, 1, cli, nil, constants.NullKeyspaceID) re.NoError(err) re.Equal(expectRUCfg, c1.GetConfig()) } @@ -1568,6 +1580,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupCURDWithKeyspace() re := suite.Require() cli := suite.client keyspaceID := uint32(1) + clientKeyspace := mcs.SetupClientWithKeyspaceID( + suite.ctx, + re, + keyspaceID, + suite.cluster.GetConfig().GetClientURLs(), + ) + defer clientKeyspace.Close() // Add resource group group := &rmpb.ResourceGroup{ @@ -1594,27 +1613,23 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupCURDWithKeyspace() rgs, err := cli.ListResourceGroups(suite.ctx) re.NoError(err) re.Len(rgs, 1) - re.Equal("default", rgs[0].Name) + re.Equal(server.DefaultResourceGroupName, rgs[0].Name) // Get and List resource group with keyspace id - opts := []pd.GetResourceGroupOption{ - pd.WithKeyspaceID(keyspaceID), - pd.WithRUStats, - } - rg, err = cli.GetResourceGroup(suite.ctx, group.Name, opts...) + rg, err = clientKeyspace.GetResourceGroup(suite.ctx, group.Name, pd.WithRUStats) re.NoError(err) re.NotNil(rg) - rgs, err = cli.ListResourceGroups(suite.ctx, opts...) + rgs, err = clientKeyspace.ListResourceGroups(suite.ctx, pd.WithRUStats) re.NoError(err) re.Len(rgs, 1) re.Equal(rgs[0].Name, group.Name) // Modify resource group with keyspace id group.RUSettings.RU.Settings.FillRate = 1000 - resp, err = cli.ModifyResourceGroup(suite.ctx, group) + resp, err = clientKeyspace.ModifyResourceGroup(suite.ctx, group) re.NoError(err) re.Contains(resp, "Success!") - rg, err = cli.GetResourceGroup(suite.ctx, group.Name, opts...) + rg, err = clientKeyspace.GetResourceGroup(suite.ctx, group.Name, pd.WithRUStats) re.NoError(err) re.Equal(group.RUSettings.RU.Settings.FillRate, rg.RUSettings.RU.Settings.FillRate) @@ -1639,19 +1654,19 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupCURDWithKeyspace() TargetRequestPeriodMs: 1000, ClientUniqueId: 1, } - _, err = cli.AcquireTokenBuckets(suite.ctx, req) + _, err = clientKeyspace.AcquireTokenBuckets(suite.ctx, req) re.NoError(err) time.Sleep(10 * time.Millisecond) - rg, err = cli.GetResourceGroup(suite.ctx, group.Name, opts...) + rg, err = clientKeyspace.GetResourceGroup(suite.ctx, group.Name, pd.WithRUStats) re.NoError(err) re.NotEqual(rg.RUStats, testConsumption) // Test AcquireTokenBuckets with keyspace id req.Requests[0].KeyspaceId = &rmpb.KeyspaceIDValue{Value: keyspaceID} - _, err = cli.AcquireTokenBuckets(suite.ctx, req) + _, err = clientKeyspace.AcquireTokenBuckets(suite.ctx, req) re.NoError(err) time.Sleep(10 * time.Millisecond) - rg, err = cli.GetResourceGroup(suite.ctx, group.Name, opts...) + rg, err = clientKeyspace.GetResourceGroup(suite.ctx, group.Name, pd.WithRUStats) re.NoError(err) re.Equal(rg.RUStats, testConsumption) @@ -1659,15 +1674,136 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupCURDWithKeyspace() resp, err = cli.DeleteResourceGroup(suite.ctx, group.Name) re.NoError(err) re.Contains(resp, "Success!") - rg, err = cli.GetResourceGroup(suite.ctx, group.Name, opts...) + rg, err = clientKeyspace.GetResourceGroup(suite.ctx, group.Name, pd.WithRUStats) re.NoError(err) re.NotNil(rg) // Delete resource group with keyspace id - resp, err = cli.DeleteResourceGroup(suite.ctx, group.Name, pd.DeleteWithKeyspaceID(keyspaceID)) + resp, err = clientKeyspace.DeleteResourceGroup(suite.ctx, group.Name) re.NoError(err) re.Contains(resp, "Success!") - rg, err = cli.GetResourceGroup(suite.ctx, group.Name, opts...) + rg, err = clientKeyspace.GetResourceGroup(suite.ctx, group.Name, pd.WithRUStats) re.EqualError(err, fmt.Sprintf("get resource group %v failed, rpc error: code = Unknown desc = resource group not found", group.Name)) re.Nil(rg) } + +func (suite *resourceManagerClientTestSuite) TestLoadAndWatchWithDifferentKeyspace() { + re := suite.Require() + keyspaces := []uint32{1, 2, constants.NullKeyspaceID} + genGroupByKeyspace := func(keyspace uint32) *rmpb.ResourceGroup { + return &rmpb.ResourceGroup{ + Name: fmt.Sprintf("keyspace_test_%d", keyspace), + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 10000, + }, + Tokens: 100000, + }, + }, + KeyspaceId: &rmpb.KeyspaceIDValue{Value: keyspace}, + } + } + + // Create clients for different keyspaces + clients := map[uint32]pd.Client{} + for _, keyspace := range keyspaces { + if keyspace == constants.NullKeyspaceID { + clients[keyspace] = suite.client + continue + } + cli := mcs.SetupClientWithKeyspaceID( + suite.ctx, + re, + keyspace, + suite.cluster.GetConfig().GetClientURLs(), + ) + clients[keyspace] = cli + } + + // Add resource groups with different keyspaces + for _, keyspace := range keyspaces { + cli := clients[keyspace] + group := genGroupByKeyspace(keyspace) + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + + // Create controllers for different keyspaces + // Test to load resource groups with different keyspaces + clientID := uint64(1) + tcs := tokenConsumptionPerSecond{rruTokensAtATime: 100} + controllers := map[uint32]*controller.ResourceGroupsController{} + for _, keyspace := range keyspaces { + cli := clients[keyspace] + c, err := controller.NewResourceGroupController(suite.ctx, clientID, cli, nil, keyspace) + re.NoError(err) + controllers[keyspace] = c + c.Start(suite.ctx) + for _, keyspaceToFind := range keyspaces { + groupToFind := genGroupByKeyspace(keyspaceToFind) + testutil.Eventually(re, func() bool { + c.OnRequestWait(suite.ctx, groupToFind.Name, tcs.makeReadRequest()) + meta := c.GetActiveResourceGroup(groupToFind.Name) + if keyspaceToFind == keyspace { + return meta != nil && + meta.Name == groupToFind.Name && + meta.RUSettings.RU.Settings.FillRate == groupToFind.RUSettings.RU.Settings.FillRate + } + return meta == nil + }) + } + clientID += 1 + } + + // Modify resource groups with different keyspaces + fillRate := uint64(12345) + for _, keyspace := range keyspaces { + cli := clients[keyspace] + group := genGroupByKeyspace(keyspace) + group.RUSettings.RU.Settings.FillRate = fillRate + resp, err := cli.ModifyResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + + // Test to watch resource groups with different keyspaces + for _, keyspace := range keyspaces { + c := controllers[keyspace] + group := genGroupByKeyspace(keyspace) + testutil.Eventually(re, func() bool { + meta := c.GetActiveResourceGroup(group.Name) + return meta.RUSettings.RU.Settings.FillRate == fillRate + }) + } + + // Delete resource groups with different keyspaces + for _, keyspace := range keyspaces { + cli := clients[keyspace] + group := genGroupByKeyspace(keyspace) + resp, err := cli.DeleteResourceGroup(suite.ctx, group.Name) + re.NoError(err) + re.Contains(resp, "Success!") + } + + // Test to watch resource groups with different keyspaces + for _, keyspace := range keyspaces { + c := controllers[keyspace] + group := genGroupByKeyspace(keyspace) + testutil.Eventually(re, func() bool { + meta := c.GetActiveResourceGroup(group.Name) + return meta == nil + }) + } + + // Stop controllers and close clients + for _, keyspace := range keyspaces { + controllers[keyspace].Stop() + if keyspace == constants.NullKeyspaceID { + continue + } + clients[keyspace].Close() + } +} diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 30cdf8e63fc..a4d10909a79 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -67,7 +67,7 @@ func (suite *ruleTestSuite) TearDownTest() { def := placement.GroupBundle{ ID: "pd", Rules: []*placement.Rule{ - {GroupID: "pd", ID: "default", Role: "voter", Count: 3}, + {GroupID: placement.DefaultGroupID, ID: placement.DefaultRuleID, Role: "voter", Count: 3}, }, } data, err := json.Marshal([]placement.GroupBundle{def}) diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go index 775adea8094..51b9fededac 100644 --- a/tools/pd-ctl/tests/config/config_test.go +++ b/tools/pd-ctl/tests/config/config_test.go @@ -88,7 +88,7 @@ func (suite *configTestSuite) TearDownTest() { def := placement.GroupBundle{ ID: "pd", Rules: []*placement.Rule{ - {GroupID: "pd", ID: "default", Role: "voter", Count: 3}, + {GroupID: placement.DefaultGroupID, ID: placement.DefaultRuleID, Role: "voter", Count: 3}, }, } data, err := json.Marshal([]placement.GroupBundle{def}) @@ -825,7 +825,7 @@ func (suite *configTestSuite) checkPlacementRuleBundle(cluster *pdTests.TestClus bundles = []placement.GroupBundle{{ ID: "pd", Rules: []*placement.Rule{ - {GroupID: "pd", ID: "default", Role: "voter", Count: 3}, + {GroupID: placement.DefaultGroupID, ID: placement.DefaultRuleID, Role: "voter", Count: 3}, }, }} b, err = json.Marshal(bundles) @@ -837,7 +837,7 @@ func (suite *configTestSuite) checkPlacementRuleBundle(cluster *pdTests.TestClus re.NoError(err) checkLoadRuleBundle(re, pdAddr, fname, []placement.GroupBundle{ - {ID: "pd", Index: 0, Override: false, Rules: []*placement.Rule{{GroupID: "pd", ID: placement.DefaultRuleID, Role: placement.Voter, Count: 3}}}, + {ID: "pd", Index: 0, Override: false, Rules: []*placement.Rule{{GroupID: placement.DefaultGroupID, ID: placement.DefaultRuleID, Role: placement.Voter, Count: 3}}}, }) } diff --git a/tools/pd-ut/testdata/group.9.etcdkey b/tools/pd-ut/testdata/group.9.etcdkey index f020f349501..631ec7ea649 100644 --- a/tools/pd-ut/testdata/group.9.etcdkey +++ b/tools/pd-ut/testdata/group.9.etcdkey @@ -79,10 +79,14 @@ resource_group/controller get resource_group/controller save resource_group/controller watch resource_group/keyspace/settings/ get +resource_group/keyspace/settings/ watch resource_group/keyspace/settings//keyspace_test remove resource_group/keyspace/settings//keyspace_test save +resource_group/keyspace/settings//keyspace_test_ remove +resource_group/keyspace/settings//keyspace_test_ save resource_group/keyspace/states/ get resource_group/keyspace/states//keyspace_test save +resource_group/keyspace/states//keyspace_test_ save resource_group/settings watch resource_group/settings/ get resource_group/settings/CRUD_test remove @@ -100,6 +104,8 @@ resource_group/settings/failover_test remove resource_group/settings/failover_test save resource_group/settings/keyspace_test remove resource_group/settings/keyspace_test save +resource_group/settings/keyspace_test_ remove +resource_group/settings/keyspace_test_ save resource_group/settings/mode_test remove resource_group/settings/mode_test save resource_group/settings/penalty_test remove @@ -123,6 +129,7 @@ resource_group/states/controller_test save resource_group/states/default save resource_group/states/failover_test save resource_group/states/keyspace_test save +resource_group/states/keyspace_test_ save resource_group/states/mode_test save resource_group/states/penalty_test save resource_group/states/stale_test save