Skip to content

Commit cfa263a

Browse files
mcs/scheduling: register the scheduler handler in API service mode (#7082)
ref #5839 To ensure that the HTTP API handler could be initialized properly for each scheduler in the API service mode, this PR updates the scheduler controller and PD server handler to support initialize the HTTP handler only. Signed-off-by: JmPotato <ghzpotato@gmail.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent b8d9c6e commit cfa263a

File tree

11 files changed

+263
-83
lines changed

11 files changed

+263
-83
lines changed

pkg/mcs/scheduling/server/config/config.go

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"os"
2020
"path/filepath"
2121
"strings"
22-
"sync"
2322
"sync/atomic"
2423
"time"
2524
"unsafe"
@@ -204,8 +203,6 @@ type PersistConfig struct {
204203
schedule atomic.Value
205204
replication atomic.Value
206205
storeConfig atomic.Value
207-
// Store the respective configurations for different schedulers.
208-
schedulerConfig sync.Map
209206
}
210207

211208
// NewPersistConfig creates a new PersistConfig instance.
@@ -275,24 +272,6 @@ func (o *PersistConfig) GetStoreConfig() *sc.StoreConfig {
275272
return o.storeConfig.Load().(*sc.StoreConfig)
276273
}
277274

278-
// SetSchedulerConfig sets the scheduler configuration with the given name.
279-
func (o *PersistConfig) SetSchedulerConfig(name, data string) {
280-
o.schedulerConfig.Store(name, data)
281-
}
282-
283-
// RemoveSchedulerConfig removes the scheduler configuration with the given name.
284-
func (o *PersistConfig) RemoveSchedulerConfig(name string) {
285-
o.schedulerConfig.Delete(name)
286-
}
287-
288-
// GetSchedulerConfig returns the scheduler configuration with the given name.
289-
func (o *PersistConfig) GetSchedulerConfig(name string) string {
290-
if v, ok := o.schedulerConfig.Load(name); ok {
291-
return v.(string)
292-
}
293-
return ""
294-
}
295-
296275
// GetMaxReplicas returns the max replicas.
297276
func (o *PersistConfig) GetMaxReplicas() int {
298277
return int(o.GetReplicationConfig().MaxReplicas)

pkg/mcs/scheduling/server/config/watcher.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/coreos/go-semver/semver"
2424
"github.com/pingcap/log"
2525
sc "github.com/tikv/pd/pkg/schedule/config"
26+
"github.com/tikv/pd/pkg/storage"
2627
"github.com/tikv/pd/pkg/storage/endpoint"
2728
"github.com/tikv/pd/pkg/utils/etcdutil"
2829
"go.etcd.io/etcd/clientv3"
@@ -50,6 +51,9 @@ type Watcher struct {
5051
schedulerConfigWatcher *etcdutil.LoopWatcher
5152

5253
*PersistConfig
54+
// Some data, like the scheduler configs, should be loaded into the storage
55+
// to make sure the coordinator could access them correctly.
56+
storage storage.Storage
5357
}
5458

5559
type persistedConfig struct {
@@ -65,6 +69,7 @@ func NewWatcher(
6569
etcdClient *clientv3.Client,
6670
clusterID uint64,
6771
persistConfig *PersistConfig,
72+
storage storage.Storage,
6873
) (*Watcher, error) {
6974
ctx, cancel := context.WithCancel(ctx)
7075
cw := &Watcher{
@@ -74,6 +79,7 @@ func NewWatcher(
7479
schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID),
7580
etcdClient: etcdClient,
7681
PersistConfig: persistConfig,
82+
storage: storage,
7783
}
7884
err := cw.initializeConfigWatcher()
7985
if err != nil {
@@ -120,15 +126,15 @@ func (cw *Watcher) initializeConfigWatcher() error {
120126
func (cw *Watcher) initializeSchedulerConfigWatcher() error {
121127
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
122128
putFn := func(kv *mvccpb.KeyValue) error {
123-
cw.SetSchedulerConfig(
129+
return cw.storage.SaveScheduleConfig(
124130
strings.TrimPrefix(string(kv.Key), prefixToTrim),
125-
string(kv.Value),
131+
kv.Value,
126132
)
127-
return nil
128133
}
129134
deleteFn := func(kv *mvccpb.KeyValue) error {
130-
cw.RemoveSchedulerConfig(strings.TrimPrefix(string(kv.Key), prefixToTrim))
131-
return nil
135+
return cw.storage.RemoveScheduleConfig(
136+
strings.TrimPrefix(string(kv.Key), prefixToTrim),
137+
)
132138
}
133139
postEventFn := func() error {
134140
return nil

pkg/mcs/scheduling/server/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -432,11 +432,11 @@ func (s *Server) startServer() (err error) {
432432

433433
func (s *Server) startCluster(context.Context) error {
434434
s.basicCluster = core.NewBasicCluster()
435+
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
435436
err := s.startWatcher()
436437
if err != nil {
437438
return err
438439
}
439-
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
440440
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
441441
s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
442442
if err != nil {
@@ -458,7 +458,7 @@ func (s *Server) startWatcher() (err error) {
458458
if err != nil {
459459
return err
460460
}
461-
s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig)
461+
s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.persistConfig, s.storage)
462462
if err != nil {
463463
return err
464464
}

pkg/schedule/coordinator.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
444444
if err = c.schedulers.AddScheduler(s); err != nil {
445445
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
446446
}
447+
} else if err = c.schedulers.AddSchedulerHandler(s); err != nil {
448+
log.Error("can not add scheduler handler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
447449
}
448450
}
449451

@@ -472,6 +474,8 @@ func (c *Coordinator) InitSchedulers(needRun bool) {
472474
scheduleCfg.Schedulers[k] = schedulerCfg
473475
k++
474476
}
477+
} else if err = c.schedulers.AddSchedulerHandler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
478+
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
475479
}
476480
}
477481

@@ -507,6 +511,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
507511
return
508512
}
509513
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
514+
// TODO: handle the plugin in API service mode.
510515
if err = c.schedulers.AddScheduler(s); err != nil {
511516
log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
512517
return

pkg/schedule/schedulers/scheduler.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,10 @@ func ConfigSliceDecoder(name string, args []string) ConfigDecoder {
9191
// CreateSchedulerFunc is for creating scheduler.
9292
type CreateSchedulerFunc func(opController *operator.Controller, storage endpoint.ConfigStorage, dec ConfigDecoder, removeSchedulerCb ...func(string) error) (Scheduler, error)
9393

94-
var schedulerMap = make(map[string]CreateSchedulerFunc)
95-
var schedulerArgsToDecoder = make(map[string]ConfigSliceDecoderBuilder)
94+
var (
95+
schedulerMap = make(map[string]CreateSchedulerFunc)
96+
schedulerArgsToDecoder = make(map[string]ConfigSliceDecoderBuilder)
97+
)
9698

9799
// RegisterScheduler binds a scheduler creator. It should be called in init()
98100
// func of a package.

pkg/schedule/schedulers/scheduler_controller.go

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,28 @@ var denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues
4040
// Controller is used to manage all schedulers.
4141
type Controller struct {
4242
sync.RWMutex
43-
wg sync.WaitGroup
44-
ctx context.Context
45-
cluster sche.SchedulerCluster
46-
storage endpoint.ConfigStorage
47-
schedulers map[string]*ScheduleController
48-
opController *operator.Controller
43+
wg sync.WaitGroup
44+
ctx context.Context
45+
cluster sche.SchedulerCluster
46+
storage endpoint.ConfigStorage
47+
// schedulers is used to manage all schedulers, which will only be initialized
48+
// and used in the PD leader service mode now.
49+
schedulers map[string]*ScheduleController
50+
// schedulerHandlers is used to manage the HTTP handlers of schedulers,
51+
// which will only be initialized and used in the API service mode now.
52+
schedulerHandlers map[string]http.Handler
53+
opController *operator.Controller
4954
}
5055

5156
// NewController creates a scheduler controller.
5257
func NewController(ctx context.Context, cluster sche.SchedulerCluster, storage endpoint.ConfigStorage, opController *operator.Controller) *Controller {
5358
return &Controller{
54-
ctx: ctx,
55-
cluster: cluster,
56-
storage: storage,
57-
schedulers: make(map[string]*ScheduleController),
58-
opController: opController,
59+
ctx: ctx,
60+
cluster: cluster,
61+
storage: storage,
62+
schedulers: make(map[string]*ScheduleController),
63+
schedulerHandlers: make(map[string]http.Handler),
64+
opController: opController,
5965
}
6066
}
6167

@@ -86,6 +92,9 @@ func (c *Controller) GetSchedulerNames() []string {
8692
func (c *Controller) GetSchedulerHandlers() map[string]http.Handler {
8793
c.RLock()
8894
defer c.RUnlock()
95+
if len(c.schedulerHandlers) > 0 {
96+
return c.schedulerHandlers
97+
}
8998
handlers := make(map[string]http.Handler, len(c.schedulers))
9099
for name, scheduler := range c.schedulers {
91100
handlers[name] = scheduler.Scheduler
@@ -117,6 +126,50 @@ func (c *Controller) ResetSchedulerMetrics() {
117126
schedulerStatusGauge.Reset()
118127
}
119128

129+
// AddSchedulerHandler adds the HTTP handler for a scheduler.
130+
func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) error {
131+
c.Lock()
132+
defer c.Unlock()
133+
134+
name := scheduler.GetName()
135+
if _, ok := c.schedulerHandlers[name]; ok {
136+
return errs.ErrSchedulerExisted.FastGenByArgs()
137+
}
138+
139+
c.schedulerHandlers[name] = scheduler
140+
c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args)
141+
return nil
142+
}
143+
144+
// RemoveSchedulerHandler removes the HTTP handler for a scheduler.
145+
func (c *Controller) RemoveSchedulerHandler(name string) error {
146+
c.Lock()
147+
defer c.Unlock()
148+
if c.cluster == nil {
149+
return errs.ErrNotBootstrapped.FastGenByArgs()
150+
}
151+
s, ok := c.schedulerHandlers[name]
152+
if !ok {
153+
return errs.ErrSchedulerNotFound.FastGenByArgs()
154+
}
155+
156+
conf := c.cluster.GetSchedulerConfig()
157+
conf.RemoveSchedulerCfg(s.(Scheduler).GetType())
158+
if err := conf.Persist(c.storage); err != nil {
159+
log.Error("the option can not persist scheduler config", errs.ZapError(err))
160+
return err
161+
}
162+
163+
if err := c.storage.RemoveScheduleConfig(name); err != nil {
164+
log.Error("can not remove the scheduler config", errs.ZapError(err))
165+
return err
166+
}
167+
168+
delete(c.schedulerHandlers, name)
169+
170+
return nil
171+
}
172+
120173
// AddScheduler adds a scheduler.
121174
func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
122175
c.Lock()
@@ -249,8 +302,9 @@ func (c *Controller) IsSchedulerExisted(name string) (bool, error) {
249302
if c.cluster == nil {
250303
return false, errs.ErrNotBootstrapped.FastGenByArgs()
251304
}
252-
_, ok := c.schedulers[name]
253-
if !ok {
305+
_, existScheduler := c.schedulers[name]
306+
_, existHandler := c.schedulerHandlers[name]
307+
if !existScheduler && !existHandler {
254308
return false, errs.ErrSchedulerNotFound.FastGenByArgs()
255309
}
256310
return true, nil

server/cluster/cluster.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,16 @@ func (c *RaftCluster) GetSchedulerHandlers() map[string]http.Handler {
795795
return c.coordinator.GetSchedulersController().GetSchedulerHandlers()
796796
}
797797

798+
// AddSchedulerHandler adds a scheduler handler.
799+
func (c *RaftCluster) AddSchedulerHandler(scheduler schedulers.Scheduler, args ...string) error {
800+
return c.coordinator.GetSchedulersController().AddSchedulerHandler(scheduler, args...)
801+
}
802+
803+
// RemoveSchedulerHandler removes a scheduler handler.
804+
func (c *RaftCluster) RemoveSchedulerHandler(name string) error {
805+
return c.coordinator.GetSchedulersController().RemoveSchedulerHandler(name)
806+
}
807+
798808
// AddScheduler adds a scheduler.
799809
func (c *RaftCluster) AddScheduler(scheduler schedulers.Scheduler, args ...string) error {
800810
return c.coordinator.GetSchedulersController().AddScheduler(scheduler, args...)

server/handler.go

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -236,19 +236,24 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
236236
return err
237237
}
238238
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
239-
if !h.s.IsAPIServiceMode() {
239+
if h.s.IsAPIServiceMode() {
240+
if err = c.AddSchedulerHandler(s, args...); err != nil {
241+
log.Error("can not add scheduler handler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
242+
return err
243+
}
244+
log.Info("add scheduler handler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args))
245+
} else {
240246
if err = c.AddScheduler(s, args...); err != nil {
241247
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
242248
return err
243249
}
244-
} else {
245-
c.GetSchedulerConfig().AddSchedulerCfg(s.GetType(), args)
250+
log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args))
246251
}
247252
if err = h.opt.Persist(c.GetStorage()); err != nil {
248253
log.Error("can not persist scheduler config", errs.ZapError(err))
249254
return err
250255
}
251-
log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args))
256+
log.Info("persist scheduler config successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args))
252257
return nil
253258
}
254259

@@ -258,24 +263,18 @@ func (h *Handler) RemoveScheduler(name string) error {
258263
if err != nil {
259264
return err
260265
}
261-
if !h.s.IsAPIServiceMode() {
266+
if h.s.IsAPIServiceMode() {
267+
if err = c.RemoveSchedulerHandler(name); err != nil {
268+
log.Error("can not remove scheduler handler", zap.String("scheduler-name", name), errs.ZapError(err))
269+
} else {
270+
log.Info("remove scheduler handler successfully", zap.String("scheduler-name", name))
271+
}
272+
} else {
262273
if err = c.RemoveScheduler(name); err != nil {
263274
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
264275
} else {
265276
log.Info("remove scheduler successfully", zap.String("scheduler-name", name))
266277
}
267-
} else {
268-
conf := c.GetSchedulerConfig()
269-
c.GetSchedulerConfig().RemoveSchedulerCfg(schedulers.FindSchedulerTypeByName(name))
270-
if err := conf.Persist(c.GetStorage()); err != nil {
271-
log.Error("the option can not persist scheduler config", errs.ZapError(err))
272-
return err
273-
}
274-
275-
if err := c.GetStorage().RemoveScheduleConfig(name); err != nil {
276-
log.Error("can not remove the scheduler config", errs.ZapError(err))
277-
return err
278-
}
279278
}
280279
return err
281280
}

0 commit comments

Comments
 (0)