Skip to content

Commit 73c89ff

Browse files
authored
Merge pull request volcano-sh#21 from qi-min/agent-scheduler-clean
clean unused functions in agent-scheduler
2 parents 8314768 + 9827628 commit 73c89ff

File tree

7 files changed

+26
-224
lines changed

7 files changed

+26
-224
lines changed

pkg/agentscheduler/cache/cache.go

Lines changed: 15 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ type SchedulerCache struct {
9696
nodeInformer infov1.NodeInformer
9797

9898
Binder Binder
99-
Evictor Evictor
10099
StatusUpdater StatusUpdater
101100

102101
Recorder record.EventRecorder
@@ -106,8 +105,6 @@ type SchedulerCache struct {
106105

107106
taskCache *TaskCache
108107

109-
NamespaceCollection map[string]*schedulingapi.NamespaceCollection
110-
111108
errTasks workqueue.TypedRateLimitingInterface[string]
112109
nodeQueue workqueue.TypedRateLimitingInterface[string]
113110

@@ -361,16 +358,15 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
361358
)
362359

363360
sc := &SchedulerCache{
364-
Nodes: make(map[string]*schedulingapi.NodeInfo),
365-
errTasks: workqueue.NewTypedRateLimitingQueue[string](errTaskRateLimiter),
366-
nodeQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
367-
kubeClient: kubeClient,
368-
vcClient: vcClient,
369-
restConfig: config,
370-
schedulerNames: schedulerNames,
371-
nodeSelectorLabels: make(map[string]sets.Empty),
372-
NamespaceCollection: make(map[string]*schedulingapi.NamespaceCollection),
373-
imageStates: make(map[string]*imageState),
361+
Nodes: make(map[string]*schedulingapi.NodeInfo),
362+
errTasks: workqueue.NewTypedRateLimitingQueue[string](errTaskRateLimiter),
363+
nodeQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
364+
kubeClient: kubeClient,
365+
vcClient: vcClient,
366+
restConfig: config,
367+
schedulerNames: schedulerNames,
368+
nodeSelectorLabels: make(map[string]sets.Empty),
369+
imageStates: make(map[string]*imageState),
374370

375371
NodeList: []string{},
376372
nodeWorkers: nodeWorkers,
@@ -395,11 +391,6 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
395391
}
396392
sc.Binder = GetBindMethod()
397393

398-
sc.Evictor = &defaultEvictor{
399-
kubeclient: sc.kubeClient,
400-
recorder: sc.Recorder,
401-
}
402-
403394
sc.StatusUpdater = &defaultStatusUpdater{
404395
kubeclient: sc.kubeClient,
405396
vcclient: sc.vcClient,
@@ -572,9 +563,6 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
572563
// Re-sync error tasks.
573564
go wait.Until(sc.processResyncTask, 0, stopCh)
574565

575-
// Cleanup jobs.
576-
go wait.Until(sc.processCleanupJob, 0, stopCh)
577-
578566
go wait.Until(sc.processBindTask, time.Millisecond*20, stopCh)
579567

580568
sc.ConflictAwareBinder.Run(stopCh)
@@ -586,43 +574,6 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) {
586574
sc.vcInformerFactory.WaitForCacheSync(stopCh)
587575
}
588576

589-
// Evict will evict the pod.
590-
//
591-
// If error occurs both task and job are guaranteed to be in the original state.
592-
func (sc *SchedulerCache) Evict(task *schedulingapi.TaskInfo, reason string) error {
593-
sc.Mutex.Lock()
594-
defer sc.Mutex.Unlock()
595-
596-
node, found := sc.Nodes[task.NodeName]
597-
if !found {
598-
return fmt.Errorf("failed to evict Task %v from host %v, host does not exist",
599-
task.UID, task.NodeName)
600-
}
601-
602-
originalStatus := task.Status
603-
604-
// Add new task to node.
605-
if err := node.UpdateTask(task); err != nil {
606-
// After failing to update task to a node we need to revert task status from Releasing,
607-
// otherwise task might be stuck in the Releasing state indefinitely.
608-
klog.Errorf("Task <%s/%s> will be resynchronized after failing to revert status "+
609-
"from %s to %s after failing to update Task on Node <%s>: %v",
610-
task.Namespace, task.Name, task.Status, originalStatus, node.Name, err)
611-
sc.resyncTask(task)
612-
return err
613-
}
614-
615-
p := task.Pod
616-
617-
go func() {
618-
err := sc.Evictor.Evict(p, reason)
619-
if err != nil {
620-
sc.resyncTask(task)
621-
}
622-
}()
623-
return nil
624-
}
625-
626577
// Bind binds task to the target host.
627578
func (sc *SchedulerCache) Bind(ctx context.Context, bindContexts []*vcache.BindContext, preBinders map[string]PreBinder) {
628579
readyToBindTasks := make([]*schedulingapi.TaskInfo, len(bindContexts))
@@ -747,10 +698,6 @@ func (sc *SchedulerCache) TaskUnschedulable(task *schedulingapi.TaskInfo, reason
747698
return nil
748699
}
749700

750-
func (sc *SchedulerCache) processCleanupJob() {
751-
// TODO process clean up Job
752-
}
753-
754701
// GetTaskInfo retrieves a task by its ID.
755702
func (sc *SchedulerCache) GetTaskInfo(taskID schedulingapi.TaskID) (*schedulingapi.TaskInfo, bool) {
756703
return sc.taskCache.Get(taskID)
@@ -772,15 +719,7 @@ func (sc *SchedulerCache) DeleteTaskInfo(taskID schedulingapi.TaskID) {
772719
}
773720

774721
func (sc *SchedulerCache) resyncTask(task *schedulingapi.TaskInfo) {
775-
key := sc.generateErrTaskKey(task)
776-
sc.errTasks.AddRateLimited(key)
777-
}
778-
779-
func (sc *SchedulerCache) generateErrTaskKey(task *schedulingapi.TaskInfo) string {
780-
// Job UID is namespace + / +name, for example: theNs/theJob
781-
// Task UID is derived from the Pod UID, for example: d336abea-4f14-42c7-8a6b-092959a31407
782-
// In the example above, the key ultimately becomes: theNs/theJob/d336abea-4f14-42c7-8a6b-092959a31407
783-
return fmt.Sprintf("%s/%s", task.Job, task.UID)
722+
sc.errTasks.AddRateLimited(string(task.UID))
784723
}
785724

786725
func (sc *SchedulerCache) parseErrTaskKey(key string) (*schedulingapi.TaskInfo, error) {
@@ -990,17 +929,15 @@ func (sc *SchedulerCache) BindTask() {
990929
sc.bindCache = make([]*vcache.BindContext, 0)
991930
}
992931

993-
// Snapshot returns the complete snapshot of the cluster from cache
932+
// Snapshot returns the complete snapshot of the cluster from cache, used for dump purpose only
994933
func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
995934
sc.Mutex.Lock()
996935
defer sc.Mutex.Unlock()
997936

998937
snapshot := &schedulingapi.ClusterInfo{
999-
Nodes: make(map[string]*schedulingapi.NodeInfo),
1000-
RealNodesSet: make(map[string]sets.Set[string]),
1001-
NamespaceInfo: make(map[schedulingapi.NamespaceName]*schedulingapi.NamespaceInfo),
1002-
RevocableNodes: make(map[string]*schedulingapi.NodeInfo),
1003-
NodeList: make([]string, len(sc.NodeList)),
938+
Nodes: make(map[string]*schedulingapi.NodeInfo),
939+
RealNodesSet: make(map[string]sets.Set[string]),
940+
NodeList: make([]string, len(sc.NodeList)),
1004941
}
1005942

1006943
// TODO add agent scheduler cache
@@ -1016,17 +953,8 @@ func (sc *SchedulerCache) Snapshot() *schedulingapi.ClusterInfo {
1016953
}
1017954

1018955
snapshot.Nodes[value.Name] = value.Clone()
1019-
1020-
if value.RevocableZone != "" {
1021-
snapshot.RevocableNodes[value.Name] = snapshot.Nodes[value.Name]
1022-
}
1023956
}
1024-
1025-
for _, value := range sc.NamespaceCollection {
1026-
info := value.Snapshot()
1027-
snapshot.NamespaceInfo[info.Name] = info
1028-
}
1029-
klog.V(3).InfoS("SnapShot for scheduling", "NodeNum", "NamespaceNum", "RevocableNodesNum", len(snapshot.Nodes), len(snapshot.NamespaceInfo), len(snapshot.RevocableNodes))
957+
klog.V(3).InfoS("SnapShot for scheduling", "NodeNum", len(snapshot.Nodes))
1030958
return snapshot
1031959
}
1032960

@@ -1060,14 +988,6 @@ func (sc *SchedulerCache) String() string {
1060988
}
1061989
}
1062990

1063-
if len(sc.NamespaceCollection) != 0 {
1064-
str += "Namespaces:\n"
1065-
for _, ns := range sc.NamespaceCollection {
1066-
info := ns.Snapshot()
1067-
str += fmt.Sprintf("\t Namespace(%s)\n", info.Name)
1068-
}
1069-
}
1070-
1071991
if len(sc.NodeList) != 0 {
1072992
str += fmt.Sprintf("NodeList: %v\n", sc.NodeList)
1073993
}

pkg/agentscheduler/cache/cache_mock.go

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,14 @@ import (
4242
// NewCustomMockSchedulerCache returns a mock scheduler cache with custom interface
4343
func NewCustomMockSchedulerCache(schedulerName string,
4444
binder Binder,
45-
evictor Evictor,
4645
statusUpdater StatusUpdater,
47-
PodGroupBinder BatchBinder,
4846
recorder record.EventRecorder,
4947
) *SchedulerCache {
5048
msc := newMockSchedulerCache(schedulerName)
5149
// add all events handlers
5250
msc.addEventHandler()
5351
msc.Recorder = recorder
5452
msc.Binder = binder
55-
msc.Evictor = evictor
5653
msc.StatusUpdater = statusUpdater
5754
checkAndSetDefaultInterface(msc)
5855
return msc
@@ -79,12 +76,6 @@ func checkAndSetDefaultInterface(sc *SchedulerCache) {
7976
recorder: sc.Recorder,
8077
}
8178
}
82-
if sc.Evictor == nil {
83-
sc.Evictor = &defaultEvictor{
84-
kubeclient: sc.kubeClient,
85-
recorder: sc.Recorder,
86-
}
87-
}
8879
if sc.StatusUpdater == nil {
8980
sc.StatusUpdater = &defaultStatusUpdater{
9081
kubeclient: sc.kubeClient,
@@ -107,16 +98,15 @@ func getNodeWorkers() uint32 {
10798
// newMockSchedulerCache init the mock scheduler cache structure
10899
func newMockSchedulerCache(schedulerName string) *SchedulerCache {
109100
msc := &SchedulerCache{
110-
Nodes: make(map[string]*schedulingapi.NodeInfo),
111-
errTasks: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
112-
nodeQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
113-
kubeClient: fake.NewSimpleClientset(),
114-
vcClient: fakevcClient.NewSimpleClientset(),
115-
restConfig: nil,
116-
schedulerNames: []string{schedulerName},
117-
nodeSelectorLabels: make(map[string]sets.Empty),
118-
NamespaceCollection: make(map[string]*schedulingapi.NamespaceCollection),
119-
imageStates: make(map[string]*imageState),
101+
Nodes: make(map[string]*schedulingapi.NodeInfo),
102+
errTasks: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
103+
nodeQueue: workqueue.NewTypedRateLimitingQueue[string](workqueue.DefaultTypedControllerRateLimiter[string]()),
104+
kubeClient: fake.NewSimpleClientset(),
105+
vcClient: fakevcClient.NewSimpleClientset(),
106+
restConfig: nil,
107+
schedulerNames: []string{schedulerName},
108+
nodeSelectorLabels: make(map[string]sets.Empty),
109+
imageStates: make(map[string]*imageState),
120110

121111
NodeList: []string{},
122112
binderRegistry: NewBinderRegistry(),

pkg/agentscheduler/cache/dumper.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,6 @@ func (d *Dumper) dumpAll() {
7979
klog.Info(d.printNodeInfo(nodeInfo))
8080
}
8181

82-
klog.Info("Dump of jobs info in scheduler cache")
83-
for _, jobInfo := range snapshot.Jobs {
84-
klog.Info(d.printJobInfo(jobInfo))
85-
}
86-
87-
klog.Info("Dump of hyperNodes info in scheduler cache")
88-
8982
d.displaySchedulerMemStats()
9083
}
9184

@@ -103,14 +96,6 @@ func (d *Dumper) printNodeInfo(node *api.NodeInfo) string {
10396
return data.String()
10497
}
10598

106-
func (d *Dumper) printJobInfo(jobInfo *api.JobInfo) string {
107-
var data strings.Builder
108-
data.WriteString("\n")
109-
data.WriteString(jobInfo.String())
110-
data.WriteString("\n")
111-
return data.String()
112-
}
113-
11499
// ListenForSignal starts a goroutine that will respond when process
115100
// receives SIGUSER1/SIGUSER2 signal.
116101
func (d *Dumper) ListenForSignal(stopCh <-chan struct{}) {

pkg/agentscheduler/cache/event_handlers.go

Lines changed: 0 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -506,80 +506,3 @@ func (sc *SchedulerCache) nodeCanAddCache(node *v1.Node) bool {
506506
klog.Infof("node %s ignore add/update/delete into schedulerCache", node.Name)
507507
return false
508508
}
509-
510-
func (sc *SchedulerCache) updateResourceQuota(quota *v1.ResourceQuota) {
511-
collection, ok := sc.NamespaceCollection[quota.Namespace]
512-
if !ok {
513-
collection = schedulingapi.NewNamespaceCollection(quota.Namespace)
514-
sc.NamespaceCollection[quota.Namespace] = collection
515-
}
516-
517-
collection.Update(quota)
518-
}
519-
520-
func (sc *SchedulerCache) deleteResourceQuota(quota *v1.ResourceQuota) {
521-
collection, ok := sc.NamespaceCollection[quota.Namespace]
522-
if !ok {
523-
return
524-
}
525-
526-
collection.Delete(quota)
527-
}
528-
529-
// DeleteResourceQuota delete ResourceQuota from the scheduler cache
530-
func (sc *SchedulerCache) DeleteResourceQuota(obj interface{}) {
531-
var r *v1.ResourceQuota
532-
switch t := obj.(type) {
533-
case *v1.ResourceQuota:
534-
r = t
535-
case cache.DeletedFinalStateUnknown:
536-
var ok bool
537-
r, ok = t.Obj.(*v1.ResourceQuota)
538-
if !ok {
539-
klog.Errorf("Cannot convert to *v1.ResourceQuota: %v", t.Obj)
540-
return
541-
}
542-
default:
543-
klog.Errorf("Cannot convert to *v1.ResourceQuota: %v", t)
544-
return
545-
}
546-
547-
sc.Mutex.Lock()
548-
defer sc.Mutex.Unlock()
549-
550-
klog.V(3).Infof("Delete ResourceQuota <%s/%v> in cache", r.Namespace, r.Name)
551-
sc.deleteResourceQuota(r)
552-
}
553-
554-
// UpdateResourceQuota update ResourceQuota to scheduler cache
555-
func (sc *SchedulerCache) UpdateResourceQuota(oldObj, newObj interface{}) {
556-
newR, ok := newObj.(*v1.ResourceQuota)
557-
if !ok {
558-
klog.Errorf("Cannot convert newObj to *v1.ResourceQuota: %v", newObj)
559-
return
560-
}
561-
562-
sc.Mutex.Lock()
563-
defer sc.Mutex.Unlock()
564-
565-
klog.V(3).Infof("Update ResourceQuota <%s/%v> in cache, with spec: %v.", newR.Namespace, newR.Name, newR.Spec.Hard)
566-
sc.updateResourceQuota(newR)
567-
}
568-
569-
// AddResourceQuota add ResourceQuota to scheduler cache
570-
func (sc *SchedulerCache) AddResourceQuota(obj interface{}) {
571-
var r *v1.ResourceQuota
572-
switch t := obj.(type) {
573-
case *v1.ResourceQuota:
574-
r = t
575-
default:
576-
klog.Errorf("Cannot convert to *v1.ResourceQuota: %v", t)
577-
return
578-
}
579-
580-
sc.Mutex.Lock()
581-
defer sc.Mutex.Unlock()
582-
583-
klog.V(3).Infof("Add ResourceQuota <%s/%v> in cache, with spec: %v.", r.Namespace, r.Name, r.Spec.Hard)
584-
sc.updateResourceQuota(r)
585-
}

pkg/agentscheduler/cache/interface.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ type Cache interface {
5454
// TODO(jinzhej): clean up expire Tasks.
5555
AddBindTask(bindCtx *vcache.BindContext) error
5656

57-
// Evict evicts the task to release resources.
58-
Evict(task *api.TaskInfo, reason string) error
5957
// Client returns the kubernetes clientSet, which can be used by plugins
6058
Client() kubernetes.Interface
6159

@@ -103,21 +101,11 @@ type Binder interface {
103101
Bind(kubeClient kubernetes.Interface, tasks []*api.TaskInfo) map[api.TaskID]string
104102
}
105103

106-
// Evictor interface for evict pods
107-
type Evictor interface {
108-
Evict(pod *v1.Pod, reason string) error
109-
}
110-
111104
// StatusUpdater updates pod with given PodCondition
112105
type StatusUpdater interface {
113106
UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error)
114107
}
115108

116-
// BatchBinder updates podgroup or job information
117-
type BatchBinder interface {
118-
Bind(job *api.JobInfo, cluster string) (*api.JobInfo, error)
119-
}
120-
121109
type PreBinder interface {
122110
PreBind(ctx context.Context, bindCtx *vcache.BindContext) error
123111

pkg/agentscheduler/plugins/factory.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
)
2323

2424
func init() {
25-
// Plugins for Jobs
2625
framework.RegisterPluginBuilder(predicates.PluginName, predicates.New)
2726
// framework.RegisterPluginBuilder(nodeorder.PluginName, nodeorder.New)
2827
}

0 commit comments

Comments
 (0)