Skip to content

Commit 672cb8c

Browse files
committed
Do not sync job tasks (pods) until the PodGroup has left the Pending phase (pg inqueue)
1 parent c4dbaf3 commit 672cb8c

1 file changed

Lines changed: 46 additions & 6 deletions

File tree

pkg/controllers/job/job_controller_actions.go

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,11 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
142142
return nil
143143
}
144144

145-
func (cc *Controller) createJob(job *batch.Job) (*batch.Job, error) {
145+
func (cc *Controller) initiateJob(job *batch.Job) (*batch.Job, error) {
146+
klog.V(3).Infof("Starting to initiate Job <%s/%s>", job.Namespace, job.Name)
147+
defer klog.V(3).Infof("Finished Job <%s/%s> initiate", job.Namespace, job.Name)
148+
149+
klog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
146150
job, err := cc.initJobStatus(job)
147151
if err != nil {
148152
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.JobStatusError),
@@ -185,9 +189,46 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
185189
return nil
186190
}
187191

188-
var err error
189-
if job, err = cc.createJob(job); err != nil {
190-
return err
192+
// Skip job initiation if job is already accepted
193+
if job.Status.State.Phase == "" {
194+
var err error
195+
if job, err = cc.initiateJob(job); err != nil {
196+
return err
197+
}
198+
}
199+
200+
var syncTask bool
201+
if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); pg != nil {
202+
if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
203+
syncTask = true
204+
}
205+
}
206+
207+
if !syncTask {
208+
if updateStatus != nil {
209+
if updateStatus(&job.Status) {
210+
job.Status.State.LastTransitionTime = metav1.Now()
211+
}
212+
}
213+
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
214+
if err != nil {
215+
klog.Errorf("Failed to update status of Job %v/%v: %v",
216+
job.Namespace, job.Name, err)
217+
return err
218+
}
219+
if e := cc.cache.Update(newJob); e != nil {
220+
klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
221+
newJob.Namespace, newJob.Name, e)
222+
return e
223+
}
224+
return nil
225+
}
226+
227+
// Skip job task sync if it is pending
228+
if job.Status.State.Phase == batch.Pending {
229+
klog.Infof("Job <%s/%s> is pending, skip pod sync.",
230+
job.Namespace, job.Name)
231+
return nil
191232
}
192233

193234
var running, pending, terminating, succeeded, failed, unknown int32
@@ -297,7 +338,6 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
297338
fmt.Sprintf("Error deleting pods: %+v", deletionErrs))
298339
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
299340
}
300-
301341
job.Status = batch.JobStatus{
302342
State: job.Status.State,
303343

@@ -422,7 +462,7 @@ func (cc *Controller) createPVC(job *batch.Job, vcName string, volumeClaim *v1.P
422462

423463
func (cc *Controller) createPodGroupIfNotExist(job *batch.Job) error {
424464
// If PodGroup does not exist, create one for Job.
425-
if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
465+
if pg, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil {
426466
if !apierrors.IsNotFound(err) {
427467
klog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v",
428468
job.Namespace, job.Name, err)

0 commit comments

Comments
 (0)