Skip to content

Commit 67b518d

Browse files
feat(resourcemanager): add service limit for keyspace (#9354)
ref #9296 Add a keyspace-level service limiter to control the maximum request rate for each keyspace. The service limit is an unburstable RU per second rate limit that applies across all resource groups within a keyspace. Signed-off-by: JmPotato <github@ipotato.me> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent 3bc8b2c commit 67b518d

File tree

11 files changed

+836
-12
lines changed

11 files changed

+836
-12
lines changed

pkg/mcs/resourcemanager/server/apis/v1/api.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ func (s *Service) RegisterRouter() {
9999
configEndpoint.DELETE("/group/:name", s.deleteResourceGroup)
100100
configEndpoint.GET("/controller", s.getControllerConfig)
101101
configEndpoint.POST("/controller", s.setControllerConfig)
102+
// Without keyspace name, it will get/set the service limit of the null keyspace.
103+
configEndpoint.POST("/keyspace/service-limit", s.setKeyspaceServiceLimit)
104+
configEndpoint.GET("/keyspace/service-limit", s.getKeyspaceServiceLimit)
105+
// With keyspace name, it will get/set the service limit of the given keyspace.
106+
configEndpoint.POST("/keyspace/service-limit/:keyspace_name", s.setKeyspaceServiceLimit)
107+
configEndpoint.GET("/keyspace/service-limit/:keyspace_name", s.getKeyspaceServiceLimit)
102108
}
103109

104110
func (s *Service) handler() http.Handler {
@@ -239,3 +245,62 @@ func (s *Service) setControllerConfig(c *gin.Context) {
239245
}
240246
c.String(http.StatusOK, "Success!")
241247
}
248+
249+
// KeyspaceServiceLimitRequest is the request body for setting the service limit of the keyspace.
250+
type KeyspaceServiceLimitRequest struct {
251+
ServiceLimit float64 `json:"service_limit"`
252+
}
253+
254+
// SetKeyspaceServiceLimit
255+
//
256+
// @Tags ResourceManager
257+
// @Summary Set the service limit of the keyspace. If the keyspace is valid, the service limit will be set.
258+
// @Param keyspace_name path string true "Keyspace name"
259+
// @Param service_limit body object true "json params, keyspaceServiceLimitRequest"
260+
// @Success 200 {string} string "Success!"
261+
// @Failure 400 {string} error
262+
// @Router /config/keyspace/service-limit/{keyspace_name} [post]
263+
func (s *Service) setKeyspaceServiceLimit(c *gin.Context) {
264+
keyspaceName := c.Param("keyspace_name")
265+
keyspaceIDValue, err := s.manager.GetKeyspaceIDByName(c, keyspaceName)
266+
if err != nil {
267+
c.String(http.StatusBadRequest, err.Error())
268+
return
269+
}
270+
var req KeyspaceServiceLimitRequest
271+
if err := c.ShouldBindJSON(&req); err != nil {
272+
c.String(http.StatusBadRequest, err.Error())
273+
return
274+
}
275+
if req.ServiceLimit < 0 {
276+
c.String(http.StatusBadRequest, "service_limit must be non-negative")
277+
return
278+
}
279+
s.manager.SetKeyspaceServiceLimit(keyspaceIDValue.GetValue(), req.ServiceLimit)
280+
c.String(http.StatusOK, "Success!")
281+
}
282+
283+
// GetKeyspaceServiceLimit
284+
//
285+
// @Tags ResourceManager
286+
// @Summary Get the service limit of the keyspace. If the keyspace name is empty, it will return the service limit of the null keyspace.
287+
// @Param keyspace_name path string true "Keyspace name"
288+
// @Success 200 {string} json format of rmserver.serviceLimiter
289+
// @Failure 400 {string} error
290+
// @Failure 404 {string} error
291+
// @Router /config/keyspace/service-limit/{keyspace_name} [get]
292+
func (s *Service) getKeyspaceServiceLimit(c *gin.Context) {
293+
keyspaceName := c.Param("keyspace_name")
294+
keyspaceIDValue, err := s.manager.GetKeyspaceIDByName(c, keyspaceName)
295+
if err != nil {
296+
c.String(http.StatusBadRequest, err.Error())
297+
return
298+
}
299+
keyspaceID := keyspaceIDValue.GetValue()
300+
limiter := s.manager.GetKeyspaceServiceLimiter(keyspaceID)
301+
if limiter == nil {
302+
c.String(http.StatusNotFound, fmt.Sprintf("keyspace manager not found with keyspace name: %s, id: %d", keyspaceName, keyspaceID))
303+
return
304+
}
305+
c.IndentedJSON(http.StatusOK, limiter)
306+
}

pkg/mcs/resourcemanager/server/grpc_service.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,27 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
174174
if err := s.checkServing(); err != nil {
175175
return err
176176
}
177-
targetPeriodMs := request.GetTargetRequestPeriodMs()
178-
clientUniqueID := request.GetClientUniqueId()
179-
resps := &rmpb.TokenBucketsResponse{}
177+
var (
178+
targetPeriodMs = request.GetTargetRequestPeriodMs()
179+
clientUniqueID = request.GetClientUniqueId()
180+
resps = &rmpb.TokenBucketsResponse{}
181+
logFields = make([]zap.Field, 2)
182+
)
180183
for _, req := range request.Requests {
184+
keyspaceID := extractKeyspaceID(req.GetKeyspaceId())
181185
resourceGroupName := req.GetResourceGroupName()
186+
logFields[0] = zap.Uint32("keyspace-id", keyspaceID)
187+
logFields[1] = zap.String("resource-group", resourceGroupName)
188+
// Get keyspace resource group manager to apply service limit later.
189+
krgm := s.manager.getKeyspaceResourceGroupManager(keyspaceID)
190+
if krgm == nil {
191+
log.Warn("keyspace resource group manager not found", logFields...)
192+
continue
193+
}
182194
// Get the resource group from manager to acquire token buckets.
183-
rg := s.manager.GetMutableResourceGroup(extractKeyspaceID(req.GetKeyspaceId()), resourceGroupName)
195+
rg := s.manager.GetMutableResourceGroup(keyspaceID, resourceGroupName)
184196
if rg == nil {
185-
log.Warn("resource group not found", zap.String("resource-group", resourceGroupName))
197+
log.Warn("resource group not found", logFields...)
186198
continue
187199
}
188200
// Send the consumption to update the metrics.
@@ -202,18 +214,19 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
202214
var tokens *rmpb.GrantedRUTokenBucket
203215
for _, re := range req.GetRuItems().GetRequestRU() {
204216
if re.Type == rmpb.RequestUnitType_RU {
205-
tokens = rg.RequestRU(now, re.Value, targetPeriodMs, clientUniqueID)
217+
tokens = rg.RequestRU(now, re.Value, targetPeriodMs, clientUniqueID, krgm.getServiceLimiter())
206218
}
207219
if tokens == nil {
208220
continue
209221
}
210222
resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens)
211223
}
212224
case rmpb.GroupMode_RawMode:
213-
log.Warn("not supports the resource type", zap.String("resource-group", resourceGroupName), zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)]))
225+
log.Warn("not supports the resource type",
226+
append(logFields, zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)]))...)
214227
continue
215228
}
216-
log.Debug("finish token request from", zap.String("resource-group", resourceGroupName))
229+
log.Debug("finish token request from", logFields...)
217230
resps.Responses = append(resps.Responses, resp)
218231
}
219232
if err := stream.Send(resps); err != nil {

pkg/mcs/resourcemanager/server/keyspace_manager.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,15 @@ type keyspaceResourceGroupManager struct {
5656

5757
keyspaceID uint32
5858
storage endpoint.ResourceGroupStorage
59+
sl *serviceLimiter
5960
}
6061

6162
func newKeyspaceResourceGroupManager(keyspaceID uint32, storage endpoint.ResourceGroupStorage) *keyspaceResourceGroupManager {
6263
return &keyspaceResourceGroupManager{
6364
groups: make(map[string]*ResourceGroup),
6465
keyspaceID: keyspaceID,
6566
storage: storage,
67+
sl: newServiceLimiter(keyspaceID, 0),
6668
}
6769
}
6870

@@ -237,3 +239,17 @@ func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() {
237239
krgm.RUnlock()
238240
}
239241
}
242+
243+
func (krgm *keyspaceResourceGroupManager) setServiceLimiter(serviceLimit float64) {
244+
krgm.RLock()
245+
sl := krgm.sl
246+
krgm.RUnlock()
247+
// Set the new service limit to the limiter.
248+
sl.setServiceLimit(serviceLimit)
249+
}
250+
251+
func (krgm *keyspaceResourceGroupManager) getServiceLimiter() *serviceLimiter {
252+
krgm.RLock()
253+
defer krgm.RUnlock()
254+
return krgm.sl
255+
}

pkg/mcs/resourcemanager/server/manager.go

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ type Manager struct {
6767
consumptionDispatcher chan *consumptionItem
6868
// cached keyspace name for each keyspace ID.
6969
keyspaceNameLookup map[uint32]string
70+
// used to get the keyspace ID by name.
71+
keyspaceIDLookup map[string]uint32
7072
}
7173

7274
// ConfigProvider is used to get resource manager config from the given
@@ -83,6 +85,7 @@ func NewManager[T ConfigProvider](srv bs.Server) *Manager {
8385
krgms: make(map[uint32]*keyspaceResourceGroupManager),
8486
consumptionDispatcher: make(chan *consumptionItem, defaultConsumptionChanSize),
8587
keyspaceNameLookup: make(map[uint32]string),
88+
keyspaceIDLookup: make(map[string]uint32),
8689
}
8790
// The first initialization after the server is started.
8891
srv.AddStartCallback(func() {
@@ -116,6 +119,21 @@ func (m *Manager) GetStorage() endpoint.ResourceGroupStorage {
116119
return m.storage
117120
}
118121

122+
// GetKeyspaceServiceLimiter returns the service limit of the keyspace.
123+
func (m *Manager) GetKeyspaceServiceLimiter(keyspaceID uint32) *serviceLimiter {
124+
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
125+
if krgm == nil {
126+
return nil
127+
}
128+
return krgm.getServiceLimiter().Clone()
129+
}
130+
131+
// SetKeyspaceServiceLimit sets the service limit of the keyspace.
132+
func (m *Manager) SetKeyspaceServiceLimit(keyspaceID uint32, serviceLimit float64) {
133+
// If the keyspace is not found, create a new keyspace resource group manager.
134+
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true).setServiceLimiter(serviceLimit)
135+
}
136+
119137
func (m *Manager) getOrCreateKeyspaceResourceGroupManager(keyspaceID uint32, initDefault bool) *keyspaceResourceGroupManager {
120138
m.Lock()
121139
krgm, ok := m.krgms[keyspaceID]
@@ -369,7 +387,9 @@ func (m *Manager) getKeyspaceNameByID(ctx context.Context, id uint32) (string, e
369387
return "", nil
370388
}
371389
// Try to get the keyspace name from the cache first.
390+
m.RLock()
372391
name, ok := m.keyspaceNameLookup[id]
392+
m.RUnlock()
373393
if ok {
374394
return name, nil
375395
}
@@ -391,10 +411,51 @@ func (m *Manager) getKeyspaceNameByID(ctx context.Context, id uint32) (string, e
391411
return "", fmt.Errorf("got an empty keyspace name by id %d", id)
392412
}
393413
// Update the cache.
394-
m.keyspaceNameLookup[id] = loadedName
414+
m.updateKeyspaceNameLookup(id, loadedName)
395415
return loadedName, nil
396416
}
397417

418+
func (m *Manager) updateKeyspaceNameLookup(id uint32, name string) {
419+
m.Lock()
420+
defer m.Unlock()
421+
m.keyspaceNameLookup[id] = name
422+
m.keyspaceIDLookup[name] = id
423+
}
424+
425+
// GetKeyspaceIDByName gets the keyspace ID by name.
426+
func (m *Manager) GetKeyspaceIDByName(ctx context.Context, name string) (*rmpb.KeyspaceIDValue, error) {
427+
if len(name) == 0 {
428+
return &rmpb.KeyspaceIDValue{Value: constant.NullKeyspaceID}, nil
429+
}
430+
m.RLock()
431+
id, ok := m.keyspaceIDLookup[name]
432+
m.RUnlock()
433+
if ok {
434+
return &rmpb.KeyspaceIDValue{Value: id}, nil
435+
}
436+
var (
437+
loadedID uint32
438+
err error
439+
)
440+
err = m.storage.RunInTxn(ctx, func(txn kv.Txn) error {
441+
ok, loadedID, err = m.storage.LoadKeyspaceID(txn, name)
442+
if err != nil {
443+
return err
444+
}
445+
return nil
446+
})
447+
if err != nil {
448+
log.Error("failed to get the keyspace id", zap.String("keyspace-name", name), zap.Error(err))
449+
return nil, err
450+
}
451+
if !ok {
452+
return nil, fmt.Errorf("keyspace not found with name: %s", name)
453+
}
454+
// Update the cache.
455+
m.updateKeyspaceNameLookup(loadedID, name)
456+
return &rmpb.KeyspaceIDValue{Value: loadedID}, nil
457+
}
458+
398459
func (m *Manager) backgroundMetricsFlush(ctx context.Context) {
399460
defer logutil.LogPanic()
400461
defer m.wg.Done()

pkg/mcs/resourcemanager/server/manager_test.go

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,10 @@ func prepareKeyspaceName(ctx context.Context, re *require.Assertions, manager *M
164164
}
165165
err := manager.storage.RunInTxn(ctx, func(txn kv.Txn) error {
166166
err := manager.storage.SaveKeyspaceMeta(txn, keyspaceMeta)
167-
return err
167+
if err != nil {
168+
return err
169+
}
170+
return manager.storage.SaveKeyspaceID(txn, keyspaceMeta.Id, keyspaceMeta.Name)
168171
})
169172
re.NoError(err)
170173
}
@@ -262,3 +265,106 @@ func TestCleanUpTicker(t *testing.T) {
262265
re.NoError(err)
263266
re.Equal("test_keyspace", keyspaceName)
264267
}
268+
269+
func TestKeyspaceServiceLimit(t *testing.T) {
270+
re := require.New(t)
271+
272+
storage := storage.NewStorageWithMemoryBackend()
273+
m := NewManager[*mockConfigProvider](&mockConfigProvider{})
274+
m.storage = storage
275+
276+
ctx, cancel := context.WithCancel(context.Background())
277+
defer cancel()
278+
err := m.Init(ctx)
279+
re.NoError(err)
280+
// Test the default service limit is 0.0.
281+
limiter := m.GetKeyspaceServiceLimiter(constant.NullKeyspaceID)
282+
re.NotNil(limiter)
283+
re.Equal(0.0, limiter.ServiceLimit)
284+
re.Equal(0.0, limiter.AvailableTokens)
285+
group := &rmpb.ResourceGroup{
286+
Name: "test_group",
287+
Mode: rmpb.GroupMode_RUMode,
288+
Priority: 5,
289+
RUSettings: &rmpb.GroupRequestUnitSettings{
290+
RU: &rmpb.TokenBucket{
291+
Settings: &rmpb.TokenLimitSettings{
292+
FillRate: 100,
293+
BurstLimit: 200,
294+
},
295+
},
296+
},
297+
KeyspaceId: &rmpb.KeyspaceIDValue{Value: 1},
298+
}
299+
// Test the limiter of the non-existing keyspace is nil.
300+
limiter = m.GetKeyspaceServiceLimiter(group.KeyspaceId.Value)
301+
re.Nil(limiter)
302+
// Test the limiter of the newly created keyspace is 0.0.
303+
err = m.AddResourceGroup(group)
304+
re.NoError(err)
305+
limiter = m.GetKeyspaceServiceLimiter(1)
306+
re.Equal(0.0, limiter.ServiceLimit)
307+
re.Equal(0.0, limiter.AvailableTokens)
308+
// Test set the service limit of the keyspace.
309+
m.SetKeyspaceServiceLimit(1, 100.0)
310+
limiter = m.GetKeyspaceServiceLimiter(1)
311+
re.Equal(100.0, limiter.ServiceLimit)
312+
re.Equal(0.0, limiter.AvailableTokens) // When setting from 0 to positive, available tokens remain 0
313+
// Test set the service limit of the non-existing keyspace.
314+
limiter = m.GetKeyspaceServiceLimiter(2)
315+
re.Nil(limiter)
316+
m.SetKeyspaceServiceLimit(2, 100.0)
317+
limiter = m.GetKeyspaceServiceLimiter(2)
318+
re.Equal(100.0, limiter.ServiceLimit)
319+
re.Equal(0.0, limiter.AvailableTokens)
320+
// Ensure the keyspace resource group manager is initialized correctly.
321+
krgm := m.getKeyspaceResourceGroupManager(2)
322+
re.NotNil(krgm)
323+
re.Equal(uint32(2), krgm.keyspaceID)
324+
re.Equal(DefaultResourceGroupName, krgm.getMutableResourceGroup(DefaultResourceGroupName).Name)
325+
}
326+
327+
func TestKeyspaceNameLookup(t *testing.T) {
328+
re := require.New(t)
329+
m := prepareManager()
330+
331+
ctx, cancel := context.WithCancel(context.Background())
332+
defer cancel()
333+
err := m.Init(ctx)
334+
re.NoError(err)
335+
// Get the null keyspace ID by an empty name.
336+
idValue, err := m.GetKeyspaceIDByName(ctx, "")
337+
re.NoError(err)
338+
re.NotNil(idValue)
339+
re.Equal(constant.NullKeyspaceID, idValue.Value)
340+
// Get the non-existing keyspace ID by name.
341+
idValue, err = m.GetKeyspaceIDByName(ctx, "non-existing-keyspace")
342+
re.Error(err)
343+
re.Nil(idValue)
344+
// Get the null keyspace name.
345+
name, err := m.getKeyspaceNameByID(ctx, constant.NullKeyspaceID)
346+
re.NoError(err)
347+
re.Empty(name)
348+
// Get the non-existing keyspace name.
349+
name, err = m.getKeyspaceNameByID(ctx, 1)
350+
re.Error(err)
351+
re.Empty(name)
352+
// Get the keyspace ID by name first, then get the keyspace name by ID.
353+
prepareKeyspaceName(ctx, re, m, &rmpb.KeyspaceIDValue{Value: 1}, "test_keyspace")
354+
idValue, err = m.GetKeyspaceIDByName(ctx, "test_keyspace")
355+
re.NoError(err)
356+
re.NotNil(idValue)
357+
re.Equal(uint32(1), idValue.Value)
358+
name, err = m.getKeyspaceNameByID(ctx, 1)
359+
re.NoError(err)
360+
re.Equal("test_keyspace", name)
361+
// Get the keyspace name by ID first, then get the keyspace ID by name.
362+
prepareKeyspaceName(ctx, re, m, &rmpb.KeyspaceIDValue{Value: 2}, "test_keyspace_2")
363+
name, err = m.getKeyspaceNameByID(ctx, 2)
364+
re.NoError(err)
365+
re.Equal("test_keyspace_2", name)
366+
idValue, err = m.GetKeyspaceIDByName(ctx, "test_keyspace_2")
367+
re.NoError(err)
368+
re.NotNil(idValue)
369+
re.Equal(uint32(2), idValue.Value)
370+
}

0 commit comments

Comments
 (0)