Skip to content

Commit 1670bbd

Browse files
Merge pull request #256 from lminzhw/errtask
support err task resync
2 parents 0c44e1a + 573d5a5 commit 1670bbd

4 files changed

Lines changed: 114 additions & 0 deletions

File tree

pkg/controllers/job/job_controller.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package job
1818

1919
import (
2020
"fmt"
21+
"sync"
2122

2223
"github.com/golang/glog"
2324

@@ -100,6 +101,9 @@ type Controller struct {
100101
//Job Event recorder
101102
recorder record.EventRecorder
102103
priorityClasses map[string]*v1beta1.PriorityClass
104+
105+
sync.Mutex
106+
errTasks workqueue.RateLimitingInterface
103107
}
104108

105109
// NewJobController create new Job Controller
@@ -122,6 +126,7 @@ func NewJobController(
122126
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
123127
commandQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
124128
cache: jobcache.New(),
129+
errTasks: newRateLimitingQueue(),
125130
recorder: recorder,
126131
priorityClasses: make(map[string]*v1beta1.PriorityClass),
127132
}
@@ -204,6 +209,9 @@ func (cc *Controller) Run(stopCh <-chan struct{}) {
204209

205210
go cc.cache.Run(stopCh)
206211

212+
// Re-sync error tasks.
213+
go wait.Until(cc.processResyncTask, 0, stopCh)
214+
207215
glog.Infof("JobController is running ...... ")
208216
}
209217

pkg/controllers/job/job_controller_actions.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
7272
}
7373
// record the err, and then collect the pod info like retained pod
7474
errs = append(errs, err)
75+
cc.resyncTask(pod)
7576
}
7677

7778
switch pod.Status.Phase {
@@ -271,6 +272,11 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
271272
pod.Name, job.Name, err)
272273
creationErrs = append(creationErrs, err)
273274
} else {
275+
if err != nil && apierrors.IsAlreadyExists(err) {
276+
cc.resyncTask(pod)
277+
}
278+
279+
// TODO: maybe not pending status, maybe unknown.
274280
pending++
275281
glog.V(3).Infof("Created Task <%s> of Job <%s/%s>",
276282
pod.Name, job.Namespace, job.Name)
@@ -298,6 +304,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
298304
glog.Errorf("Failed to delete pod %s for Job %s, err %#v",
299305
pod.Name, job.Name, err)
300306
deletionErrs = append(deletionErrs, err)
307+
cc.resyncTask(pod)
301308
} else {
302309
glog.V(3).Infof("Deleted Task <%s> of Job <%s/%s>",
303310
pod.Name, job.Namespace, job.Name)
@@ -484,6 +491,9 @@ func (cc *Controller) deleteJobPod(jobName string, pod *v1.Pod) error {
484491
}
485492

486493
func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {
494+
cc.Mutex.Lock()
495+
defer cc.Mutex.Unlock()
496+
487497
// sort task by priorityClasses
488498
var tasksPriority TasksPriority
489499
for index := range job.Spec.Tasks {

pkg/controllers/job/job_controller_handler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,9 @@ func (cc *Controller) addPriorityClass(obj interface{}) {
394394
return
395395
}
396396

397+
cc.Mutex.Lock()
398+
defer cc.Mutex.Unlock()
399+
397400
cc.priorityClasses[pc.Name] = pc
398401
return
399402
}
@@ -404,6 +407,9 @@ func (cc *Controller) deletePriorityClass(obj interface{}) {
404407
return
405408
}
406409

410+
cc.Mutex.Lock()
411+
defer cc.Mutex.Unlock()
412+
407413
delete(cc.priorityClasses, pc.Name)
408414
return
409415
}
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
Copyright 2019 The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package job
18+
19+
import (
20+
"fmt"
21+
"time"
22+
23+
"golang.org/x/time/rate"
24+
25+
"github.com/golang/glog"
26+
27+
"k8s.io/api/core/v1"
28+
"k8s.io/apimachinery/pkg/api/errors"
29+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/client-go/util/workqueue"
31+
)
32+
33+
func newRateLimitingQueue() workqueue.RateLimitingInterface {
34+
return workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
35+
workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 180*time.Second),
36+
// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
37+
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
38+
))
39+
}
40+
41+
func (cc *Controller) processResyncTask() {
42+
obj, shutdown := cc.errTasks.Get()
43+
if shutdown {
44+
return
45+
}
46+
47+
// one task only resync 10 times
48+
if cc.errTasks.NumRequeues(obj) > 10 {
49+
cc.errTasks.Forget(obj)
50+
return
51+
}
52+
53+
defer cc.errTasks.Done(obj)
54+
55+
task, ok := obj.(*v1.Pod)
56+
if !ok {
57+
glog.Errorf("failed to convert %v to *v1.Pod", obj)
58+
return
59+
}
60+
61+
if err := cc.syncTask(task); err != nil {
62+
glog.Errorf("Failed to sync pod <%v/%v>, retry it, err %v", task.Namespace, task.Name, err)
63+
cc.resyncTask(task)
64+
}
65+
}
66+
67+
func (cc *Controller) syncTask(oldTask *v1.Pod) error {
68+
cc.Mutex.Lock()
69+
defer cc.Mutex.Unlock()
70+
71+
newPod, err := cc.kubeClients.CoreV1().Pods(oldTask.Namespace).Get(oldTask.Name, metav1.GetOptions{})
72+
if err != nil {
73+
if errors.IsNotFound(err) {
74+
if err := cc.cache.DeletePod(oldTask); err != nil {
75+
glog.Errorf("failed to delete cache pod <%v/%v>, err %v.", oldTask.Namespace, oldTask.Name, err)
76+
return err
77+
}
78+
glog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name)
79+
80+
return nil
81+
}
82+
return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err)
83+
}
84+
85+
return cc.cache.UpdatePod(newPod)
86+
}
87+
88+
func (cc *Controller) resyncTask(task *v1.Pod) {
89+
cc.errTasks.AddRateLimited(task)
90+
}

0 commit comments

Comments
 (0)