-
Notifications
You must be signed in to change notification settings - Fork 1.3k
support fair share #358
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support fair share #358
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,9 +42,20 @@ func (alloc *allocateAction) Initialize() {} | |
| func (alloc *allocateAction) Execute(ssn *framework.Session) { | ||
| glog.V(3).Infof("Enter Allocate ...") | ||
| defer glog.V(3).Infof("Leaving Allocate ...") | ||
| // further | ||
| queues := util.NewPriorityQueue(ssn.QueueOrderFn) | ||
| jobsMap := map[api.QueueID]*util.PriorityQueue{} | ||
|
|
||
| // the allocation for pod may have many stages | ||
| // 1. pick a namespace named N (using ssn.NamespaceOrderFn) | ||
| // 2. pick a queue named Q from N (using ssn.QueueOrderFn) | ||
| // 3. pick a job named J from Q (using ssn.JobOrderFn) | ||
| // 4. pick a task T from J (using ssn.TaskOrderFn) | ||
| // 5. use predicateFn to filter out node that T can not be allocated on. | ||
| // 6. use ssn.NodeOrderFn to judge the best node and assign it to T | ||
|
|
||
| namespaces := util.NewPriorityQueue(ssn.NamespaceOrderFn) | ||
|
|
||
| // jobsMap is map[api.NamespaceName]map[api.QueueID]PriorityQueue(*api.JobInfo) | ||
| // used to find job with highest priority in given queue and namespace | ||
| jobsMap := map[api.NamespaceName]map[api.QueueID]*util.PriorityQueue{} | ||
|
|
||
| for _, job := range ssn.Jobs { | ||
| if job.PodGroup.Status.Phase == scheduling.PodGroupPending { | ||
|
|
@@ -55,23 +66,32 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { | |
| continue | ||
| } | ||
|
|
||
| if queue, found := ssn.Queues[job.Queue]; found { | ||
| queues.Push(queue) | ||
| } else { | ||
| if _, found := ssn.Queues[job.Queue]; !found { | ||
| glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found", | ||
| job.Namespace, job.Name, job.Queue) | ||
| continue | ||
| } | ||
|
|
||
| if _, found := jobsMap[job.Queue]; !found { | ||
| jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) | ||
| namespace := api.NamespaceName(job.Namespace) | ||
| queueMap, found := jobsMap[namespace] | ||
| if !found { | ||
| namespaces.Push(namespace) | ||
|
|
||
| queueMap = make(map[api.QueueID]*util.PriorityQueue) | ||
| jobsMap[namespace] = queueMap | ||
| } | ||
|
|
||
| jobs, found := queueMap[job.Queue] | ||
| if !found { | ||
| jobs = util.NewPriorityQueue(ssn.JobOrderFn) | ||
| queueMap[job.Queue] = jobs | ||
| } | ||
|
|
||
| glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) | ||
| jobsMap[job.Queue].Push(job) | ||
| jobs.Push(job) | ||
| } | ||
|
|
||
| glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap)) | ||
| glog.V(3).Infof("Try to allocate resource to %d Namespaces", len(jobsMap)) | ||
|
|
||
| pendingTasks := map[api.JobID]*util.PriorityQueue{} | ||
|
|
||
|
|
@@ -92,21 +112,47 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { | |
| return ssn.PredicateFn(task, node) | ||
| } | ||
|
|
||
| // To pick <namespace, queue> tuple for job, we choose to pick namespace firstly. | ||
| // Because we believe that number of queues would less than namespaces in most case. | ||
| // And, this action would make the resource usage among namespace balanced. | ||
| for { | ||
| if queues.Empty() { | ||
| if namespaces.Empty() { | ||
| break | ||
| } | ||
|
|
||
| queue := queues.Pop().(*api.QueueInfo) | ||
| if ssn.Overused(queue) { | ||
| glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name) | ||
| continue | ||
| // pick namespace from namespaces PriorityQueue | ||
| namespace := namespaces.Pop().(api.NamespaceName) | ||
|
|
||
| queueInNamespace := jobsMap[namespace] | ||
|
|
||
| // pick queue for given namespace | ||
| // | ||
| // This block use a algorithm with time complex O(n). | ||
| // But at least PriorityQueue could not be used here, | ||
| // because the allocation of job would change the priority of queue among all namespaces, | ||
| // and the PriorityQueue have no ability to update priority for a special queue. | ||
| var queue *api.QueueInfo | ||
| for queueId := range queueInNamespace { | ||
| currentQueue := ssn.Queues[queueId] | ||
| if ssn.Overused(currentQueue) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the queue is overused, could not the queue get resource even the cluster has idle resource?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the overused queue will be skipped in master code too. |
||
| glog.V(3).Infof("Namespace <%s> Queue <%s> is overused, ignore it.", namespace, currentQueue.Name) | ||
| delete(queueInNamespace, queueId) | ||
| continue | ||
| } | ||
|
|
||
| if queue == nil || ssn.QueueOrderFn(currentQueue, queue) { | ||
| queue = currentQueue | ||
| } | ||
| } | ||
|
|
||
| jobs, found := jobsMap[queue.UID] | ||
| if queue == nil { | ||
| glog.V(3).Infof("Namespace <%s> have no queue, skip it", namespace) | ||
| continue | ||
| } | ||
|
|
||
| glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name) | ||
| glog.V(3).Infof("Try to allocate resource to Jobs in Namespace <%s> Queue <%v>", namespace, queue.Name) | ||
|
|
||
| jobs, found := queueInNamespace[queue.UID] | ||
| if !found || jobs.Empty() { | ||
| glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name) | ||
| continue | ||
|
|
@@ -194,8 +240,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { | |
| } else { | ||
| stmt.Discard() | ||
| } | ||
| // Added Queue back until no job in Queue. | ||
| queues.Push(queue) | ||
|
|
||
| // Added Namespace back until no job in Namespace. | ||
| namespaces.Push(namespace) | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| /* | ||
| Copyright 2018 The Volcano Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package api | ||
|
|
||
| import ( | ||
| "fmt" | ||
|
|
||
| "github.com/golang/glog" | ||
|
|
||
| v1 "k8s.io/api/core/v1" | ||
| "k8s.io/client-go/tools/cache" | ||
| ) | ||
|
|
||
| // NamespaceName is name of namespace | ||
| type NamespaceName string | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just want to make codes more readable, so choose to mark it as a new type but not mark it in comment. |
||
|
|
||
| const ( | ||
| // NamespaceWeightKey is the key in ResourceQuota.spec.hard indicating the weight of this namespace | ||
| NamespaceWeightKey = "volcano.sh/namespace.weight" | ||
| // DefaultNamespaceWeight is the default weight of namespace | ||
| DefaultNamespaceWeight = 1 | ||
| ) | ||
|
|
||
| // NamespaceInfo records information of namespace | ||
| type NamespaceInfo struct { | ||
| // Name is the name of this namespace | ||
| Name NamespaceName | ||
| // Weight is the highest weight among many ResourceQuota. | ||
| Weight int64 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if we use signed integer, what's the best practice here? Do we suggest users to always use weights > 0 or it could be negative numbers?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, weight < 1 is meaningless in this case, because we have a default value which is 1. Any weight < 1 will be ignored. |
||
| } | ||
|
|
||
| // GetWeight returns weight of a namespace, any invalid case would get default value | ||
| func (n *NamespaceInfo) GetWeight() int64 { | ||
| if n == nil || n.Weight == 0 { | ||
| return DefaultNamespaceWeight | ||
| } | ||
| return n.Weight | ||
| } | ||
|
|
||
| type quotaItem struct { | ||
| name string | ||
| weight int64 | ||
| } | ||
|
|
||
| func quotaItemKeyFunc(obj interface{}) (string, error) { | ||
| item, ok := obj.(*quotaItem) | ||
| if !ok { | ||
| return "", fmt.Errorf("obj with type %T could not parse", obj) | ||
| } | ||
| return item.name, nil | ||
| } | ||
|
|
||
| // for big root heap | ||
| func quotaItemLessFunc(a interface{}, b interface{}) bool { | ||
|
lminzhw marked this conversation as resolved.
|
||
| A := a.(*quotaItem) | ||
| B := b.(*quotaItem) | ||
| return A.weight > B.weight | ||
| } | ||
|
|
||
| // NamespaceCollection will record all details about namespace | ||
| type NamespaceCollection struct { | ||
| Name string | ||
|
|
||
| quotaWeight *cache.Heap | ||
|
lminzhw marked this conversation as resolved.
|
||
| } | ||
|
|
||
| // NewNamespaceCollection creates new NamespaceCollection object to record all information about a namespace | ||
| func NewNamespaceCollection(name string) *NamespaceCollection { | ||
| n := &NamespaceCollection{ | ||
| Name: name, | ||
| quotaWeight: cache.NewHeap(quotaItemKeyFunc, quotaItemLessFunc), | ||
| } | ||
| // add at least one item into quotaWeight. | ||
| // Because cache.Heap.Pop would be blocked until queue is not empty | ||
| n.updateWeight("aItem{ | ||
| name: NamespaceWeightKey, | ||
| weight: DefaultNamespaceWeight, | ||
| }) | ||
| return n | ||
| } | ||
|
|
||
| func (n *NamespaceCollection) deleteWeight(q *quotaItem) { | ||
| n.quotaWeight.Delete(q) | ||
| } | ||
|
|
||
| func (n *NamespaceCollection) updateWeight(q *quotaItem) { | ||
| n.quotaWeight.Update(q) | ||
| } | ||
|
|
||
| func itemFromQuota(quota *v1.ResourceQuota) *quotaItem { | ||
| var weight int64 = DefaultNamespaceWeight | ||
|
|
||
| quotaWeight, ok := quota.Spec.Hard[NamespaceWeightKey] | ||
| if ok { | ||
| weight = quotaWeight.Value() | ||
| } | ||
|
|
||
| item := "aItem{ | ||
| name: quota.Name, | ||
| weight: weight, | ||
| } | ||
| return item | ||
| } | ||
|
|
||
| // Update modify the registered information according quota object | ||
| func (n *NamespaceCollection) Update(quota *v1.ResourceQuota) { | ||
| n.updateWeight(itemFromQuota(quota)) | ||
| } | ||
|
|
||
| // Delete remove the registered information according quota object | ||
| func (n *NamespaceCollection) Delete(quota *v1.ResourceQuota) { | ||
| n.deleteWeight(itemFromQuota(quota)) | ||
| } | ||
|
|
||
| // Snapshot will clone a NamespaceInfo without Heap according NamespaceCollection | ||
| func (n *NamespaceCollection) Snapshot() *NamespaceInfo { | ||
| var weight int64 = DefaultNamespaceWeight | ||
|
|
||
| obj, err := n.quotaWeight.Pop() | ||
| if err != nil { | ||
| glog.Warningf("namespace %s, quota weight meets error %v when pop", n.Name, err) | ||
| } else { | ||
| item := obj.(*quotaItem) | ||
| weight = item.weight | ||
| n.quotaWeight.Add(item) | ||
| } | ||
|
|
||
| return &NamespaceInfo{ | ||
| Name: NamespaceName(n.Name), | ||
| Weight: weight, | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A little confused. What is the relationship between queue and namespace? According to the doc motivation, a namespace works like a user, and there might be many users(namespaces) in a queue. Will be many queues in a namespace?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hex108 As I understand, generally speaking, different users (namespace) could schedule jobs in different Queues. This doc I think concentrates more on the faire resource sharing in one queue between users. This solution is mainly trying to resolve starvation problem in resource sharing.