Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

Commit 2d6c99e

Browse files
authored
Merge pull request #105 from spodila/fixQueueBucketSorting
Fix queue bucket sorting
2 parents 71a387a + e4f9210 commit 2d6c99e

File tree

14 files changed

+435
-34
lines changed

14 files changed

+435
-34
lines changed

fenzo-core/fenzo-core.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
dependencies {
22
compile ("org.apache.mesos:mesos:latest.release")
33
compile ("com.fasterxml.jackson.core:jackson-databind:latest.release")
4+
compile ("org.json:json:20140107")
45
testCompile ("junit:junit-dep:latest.release")
56
testRuntime ("org.slf4j:slf4j-log4j12:latest.release")
67
}

fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ private void reset() {
6666
private final TaskTracker taskTracker;
6767
private final String attrNameToGroupMaxResources;
6868
private final Map<String, Map<VMResource, Double>> maxResourcesMap;
69+
private final Map<VMResource, Double> totalResourcesMap;
6970
private final VMRejectLimiter vmRejectLimiter;
7071
private final AssignableVirtualMachine dummyVM = new AssignableVirtualMachine(null, null, "", null, 0L, null) {
7172
@Override
@@ -87,6 +88,7 @@ void assignResult(TaskAssignmentResult result) {
8788
this.leaseOfferExpirySecs = leaseOfferExpirySecs;
8889
this.attrNameToGroupMaxResources = attrNameToGroupMaxResources;
8990
maxResourcesMap = new HashMap<>();
91+
totalResourcesMap = new HashMap<>();
9092
vmRejectLimiter = new VMRejectLimiter(maxOffersToReject, leaseOfferExpirySecs); // ToDo make this configurable?
9193
activeVmGroups = new ActiveVmGroups();
9294
this.singleLeaseMode = singleLeaseMode;
@@ -214,11 +216,12 @@ List<AssignableVirtualMachine> prepareAndGetOrderedVMs(List<VirtualMachineLease>
214216
List<AssignableVirtualMachine> vms = new ArrayList<>();
215217
taskTracker.clearAssignedTasks();
216218
vmRejectLimiter.reset();
219+
resetTotalResources();
217220
// ToDo make this parallel maybe?
218221
for(Map.Entry<String, AssignableVirtualMachine> entry: virtualMachinesMap.entrySet()) {
219222
AssignableVirtualMachine avm = entry.getValue();
220223
avm.prepareForScheduling();
221-
if(isInActiveVmGroup(entry.getValue()) && entry.getValue().isAssignableNow()) {
224+
if(isInActiveVmGroup(avm) && avm.isAssignableNow()) {
222225
// for now, only add it if it is available right now
223226
if(logger.isDebugEnabled())
224227
logger.debug("Host " + avm.getHostname() + " available for assignments");
@@ -227,11 +230,31 @@ List<AssignableVirtualMachine> prepareAndGetOrderedVMs(List<VirtualMachineLease>
227230
else if(logger.isDebugEnabled())
228231
logger.debug("Host " + avm.getHostname() + " not available for assignments");
229232
saveMaxResources(avm);
233+
if (isInActiveVmGroup(avm) && !avm.isDisabled())
234+
addTotalResources(avm);
230235
}
236+
taskTracker.setTotalResources(totalResourcesMap);
231237
//Collections.sort(vms);
232238
return vms;
233239
}
234240

241+
// Empties totalResourcesMap so that per-resource totals can be accumulated again from scratch.
private void resetTotalResources() {
242+
totalResourcesMap.clear();
243+
}
244+
245+
private void addTotalResources(AssignableVirtualMachine avm) {
246+
final Map<VMResource, Double> maxResources = avm.getMaxResources();
247+
for (VMResource r: maxResources.keySet()) {
248+
Double v = maxResources.get(r);
249+
if (v != null) {
250+
if (totalResourcesMap.get(r) == null)
251+
totalResourcesMap.put(r, v);
252+
else
253+
totalResourcesMap.put(r, totalResourcesMap.get(r) + v);
254+
}
255+
}
256+
}
257+
235258
private void removeExpiredLeases() {
236259
for(AssignableVirtualMachine avm: virtualMachinesMap.values())
237260
avm.removeExpiredLeases(!isInActiveVmGroup(avm));

fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,11 @@ boolean isActive() {
426426
}
427427

428428
boolean isAssignableNow() {
429-
return (System.currentTimeMillis()>disabledUntil) && !leasesMap.isEmpty();
429+
return !isDisabled() && !leasesMap.isEmpty();
430+
}
431+
432+
boolean isDisabled() {
433+
return System.currentTimeMillis() < disabledUntil;
430434
}
431435

432436
void setAssignedTask(TaskRequest request) {

fenzo-core/src/main/java/com/netflix/fenzo/TaskTracker.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,4 +222,9 @@ Map<String, ActiveTask> getAllAssignedTasks() {
222222
TaskGroupUsage getUsage(String taskGroupName) {
223223
return taskGroupUsages.get(taskGroupName);
224224
}
225+
226+
void setTotalResources(Map<VMResource, Double> totalResourcesMap) {
227+
if (usageTrackedQueue != null)
228+
usageTrackedQueue.setTotalResources(totalResourcesMap);
229+
}
225230
}

fenzo-core/src/main/java/com/netflix/fenzo/queues/UsageTrackedQueue.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.netflix.fenzo.queues;
1818

19+
import com.netflix.fenzo.VMResource;
20+
1921
import java.util.Collection;
2022
import java.util.Map;
2123

@@ -75,17 +77,18 @@ public double getDisk() {
7577
return disk;
7678
}
7779

78-
public double getDominantUsageShareFrom(ResUsage totals) {
79-
double max = totals.cpus == 0.0? cpus : cpus / totals.cpus;
80-
double tmp = totals.memory == 0.0? memory : memory / totals.memory;
81-
if (tmp > max)
82-
max = tmp;
83-
tmp = totals.networkMbps == 0.0? networkMbps : networkMbps / totals.networkMbps;
84-
if (tmp > max)
85-
max = tmp;
86-
tmp = totals.disk == 0.0? disk : disk / totals.disk;
87-
if (tmp > max)
88-
max = tmp;
80+
public double getDominantResUsageFrom(Map<VMResource, Double> totals) {
81+
Double tval = totals.get(VMResource.CPU);
82+
double max = tval != null && tval > 0.0? cpus / tval : cpus;
83+
tval = totals.get(VMResource.Memory);
84+
double tmp = tval != null && tval > 0.0? memory / tval : memory;
85+
max = Math.max(max, tmp);
86+
tval = totals.get(VMResource.Network);
87+
tmp = tval != null && tval > 0.0? networkMbps / tval : networkMbps;
88+
max = Math.max(max, tmp);
89+
tval = totals.get(VMResource.Disk);
90+
tmp = tval != null && tval > 0.0? disk / tval : disk;
91+
max = Math.max(max, tmp);
8992
return max;
9093
}
9194
}
@@ -146,11 +149,10 @@ public double getDominantUsageShareFrom(ResUsage totals) {
146149
QueuableTask removeTask(String id, QAttributes qAttributes) throws TaskQueueException;
147150

148151
/**
149-
* Get the usage of the dominant resource, expressed as a share within the given parent entity's usage.
150-
* @param parentUsage The resource usage of the parent entity.
151-
* @return The dominant resource usage as a share of the given parent entity's usage.
152+
* Get the usage of the dominant resource, expressed as a share of the total known available resources.
153+
* @return The dominant resource usage.
152154
*/
153-
double getDominantUsageShare(ResUsage parentUsage);
155+
double getDominantUsageShare();
154156

155157
/**
156158
* Reset the queue to mark the end of a scheduling iteration.
@@ -165,4 +167,11 @@ public double getDominantUsageShareFrom(ResUsage totals) {
165167
* @throws TaskQueueException if called concurrently with a scheduling iteration in progress.
166168
*/
167169
Map<TaskQueue.TaskState, Collection<QueuableTask>> getAllTasks() throws TaskQueueException;
170+
171+
/**
172+
* Set the map of total resources available. This is required to evaluate the dominant resource share used that may
173+
* be used by some queue implementations for fair share purposes.
174+
* @param totalResourcesMap Map of total resources to set.
175+
*/
176+
void setTotalResources(Map<VMResource, Double> totalResourcesMap);
168177
}

fenzo-core/src/main/java/com/netflix/fenzo/queues/tiered/QueueBucket.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.netflix.fenzo.queues.tiered;
1818

19+
import com.netflix.fenzo.VMResource;
1920
import com.netflix.fenzo.queues.*;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ class QueueBucket implements UsageTrackedQueue {
3940
// that scheduler's taskTracker will trigger call into assignTask() during the scheduling iteration.
4041
private final LinkedHashMap<String, QueuableTask> assignedTasks;
4142
private Iterator<Map.Entry<String, QueuableTask>> iterator = null;
43+
private Map<VMResource, Double> totalResourcesMap = Collections.emptyMap();
4244

4345
QueueBucket(int tierNumber, String name) {
4446
this.tierNumber = tierNumber;
@@ -118,8 +120,8 @@ public QueuableTask removeTask(String id, QAttributes qAttributes) throws TaskQu
118120
}
119121

120122
@Override
121-
public double getDominantUsageShare(ResUsage parentUsage) {
122-
return totals.getDominantUsageShareFrom(parentUsage);
123+
public double getDominantUsageShare() {
124+
return totals.getDominantResUsageFrom(totalResourcesMap);
123125
}
124126

125127
@Override
@@ -137,6 +139,11 @@ public Map<TaskQueue.TaskState, Collection<QueuableTask>> getAllTasks() throws T
137139
return result;
138140
}
139141

142+
@Override
143+
public void setTotalResources(Map<VMResource, Double> totalResourcesMap) {
144+
this.totalResourcesMap = totalResourcesMap;
145+
}
146+
140147
int size() {
141148
return queuedTasks.size() + launchedTasks.size(); // don't count assignedTasks.size(); those entries duplicate ones already in queuedTasks
142149
}

fenzo-core/src/main/java/com/netflix/fenzo/queues/tiered/SortedBuckets.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class SortedBuckets {
5050
comparator = new Comparator<QueueBucket>() {
5151
@Override
5252
public int compare(QueueBucket o1, QueueBucket o2) {
53-
return Double.compare(o1.getDominantUsageShare(parentUsage), o2.getDominantUsageShare(parentUsage));
53+
return Double.compare(o1.getDominantUsageShare(), o2.getDominantUsageShare());
5454
}
5555
};
5656
this.parentUsage = parentUsage;
@@ -78,9 +78,9 @@ QueueBucket remove(String bucketName) {
7878
// interested in (with the same name) may be the same one or it may be to the left or right a few positions.
7979
int remPos = buckets.get(index).getName().equals(bucketName)? index : -1;
8080
if (remPos < 0)
81-
remPos = findWalkingLeft(buckets, index, bucketName, bucket.getDominantUsageShare(parentUsage));
81+
remPos = findWalkingLeft(buckets, index, bucketName, bucket.getDominantUsageShare());
8282
if (remPos < 0)
83-
remPos = findWalkingRight(buckets, index, bucketName, bucket.getDominantUsageShare(parentUsage));
83+
remPos = findWalkingRight(buckets, index, bucketName, bucket.getDominantUsageShare());
8484
if (remPos < 0)
8585
throw new IllegalStateException("Unexpected: bucket with name=" + bucketName + " not found to remove");
8686
buckets.remove(remPos);
@@ -94,7 +94,7 @@ QueueBucket get(String bucketName) {
9494

9595
private int findWalkingRight(List<QueueBucket> buckets, int index, String bucketName, double dominantUsageShare) {
9696
int pos = index;
97-
while (++pos < buckets.size() && buckets.get(pos).getDominantUsageShare(parentUsage) == dominantUsageShare) {
97+
while (++pos < buckets.size() && buckets.get(pos).getDominantUsageShare() == dominantUsageShare) {
9898
if (buckets.get(pos).getName().equals(bucketName))
9999
return pos;
100100
}
@@ -103,7 +103,7 @@ private int findWalkingRight(List<QueueBucket> buckets, int index, String bucket
103103

104104
private int findWalkingLeft(List<QueueBucket> buckets, int index, String bucketName, double dominantUsageShare) {
105105
int pos = index;
106-
while (--pos >= 0 && buckets.get(pos).getDominantUsageShare(parentUsage) == dominantUsageShare) {
106+
while (--pos >= 0 && buckets.get(pos).getDominantUsageShare() == dominantUsageShare) {
107107
if (buckets.get(pos).getName().equals(bucketName))
108108
return pos;
109109
}
@@ -120,4 +120,12 @@ private int findInsertionPoint(QueueBucket bucket, List<QueueBucket> buckets) {
120120
List<QueueBucket> getSortedList() {
121121
return Collections.unmodifiableList(buckets);
122122
}
123+
124+
void resort() {
125+
List<QueueBucket> old = new ArrayList<>(buckets);
126+
bucketMap.clear();
127+
buckets.clear();
128+
for(QueueBucket b: old)
129+
add(b);
130+
}
123131
}

fenzo-core/src/main/java/com/netflix/fenzo/queues/tiered/Tier.java

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package com.netflix.fenzo.queues.tiered;
1818

19+
import com.netflix.fenzo.VMResource;
1920
import com.netflix.fenzo.queues.*;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
2023

21-
import java.util.Collection;
22-
import java.util.HashMap;
23-
import java.util.LinkedList;
24-
import java.util.Map;
24+
import java.util.*;
2525

2626
/**
2727
* This class represents a tier of the multi-tiered queue that {@link TieredQueue} represents. The tier holds one or
@@ -31,9 +31,11 @@
3131
*/
3232
class Tier implements UsageTrackedQueue {
3333

34+
private static final Logger logger = LoggerFactory.getLogger(Tier.class);
3435
private final int tierNumber;
3536
private final ResUsage totals;
3637
private final SortedBuckets sortedBuckets;
38+
private Map<VMResource, Double> currTotalResourcesMap = new HashMap<>();
3739

3840
Tier(int tierNumber) {
3941
totals = new ResUsage();
@@ -96,6 +98,8 @@ public boolean launchTask(QueuableTask t) throws TaskQueueException {
9698
// We do this by removing the bucket from the sortedBuckets, launching the task in the bucket,
9799
// then adding the bucket back into the sortedBuckets. It will then fall into its right new place.
98100
// This operation therefore takes time complexity of O(log N).
101+
if (logger.isDebugEnabled())
102+
logger.debug("Adding " + t.getId() + ": to ordered buckets: " + getSortedListString());
99103
final String bucketName = t.getQAttributes().getBucketName();
100104
QueueBucket bucket = sortedBuckets.remove(bucketName);
101105
if (bucket == null) {
@@ -113,6 +117,22 @@ public boolean launchTask(QueuableTask t) throws TaskQueueException {
113117
return false;
114118
}
115119

120+
private void verifySortedBuckets() throws TaskQueueException {
121+
if (sortedBuckets.getSortedList().isEmpty())
122+
return;
123+
List<QueueBucket> list = new ArrayList<>(sortedBuckets.getSortedList());
124+
if (list.size() > 1) {
125+
QueueBucket prev = list.get(0);
126+
for (int i=1; i<list.size(); i++) {
127+
if (list.get(i).getDominantUsageShare() < prev.getDominantUsageShare()) {
128+
final String msg = "Incorrect sorting order : " + getSortedListString();
129+
throw new TaskQueueException(msg);
130+
}
131+
prev = list.get(i);
132+
}
133+
}
134+
}
135+
116136
@Override
117137
public QueuableTask removeTask(String id, QAttributes qAttributes) throws TaskQueueException {
118138
// removing a task can change the resource usage and therefore the sorting order of queues. So, we take the
@@ -135,17 +155,60 @@ public QueuableTask removeTask(String id, QAttributes qAttributes) throws TaskQu
135155
}
136156

137157
@Override
138-
public double getDominantUsageShare(ResUsage parentUsage) {
158+
public double getDominantUsageShare() {
139159
return 0.0; // undefined for a tier
140160
}
141161

142162
@Override
143163
public void reset() {
164+
if (logger.isDebugEnabled()) {
165+
try {
166+
verifySortedBuckets();
167+
} catch (TaskQueueException e) {
168+
logger.error(e.getMessage());
169+
}
170+
}
144171
for (QueueBucket bucket: sortedBuckets.getSortedList()) {
145172
bucket.reset();
146173
}
147174
}
148175

176+
private String getSortedListString() {
177+
StringBuilder b = new StringBuilder("Tier " + tierNumber + " sortedBs: [");
178+
for (QueueBucket bucket: sortedBuckets.getSortedList()) {
179+
b.append(bucket.getName()).append(" (").append(bucket.getDominantUsageShare()).append("), ");
180+
}
181+
b.append("]");
182+
return b.toString();
183+
}
184+
185+
@Override
186+
public void setTotalResources(Map<VMResource, Double> totalResourcesMap) {
187+
if (totalResMapChanged(currTotalResourcesMap, totalResourcesMap)) {
188+
currTotalResourcesMap.clear();
189+
currTotalResourcesMap.putAll(totalResourcesMap);
190+
for (QueueBucket b: sortedBuckets.getSortedList()) {
191+
b.setTotalResources(totalResourcesMap);
192+
}
193+
logger.info("Re-sorting buckets in tier " + tierNumber + " after totals changed");
194+
sortedBuckets.resort();
195+
}
196+
}
197+
198+
private boolean totalResMapChanged(Map<VMResource, Double> currTotalResourcesMap, Map<VMResource, Double> totalResourcesMap) {
199+
if (currTotalResourcesMap.size() != totalResourcesMap.size())
200+
return true;
201+
Set<VMResource> curr = new HashSet<>(currTotalResourcesMap.keySet());
202+
for (VMResource r: totalResourcesMap.keySet()) {
203+
final Double c = currTotalResourcesMap.get(r);
204+
final Double n = totalResourcesMap.get(r);
205+
if ((c == null && n != null) || (c != null && n == null) || (n != null &&!n.equals(c)))
206+
return true;
207+
curr.remove(r);
208+
}
209+
return !curr.isEmpty();
210+
}
211+
149212
@Override
150213
public Map<TaskQueue.TaskState, Collection<QueuableTask>> getAllTasks() throws TaskQueueException {
151214
Map<TaskQueue.TaskState, Collection<QueuableTask>> result = new HashMap<>();

0 commit comments

Comments
 (0)