Skip to content

Commit 86ef836

Browse files
ZiMengShengwangjianyu.wjy
andauthored
koord-scheduler: coscheduling supports sensing PodGroup changes (#1299)
Signed-off-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com> Co-authored-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
1 parent 986ca5e commit 86ef836

File tree

4 files changed

+51
-15
lines changed

4 files changed

+51
-15
lines changed

pkg/scheduler/plugins/coscheduling/core/core.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,6 @@ type Manager interface {
7777
type PodGroupManager struct {
7878
// pgClient is a podGroup client
7979
pgClient pgclientset.Interface
80-
// scheduleTimeout is the default timeout for podgroup scheduling.
81-
// If podgroup's scheduleTimeoutSeconds is set, it will be used.
82-
scheduleTimeout *time.Duration
8380
// pgLister is podgroup lister
8481
pgLister pglister.PodGroupLister
8582
// podLister is pod lister
@@ -110,6 +107,7 @@ func NewPodGroupManager(
110107

111108
podGroupEventHandler := cache.ResourceEventHandlerFuncs{
112109
AddFunc: gangCache.onPodGroupAdd,
110+
UpdateFunc: gangCache.onPodGroupUpdate,
113111
DeleteFunc: gangCache.onPodGroupDelete,
114112
}
115113
frameworkexthelper.ForceSyncFromInformer(context.TODO().Done(), pgSharedInformerFactory, pgInformer.Informer(), podGroupEventHandler)

pkg/scheduler/plugins/coscheduling/core/gang.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,9 +170,6 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)
170170
func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.CoschedulingArgs) {
171171
gang.lock.Lock()
172172
defer gang.lock.Unlock()
173-
if gang.HasGangInit {
174-
return
175-
}
176173
minRequiredNumber := pg.Spec.MinMember
177174
gang.MinRequiredNumber = int(minRequiredNumber)
178175

pkg/scheduler/plugins/coscheduling/core/gang_cache.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,6 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) {
109109
}
110110
}
111111

112-
func (gangCache *GangCache) onPodUpdate(oldObj interface{}, newObj interface{}) {
113-
}
114-
115112
func (gangCache *GangCache) onPodDelete(obj interface{}) {
116113
pod, ok := obj.(*v1.Pod)
117114
if !ok {
@@ -144,16 +141,25 @@ func (gangCache *GangCache) onPodGroupAdd(obj interface{}) {
144141
gangName := pg.Name
145142

146143
gangId := util.GetId(gangNamespace, gangName)
147-
gang := gangCache.getGangFromCacheByGangId(gangId, false)
148-
if gang == nil {
149-
gang = gangCache.getGangFromCacheByGangId(gangId, true)
150-
klog.Infof("Create gang by podGroup on add, gangName: %v", gangId)
151-
}
152-
144+
gang := gangCache.getGangFromCacheByGangId(gangId, true)
153145
gang.tryInitByPodGroup(pg, gangCache.pluginArgs)
154146
}
155147

156148
func (gangCache *GangCache) onPodGroupUpdate(oldObj interface{}, newObj interface{}) {
149+
pg, ok := newObj.(*v1alpha1.PodGroup)
150+
if !ok {
151+
return
152+
}
153+
gangNamespace := pg.Namespace
154+
gangName := pg.Name
155+
156+
gangId := util.GetId(gangNamespace, gangName)
157+
gang := gangCache.getGangFromCacheByGangId(gangId, false)
158+
if gang == nil {
159+
klog.Errorf("Gang object isn't exist when got Update Event")
160+
return
161+
}
162+
gang.tryInitByPodGroup(pg, gangCache.pluginArgs)
157163
}
158164

159165
func (gangCache *GangCache) onPodGroupDelete(obj interface{}) {

pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -926,5 +926,40 @@ func TestGangCache_OnGangDelete(t *testing.T) {
926926
cacheGang := cache.getGangFromCacheByGangId("default/gangb", false)
927927
wantedGang.GangGroupId = util.GetGangGroupId(wantedGang.GangGroup)
928928
assert.Equal(t, wantedGang, cacheGang)
929+
}
930+
931+
func TestGangCache_onPodGroupUpdate(t *testing.T) {
932+
pgClient := fakepgclientset.NewSimpleClientset()
933+
preTimeNowFn := timeNowFn
934+
defer func() {
935+
timeNowFn = preTimeNowFn
936+
}()
937+
timeNowFn = fakeTimeNowFn
938+
939+
pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0)
940+
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
941+
pglister := pgInformer.Lister()
942+
cache := NewGangCache(&config.CoschedulingArgs{DefaultTimeout: &metav1.Duration{Duration: time.Second}}, nil, pglister, pgClient)
943+
944+
// init gang
945+
podGroup := &v1alpha1.PodGroup{
946+
ObjectMeta: metav1.ObjectMeta{
947+
Namespace: "default",
948+
Name: "ganga",
949+
},
950+
Spec: v1alpha1.PodGroupSpec{
951+
MinMember: 2,
952+
},
953+
}
954+
gangId := util.GetId("default", "ganga")
955+
cache.onPodGroupAdd(podGroup)
956+
gang := cache.getGangFromCacheByGangId(gangId, false)
957+
assert.Equal(t, gang.MinRequiredNumber, int(podGroup.Spec.MinMember))
929958

959+
// update gang
960+
newPodGroup := podGroup.DeepCopy()
961+
newPodGroup.Spec.MinMember = 3
962+
cache.onPodGroupUpdate(podGroup, newPodGroup)
963+
gang = cache.getGangFromCacheByGangId(gangId, false)
964+
assert.Equal(t, gang.MinRequiredNumber, int(newPodGroup.Spec.MinMember))
930965
}

0 commit comments

Comments
 (0)