Skip to content

Commit b7f2eba

Browse files
committed
koord-scheduler: support force sync data from informer
Signed-off-by: Joseph <joseph.t.lee@outlook.com>
1 parent 9ff814a commit b7f2eba

File tree

19 files changed

+356
-150
lines changed

19 files changed

+356
-150
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
Copyright 2022 The Koordinator Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package helper
18+
19+
import (
20+
"reflect"
21+
"strconv"
22+
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
apimachinerytypes "k8s.io/apimachinery/pkg/types"
25+
"k8s.io/client-go/tools/cache"
26+
)
27+
28+
// forceSyncEventHandler wraps a cache.ResourceEventHandler so that the wrapped
// handler first receives every object already present in the informer store
// (pushed via addDirectly) before any live informer event is delivered: all
// OnAdd/OnUpdate/OnDelete callbacks block on syncCh until syncDone is called.
type forceSyncEventHandler struct {
	handler cache.ResourceEventHandler // downstream handler being wrapped
	syncCh  chan struct{}              // closed by syncDone once the initial replay finishes
	// objects maps UID -> resourceVersion of every object delivered during the
	// initial replay, so duplicate Add events later queued by the informer can
	// be dropped in OnAdd. Released (set to nil) on the first Update/Delete.
	objects map[apimachinerytypes.UID]int64
}
33+
34+
func newForceSyncEventHandler(handler cache.ResourceEventHandler) *forceSyncEventHandler {
35+
return &forceSyncEventHandler{
36+
handler: handler,
37+
syncCh: make(chan struct{}, 1),
38+
objects: map[apimachinerytypes.UID]int64{},
39+
}
40+
}
41+
42+
// syncDone marks the initial replay as complete by closing syncCh, which
// unblocks every informer callback parked in waitForSyncDone. It must be
// called exactly once; a second call would panic on double close.
func (h *forceSyncEventHandler) syncDone() {
	close(h.syncCh)
}
45+
46+
// waitForSyncDone blocks until syncDone has closed syncCh. After the close,
// the receive returns immediately, so this is a cheap no-op for all
// subsequent events.
func (h *forceSyncEventHandler) waitForSyncDone() {
	<-h.syncCh
}
49+
50+
func (h *forceSyncEventHandler) OnAdd(obj interface{}) {
51+
h.waitForSyncDone()
52+
if metaAccessor, ok := obj.(metav1.ObjectMetaAccessor); ok {
53+
objectMeta := metaAccessor.GetObjectMeta()
54+
objectUID := objectMeta.GetUID()
55+
if oldResourceVersion, ok := h.objects[objectUID]; ok && oldResourceVersion != 0 {
56+
resourceVersion, err := strconv.ParseInt(objectMeta.GetResourceVersion(), 10, 64)
57+
if err == nil && resourceVersion <= oldResourceVersion {
58+
return
59+
}
60+
delete(h.objects, objectUID)
61+
}
62+
}
63+
h.handler.OnAdd(obj)
64+
}
65+
66+
func (h *forceSyncEventHandler) OnUpdate(oldObj, newObj interface{}) {
67+
h.waitForSyncDone()
68+
if h.objects != nil {
69+
// Release objects map to reduce memory usage and reduce GC pressure
70+
h.objects = nil
71+
}
72+
h.handler.OnUpdate(oldObj, newObj)
73+
}
74+
75+
func (h *forceSyncEventHandler) OnDelete(obj interface{}) {
76+
h.waitForSyncDone()
77+
if h.objects != nil {
78+
// Release objects map to reduce memory usage and reduce GC pressure
79+
h.objects = nil
80+
}
81+
h.handler.OnDelete(obj)
82+
}
83+
84+
func (h *forceSyncEventHandler) addDirectly(obj interface{}) {
85+
h.handler.OnAdd(obj)
86+
if metaAccessor, ok := obj.(metav1.ObjectMetaAccessor); ok {
87+
objectMeta := metaAccessor.GetObjectMeta()
88+
resourceVersion, err := strconv.ParseInt(objectMeta.GetResourceVersion(), 10, 64)
89+
if err == nil {
90+
h.objects[objectMeta.GetUID()] = resourceVersion
91+
}
92+
}
93+
}
94+
95+
// CacheSyncer abstracts the Start/WaitForCacheSync pair exposed by the
// client-go SharedInformerFactory types, so ForceSyncFromInformer can accept
// either the core factory or generated (CRD) factories.
type CacheSyncer interface {
	// Start begins running all registered informers; it returns immediately.
	Start(stopCh <-chan struct{})
	// WaitForCacheSync blocks until every started informer's cache has synced
	// (or stopCh closes), reporting the sync result per informer type.
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}
99+
100+
// ForceSyncFromInformer ensures that the EventHandler will synchronize data immediately after registration,
101+
// helping those plugins that need to build memory status through EventHandler to correctly synchronize data
102+
func ForceSyncFromInformer(stopCh <-chan struct{}, cacheSyncer CacheSyncer, informer cache.SharedInformer, handler cache.ResourceEventHandler) {
103+
syncEventHandler := newForceSyncEventHandler(handler)
104+
informer.AddEventHandler(syncEventHandler)
105+
cacheSyncer.Start(stopCh)
106+
cacheSyncer.WaitForCacheSync(stopCh)
107+
allObjects := informer.GetStore().List()
108+
for _, obj := range allObjects {
109+
syncEventHandler.addDirectly(obj)
110+
}
111+
syncEventHandler.syncDone()
112+
return
113+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
Copyright 2022 The Koordinator Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package helper
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"sync"
23+
"testing"
24+
"time"
25+
26+
"github.com/stretchr/testify/assert"
27+
corev1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/runtime"
30+
"k8s.io/apimachinery/pkg/util/uuid"
31+
"k8s.io/apimachinery/pkg/util/wait"
32+
"k8s.io/client-go/informers"
33+
kubefake "k8s.io/client-go/kubernetes/fake"
34+
"k8s.io/client-go/tools/cache"
35+
)
36+
37+
func TestSyncedEventHandler(t *testing.T) {
38+
var objects []runtime.Object
39+
for i := 0; i < 10; i++ {
40+
node := &corev1.Node{
41+
ObjectMeta: metav1.ObjectMeta{
42+
UID: uuid.NewUUID(),
43+
Name: fmt.Sprintf("node-%d", i),
44+
ResourceVersion: fmt.Sprintf("%d", i+1),
45+
},
46+
}
47+
objects = append(objects, node)
48+
}
49+
fakeClientSet := kubefake.NewSimpleClientset(objects...)
50+
sharedInformerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0)
51+
nodeInformer := sharedInformerFactory.Core().V1().Nodes()
52+
addTimes := map[string]int{}
53+
var wg sync.WaitGroup
54+
wg.Add(10)
55+
ForceSyncFromInformer(context.TODO().Done(), sharedInformerFactory, nodeInformer.Informer(), cache.ResourceEventHandlerFuncs{
56+
AddFunc: func(obj interface{}) {
57+
node := obj.(*corev1.Node)
58+
addTimes[node.Name]++
59+
wg.Done()
60+
},
61+
})
62+
wg.Wait()
63+
for _, v := range addTimes {
64+
if v > 1 {
65+
t.Errorf("unexpected add times, want 1 but got %d", v)
66+
break
67+
}
68+
}
69+
node, err := nodeInformer.Lister().Get("node-0")
70+
assert.NoError(t, err)
71+
assert.NotNil(t, node)
72+
node = node.DeepCopy()
73+
node.ResourceVersion = "100"
74+
_, err = fakeClientSet.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
75+
assert.NoError(t, err)
76+
err = wait.PollUntil(1*time.Second, func() (done bool, err error) {
77+
node, err := nodeInformer.Lister().Get("node-0")
78+
assert.NoError(t, err)
79+
assert.NotNil(t, node)
80+
return node.ResourceVersion == "100", nil
81+
}, wait.NeverStop)
82+
assert.NoError(t, err)
83+
}

pkg/scheduler/plugins/coscheduling/controller/podgroup_test.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -270,13 +270,8 @@ func setUp(ctx context.Context, podNames []string, pgName string, podPhase v1.Po
270270
podInformer := informerFactory.Core().V1().Pods()
271271
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
272272

273-
pgMgr := core.NewPodGroupManager(pgClient, pgInformer, podInformer, &config.CoschedulingArgs{DefaultTimeout: &metav1.Duration{Duration: time.Second}})
273+
pgMgr := core.NewPodGroupManager(pgClient, pgInformerFactory, informerFactory, &config.CoschedulingArgs{DefaultTimeout: &metav1.Duration{Duration: time.Second}})
274274
ctrl := NewPodGroupController(kubeClient, pgInformer, podInformer, pgClient, pgMgr, nil, 1)
275-
276-
pgInformerFactory.Start(ctx.Done())
277-
informerFactory.Start(ctx.Done())
278-
pgInformerFactory.WaitForCacheSync(ctx.Done())
279-
informerFactory.WaitForCacheSync(ctx.Done())
280275
return ctrl, kubeClient, pgClient
281276
}
282277

pkg/scheduler/plugins/coscheduling/core/core.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,20 @@ import (
2626
corev1 "k8s.io/api/core/v1"
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828
"k8s.io/apimachinery/pkg/types"
29-
informerv1 "k8s.io/client-go/informers/core/v1"
29+
"k8s.io/client-go/informers"
3030
listerv1 "k8s.io/client-go/listers/core/v1"
3131
"k8s.io/client-go/tools/cache"
3232
"k8s.io/klog/v2"
3333
"k8s.io/kubernetes/pkg/scheduler/framework"
34+
pgformers "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions"
3435

3536
"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
3637
pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned"
37-
pginformer "sigs.k8s.io/scheduler-plugins/pkg/generated/informers/externalversions/scheduling/v1alpha1"
3838
pglister "sigs.k8s.io/scheduler-plugins/pkg/generated/listers/scheduling/v1alpha1"
3939

4040
"github.com/koordinator-sh/koordinator/apis/extension"
4141
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
42+
frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper"
4243
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/util"
4344
)
4445

@@ -88,25 +89,33 @@ type PodGroupManager struct {
8889
}
8990

9091
// NewPodGroupManager creates a new operation object.
//
// It derives the PodGroup and Pod informers from the supplied factories,
// builds the gang cache over their listers, and force-syncs both informers so
// the gang cache is fully populated before the manager is returned.
func NewPodGroupManager(
	pgClient pgclientset.Interface,
	pgSharedInformerFactory pgformers.SharedInformerFactory,
	sharedInformerFactory informers.SharedInformerFactory,
	args *config.CoschedulingArgs,
) *PodGroupManager {
	pgInformer := pgSharedInformerFactory.Scheduling().V1alpha1().PodGroups()
	podInformer := sharedInformerFactory.Core().V1().Pods()
	gangCache := NewGangCache(args, podInformer.Lister(), pgInformer.Lister(), pgClient)
	pgMgr := &PodGroupManager{
		pgClient:  pgClient,
		pgLister:  pgInformer.Lister(),
		podLister: podInformer.Lister(),
		cache:     gangCache,
	}

	// Replay existing PodGroups into the gang cache before live events flow.
	podGroupEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc:    gangCache.onPodGroupAdd,
		DeleteFunc: gangCache.onPodGroupDelete,
	}
	// NOTE(review): context.TODO().Done() never closes, so the sync wait can
	// never be cancelled — confirm this is intentional.
	frameworkexthelper.ForceSyncFromInformer(context.TODO().Done(), pgSharedInformerFactory, pgInformer.Informer(), podGroupEventHandler)

	// Replay existing Pods the same way.
	podEventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc:    gangCache.onPodAdd,
		DeleteFunc: gangCache.onPodDelete,
	}
	frameworkexthelper.ForceSyncFromInformer(context.TODO().Done(), sharedInformerFactory, podInformer.Informer(), podEventHandler)
	return pgMgr
}
112121

pkg/scheduler/plugins/coscheduling/core/core_test.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,14 @@ type Mgr struct {
4545
pgInformer pginformer.PodGroupInformer
4646
}
4747

48-
func NewManager4Test() *Mgr {
48+
func NewManagerForTest() *Mgr {
4949
pgClient := fakepgclientset.NewSimpleClientset()
5050
pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0)
5151
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
5252

5353
podClient := clientsetfake.NewSimpleClientset()
5454
informerFactory := informers.NewSharedInformerFactory(podClient, 0)
55-
podInformer := informerFactory.Core().V1().Pods()
56-
pgManager := NewPodGroupManager(pgClient, pgInformer, podInformer, &config.CoschedulingArgs{DefaultTimeout: &metav1.Duration{Duration: 300 * time.Second}})
55+
pgManager := NewPodGroupManager(pgClient, pgInformerFactory, informerFactory, &config.CoschedulingArgs{DefaultTimeout: &metav1.Duration{Duration: 300 * time.Second}})
5756
return &Mgr{
5857
pgMgr: pgManager,
5958
pgInformer: pgInformer,
@@ -77,7 +76,7 @@ func makePg(name, namespace string, min int32, creationTime *time.Time, minResou
7776

7877
func TestPlugin_PreFilter(t *testing.T) {
7978
gangACreatedTime := time.Now()
80-
mgr := NewManager4Test().pgMgr
79+
mgr := NewManagerForTest().pgMgr
8180
tests := []struct {
8281
name string
8382
// test pod
@@ -379,7 +378,7 @@ func TestPermit(t *testing.T) {
379378
}
380379
for _, tt := range tests {
381380
t.Run(tt.name, func(t *testing.T) {
382-
mgr := NewManager4Test().pgMgr
381+
mgr := NewManagerForTest().pgMgr
383382
// pg create
384383
for _, pg := range tt.pgs {
385384
if tt.needGangGroup {
@@ -444,7 +443,7 @@ func TestPostBind(t *testing.T) {
444443
}
445444
for _, tt := range tests {
446445
t.Run(tt.name, func(t *testing.T) {
447-
bigMgr := NewManager4Test()
446+
bigMgr := NewManagerForTest()
448447
mgr, pginforme := bigMgr.pgMgr, bigMgr.pgInformer
449448
// pg create
450449
if tt.annotation != nil {

pkg/scheduler/plugins/coscheduling/coscheduling.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,8 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
7878
pgClient = pgclientset.NewForConfigOrDie(&kubeConfig)
7979
}
8080
pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0)
81-
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
82-
podInformer := handle.SharedInformerFactory().Core().V1().Pods()
83-
84-
ctx := context.TODO()
8581

86-
pgMgr := core.NewPodGroupManager(pgClient, pgInformer, podInformer, args)
82+
pgMgr := core.NewPodGroupManager(pgClient, pgInformerFactory, handle.SharedInformerFactory(), args)
8783
var controllerWorkers int
8884
if args == nil {
8985
controllerWorkers = 1
@@ -96,12 +92,6 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error)
9692
workers: controllerWorkers,
9793
}
9894

99-
pgInformerFactory.Start(ctx.Done())
100-
handle.SharedInformerFactory().Start(ctx.Done())
101-
102-
pgInformerFactory.WaitForCacheSync(ctx.Done())
103-
handle.SharedInformerFactory().WaitForCacheSync(ctx.Done())
104-
10595
return plugin, nil
10696
}
10797

pkg/scheduler/plugins/deviceshare/device_cache.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,11 +378,46 @@ func (n *nodeDeviceCache) createNodeDevice(nodeName string) *nodeDevice {
378378
}
379379

380380
// removeNodeDevice drops the cached device info for nodeName under the cache
// write lock. A no-op for an empty node name.
func (n *nodeDeviceCache) removeNodeDevice(nodeName string) {
	if nodeName == "" {
		return
	}
	n.lock.Lock()
	defer n.lock.Unlock()
	delete(n.nodeDeviceInfos, nodeName)
}
385388

389+
// updateNodeDevice rebuilds the cached total device resources for nodeName
// from the given Device object's spec, creating the node entry on first use.
// Unhealthy devices are recorded with an empty ResourceList so they cannot be
// allocated. A no-op when nodeName is empty or device is nil.
func (n *nodeDeviceCache) updateNodeDevice(nodeName string, device *schedulingv1alpha1.Device) {
	if nodeName == "" || device == nil {
		return
	}

	info := n.getNodeDevice(nodeName)
	if info == nil {
		info = n.createNodeDevice(nodeName)
	}

	info.lock.Lock()
	defer info.lock.Unlock()

	// Re-derive the per-type, per-minor resource totals from the spec.
	nodeDeviceResource := map[schedulingv1alpha1.DeviceType]deviceResources{}
	for _, deviceInfo := range device.Spec.Devices {
		if nodeDeviceResource[deviceInfo.Type] == nil {
			nodeDeviceResource[deviceInfo.Type] = make(deviceResources)
		}
		// NOTE(review): deviceInfo.Minor is dereferenced unconditionally; if
		// the field is optional in the CRD this panics on a nil Minor — TODO
		// confirm the API guarantees it is always set.
		if !deviceInfo.Health {
			// Unhealthy device: keep the minor visible but with no resources.
			nodeDeviceResource[deviceInfo.Type][int(*deviceInfo.Minor)] = make(corev1.ResourceList)
			klog.Errorf("Find device unhealthy, nodeName:%v, deviceType:%v, minor:%v",
				nodeName, deviceInfo.Type, deviceInfo.Minor)
		} else {
			nodeDeviceResource[deviceInfo.Type][int(*deviceInfo.Minor)] = deviceInfo.Resources
			klog.V(5).Infof("Find device resource update, nodeName:%v, deviceType:%v, minor:%v, res:%v",
				nodeName, deviceInfo.Type, deviceInfo.Minor, deviceInfo.Resources)
		}
	}

	info.resetDeviceTotal(nodeDeviceResource)
}
420+
386421
func (n *nodeDeviceCache) getNodeDeviceSummary(nodeName string) (*NodeDeviceSummary, bool) {
387422
n.lock.RLock()
388423
defer n.lock.RUnlock()

0 commit comments

Comments
 (0)