Skip to content

Commit 506cae1

Browse files
scheduler: gc balance-range jobs (#9300)
close #9301 Signed-off-by: 童剑 <1045931706@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
1 parent e64b4c7 commit 506cae1

File tree

2 files changed

+209
-80
lines changed

2 files changed

+209
-80
lines changed

pkg/schedule/schedulers/balance_range.go

Lines changed: 122 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/unrolled/render"
3030
"go.uber.org/zap"
3131

32+
"github.com/pingcap/errors"
3233
"github.com/pingcap/kvproto/pkg/metapb"
3334
"github.com/pingcap/log"
3435

@@ -46,7 +47,10 @@ import (
4647
"github.com/tikv/pd/pkg/utils/syncutil"
4748
)
4849

49-
var defaultJobTimeout = time.Hour
50+
var (
51+
defaultJobTimeout = time.Hour
52+
reserveDuration = 7 * 24 * time.Hour
53+
)
5054

5155
type balanceRangeSchedulerHandler struct {
5256
rd *render.Render
@@ -71,7 +75,9 @@ func (handler *balanceRangeSchedulerHandler) updateConfig(w http.ResponseWriter,
7175
}
7276

7377
func (handler *balanceRangeSchedulerHandler) listConfig(w http.ResponseWriter, _ *http.Request) {
74-
conf := handler.config.clone()
78+
handler.config.Lock()
79+
defer handler.config.Unlock()
80+
conf := handler.config.cloneLocked()
7581
if err := handler.rd.JSON(w, http.StatusOK, conf); err != nil {
7682
log.Error("failed to marshal balance key range scheduler config", errs.ZapError(err))
7783
}
@@ -164,131 +170,132 @@ type balanceRangeSchedulerConfig struct {
164170
jobs []*balanceRangeSchedulerJob
165171
}
166172

167-
// MarshalJSON marshals to json.
168-
func (conf *balanceRangeSchedulerConfig) MarshalJSON() ([]byte, error) {
169-
return json.Marshal(conf.jobs)
170-
}
171-
172-
// UnmarshalJSON unmarshals from json.
173-
func (conf *balanceRangeSchedulerConfig) UnmarshalJSON(data []byte) error {
174-
jobs := make([]*balanceRangeSchedulerJob, 0)
175-
if err := json.Unmarshal(data, &jobs); err != nil {
176-
return err
173+
func (conf *balanceRangeSchedulerConfig) addJob(job *balanceRangeSchedulerJob) error {
174+
conf.Lock()
175+
defer conf.Unlock()
176+
job.Status = pending
177+
if len(conf.jobs) == 0 {
178+
job.JobID = 1
179+
} else {
180+
job.JobID = conf.jobs[len(conf.jobs)-1].JobID + 1
177181
}
178-
conf.jobs = jobs
179-
return nil
180-
}
181-
182-
type balanceRangeSchedulerJob struct {
183-
JobID uint64 `json:"job-id"`
184-
Rule core.Rule `json:"rule"`
185-
Engine string `json:"engine"`
186-
Timeout time.Duration `json:"timeout"`
187-
Ranges []keyutil.KeyRange `json:"ranges"`
188-
Alias string `json:"alias"`
189-
Start *time.Time `json:"start,omitempty"`
190-
Finish *time.Time `json:"finish,omitempty"`
191-
Create time.Time `json:"create"`
192-
Status JobStatus `json:"status"`
182+
return conf.persistLocked(func() {
183+
conf.jobs = append(conf.jobs, job)
184+
})
193185
}
194186

195187
func (conf *balanceRangeSchedulerConfig) deleteJob(jobID uint64) error {
196188
conf.Lock()
197189
defer conf.Unlock()
198190
for _, job := range conf.jobs {
199191
if job.JobID == jobID {
200-
status := job.Status
201-
if job.Status != pending && job.Status != running {
192+
if job.isComplete() {
202193
return errs.ErrInvalidArgument.FastGenByArgs(fmt.Sprintf(
203194
"The job:%d has been completed and cannot be cancelled.", jobID))
204195
}
205-
job.Status = cancelled
206-
start := job.Start
207-
now := time.Now()
208-
if job.Start == nil {
209-
job.Start = &now
210-
}
211-
job.Finish = &now
212-
if err := conf.save(); err != nil {
213-
job.Status = status
214-
job.Start = start
215-
job.Finish = nil
216-
return err
217-
}
218-
return nil
196+
return conf.persistLocked(func() {
197+
job.Status = cancelled
198+
now := time.Now()
199+
if job.Start == nil {
200+
job.Start = &now
201+
}
202+
job.Finish = &now
203+
})
219204
}
220205
}
221206
return errs.ErrScheduleConfigNotExist.FastGenByArgs(jobID)
222207
}
223208

224-
func (conf *balanceRangeSchedulerConfig) addJob(job *balanceRangeSchedulerJob) error {
209+
func (conf *balanceRangeSchedulerConfig) gc() error {
210+
needGC := false
211+
gcIdx := 0
225212
conf.Lock()
226213
defer conf.Unlock()
227-
job.Status = pending
228-
if len(conf.jobs) == 0 {
229-
job.JobID = 1
230-
} else {
231-
job.JobID = conf.jobs[len(conf.jobs)-1].JobID + 1
214+
for idx, job := range conf.jobs {
215+
if job.isComplete() && job.expired(reserveDuration) {
216+
needGC = true
217+
gcIdx = idx
218+
} else {
219+
// The jobs are sorted by their start time and are executed in that order.
220+
// So the scan can stop at the first job that is not both complete and expired.
221+
break
222+
}
232223
}
233-
conf.jobs = append(conf.jobs, job)
224+
if !needGC {
225+
return nil
226+
}
227+
return conf.persistLocked(func() {
228+
conf.jobs = conf.jobs[gcIdx+1:]
229+
})
230+
}
231+
232+
func (conf *balanceRangeSchedulerConfig) persistLocked(updateFn func()) error {
233+
originJobs := conf.cloneLocked()
234+
updateFn()
234235
if err := conf.save(); err != nil {
235-
conf.jobs = conf.jobs[:len(conf.jobs)-1]
236+
conf.jobs = originJobs
237+
return err
238+
}
239+
return nil
240+
}
241+
242+
// MarshalJSON marshals to json.
243+
func (conf *balanceRangeSchedulerConfig) MarshalJSON() ([]byte, error) {
244+
return json.Marshal(conf.jobs)
245+
}
246+
247+
// UnmarshalJSON unmarshals from json.
248+
func (conf *balanceRangeSchedulerConfig) UnmarshalJSON(data []byte) error {
249+
jobs := make([]*balanceRangeSchedulerJob, 0)
250+
if err := json.Unmarshal(data, &jobs); err != nil {
236251
return err
237252
}
253+
conf.jobs = jobs
238254
return nil
239255
}
240256

241-
func (conf *balanceRangeSchedulerConfig) begin(index int) *balanceRangeSchedulerJob {
257+
func (conf *balanceRangeSchedulerConfig) begin(index int) error {
242258
conf.Lock()
243259
defer conf.Unlock()
244260
job := conf.jobs[index]
245261
if job.Status != pending {
246-
return nil
247-
}
248-
now := time.Now()
249-
job.Start = &now
250-
job.Status = running
251-
if err := conf.save(); err != nil {
252-
log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID))
253-
job.Status = pending
254-
job.Start = nil
262+
return errors.New("the job is not pending")
255263
}
256-
return job
264+
return conf.persistLocked(func() {
265+
now := time.Now()
266+
job.Start = &now
267+
job.Status = running
268+
})
257269
}
258270

259-
func (conf *balanceRangeSchedulerConfig) finish(index int) *balanceRangeSchedulerJob {
271+
func (conf *balanceRangeSchedulerConfig) finish(index int) error {
260272
conf.Lock()
261273
defer conf.Unlock()
274+
262275
job := conf.jobs[index]
263276
if job.Status != running {
264-
return nil
265-
}
266-
now := time.Now()
267-
job.Finish = &now
268-
job.Status = finished
269-
if err := conf.save(); err != nil {
270-
log.Warn("failed to persist config", zap.Error(err), zap.Uint64("job-id", job.JobID))
271-
job.Status = running
272-
job.Finish = nil
277+
return errors.New("the job is not running")
273278
}
274-
return job
279+
return conf.persistLocked(func() {
280+
now := time.Now()
281+
job.Finish = &now
282+
job.Status = finished
283+
})
275284
}
276285

277286
func (conf *balanceRangeSchedulerConfig) peek() (int, *balanceRangeSchedulerJob) {
278287
conf.RLock()
279288
defer conf.RUnlock()
280289
for index, job := range conf.jobs {
281-
if job.Status == finished {
290+
if job.isComplete() {
282291
continue
283292
}
284293
return index, job
285294
}
286295
return 0, nil
287296
}
288297

289-
func (conf *balanceRangeSchedulerConfig) clone() []*balanceRangeSchedulerJob {
290-
conf.RLock()
291-
defer conf.RUnlock()
298+
func (conf *balanceRangeSchedulerConfig) cloneLocked() []*balanceRangeSchedulerJob {
292299
jobs := make([]*balanceRangeSchedulerJob, 0, len(conf.jobs))
293300
for _, job := range conf.jobs {
294301
ranges := make([]keyutil.KeyRange, len(job.Ranges))
@@ -310,6 +317,34 @@ func (conf *balanceRangeSchedulerConfig) clone() []*balanceRangeSchedulerJob {
310317
return jobs
311318
}
312319

320+
type balanceRangeSchedulerJob struct {
321+
JobID uint64 `json:"job-id"`
322+
Rule core.Rule `json:"rule"`
323+
Engine string `json:"engine"`
324+
Timeout time.Duration `json:"timeout"`
325+
Ranges []keyutil.KeyRange `json:"ranges"`
326+
Alias string `json:"alias"`
327+
Start *time.Time `json:"start,omitempty"`
328+
Finish *time.Time `json:"finish,omitempty"`
329+
Create time.Time `json:"create"`
330+
Status JobStatus `json:"status"`
331+
}
332+
333+
func (job *balanceRangeSchedulerJob) expired(dur time.Duration) bool {
334+
if job == nil {
335+
return true
336+
}
337+
if job.Finish == nil {
338+
return false
339+
}
340+
now := time.Now()
341+
return now.Sub(*job.Finish) > dur
342+
}
343+
344+
func (job *balanceRangeSchedulerJob) isComplete() bool {
345+
return job.Status == finished || job.Status == cancelled
346+
}
347+
313348
// EncodeConfig serializes the config.
314349
func (s *balanceRangeScheduler) EncodeConfig() ([]byte, error) {
315350
s.conf.RLock()
@@ -349,18 +384,25 @@ func (s *balanceRangeScheduler) IsScheduleAllowed(cluster sche.SchedulerCluster)
349384
if !allowed {
350385
operator.IncOperatorLimitCounter(s.GetType(), operator.OpRange)
351386
}
387+
if err := s.conf.gc(); err != nil {
388+
log.Error("balance range jobs gc failed", errs.ZapError(err))
389+
return false
390+
}
352391
index, job := s.conf.peek()
353392
if job != nil {
354393
if job.Status == pending {
355-
job = s.conf.begin(index)
394+
if err := s.conf.begin(index); err != nil {
395+
return false
396+
}
356397
}
357398
// todo: add other conditions such as the diff of the score between the source and target store.
358399
if time.Since(*job.Start) > job.Timeout {
359-
s.conf.finish(index)
400+
if err := s.conf.finish(index); err != nil {
401+
return false
402+
}
360403
balanceRangeExpiredCounter.Inc()
361404
}
362405
}
363-
364406
return allowed
365407
}
366408

0 commit comments

Comments
 (0)