Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions apis/scheduling/v1alpha1/pod_migration_job_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,19 @@ const (

// These are valid reasons of PodMigrationJob.
const (
PodMigrationJobReasonTimeout = "Timeout"
PodMigrationJobReasonFailedCreateReservation = "FailedCreateReservation"
PodMigrationJobReasonInvalidReservation = "InvalidReservation"
PodMigrationJobReasonUnschedulable = "Unschedulable"
PodMigrationJobReasonMissingPod = "MissingPod"
PodMigrationJobReasonMissingReservation = "MissingReservation"
PodMigrationJobReasonPreempting = "Preempting"
PodMigrationJobReasonPreemptComplete = "PreemptComplete"
PodMigrationJobReasonEvicting = "Evicting"
PodMigrationJobReasonFailedEvict = "FailedEvict"
PodMigrationJobReasonEvictComplete = "EvictComplete"
PodMigrationJobReasonWaitForPodBindReservation = "WaitForPodBindReservation"
PodMigrationJobReasonTimeout = "Timeout"
PodMigrationJobReasonFailedCreateReservation = "FailedCreateReservation"
PodMigrationJobReasonReservationExpired = "ReservationExpired"
PodMigrationJobReasonReservationBoundByAnotherPod = "ReservationBoundByAnotherPod"
PodMigrationJobReasonUnschedulable = "Unschedulable"
PodMigrationJobReasonMissingPod = "MissingPod"
PodMigrationJobReasonMissingReservation = "MissingReservation"
PodMigrationJobReasonPreempting = "Preempting"
PodMigrationJobReasonPreemptComplete = "PreemptComplete"
PodMigrationJobReasonEvicting = "Evicting"
PodMigrationJobReasonFailedEvict = "FailedEvict"
PodMigrationJobReasonEvictComplete = "EvictComplete"
PodMigrationJobReasonWaitForPodBindReservation = "WaitForPodBindReservation"
)

type PodMigrationJobConditionStatus string
Expand Down
74 changes: 57 additions & 17 deletions pkg/descheduler/controllers/migration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,8 @@ func (r *Reconciler) doMigrate(ctx context.Context, job *sev1alpha1.PodMigration
return reconcile.Result{}, err
}

if err = r.handleReservationScheduleFailed(ctx, job, reservationObj); err != nil {
// sync reservation Unschedulable message to PodMigrationJob
if err = r.syncReservationScheduleFailed(ctx, job, reservationObj); err != nil {
return reconcile.Result{}, err
}

Expand All @@ -329,12 +330,12 @@ func (r *Reconciler) doMigrate(ctx context.Context, job *sev1alpha1.PodMigration
return reconcile.Result{RequeueAfter: defaultRequeueAfter}, nil
}

if reservation.IsReservationSucceeded(reservationObj) || reservation.IsReservationExpired(reservationObj) {
err := r.abortJobByInvalidReservation(ctx, job, reservationObj)
if reservation.IsReservationExpired(reservationObj) {
err := r.abortJobByReservationExpired(ctx, job)
return reconcile.Result{}, err
}

if !reservation.IsReservationAvailable(reservationObj) {
if !reservation.IsReservationScheduled(reservationObj) {
preemption := r.reservationInterpreter.Preemption()
if !reservationObj.NeedPreemption() || preemption == nil {
err := r.abortJobByReservationUnschedulable(ctx, job, reservationObj)
Expand All @@ -348,7 +349,7 @@ func (r *Reconciler) doMigrate(ctx context.Context, job *sev1alpha1.PodMigration
}
}

if err = r.handleReservationScheduleSuccess(ctx, job, reservationObj); err != nil {
if err = r.syncReservationScheduleSuccess(ctx, job, reservationObj); err != nil {
return reconcile.Result{}, err
}

Expand Down Expand Up @@ -442,14 +443,51 @@ func (r *Reconciler) abortJobByMissingReservation(ctx context.Context, job *sev1
return err
}

func (r *Reconciler) abortJobByInvalidReservation(ctx context.Context, job *sev1alpha1.PodMigrationJob, reservationObj reservation.Object) error {
klog.V(4).Infof("MigrationJob %s stop migration because Reservation %q phase becomes %s", job.Name, reservationObj, reservationObj.GetPhase())
// abortJobByReservationExpired marks the PodMigrationJob as failed with reason
// ReservationExpired and message "Reservation expired", then persists the
// status through the client's status writer. It returns the error from that
// status update.
func (r *Reconciler) abortJobByReservationExpired(ctx context.Context, job *sev1alpha1.PodMigrationJob) error {
klog.V(4).Infof("MigrationJob %s stop migration because Reservation expired", job.Name)
job.Status.Phase = sev1alpha1.PodMigrationJobFailed
// NOTE(review): the next two assignments look like the removed side of a
// rendered diff (they reference reservationObj, which is not a parameter of
// this function) — confirm. The two assignments that follow them are the ones
// that determine the final Reason and Message.
job.Status.Reason = sev1alpha1.PodMigrationJobReasonInvalidReservation
job.Status.Message = fmt.Sprintf("Reservation phase is %q", reservationObj.GetPhase())
job.Status.Reason = sev1alpha1.PodMigrationJobReasonReservationExpired
job.Status.Message = "Reservation expired"
return r.Client.Status().Update(ctx, job)
}

// abortJobByReservationBound fails the PodMigrationJob with reason
// ReservationBoundByAnotherPod and writes the updated status through the
// client's status writer. The error of that status update is returned as-is.
func (r *Reconciler) abortJobByReservationBound(ctx context.Context, job *sev1alpha1.PodMigrationJob) error {
	klog.V(4).Infof("MigrationJob %s stop migration because Reservation is already bound by another Pod", job.Name)

	// Mutate the caller's job in place so the caller observes the failed state too.
	status := &job.Status
	status.Phase = sev1alpha1.PodMigrationJobFailed
	status.Reason = sev1alpha1.PodMigrationJobReasonReservationBoundByAnotherPod
	status.Message = "Reservation is already bound by another Pod"

	return r.Client.Status().Update(ctx, job)
}

// abortJobIfReservationBoundByAnotherPod checks the Reservation referenced by
// job.Spec.ReservationOptions.ReservationRef and aborts the job when that
// Reservation has already succeeded for a Pod other than pod.
//
// The first return value reports whether the caller should stop processing:
//   - (false, nil) when the job has no ReservationRef, when the Reservation
//     has not succeeded, or when it is bound to pod (matched by UID);
//   - (true, err) when looking up the Reservation fails. If the failure is a
//     NotFound error the job is additionally aborted as "missing reservation";
//     the error of that abort is discarded and the lookup error is returned;
//   - (true, err) when the Reservation has succeeded and pod is nil, no bound
//     Pod is reported, or the bound Pod's UID differs from pod's. err is the
//     result of abortJobByReservationBound.
func (r *Reconciler) abortJobIfReservationBoundByAnotherPod(ctx context.Context, job *sev1alpha1.PodMigrationJob, pod *corev1.Pod) (bool, error) {
	options := job.Spec.ReservationOptions
	if options == nil || options.ReservationRef == nil {
		return false, nil
	}

	reservationObj, err := r.reservationInterpreter.GetReservation(ctx, options.ReservationRef)
	if err != nil {
		if errors.IsNotFound(err) {
			// The abort result is dropped; the caller still receives the lookup error below.
			_ = r.abortJobByMissingReservation(ctx, job)
		}
		return true, err
	}

	if !reservation.IsReservationSucceeded(reservationObj) {
		return false, nil
	}

	// A succeeded Reservation is only acceptable when it is bound to the given Pod.
	if pod != nil {
		if boundPod := reservationObj.GetBoundPod(); boundPod != nil && boundPod.UID == pod.UID {
			return false, nil
		}
	}

	return true, r.abortJobByReservationBound(ctx, job)
}

func (r *Reconciler) abortJobByReservationUnschedulable(ctx context.Context, job *sev1alpha1.PodMigrationJob, reservationObj reservation.Object) error {
klog.V(4).Infof("MigrationJob %s stop migration because Reservation %q cannot be scheduled", job.Name, reservationObj)
var message string
Expand All @@ -463,7 +501,7 @@ func (r *Reconciler) abortJobByReservationUnschedulable(ctx context.Context, job
return r.Client.Status().Update(ctx, job)
}

func (r *Reconciler) handleReservationScheduleFailed(ctx context.Context, job *sev1alpha1.PodMigrationJob, reservationObj reservation.Object) error {
func (r *Reconciler) syncReservationScheduleFailed(ctx context.Context, job *sev1alpha1.PodMigrationJob, reservationObj reservation.Object) error {
_, cond := util.GetCondition(&job.Status, sev1alpha1.PodMigrationJobConditionReservationScheduled)
if cond == nil || cond.Status == sev1alpha1.PodMigrationJobConditionStatusFalse {
klog.V(4).Infof("MigrationJob %s checks whether Reservation %q is scheduled successfully", job.Name, reservationObj)
Expand Down Expand Up @@ -560,6 +598,10 @@ func (r *Reconciler) evictPod(ctx context.Context, job *sev1alpha1.PodMigrationJ
return false, reconcile.Result{RequeueAfter: defaultRequeueAfter}, nil
}

if aborted, err := r.abortJobIfReservationBoundByAnotherPod(ctx, job, nil); aborted {
return false, reconcile.Result{}, err
}

if job.Spec.DeleteOptions == nil {
job.Spec.DeleteOptions = r.args.DefaultDeleteOptions
}
Expand All @@ -582,7 +624,7 @@ func (r *Reconciler) evictPod(ctx context.Context, job *sev1alpha1.PodMigrationJ
return false, reconcile.Result{RequeueAfter: defaultRequeueAfter}, err
}

func (r *Reconciler) handleReservationScheduleSuccess(ctx context.Context, job *sev1alpha1.PodMigrationJob, reservationObj reservation.Object) error {
func (r *Reconciler) syncReservationScheduleSuccess(ctx context.Context, job *sev1alpha1.PodMigrationJob, reservationObj reservation.Object) error {
scheduledNodeName := reservationObj.GetScheduledNodeName()
if scheduledNodeName == "" || job.Status.NodeName != "" {
return nil
Expand Down Expand Up @@ -677,6 +719,10 @@ func (r *Reconciler) waitForPendingPodScheduled(ctx context.Context, job *sev1al

_, podCondition := podutil.GetPodCondition(&pod.Status, corev1.PodScheduled)
if podCondition == nil || podCondition.Status == corev1.ConditionFalse {
if aborted, err := r.abortJobIfReservationBoundByAnotherPod(ctx, job, pod); aborted {
return reconcile.Result{}, err
}

var message string
if podCondition != nil {
message = podCondition.Message
Expand All @@ -691,12 +737,6 @@ func (r *Reconciler) waitForPendingPodScheduled(ctx context.Context, job *sev1al
return reconcile.Result{RequeueAfter: defaultRequeueAfter}, err
}

// TODO(joseph): currently Reservation does not support allocateOnce semantics.
// It is our responsibility to clean up the Reservation ourselves
if err = r.deleteReservation(ctx, job); err != nil {
klog.Errorf("Failed to delete reservation, MigrationJob: %s, err: %v", job.Name, err)
}

job.Status.Phase = sev1alpha1.PodMigrationJobSucceeded
job.Status.Status = "Complete"
job.Status.Reason = ""
Expand Down
127 changes: 111 additions & 16 deletions pkg/descheduler/controllers/migration/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,9 @@ func TestAbortJobByInvalidReservation(t *testing.T) {
},
}
assert.Nil(t, reconciler.Create(context.TODO(), job))

reservationObj := reservation.NewReservation(&sev1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reservation",
},
Status: sev1alpha1.ReservationStatus{
Phase: sev1alpha1.ReservationSucceeded,
},
})
assert.Nil(t, reconciler.abortJobByInvalidReservation(context.TODO(), job, reservationObj))
assert.Nil(t, reconciler.abortJobByReservationBound(context.TODO(), job))
assert.Equal(t, sev1alpha1.PodMigrationJobFailed, job.Status.Phase)
assert.Equal(t, sev1alpha1.PodMigrationJobReasonInvalidReservation, job.Status.Reason)
assert.Equal(t, sev1alpha1.PodMigrationJobReasonReservationBoundByAnotherPod, job.Status.Reason)
}

func TestAbortJobByReservationUnschedulable(t *testing.T) {
Expand Down Expand Up @@ -311,7 +302,7 @@ func TestHandleScheduleFailed(t *testing.T) {
},
},
})
assert.Nil(t, reconciler.handleReservationScheduleFailed(context.TODO(), job, reservationObj))
assert.Nil(t, reconciler.syncReservationScheduleFailed(context.TODO(), job, reservationObj))
_, cond := util.GetCondition(&job.Status, sev1alpha1.PodMigrationJobConditionReservationScheduled)
assert.NotNil(t, cond)
expectCond := &sev1alpha1.PodMigrationJobCondition{
Expand Down Expand Up @@ -348,7 +339,7 @@ func TestHandleScheduleSuccess(t *testing.T) {
NodeName: "test-node",
},
})
assert.Nil(t, reconciler.handleReservationScheduleSuccess(context.TODO(), job, reservationObj))
assert.Nil(t, reconciler.syncReservationScheduleSuccess(context.TODO(), job, reservationObj))
_, cond := util.GetCondition(&job.Status, sev1alpha1.PodMigrationJobConditionReservationScheduled)
assert.NotNil(t, cond)
expectCond := &sev1alpha1.PodMigrationJobCondition{
Expand Down Expand Up @@ -777,7 +768,9 @@ func TestMigrate(t *testing.T) {
Phase: sev1alpha1.ReservationAvailable,
Conditions: []sev1alpha1.ReservationCondition{
{
Reason: string(corev1.PodScheduled),
Type: sev1alpha1.ReservationConditionScheduled,
Reason: sev1alpha1.ReasonReservationScheduled,
Status: sev1alpha1.ConditionStatusTrue,
},
},
CurrentOwners: []corev1.ObjectReference{
Expand Down Expand Up @@ -815,6 +808,108 @@ func TestMigrate(t *testing.T) {
assert.Equal(t, "Complete", job.Status.Status)
}

// TestMigrateWhenEvictingWithSucceededReservation repeatedly reconciles a
// PodMigrationJob while the fake reservation interpreter is configured with a
// Reservation that is already in the Succeeded phase and lists a different Pod
// ("test-pod-1") as its current owner. The job is expected to finish as Failed,
// with Status equal to the ReservationScheduled condition name and without an
// Eviction condition having been recorded.
func TestMigrateWhenEvictingWithSucceededReservation(t *testing.T) {
	reconciler := newTestReconciler()
	reconciler.evictorInterpreter = fakeEvictionInterpreter{}

	// The Pod and the Reservation owner both reference this StatefulSet. A fresh
	// value is built per use so the two objects never share the Controller pointer.
	statefulSetRef := func() metav1.OwnerReference {
		return metav1.OwnerReference{
			APIVersion: "apps/v1",
			Controller: pointer.Bool(true),
			Kind:       "StatefulSet",
			Name:       "test",
			UID:        "2f96233d-a6b9-4981-b594-7c90c987aed9",
		}
	}

	// The job is created paused; it is resumed inside the reconcile loop below.
	job := &sev1alpha1.PodMigrationJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:              "test",
			CreationTimestamp: metav1.Time{Time: time.Now()},
		},
		Spec: sev1alpha1.PodMigrationJobSpec{
			Paused: true,
			PodRef: &corev1.ObjectReference{
				Namespace: "default",
				Name:      "test-pod",
			},
		},
	}
	assert.Nil(t, reconciler.Client.Create(context.TODO(), job))

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:       "default",
			Name:            "test-pod",
			OwnerReferences: []metav1.OwnerReference{statefulSetRef()},
		},
		Status: corev1.PodStatus{
			Phase: corev1.PodRunning,
		},
	}
	assert.Nil(t, reconciler.Client.Create(context.TODO(), pod))

	// Reservation that has already succeeded and is owned by another Pod.
	succeededReservation := &sev1alpha1.Reservation{
		ObjectMeta: metav1.ObjectMeta{
			Name: "test-reservation",
		},
		Spec: sev1alpha1.ReservationSpec{
			Owners: []sev1alpha1.ReservationOwner{
				{
					Controller: &sev1alpha1.ReservationControllerReference{
						Namespace:      "default",
						OwnerReference: statefulSetRef(),
					},
				},
			},
		},
		Status: sev1alpha1.ReservationStatus{
			Phase: sev1alpha1.ReservationSucceeded,
			Conditions: []sev1alpha1.ReservationCondition{
				{
					Type:   sev1alpha1.ReservationConditionScheduled,
					Reason: sev1alpha1.ReasonReservationScheduled,
					Status: sev1alpha1.ConditionStatusTrue,
				},
			},
			CurrentOwners: []corev1.ObjectReference{
				{
					Namespace: "default",
					Name:      "test-pod-1",
				},
			},
			NodeName: "test-node-1",
		},
	}
	reconciler.reservationInterpreter = fakeReservationInterpreter{reservation: succeededReservation}

	// Reconcile until the job leaves the empty/Running phases.
	jobKey := types.NamespacedName{Name: job.Name}
	for finished := false; !finished; {
		if _, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: jobKey}); err != nil {
			t.Fatal(err)
		}
		assert.Nil(t, reconciler.Client.Get(context.TODO(), jobKey, job))

		// Resume the job after the first pass so migration can proceed.
		if job.Spec.Paused {
			job.Spec.Paused = false
			assert.Nil(t, reconciler.Client.Update(context.TODO(), job))
		}

		phase := job.Status.Phase
		finished = phase != "" && phase != sev1alpha1.PodMigrationJobRunning
	}

	assert.Nil(t, reconciler.Client.Get(context.TODO(), jobKey, job))
	assert.Equal(t, sev1alpha1.PodMigrationJobFailed, job.Status.Phase)
	assert.Equal(t, string(sev1alpha1.PodMigrationJobConditionReservationScheduled), job.Status.Status)

	// The Pod must not have been evicted: no Eviction condition may exist.
	_, evictionCond := util.GetCondition(&job.Status, sev1alpha1.PodMigrationJobConditionEviction)
	assert.Nil(t, evictionCond)
}

func TestMigrateWithReservationScheduleFailed(t *testing.T) {
reconciler := newTestReconciler()
reconciler.evictorInterpreter = fakeEvictionInterpreter{}
Expand Down Expand Up @@ -985,7 +1080,7 @@ func TestMigrateWithReservationSucceeded(t *testing.T) {
}
assert.Nil(t, reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: job.Name}, job))
assert.Equal(t, sev1alpha1.PodMigrationJobFailed, job.Status.Phase)
assert.Equal(t, sev1alpha1.PodMigrationJobReasonInvalidReservation, job.Status.Reason)
assert.Equal(t, sev1alpha1.PodMigrationJobReasonReservationBoundByAnotherPod, job.Status.Reason)
}

func TestMigrateWithReservationExpired(t *testing.T) {
Expand Down Expand Up @@ -1071,7 +1166,7 @@ func TestMigrateWithReservationExpired(t *testing.T) {
}
assert.Nil(t, reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: job.Name}, job))
assert.Equal(t, sev1alpha1.PodMigrationJobFailed, job.Status.Phase)
assert.Equal(t, sev1alpha1.PodMigrationJobReasonInvalidReservation, job.Status.Reason)
assert.Equal(t, sev1alpha1.PodMigrationJobReasonReservationExpired, job.Status.Reason)
}

func TestDoScavenge(t *testing.T) {
Expand Down
Loading