Skip to content

Commit 0fa8167

Browse files
author
wangyuqing (C)
committed
fix state convert
1 parent 40e85f4 commit 0fa8167

File tree

6 files changed

+58
-56
lines changed

6 files changed

+58
-56
lines changed

pkg/controllers/job/job_controller_actions.go

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -138,21 +138,12 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
138138
glog.V(3).Infof("Starting to create Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
139139
defer glog.V(3).Infof("Finished Job <%s/%s> create", jobInfo.Job.Namespace, jobInfo.Job.Name)
140140

141-
job := jobInfo.Job
141+
job := jobInfo.Job.DeepCopy()
142142
glog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
143143

144-
newJob, err := cc.needUpdateForVolumeClaim(job)
145-
if err != nil {
144+
if update, err := cc.filljob(job); err != nil || update {
146145
return err
147146
}
148-
if newJob != nil {
149-
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
150-
glog.Errorf("Failed to update Job %v/%v: %v",
151-
job.Namespace, job.Name, err)
152-
return err
153-
}
154-
return nil
155-
}
156147

157148
if err := cc.pluginOnJobAdd(job); err != nil {
158149
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PluginError),
@@ -168,6 +159,18 @@ func (cc *Controller) createJob(jobInfo *apis.JobInfo, nextState state.UpdateSta
168159
return err
169160
}
170161

162+
if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
163+
glog.Errorf("Failed to update status of Job %v/%v: %v",
164+
job.Namespace, job.Name, err)
165+
return err
166+
} else {
167+
if e := cc.cache.Update(job); e != nil {
168+
glog.Errorf("CreateJob - Failed to update Job %v/%v in cache: %v",
169+
job.Namespace, job.Name, e)
170+
return e
171+
}
172+
}
173+
171174
return nil
172175
}
173176

@@ -356,10 +359,11 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
356359
return nil
357360
}
358361

359-
func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error) {
362+
func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (bool, *vkv1.Job, error) {
360363
// If VolumeClaimName does not exist, generate them for Job.
361364
var newJob *vkv1.Job
362365
volumes := job.Spec.Volumes
366+
update := false
363367
for index, volume := range volumes {
364368
vcName := volume.VolumeClaimName
365369
if len(vcName) == 0 {
@@ -368,7 +372,7 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
368372
vcName = fmt.Sprintf("%s-volume-%s", job.Name, randomStr)
369373
exist, err := cc.checkPVCExist(job, vcName)
370374
if err != nil {
371-
return nil, err
375+
return false, nil, err
372376
}
373377
if exist {
374378
continue
@@ -377,11 +381,12 @@ func (cc *Controller) needUpdateForVolumeClaim(job *vkv1.Job) (*vkv1.Job, error)
377381
newJob = job.DeepCopy()
378382
}
379383
newJob.Spec.Volumes[index].VolumeClaimName = vcName
384+
update = true
380385
break
381386
}
382387
}
383388
}
384-
return newJob, nil
389+
return update, newJob, nil
385390
}
386391

387392
func (cc *Controller) checkPVCExist(job *vkv1.Job, vcName string) (bool, error) {
@@ -494,3 +499,31 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {
494499

495500
return minAvailableTasksRes.Convert2K8sResource()
496501
}
502+
503+
func (cc *Controller) filljob(job *vkv1.Job) (bool, error) {
504+
update, newJob, err := cc.needUpdateForVolumeClaim(job)
505+
if err != nil {
506+
return false, err
507+
}
508+
if update {
509+
if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(newJob); err != nil {
510+
glog.Errorf("Failed to update Job %v/%v: %v",
511+
job.Namespace, job.Name, err)
512+
return false, err
513+
}
514+
return true, nil
515+
} else if job.Status.State.Phase == "" {
516+
job.Status.State.Phase = vkv1.Pending
517+
if j, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job); err != nil {
518+
glog.Errorf("Failed to update status of Job %v/%v: %v",
519+
job.Namespace, job.Name, err)
520+
} else {
521+
if e := cc.cache.Update(j); e != nil {
522+
glog.Error("Failed to update cache status of Job %v/%v: %v", job.Namespace, job.Name, e)
523+
}
524+
}
525+
return true, nil
526+
}
527+
528+
return false, nil
529+
}

pkg/controllers/job/state/aborted.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type abortedState struct {
2828
func (as *abortedState) Execute(action vkv1.Action) error {
2929
switch action {
3030
case vkv1.ResumeJobAction:
31-
return SyncJob(as.job, func(status *vkv1.JobStatus) {
31+
return KillJob(as.job, func(status *vkv1.JobStatus) {
3232
status.State.Phase = vkv1.Restarting
3333
status.RetryCount++
3434
})

pkg/controllers/job/state/aborting.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (ps *abortingState) Execute(action vkv1.Action) error {
2929
switch action {
3030
case vkv1.ResumeJobAction:
3131
// Resume requested while aborting: run KillJob and move the job to the Restarting phase
32-
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
32+
return KillJob(ps.job, func(status *vkv1.JobStatus) {
3333
status.State.Phase = vkv1.Restarting
3434
status.RetryCount++
3535
})

pkg/controllers/job/state/factory.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func NewState(jobInfo *apis.JobInfo) State {
4747
return &runningState{job: jobInfo}
4848
case vkv1.Restarting:
4949
return &restartingState{job: jobInfo}
50-
case vkv1.Terminated, vkv1.Completed:
50+
case vkv1.Terminated, vkv1.Completed, vkv1.Failed:
5151
return &finishedState{job: jobInfo}
5252
case vkv1.Terminating:
5353
return &terminatingState{job: jobInfo}
@@ -57,8 +57,6 @@ func NewState(jobInfo *apis.JobInfo) State {
5757
return &abortedState{job: jobInfo}
5858
case vkv1.Completing:
5959
return &completingState{job: jobInfo}
60-
case vkv1.Failed:
61-
return &failedState{job: jobInfo}
6260
case vkv1.Inqueue:
6361
return &inqueueState{job: jobInfo}
6462
}

pkg/controllers/job/state/failed.go

Lines changed: 0 additions & 30 deletions
This file was deleted.

pkg/controllers/job/state/restarting.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type restartingState struct {
2626
}
2727

2828
func (ps *restartingState) Execute(action vkv1.Action) error {
29-
return SyncJob(ps.job, func(status *vkv1.JobStatus) {
29+
return KillJob(ps.job, func(status *vkv1.JobStatus) {
3030
phase := vkv1.Restarting
3131

3232
// Get the maximum number of retries.
@@ -39,12 +39,13 @@ func (ps *restartingState) Execute(action vkv1.Action) error {
3939
// Failed is the phase a job enters when restarting has failed and the maximum number of retries has been reached.
4040
phase = vkv1.Failed
4141
} else {
42-
if status.Terminating == 0 {
43-
if status.Running >= ps.job.Job.Spec.MinAvailable {
44-
phase = vkv1.Running
45-
} else {
46-
phase = vkv1.Pending
47-
}
42+
total := int32(0)
43+
for _, task := range ps.job.Job.Spec.Tasks {
44+
total += task.Replicas
45+
}
46+
47+
if total-status.Terminating >= status.MinAvailable {
48+
phase = vkv1.Pending
4849
}
4950
}
5051

0 commit comments

Comments
 (0)