Skip to content

Commit a274368

Browse files
committed
support fair share
1 parent 4492e66 commit a274368

File tree

12 files changed

+683
-93
lines changed

12 files changed

+683
-93
lines changed

pkg/scheduler/actions/allocate/allocate.go

Lines changed: 73 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,14 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
4343
defer glog.V(3).Infof("Leaving Allocate ...")
4444

4545
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
46+
// map[queueId]PriorityQueue(*api.JobInfo)
4647
jobsMap := map[api.QueueID]*util.PriorityQueue{}
48+
// map[queueId]PriorityQueue(namespaceName)
49+
namespaceMap := map[api.QueueID]*util.PriorityQueue{}
50+
// map[queueId]map[namespaceName]PriorityQueue(*api.JobInfo)
51+
jobInNamespaceMap := map[api.QueueID]map[string]*util.PriorityQueue{}
52+
53+
namespaceOrderEnabled := ssn.NamespaceOrderEnabled()
4754

4855
for _, job := range ssn.Jobs {
4956
if job.PodGroup.Status.Phase == api.PodGroupPending {
@@ -62,12 +69,37 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
6269
continue
6370
}
6471

72+
// ignore namespace order enabled or not, just add key in jobsMap
6573
if _, found := jobsMap[job.Queue]; !found {
6674
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
6775
}
6876

69-
glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
70-
jobsMap[job.Queue].Push(job)
77+
if !namespaceOrderEnabled {
78+
glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
79+
jobsMap[job.Queue].Push(job)
80+
continue
81+
}
82+
83+
if _, found := namespaceMap[job.Queue]; !found {
84+
namespaceMap[job.Queue] = util.NewPriorityQueue(ssn.NamespaceOrderFn)
85+
}
86+
87+
namespaceJob, found := jobInNamespaceMap[job.Queue]
88+
if !found {
89+
namespaceJob = make(map[string]*util.PriorityQueue)
90+
jobInNamespaceMap[job.Queue] = namespaceJob
91+
}
92+
93+
jobs, found := namespaceJob[job.Namespace]
94+
if !found {
95+
jobs = util.NewPriorityQueue(ssn.JobOrderFn)
96+
namespaceJob[job.Namespace] = jobs
97+
98+
namespaceMap[job.Queue].Push(job.Namespace)
99+
}
100+
101+
glog.V(4).Infof("Added Job <%s/%s> into Queue <%s> Namespace <%s>", job.Namespace, job.Name, job.Queue, job.Namespace)
102+
jobs.Push(job)
71103
}
72104

73105
glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
@@ -102,16 +134,44 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
102134
continue
103135
}
104136

105-
jobs, found := jobsMap[queue.UID]
106-
107137
glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)
108138

109-
if !found || jobs.Empty() {
110-
glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
111-
continue
139+
var jobQueue *util.PriorityQueue
140+
var namespace string
141+
var namespaceQueue *util.PriorityQueue
142+
143+
if !namespaceOrderEnabled {
144+
jobs, found := jobsMap[queue.UID]
145+
if !found || jobs.Empty() {
146+
glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
147+
continue
148+
}
149+
jobQueue = jobs
150+
} else {
151+
namespaces, found := namespaceMap[queue.UID]
152+
if !found || namespaces.Empty() {
153+
glog.V(4).Infof("Can not find namespace for queue %s.", queue.Name)
154+
continue
155+
}
156+
namespaceQueue = namespaces
157+
namespace = namespaces.Pop().(string)
158+
159+
glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>, namespace <%v>", queue.Name, namespace)
160+
161+
namespacesInQueue, found := jobInNamespaceMap[queue.UID]
162+
if !found {
163+
glog.V(4).Infof("Can not find namespace %s for queue %s", namespace, queue.Name)
164+
continue
165+
}
166+
namespaceJob, found := namespacesInQueue[namespace]
167+
if !found || namespaceJob.Empty() {
168+
glog.V(4).Infof("Can not find job for queue %s, namespace %s.", queue.Name, namespace)
169+
continue
170+
}
171+
jobQueue = namespaceJob
112172
}
113173

114-
job := jobs.Pop().(*api.JobInfo)
174+
job := jobQueue.Pop().(*api.JobInfo)
115175
if _, found := pendingTasks[job.UID]; !found {
116176
tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
117177
for _, task := range job.TaskStatusIndex[api.Pending] {
@@ -183,7 +243,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
183243
}
184244

185245
if ssn.JobReady(job) {
186-
jobs.Push(job)
246+
jobQueue.Push(job)
187247
break
188248
}
189249
}
@@ -193,6 +253,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
193253
} else {
194254
stmt.Discard()
195255
}
256+
257+
if namespaceOrderEnabled {
258+
namespaceQueue.Push(namespace)
259+
}
196260
// Added Queue back until no job in Queue.
197261
queues.Push(queue)
198262
}

pkg/scheduler/api/cluster_info.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ import "fmt"
2020

2121
// ClusterInfo is a snapshot of cluster by cache.
2222
type ClusterInfo struct {
23-
Jobs map[JobID]*JobInfo
24-
Nodes map[string]*NodeInfo
25-
Queues map[QueueID]*QueueInfo
23+
Jobs map[JobID]*JobInfo
24+
Nodes map[string]*NodeInfo
25+
Queues map[QueueID]*QueueInfo
26+
NamespaceInfo map[string]*NamespaceInfo
2627
}
2728

2829
func (ci ClusterInfo) String() string {
@@ -57,5 +58,13 @@ func (ci ClusterInfo) String() string {
5758
}
5859
}
5960

61+
if len(ci.NamespaceInfo) != 0 {
62+
str = str + "Namespaces:\n"
63+
for _, ns := range ci.NamespaceInfo {
64+
str = str + fmt.Sprintf("\t Namespace(%s) Weight(%v)\n",
65+
ns.Name, ns.Weight)
66+
}
67+
}
68+
6069
return str
6170
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
Copyright 2018 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package api
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
23+
"github.com/golang/glog"
24+
25+
v1 "k8s.io/api/core/v1"
26+
"k8s.io/client-go/tools/cache"
27+
)
28+
29+
const (
30+
// QuotaKey is the key in ResourceQuota.spec.hard indicating the weight of thi namespace
31+
QuotaKey = "volcano.sh/namespace.weight"
32+
// DefaultWeight is the default weight of namespace
33+
DefaultWeight = 1
34+
)
35+
36+
// NamespaceInfo records information of namespace
37+
type NamespaceInfo struct {
38+
Name string
39+
// Weight is the highest weight among many ResourceQuota.
40+
Weight int64
41+
}
42+
43+
// GetWeight returns weight of a namespace, any invalid case would get default value
44+
func (n *NamespaceInfo) GetWeight() int64 {
45+
if n == nil {
46+
return DefaultWeight
47+
}
48+
if n.Weight == 0 {
49+
return DefaultWeight
50+
}
51+
return n.Weight
52+
}
53+
54+
type quotaItem struct {
55+
name string
56+
weight int64
57+
}
58+
59+
func quotaItemKeyFunc(obj interface{}) (string, error) {
60+
item, ok := obj.(*quotaItem)
61+
if !ok {
62+
return "", fmt.Errorf("obj with type %T could not parse", obj)
63+
}
64+
return item.name, nil
65+
}
66+
67+
// for big root heap
68+
func quotaItemLessFunc(a interface{}, b interface{}) bool {
69+
A := a.(*quotaItem)
70+
B := b.(*quotaItem)
71+
return A.weight > B.weight
72+
}
73+
74+
// NamespaceCollection will record all details about namespace
75+
type NamespaceCollection struct {
76+
Name string
77+
78+
weightMu sync.Mutex
79+
quotaWeight *cache.Heap
80+
}
81+
82+
// NewNamespaceCollection creates new NamespaceCollection object to record all information about a namespace
83+
func NewNamespaceCollection(name string) *NamespaceCollection {
84+
n := &NamespaceCollection{
85+
Name: name,
86+
quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc),
87+
}
88+
// add at least one item into quotaWeight.
89+
// Because cache.Heap.Pop would be blocked until queue is not empty
90+
n.updateWeight(&quotaItem{
91+
name: QuotaKey,
92+
weight: DefaultWeight,
93+
})
94+
return n
95+
}
96+
97+
func (n *NamespaceCollection) deleteWeight(q *quotaItem) {
98+
n.weightMu.Lock()
99+
n.quotaWeight.Delete(q)
100+
n.weightMu.Unlock()
101+
}
102+
103+
func (n *NamespaceCollection) updateWeight(q *quotaItem) {
104+
n.weightMu.Lock()
105+
n.quotaWeight.Update(q)
106+
n.weightMu.Unlock()
107+
}
108+
109+
func itemFromQuota(quota *v1.ResourceQuota) *quotaItem {
110+
var weight int64 = DefaultWeight
111+
112+
quotaWeight, ok := quota.Spec.Hard[QuotaKey]
113+
if ok {
114+
weight = quotaWeight.Value()
115+
}
116+
117+
item := &quotaItem{
118+
name: quota.Name,
119+
weight: weight,
120+
}
121+
return item
122+
}
123+
124+
// Update modify the registered information according quota object
125+
func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) {
126+
n.updateWeight(itemFromQuota(quota))
127+
}
128+
129+
// Delete remove the registered information according quota object
130+
func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) {
131+
n.deleteWeight(itemFromQuota(quota))
132+
}
133+
134+
// Snapshot will clone a NamespaceInfo without Heap according NamespaceCollection
135+
func (n *NamespaceCollection) Snapshot() *NamespaceInfo {
136+
var weight int64 = DefaultWeight
137+
138+
n.weightMu.Lock()
139+
obj, err := n.quotaWeight.Pop()
140+
if err != nil {
141+
glog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err)
142+
} else {
143+
item := obj.(*quotaItem)
144+
weight = item.weight
145+
n.quotaWeight.Add(item)
146+
}
147+
n.weightMu.Unlock()
148+
149+
return &NamespaceInfo{
150+
Name: n.Name,
151+
Weight: weight,
152+
}
153+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package api
2+
3+
import (
4+
"testing"
5+
6+
v1 "k8s.io/api/core/v1"
7+
"k8s.io/apimachinery/pkg/api/resource"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
)
10+
11+
func newQuota(name string, weight int) *v1.ResourceQuota {
12+
q := &v1.ResourceQuota{
13+
TypeMeta: metav1.TypeMeta{},
14+
ObjectMeta: metav1.ObjectMeta{
15+
Name: name,
16+
},
17+
Spec: v1.ResourceQuotaSpec{
18+
Hard: make(v1.ResourceList),
19+
},
20+
}
21+
22+
if weight >= 0 {
23+
q.Spec.Hard[v1.ResourceName(QuotaKey)] = *resource.NewQuantity(int64(weight), resource.DecimalSI)
24+
}
25+
26+
return q
27+
}
28+
29+
func TestNamespaceCollection(t *testing.T) {
30+
c := NewNamespaceCollection("testCollection")
31+
c.Update(newQuota("abc", 123))
32+
c.Update(newQuota("abc", 456))
33+
c.Update(newQuota("def", -1))
34+
c.Update(newQuota("def", 16))
35+
c.Update(newQuota("ghi", 0))
36+
37+
info := c.Snapshot()
38+
if info.Weight != 456 {
39+
t.Errorf("weight of namespace should be %d, but not %d", 456, info.Weight)
40+
}
41+
42+
c.Delete(newQuota("abc", 0))
43+
44+
info = c.Snapshot()
45+
if info.Weight != 16 {
46+
t.Errorf("weight of namespace should be %d, but not %d", 16, info.Weight)
47+
}
48+
49+
c.Delete(newQuota("abc", 0))
50+
c.Delete(newQuota("def", 15))
51+
c.Delete(newQuota("ghi", -1))
52+
53+
info = c.Snapshot()
54+
if info.Weight != DefaultWeight {
55+
t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultWeight, info.Weight)
56+
}
57+
}
58+
59+
func TestEmptyNamespaceCollection(t *testing.T) {
60+
c := NewNamespaceCollection("testEmptyCollection")
61+
62+
info := c.Snapshot()
63+
if info.Weight != DefaultWeight {
64+
t.Errorf("weight of namespace should be %d, but not %d", DefaultWeight, info.Weight)
65+
}
66+
67+
// snapshot can be called anytime
68+
info = c.Snapshot()
69+
if info.Weight != DefaultWeight {
70+
t.Errorf("weight of namespace should be %d, but not %d", DefaultWeight, info.Weight)
71+
}
72+
73+
c.Delete(newQuota("abc", 0))
74+
75+
info = c.Snapshot()
76+
if info.Weight != DefaultWeight {
77+
t.Errorf("weight of namespace should be %d, but not %d", DefaultWeight, info.Weight)
78+
}
79+
80+
c.Delete(newQuota("abc", 0))
81+
c.Delete(newQuota("def", 15))
82+
c.Delete(newQuota("ghi", -1))
83+
84+
info = c.Snapshot()
85+
if info.Weight != DefaultWeight {
86+
t.Errorf("weight of namespace should be default weight %d, but not %d", DefaultWeight, info.Weight)
87+
}
88+
}

0 commit comments

Comments
 (0)