Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/mcs/resourcemanager/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,14 @@ func (s *Service) setKeyspaceServiceLimit(c *gin.Context) {
return
}
keyspaceID := rmserver.ExtractKeyspaceID(keyspaceIDValue)
s.manager.SetKeyspaceServiceLimit(keyspaceID, req.ServiceLimit)
if err := s.manager.SetKeyspaceServiceLimit(keyspaceID, req.ServiceLimit); err != nil {
if rmserver.IsMetadataWriteDisabledError(err) {
c.String(http.StatusForbidden, err.Error())
return
}
c.String(http.StatusInternalServerError, err.Error())
return
}
c.String(http.StatusOK, "Success!")
}

Expand Down
52 changes: 43 additions & 9 deletions pkg/mcs/resourcemanager/server/keyspace_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,25 @@ type keyspaceResourceGroupManager struct {

keyspaceID uint32
storage endpoint.ResourceGroupStorage
writeRole ResourceGroupWriteRole
}

func newKeyspaceResourceGroupManager(keyspaceID uint32, storage endpoint.ResourceGroupStorage) *keyspaceResourceGroupManager {
func newKeyspaceResourceGroupManager(
keyspaceID uint32,
storage endpoint.ResourceGroupStorage,
writeRoles ...ResourceGroupWriteRole,
) *keyspaceResourceGroupManager {
writeRole := ResourceGroupWriteRoleLegacyAll
if len(writeRoles) > 0 {
writeRole = writeRoles[0]
}
return &keyspaceResourceGroupManager{
groups: make(map[string]*ResourceGroup),
groupRUTrackers: make(map[string]*groupRUTracker),
keyspaceID: keyspaceID,
storage: storage,
sl: newServiceLimiter(keyspaceID, 0, storage),
writeRole: writeRole,
sl: newServiceLimiter(keyspaceID, 0, storage, writeRole),
}
}

Expand Down Expand Up @@ -147,11 +157,15 @@ func (krgm *keyspaceResourceGroupManager) addResourceGroup(grouppb *rmpb.Resourc
group := FromProtoResourceGroup(grouppb)
krgm.Lock()
defer krgm.Unlock()
if err := group.persistSettings(krgm.keyspaceID, krgm.storage); err != nil {
return err
if krgm.writeRole.AllowsMetadataWrite() {
if err := group.persistSettings(krgm.keyspaceID, krgm.storage); err != nil {
return err
}
}
if err := group.persistStates(krgm.keyspaceID, krgm.storage); err != nil {
return err
if krgm.writeRole.AllowsStateWrite() {
if err := group.persistStates(krgm.keyspaceID, krgm.storage); err != nil {
return err
}
}
krgm.groups[group.Name] = group
return nil
Expand All @@ -172,6 +186,9 @@ func (krgm *keyspaceResourceGroupManager) modifyResourceGroup(group *rmpb.Resour
if err != nil {
return err
}
if !krgm.writeRole.AllowsMetadataWrite() {
return nil
}
return curGroup.persistSettings(krgm.keyspaceID, krgm.storage)
}

Expand All @@ -185,8 +202,10 @@ func (krgm *keyspaceResourceGroupManager) deleteResourceGroup(name string) error
if !ok {
return errs.ErrResourceGroupNotExists.FastGenByArgs(name)
}
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
return err
if krgm.writeRole.AllowsMetadataWrite() {
if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil {
return err
}
}
krgm.Lock()
delete(krgm.groups, name)
Expand Down Expand Up @@ -236,6 +255,9 @@ func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includ
}

func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() {
if !krgm.writeRole.AllowsStateWrite() {
return
}
krgm.RLock()
keys := make([]string, 0, len(krgm.groups))
for k := range krgm.groups {
Expand All @@ -259,11 +281,23 @@ func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() {
}

func (krgm *keyspaceResourceGroupManager) setServiceLimit(serviceLimit float64) {
krgm.updateServiceLimit(serviceLimit, true)
}

func (krgm *keyspaceResourceGroupManager) setServiceLimitFromStorage(serviceLimit float64) {
krgm.updateServiceLimit(serviceLimit, false)
}

func (krgm *keyspaceResourceGroupManager) updateServiceLimit(serviceLimit float64, persist bool) {
krgm.RLock()
sl := krgm.sl
krgm.RUnlock()
// Set the new service limit to the limiter.
sl.setServiceLimit(serviceLimit)
if persist {
sl.setServiceLimit(serviceLimit)
} else {
sl.setServiceLimitNoPersist(serviceLimit)
}
// Cleanup the overrides if the service limit is set to 0.
if serviceLimit <= 0 {
krgm.cleanupOverrides()
Expand Down
57 changes: 47 additions & 10 deletions pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Manager struct {
cancel context.CancelFunc
wg sync.WaitGroup
srv bs.Server
writeRole ResourceGroupWriteRole
controllerConfig *ControllerConfig
krgms map[uint32]*keyspaceResourceGroupManager
storage interface {
Expand Down Expand Up @@ -82,11 +83,21 @@ type factoryProvider interface {
GetMeteringWriter() *metering.Writer
}

// writeRoleProvider is an optional provider used to configure the manager write role.
type writeRoleProvider interface {
GetResourceGroupWriteRole() ResourceGroupWriteRole
}

// NewManager returns a new manager base on the given server,
// which should implement the `FactoryProvider` interface.
func NewManager[T factoryProvider](srv bs.Server) *Manager {
fp := srv.(T)
writeRole := ResourceGroupWriteRoleLegacyAll
if provider, ok := any(fp).(writeRoleProvider); ok {
writeRole = provider.GetResourceGroupWriteRole()
}
m := &Manager{
writeRole: writeRole,
controllerConfig: fp.GetControllerConfig(),
krgms: make(map[uint32]*keyspaceResourceGroupManager),
consumptionDispatcher: make(chan *consumptionItem, defaultConsumptionChanSize),
Expand Down Expand Up @@ -130,6 +141,11 @@ func (m *Manager) GetStorage() endpoint.ResourceGroupStorage {
return m.storage
}

// GetWriteRole returns the manager write role.
func (m *Manager) GetWriteRole() ResourceGroupWriteRole {
return m.writeRole
}

// GetKeyspaceServiceLimiter returns the service limit of the keyspace.
func (m *Manager) GetKeyspaceServiceLimiter(keyspaceID uint32) *serviceLimiter {
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
Expand All @@ -140,16 +156,20 @@ func (m *Manager) GetKeyspaceServiceLimiter(keyspaceID uint32) *serviceLimiter {
}

// SetKeyspaceServiceLimit sets the service limit of the keyspace.
func (m *Manager) SetKeyspaceServiceLimit(keyspaceID uint32, serviceLimit float64) {
func (m *Manager) SetKeyspaceServiceLimit(keyspaceID uint32, serviceLimit float64) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
// If the keyspace is not found, create a new keyspace resource group manager.
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true).setServiceLimit(serviceLimit)
return nil
}

func (m *Manager) getOrCreateKeyspaceResourceGroupManager(keyspaceID uint32, initDefault bool) *keyspaceResourceGroupManager {
m.Lock()
krgm, ok := m.krgms[keyspaceID]
if !ok {
krgm = newKeyspaceResourceGroupManager(keyspaceID, m.storage)
krgm = newKeyspaceResourceGroupManager(keyspaceID, m.storage, m.writeRole)
m.krgms[keyspaceID] = krgm
}
m.Unlock()
Expand Down Expand Up @@ -193,8 +213,10 @@ func (m *Manager) Init(ctx context.Context) error {
}

// re-save the config to make sure the config has been persisted.
if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil {
return err
if m.writeRole.AllowsMetadataWrite() {
if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil {
return err
}
}

// Load keyspace resource groups from the storage.
Expand All @@ -205,13 +227,16 @@ func (m *Manager) Init(ctx context.Context) error {
// This context is derived from the leader/primary context, it will be canceled
// from the outside loop when the leader/primary step down.
ctx, m.cancel = context.WithCancel(ctx)
m.wg.Add(2)
m.wg.Add(1)
// Start the background metrics flusher.
go m.backgroundMetricsFlush(ctx)
go func() {
defer logutil.LogPanic()
m.persistLoop(ctx)
}()
if m.writeRole.AllowsStateWrite() {
m.wg.Add(1)
go func() {
defer logutil.LogPanic()
m.persistLoop(ctx)
}()
}
// TODO: Add a goroutine to loadKeyspaceResourceGroups periodically to avoid
// the resource group exists gap between PD server and resource manager service
// during redirection.
Expand Down Expand Up @@ -255,7 +280,7 @@ func (m *Manager) loadKeyspaceResourceGroups() error {
m.initReserved()
// Load service limits from the storage after all resource groups are loaded.
return m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) {
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit)
m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimitFromStorage(serviceLimit)
})
}

Expand All @@ -270,6 +295,9 @@ func (m *Manager) initReserved() {

// UpdateControllerConfigItem updates the controller config item.
func (m *Manager) UpdateControllerConfigItem(key string, value any) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
kp := strings.Split(key, ".")
if len(kp) == 0 {
return errors.Errorf("invalid key %s", key)
Expand Down Expand Up @@ -313,6 +341,9 @@ func (m *Manager) GetControllerConfig() *ControllerConfig {
// NOTE: AddResourceGroup should also be idempotent because tidb depends
// on this retry mechanism.
func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
keyspaceID := ExtractKeyspaceID(grouppb.GetKeyspaceId())
// If the keyspace is not initialized, it means this is the first resource group created for this keyspace,
// so we need to initialize the default resource group for the keyspace as well.
Expand All @@ -325,6 +356,9 @@ func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error {

// ModifyResourceGroup modifies an existing resource group.
func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
keyspaceID := ExtractKeyspaceID(grouppb.GetKeyspaceId())
krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, grouppb.Name)
if err != nil {
Expand All @@ -335,6 +369,9 @@ func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error {

// DeleteResourceGroup deletes a resource group.
func (m *Manager) DeleteResourceGroup(keyspaceID uint32, name string) error {
if !m.writeRole.AllowsMetadataWrite() {
return errMetadataWriteDisabled
}
// "default" group can't be deleted, so there is not need to call accessKeyspaceResourceGroupManager
krgm := m.getKeyspaceResourceGroupManager(keyspaceID)
if krgm == nil {
Expand Down
Loading