Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

Commit be8a1a3

Browse files
author
tbak
authored
Handle idle/inactive VMs directly in AutoScaler (#145)
The scale-down constraint evaluators work with the merged active/inactive lists, but extra processing is needed in AutoScaler to enforce the min/max idle constraints for the active server groups.
1 parent e110ecf commit be8a1a3

4 files changed

Lines changed: 91 additions & 22 deletions

File tree

fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ConcurrentMap;
2828
import java.util.concurrent.LinkedBlockingQueue;
2929
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.stream.Collectors;
3031

3132
class AssignableVMs {
3233

@@ -283,6 +284,10 @@ else if(logger.isDebugEnabled())
283284
return vms;
284285
}
285286

287+
List<AssignableVirtualMachine> getInactiveVMs() {
288+
return vmCollection.getAllVMs().stream().filter(avm -> !isInActiveVmGroup(avm)).collect(Collectors.toList());
289+
}
290+
286291
private void resetTotalResources() {
287292
totalResourcesMap.clear();
288293
}

fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,34 @@
2727
import java.util.Collection;
2828
import java.util.Collections;
2929
import java.util.HashMap;
30+
import java.util.Iterator;
3031
import java.util.List;
3132
import java.util.Map;
33+
import java.util.Optional;
34+
import java.util.Set;
3235
import java.util.concurrent.ArrayBlockingQueue;
3336
import java.util.concurrent.ConcurrentHashMap;
3437
import java.util.concurrent.ConcurrentMap;
3538
import java.util.concurrent.RejectedExecutionException;
3639
import java.util.concurrent.ThreadPoolExecutor;
3740
import java.util.concurrent.TimeUnit;
3841
import java.util.concurrent.atomic.AtomicBoolean;
42+
import java.util.stream.Collectors;
3943

4044
class AutoScaler {
4145

4246
private static class HostAttributeGroup {
4347
String name;
4448
List<VirtualMachineLease> idleHosts;
49+
List<VirtualMachineLease> idleInactiveHosts;
4550
int shortFall;
4651
AutoScaleRule rule;
4752

4853
private HostAttributeGroup(String name, AutoScaleRule rule) {
4954
this.name = name;
5055
this.rule = rule;
5156
this.idleHosts = new ArrayList<>();
57+
this.idleInactiveHosts = new ArrayList<>();
5258
this.shortFall=0;
5359
}
5460
}
@@ -165,7 +171,7 @@ void scheduleAutoscale(final AutoScalerInput autoScalerInput) {
165171
hostAttributeGroupMap.get(entry.getKey()).shortFall = entry.getValue() == null ? 0 : entry.getValue();
166172
}
167173
}
168-
populateIdleResources(autoScalerInput.getIdleResourcesList(), hostAttributeGroupMap, attributeName);
174+
populateIdleResources(autoScalerInput.getIdleResourcesList(), autoScalerInput.getIdleInactiveResourceList(), hostAttributeGroupMap);
169175
for (HostAttributeGroup hostAttributeGroup : hostAttributeGroupMap.values()) {
170176
processScalingNeeds(hostAttributeGroup, scalingActivityMap, assignableVMs);
171177
}
@@ -195,8 +201,11 @@ private void processScalingNeeds(HostAttributeGroup hostAttributeGroup, Concurre
195201
AutoScaleRule rule = hostAttributeGroup.rule;
196202
long now = System.currentTimeMillis();
197203
ScalingActivity prevScalingActivity= scalingActivityMap.get(rule.getRuleName());
204+
198205
int excess = hostAttributeGroup.shortFall>0? 0 : hostAttributeGroup.idleHosts.size() - rule.getMaxIdleHostsToKeep();
199-
if (excess > 0 && shouldScaleDown(now, prevScalingActivity, rule)) {
206+
int inactiveIdleCount = hostAttributeGroup.idleInactiveHosts.size();
207+
208+
if ((inactiveIdleCount > 0 || excess > 0) && shouldScaleDown(now, prevScalingActivity, rule)) {
200209
ScalingActivity scalingActivity = scalingActivityMap.get(rule.getRuleName());
201210
long lastReqstAge = (now - scalingActivity.scaleDownRequestedAt) / 1000L;
202211
if(delayScaleDownBySecs>0L && lastReqstAge > 2 * delayScaleDownBySecs) { // reset the request at time
@@ -206,11 +215,11 @@ else if(delayScaleDownBySecs == 0L || lastReqstAge > delayScaleDownBySecs) {
206215
final int size = vmCollection.size(rule.getRuleName());
207216
if (rule.getMinSize() > (size - excess))
208217
excess = Math.max(0, size - rule.getMinSize());
209-
if (excess > 0) {
218+
if (excess > 0 || inactiveIdleCount > 0) {
210219
scalingActivity.scaleDownRequestedAt = 0L;
211220
scalingActivity.scaleDownAt = now;
212221
scalingActivity.shortfall = hostAttributeGroup.shortFall;
213-
Map<String, String> hostsToTerminate = getHostsToTerminate(hostAttributeGroup.idleHosts, excess);
222+
Map<String, String> hostsToTerminate = getHostsToTerminate(hostAttributeGroup.idleHosts, hostAttributeGroup.idleInactiveHosts, excess);
214223
scalingActivity.scaledNumInstances = hostsToTerminate.size();
215224
scalingActivity.type = AutoScaleAction.Type.Down;
216225
StringBuilder sBuilder = new StringBuilder();
@@ -219,13 +228,15 @@ else if(delayScaleDownBySecs == 0L || lastReqstAge > delayScaleDownBySecs) {
219228
assignableVMs.disableUntil(host, now + rule.getCoolDownSecs() * 1000);
220229
}
221230
logger.info("Scaling down " + rule.getRuleName() + " by "
222-
+ excess + " hosts (" + sBuilder.toString() + ")");
231+
+ hostsToTerminate.size() + " hosts (" + sBuilder.toString() + ")");
223232
callback.call(
224233
new ScaleDownAction(rule.getRuleName(), hostsToTerminate.values())
225234
);
226235
}
227236
}
228-
} else if(hostAttributeGroup.shortFall>0 || (excess<=0 && shouldScaleUp(now, prevScalingActivity, rule))) {
237+
}
238+
239+
if(hostAttributeGroup.shortFall>0 || (excess<=0 && shouldScaleUp(now, prevScalingActivity, rule))) {
229240
if (hostAttributeGroup.shortFall>0 || rule.getMinIdleHostsToKeep() > hostAttributeGroup.idleHosts.size()) {
230241
// scale up to rule.getMaxIdleHostsToKeep() instead of just until rule.getMinIdleHostsToKeep()
231242
// but, if not shouldScaleUp(), then, scale up due to shortfall
@@ -259,17 +270,34 @@ else if(delayScaleUpBySecs ==0L || lastReqstAge > delayScaleUpBySecs) {
259270
}
260271
}
261272

262-
private void populateIdleResources(List<VirtualMachineLease> idleResources, Map<String, HostAttributeGroup> leasesMap, String attributeName) {
273+
private void populateIdleResources(List<VirtualMachineLease> idleResources,
274+
List<VirtualMachineLease> idleInactiveResources,
275+
Map<String, HostAttributeGroup> leasesMap) {
276+
// Leases from active clusters
263277
for (VirtualMachineLease l : idleResources) {
264-
if (l.getAttributeMap() != null && l.getAttributeMap().get(attributeName) != null &&
265-
l.getAttributeMap().get(attributeName).getText().hasValue()) {
266-
String attrValue = l.getAttributeMap().get(attributeName).getText().getValue();
278+
getAttribute(l).ifPresent(attrValue -> {
267279
if (leasesMap.containsKey(attrValue)) {
268280
if (!leasesMap.get(attrValue).rule.idleMachineTooSmall(l))
269281
leasesMap.get(attrValue).idleHosts.add(l);
270282
}
271-
}
283+
});
272284
}
285+
286+
// Leases from inactive clusters
287+
for(VirtualMachineLease l : idleInactiveResources) {
288+
getAttribute(l).ifPresent(attrValue -> {
289+
if (leasesMap.containsKey(attrValue)) {
290+
leasesMap.get(attrValue).idleInactiveHosts.add(l);
291+
}
292+
});
293+
}
294+
}
295+
296+
private Optional<String> getAttribute(VirtualMachineLease lease) {
297+
boolean hasValue = lease.getAttributeMap() != null
298+
&& lease.getAttributeMap().get(attributeName) != null
299+
&& lease.getAttributeMap().get(attributeName).getText().hasValue();
300+
return hasValue ? Optional.of(lease.getAttributeMap().get(attributeName).getText().getValue()) : Optional.empty();
273301
}
274302

275303
private Map<String, HostAttributeGroup> setupHostAttributeGroupMap(AutoScaleRules autoScaleRules, ConcurrentMap<String, ScalingActivity> lastScalingAt) {
@@ -290,22 +318,43 @@ private long getInitialCoolDown(long coolDownSecs) {
290318
return System.currentTimeMillis()- coolDownSecs*1000 + initialCoolDownInPastSecs*1000;
291319
}
292320

293-
private Map<String, String> getHostsToTerminate(List<VirtualMachineLease> hosts, int excess) {
321+
private Map<String, String> getHostsToTerminate(List<VirtualMachineLease> activeHosts, List<VirtualMachineLease> inactiveHosts, int excess) {
294322
if(scaleDownConstraintExecutor == null) {
295-
return getHostsToTerminateLegacy(hosts, excess);
323+
return activeHosts.isEmpty() ? Collections.emptyMap() : getHostsToTerminateLegacy(activeHosts, excess);
296324
} else {
297-
return getHostsToTerminateUsingCriteria(hosts, excess);
325+
return getHostsToTerminateUsingCriteria(activeHosts, inactiveHosts, excess);
298326
}
299327
}
300328

301-
private Map<String, String> getHostsToTerminateUsingCriteria(List<VirtualMachineLease> hosts, int excess) {
329+
private Map<String, String> getHostsToTerminateUsingCriteria(List<VirtualMachineLease> activeHosts,List<VirtualMachineLease> inactiveHosts, int excess) {
302330
Map<String, String> hostsMap = new HashMap<>();
303-
List<VirtualMachineLease> result = scaleDownConstraintExecutor.evaluate(hosts);
304-
int removeLimit = Math.min(result.size(), excess);
305-
for(int i = 0; i < removeLimit; i++) {
306-
VirtualMachineLease lease = result.get(i);
307-
hostsMap.put(lease.hostname(), getMappedHostname(lease));
331+
332+
if(excess <= 0) {
333+
// We have only inactive idle instances to scale down
334+
List<VirtualMachineLease> result = scaleDownConstraintExecutor.evaluate(inactiveHosts);
335+
result.forEach(lease -> hostsMap.put(lease.hostname(), getMappedHostname(lease)));
336+
} else {
337+
List<VirtualMachineLease> allIdle = new ArrayList<>(activeHosts);
338+
allIdle.addAll(inactiveHosts);
339+
List<VirtualMachineLease> result = scaleDownConstraintExecutor.evaluate(allIdle);
340+
341+
// The final result should contain only excess number of active hosts. Enforce this invariant.
342+
Set<String> activeIds = activeHosts.stream().map(VirtualMachineLease::hostname).collect(Collectors.toSet());
343+
int activeCounter = 0;
344+
Iterator<VirtualMachineLease> it = result.iterator();
345+
while(it.hasNext()) {
346+
VirtualMachineLease leaseToRemove = it.next();
347+
if(activeIds.contains(leaseToRemove.hostname())) {
348+
if(activeCounter < excess) {
349+
activeCounter++;
350+
} else {
351+
it.remove();
352+
}
353+
}
354+
}
355+
result.forEach(lease -> hostsMap.put(lease.hostname(), getMappedHostname(lease)));
308356
}
357+
309358
return hostsMap;
310359
}
311360

fenzo-core/src/main/java/com/netflix/fenzo/AutoScalerInput.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,23 @@
2121

2222
class AutoScalerInput {
2323
private final List<VirtualMachineLease> idleResourcesList;
24+
private final List<VirtualMachineLease> idleInactiveResources;
2425
private final Set<TaskRequest> failedTasks;
2526

26-
AutoScalerInput(List<VirtualMachineLease> idleResources, Set<TaskRequest> failedTasks) {
27+
AutoScalerInput(List<VirtualMachineLease> idleResources, List<VirtualMachineLease> idleInactiveResources, Set<TaskRequest> failedTasks) {
2728
this.idleResourcesList= idleResources;
29+
this.idleInactiveResources = idleInactiveResources;
2830
this.failedTasks = failedTasks;
2931
}
3032

3133
public List<VirtualMachineLease> getIdleResourcesList() {
3234
return idleResourcesList;
3335
}
3436

37+
public List<VirtualMachineLease> getIdleInactiveResourceList() {
38+
return idleInactiveResources;
39+
}
40+
3541
public Set<TaskRequest> getFailures() {
3642
return failedTasks;
3743
}

fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.concurrent.Future;
3737
import java.util.concurrent.atomic.AtomicBoolean;
3838
import java.util.concurrent.atomic.AtomicInteger;
39+
import java.util.stream.Collectors;
3940

4041
/**
4142
* A scheduling service that you can use to optimize the assignment of tasks to hosts within a Mesos framework.
@@ -710,6 +711,7 @@ private SchedulingResult doSchedule(
710711
List<VirtualMachineLease> newLeases) throws Exception {
711712
AtomicInteger rejectedCount = new AtomicInteger();
712713
List<AssignableVirtualMachine> avms = assignableVMs.prepareAndGetOrderedVMs(newLeases, rejectedCount);
714+
List<AssignableVirtualMachine> inactiveAVMs = assignableVMs.getInactiveVMs();
713715
if(logger.isDebugEnabled())
714716
logger.debug("Found " + avms.size() + " VMs with non-zero offers to assign from");
715717
final boolean hasResAllocs = resAllocsEvaluator.prepare();
@@ -840,8 +842,15 @@ public EvalResult call() throws Exception {
840842
resultMap.put(avm.getHostname(), assignmentResult);
841843
}
842844
}
845+
846+
// Process inactive VMs
847+
List<VirtualMachineLease> idleInactiveAVMs = inactiveAVMs.stream()
848+
.filter(vm -> vm.getCurrTotalLease() != null && !vm.hasPreviouslyAssignedTasks())
849+
.map(AssignableVirtualMachine::getCurrTotalLease)
850+
.collect(Collectors.toList());
851+
843852
rejectedCount.addAndGet(assignableVMs.removeLimitedLeases(expirableLeases));
844-
final AutoScalerInput autoScalerInput = new AutoScalerInput(idleResourcesList, failedTasksForAutoScaler);
853+
final AutoScalerInput autoScalerInput = new AutoScalerInput(idleResourcesList, idleInactiveAVMs, failedTasksForAutoScaler);
845854
if (autoScaler != null)
846855
autoScaler.scheduleAutoscale(autoScalerInput);
847856
}

0 commit comments

Comments
 (0)