Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/keyspace_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newKeyspaceResourceGroupManager(keyspaceID uint32, storage endpoint.Resourc
ruTrackers: make(map[string]*ruTracker),
keyspaceID: keyspaceID,
storage: storage,
sl: newServiceLimiter(keyspaceID, 0),
sl: newServiceLimiter(keyspaceID, 0, storage),
}
}

Expand Down Expand Up @@ -239,7 +239,7 @@ func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() {
}
}

func (krgm *keyspaceResourceGroupManager) setServiceLimiter(serviceLimit float64) {
func (krgm *keyspaceResourceGroupManager) setServiceLimit(serviceLimit float64) {
krgm.RLock()
sl := krgm.sl
krgm.RUnlock()
Expand Down
8 changes: 7 additions & 1 deletion pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
// SetKeyspaceServiceLimit sets the service limit of the keyspace.
func (m *Manager) SetKeyspaceServiceLimit(keyspaceID uint32, serviceLimit float64) {
// If the keyspace is not found, create a new keyspace resource group manager.
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true).setServiceLimiter(serviceLimit)
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true).setServiceLimit(serviceLimit)
}

func (m *Manager) getOrCreateKeyspaceResourceGroupManager(keyspaceID uint32, initDefault bool) *keyspaceResourceGroupManager {
Expand Down Expand Up @@ -224,6 +224,12 @@
}); err != nil {
return err
}
// Load service limits from the storage.
if err := m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit)
}); err != nil {
return err
}

Check warning on line 232 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L231-L232

Added lines #L231 - L232 were not covered by tests
// Initialize the reserved keyspace resource group manager and default resource groups.
m.initReserved()
return nil
Expand Down
44 changes: 44 additions & 0 deletions pkg/mcs/resourcemanager/server/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,3 +367,47 @@ func TestKeyspaceNameLookup(t *testing.T) {
re.NotNil(idValue)
re.Equal(uint32(2), idValue.Value)
}

func TestResourceGroupPersistence(t *testing.T) {
re := require.New(t)
m := prepareManager()

// Prepare the resource group and service limit.
group := &rmpb.ResourceGroup{
Name: "test_group",
Mode: rmpb.GroupMode_RUMode,
Priority: 5,
KeyspaceId: &rmpb.KeyspaceIDValue{Value: 1},
}
err := m.AddResourceGroup(group)
re.NoError(err)
keyspaceID := ExtractKeyspaceID(group.KeyspaceId)
m.SetKeyspaceServiceLimit(keyspaceID, 100.0)

// Use the same storage to rebuild a manager.
storage := m.storage
m = NewManager[*mockConfigProvider](&mockConfigProvider{})
m.storage = storage
// Initialize the manager.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = m.Init(ctx)
re.NoError(err)
// Check the resource group is loaded from the storage.
rg := m.GetResourceGroup(keyspaceID, group.Name, true)
re.NotNil(rg)
re.Equal(group.Name, rg.Name)
re.Equal(group.Mode, rg.Mode)
re.Equal(group.Priority, rg.Priority)
// Check the service limit is loaded from the storage.
limiter := m.GetKeyspaceServiceLimiter(keyspaceID)
re.NotNil(limiter)
re.Equal(100.0, limiter.ServiceLimit)
// Null keyspace ID should have a default zero service limit.
limiter = m.GetKeyspaceServiceLimiter(constant.NullKeyspaceID)
re.NotNil(limiter)
re.Equal(0.0, limiter.ServiceLimit)
// Non-existing keyspace should have a nil limiter.
limiter = m.GetKeyspaceServiceLimiter(2)
re.Nil(limiter)
}
17 changes: 15 additions & 2 deletions pkg/mcs/resourcemanager/server/service_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

"github.com/pingcap/log"

"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
)

Expand All @@ -30,7 +31,6 @@
// Since the client will request tokens with a 5-second period by default, the burst factor is 5.0 here.
const serviceLimiterBurstFactor = 5.0

// TODO: persist the service limit to the storage and reload it when the service starts.
type serviceLimiter struct {
syncutil.RWMutex
// ServiceLimit is the configured service limit for this limiter.
Expand All @@ -42,15 +42,18 @@
LastUpdate time.Time `json:"last_update"`
// KeyspaceID is the keyspace ID of the keyspace that this limiter belongs to.
keyspaceID uint32
// storage is used to persist the service limit.
storage endpoint.ResourceGroupStorage
}

func newServiceLimiter(keyspaceID uint32, serviceLimit float64) *serviceLimiter {
func newServiceLimiter(keyspaceID uint32, serviceLimit float64, storage endpoint.ResourceGroupStorage) *serviceLimiter {
// The service limit should be non-negative.
serviceLimit = math.Max(0, serviceLimit)
return &serviceLimiter{
ServiceLimit: serviceLimit,
LastUpdate: time.Now(),
keyspaceID: keyspaceID,
storage: storage,
}
}

Expand All @@ -76,6 +79,16 @@
// are not left unused, causing the new service limit to become invalid.
krl.refillTokensLocked(now)
}

// Persist the service limit to storage
if krl.storage != nil {
if err := krl.storage.SaveServiceLimit(krl.keyspaceID, newServiceLimit); err != nil {
log.Error("failed to persist service limit",
zap.Uint32("keyspace-id", krl.keyspaceID),
zap.Float64("service-limit", newServiceLimit),
zap.Error(err))
}

Check warning on line 90 in pkg/mcs/resourcemanager/server/service_limit.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/service_limit.go#L86-L90

Added lines #L86 - L90 were not covered by tests
}
}

func (krl *serviceLimiter) refillTokensLocked(now time.Time) {
Expand Down
85 changes: 61 additions & 24 deletions pkg/mcs/resourcemanager/server/service_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,72 @@ import (
"github.com/stretchr/testify/require"

"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage"
)

func TestNewServiceLimiter(t *testing.T) {
re := require.New(t)

// Test creating a service limiter with positive limit
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
re.NotNil(limiter)
re.Equal(100.0, limiter.ServiceLimit)
re.Equal(0.0, limiter.AvailableTokens)

// Test creating a service limiter with zero limit
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0, nil)
re.NotNil(limiter)
re.Equal(0.0, limiter.ServiceLimit)
re.Equal(0.0, limiter.AvailableTokens)

// Test creating a service limiter with negative limit
limiter = newServiceLimiter(constant.NullKeyspaceID, -10.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, -10.0, nil)
re.NotNil(limiter)
re.Equal(0.0, limiter.ServiceLimit)
re.Equal(0.0, limiter.AvailableTokens)
}

func TestServiceLimiterPersistence(t *testing.T) {
re := require.New(t)

// Create a storage backend for testing
storage := storage.NewStorageWithMemoryBackend()

// Test persisting service limit
limiter := newServiceLimiter(1, 0.0, storage)
limiter.setServiceLimit(100.5)

// Verify the service limit was persisted
loadedLimit, err := storage.LoadServiceLimit(1)
re.NoError(err)
re.Equal(100.5, loadedLimit)

// Test updating the service limit
limiter.setServiceLimit(200.5)
loadedLimit, err = storage.LoadServiceLimit(1)
re.NoError(err)
re.Equal(200.5, loadedLimit)

// Test loading non-existent service limit
loadedLimit, err = storage.LoadServiceLimit(999)
re.NoError(err) // No error should be returned for non-existent limit
re.Equal(0.0, loadedLimit) // Should return 0 for non-existent limit

// Test loading service limits from storage
for _, keyspaceID := range []uint32{1, 2, 3} {
storage.SaveServiceLimit(keyspaceID, float64(keyspaceID)*100.0)
}
err = storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
re.Equal(float64(keyspaceID)*100.0, serviceLimit)
})
re.NoError(err)
}

func TestRefillTokensLocked(t *testing.T) {
re := require.New(t)

// Test refill with positive service limit
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
baseTime := time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 50.0
Expand Down Expand Up @@ -101,29 +138,29 @@ func TestApplyServiceLimit(t *testing.T) {
re.Equal(50.0, tokens)

// Test with zero service limit (no limit)
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0, nil)
now := time.Now()
tokens = limiter.applyServiceLimit(now, 50.0)
re.Equal(50.0, tokens)

// Test request within available tokens (need to set available tokens first)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
limiter.AvailableTokens = 100.0 // Manually set available tokens
limiter.LastUpdate = now
tokens = limiter.applyServiceLimit(now, 50.0)
re.Equal(50.0, tokens)
re.Equal(50.0, limiter.AvailableTokens) // 100 - 50 = 50

// Test request exactly equal to available tokens
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
limiter.AvailableTokens = 100.0 // Manually set available tokens
limiter.LastUpdate = now
tokens = limiter.applyServiceLimit(now, 100.0)
re.Equal(100.0, tokens)
re.Equal(0.0, limiter.AvailableTokens)

// Test request exceeding available tokens
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
limiter.LastUpdate = now
limiter.AvailableTokens = 30.0
tokens = limiter.applyServiceLimit(now, 80.0)
Expand All @@ -135,7 +172,7 @@ func TestApplyServiceLimitWithRefill(t *testing.T) {
re := require.New(t)

// Test that refill happens before applying limit
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
baseTime := time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 20.0
Expand All @@ -148,7 +185,7 @@ func TestApplyServiceLimitWithRefill(t *testing.T) {
re.Equal(futureTime, limiter.LastUpdate)

// Test partial refill scenario
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 10.0

Expand All @@ -163,28 +200,28 @@ func TestServiceLimiterEdgeCases(t *testing.T) {
re := require.New(t)

// Test with very small service limit
limiter := newServiceLimiter(constant.NullKeyspaceID, 0.1)
limiter := newServiceLimiter(constant.NullKeyspaceID, 0.1, nil)
limiter.AvailableTokens = 0.1 // Manually set available tokens
limiter.LastUpdate = time.Now() // Set LastUpdate to current time to avoid refill
now := time.Now()
tokens := limiter.applyServiceLimit(now, 1.0)
re.InDelta(0.1, tokens, 0.001) // Use InDelta to handle floating point precision

// Test with very large service limit
limiter = newServiceLimiter(constant.NullKeyspaceID, 1000000.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 1000000.0, nil)
limiter.AvailableTokens = 1000000.0 // Manually set available tokens
tokens = limiter.applyServiceLimit(now, 500000.0)
re.Equal(500000.0, tokens)

// Test with zero requested tokens
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
limiter.AvailableTokens = 100.0 // Manually set available tokens
tokens = limiter.applyServiceLimit(now, 0.0)
re.Equal(0.0, tokens)
re.Equal(100.0, limiter.AvailableTokens) // Should remain unchanged

// Test with fractional tokens
limiter = newServiceLimiter(constant.NullKeyspaceID, 10.5)
limiter = newServiceLimiter(constant.NullKeyspaceID, 10.5, nil)
limiter.LastUpdate = now
limiter.AvailableTokens = 5.25
tokens = limiter.applyServiceLimit(now, 7.75)
Expand All @@ -195,7 +232,7 @@ func TestSetServiceLimit(t *testing.T) {
re := require.New(t)

// Test setting the same service limit (should be no-op)
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter := newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
originalTokens := limiter.AvailableTokens
originalUpdate := limiter.LastUpdate

Expand All @@ -205,7 +242,7 @@ func TestSetServiceLimit(t *testing.T) {
re.Equal(originalUpdate, limiter.LastUpdate) // Should remain unchanged

// Test increasing service limit
limiter = newServiceLimiter(constant.NullKeyspaceID, 50.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 50.0, nil)
baseTime := time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 30.0
Expand All @@ -216,7 +253,7 @@ func TestSetServiceLimit(t *testing.T) {
re.True(limiter.LastUpdate.After(baseTime)) // Should update time

// Test decreasing service limit with available tokens exceeding new limit
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
baseTime = time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 80.0
Expand All @@ -231,7 +268,7 @@ func TestSetServiceLimit(t *testing.T) {
re.True(limiter.LastUpdate.After(baseTime)) // Should update time

// Test decreasing service limit with available tokens below new limit
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
baseTime = time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 30.0
Expand All @@ -242,7 +279,7 @@ func TestSetServiceLimit(t *testing.T) {
re.True(limiter.LastUpdate.After(baseTime)) // Should update time

// Test setting service limit to zero
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
baseTime = time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 50.0
Expand All @@ -253,7 +290,7 @@ func TestSetServiceLimit(t *testing.T) {
re.True(limiter.LastUpdate.After(baseTime)) // Should update time

// Test setting service limit from zero to positive (should NOT initialize available tokens)
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 0.0, nil)
baseTime = time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 0.0
Expand All @@ -264,7 +301,7 @@ func TestSetServiceLimit(t *testing.T) {
re.True(limiter.LastUpdate.After(baseTime)) // Should update time

// Test setting negative service limit (should be treated as zero)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 100.0, nil)
baseTime = time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 50.0
Expand All @@ -275,7 +312,7 @@ func TestSetServiceLimit(t *testing.T) {
re.True(limiter.LastUpdate.After(baseTime)) // Should update time

// Test setting service limit with time elapsed (should trigger refill)
limiter = newServiceLimiter(constant.NullKeyspaceID, 50.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 50.0, nil)
baseTime = time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 20.0
Expand All @@ -290,7 +327,7 @@ func TestSetServiceLimit(t *testing.T) {
re.True(limiter.LastUpdate.After(baseTime)) // Should update time

// Test setting a smaller service limit
limiter = newServiceLimiter(constant.NullKeyspaceID, 1000.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 1000.0, nil)
baseTime = time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 500.0
Expand All @@ -302,7 +339,7 @@ func TestSetServiceLimit(t *testing.T) {
re.True(limiter.LastUpdate.After(baseTime)) // Should update time

// Test setting a larger service limit
limiter = newServiceLimiter(constant.NullKeyspaceID, 10.0)
limiter = newServiceLimiter(constant.NullKeyspaceID, 10.0, nil)
baseTime = time.Now()
limiter.LastUpdate = baseTime
limiter.AvailableTokens = 5.0
Expand Down
Loading