Skip to content

Commit 8978f61

Browse files
committed
feat(resourcemanager): persist the service limits for keyspaces
Signed-off-by: JmPotato <github@ipotato.me>
1 parent fedff62 commit 8978f61

File tree

11 files changed: 185 additions and 29 deletions.

pkg/mcs/resourcemanager/server/keyspace_manager.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -76,7 +76,7 @@ func newKeyspaceResourceGroupManager(keyspaceID uint32, storage endpoint.Resourc
7676
ruTrackers: make(map[string]*ruTracker),
7777
keyspaceID: keyspaceID,
7878
storage: storage,
79-
sl: newServiceLimiter(keyspaceID, 0),
79+
sl: newServiceLimiter(keyspaceID, 0, storage),
8080
}
8181
}
8282

@@ -239,7 +239,7 @@ func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() {
239239
}
240240
}
241241

242-
func (krgm *keyspaceResourceGroupManager) setServiceLimiter(serviceLimit float64) {
242+
func (krgm *keyspaceResourceGroupManager) setServiceLimit(serviceLimit float64) {
243243
krgm.RLock()
244244
sl := krgm.sl
245245
krgm.RUnlock()

pkg/mcs/resourcemanager/server/manager.go

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -133,7 +133,7 @@ func (m *Manager) GetKeyspaceServiceLimiter(keyspaceID uint32) *serviceLimiter {
133133
// SetKeyspaceServiceLimit sets the service limit of the keyspace.
134134
func (m *Manager) SetKeyspaceServiceLimit(keyspaceID uint32, serviceLimit float64) {
135135
// If the keyspace is not found, create a new keyspace resource group manager.
136-
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true).setServiceLimiter(serviceLimit)
136+
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true).setServiceLimit(serviceLimit)
137137
}
138138

139139
func (m *Manager) getOrCreateKeyspaceResourceGroupManager(keyspaceID uint32, initDefault bool) *keyspaceResourceGroupManager {
@@ -224,6 +224,12 @@ func (m *Manager) loadKeyspaceResourceGroups() error {
224224
}); err != nil {
225225
return err
226226
}
227+
// Load service limits from the storage.
228+
if err := m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
229+
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit)
230+
}); err != nil {
231+
return err
232+
}
227233
// Initialize the reserved keyspace resource group manager and default resource groups.
228234
m.initReserved()
229235
return nil

pkg/mcs/resourcemanager/server/manager_test.go

Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -367,3 +367,47 @@ func TestKeyspaceNameLookup(t *testing.T) {
367367
re.NotNil(idValue)
368368
re.Equal(uint32(2), idValue.Value)
369369
}
370+
371+
func TestResourceGroupPersistence(t *testing.T) {
372+
re := require.New(t)
373+
m := prepareManager()
374+
375+
// Prepare the resource group and service limit.
376+
group := &rmpb.ResourceGroup{
377+
Name: "test_group",
378+
Mode: rmpb.GroupMode_RUMode,
379+
Priority: 5,
380+
KeyspaceId: &rmpb.KeyspaceIDValue{Value: 1},
381+
}
382+
err := m.AddResourceGroup(group)
383+
re.NoError(err)
384+
keyspaceID := ExtractKeyspaceID(group.KeyspaceId)
385+
m.SetKeyspaceServiceLimit(keyspaceID, 100.0)
386+
387+
// Use the same storage to rebuild a manager.
388+
storage := m.storage
389+
m = NewManager[*mockConfigProvider](&mockConfigProvider{})
390+
m.storage = storage
391+
// Initialize the manager.
392+
ctx, cancel := context.WithCancel(context.Background())
393+
defer cancel()
394+
err = m.Init(ctx)
395+
re.NoError(err)
396+
// Check the resource group is loaded from the storage.
397+
rg := m.GetResourceGroup(keyspaceID, group.Name, true)
398+
re.NotNil(rg)
399+
re.Equal(group.Name, rg.Name)
400+
re.Equal(group.Mode, rg.Mode)
401+
re.Equal(group.Priority, rg.Priority)
402+
// Check the service limit is loaded from the storage.
403+
limiter := m.GetKeyspaceServiceLimiter(keyspaceID)
404+
re.NotNil(limiter)
405+
re.Equal(100.0, limiter.ServiceLimit)
406+
// Null keyspace ID should have a default zero service limit.
407+
limiter = m.GetKeyspaceServiceLimiter(constant.NullKeyspaceID)
408+
re.NotNil(limiter)
409+
re.Equal(0.0, limiter.ServiceLimit)
410+
// Non-existing keyspace should have a nil limiter.
411+
limiter = m.GetKeyspaceServiceLimiter(2)
412+
re.Nil(limiter)
413+
}

pkg/mcs/resourcemanager/server/service_limit.go

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"github.com/pingcap/log"
2424

25+
"github.com/tikv/pd/pkg/storage/endpoint"
2526
"github.com/tikv/pd/pkg/utils/syncutil"
2627
)
2728

@@ -30,7 +31,6 @@ import (
3031
// Since the client will request tokens with a 5-second period by default, the burst factor is 5.0 here.
3132
const serviceLimiterBurstFactor = 5.0
3233

33-
// TODO: persist the service limit to the storage and reload it when the service starts.
3434
type serviceLimiter struct {
3535
syncutil.RWMutex
3636
// ServiceLimit is the configured service limit for this limiter.
@@ -42,15 +42,18 @@ type serviceLimiter struct {
4242
LastUpdate time.Time `json:"last_update"`
4343
// KeyspaceID is the keyspace ID of the keyspace that this limiter belongs to.
4444
keyspaceID uint32
45+
// storage is used to persist the service limit.
46+
storage endpoint.ResourceGroupStorage
4547
}
4648

47-
func newServiceLimiter(keyspaceID uint32, serviceLimit float64) *serviceLimiter {
49+
func newServiceLimiter(keyspaceID uint32, serviceLimit float64, storage endpoint.ResourceGroupStorage) *serviceLimiter {
4850
// The service limit should be non-negative.
4951
serviceLimit = math.Max(0, serviceLimit)
5052
return &serviceLimiter{
5153
ServiceLimit: serviceLimit,
5254
LastUpdate: time.Now(),
5355
keyspaceID: keyspaceID,
56+
storage: storage,
5457
}
5558
}
5659

@@ -76,6 +79,16 @@ func (krl *serviceLimiter) setServiceLimit(newServiceLimit float64) {
7679
// are not left unused, causing the new service limit to become invalid.
7780
krl.refillTokensLocked(now)
7881
}
82+
83+
// Persist the service limit to storage
84+
if krl.storage != nil {
85+
if err := krl.storage.SaveServiceLimit(krl.keyspaceID, newServiceLimit); err != nil {
86+
log.Error("failed to persist service limit",
87+
zap.Uint32("keyspace-id", krl.keyspaceID),
88+
zap.Float64("service-limit", newServiceLimit),
89+
zap.Error(err))
90+
}
91+
}
7992
}
8093

8194
func (krl *serviceLimiter) refillTokensLocked(now time.Time) {

pkg/mcs/resourcemanager/server/service_limit_test.go

Lines changed: 61 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -21,35 +21,72 @@ import (
2121
"github.com/stretchr/testify/require"
2222

2323
"github.com/tikv/pd/pkg/mcs/utils/constant"
24+
"github.com/tikv/pd/pkg/storage"
2425
)
2526

2627
func TestNewServiceLimiter(t *testing.T) {
2728
re := require.New(t)
2829

2930
// Test creating a service limiter with positive limit
30-
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0)
31+
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
3132
re.NotNil(limiter)
3233
re.Equal(100.0, limiter.ServiceLimit)
3334
re.Equal(0.0, limiter.AvailableTokens)
3435

3536
// Test creating a service limiter with zero limit
36-
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0)
37+
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0, nil)
3738
re.NotNil(limiter)
3839
re.Equal(0.0, limiter.ServiceLimit)
3940
re.Equal(0.0, limiter.AvailableTokens)
4041

4142
// Test creating a service limiter with negative limit
42-
limiter = newServiceLimiter(constant.NullKeyspaceID, -10.0)
43+
limiter = newServiceLimiter(constant.NullKeyspaceID, -10.0, nil)
4344
re.NotNil(limiter)
4445
re.Equal(0.0, limiter.ServiceLimit)
4546
re.Equal(0.0, limiter.AvailableTokens)
4647
}
4748

49+
func TestServiceLimiterPersistence(t *testing.T) {
50+
re := require.New(t)
51+
52+
// Create a storage backend for testing
53+
storage := storage.NewStorageWithMemoryBackend()
54+
55+
// Test persisting service limit
56+
limiter := newServiceLimiter(1, 0.0, storage)
57+
limiter.setServiceLimit(100.5)
58+
59+
// Verify the service limit was persisted
60+
loadedLimit, err := storage.LoadServiceLimit(1)
61+
re.NoError(err)
62+
re.Equal(100.5, loadedLimit)
63+
64+
// Test updating the service limit
65+
limiter.setServiceLimit(200.5)
66+
loadedLimit, err = storage.LoadServiceLimit(1)
67+
re.NoError(err)
68+
re.Equal(200.5, loadedLimit)
69+
70+
// Test loading non-existent service limit
71+
loadedLimit, err = storage.LoadServiceLimit(999)
72+
re.NoError(err) // No error should be returned for non-existent limit
73+
re.Equal(0.0, loadedLimit) // Should return 0 for non-existent limit
74+
75+
// Test loading service limits from storage
76+
for _, keyspaceID := range []uint32{1, 2, 3} {
77+
storage.SaveServiceLimit(keyspaceID, float64(keyspaceID)*100.0)
78+
}
79+
err = storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
80+
re.Equal(float64(keyspaceID)*100.0, serviceLimit)
81+
})
82+
re.NoError(err)
83+
}
84+
4885
func TestRefillTokensLocked(t *testing.T) {
4986
re := require.New(t)
5087

5188
// Test refill with positive service limit
52-
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0)
89+
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
5390
baseTime := time.Now()
5491
limiter.LastUpdate = baseTime
5592
limiter.AvailableTokens = 50.0
@@ -101,29 +138,29 @@ func TestApplyServiceLimit(t *testing.T) {
101138
re.Equal(50.0, tokens)
102139

103140
// Test with zero service limit (no limit)
104-
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0)
141+
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0, nil)
105142
now := time.Now()
106143
tokens = limiter.applyServiceLimit(now, 50.0)
107144
re.Equal(50.0, tokens)
108145

109146
// Test request within available tokens (need to set available tokens first)
110-
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
147+
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
111148
limiter.AvailableTokens = 100.0 // Manually set available tokens
112149
limiter.LastUpdate = now
113150
tokens = limiter.applyServiceLimit(now, 50.0)
114151
re.Equal(50.0, tokens)
115152
re.Equal(50.0, limiter.AvailableTokens) // 100 - 50 = 50
116153

117154
// Test request exactly equal to available tokens
118-
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
155+
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
119156
limiter.AvailableTokens = 100.0 // Manually set available tokens
120157
limiter.LastUpdate = now
121158
tokens = limiter.applyServiceLimit(now, 100.0)
122159
re.Equal(100.0, tokens)
123160
re.Equal(0.0, limiter.AvailableTokens)
124161

125162
// Test request exceeding available tokens
126-
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
163+
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
127164
limiter.LastUpdate = now
128165
limiter.AvailableTokens = 30.0
129166
tokens = limiter.applyServiceLimit(now, 80.0)
@@ -135,7 +172,7 @@ func TestApplyServiceLimitWithRefill(t *testing.T) {
135172
re := require.New(t)
136173

137174
// Test that refill happens before applying limit
138-
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0)
175+
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
139176
baseTime := time.Now()
140177
limiter.LastUpdate = baseTime
141178
limiter.AvailableTokens = 20.0
@@ -148,7 +185,7 @@ func TestApplyServiceLimitWithRefill(t *testing.T) {
148185
re.Equal(futureTime, limiter.LastUpdate)
149186

150187
// Test partial refill scenario
151-
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
188+
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
152189
limiter.LastUpdate = baseTime
153190
limiter.AvailableTokens = 10.0
154191

@@ -163,28 +200,28 @@ func TestServiceLimiterEdgeCases(t *testing.T) {
163200
re := require.New(t)
164201

165202
// Test with very small service limit
166-
limiter := newServiceLimiter(constant.NullKeyspaceID, 0.1)
203+
limiter := newServiceLimiter(constant.NullKeyspaceID, 0.1, nil)
167204
limiter.AvailableTokens = 0.1 // Manually set available tokens
168205
limiter.LastUpdate = time.Now() // Set LastUpdate to current time to avoid refill
169206
now := time.Now()
170207
tokens := limiter.applyServiceLimit(now, 1.0)
171208
re.InDelta(0.1, tokens, 0.001) // Use InDelta to handle floating point precision
172209

173210
// Test with very large service limit
174-
limiter = newServiceLimiter(constant.NullKeyspaceID, 1000000.0)
211+
limiter = newServiceLimiter(constant.NullKeyspaceID, 1000000.0, nil)
175212
limiter.AvailableTokens = 1000000.0 // Manually set available tokens
176213
tokens = limiter.applyServiceLimit(now, 500000.0)
177214
re.Equal(500000.0, tokens)
178215

179216
// Test with zero requested tokens
180-
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
217+
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
181218
limiter.AvailableTokens = 100.0 // Manually set available tokens
182219
tokens = limiter.applyServiceLimit(now, 0.0)
183220
re.Equal(0.0, tokens)
184221
re.Equal(100.0, limiter.AvailableTokens) // Should remain unchanged
185222

186223
// Test with fractional tokens
187-
limiter = newServiceLimiter(constant.NullKeyspaceID, 10.5)
224+
limiter = newServiceLimiter(constant.NullKeyspaceID, 10.5, nil)
188225
limiter.LastUpdate = now
189226
limiter.AvailableTokens = 5.25
190227
tokens = limiter.applyServiceLimit(now, 7.75)
@@ -195,7 +232,7 @@ func TestSetServiceLimit(t *testing.T) {
195232
re := require.New(t)
196233

197234
// Test setting the same service limit (should be no-op)
198-
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0)
235+
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
199236
originalTokens := limiter.AvailableTokens
200237
originalUpdate := limiter.LastUpdate
201238

@@ -205,7 +242,7 @@ func TestSetServiceLimit(t *testing.T) {
205242
re.Equal(originalUpdate, limiter.LastUpdate) // Should remain unchanged
206243

207244
// Test increasing service limit
208-
limiter = newServiceLimiter(constant.NullKeyspaceID, 50.0)
245+
limiter = newServiceLimiter(constant.NullKeyspaceID, 50.0, nil)
209246
baseTime := time.Now()
210247
limiter.LastUpdate = baseTime
211248
limiter.AvailableTokens = 30.0
@@ -216,7 +253,7 @@ func TestSetServiceLimit(t *testing.T) {
216253
re.True(limiter.LastUpdate.After(baseTime)) // Should update time
217254

218255
// Test decreasing service limit with available tokens exceeding new limit
219-
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
256+
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
220257
baseTime = time.Now()
221258
limiter.LastUpdate = baseTime
222259
limiter.AvailableTokens = 80.0
@@ -231,7 +268,7 @@ func TestSetServiceLimit(t *testing.T) {
231268
re.True(limiter.LastUpdate.After(baseTime)) // Should update time
232269

233270
// Test decreasing service limit with available tokens below new limit
234-
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
271+
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
235272
baseTime = time.Now()
236273
limiter.LastUpdate = baseTime
237274
limiter.AvailableTokens = 30.0
@@ -242,7 +279,7 @@ func TestSetServiceLimit(t *testing.T) {
242279
re.True(limiter.LastUpdate.After(baseTime)) // Should update time
243280

244281
// Test setting service limit to zero
245-
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
282+
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
246283
baseTime = time.Now()
247284
limiter.LastUpdate = baseTime
248285
limiter.AvailableTokens = 50.0
@@ -253,7 +290,7 @@ func TestSetServiceLimit(t *testing.T) {
253290
re.True(limiter.LastUpdate.After(baseTime)) // Should update time
254291

255292
// Test setting service limit from zero to positive (should NOT initialize available tokens)
256-
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0)
293+
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0, nil)
257294
baseTime = time.Now()
258295
limiter.LastUpdate = baseTime
259296
limiter.AvailableTokens = 0.0
@@ -264,7 +301,7 @@ func TestSetServiceLimit(t *testing.T) {
264301
re.True(limiter.LastUpdate.After(baseTime)) // Should update time
265302

266303
// Test setting negative service limit (should be treated as zero)
267-
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
304+
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
268305
baseTime = time.Now()
269306
limiter.LastUpdate = baseTime
270307
limiter.AvailableTokens = 50.0
@@ -275,7 +312,7 @@ func TestSetServiceLimit(t *testing.T) {
275312
re.True(limiter.LastUpdate.After(baseTime)) // Should update time
276313

277314
// Test setting service limit with time elapsed (should trigger refill)
278-
limiter = newServiceLimiter(constant.NullKeyspaceID, 50.0)
315+
limiter = newServiceLimiter(constant.NullKeyspaceID, 50.0, nil)
279316
baseTime = time.Now()
280317
limiter.LastUpdate = baseTime
281318
limiter.AvailableTokens = 20.0
@@ -290,7 +327,7 @@ func TestSetServiceLimit(t *testing.T) {
290327
re.True(limiter.LastUpdate.After(baseTime)) // Should update time
291328

292329
// Test setting a smaller service limit
293-
limiter = newServiceLimiter(constant.NullKeyspaceID, 1000.0)
330+
limiter = newServiceLimiter(constant.NullKeyspaceID, 1000.0, nil)
294331
baseTime = time.Now()
295332
limiter.LastUpdate = baseTime
296333
limiter.AvailableTokens = 500.0
@@ -302,7 +339,7 @@ func TestSetServiceLimit(t *testing.T) {
302339
re.True(limiter.LastUpdate.After(baseTime)) // Should update time
303340

304341
// Test setting a larger service limit
305-
limiter = newServiceLimiter(constant.NullKeyspaceID, 10.0)
342+
limiter = newServiceLimiter(constant.NullKeyspaceID, 10.0, nil)
306343
baseTime = time.Now()
307344
limiter.LastUpdate = baseTime
308345
limiter.AvailableTokens = 5.0

0 commit comments

Comments
 (0)