Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/jsonutil"
Expand Down Expand Up @@ -136,10 +137,11 @@
m.Lock()
m.groups = make(map[string]*ResourceGroup)
m.Unlock()
handler := func(k, v string) {
handler := func(keyspaceID uint32, name string, rawValue string) {
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal([]byte(v), group); err != nil {
log.Error("failed to parse the resource group", zap.Error(err), zap.String("k", k), zap.String("v", v))
if err := proto.Unmarshal([]byte(rawValue), group); err != nil {
log.Error("failed to parse the resource group",
zap.Uint32("keyspace-id", keyspaceID), zap.String("name", name), zap.String("raw-value", rawValue), zap.Error(err))

Check warning on line 144 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L143-L144

Added lines #L143 - L144 were not covered by tests
panic(err)
}
m.groups[group.Name] = FromProtoResourceGroup(group)
Expand All @@ -148,13 +150,14 @@
return err
}
// Load resource group states from storage.
tokenHandler := func(k, v string) {
tokenHandler := func(keyspaceID uint32, name string, rawValue string) {
tokens := &GroupStates{}
if err := json.Unmarshal([]byte(v), tokens); err != nil {
log.Error("failed to parse the resource group state", zap.Error(err), zap.String("k", k), zap.String("v", v))
if err := json.Unmarshal([]byte(rawValue), tokens); err != nil {
log.Error("failed to parse the resource group state",
zap.Uint32("keyspace-id", keyspaceID), zap.String("name", name), zap.String("raw-value", rawValue), zap.Error(err))

Check warning on line 157 in pkg/mcs/resourcemanager/server/manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/resourcemanager/server/manager.go#L156-L157

Added lines #L156 - L157 were not covered by tests
panic(err)
}
if group, ok := m.groups[k]; ok {
if group, ok := m.groups[name]; ok {
group.SetStatesIntoResourceGroup(tokens)
}
}
Expand Down Expand Up @@ -282,7 +285,7 @@
if name == reservedDefaultGroupName {
return errs.ErrDeleteReservedGroup
}
if err := m.storage.DeleteResourceGroupSetting(name); err != nil {
if err := m.storage.DeleteResourceGroupSetting(constant.NullKeyspaceID, name); err != nil {
return err
}
m.Lock()
Expand Down Expand Up @@ -475,10 +478,7 @@
m.RUnlock()
// prevent many groups and hold the lock long time.
for _, group := range groups {
ru := group.getRUToken()
if ru < 0 {
ru = 0
}
ru := math.Max(group.getRUToken(), 0)
availableRUCounter.WithLabelValues(group.Name, group.Name).Set(ru)
resourceGroupConfigGauge.WithLabelValues(group.Name, priorityLabel).Set(group.getPriority())
resourceGroupConfigGauge.WithLabelValues(group.Name, ruPerSecLabel).Set(group.getFillRate())
Expand Down
5 changes: 3 additions & 2 deletions pkg/mcs/resourcemanager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"

"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/syncutil"
)
Expand Down Expand Up @@ -233,7 +234,7 @@ func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {
// TODO: persist the state of the group separately.
func (rg *ResourceGroup) persistSettings(storage endpoint.ResourceGroupStorage) error {
metaGroup := rg.IntoProtoResourceGroup()
return storage.SaveResourceGroupSetting(rg.Name, metaGroup)
return storage.SaveResourceGroupSetting(constant.NullKeyspaceID, rg.Name, metaGroup)
}

// GroupStates is the tokens set of a resource group.
Expand Down Expand Up @@ -301,5 +302,5 @@ func (rg *ResourceGroup) UpdateRUConsumption(c *rmpb.Consumption) {
// persistStates persists the resource group tokens.
func (rg *ResourceGroup) persistStates(storage endpoint.ResourceGroupStorage) error {
states := rg.GetGroupStates()
return storage.SaveResourceGroupStates(rg.Name, states)
return storage.SaveResourceGroupStates(constant.NullKeyspaceID, rg.Name, states)
}
68 changes: 50 additions & 18 deletions pkg/storage/endpoint/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,84 @@

import (
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"

"github.com/pingcap/log"

"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/utils/keypath"
)

// ResourceGroupStorage defines the storage operations on the resource group.
type ResourceGroupStorage interface {
LoadResourceGroupSettings(f func(k, v string)) error
SaveResourceGroupSetting(name string, msg proto.Message) error
DeleteResourceGroupSetting(name string) error
LoadResourceGroupStates(f func(k, v string)) error
SaveResourceGroupStates(name string, obj any) error
DeleteResourceGroupStates(name string) error
LoadResourceGroupSettings(f func(keyspaceID uint32, name, rawValue string)) error
SaveResourceGroupSetting(keyspaceID uint32, name string, msg proto.Message) error
DeleteResourceGroupSetting(keyspaceID uint32, name string) error
LoadResourceGroupStates(f func(keyspaceID uint32, name, rawValue string)) error
SaveResourceGroupStates(keyspaceID uint32, name string, obj any) error
DeleteResourceGroupStates(keyspaceID uint32, name string) error
SaveControllerConfig(config any) error
LoadControllerConfig() (string, error)
}

var _ ResourceGroupStorage = (*StorageEndpoint)(nil)

// SaveResourceGroupSetting stores a resource group to storage.
func (se *StorageEndpoint) SaveResourceGroupSetting(name string, msg proto.Message) error {
return se.saveProto(keypath.ResourceGroupSettingPath(name), msg)
func (se *StorageEndpoint) SaveResourceGroupSetting(keyspaceID uint32, name string, msg proto.Message) error {
return se.saveProto(keypath.KeyspaceResourceGroupSettingPath(keyspaceID, name), msg)
}

// DeleteResourceGroupSetting removes a resource group from storage.
func (se *StorageEndpoint) DeleteResourceGroupSetting(name string) error {
return se.Remove(keypath.ResourceGroupSettingPath(name))
func (se *StorageEndpoint) DeleteResourceGroupSetting(keyspaceID uint32, name string) error {
return se.Remove(keypath.KeyspaceResourceGroupSettingPath(keyspaceID, name))
}

// LoadResourceGroupSettings loads all resource groups from storage.
func (se *StorageEndpoint) LoadResourceGroupSettings(f func(k, v string)) error {
return se.loadRangeByPrefix(keypath.ResourceGroupSettingPrefix(), f)
func (se *StorageEndpoint) LoadResourceGroupSettings(f func(keyspaceID uint32, name string, rawValue string)) error {
if err := se.loadRangeByPrefix(keypath.ResourceGroupSettingPrefix(), func(key, value string) {
// Using the null keyspace ID for the resource group settings loaded from the legacy path.
f(constant.NullKeyspaceID, key, value)
}); err != nil {
return err
}

Check warning on line 58 in pkg/storage/endpoint/resource_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/resource_group.go#L57-L58

Added lines #L57 - L58 were not covered by tests
return se.loadRangeByPrefix(keypath.KeyspaceResourceGroupSettingPrefix(), func(key, value string) {
// Parse the key to get the keyspace ID and resource group name respectively.
keyspaceID, name, err := keypath.ParseKeyspaceResourceGroupPath(key)
if err != nil {
log.Error("failed to parse the keyspace ID and resource group name", zap.String("key", key), zap.Error(err))
return
}

Check warning on line 65 in pkg/storage/endpoint/resource_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/resource_group.go#L63-L65

Added lines #L63 - L65 were not covered by tests
f(keyspaceID, name, value)
})
}

// SaveResourceGroupStates stores a resource group to storage.
func (se *StorageEndpoint) SaveResourceGroupStates(name string, obj any) error {
return se.saveJSON(keypath.ResourceGroupStatePath(name), obj)
func (se *StorageEndpoint) SaveResourceGroupStates(keyspaceID uint32, name string, obj any) error {
return se.saveJSON(keypath.KeyspaceResourceGroupStatePath(keyspaceID, name), obj)
}

// DeleteResourceGroupStates removes a resource group from storage.
func (se *StorageEndpoint) DeleteResourceGroupStates(name string) error {
return se.Remove(keypath.ResourceGroupStatePath(name))
func (se *StorageEndpoint) DeleteResourceGroupStates(keyspaceID uint32, name string) error {
return se.Remove(keypath.KeyspaceResourceGroupStatePath(keyspaceID, name))
}

// LoadResourceGroupStates loads all resource groups from storage.
func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error {
return se.loadRangeByPrefix(keypath.ResourceGroupStatePrefix(), f)
func (se *StorageEndpoint) LoadResourceGroupStates(f func(keyspaceID uint32, name string, rawValue string)) error {
if err := se.loadRangeByPrefix(keypath.ResourceGroupStatePrefix(), func(key, value string) {
// Using the null keyspace ID for the resource group states loaded from the legacy path.
f(constant.NullKeyspaceID, key, value)
}); err != nil {
return err
}

Check warning on line 87 in pkg/storage/endpoint/resource_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/resource_group.go#L86-L87

Added lines #L86 - L87 were not covered by tests
return se.loadRangeByPrefix(keypath.KeyspaceResourceGroupStatePrefix(), func(key, value string) {
// Parse the key to get the keyspace ID and resource group name respectively.
keyspaceID, name, err := keypath.ParseKeyspaceResourceGroupPath(key)
if err != nil {
log.Error("failed to parse the keyspace ID and resource group name", zap.String("key", key), zap.Error(err))
return
}

Check warning on line 94 in pkg/storage/endpoint/resource_group.go

View check run for this annotation

Codecov / codecov/patch

pkg/storage/endpoint/resource_group.go#L92-L94

Added lines #L92 - L94 were not covered by tests
f(keyspaceID, name, value)
})
}

// SaveControllerConfig stores the resource controller config to storage.
Expand Down
101 changes: 101 additions & 0 deletions pkg/storage/storage_resource_group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2025 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
"math/rand"
"testing"

"github.com/stretchr/testify/require"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"

"github.com/tikv/pd/pkg/mcs/utils/constant"
)

func TestResourceGroupStorage(t *testing.T) {
re := require.New(t)
storage := NewStorageWithMemoryBackend()

keyspaceGroups := map[uint32][]string{
constant.NullKeyspaceID: generateRandomResourceGroupNames(),
constant.DefaultKeyspaceID: generateRandomResourceGroupNames(),
1: generateRandomResourceGroupNames(),
2: generateRandomResourceGroupNames(),
}
// Test legacy and keyspace resource group settings.
for keyspaceID, names := range keyspaceGroups {
for _, name := range names {
err := storage.SaveResourceGroupSetting(keyspaceID, name, &rmpb.ResourceGroup{Name: name})
re.NoError(err)
}
}
err := storage.LoadResourceGroupSettings(func(keyspaceID uint32, name, _ string) {
re.Contains(keyspaceGroups[keyspaceID], name)
})
re.NoError(err)
for keyspaceID, names := range keyspaceGroups {
for _, name := range names {
err := storage.DeleteResourceGroupSetting(keyspaceID, name)
re.NoError(err)
}
}
err = storage.LoadResourceGroupSettings(func(_ uint32, _, _ string) {
re.Fail("should not load any resource group setting")
})
re.NoError(err)

// Test legacy and keyspace resource group states.
for keyspaceID, names := range keyspaceGroups {
for _, name := range names {
err := storage.SaveResourceGroupStates(keyspaceID, name, nil)
re.NoError(err)
}
}
err = storage.LoadResourceGroupStates(func(keyspaceID uint32, name, _ string) {
re.Contains(keyspaceGroups[keyspaceID], name)
})
re.NoError(err)
for keyspaceID, names := range keyspaceGroups {
for _, name := range names {
err := storage.DeleteResourceGroupStates(keyspaceID, name)
re.NoError(err)
}
}
err = storage.LoadResourceGroupStates(func(_ uint32, _, _ string) {
re.Fail("should not load any resource group state")
})
re.NoError(err)
}

const (
n = 100
charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*()-_=+[]{}|;:,.<>?/"
)

// generateRandomResourceGroupNames generates n random resource group names with letters, numbers and symbols.
func generateRandomResourceGroupNames() []string {
names := make([]string, n)
for i := range n {
// Generate a random length between 1 and 100 characters.
length := 1 + rand.Intn(100)
name := make([]byte, length)
for j := range length {
name[j] = charset[rand.Intn(len(charset))]
}
names[i] = string(name)
}
return names
}
6 changes: 6 additions & 0 deletions pkg/utils/keypath/absolute_key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ const (
msTsoKespaceExpectedLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary/expected_primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary"

// resource group path
keyspaceResourceGroupSettingsPathPrefixFormat = "resource_group/keyspace/settings/" // "resource_group/keyspace/settings/"
keyspaceResourceGroupSettingsPathFormat = "resource_group/keyspace/settings/%d/%s" // "resource_group/keyspace/settings/{keyspace_id}/{group_name}"
keyspaceResourceGroupStatesPathPrefixFormat = "resource_group/keyspace/states/" // "resource_group/keyspace/states/"
keyspaceResourceGroupStatesPathFormat = "resource_group/keyspace/states/%d/%s" // "resource_group/keyspace/states/{keyspace_id}/{group_name}"
// legacy resource group path without introducing keyspace, to keep compatibility,
// resource groups loaded from the legacy path will be assigned to the default keyspace ID.
resourceGroupSettingsPathFormat = "resource_group/settings/%s" // "resource_group/settings/{group_name}"
resourceGroupStatesPathFormat = "resource_group/states/%s" // "resource_group/states/{group_name}"
controllerConfigPath = "resource_group/controller" // "resource_group/controller"
Expand Down
64 changes: 55 additions & 9 deletions pkg/utils/keypath/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,75 @@

package keypath

import "fmt"
import (
"fmt"
"strconv"
"strings"

"github.com/tikv/pd/pkg/mcs/utils/constant"
)

// ControllerConfigPath returns the path to save the controller config.
func ControllerConfigPath() string {
return controllerConfigPath
}

// ResourceGroupSettingPath returns the path to save the resource group settings.
func ResourceGroupSettingPath(groupName string) string {
// resourceGroupSettingPath returns the path to save the legacy resource group settings.
func resourceGroupSettingPath(groupName string) string {
return fmt.Sprintf(resourceGroupSettingsPathFormat, groupName)
}

// ResourceGroupStatePath returns the path to save the resource group states.
func ResourceGroupStatePath(groupName string) string {
// resourceGroupStatePath returns the path to save the legacy resource group states.
func resourceGroupStatePath(groupName string) string {
return fmt.Sprintf(resourceGroupStatesPathFormat, groupName)
}

// ResourceGroupSettingPrefix returns the prefix of the resource group settings.
// ResourceGroupSettingPrefix returns the prefix of the legacy resource group settings.
func ResourceGroupSettingPrefix() string {
return ResourceGroupSettingPath("")
return resourceGroupSettingPath("")
}

// ResourceGroupStatePrefix returns the prefix of the resource group states.
// ResourceGroupStatePrefix returns the prefix of the legacy resource group states.
func ResourceGroupStatePrefix() string {
return ResourceGroupStatePath("")
return resourceGroupStatePath("")
}

// KeyspaceResourceGroupSettingPath returns the path to save the keyspace resource group settings.
func KeyspaceResourceGroupSettingPath(keyspaceID uint32, groupName string) string {
if keyspaceID == constant.NullKeyspaceID {
return resourceGroupSettingPath(groupName)
}
return fmt.Sprintf(keyspaceResourceGroupSettingsPathFormat, keyspaceID, groupName)
}

// KeyspaceResourceGroupStatePath returns the path to save the keyspace resource group states.
func KeyspaceResourceGroupStatePath(keyspaceID uint32, groupName string) string {
if keyspaceID == constant.NullKeyspaceID {
return resourceGroupStatePath(groupName)
}
return fmt.Sprintf(keyspaceResourceGroupStatesPathFormat, keyspaceID, groupName)
}

// KeyspaceResourceGroupSettingPrefix returns the prefix of the keyspace resource group settings.
func KeyspaceResourceGroupSettingPrefix() string {
return keyspaceResourceGroupSettingsPathPrefixFormat
}

// KeyspaceResourceGroupStatePrefix returns the prefix of the keyspace resource group states.
func KeyspaceResourceGroupStatePrefix() string {
return keyspaceResourceGroupStatesPathPrefixFormat
}

// ParseKeyspaceResourceGroupPath parses the keyspace ID and resource group name from the keyspace resource group path.
func ParseKeyspaceResourceGroupPath(path string) (uint32, string, error) {
parts := strings.Split(path, "/")
if len(parts) < 2 {
return 0, "", fmt.Errorf("invalid keyspace resource group setting path: %s", path)
}
keyspaceIDStr := parts[0]
keyspaceID, err := strconv.ParseUint(keyspaceIDStr, 10, 32)
if err != nil {
return 0, "", fmt.Errorf("invalid keyspace ID str: %s", keyspaceIDStr)
}
return uint32(keyspaceID), strings.TrimPrefix(path, keyspaceIDStr+"/"), nil
}
Loading