Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions pkg/scheduler/plugins/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"

"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
pglister "sigs.k8s.io/scheduler-plugins/pkg/generated/listers/scheduling/v1alpha1"

"github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -68,6 +67,7 @@ type Manager interface {
Unreserve(context.Context, *framework.CycleState, *corev1.Pod, string, framework.Handle, string)
GetGangSummary(gangId string) (*GangSummary, bool)
GetGangSummaries() map[string]*GangSummary
GetID(*framework.QueuedPodInfo) types.UID
}

// PodGroupManager defines the scheduling operation called
Expand Down Expand Up @@ -392,6 +392,23 @@ func (pgMgr *PodGroupManager) GetCreatTime(podInfo *framework.QueuedPodInfo) tim
return time.Now()
}

func (pgMgr *PodGroupManager) GetID(podInfo *framework.QueuedPodInfo) types.UID {
// first check if the pod belongs to the Gang
// it doesn't belong to the gang,we get the creation time of the pod
if !util.IsPodNeedGang(podInfo.Pod) {
return ""
}
gang := pgMgr.GetGangByPod(podInfo.Pod)
// it belongs to a gang,we get the creation time of the Gang
if gang != nil {
return gang.ID
}
klog.Errorf("getID didn't find gang: %v in gangCache, pod name: %v",
util.GetId(podInfo.Pod.Namespace, util.GetGangNameByPod(podInfo.Pod)), podInfo.Pod.Name)

return ""
}

// PatchPodGroup patches a podGroup.
func (pgMgr *PodGroupManager) PatchPodGroup(pgName string, namespace string, patch []byte) error {
if len(patch) == 0 {
Expand Down
3 changes: 3 additions & 0 deletions pkg/scheduler/plugins/coscheduling/core/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"

Expand All @@ -42,6 +43,7 @@ const (

// Gang basic podGroup info recorded in gangCache:
type Gang struct {
ID types.UID
Name string
WaitTime time.Duration
CreateTime time.Time
Expand Down Expand Up @@ -170,6 +172,7 @@ func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.Coschedu
}
minRequiredNumber := pg.Spec.MinMember
gang.MinRequiredNumber = int(minRequiredNumber)
gang.ID = pg.UID

totalChildrenNum, err := strconv.ParseInt(pg.Annotations[extension.AnnotationGangTotalNum], 10, 32)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/plugins/coscheduling/core/gang_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) {
retry.DefaultRetry,
errors.IsTooManyRequests,
func() error {
_, err := gangCache.pgClient.SchedulingV1alpha1().PodGroups(pod.Namespace).Create(context.TODO(), pgFromAnnotation, metav1.CreateOptions{})
pg, err := gangCache.pgClient.SchedulingV1alpha1().PodGroups(pod.Namespace).Create(context.TODO(), pgFromAnnotation, metav1.CreateOptions{})
if err == nil && pg != nil {
gang.ID = pg.UID
}

return err
})
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/plugins/coscheduling/coscheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool {
creationTime1 := cs.pgMgr.GetCreatTime(podInfo1)
creationTime2 := cs.pgMgr.GetCreatTime(podInfo2)
if creationTime1.Equal(creationTime2) {
return util.GetId(podInfo1.Pod.Namespace, podInfo1.Pod.Name) < util.GetId(podInfo2.Pod.Namespace, podInfo2.Pod.Name)
return cs.pgMgr.GetID(podInfo1) < cs.pgMgr.GetID(podInfo2)
}

return creationTime1.Before(creationTime2)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/coscheduling/coscheduling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func TestLess(t *testing.T) {
PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()),
InitialAttemptTimestamp: earltTime,
},
expected: true, // p1 should be ahead of p2 in the queue
expected: false, // p1 should be ahead of p2 in the queue
},
} {
t.Run(tt.name, func(t *testing.T) {
Expand Down