Skip to content

Commit 38e5068

Browse files
wangyuqing (C)wangyuqing4
authored andcommitted
fix state convert
1 parent 40e85f4 commit 38e5068

File tree

10 files changed

+91
-79
lines changed

10 files changed

+91
-79
lines changed

pkg/admission/admit_job.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,10 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
5454
msg = validateJob(job, &reviewResponse)
5555
break
5656
case v1beta1.Update:
57-
oldJob, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource)
57+
_, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource)
5858
if err != nil {
5959
return ToAdmissionResponse(err)
6060
}
61-
msg = specDeepEqual(job, oldJob, &reviewResponse)
6261
break
6362
default:
6463
err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'")
@@ -82,6 +81,11 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
8281
return fmt.Sprintf("'minAvailable' cannot be less than zero.")
8382
}
8483

84+
if job.Spec.MaxRetry < 0 {
85+
reviewResponse.Allowed = false
86+
return fmt.Sprintf("'maxRetry' cannot be less than zero.")
87+
}
88+
8589
if len(job.Spec.Tasks) == 0 {
8690
reviewResponse.Allowed = false
8791
return fmt.Sprintf("No task specified in job spec")

pkg/controllers/job/job_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func NewJobController(config *rest.Config) *Controller {
128128
cc.jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
129129
AddFunc: cc.addJob,
130130
// TODO: enable this until we find an appropriate way.
131-
// UpdateFunc: cc.updateJob,
131+
UpdateFunc: cc.updateJob,
132132
DeleteFunc: cc.deleteJob,
133133
})
134134
cc.jobLister = cc.jobInformer.Lister()

pkg/controllers/job/job_controller_actions.go

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
4141
defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name)
4242

4343
job := jobInfo.Job
44-
// Job version is bumped only when job is killed
45-
job.Status.Version = job.Status.Version + 1
4644
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
4745
if job.DeletionTimestamp != nil {
4846
glog.Infof("Job <%s/%s> is terminating, skip management process.",
@@ -88,6 +86,10 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
8886
return fmt.Errorf("failed to kill %d pods of %d", len(errs), total)
8987
}
9088

89+
job = job.DeepCopy()
90+
//Job version is bumped only when job is killed
91+
job.Status.Version = job.Status.Version + 1
92+
9193
job.Status = vkv1.JobStatus{
9294
State: job.Status.State,
9395

@@ -112,6 +114,8 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
112114
return err
113115
} else {
114116
if e := cc.cache.Update(job); e != nil {
117+
glog.Errorf("KillJob - Failed to update Job %v/%v in cache: %v",
118+
job.Namespace, job.Name, e)
115119
return e
116120
}
117121
}
@@ -138,21 +142,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
138142
glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
139143
defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name)
140144

141-
job := jobInfo.Job
145+
job := jobInfo.Job.DeepCopy()
142146
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
143147

144-
newJob, err := cc.needUpdateForVolumeClaim(job)
145-
if err != nil {
148+
if update, err := cc.filljob(job); err != nil || update {
146149
return err
147150
}
148-
if newJob != nil {
149-
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
150-
glog.Errorf("Failed to update Job %v/%v: %v",
151-
job.Namespace, job.Name, err)
152-
return err
153-
}
154-
return nil
155-
}
156151

157152
if err := cc.pluginOnJobAdd(job); err != nil {
158153
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError),
@@ -168,14 +163,26 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
168163
return err
169164
}
170165

166+
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
167+
glog.Errorf("Failed to update status of Job %v/%v: %v",
168+
job.Namespace, job.Name, err)
169+
return err
170+
} else {
171+
if e := cc.cache.Update(job); e != nil {
172+
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
173+
job.Namespace, job.Name, e)
174+
return e
175+
}
176+
}
177+
171178
return nil
172179
}
173180

174181
func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
175182
glog.V(3).Infof("Starting to sync up Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
176183
defer glog.V(3).Infof("Finished Job <%s/%s> sync up", jobInfo.Job.Namespace, jobInfo.Job.Name)
177184

178-
job := jobInfo.Job
185+
job := jobInfo.Job.DeepCopy()
179186
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
180187

181188
if job.DeletionTimestamp != nil {
@@ -313,6 +320,8 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
313320
return err
314321
} else {
315322
if e := cc.cache.Update(job); e != nil {
323+
glog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
324+
job.Namespace, job.Name, e)
316325
return e
317326
}
318327
}
@@ -356,10 +365,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
356365
return nil
357366
}
358367

359-
func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) {
368+
func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) {
360369
// If VolumeClaimName does not exist, generate them for Job.
361370
var newJob *vkv1.Job
362371
volumes := job.Spec.Volumes
372+
update := false
363373
for index, volume := range volumes {
364374
vcName := volume.VolumeClaimName
365375
if len(vcName) == 0 {
@@ -368,7 +378,7 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
368378
vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr)
369379
exist, err := cc.checkPVCExist(job, vcName)
370380
if err != nil {
371-
return nil, err
381+
return false, nil, err
372382
}
373383
if exist {
374384
continue
@@ -377,11 +387,12 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
377387
newJob = job.DeepCopy()
378388
}
379389
newJob.Spec.Volumes[index].VolumeClaimName = vcName
390+
update = true
380391
break
381392
}
382393
}
383394
}
384-
return newJob, nil
395+
return update, newJob, nil
385396
}
386397

387398
func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
@@ -494,3 +505,31 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {
494505

495506
return minAvailableTasksRes.Convert2K8sResource()
496507
}
508+
509+
func (cc *Controller) filljob(job *vkv1.Job) (bool, error) {
510+
update, newJob, err := cc.needUpdateForVolumeClaim(job)
511+
if err != nil {
512+
return false, err
513+
}
514+
if update {
515+
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
516+
glog.Errorf("Failed to update Job %v/%v: %v",
517+
job.Namespace, job.Name, err)
518+
return false, err
519+
}
520+
return true, nil
521+
} else if job.Status.State.Phase == "" {
522+
job.Status.State.Phase = vkv1.Pending
523+
if j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
524+
glog.Errorf("Failed to update status of Job %v/%v: %v",
525+
job.Namespace, job.Name, err)
526+
} else {
527+
if e := cc.cache.Update(j); e != nil {
528+
glog.Error("Failed to update cache status of Job %v/%v: %v", job.Namespace, job.Name, e)
529+
}
530+
}
531+
return true, nil
532+
}
533+
534+
return false, nil
535+
}

pkg/controllers/job/job_controller_handler.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -81,18 +81,18 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
8181
return
8282
}
8383

84-
if err := cc.cache.Update(newJob); err != nil {
85-
glog.Errorf("Failed to update job <%s/%s>: %v in cache",
86-
newJob.Namespace, newJob.Name, err)
87-
}
88-
8984
// NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
9085
// For Job status, it's used internally and always been updated via our controller.
91-
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) {
86+
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) && newJob.Status.State.Phase == oldJob.Status.State.Phase {
9287
glog.Infof("Job update event is ignored since no update in 'Spec'.")
9388
return
9489
}
9590

91+
if err := cc.cache.Update(newJob); err != nil {
92+
glog.Errorf("UpdateJob - Failed to update job <%s/%s>: %v in cache",
93+
newJob.Namespace, newJob.Name, err)
94+
}
95+
9696
req := apis.Request{
9797
Namespace: newJob.Namespace,
9898
JobName: newJob.Name,

pkg/controllers/job/state/aborted.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type abortedState struct {
2828
func (as *abortedState) Execute(action vkv1.Action) error {
2929
switch action {
3030
case vkv1.ResumeJobAction:
31-
return SyncJob(as.job, func(status *vkv1.JobStatus) {
31+
return KillJob(as.job, func(status *vkv1.JobStatus) {
3232
status.State.Phase = vkv1.Restarting
3333
status.RetryCount++
3434
})

pkg/controllers/job/state/aborting.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (ps *abortingState) Execute(action vkv1.Action) error {
2929
switch action {
3030
case vkv1.ResumeJobAction:
3131
// Already in Restarting phase, just sync it
32-
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
32+
return KillJob(ps.job, func(status *vkv1.JobStatus) {
3333
status.State.Phase = vkv1.Restarting
3434
status.RetryCount++
3535
})

pkg/controllers/job/state/factory.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func NewState(jobInfo *apis.JobInfo) State {
4747
return &runningState{job: jobInfo}
4848
case vkv1.Restarting:
4949
return &restartingState{job: jobInfo}
50-
case vkv1.Terminated, vkv1.Completed:
50+
case vkv1.Terminated, vkv1.Completed, vkv1.Failed:
5151
return &finishedState{job: jobInfo}
5252
case vkv1.Terminating:
5353
return &terminatingState{job: jobInfo}
@@ -57,8 +57,6 @@ func NewState(jobInfo *apis.JobInfo) State {
5757
return &abortedState{job: jobInfo}
5858
case vkv1.Completing:
5959
return &completingState{job: jobInfo}
60-
case vkv1.Failed:
61-
return &failedState{job: jobInfo}
6260
case vkv1.Inqueue:
6361
return &inqueueState{job: jobInfo}
6462
}

pkg/controllers/job/state/failed.go

Lines changed: 0 additions & 30 deletions
This file was deleted.

pkg/controllers/job/state/restarting.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type restartingState struct {
2626
}
2727

2828
func (ps *restartingState) Execute(action vkv1.Action) error {
29-
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
29+
return KillJob(ps.job, func(status *vkv1.JobStatus) {
3030
phase := vkv1.Restarting
3131

3232
// Get the maximum number of retries.
@@ -39,12 +39,13 @@ func (ps *restartingState) Execute(action vkv1.Action) error {
3939
// Failed is the phase that the job is restarted failed reached the maximum number of retries.
4040
phase = vkv1.Failed
4141
} else {
42-
if status.Terminating == 0 {
43-
if status.Running >= ps.job.Job.Spec.MinAvailable {
44-
phase = vkv1.Running
45-
} else {
46-
phase = vkv1.Pending
47-
}
42+
total := int32(0)
43+
for _, task := range ps.job.Job.Spec.Tasks {
44+
total += task.Replicas
45+
}
46+
47+
if total-status.Terminating >= status.MinAvailable {
48+
phase = vkv1.Pending
4849
}
4950
}
5051

0 commit comments

Comments
 (0)