Skip to content

Commit a050a52

Browse files
authored
Merge pull request volcano-sh#257 from k82cn/ka_256
Added error handling for Bind & Evict.
2 parents 02b0906 + 7d1db66 commit a050a52

File tree

13 files changed

+220
-99
lines changed

13 files changed

+220
-99
lines changed

pkg/controller/queuejob/utils.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ func createQueueJobSchedulingSpec(qj *arbv1.QueueJob) *arbv1.SchedulingSpec {
8282
func createQueueJobPod(qj *arbv1.QueueJob, ix int32) *corev1.Pod {
8383
templateCopy := qj.Spec.Template.DeepCopy()
8484

85-
podName := fmt.Sprintf("%s-%d-%s", qj.Name, ix, generateUUID())
85+
prefix := fmt.Sprintf("%s-", qj.Name)
8686

8787
return &corev1.Pod{
8888
ObjectMeta: metav1.ObjectMeta{
89-
Name: podName,
90-
Namespace: qj.Namespace,
89+
GenerateName: prefix,
90+
Namespace: qj.Namespace,
9191
OwnerReferences: []metav1.OwnerReference{
9292
*metav1.NewControllerRef(qj, queueJobKind),
9393
},

pkg/scheduler/actions/allocate/allocate.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
6868
}
6969
tasks := pendingTasks[job.UID]
7070

71-
glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v:%v/%v>",
72-
tasks.Len(), job.UID, job.Namespace, job.Name)
71+
glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
72+
tasks.Len(), job.Namespace, job.Name)
7373

7474
for !tasks.Empty() {
7575
task := tasks.Pop().(*api.TaskInfo)
@@ -83,19 +83,19 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
8383
nodes = ssn.Nodes
8484
}
8585

86-
glog.V(3).Infof("There are <%d> nodes for Job <%v:%v/%v>",
87-
len(nodes), job.UID, job.Namespace, job.Name)
86+
glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>",
87+
len(nodes), job.Namespace, job.Name)
8888

8989
for _, node := range nodes {
9090
glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
91-
task.Job, task.UID, node.Name, task.Resreq, node.Idle)
91+
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
9292
// Allocate idle resource to the task.
9393
if task.Resreq.LessEqual(node.Idle) {
9494
glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
95-
task.Job, task.UID, node.Name)
95+
task.Namespace, task.Name, node.Name)
9696
if err := ssn.Allocate(task, node.Name); err != nil {
9797
glog.Errorf("Failed to bind Task %v on %v in Session %v",
98-
task.UID, node.Name, ssn.ID)
98+
task.UID, node.Name, ssn.UID)
9999
continue
100100
}
101101
assigned = true
@@ -104,11 +104,11 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
104104

105105
// Allocate releasing resource to the task if any.
106106
if task.Resreq.LessEqual(node.Releasing) {
107-
glog.V(3).Infof("Pipelining Task <%v:%v/%v> to node <%v> for <%v> on <%v>",
108-
task.UID, task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
107+
glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
108+
task.Namespace, task.Name, node.Name, task.Resreq, node.Releasing)
109109
if err := ssn.Pipeline(task, node.Name); err != nil {
110110
glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
111-
task.UID, node.Name, ssn.ID)
111+
task.UID, node.Name, ssn.UID)
112112
continue
113113
}
114114

pkg/scheduler/actions/decorate/decorate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ func (alloc *decorateAction) Execute(ssn *framework.Session) {
4949

5050
for _, job := range jobs {
5151
job.Candidates = fetchMatchNodeForPodSet(job, nodes)
52-
glog.V(3).Infof("Got %d candidate nodes for Job %v:%v/%v",
53-
len(job.Candidates), job.UID, job.Namespace, job.Name)
52+
glog.V(3).Infof("Got %d candidate nodes for Job %v/%v",
53+
len(job.Candidates), job.Namespace, job.Name)
5454
}
5555
}
5656

pkg/scheduler/actions/preempt/preempt.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
9797
break
9898
}
9999

100-
glog.V(3).Infof("The preemptor is %v:%v/%v, the preemptee is %v:%v/%v",
101-
preemptorJob.UID, preemptorJob.Namespace, preemptorJob.Name,
102-
preempteeJob.UID, preempteeJob.Namespace, preempteeJob.Name)
100+
glog.V(3).Infof("The preemptor is %v/%v, the preemptee is %v/%v",
101+
preemptorJob.Namespace, preemptorJob.Name,
102+
preempteeJob.Namespace, preempteeJob.Name)
103103

104104
preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)
105105
preemptee := preempteeTasks[preempteeJob.UID].Pop().(*api.TaskInfo)
@@ -108,14 +108,16 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
108108

109109
if ssn.Preemptable(preemptor, preemptee) {
110110
if err := ssn.Preempt(preemptor, preemptee); err != nil {
111-
glog.Errorf("Failed to evict task %v for task %v: %v", nil, nil, err)
111+
glog.Errorf("Failed to evict task %v/%v for task %v/%v: %v",
112+
preemptee.Namespace, preemptee.Name,
113+
preemptor.Namespace, preemptor.Name, err)
112114
} else {
113115
preempted = true
114116
}
115117
} else {
116-
glog.V(3).Infof("Can not preempt task <%v:%v/%v> for task <%v:%v/%v>",
117-
preemptee.UID, preemptee.Namespace, preemptee.Name,
118-
preemptor.UID, preemptor.Namespace, preemptor.Name)
118+
glog.V(3).Infof("Can not preempt task <%v/%v> for task <%v/%v>",
119+
preemptee.Namespace, preemptee.Name,
120+
preemptor.Namespace, preemptor.Name)
119121
}
120122

121123
// If preempted resource, put it back to the queue.

pkg/scheduler/api/helpers.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,30 @@ func AllocatedStatus(status TaskStatus) bool {
6868
return false
6969
}
7070
}
71+
72+
func MergeErrors(errs ...error) error {
73+
msg := "errors: "
74+
75+
foundErr := false
76+
i := 1
77+
78+
for _, e := range errs {
79+
if e != nil {
80+
if foundErr {
81+
msg = fmt.Sprintf("%s, %d: ", msg, i)
82+
} else {
83+
msg = fmt.Sprintf("%s %d: ", msg, i)
84+
}
85+
86+
msg = fmt.Sprintf("%s%v", msg, e)
87+
foundErr = true
88+
i++
89+
}
90+
}
91+
92+
if foundErr {
93+
return fmt.Errorf("%s", msg)
94+
}
95+
96+
return nil
97+
}

pkg/scheduler/api/job_info.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -206,28 +206,32 @@ func (ps *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error {
206206
return nil
207207
}
208208

209-
func (ps *JobInfo) deleteTaskIndex(pi *TaskInfo) {
210-
if ts, found := ps.TaskStatusIndex[pi.Status]; found {
211-
delete(ts, pi.UID)
209+
func (ps *JobInfo) deleteTaskIndex(ti *TaskInfo) {
210+
if tasks, found := ps.TaskStatusIndex[ti.Status]; found {
211+
delete(tasks, ti.UID)
212212

213-
if len(ts) == 0 {
214-
delete(ps.TaskStatusIndex, pi.Status)
213+
if len(tasks) == 0 {
214+
delete(ps.TaskStatusIndex, ti.Status)
215215
}
216216
}
217217
}
218218

219-
func (ps *JobInfo) DeleteTaskInfo(pi *TaskInfo) {
219+
func (ps *JobInfo) DeleteTaskInfo(pi *TaskInfo) error {
220220
if task, found := ps.Tasks[pi.UID]; found {
221221
ps.TotalRequest.Sub(task.Resreq)
222222

223223
if AllocatedStatus(task.Status) {
224224
ps.Allocated.Sub(task.Resreq)
225225
}
226226

227-
delete(ps.Tasks, pi.UID)
227+
delete(ps.Tasks, task.UID)
228+
229+
ps.deleteTaskIndex(task)
230+
return nil
228231
}
229232

230-
ps.deleteTaskIndex(pi)
233+
return fmt.Errorf("failed to find task <%v/%v> in job <%v/%v>",
234+
pi.Namespace, pi.Name, ps.Namespace, ps.Name)
231235
}
232236

233237
func (ps *JobInfo) Clone() *JobInfo {

pkg/scheduler/api/node_info.go

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ limitations under the License.
1717
package api
1818

1919
import (
20-
"github.com/golang/glog"
20+
"fmt"
2121

2222
"k8s.io/api/core/v1"
2323
)
@@ -110,50 +110,56 @@ func (ni *NodeInfo) SetNode(node *v1.Node) {
110110
ni.Capability = NewResource(node.Status.Capacity)
111111
}
112112

113-
func (ni *NodeInfo) PipelineTask(task *TaskInfo) {
113+
func (ni *NodeInfo) PipelineTask(task *TaskInfo) error {
114114
key := PodKey(task.Pod)
115115
if _, found := ni.Tasks[key]; found {
116-
glog.Errorf("Task <%v/%v> already on node <%v>, should not add again.",
116+
return fmt.Errorf("task <%v/%v> already on node <%v>",
117117
task.Namespace, task.Name, ni.Name)
118-
return
119118
}
120119

120+
ti := task.Clone()
121+
121122
if ni.Node != nil {
122-
ni.Releasing.Sub(task.Resreq)
123-
ni.Used.Add(task.Resreq)
123+
ni.Releasing.Sub(ti.Resreq)
124+
ni.Used.Add(ti.Resreq)
124125
}
125126

126-
ni.Tasks[key] = task
127+
ni.Tasks[key] = ti
128+
129+
return nil
127130
}
128131

129-
func (ni *NodeInfo) AddTask(task *TaskInfo) {
132+
func (ni *NodeInfo) AddTask(task *TaskInfo) error {
130133
key := PodKey(task.Pod)
131134
if _, found := ni.Tasks[key]; found {
132-
glog.Errorf("Task <%v/%v> already on node <%v>, should not add again.",
135+
return fmt.Errorf("task <%v/%v> already on node <%v>",
133136
task.Namespace, task.Name, ni.Name)
134-
return
135137
}
136138

139+
// Node will hold a copy of task to make sure the status
140+
// change will not impact resource in node.
141+
ti := task.Clone()
142+
137143
if ni.Node != nil {
138-
if task.Status == Releasing {
139-
ni.Releasing.Add(task.Resreq)
144+
if ti.Status == Releasing {
145+
ni.Releasing.Add(ti.Resreq)
140146
}
141-
ni.Idle.Sub(task.Resreq)
142-
ni.Used.Add(task.Resreq)
147+
ni.Idle.Sub(ti.Resreq)
148+
ni.Used.Add(ti.Resreq)
143149
}
144150

145-
glog.V(3).Infof("After added Task <%v> from Node <%v>: idle <%v>, used <%v>, releasing <%v>",
146-
key, ni.Name, ni.Idle, ni.Used, ni.Releasing)
151+
ni.Tasks[key] = ti
147152

148-
ni.Tasks[key] = task
153+
return nil
149154
}
150155

151-
func (ni *NodeInfo) RemoveTask(ti *TaskInfo) {
156+
func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
152157
key := PodKey(ti.Pod)
153158

154159
task, found := ni.Tasks[key]
155160
if !found {
156-
return
161+
return fmt.Errorf("failed to find task <%v/%v> on host <%v>",
162+
ti.Namespace, ti.Name, ni.Name)
157163
}
158164

159165
if ni.Node != nil {
@@ -165,8 +171,15 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) {
165171
ni.Used.Sub(task.Resreq)
166172
}
167173

168-
glog.V(3).Infof("After removed Task <%v> from Node <%v>: idle <%v>, used <%v>, releasing <%v>",
169-
key, ni.Name, ni.Idle, ni.Used, ni.Releasing)
170-
171174
delete(ni.Tasks, key)
175+
176+
return nil
177+
}
178+
179+
func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error {
180+
if err := ni.RemoveTask(ti); err != nil {
181+
return err
182+
}
183+
184+
return ni.AddTask(ti)
172185
}

0 commit comments

Comments
 (0)