Skip to content

Commit 6cd8a77

Browse files
mcs: enforce keyspace for Resource Group CRUD (#9342)
ref #9296 Signed-off-by: lhy1024 <admin@liudos.us> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent 6e2c90b commit 6cd8a77

File tree

12 files changed

+242
-111
lines changed

12 files changed

+242
-111
lines changed

client/resource_group/controller/controller.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ type ResourceGroupProvider interface {
8787
ListResourceGroups(ctx context.Context, opts ...pd.GetResourceGroupOption) ([]*rmpb.ResourceGroup, error)
8888
AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
8989
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
90-
DeleteResourceGroup(ctx context.Context, resourceGroupName string, opts ...pd.DeleteResourceGroupOption) (string, error)
90+
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
9191
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
9292
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
9393

@@ -140,6 +140,7 @@ type ResourceGroupsController struct {
140140
provider ResourceGroupProvider
141141
groupsController sync.Map
142142
ruConfig *RUConfig
143+
keyspaceID uint32
143144

144145
loopCtx context.Context
145146
loopCancel func()
@@ -174,6 +175,7 @@ func NewResourceGroupController(
174175
clientUniqueID uint64,
175176
provider ResourceGroupProvider,
176177
requestUnitConfig *RequestUnitConfig,
178+
keyspaceID uint32,
177179
opts ...ResourceControlCreateOption,
178180
) (*ResourceGroupsController, error) {
179181
config, err := loadServerConfig(ctx, provider)
@@ -188,6 +190,7 @@ func NewResourceGroupController(
188190
controller := &ResourceGroupsController{
189191
clientUniqueID: clientUniqueID,
190192
provider: provider,
193+
keyspaceID: keyspaceID,
191194
ruConfig: ruConfig,
192195
lowTokenNotifyChan: make(chan notifyMsg, 1),
193196
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
@@ -197,7 +200,7 @@ func NewResourceGroupController(
197200
for _, opt := range opts {
198201
opt(controller)
199202
}
200-
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig))
203+
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig), zap.Uint32("keyspace-id", keyspaceID))
201204
controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)}
202205
controller.safeRuConfig.Store(controller.ruConfig)
203206
enableControllerTraceLog.Store(config.EnableControllerTraceLog)
@@ -272,7 +275,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
272275
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
273276
if !c.ruConfig.isSingleGroupByKeyspace {
274277
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
275-
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
278+
prefix := pd.GroupSettingsPathPrefixBytes(c.keyspaceID)
279+
watchMetaChannel, err = c.provider.Watch(ctx, prefix, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
276280
if err != nil {
277281
log.Warn("watch resource group meta failed", zap.Error(err))
278282
}
@@ -299,7 +303,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
299303
case <-watchRetryTimer.C:
300304
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
301305
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
302-
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
306+
prefix := pd.GroupSettingsPathPrefixBytes(c.keyspaceID)
307+
watchMetaChannel, err = c.provider.Watch(ctx, prefix, opt.WithRev(metaRevision), opt.WithPrefix(), opt.WithPrevKV())
303308
if err != nil {
304309
log.Warn("watch resource group meta failed", zap.Error(err))
305310
watchRetryTimer.Reset(watchRetryInterval)

client/resource_group/controller/controller_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
3333

3434
pd "github.com/tikv/pd/client"
35+
"github.com/tikv/pd/client/constants"
3536
"github.com/tikv/pd/client/errs"
3637
"github.com/tikv/pd/client/opt"
3738
)
@@ -210,7 +211,7 @@ func (m *MockResourceGroupProvider) ModifyResourceGroup(ctx context.Context, met
210211
return args.String(0), args.Error(1)
211212
}
212213

213-
func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string, _ ...pd.DeleteResourceGroupOption) (string, error) {
214+
func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
214215
args := m.Called(ctx, resourceGroupName)
215216
return args.String(0), args.Error(1)
216217
}
@@ -251,7 +252,7 @@ func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
251252
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport")
252253

253254
mockProvider := newMockResourceGroupProvider()
254-
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
255+
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil, constants.NullKeyspaceID)
255256
controller.Start(ctx)
256257

257258
defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
@@ -325,7 +326,7 @@ func TestTryGetController(t *testing.T) {
325326
defer cancel()
326327

327328
mockProvider := newMockResourceGroupProvider()
328-
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
329+
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil, constants.NullKeyspaceID)
329330
controller.Start(ctx)
330331

331332
defaultResourceGroup := &rmpb.ResourceGroup{Name: defaultResourceGroupName, Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}

client/resource_manager_client.go

Lines changed: 26 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package pd
1616

1717
import (
1818
"context"
19+
"fmt"
1920
"time"
2021

2122
"github.com/gogo/protobuf/proto"
@@ -34,14 +35,20 @@ import (
3435
type actionType int
3536

3637
const (
37-
add actionType = 0
38-
modify actionType = 1
39-
groupSettingsPathPrefix = "resource_group/settings"
40-
controllerConfigPathPrefix = "resource_group/controller"
38+
add actionType = 0
39+
modify actionType = 1
40+
groupSettingsPathPrefix = "resource_group/settings"
41+
keyspaceResourceGroupSettingPathPrefix = "resource_group/keyspace/settings/%d"
42+
controllerConfigPathPrefix = "resource_group/controller"
4143
)
4244

4345
// GroupSettingsPathPrefixBytes is used to watch or get resource groups.
44-
var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix)
46+
func GroupSettingsPathPrefixBytes(keyspaceID uint32) []byte {
47+
if keyspaceID == constants.NullKeyspaceID {
48+
return []byte(groupSettingsPathPrefix)
49+
}
50+
return fmt.Appendf(nil, keyspaceResourceGroupSettingPathPrefix, keyspaceID)
51+
}
4552

4653
// ControllerConfigPathPrefixBytes is used to watch or get controller config.
4754
var ControllerConfigPathPrefixBytes = []byte(controllerConfigPathPrefix)
@@ -52,16 +59,15 @@ type ResourceManagerClient interface {
5259
GetResourceGroup(ctx context.Context, resourceGroupName string, opts ...GetResourceGroupOption) (*rmpb.ResourceGroup, error)
5360
AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
5461
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
55-
DeleteResourceGroup(ctx context.Context, resourceGroupName string, opts ...DeleteResourceGroupOption) (string, error)
62+
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
5663
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)
5764
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
5865
Watch(ctx context.Context, key []byte, opts ...opt.MetaStorageOption) (chan []*meta_storagepb.Event, error)
5966
}
6067

6168
// GetResourceGroupOp represents available options when getting resource group.
6269
type GetResourceGroupOp struct {
63-
withRUStats bool
64-
withKeyspaceID *rmpb.KeyspaceIDValue
70+
withRUStats bool
6571
}
6672

6773
// GetResourceGroupOption configures GetResourceGroupOp.
@@ -72,32 +78,6 @@ func WithRUStats(op *GetResourceGroupOp) {
7278
op.withRUStats = true
7379
}
7480

75-
// WithKeyspaceID specifies to return resource group with keyspace id.
76-
func WithKeyspaceID(keyspaceID uint32) GetResourceGroupOption {
77-
return func(op *GetResourceGroupOp) {
78-
op.withKeyspaceID = &rmpb.KeyspaceIDValue{
79-
Value: keyspaceID,
80-
}
81-
}
82-
}
83-
84-
// DeleteResourceGroupOp represents available options when deleting resource group.
85-
type DeleteResourceGroupOp struct {
86-
withKeyspaceID *rmpb.KeyspaceIDValue
87-
}
88-
89-
// DeleteResourceGroupOption configures DeleteResourceGroupOp.
90-
type DeleteResourceGroupOption func(*DeleteResourceGroupOp)
91-
92-
// DeleteWithKeyspaceID specifies to delete resource group with keyspace id.
93-
func DeleteWithKeyspaceID(keyspaceID uint32) DeleteResourceGroupOption {
94-
return func(op *DeleteResourceGroupOp) {
95-
op.withKeyspaceID = &rmpb.KeyspaceIDValue{
96-
Value: keyspaceID,
97-
}
98-
}
99-
}
100-
10181
// resourceManagerClient gets the ResourceManager client of current PD leader.
10282
func (c *innerClient) resourceManagerClient() (rmpb.ResourceManagerClient, error) {
10383
cc, err := c.getOrCreateGRPCConn()
@@ -119,7 +99,9 @@ func (c *client) ListResourceGroups(ctx context.Context, ops ...GetResourceGroup
11999
}
120100
req := &rmpb.ListResourceGroupsRequest{
121101
WithRuStats: getOp.withRUStats,
122-
KeyspaceId: getOp.withKeyspaceID,
102+
KeyspaceId: &rmpb.KeyspaceIDValue{
103+
Value: c.inner.keyspaceID,
104+
},
123105
}
124106
resp, err := cc.ListResourceGroups(ctx, req)
125107
if err != nil {
@@ -146,7 +128,9 @@ func (c *client) GetResourceGroup(ctx context.Context, resourceGroupName string,
146128
req := &rmpb.GetResourceGroupRequest{
147129
ResourceGroupName: resourceGroupName,
148130
WithRuStats: getOp.withRUStats,
149-
KeyspaceId: getOp.withKeyspaceID,
131+
KeyspaceId: &rmpb.KeyspaceIDValue{
132+
Value: c.inner.keyspaceID,
133+
},
150134
}
151135
resp, err := cc.GetResourceGroup(ctx, req)
152136
if err != nil {
@@ -197,18 +181,16 @@ func (c *client) putResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceG
197181
}
198182

199183
// DeleteResourceGroup implements the ResourceManagerClient interface.
200-
func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string, ops ...DeleteResourceGroupOption) (string, error) {
184+
func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
201185
cc, err := c.inner.resourceManagerClient()
202186
if err != nil {
203187
return "", err
204188
}
205-
deleteOp := &DeleteResourceGroupOp{}
206-
for _, op := range ops {
207-
op(deleteOp)
208-
}
209189
req := &rmpb.DeleteResourceGroupRequest{
210190
ResourceGroupName: resourceGroupName,
211-
KeyspaceId: deleteOp.withKeyspaceID,
191+
KeyspaceId: &rmpb.KeyspaceIDValue{
192+
Value: c.inner.keyspaceID,
193+
},
212194
}
213195
resp, err := cc.DeleteResourceGroup(ctx, req)
214196
if err != nil {
@@ -224,7 +206,8 @@ func (c *client) DeleteResourceGroup(ctx context.Context, resourceGroupName stri
224206

225207
// LoadResourceGroups implements the ResourceManagerClient interface.
226208
func (c *client) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) {
227-
resp, err := c.Get(ctx, GroupSettingsPathPrefixBytes, opt.WithPrefix())
209+
prefix := GroupSettingsPathPrefixBytes(c.inner.keyspaceID)
210+
resp, err := c.Get(ctx, prefix, opt.WithPrefix())
228211
if err != nil {
229212
return nil, 0, err
230213
}

pkg/mcs/resourcemanager/server/keyspace_manager.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ import (
3232

3333
const (
3434
defaultConsumptionChanSize = 1024
35-
36-
reservedDefaultGroupName = "default"
37-
maxGroupNameLength = 32
38-
middlePriority = 8
39-
maxPriority = 16
40-
unlimitedRate = math.MaxInt32
41-
unlimitedBurstLimit = -1
35+
maxGroupNameLength = 32
36+
middlePriority = 8
37+
maxPriority = 16
38+
unlimitedRate = math.MaxInt32
39+
unlimitedBurstLimit = -1
40+
// DefaultResourceGroupName is the reserved default resource group name within each keyspace.
41+
DefaultResourceGroupName = "default"
4242
)
4343

4444
// consumptionItem is used to send the consumption info to the background metrics flusher.
@@ -96,13 +96,13 @@ func (krgm *keyspaceResourceGroupManager) setRawStatesIntoResourceGroup(name str
9696

9797
func (krgm *keyspaceResourceGroupManager) initDefaultResourceGroup() {
9898
krgm.RLock()
99-
if _, ok := krgm.groups[reservedDefaultGroupName]; ok {
99+
if _, ok := krgm.groups[DefaultResourceGroupName]; ok {
100100
krgm.RUnlock()
101101
return
102102
}
103103
krgm.RUnlock()
104104
defaultGroup := &ResourceGroup{
105-
Name: reservedDefaultGroupName,
105+
Name: DefaultResourceGroupName,
106106
Mode: rmpb.GroupMode_RUMode,
107107
RUSettings: &RequestUnitSettings{
108108
RU: &GroupTokenBucket{
@@ -159,7 +159,7 @@ func (krgm *keyspaceResourceGroupManager) modifyResourceGroup(group *rmpb.Resour
159159
}
160160

161161
func (krgm *keyspaceResourceGroupManager) deleteResourceGroup(name string) error {
162-
if name == reservedDefaultGroupName {
162+
if name == DefaultResourceGroupName {
163163
return errs.ErrDeleteReservedGroup
164164
}
165165
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
@@ -190,7 +190,7 @@ func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includ
190190
krgm.RLock()
191191
res := make([]*ResourceGroup, 0, len(krgm.groups))
192192
for _, group := range krgm.groups {
193-
if !includeDefault && group.Name == reservedDefaultGroupName {
193+
if !includeDefault && group.Name == DefaultResourceGroupName {
194194
continue
195195
}
196196
res = append(res, group.Clone(withStats))
@@ -207,7 +207,7 @@ func (krgm *keyspaceResourceGroupManager) getResourceGroupNames(includeDefault b
207207
defer krgm.RUnlock()
208208
res := make([]string, 0, len(krgm.groups))
209209
for name := range krgm.groups {
210-
if !includeDefault && name == reservedDefaultGroupName {
210+
if !includeDefault && name == DefaultResourceGroupName {
211211
continue
212212
}
213213
res = append(res, name)

pkg/mcs/resourcemanager/server/keyspace_manager_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,16 @@ func TestInitDefaultResourceGroup(t *testing.T) {
3636
re.Empty(krgm.groups)
3737

3838
// No default resource group initially.
39-
_, exists := krgm.groups[reservedDefaultGroupName]
39+
_, exists := krgm.groups[DefaultResourceGroupName]
4040
re.False(exists)
4141

4242
// Initialize the default resource group.
4343
krgm.initDefaultResourceGroup()
4444

4545
// Verify the default resource group is created.
46-
defaultGroup, exists := krgm.groups[reservedDefaultGroupName]
46+
defaultGroup, exists := krgm.groups[DefaultResourceGroupName]
4747
re.True(exists)
48-
re.Equal(reservedDefaultGroupName, defaultGroup.Name)
48+
re.Equal(DefaultResourceGroupName, defaultGroup.Name)
4949
re.Equal(rmpb.GroupMode_RUMode, defaultGroup.Mode)
5050
re.Equal(uint32(middlePriority), defaultGroup.Priority)
5151

@@ -183,11 +183,11 @@ func TestDeleteResourceGroup(t *testing.T) {
183183

184184
// Try to delete the default group.
185185
krgm.initDefaultResourceGroup()
186-
err = krgm.deleteResourceGroup(reservedDefaultGroupName)
186+
err = krgm.deleteResourceGroup(DefaultResourceGroupName)
187187
re.Error(err) // Should not be able to delete default group.
188188

189189
// Verify default group still exists.
190-
re.NotNil(krgm.getResourceGroup(reservedDefaultGroupName, false))
190+
re.NotNil(krgm.getResourceGroup(DefaultResourceGroupName, false))
191191
}
192192

193193
func TestGetResourceGroup(t *testing.T) {

pkg/mcs/resourcemanager/server/manager.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error {
273273

274274
// DeleteResourceGroup deletes a resource group.
275275
func (m *Manager) DeleteResourceGroup(keyspaceID uint32, name string) error {
276-
// TODO: should we allow to delete default resource group?
277276
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
278277
if krgm == nil {
279278
return errs.ErrKeyspaceNotExists.FastGenByArgs(keyspaceID)

pkg/schedule/checker/rule_checker_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2100,7 +2100,7 @@ func (suite *ruleCheckerTestSuite) TestRemoveOrphanPeer() {
21002100
},
21012101
}
21022102
suite.ruleManager.SetRule(rule)
2103-
suite.ruleManager.DeleteRule("pd", "default")
2103+
suite.ruleManager.DeleteRule(placement.DefaultGroupID, placement.DefaultRuleID)
21042104

21052105
// case1: regionA has 3 peers but not extra peer can be removed, so it needs to add peer first
21062106
suite.cluster.AddLeaderRegionWithRange(1, "200", "300", 1, 2, 3)

pkg/schedule/schedulers/hot_region_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func checkHotWriteRegionPlacement(re *require.Assertions, enablePlacementRules b
310310
tc.SetRule(&placement.Rule{
311311
GroupID: "pd", ID: "voter", Role: placement.Follower, Count: 2, LabelConstraints: []placement.LabelConstraint{{Key: "zone", Op: "in", Values: []string{"z2"}}},
312312
})
313-
tc.RuleManager.DeleteRule("pd", "default")
313+
tc.RuleManager.DeleteRule(placement.DefaultGroupID, placement.DefaultRuleID)
314314

315315
tc.UpdateStorageWrittenBytes(1, 10*units.MiB*utils.StoreHeartBeatReportInterval)
316316
tc.UpdateStorageWrittenBytes(2, 0)

0 commit comments

Comments
 (0)