Skip to content

Commit 771933e

Browse files
committed
support fair share
1 parent 11b5ad3 commit 771933e

File tree

14 files changed

+711
-105
lines changed

14 files changed

+711
-105
lines changed

installer/helm/chart/volcano/templates/scheduler.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ rules:
4848
- apiGroups: [""]
4949
resources: ["namespaces"]
5050
verbs: ["list", "watch"]
51+
- apiGroups: [""]
52+
resources: ["resourcequotas"]
53+
verbs: ["list", "watch"]
5154
- apiGroups: ["storage.k8s.io"]
5255
resources: ["storageclasses"]
5356
verbs: ["list", "watch"]

pkg/scheduler/actions/allocate/allocate.go

Lines changed: 65 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,19 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
4242
glog.V(3).Infof("Enter Allocate ...")
4343
defer glog.V(3).Infof("Leaving Allocate ...")
4444

45-
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
46-
jobsMap := map[api.QueueID]*util.PriorityQueue{}
45+
// the allocation for pod may have many stages
46+
// 1. pick a namespace named N (using ssn.NamespaceOrderFn)
47+
// 2. pick a queue named Q from N (using ssn.QueueOrderFn)
48+
// 3. pick a job named J from Q (using ssn.JobOrderFn)
49+
// 4. pick a task T from J (using ssn.TaskOrderFn)
50+
// 5. use predicateFn to filter out node that T can not be allocated on.
51+
// 6. use ssn.NodeOrderFn to judge the best node and assign it to T
52+
53+
namespaces := util.NewPriorityQueue(ssn.NamespaceOrderFn)
54+
55+
// jobsMap is map[api.NamespaceName]map[api.QueueID]PriorityQueue(*api.JobInfo)
56+
// used to find the job with the highest priority in a given queue and namespace
57+
jobsMap := map[api.NamespaceName]map[api.QueueID]*util.PriorityQueue{}
4758

4859
for _, job := range ssn.Jobs {
4960
if job.PodGroup.Status.Phase == api.PodGroupPending {
@@ -54,23 +65,32 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
5465
continue
5566
}
5667

57-
if queue, found := ssn.Queues[job.Queue]; found {
58-
queues.Push(queue)
59-
} else {
68+
if _, found := ssn.Queues[job.Queue]; !found {
6069
glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
6170
job.Namespace, job.Name, job.Queue)
6271
continue
6372
}
6473

65-
if _, found := jobsMap[job.Queue]; !found {
66-
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
74+
namespace := api.NamespaceName(job.Namespace)
75+
queueMap, found := jobsMap[namespace]
76+
if !found {
77+
namespaces.Push(namespace)
78+
79+
queueMap = make(map[api.QueueID]*util.PriorityQueue)
80+
jobsMap[namespace] = queueMap
81+
}
82+
83+
jobs, found := queueMap[job.Queue]
84+
if !found {
85+
jobs = util.NewPriorityQueue(ssn.JobOrderFn)
86+
queueMap[job.Queue] = jobs
6787
}
6888

6989
glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
70-
jobsMap[job.Queue].Push(job)
90+
jobs.Push(job)
7191
}
7292

73-
glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
93+
glog.V(3).Infof("Try to allocate resource to %d Namespaces", len(jobsMap))
7494

7595
pendingTasks := map[api.JobID]*util.PriorityQueue{}
7696

@@ -91,21 +111,47 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
91111
return ssn.PredicateFn(task, node)
92112
}
93113

114+
// To pick a <namespace, queue> tuple for a job, we choose the namespace first.
115+
// Because we believe that the number of queues would be less than the number of namespaces in most cases.
116+
// Also, this ordering would keep resource usage balanced among namespaces.
94117
for {
95-
if queues.Empty() {
118+
if namespaces.Empty() {
96119
break
97120
}
98121

99-
queue := queues.Pop().(*api.QueueInfo)
100-
if ssn.Overused(queue) {
101-
glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
102-
continue
122+
// pick namespace from namespaces PriorityQueue
123+
namespace := namespaces.Pop().(api.NamespaceName)
124+
125+
queueInNamespace := jobsMap[namespace]
126+
127+
// pick queue for given namespace
128+
//
129+
// This block uses an algorithm with O(n) time complexity.
130+
// But a PriorityQueue could not be used here,
131+
// because allocating a job would change the priority of its queue among all namespaces,
132+
// and the PriorityQueue has no ability to update the priority of a specific queue.
133+
var queue *api.QueueInfo
134+
for queueId := range queueInNamespace {
135+
currentQueue := ssn.Queues[queueId]
136+
if ssn.Overused(currentQueue) {
137+
glog.V(3).Infof("Namespace <%s> Queue <%s> is overused, ignore it.", namespace, currentQueue.Name)
138+
delete(queueInNamespace, queueId)
139+
continue
140+
}
141+
142+
if queue == nil || ssn.QueueOrderFn(currentQueue, queue) {
143+
queue = currentQueue
144+
}
103145
}
104146

105-
jobs, found := jobsMap[queue.UID]
147+
if queue == nil {
148+
glog.V(3).Infof("Namespace <%s> have no queue, skip it", namespace)
149+
continue
150+
}
106151

107-
glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)
152+
glog.V(3).Infof("Try to allocate resource to Jobs in Namespace <%s> Queue <%v>", namespace, queue.Name)
108153

154+
jobs, found := queueInNamespace[queue.UID]
109155
if !found || jobs.Empty() {
110156
glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
111157
continue
@@ -193,8 +239,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
193239
} else {
194240
stmt.Discard()
195241
}
196-
// Added Queue back until no job in Queue.
197-
queues.Push(queue)
242+
243+
// Added Namespace back until no job in Namespace.
244+
namespaces.Push(namespace)
198245
}
199246
}
200247

pkg/scheduler/actions/allocate/allocate_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,9 +181,10 @@ func TestAllocate(t *testing.T) {
181181
{
182182
Plugins: []conf.PluginOption{
183183
{
184-
Name: "drf",
185-
EnabledPreemptable: &trueValue,
186-
EnabledJobOrder: &trueValue,
184+
Name: "drf",
185+
EnabledPreemptable: &trueValue,
186+
EnabledJobOrder: &trueValue,
187+
EnabledNamespaceOrder: &trueValue,
187188
},
188189
{
189190
Name: "proportion",

pkg/scheduler/api/cluster_info.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ import "fmt"
2020

2121
// ClusterInfo is a snapshot of cluster by cache.
2222
type ClusterInfo struct {
23-
Jobs map[JobID]*JobInfo
24-
Nodes map[string]*NodeInfo
25-
Queues map[QueueID]*QueueInfo
23+
Jobs map[JobID]*JobInfo
24+
Nodes map[string]*NodeInfo
25+
Queues map[QueueID]*QueueInfo
26+
NamespaceInfo map[NamespaceName]*NamespaceInfo
2627
}
2728

2829
func (ci ClusterInfo) String() string {
@@ -57,5 +58,13 @@ func (ci ClusterInfo) String() string {
5758
}
5859
}
5960

61+
if len(ci.NamespaceInfo) != 0 {
62+
str = str + "Namespaces:\n"
63+
for _, ns := range ci.NamespaceInfo {
64+
str = str + fmt.Sprintf("\t Namespace(%s) Weight(%v)\n",
65+
ns.Name, ns.Weight)
66+
}
67+
}
68+
6069
return str
6170
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
Copyright 2018 The Volcano Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"fmt"
21+
22+
"github.com/golang/glog"
23+
24+
v1 "k8s.io/api/core/v1"
25+
"k8s.io/client-go/tools/cache"
26+
)
27+
28+
// NamespaceName is the name of a namespace.
type NamespaceName string

const (
	// NamespaceWeightKey is the key in ResourceQuota.spec.hard indicating the weight of this namespace.
	NamespaceWeightKey = "volcano.sh/namespace.weight"
	// DefaultNamespaceWeight is the weight used when a namespace has none configured.
	DefaultNamespaceWeight = 1
)

// NamespaceInfo records scheduling-relevant information about a namespace.
type NamespaceInfo struct {
	// Name is the name of this namespace.
	Name NamespaceName
	// Weight is the highest weight among the namespace's many ResourceQuota objects.
	Weight int64
}

// GetWeight returns the weight of the namespace; a nil receiver or an
// unset (zero) weight falls back to DefaultNamespaceWeight.
func (n *NamespaceInfo) GetWeight() int64 {
	if n != nil && n.Weight != 0 {
		return n.Weight
	}
	return DefaultNamespaceWeight
}
54+
// quotaItem is a heap entry pairing a ResourceQuota name with its weight.
type quotaItem struct {
	name   string
	weight int64
}

// quotaItemKeyFunc derives the heap key (the quota name) from a *quotaItem.
func quotaItemKeyFunc(obj interface{}) (string, error) {
	q, ok := obj.(*quotaItem)
	if !ok {
		return "", fmt.Errorf("obj with type %T could not parse", obj)
	}
	return q.name, nil
}

// quotaItemLessFunc orders entries by descending weight, so the heap root
// always holds the heaviest item (max-heap).
func quotaItemLessFunc(a interface{}, b interface{}) bool {
	return a.(*quotaItem).weight > b.(*quotaItem).weight
}
73+
74+
// NamespaceCollection will record all details about namespace
75+
type NamespaceCollection struct {
76+
Name string
77+
78+
quotaWeight *cache.Heap
79+
}
80+
81+
// NewNamespaceCollection creates new NamespaceCollection object to record all information about a namespace
82+
func NewNamespaceCollection(name string) *NamespaceCollection {
83+
n := &NamespaceCollection{
84+
Name: name,
85+
quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc),
86+
}
87+
// add at least one item into quotaWeight.
88+
// Because cache.Heap.Pop would be blocked until queue is not empty
89+
n.updateWeight(&quotaItem{
90+
name: NamespaceWeightKey,
91+
weight: DefaultNamespaceWeight,
92+
})
93+
return n
94+
}
95+
96+
// deleteWeight removes the weight entry q from the heap.
// NOTE(review): the error returned by Heap.Delete is ignored here —
// presumably best-effort removal is intended; confirm.
func (n *NamespaceCollection) deleteWeight(q *quotaItem) {
	n.quotaWeight.Delete(q)
}

// updateWeight adds or refreshes the weight entry q in the heap.
// NOTE(review): the error returned by Heap.Update is ignored — confirm.
func (n *NamespaceCollection) updateWeight(q *quotaItem) {
	n.quotaWeight.Update(q)
}
103+
104+
func itemFromQuota(quota *v1.ResourceQuota) *quotaItem {
105+
var weight int64 = DefaultNamespaceWeight
106+
107+
quotaWeight, ok := quota.Spec.Hard[NamespaceWeightKey]
108+
if ok {
109+
weight = quotaWeight.Value()
110+
}
111+
112+
item := &quotaItem{
113+
name: quota.Name,
114+
weight: weight,
115+
}
116+
return item
117+
}
118+
119+
// Update registers or refreshes the weight information derived from the
// given ResourceQuota object.
func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) {
	n.updateWeight(itemFromQuota(quota))
}

// Delete removes the weight information derived from the given
// ResourceQuota object.
func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) {
	n.deleteWeight(itemFromQuota(quota))
}
128+
129+
// Snapshot will clone a NamespaceInfo without Heap according NamespaceCollection
130+
func (n *NamespaceCollection) Snapshot() *NamespaceInfo {
131+
var weight int64 = DefaultNamespaceWeight
132+
133+
obj, err := n.quotaWeight.Pop()
134+
if err != nil {
135+
glog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err)
136+
} else {
137+
item := obj.(*quotaItem)
138+
weight = item.weight
139+
n.quotaWeight.Add(item)
140+
}
141+
142+
return &NamespaceInfo{
143+
Name: NamespaceName(n.Name),
144+
Weight: weight,
145+
}
146+
}

0 commit comments

Comments
 (0)