diff --git a/pkg/mcs/resourcemanager/server/apis/v1/api.go b/pkg/mcs/resourcemanager/server/apis/v1/api.go index 727bb53c959..db73324f819 100644 --- a/pkg/mcs/resourcemanager/server/apis/v1/api.go +++ b/pkg/mcs/resourcemanager/server/apis/v1/api.go @@ -375,7 +375,14 @@ func (s *Service) setKeyspaceServiceLimit(c *gin.Context) { return } keyspaceID := rmserver.ExtractKeyspaceID(keyspaceIDValue) - s.manager.SetKeyspaceServiceLimit(keyspaceID, req.ServiceLimit) + if err := s.manager.SetKeyspaceServiceLimit(keyspaceID, req.ServiceLimit); err != nil { + if rmserver.IsMetadataWriteDisabledError(err) { + c.String(http.StatusForbidden, err.Error()) + return + } + c.String(http.StatusInternalServerError, err.Error()) + return + } c.String(http.StatusOK, "Success!") } diff --git a/pkg/mcs/resourcemanager/server/keyspace_manager.go b/pkg/mcs/resourcemanager/server/keyspace_manager.go index fcd460bfcab..1dc0e2bd19a 100644 --- a/pkg/mcs/resourcemanager/server/keyspace_manager.go +++ b/pkg/mcs/resourcemanager/server/keyspace_manager.go @@ -71,15 +71,25 @@ type keyspaceResourceGroupManager struct { keyspaceID uint32 storage endpoint.ResourceGroupStorage + writeRole ResourceGroupWriteRole } -func newKeyspaceResourceGroupManager(keyspaceID uint32, storage endpoint.ResourceGroupStorage) *keyspaceResourceGroupManager { +func newKeyspaceResourceGroupManager( + keyspaceID uint32, + storage endpoint.ResourceGroupStorage, + writeRoles ...ResourceGroupWriteRole, +) *keyspaceResourceGroupManager { + writeRole := ResourceGroupWriteRoleLegacyAll + if len(writeRoles) > 0 { + writeRole = writeRoles[0] + } return &keyspaceResourceGroupManager{ groups: make(map[string]*ResourceGroup), groupRUTrackers: make(map[string]*groupRUTracker), keyspaceID: keyspaceID, storage: storage, - sl: newServiceLimiter(keyspaceID, 0, storage), + writeRole: writeRole, + sl: newServiceLimiter(keyspaceID, 0, storage, writeRole), } } @@ -147,11 +157,15 @@ func (krgm *keyspaceResourceGroupManager) addResourceGroup(grouppb *rmpb.Resourc group := FromProtoResourceGroup(grouppb) krgm.Lock() defer krgm.Unlock() - if err := group.persistSettings(krgm.keyspaceID, krgm.storage); err != nil { - return err + if krgm.writeRole.AllowsMetadataWrite() { + if err := group.persistSettings(krgm.keyspaceID, krgm.storage); err != nil { + return err + } } - if err := group.persistStates(krgm.keyspaceID, krgm.storage); err != nil { - return err + if krgm.writeRole.AllowsStateWrite() { + if err := group.persistStates(krgm.keyspaceID, krgm.storage); err != nil { + return err + } } krgm.groups[group.Name] = group return nil @@ -172,6 +186,9 @@ func (krgm *keyspaceResourceGroupManager) modifyResourceGroup(group *rmpb.Resour if err != nil { return err } + if !krgm.writeRole.AllowsMetadataWrite() { + return nil + } return curGroup.persistSettings(krgm.keyspaceID, krgm.storage) } @@ -185,8 +202,10 @@ func (krgm *keyspaceResourceGroupManager) deleteResourceGroup(name string) error if !ok { return errs.ErrResourceGroupNotExists.FastGenByArgs(name) } - if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil { - return err + if krgm.writeRole.AllowsMetadataWrite() { + if err := krgm.storage.DeleteResourceGroupSetting(krgm.keyspaceID, name); err != nil { + return err + } } krgm.Lock() delete(krgm.groups, name) @@ -236,6 +255,9 @@ func (krgm *keyspaceResourceGroupManager) getResourceGroupList(withStats, includ } func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() { + if !krgm.writeRole.AllowsStateWrite() { + return + } krgm.RLock() keys := make([]string, 0, len(krgm.groups)) for k := range krgm.groups { @@ -259,11 +281,23 @@ func (krgm *keyspaceResourceGroupManager) persistResourceGroupRunningState() { } func (krgm *keyspaceResourceGroupManager) setServiceLimit(serviceLimit float64) { + krgm.updateServiceLimit(serviceLimit, true) +} + +func (krgm *keyspaceResourceGroupManager) setServiceLimitFromStorage(serviceLimit float64) { + krgm.updateServiceLimit(serviceLimit, false) +} + +func (krgm *keyspaceResourceGroupManager) updateServiceLimit(serviceLimit float64, persist bool) { krgm.RLock() sl := krgm.sl krgm.RUnlock() // Set the new service limit to the limiter. - sl.setServiceLimit(serviceLimit) + if persist { + sl.setServiceLimit(serviceLimit) + } else { + sl.setServiceLimitNoPersist(serviceLimit) + } // Cleanup the overrides if the service limit is set to 0. if serviceLimit <= 0 { krgm.cleanupOverrides() diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 603f77ed67b..afb0026fd39 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -54,6 +54,7 @@ type Manager struct { cancel context.CancelFunc wg sync.WaitGroup srv bs.Server + writeRole ResourceGroupWriteRole controllerConfig *ControllerConfig krgms map[uint32]*keyspaceResourceGroupManager storage interface { @@ -82,11 +83,21 @@ type factoryProvider interface { GetMeteringWriter() *metering.Writer } +// writeRoleProvider is an optional provider used to configure the manager write role. +type writeRoleProvider interface { + GetResourceGroupWriteRole() ResourceGroupWriteRole +} + // NewManager returns a new manager base on the given server, // which should implement the `FactoryProvider` interface. func NewManager[T factoryProvider](srv bs.Server) *Manager { fp := srv.(T) + writeRole := ResourceGroupWriteRoleLegacyAll + if provider, ok := any(fp).(writeRoleProvider); ok { + writeRole = provider.GetResourceGroupWriteRole() + } m := &Manager{ + writeRole: writeRole, controllerConfig: fp.GetControllerConfig(), krgms: make(map[uint32]*keyspaceResourceGroupManager), consumptionDispatcher: make(chan *consumptionItem, defaultConsumptionChanSize), @@ -130,6 +141,11 @@ func (m *Manager) GetStorage() endpoint.ResourceGroupStorage { return m.storage } +// GetWriteRole returns the manager write role. +func (m *Manager) GetWriteRole() ResourceGroupWriteRole { + return m.writeRole +} + // GetKeyspaceServiceLimiter returns the service limit of the keyspace. func (m *Manager) GetKeyspaceServiceLimiter(keyspaceID uint32) *serviceLimiter { krgm := m.getKeyspaceResourceGroupManager(keyspaceID) @@ -140,16 +156,20 @@ func (m *Manager) GetKeyspaceServiceLimiter(keyspaceID uint32) *serviceLimiter { } // SetKeyspaceServiceLimit sets the service limit of the keyspace. -func (m *Manager) SetKeyspaceServiceLimit(keyspaceID uint32, serviceLimit float64) { +func (m *Manager) SetKeyspaceServiceLimit(keyspaceID uint32, serviceLimit float64) error { + if !m.writeRole.AllowsMetadataWrite() { + return errMetadataWriteDisabled + } // If the keyspace is not found, create a new keyspace resource group manager. m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, true).setServiceLimit(serviceLimit) + return nil } func (m *Manager) getOrCreateKeyspaceResourceGroupManager(keyspaceID uint32, initDefault bool) *keyspaceResourceGroupManager { m.Lock() krgm, ok := m.krgms[keyspaceID] if !ok { - krgm = newKeyspaceResourceGroupManager(keyspaceID, m.storage) + krgm = newKeyspaceResourceGroupManager(keyspaceID, m.storage, m.writeRole) m.krgms[keyspaceID] = krgm } m.Unlock() @@ -193,8 +213,10 @@ func (m *Manager) Init(ctx context.Context) error { } // re-save the config to make sure the config has been persisted. - if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { - return err + if m.writeRole.AllowsMetadataWrite() { + if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { + return err + } } // Load keyspace resource groups from the storage. @@ -205,13 +227,16 @@ func (m *Manager) Init(ctx context.Context) error { // This context is derived from the leader/primary context, it will be canceled // from the outside loop when the leader/primary step down. ctx, m.cancel = context.WithCancel(ctx) - m.wg.Add(2) + m.wg.Add(1) // Start the background metrics flusher. go m.backgroundMetricsFlush(ctx) - go func() { - defer logutil.LogPanic() - m.persistLoop(ctx) - }() + if m.writeRole.AllowsStateWrite() { + m.wg.Add(1) + go func() { + defer logutil.LogPanic() + m.persistLoop(ctx) + }() + } // TODO: Add a goroutine to loadKeyspaceResourceGroups periodically to avoid // the resource group exists gap between PD server and resource manager service // during redirection. @@ -255,7 +280,7 @@ func (m *Manager) loadKeyspaceResourceGroups() error { m.initReserved() // Load service limits from the storage after all resource groups are loaded. return m.storage.LoadServiceLimits(func(keyspaceID uint32, serviceLimit float64) { - m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimit(serviceLimit) + m.getOrCreateKeyspaceResourceGroupManager(keyspaceID, false).setServiceLimitFromStorage(serviceLimit) }) } @@ -270,6 +295,9 @@ func (m *Manager) initReserved() { // UpdateControllerConfigItem updates the controller config item. func (m *Manager) UpdateControllerConfigItem(key string, value any) error { + if !m.writeRole.AllowsMetadataWrite() { + return errMetadataWriteDisabled + } kp := strings.Split(key, ".") if len(kp) == 0 { return errors.Errorf("invalid key %s", key) @@ -313,6 +341,9 @@ func (m *Manager) GetControllerConfig() *ControllerConfig { // NOTE: AddResourceGroup should also be idempotent because tidb depends // on this retry mechanism. func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error { + if !m.writeRole.AllowsMetadataWrite() { + return errMetadataWriteDisabled + } keyspaceID := ExtractKeyspaceID(grouppb.GetKeyspaceId()) // If the keyspace is not initialized, it means this is the first resource group created for this keyspace, // so we need to initialize the default resource group for the keyspace as well. @@ -325,6 +356,9 @@ func (m *Manager) AddResourceGroup(grouppb *rmpb.ResourceGroup) error { // ModifyResourceGroup modifies an existing resource group. func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error { + if !m.writeRole.AllowsMetadataWrite() { + return errMetadataWriteDisabled + } keyspaceID := ExtractKeyspaceID(grouppb.GetKeyspaceId()) krgm, err := m.accessKeyspaceResourceGroupManager(keyspaceID, grouppb.Name) if err != nil { @@ -335,6 +369,9 @@ func (m *Manager) ModifyResourceGroup(grouppb *rmpb.ResourceGroup) error { // DeleteResourceGroup deletes a resource group. func (m *Manager) DeleteResourceGroup(keyspaceID uint32, name string) error { + if !m.writeRole.AllowsMetadataWrite() { + return errMetadataWriteDisabled + } // "default" group can't be deleted, so there is not need to call accessKeyspaceResourceGroupManager krgm := m.getKeyspaceResourceGroupManager(keyspaceID) if krgm == nil { diff --git a/pkg/mcs/resourcemanager/server/manager_test.go b/pkg/mcs/resourcemanager/server/manager_test.go index 67db476da8a..afd6c932d2c 100644 --- a/pkg/mcs/resourcemanager/server/manager_test.go +++ b/pkg/mcs/resourcemanager/server/manager_test.go @@ -44,10 +44,29 @@ func (*mockConfigProvider) GetControllerConfig() *ControllerConfig { return &Con func (*mockConfigProvider) GetMeteringWriter() *metering.Writer { return nil } +func (*mockConfigProvider) GetResourceGroupWriteRole() ResourceGroupWriteRole { + return ResourceGroupWriteRoleLegacyAll +} + func (*mockConfigProvider) AddStartCallback(...func()) {} func (*mockConfigProvider) AddServiceReadyCallback(...func(context.Context) error) {} +type mockRoleConfigProvider struct { + bs.Server + role ResourceGroupWriteRole +} + +func (*mockRoleConfigProvider) GetControllerConfig() *ControllerConfig { return &ControllerConfig{} } + +func (*mockRoleConfigProvider) GetMeteringWriter() *metering.Writer { return nil } + +func (m *mockRoleConfigProvider) GetResourceGroupWriteRole() ResourceGroupWriteRole { return m.role } + +func (*mockRoleConfigProvider) AddStartCallback(...func()) {} + +func (*mockRoleConfigProvider) AddServiceReadyCallback(...func(context.Context) error) {} + func prepareManager() *Manager { storage := storage.NewStorageWithMemoryBackend() m := NewManager[*mockConfigProvider](&mockConfigProvider{}) @@ -316,14 +335,14 @@ func TestKeyspaceServiceLimit(t *testing.T) { re.Equal(0.0, limiter.ServiceLimit) re.Equal(0.0, limiter.AvailableTokens) // Test set the service limit of the keyspace. - m.SetKeyspaceServiceLimit(1, 100.0) + re.NoError(m.SetKeyspaceServiceLimit(1, 100.0)) limiter = m.GetKeyspaceServiceLimiter(1) re.Equal(100.0, limiter.ServiceLimit) re.Equal(0.0, limiter.AvailableTokens) // When setting from 0 to positive, available tokens remain 0 // Test set the service limit of the non-existing keyspace. limiter = m.GetKeyspaceServiceLimiter(2) re.Nil(limiter) - m.SetKeyspaceServiceLimit(2, 100.0) + re.NoError(m.SetKeyspaceServiceLimit(2, 100.0)) limiter = m.GetKeyspaceServiceLimiter(2) re.Equal(100.0, limiter.ServiceLimit) re.Equal(0.0, limiter.AvailableTokens) @@ -393,7 +412,7 @@ func TestResourceGroupPersistence(t *testing.T) { err := m.AddResourceGroup(group) re.NoError(err) keyspaceID := ExtractKeyspaceID(group.KeyspaceId) - m.SetKeyspaceServiceLimit(keyspaceID, 100.0) + re.NoError(m.SetKeyspaceServiceLimit(keyspaceID, 100.0)) // Use the same storage to rebuild a manager. storage := m.storage @@ -423,3 +442,77 @@ func TestResourceGroupPersistence(t *testing.T) { limiter = m.GetKeyspaceServiceLimiter(2) re.Nil(limiter) } + +func TestManagerMetadataWriteRoleGates(t *testing.T) { + re := require.New(t) + m := NewManager[*mockRoleConfigProvider](&mockRoleConfigProvider{ + role: ResourceGroupWriteRoleRMTokenOnly, + }) + m.storage = storage.NewStorageWithMemoryBackend() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + re.NoError(m.Init(ctx)) + re.Equal(ResourceGroupWriteRoleRMTokenOnly, m.GetWriteRole()) + + group := &rmpb.ResourceGroup{ + Name: "test_group", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 100}}, + }, + } + re.ErrorIs(m.AddResourceGroup(group), errMetadataWriteDisabled) + re.ErrorIs(m.ModifyResourceGroup(group), errMetadataWriteDisabled) + re.ErrorIs(m.DeleteResourceGroup(constant.NullKeyspaceID, group.Name), errMetadataWriteDisabled) + re.ErrorIs(m.UpdateControllerConfigItem("request-unit.read-base-cost", 1.0), errMetadataWriteDisabled) + re.ErrorIs(m.SetKeyspaceServiceLimit(constant.NullKeyspaceID, 1.0), errMetadataWriteDisabled) +} + +func TestKeyspaceResourceGroupManagerWriteRoleGates(t *testing.T) { + re := require.New(t) + memStorage := storage.NewStorageWithMemoryBackend() + group := &rmpb.ResourceGroup{ + Name: "test_group", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 100}}, + }, + } + + tokenOnlyKRGM := newKeyspaceResourceGroupManager(1, memStorage, ResourceGroupWriteRoleRMTokenOnly) + re.NoError(tokenOnlyKRGM.addResourceGroup(group)) + + var tokenOnlySettingsCount int + re.NoError(memStorage.LoadResourceGroupSettings(func(keyspaceID uint32, name, _ string) { + if keyspaceID == tokenOnlyKRGM.keyspaceID && name == group.GetName() { + tokenOnlySettingsCount++ + } + })) + var tokenOnlyStatesCount int + re.NoError(memStorage.LoadResourceGroupStates(func(keyspaceID uint32, name, _ string) { + if keyspaceID == tokenOnlyKRGM.keyspaceID && name == group.GetName() { + tokenOnlyStatesCount++ + } + })) + re.Equal(0, tokenOnlySettingsCount) + re.Equal(1, tokenOnlyStatesCount) + + metaOnlyKRGM := newKeyspaceResourceGroupManager(2, memStorage, ResourceGroupWriteRolePDMetaOnly) + re.NoError(metaOnlyKRGM.addResourceGroup(group)) + metaOnlyKRGM.persistResourceGroupRunningState() + + var metaOnlySettingsCount int + re.NoError(memStorage.LoadResourceGroupSettings(func(keyspaceID uint32, name, _ string) { + if keyspaceID == metaOnlyKRGM.keyspaceID && name == group.GetName() { + metaOnlySettingsCount++ + } + })) + var metaOnlyStatesCount int + re.NoError(memStorage.LoadResourceGroupStates(func(keyspaceID uint32, name, _ string) { + if keyspaceID == metaOnlyKRGM.keyspaceID && name == group.GetName() { + metaOnlyStatesCount++ + } + })) + re.Equal(1, metaOnlySettingsCount) + re.Equal(0, metaOnlyStatesCount) +} diff --git a/pkg/mcs/resourcemanager/server/server.go b/pkg/mcs/resourcemanager/server/server.go index 60805ab2c94..d5d7ae1d320 100644 --- a/pkg/mcs/resourcemanager/server/server.go +++ b/pkg/mcs/resourcemanager/server/server.go @@ -302,6 +302,11 @@ func (s *Server) GetControllerConfig() *ControllerConfig { return &s.cfg.Controller } +// GetResourceGroupWriteRole returns the manager write role for the resource manager service. +func (*Server) GetResourceGroupWriteRole() ResourceGroupWriteRole { + return ResourceGroupWriteRoleLegacyAll +} + // IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. func (s *Server) IsServing() bool { return !s.IsClosed() && s.participant.IsServing() diff --git a/pkg/mcs/resourcemanager/server/service_limit.go b/pkg/mcs/resourcemanager/server/service_limit.go index f1d5294c9f0..0b981bc823f 100644 --- a/pkg/mcs/resourcemanager/server/service_limit.go +++ b/pkg/mcs/resourcemanager/server/service_limit.go @@ -44,9 +44,20 @@ type serviceLimiter struct { keyspaceID uint32 // storage is used to persist the service limit. storage endpoint.ResourceGroupStorage + // writeRole controls whether metadata writes can be persisted. + writeRole ResourceGroupWriteRole } -func newServiceLimiter(keyspaceID uint32, serviceLimit float64, storage endpoint.ResourceGroupStorage) *serviceLimiter { +func newServiceLimiter( + keyspaceID uint32, + serviceLimit float64, + storage endpoint.ResourceGroupStorage, + writeRoles ...ResourceGroupWriteRole, +) *serviceLimiter { + writeRole := ResourceGroupWriteRoleLegacyAll + if len(writeRoles) > 0 { + writeRole = writeRoles[0] + } // The service limit should be non-negative. serviceLimit = math.Max(0, serviceLimit) return &serviceLimiter{ @@ -54,10 +65,19 @@ func newServiceLimiter(keyspaceID uint32, serviceLimit float64, storage endpoint LastUpdate: time.Now(), keyspaceID: keyspaceID, storage: storage, + writeRole: writeRole, } } func (krl *serviceLimiter) setServiceLimit(newServiceLimit float64) { + krl.setServiceLimitInternal(newServiceLimit, true) +} + +func (krl *serviceLimiter) setServiceLimitNoPersist(newServiceLimit float64) { + krl.setServiceLimitInternal(newServiceLimit, false) +} + +func (krl *serviceLimiter) setServiceLimitInternal(newServiceLimit float64, persist bool) { // The service limit should be non-negative. newServiceLimit = math.Max(0, newServiceLimit) krl.Lock() @@ -80,8 +100,8 @@ func (krl *serviceLimiter) setServiceLimit(newServiceLimit float64) { krl.refillTokensLocked(now) } - // Persist the service limit to storage - if krl.storage != nil { + // Persist the service limit to storage. + if persist && krl.writeRole.AllowsMetadataWrite() && krl.storage != nil { if err := krl.storage.SaveServiceLimit(krl.keyspaceID, newServiceLimit); err != nil { log.Error("failed to persist service limit", zap.Uint32("keyspace-id", krl.keyspaceID), diff --git a/pkg/mcs/resourcemanager/server/write_role.go b/pkg/mcs/resourcemanager/server/write_role.go new file mode 100644 index 00000000000..3b84cfb0c0a --- /dev/null +++ b/pkg/mcs/resourcemanager/server/write_role.go @@ -0,0 +1,46 @@ +// Copyright 2026 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import "github.com/pingcap/errors" + +// ResourceGroupWriteRole controls which type of data is allowed to be persisted. +type ResourceGroupWriteRole uint8 + +const ( + // ResourceGroupWriteRoleLegacyAll keeps the legacy behavior and allows both metadata and state writes. + ResourceGroupWriteRoleLegacyAll ResourceGroupWriteRole = iota + // ResourceGroupWriteRolePDMetaOnly allows metadata writes only. + ResourceGroupWriteRolePDMetaOnly + // ResourceGroupWriteRoleRMTokenOnly allows state writes only. + ResourceGroupWriteRoleRMTokenOnly +) + +// AllowsMetadataWrite returns whether metadata writes are allowed in this role. +func (r ResourceGroupWriteRole) AllowsMetadataWrite() bool { + return r == ResourceGroupWriteRoleLegacyAll || r == ResourceGroupWriteRolePDMetaOnly +} + +// AllowsStateWrite returns whether running state writes are allowed in this role. +func (r ResourceGroupWriteRole) AllowsStateWrite() bool { + return r == ResourceGroupWriteRoleLegacyAll || r == ResourceGroupWriteRoleRMTokenOnly +} + +var errMetadataWriteDisabled = errors.New("metadata write is disabled") + +// IsMetadataWriteDisabledError reports whether err is a metadata-write-disabled error. +func IsMetadataWriteDisabledError(err error) bool { + return errors.ErrorEqual(err, errMetadataWriteDisabled) +}