Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM
return nil
}

func (cc *Controller) createJob(job *batch.Job) (*batch.Job, error) {
func (cc *Controller) initiateJob(job *batch.Job) (*batch.Job, error) {
klog.V(3).Infof("Starting to initiate Job <%s/%s>", job.Namespace, job.Name)
defer klog.V(3).Infof("Finished Job <%s/%s> initiate", job.Namespace, job.Name)

klog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name)
job, err := cc.initJobStatus(job)
if err != nil {
cc.recorder.Event(job, v1.EventTypeWarning, string(batch.JobStatusError),
Expand Down Expand Up @@ -185,9 +189,38 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
return nil
}

var err error
if job, err = cc.createJob(job); err != nil {
return err
// Skip job initiation if job is already accepted
if job.Status.State.Phase == "" || job.Status.State.Phase == batch.Pending || job.Status.State.Phase == batch.Restarting {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add an isInitialized func for this check.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

var err error
if job, err = cc.initiateJob(job); err != nil {
return err
}
}
var syncTask bool
if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); pg != nil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check this in the handler?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand what you mean. My thought is to split the SyncJob function into two phases: the first phase is initialization and the second phase is sync.

if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending {
syncTask = true
}
}

if !syncTask {
if updateStatus != nil {
if updateStatus(&job.Status) {
job.Status.State.LastTransitionTime = metav1.Now()
}
}
newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job)
if err != nil {
klog.Errorf("Failed to update status of Job %v/%v: %v",
job.Namespace, job.Name, err)
return err
}
if e := cc.cache.Update(newJob); e != nil {
klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v",
newJob.Namespace, newJob.Name, e)
return e
}
return nil
}

var running, pending, terminating, succeeded, failed, unknown int32
Expand Down Expand Up @@ -297,7 +330,6 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
fmt.Sprintf("Error deleting pods: %+v", deletionErrs))
return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete))
}

job.Status = batch.JobStatus{
State: job.Status.State,

Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/job/job_controller_actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ func TestSyncJobFunc(t *testing.T) {
Name: "job1",
Namespace: namespace,
},
Status: schedulingv1alpha2.PodGroupStatus{
Phase: schedulingv1alpha2.PodGroupInqueue,
},
},
PodRetainPhase: state.PodRetainPhaseNone,
UpdateStatus: nil,
Expand All @@ -248,6 +251,7 @@ func TestSyncJobFunc(t *testing.T) {

t.Run(testcase.Name, func(t *testing.T) {
fakeController := newFakeController()

jobPlugins := make(map[string][]string)

for _, plugin := range testcase.Plugins {
Expand All @@ -256,6 +260,8 @@ func TestSyncJobFunc(t *testing.T) {
testcase.JobInfo.Job = testcase.Job
testcase.JobInfo.Job.Spec.Plugins = jobPlugins

fakeController.pgInformer.Informer().GetIndexer().Add(testcase.PodGroup)

for _, pod := range testcase.Pods {
_, err := fakeController.kubeClient.CoreV1().Pods(namespace).Create(pod)
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions pkg/controllers/job/state/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,9 @@ func (ps *pendingState) Execute(action vcbatch.Action) error {
})
default:
return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
phase := vcbatch.Pending

if ps.job.Job.Spec.MinAvailable <= status.Running+status.Succeeded+status.Failed {
phase = vcbatch.Running
status.State.Phase = vcbatch.Running
}

status.State.Phase = phase
return true
})
}
Expand Down