Skip to content

Commit acb6410

Browse files
authored
Merge pull request volcano-sh#259 from k82cn/k8s_256_2
Fixed Bind & Evict error handling issue.
2 parents a050a52 + b2d1485 commit acb6410

8 files changed

Lines changed: 189 additions & 65 deletions

File tree

pkg/scheduler/api/job_info.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ func NewJobInfo(uid JobID) *JobInfo {
139139
}
140140
}
141141

142+
func (ps *JobInfo) UnsetSchedulingSpec() {
143+
ps.SchedSpec = nil
144+
}
145+
142146
func (ps *JobInfo) SetSchedulingSpec(spec *arbv1.SchedulingSpec) {
143147
ps.Name = spec.Name
144148
ps.Namespace = spec.Namespace

pkg/scheduler/api/node_info.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -71,23 +71,13 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
7171
}
7272

7373
func (ni *NodeInfo) Clone() *NodeInfo {
74-
pods := make(map[TaskID]*TaskInfo, len(ni.Tasks))
74+
res := NewNodeInfo(ni.Node)
7575

7676
for _, p := range ni.Tasks {
77-
pods[PodKey(p.Pod)] = p.Clone()
77+
res.AddTask(p)
7878
}
7979

80-
return &NodeInfo{
81-
Name: ni.Name,
82-
Node: ni.Node,
83-
Idle: ni.Idle.Clone(),
84-
Used: ni.Used.Clone(),
85-
Releasing: ni.Releasing.Clone(),
86-
Allocatable: ni.Allocatable.Clone(),
87-
Capability: ni.Capability.Clone(),
88-
89-
Tasks: pods,
90-
}
80+
return res
9181
}
9282

9383
func (ni *NodeInfo) SetNode(node *v1.Node) {
@@ -183,3 +173,17 @@ func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error {
183173

184174
return ni.AddTask(ti)
185175
}
176+
177+
func (ni NodeInfo) String() string {
178+
res := ""
179+
180+
i := 0
181+
for _, task := range ni.Tasks {
182+
res = res + fmt.Sprintf("\n\t %d: %v", i, task)
183+
i++
184+
}
185+
186+
return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>%s",
187+
ni.Name, ni.Idle, ni.Used, ni.Releasing, res)
188+
189+
}

pkg/scheduler/cache/cache.go

Lines changed: 93 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"strings"
2222
"sync"
23+
"time"
2324

2425
"github.com/golang/glog"
2526

@@ -60,7 +61,8 @@ type SchedulerCache struct {
6061
Jobs map[arbapi.JobID]*arbapi.JobInfo
6162
Nodes map[string]*arbapi.NodeInfo
6263

63-
errTasks *cache.FIFO
64+
errTasks *cache.FIFO
65+
deletedJobs *cache.FIFO
6466
}
6567

6668
type defaultBinder struct {
@@ -98,11 +100,40 @@ func (de *defaultEvictor) Evict(p *v1.Pod) error {
98100
return nil
99101
}
100102

103+
func taskKey(obj interface{}) (string, error) {
104+
if obj == nil {
105+
return "", fmt.Errorf("the object is nil")
106+
}
107+
108+
task, ok := obj.(*arbapi.TaskInfo)
109+
110+
if !ok {
111+
return "", fmt.Errorf("failed to convert %v to TaskInfo", obj)
112+
}
113+
114+
return string(task.UID), nil
115+
}
116+
117+
func jobKey(obj interface{}) (string, error) {
118+
if obj == nil {
119+
return "", fmt.Errorf("the object is nil")
120+
}
121+
122+
job, ok := obj.(*arbapi.JobInfo)
123+
124+
if !ok {
125+
return "", fmt.Errorf("failed to convert %v to TaskInfo", obj)
126+
}
127+
128+
return string(job.UID), nil
129+
}
130+
101131
func newSchedulerCache(config *rest.Config, schedulerName string) *SchedulerCache {
102132
sc := &SchedulerCache{
103-
Jobs: make(map[arbapi.JobID]*arbapi.JobInfo),
104-
Nodes: make(map[string]*arbapi.NodeInfo),
105-
errTasks: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
133+
Jobs: make(map[arbapi.JobID]*arbapi.JobInfo),
134+
Nodes: make(map[string]*arbapi.NodeInfo),
135+
errTasks: cache.NewFIFO(taskKey),
136+
deletedJobs: cache.NewFIFO(jobKey),
106137
}
107138

108139
sc.kubeclient = kubernetes.NewForConfigOrDie(config)
@@ -200,6 +231,9 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
200231

201232
// Re-sync error tasks.
202233
go sc.resync()
234+
235+
// Cleanup jobs.
236+
go sc.cleanupJobs()
203237
}
204238

205239
func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
@@ -255,7 +289,7 @@ func (sc *SchedulerCache) Evict(taskInfo *arbapi.TaskInfo) error {
255289
go func() {
256290
err := sc.Evictor.Evict(p)
257291
if err != nil {
258-
sc.enqueue(task)
292+
sc.resyncTask(task)
259293
}
260294
}()
261295

@@ -294,35 +328,81 @@ func (sc *SchedulerCache) Bind(taskInfo *arbapi.TaskInfo, hostname string) error
294328

295329
go func() {
296330
if err := sc.Binder.Bind(p, hostname); err != nil {
297-
sc.enqueue(task)
331+
sc.resyncTask(task)
298332
}
299333
}()
300334

301335
return nil
302336
}
303337

304-
func (sc *SchedulerCache) enqueue(task *arbapi.TaskInfo) {
305-
sc.errTasks.AddIfNotPresent(task.Pod)
338+
func (sc *SchedulerCache) deleteJob(job *arbapi.JobInfo) {
339+
time.AfterFunc(5*time.Second, func() {
340+
sc.deletedJobs.AddIfNotPresent(job)
341+
})
342+
}
343+
344+
func (sc *SchedulerCache) cleanupJob(job *arbapi.JobInfo) error {
345+
sc.Mutex.Lock()
346+
defer sc.Mutex.Unlock()
347+
348+
if job.SchedSpec == nil && len(job.Tasks) == 0 {
349+
delete(sc.Jobs, job.UID)
350+
return nil
351+
}
352+
353+
return fmt.Errorf("Job is not ready to clean up")
354+
}
355+
356+
func (sc *SchedulerCache) processCleanupJob() error {
357+
_, err := sc.deletedJobs.Pop(func(obj interface{}) error {
358+
job, ok := obj.(*arbapi.JobInfo)
359+
if !ok {
360+
return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
361+
}
362+
363+
if err := sc.cleanupJob(job); err != nil {
364+
glog.Errorf("Failed to delete job <%v/%v>, retry ...", job.Namespace, job.Name)
365+
366+
sc.deleteJob(job)
367+
return err
368+
}
369+
return nil
370+
})
371+
372+
return err
373+
}
374+
375+
func (sc *SchedulerCache) cleanupJobs() {
376+
for {
377+
err := sc.processCleanupJob()
378+
if err != nil {
379+
glog.Errorf("Failed to process job clean up: %v", err)
380+
}
381+
}
382+
}
383+
384+
// resyncTask queues task on errTasks so that it gets re-synced later,
// unless an entry with the same key is already queued.
func (sc *SchedulerCache) resyncTask(task *arbapi.TaskInfo) {
	// A task that cannot be queued would never be re-synced, so log the
	// failure instead of silently discarding it.
	if err := sc.errTasks.AddIfNotPresent(task); err != nil {
		glog.Errorf("Failed to add task <%v/%v> into resync queue: %v",
			task.Namespace, task.Name, err)
	}
}
307387

308388
func (sc *SchedulerCache) resync() {
309389
for {
310-
err := sc.processWorkItem()
390+
err := sc.processResyncTask()
311391
if err != nil {
312392
glog.Errorf("Failed to process resync: %v", err)
313393
}
314394
}
315395
}
316396

317-
func (sc *SchedulerCache) processWorkItem() error {
397+
func (sc *SchedulerCache) processResyncTask() error {
318398
_, err := sc.errTasks.Pop(func(obj interface{}) error {
319-
pod, ok := obj.(*v1.Pod)
399+
task, ok := obj.(*arbapi.TaskInfo)
320400
if !ok {
321401
return fmt.Errorf("failed to convert %v to *v1.Pod", obj)
322402
}
323403

324-
if err := sc.syncPod(pod); err != nil {
325-
glog.Errorf("Failed to sync pod <%v/%v>", pod.Namespace, pod.Name)
404+
if err := sc.syncTask(task); err != nil {
405+
glog.Errorf("Failed to sync pod <%v/%v>", task.Namespace, task.Name)
326406
return err
327407
}
328408
return nil

pkg/scheduler/cache/event_handlers.go

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,7 @@ func isTerminated(status arbapi.TaskStatus) bool {
3636
return status == arbapi.Succeeded || status == arbapi.Failed
3737
}
3838

39-
// Assumes that lock is already acquired.
40-
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
41-
pi := arbapi.NewTaskInfo(pod)
42-
39+
func (sc *SchedulerCache) addTask(pi *arbapi.TaskInfo) error {
4340
if len(pi.Job) != 0 {
4441
if _, found := sc.Jobs[pi.Job]; !found {
4542
sc.Jobs[pi.Job] = arbapi.NewJobInfo(pi.Job)
@@ -50,9 +47,6 @@ func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
5047
// client-go issue, we need to dig deeper for that.
5148
sc.Jobs[pi.Job].DeleteTaskInfo(pi)
5249
sc.Jobs[pi.Job].AddTaskInfo(pi)
53-
} else {
54-
glog.Warningf("The controller of pod %v/%v is empty, can not schedule it.",
55-
pod.Namespace, pod.Name)
5650
}
5751

5852
if len(pi.NodeName) != 0 {
@@ -71,22 +65,39 @@ func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
7165
return nil
7266
}
7367

74-
func (sc *SchedulerCache) syncPod(oldPod *v1.Pod) error {
68+
// Assumes that lock is already acquired.
69+
func (sc *SchedulerCache) addPod(pod *v1.Pod) error {
70+
pi := arbapi.NewTaskInfo(pod)
71+
72+
return sc.addTask(pi)
73+
}
74+
75+
func (sc *SchedulerCache) syncTask(oldTask *arbapi.TaskInfo) error {
7576
sc.Mutex.Lock()
7677
defer sc.Mutex.Unlock()
7778

78-
newPod, err := sc.kubeclient.CoreV1().Pods(oldPod.Namespace).Get(oldPod.Name, metav1.GetOptions{})
79+
newPod, err := sc.kubeclient.CoreV1().Pods(oldTask.Namespace).Get(oldTask.Name, metav1.GetOptions{})
7980
if err != nil {
8081
if errors.IsNotFound(err) {
81-
sc.deletePod(oldPod)
82-
glog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldPod.Namespace, oldPod.Name)
82+
sc.deleteTask(oldTask)
83+
glog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name)
8384

8485
return nil
8586
}
86-
return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldPod.Namespace, oldPod.Name, err)
87+
return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err)
88+
}
89+
90+
newTask := arbapi.NewTaskInfo(newPod)
91+
92+
return sc.updateTask(oldTask, newTask)
93+
}
94+
95+
func (sc *SchedulerCache) updateTask(oldTask, newTask *arbapi.TaskInfo) error {
96+
if err := sc.deleteTask(oldTask); err != nil {
97+
return err
8798
}
8899

89-
return sc.updatePod(oldPod, newPod)
100+
return sc.addTask(newTask)
90101
}
91102

92103
// Assumes that lock is already acquired.
@@ -97,17 +108,15 @@ func (sc *SchedulerCache) updatePod(oldPod, newPod *v1.Pod) error {
97108
return sc.addPod(newPod)
98109
}
99110

100-
// Assumes that lock is already acquired.
101-
func (sc *SchedulerCache) deletePod(pod *v1.Pod) error {
102-
pi := arbapi.NewTaskInfo(pod)
103-
111+
func (sc *SchedulerCache) deleteTask(pi *arbapi.TaskInfo) error {
104112
var jobErr, nodeErr error
105113

106114
if len(pi.Job) != 0 {
107115
if job, found := sc.Jobs[pi.Job]; found {
108116
jobErr = job.DeleteTaskInfo(pi)
109117
} else {
110-
jobErr = fmt.Errorf("failed to find Job for Task %v/%v", pi.Namespace, pi.Name)
118+
jobErr = fmt.Errorf("failed to find Job <%v> for Task %v/%v",
119+
pi.Job, pi.Namespace, pi.Name)
111120
}
112121
}
113122

@@ -125,6 +134,12 @@ func (sc *SchedulerCache) deletePod(pod *v1.Pod) error {
125134
return nil
126135
}
127136

137+
// Assumes that lock is already acquired.
138+
func (sc *SchedulerCache) deletePod(pod *v1.Pod) error {
139+
pi := arbapi.NewTaskInfo(pod)
140+
return sc.deleteTask(pi)
141+
}
142+
128143
func (sc *SchedulerCache) AddPod(obj interface{}) {
129144
pod, ok := obj.(*v1.Pod)
130145
if !ok {
@@ -140,6 +155,8 @@ func (sc *SchedulerCache) AddPod(obj interface{}) {
140155
glog.Errorf("Failed to add pod <%s/%s> into cache: %v",
141156
pod.Namespace, pod.Name, err)
142157
return
158+
} else {
159+
glog.V(3).Infof("Added pod <%s/%v> into cache.", pod.Namespace, pod.Name)
143160
}
144161
return
145162
}
@@ -164,6 +181,9 @@ func (sc *SchedulerCache) UpdatePod(oldObj, newObj interface{}) {
164181
glog.Errorf("Failed to update pod %v in cache: %v", oldPod.Name, err)
165182
return
166183
}
184+
185+
glog.V(3).Infof("Updated pod <%s/%v> in cache.", oldPod.Namespace, oldPod.Name)
186+
167187
return
168188
}
169189

@@ -192,6 +212,8 @@ func (sc *SchedulerCache) DeletePod(obj interface{}) {
192212
glog.Errorf("Failed to delete pod %v from cache: %v", pod.Name, err)
193213
return
194214
}
215+
216+
glog.V(3).Infof("Deleted pod <%s/%v> from cache.", pod.Namespace, pod.Name)
195217
return
196218
}
197219

@@ -326,20 +348,11 @@ func (sc *SchedulerCache) deleteSchedulingSpec(ss *arbv1.SchedulingSpec) error {
326348
return fmt.Errorf("can not found job %v:%v/%v", jobID, ss.Namespace, ss.Name)
327349
}
328350

329-
// Removed tasks from nodes.
330-
for _, task := range job.Tasks {
331-
if len(task.NodeName) != 0 {
332-
if node, found := sc.Nodes[task.NodeName]; found {
333-
node.RemoveTask(task)
334-
} else {
335-
glog.V(3).Infof("Failed to find node <%v> for task %v:%v/%v",
336-
task.NodeName, task.UID, task.Namespace, task.Name)
337-
}
338-
}
339-
}
351+
// Unset SchedulingSpec
352+
job.UnsetSchedulingSpec()
340353

341-
// Deleted Job from cache.
342-
delete(sc.Jobs, jobID)
354+
// TODO (k82cn): find another way to clean up Job.
355+
sc.deleteJob(job)
343356

344357
return nil
345358
}

0 commit comments

Comments
 (0)