Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,10 @@ func (s *Service) setKeyspaceServiceLimit(c *gin.Context) {
return
}
keyspaceID := rmserver.ExtractKeyspaceID(keyspaceIDValue)
s.manager.SetKeyspaceServiceLimit(keyspaceID, req.ServiceLimit)
if err := s.manager.SetKeyspaceServiceLimit(keyspaceID, req.ServiceLimit); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
c.String(http.StatusOK, "Success!")
}

Expand Down
52 changes: 43 additions & 9 deletions pkg/mcs/resourcemanager/server/keyspace_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,25 @@ type keyspaceResourceGroupManager struct {

keyspaceID uint32
storage endpoint.ResourceGroupStorage
writeRole ResourceGroupWriteRole
}

func newKeyspaceResourceGroupManager(keyspaceID uint32, storage endpoint.ResourceGroupStorage) *keyspaceResourceGroupManager {
func newKeyspaceResourceGroupManager(
keyspaceID uint32,
storage endpoint.ResourceGroupStorage,
writeRoles ...ResourceGroupWriteRole,
) *keyspaceResourceGroupManager {
writeRole := ResourceGroupWriteRoleLegacyAll
if len(writeRoles) > 0 {
writeRole = writeRoles[0]
}
return &keyspaceResourceGroupManager{
groups: make(map[string]*ResourceGroup),
groupRUTrackers: make(map[string]*groupRUTracker),
keyspaceID: keyspaceID,
storage: storage,
sl: newServiceLimiter(keyspaceID, 0, storage),
writeRole: writeRole,
sl: newServiceLimiter(keyspaceID, 0, storage, writeRole),
}
}

Expand Down Expand Up @@ -147,11 +157,15 @@ func (krgm *keyspaceResourceGroupManager) addResourceGroup(grouppb *rmpb.Resourc
group := FromProtoResourceGroup(grouppb)
krgm.Lock()
defer krgm.Unlock()
if err := group.persistSettings(krgm.keyspaceID, krgm.storage); err != nil {
return err
if krgm.writeRole.AllowsMetadataWrite() {
if err := group.persistSettings(krgm.keyspaceID, krgm.storage); err != nil {
return err
}
}
if err := group.persistStates(krgm.keyspaceID, krgm.storage); err != nil {
return err
if krgm.writeRole.AllowsStateWrite() {
if err := group.persistStates(krgm.keyspaceID, krgm.storage); err != nil {
return err
}
}
krgm.groups[group.Name] = group
return nil
Expand All @@ -172,6 +186,9 @@ func (krgm *keyspaceResourceGroupManager) modifyResourceGroup(group *rmpb.Resour
if err != nil {
return err
}
if !krgm.writeRole.AllowsMetadataWrite() {
return nil
}
return curGroup.persistSettings(krgm.keyspaceID, krgm.storage)
}

Expand All @@ -185,8 +202,10 @@ func (krgm *keyspaceResourceGroupManager) deleteResourceGroup(name string) error
if !ok {
return errs.ErrResourceGroupNotExists.FastGenByArgs(name)
}
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
return err
if krgm.writeRole.AllowsMetadataWrite() {
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
return err
}
}
krgm.Lock()
delete(krgm.groups, name)
Expand Down Expand Up @@ -236,6 +255,9 @@ func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includ
}

func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() {
if !krgm.writeRole.AllowsStateWrite() {
return
}
krgm.RLock()
keys := make([]string, 0, len(krgm.groups))
for k := range krgm.groups {
Expand All @@ -259,11 +281,23 @@ func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() {
}

func (krgm *keyspaceResourceGroupManager) setServiceLimit(serviceLimit float64) {
krgm.updateServiceLimit(serviceLimit, true)
}

func (krgm *keyspaceResourceGroupManager) setServiceLimitFromStorage(serviceLimit float64) {
krgm.updateServiceLimit(serviceLimit, false)
}

func (krgm *keyspaceResourceGroupManager) updateServiceLimit(serviceLimit float64, persist bool) {
krgm.RLock()
sl := krgm.sl
krgm.RUnlock()
// Set the new service limit to the limiter.
sl.setServiceLimit(serviceLimit)
if persist {
sl.setServiceLimit(serviceLimit)
} else {
sl.setServiceLimitNoPersist(serviceLimit)
}
// Cleanup the overrides if the service limit is set to 0.
if serviceLimit <= 0 {
krgm.cleanupOverrides()
Expand Down
57 changes: 47 additions & 10 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Manager struct {
cancel context.CancelFunc
wg sync.WaitGroup
srv bs.Server
writeRole ResourceGroupWriteRole
controllerConfig *ControllerConfig
krgms map[uint32]*keyspaceResourceGroupManager
storage interface {
Expand Down Expand Up @@ -82,11 +83,21 @@ type factoryProvider interface {
GetMeteringWriter() *metering.Writer
}

// writeRoleProvider is an optional provider used to configure the manager write role.
type writeRoleProvider interface {
GetResourceGroupWriteRole() ResourceGroupWriteRole
}

// NewManager returns a new manager base on the given server,
// which should implement the `FactoryProvider` interface.
func NewManager[T factoryProvider](srv bs.Server) *Manager {
fp := srv.(T)
writeRole := ResourceGroupWriteRoleLegacyAll
if provider, ok := any(fp).(writeRoleProvider); ok {
writeRole = provider.GetResourceGroupWriteRole()
}
m := &Manager{
writeRole: writeRole,
controllerConfig: fp.GetControllerConfig(),
krgms: make(map[uint32]*keyspaceResourceGroupManager),
consumptionDispatcher: make(chan *consumptionItem, defaultConsumptionChanSize),
Expand Down Expand Up @@ -130,6 +141,11 @@ func (m *Manager) GetStorage() endpoint.ResourceGroupStorage {
return m.storage
}

// GetWriteRole returns the manager write role.
func (m *Manager) GetWriteRole() ResourceGroupWriteRole {
return m.writeRole
}

// GetKeyspaceServiceLimiter returns the service limit of the keyspace.
func (m *Manager) GetKeyspaceServiceLimiter(keyspaceID uint32) *serviceLimiter {
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
Expand All @@ -140,16 +156,20 @@ func (m *Manager) GetKeyspaceServiceLimiter(keyspaceID uint32) *serviceLimiter {
}

// SetKeyspaceServiceLimit sets the service limit of the keyspace.
func (m *Manager) SetKeyspaceServiceLimit(keyspaceID uint32, serviceLimit float64) {
func (m *Manager) SetKeyspaceServiceLimit(keyspaceID uint32, serviceLimit float64) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
// If the keyspace is not found, create a new keyspace resource group manager.
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true).setServiceLimit(serviceLimit)
return nil
}

func (m *Manager) getOrCreateKeyspaceResourceGroupManager(keyspaceID uint32, initDefault bool) *keyspaceResourceGroupManager {
m.Lock()
krgm, ok := m.krgms[keyspaceID]
if !ok {
krgm = newKeyspaceResourceGroupManager(keyspaceID, m.storage)
krgm = newKeyspaceResourceGroupManager(keyspaceID, m.storage, m.writeRole)
m.krgms[keyspaceID] = krgm
}
m.Unlock()
Expand Down Expand Up @@ -193,8 +213,10 @@ func (m *Manager) Init(ctx context.Context) error {
}

// re-save the config to make sure the config has been persisted.
if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil {
return err
if m.writeRole.AllowsMetadataWrite() {
if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil {
return err
}
}

// Load keyspace resource groups from the storage.
Expand All @@ -205,13 +227,16 @@ func (m *Manager) Init(ctx context.Context) error {
// This context is derived from the leader/primary context, it will be canceled
// from the outside loop when the leader/primary step down.
ctx, m.cancel = context.WithCancel(ctx)
m.wg.Add(2)
m.wg.Add(1)
// Start the background metrics flusher.
go m.backgroundMetricsFlush(ctx)
go func() {
defer logutil.LogPanic()
m.persistLoop(ctx)
}()
if m.writeRole.AllowsStateWrite() {
m.wg.Add(1)
go func() {
defer logutil.LogPanic()
m.persistLoop(ctx)
}()
}
// TODO: Add a goroutine to loadKeyspaceResourceGroups periodically to avoid
// the resource group exists gap between PD server and resource manager service
// during redirection.
Expand Down Expand Up @@ -255,7 +280,7 @@ func (m *Manager) loadKeyspaceResourceGroups() error {
m.initReserved()
// Load service limits from the storage after all resource groups are loaded.
return m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit)
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimitFromStorage(serviceLimit)
})
}

Expand All @@ -270,6 +295,9 @@ func (m *Manager) initReserved() {

// UpdateControllerConfigItem updates the controller config item.
func (m *Manager) UpdateControllerConfigItem(key string, value any) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
kp := strings.Split(key, ".")
if len(kp) == 0 {
return errors.Errorf("invalid key %s", key)
Expand Down Expand Up @@ -313,6 +341,9 @@ func (m *Manager) GetControllerConfig() *ControllerConfig {
// NOTE: AddResourceGroup should also be idempotent because tidb depends
// on this retry mechanism.
func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
keyspaceID := ExtractKeyspaceID(grouppb.GetKeyspaceId())
// If the keyspace is not initialized, it means this is the first resource group created for this keyspace,
// so we need to initialize the default resource group for the keyspace as well.
Expand All @@ -325,6 +356,9 @@ func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error {

// ModifyResourceGroup modifies an existing resource group.
func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
keyspaceID := ExtractKeyspaceID(grouppb.GetKeyspaceId())
krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, grouppb.Name)
if err != nil {
Expand All @@ -335,6 +369,9 @@ func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error {

// DeleteResourceGroup deletes a resource group.
func (m *Manager) DeleteResourceGroup(keyspaceID uint32, name string) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
// "default" group can't be deleted, so there is not need to call accessKeyspaceResourceGroupManager
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
if krgm == nil {
Expand Down
Loading