Skip to content

Commit c7126c6

Browse files
Merge pull request #358 from lminzhw/drf_fair_share
support fair share
2 parents f667715 + 2f918a5 commit c7126c6

18 files changed

Lines changed: 935 additions & 107 deletions

File tree

installer/helm/chart/volcano/templates/scheduler.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ rules:
4848
- apiGroups: [""]
4949
resources: ["namespaces"]
5050
verbs: ["list", "watch"]
51+
- apiGroups: [""]
52+
resources: ["resourcequotas"]
53+
verbs: ["list", "watch"]
5154
- apiGroups: ["storage.k8s.io"]
5255
resources: ["storageclasses"]
5356
verbs: ["list", "watch"]

installer/volcano-development.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ rules:
6767
- apiGroups: [""]
6868
resources: ["namespaces"]
6969
verbs: ["list", "watch"]
70+
- apiGroups: [""]
71+
resources: ["resourcequotas"]
72+
verbs: ["list", "watch"]
7073
- apiGroups: ["storage.k8s.io"]
7174
resources: ["storageclasses"]
7275
verbs: ["list", "watch"]

pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/scheduler/actions/allocate/allocate.go

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,20 @@ func (alloc *allocateAction) Initialize() {}
4242
func (alloc *allocateAction) Execute(ssn *framework.Session) {
4343
glog.V(3).Infof("Enter Allocate ...")
4444
defer glog.V(3).Infof("Leaving Allocate ...")
45-
// further
46-
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
47-
jobsMap := map[api.QueueID]*util.PriorityQueue{}
45+
46+
// the allocation of a pod may go through several stages:
47+
// 1. pick a namespace named N (using ssn.NamespaceOrderFn)
48+
// 2. pick a queue named Q from N (using ssn.QueueOrderFn)
49+
// 3. pick a job named J from Q (using ssn.JobOrderFn)
50+
// 4. pick a task T from J (using ssn.TaskOrderFn)
51+
// 5. use predicateFn to filter out node that T can not be allocated on.
52+
// 6. use ssn.NodeOrderFn to judge the best node and assign it to T
53+
54+
namespaces := util.NewPriorityQueue(ssn.NamespaceOrderFn)
55+
56+
// jobsMap is map[api.NamespaceName]map[api.QueueID]PriorityQueue(*api.JobInfo)
57+
// used to find job with highest priority in given queue and namespace
58+
jobsMap := map[api.NamespaceName]map[api.QueueID]*util.PriorityQueue{}
4859

4960
for _, job := range ssn.Jobs {
5061
if job.PodGroup.Status.Phase == scheduling.PodGroupPending {
@@ -55,23 +66,32 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
5566
continue
5667
}
5768

58-
if queue, found := ssn.Queues[job.Queue]; found {
59-
queues.Push(queue)
60-
} else {
69+
if _, found := ssn.Queues[job.Queue]; !found {
6170
glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
6271
job.Namespace, job.Name, job.Queue)
6372
continue
6473
}
6574

66-
if _, found := jobsMap[job.Queue]; !found {
67-
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
75+
namespace := api.NamespaceName(job.Namespace)
76+
queueMap, found := jobsMap[namespace]
77+
if !found {
78+
namespaces.Push(namespace)
79+
80+
queueMap = make(map[api.QueueID]*util.PriorityQueue)
81+
jobsMap[namespace] = queueMap
82+
}
83+
84+
jobs, found := queueMap[job.Queue]
85+
if !found {
86+
jobs = util.NewPriorityQueue(ssn.JobOrderFn)
87+
queueMap[job.Queue] = jobs
6888
}
6989

7090
glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
71-
jobsMap[job.Queue].Push(job)
91+
jobs.Push(job)
7292
}
7393

74-
glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
94+
glog.V(3).Infof("Try to allocate resource to %d Namespaces", len(jobsMap))
7595

7696
pendingTasks := map[api.JobID]*util.PriorityQueue{}
7797

@@ -92,21 +112,47 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
92112
return ssn.PredicateFn(task, node)
93113
}
94114

115+
// To pick <namespace, queue> tuple for job, we choose to pick namespace firstly.
116+
// Because we believe that the number of queues is smaller than the number of namespaces in most cases.
117+
// And, this action would make the resource usage among namespace balanced.
95118
for {
96-
if queues.Empty() {
119+
if namespaces.Empty() {
97120
break
98121
}
99122

100-
queue := queues.Pop().(*api.QueueInfo)
101-
if ssn.Overused(queue) {
102-
glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
103-
continue
123+
// pick namespace from namespaces PriorityQueue
124+
namespace := namespaces.Pop().(api.NamespaceName)
125+
126+
queueInNamespace := jobsMap[namespace]
127+
128+
// pick queue for given namespace
129+
//
130+
// This block uses an algorithm with time complexity O(n).
131+
// But at least PriorityQueue could not be used here,
132+
// because the allocation of job would change the priority of queue among all namespaces,
133+
// and the PriorityQueue has no ability to update the priority of a specific queue.
134+
var queue *api.QueueInfo
135+
for queueId := range queueInNamespace {
136+
currentQueue := ssn.Queues[queueId]
137+
if ssn.Overused(currentQueue) {
138+
glog.V(3).Infof("Namespace <%s> Queue <%s> is overused, ignore it.", namespace, currentQueue.Name)
139+
delete(queueInNamespace, queueId)
140+
continue
141+
}
142+
143+
if queue == nil || ssn.QueueOrderFn(currentQueue, queue) {
144+
queue = currentQueue
145+
}
104146
}
105147

106-
jobs, found := jobsMap[queue.UID]
148+
if queue == nil {
149+
glog.V(3).Infof("Namespace <%s> have no queue, skip it", namespace)
150+
continue
151+
}
107152

108-
glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)
153+
glog.V(3).Infof("Try to allocate resource to Jobs in Namespace <%s> Queue <%v>", namespace, queue.Name)
109154

155+
jobs, found := queueInNamespace[queue.UID]
110156
if !found || jobs.Empty() {
111157
glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
112158
continue
@@ -194,8 +240,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
194240
} else {
195241
stmt.Discard()
196242
}
197-
// Added Queue back until no job in Queue.
198-
queues.Push(queue)
243+
244+
// Added Namespace back until no job in Namespace.
245+
namespaces.Push(namespace)
199246
}
200247
}
201248

pkg/scheduler/actions/allocate/allocate_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,10 @@ func TestAllocate(t *testing.T) {
182182
{
183183
Plugins: []conf.PluginOption{
184184
{
185-
Name: "drf",
186-
EnabledPreemptable: &trueValue,
187-
EnabledJobOrder: &trueValue,
185+
Name: "drf",
186+
EnabledPreemptable: &trueValue,
187+
EnabledJobOrder: &trueValue,
188+
EnabledNamespaceOrder: &trueValue,
188189
},
189190
{
190191
Name: "proportion",

pkg/scheduler/api/cluster_info.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ import "fmt"
2020

2121
// ClusterInfo is a snapshot of cluster by cache.
2222
type ClusterInfo struct {
23-
Jobs map[JobID]*JobInfo
24-
Nodes map[string]*NodeInfo
25-
Queues map[QueueID]*QueueInfo
23+
Jobs map[JobID]*JobInfo
24+
Nodes map[string]*NodeInfo
25+
Queues map[QueueID]*QueueInfo
26+
NamespaceInfo map[NamespaceName]*NamespaceInfo
2627
}
2728

2829
func (ci ClusterInfo) String() string {
@@ -57,5 +58,13 @@ func (ci ClusterInfo) String() string {
5758
}
5859
}
5960

61+
if len(ci.NamespaceInfo) != 0 {
62+
str = str + "Namespaces:\n"
63+
for _, ns := range ci.NamespaceInfo {
64+
str = str + fmt.Sprintf("\t Namespace(%s) Weight(%v)\n",
65+
ns.Name, ns.Weight)
66+
}
67+
}
68+
6069
return str
6170
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
Copyright 2018 The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"fmt"
21+
22+
"github.com/golang/glog"
23+
24+
v1 "k8s.io/api/core/v1"
25+
"k8s.io/client-go/tools/cache"
26+
)
27+
28+
// NamespaceName is name of namespace
type NamespaceName string

const (
	// NamespaceWeightKey is the key in ResourceQuota.spec.hard indicating the weight of this namespace
	NamespaceWeightKey = "volcano.sh/namespace.weight"
	// DefaultNamespaceWeight is the default weight of namespace
	DefaultNamespaceWeight = 1
)

// NamespaceInfo records information of namespace
type NamespaceInfo struct {
	// Name is the name of this namespace
	Name NamespaceName
	// Weight is the highest weight among many ResourceQuota.
	Weight int64
}

// GetWeight returns the weight of a namespace; a nil receiver or an
// unset (zero) weight falls back to DefaultNamespaceWeight.
func (n *NamespaceInfo) GetWeight() int64 {
	if n != nil && n.Weight != 0 {
		return n.Weight
	}
	return DefaultNamespaceWeight
}
53+
54+
// quotaItem pairs a ResourceQuota name with the weight parsed from it.
type quotaItem struct {
	name   string
	weight int64
}

// quotaItemKeyFunc is the heap key function: a quota item is keyed by
// the name of the ResourceQuota it was built from.
func quotaItemKeyFunc(obj interface{}) (string, error) {
	if item, ok := obj.(*quotaItem); ok {
		return item.name, nil
	}
	return "", fmt.Errorf("obj with type %T could not parse", obj)
}

// quotaItemLessFunc orders quota items by descending weight (big-root
// heap), so the heap top is always the highest weight.
func quotaItemLessFunc(a interface{}, b interface{}) bool {
	return a.(*quotaItem).weight > b.(*quotaItem).weight
}
73+
74+
// NamespaceCollection will record all details about namespace
75+
type NamespaceCollection struct {
76+
Name string
77+
78+
quotaWeight *cache.Heap
79+
}
80+
81+
// NewNamespaceCollection creates new NamespaceCollection object to record all information about a namespace
82+
func NewNamespaceCollection(name string) *NamespaceCollection {
83+
n := &NamespaceCollection{
84+
Name: name,
85+
quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc),
86+
}
87+
// add at least one item into quotaWeight.
88+
// Because cache.Heap.Pop would be blocked until queue is not empty
89+
n.updateWeight(&quotaItem{
90+
name: NamespaceWeightKey,
91+
weight: DefaultNamespaceWeight,
92+
})
93+
return n
94+
}
95+
96+
func (n *NamespaceCollection) deleteWeight(q *quotaItem) {
97+
n.quotaWeight.Delete(q)
98+
}
99+
100+
func (n *NamespaceCollection) updateWeight(q *quotaItem) {
101+
n.quotaWeight.Update(q)
102+
}
103+
104+
func itemFromQuota(quota *v1.ResourceQuota) *quotaItem {
105+
var weight int64 = DefaultNamespaceWeight
106+
107+
quotaWeight, ok := quota.Spec.Hard[NamespaceWeightKey]
108+
if ok {
109+
weight = quotaWeight.Value()
110+
}
111+
112+
item := &quotaItem{
113+
name: quota.Name,
114+
weight: weight,
115+
}
116+
return item
117+
}
118+
119+
// Update modify the registered information according quota object
120+
func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) {
121+
n.updateWeight(itemFromQuota(quota))
122+
}
123+
124+
// Delete remove the registered information according quota object
125+
func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) {
126+
n.deleteWeight(itemFromQuota(quota))
127+
}
128+
129+
// Snapshot will clone a NamespaceInfo without Heap according NamespaceCollection
130+
func (n *NamespaceCollection) Snapshot() *NamespaceInfo {
131+
var weight int64 = DefaultNamespaceWeight
132+
133+
obj, err := n.quotaWeight.Pop()
134+
if err != nil {
135+
glog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err)
136+
} else {
137+
item := obj.(*quotaItem)
138+
weight = item.weight
139+
n.quotaWeight.Add(item)
140+
}
141+
142+
return &NamespaceInfo{
143+
Name: NamespaceName(n.Name),
144+
Weight: weight,
145+
}
146+
}

0 commit comments

Comments
 (0)