Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

Commit 4b446e3

Browse files
authored
Merge pull request #107 from spodila/shortfallScaleupChg
Optional task to autoscale group name for shortfall based scale up
2 parents 5d809e7 + 2b2ec8a commit 4b446e3

File tree

6 files changed

+185
-39
lines changed

6 files changed

+185
-39
lines changed

fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.netflix.fenzo;
1818

1919
import com.netflix.fenzo.functions.Action1;
20+
import com.netflix.fenzo.functions.Func1;
21+
import com.netflix.fenzo.queues.QueuableTask;
2022
import org.apache.mesos.Protos;
2123
import org.slf4j.Logger;
2224
import org.slf4j.LoggerFactory;
@@ -83,11 +85,12 @@ private ScalingActivity(long scaleUpAt, long scaleDownAt, int shortfall, int sca
8385
private final AssignableVMs assignableVMs;
8486
private long delayScaleUpBySecs =0L;
8587
private long delayScaleDownBySecs =0L;
88+
private volatile Func1<QueuableTask, List<String>> taskToClustersGetter = null;
8689
private final ThreadPoolExecutor executor =
8790
new ThreadPoolExecutor(1, 1, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),
8891
new ThreadPoolExecutor.DiscardOldestPolicy());
8992
private final AtomicBoolean isShutdown = new AtomicBoolean();
90-
final ConcurrentMap<String, ScalingActivity> scalingActivityMap = new ConcurrentHashMap<>();
93+
private final ConcurrentMap<String, ScalingActivity> scalingActivityMap = new ConcurrentHashMap<>();
9194
final VMCollection vmCollection;
9295

9396
AutoScaler(final String attributeName, String mapHostnameAttributeName, String scaleDownBalancedByAttributeName,
@@ -128,28 +131,30 @@ void setDelayScaleDownBySecs(long secs) {
128131
delayScaleDownBySecs = secs;
129132
}
130133

134+
void setTaskToClustersGetter(Func1<QueuableTask, List<String>> getter) {
135+
this.taskToClustersGetter = getter;
136+
}
137+
131138
void scheduleAutoscale(final AutoScalerInput autoScalerInput) {
132139
if(isShutdown.get())
133140
return;
134141
try {
135-
executor.submit(new Runnable() {
136-
@Override
137-
public void run() {
138-
if(isShutdown.get())
139-
return;
140-
autoScaleRules.prepare();
141-
Map<String, HostAttributeGroup> hostAttributeGroupMap = setupHostAttributeGroupMap(autoScaleRules, scalingActivityMap);
142-
if (!disableShortfallEvaluation) {
143-
Map<String, Integer> shortfall = shortfallEvaluator.getShortfall(hostAttributeGroupMap.keySet(), autoScalerInput.getFailures());
144-
for (Map.Entry<String, Integer> entry : shortfall.entrySet()) {
145-
hostAttributeGroupMap.get(entry.getKey()).shortFall = entry.getValue() == null ? 0 : entry.getValue();
146-
}
147-
}
148-
populateIdleResources(autoScalerInput.getIdleResourcesList(), hostAttributeGroupMap, attributeName);
149-
for (HostAttributeGroup hostAttributeGroup : hostAttributeGroupMap.values()) {
150-
processScalingNeeds(hostAttributeGroup, scalingActivityMap, assignableVMs);
142+
executor.submit(() -> {
143+
if(isShutdown.get())
144+
return;
145+
shortfallEvaluator.setTaskToClustersGetter(taskToClustersGetter);
146+
autoScaleRules.prepare();
147+
Map<String, HostAttributeGroup> hostAttributeGroupMap = setupHostAttributeGroupMap(autoScaleRules, scalingActivityMap);
148+
if (!disableShortfallEvaluation) {
149+
Map<String, Integer> shortfall = shortfallEvaluator.getShortfall(hostAttributeGroupMap.keySet(), autoScalerInput.getFailures());
150+
for (Map.Entry<String, Integer> entry : shortfall.entrySet()) {
151+
hostAttributeGroupMap.get(entry.getKey()).shortFall = entry.getValue() == null ? 0 : entry.getValue();
151152
}
152153
}
154+
populateIdleResources(autoScalerInput.getIdleResourcesList(), hostAttributeGroupMap, attributeName);
155+
for (HostAttributeGroup hostAttributeGroup : hostAttributeGroupMap.values()) {
156+
processScalingNeeds(hostAttributeGroup, scalingActivityMap, assignableVMs);
157+
}
153158
});
154159
}
155160
catch (RejectedExecutionException e) {

fenzo-core/src/main/java/com/netflix/fenzo/ShortfallEvaluator.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,29 @@
1616

1717
package com.netflix.fenzo;
1818

19-
import java.util.HashMap;
20-
import java.util.HashSet;
21-
import java.util.Map;
22-
import java.util.Set;
19+
import com.netflix.fenzo.functions.Func1;
20+
import com.netflix.fenzo.queues.QueuableTask;
21+
22+
import java.util.*;
2323

2424
/**
2525
* Determines by how many hosts the system is currently undershooting its needs, based on task assignment
2626
* failures. This is used in autoscaling.
2727
*/
2828
class ShortfallEvaluator {
29-
private static final long TOO_OLD_THRESHOLD_MILLIS = 15 * 60000; // 15 mins
29+
private static final long TOO_OLD_THRESHOLD_MILLIS = 10 * 60000; // 15 mins
3030
private final TaskScheduler phantomTaskScheduler;
3131
private final Map<String, Long> requestedForTasksSet = new HashMap<>();
32+
private volatile Func1<QueuableTask, List<String>> taskToClustersGetter = null;
3233

3334
ShortfallEvaluator(TaskScheduler phantomTaskScheduler) {
3435
this.phantomTaskScheduler = phantomTaskScheduler;
3536
}
3637

38+
void setTaskToClustersGetter(Func1<QueuableTask, List<String>> getter) {
39+
taskToClustersGetter = getter;
40+
}
41+
3742
Map<String, Integer> getShortfall(Set<String> attrKeys, Set<TaskRequest> failures) {
3843
// A naive approach to figuring out shortfall of hosts to satisfy the tasks that failed assignments is,
3944
// strictly speaking, not possible by just attempting to add up required resources for the tasks and then mapping
@@ -63,22 +68,43 @@ Map<String, Integer> getShortfall(Set<String> attrKeys, Set<TaskRequest> failure
6368
final HashMap<String, Integer> shortfallMap = new HashMap<>();
6469
if(attrKeys!=null && failures!=null && !failures.isEmpty()) {
6570
removeOldInserts();
66-
int shortfall=0;
6771
long now = System.currentTimeMillis();
6872
for(TaskRequest r: failures) {
6973
String tid = r.getId();
7074
if(requestedForTasksSet.get(tid) == null) {
7175
requestedForTasksSet.put(tid, now);
72-
shortfall++;
76+
fillShortfallMap(shortfallMap, attrKeys, r);
7377
}
7478
}
75-
for(String key: attrKeys) {
76-
shortfallMap.put(key, shortfall);
77-
}
7879
}
7980
return shortfallMap;
8081
}
8182

83+
private void fillShortfallMap(HashMap<String, Integer> shortfallMap, Set<String> attrKeys, TaskRequest r) {
84+
for (String k: attrKeys) {
85+
if (matchesTask(r, k)) {
86+
if (shortfallMap.get(k) == null)
87+
shortfallMap.put(k, 1);
88+
else
89+
shortfallMap.put(k, shortfallMap.get(k) + 1);
90+
}
91+
}
92+
}
93+
94+
private boolean matchesTask(TaskRequest r, String k) {
95+
if (!(r instanceof QueuableTask) || taskToClustersGetter == null)
96+
return true;
97+
final List<String> strings = taskToClustersGetter.call((QueuableTask) r);
98+
if (strings != null && !strings.isEmpty()) {
99+
for (String s: strings)
100+
if(k.equals(s))
101+
return true;
102+
return false; // doesn't match
103+
}
104+
// matches anything
105+
return true;
106+
}
107+
82108
private void removeOldInserts() {
83109
long tooOld = System.currentTimeMillis() - TOO_OLD_THRESHOLD_MILLIS;
84110
Set<String> tasks = new HashSet<>(requestedForTasksSet.keySet());

fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.netflix.fenzo;
1818

19+
import com.netflix.fenzo.queues.QueuableTask;
1920
import com.netflix.fenzo.sla.ResAllocs;
2021
import org.slf4j.Logger;
2122
import org.slf4j.LoggerFactory;
@@ -548,6 +549,11 @@ public void removeAutoScaleRule(String ruleName) {
548549
usingSchedulingService = b;
549550
}
550551

552+
/* package */ void setTaskToClusterAutoScalerMapGetter(Func1<QueuableTask, List<String>> getter) {
553+
if (autoScaler != null)
554+
autoScaler.setTaskToClustersGetter(getter);
555+
}
556+
551557
/**
552558
* Schedule a list of task requests by using any newly-added resource leases in addition to any
553559
* previously-unused leases. This is the main scheduling method that attempts to assign resources to task

fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.netflix.fenzo.functions.Action0;
2020
import com.netflix.fenzo.functions.Action1;
21+
import com.netflix.fenzo.functions.Func1;
2122
import com.netflix.fenzo.queues.*;
2223
import com.netflix.fenzo.queues.TaskQueue;
2324
import org.slf4j.Logger;
@@ -83,6 +84,7 @@ public RemoveTaskRequest(String taskId, QAttributes qAttributes, String hostname
8384
private final BlockingQueue<Action1<List<VirtualMachineCurrentState>>> vmCurrStateRequest = new LinkedBlockingQueue<>(10);
8485
private final AtomicLong lastSchedIterationAt = new AtomicLong();
8586
private final long maxSchedIterDelay;
87+
private volatile Func1<QueuableTask, List<String>> taskToClusterAutoScalerMapGetter = null;
8688

8789
private TaskSchedulingService(Builder builder) {
8890
taskScheduler = builder.taskScheduler;
@@ -140,6 +142,7 @@ private void scheduleOnce() {
140142
removeTasks();
141143
final boolean newLeaseExists = leaseBlockingQueue.peek() != null;
142144
if ( qModified || newLeaseExists || doNextIteration()) {
145+
taskScheduler.setTaskToClusterAutoScalerMapGetter(taskToClusterAutoScalerMapGetter);
143146
lastSchedIterationAt.set(System.currentTimeMillis());
144147
if (preHook != null)
145148
preHook.call();
@@ -309,6 +312,20 @@ public void removeTask(String taskId, QAttributes qAttributes, String hostname)
309312
removeTasksQueue.offer(new RemoveTaskRequest(taskId, qAttributes, hostname));
310313
}
311314

315+
/**
316+
* Set the getter function that maps a given queuable task object to a list of names of VM groups for which
317+
* cluster autoscaling rules have been set. This function will be called by autoscaler, if it was setup for
318+
* the {@link TaskScheduler} using {@link TaskScheduler.Builder#withAutoScaleRule(AutoScaleRule)}, to determine if
319+
* the autoscaling rule should be triggered for aggressive scale up. The function call is expected to return a list
320+
* of autoscale group names to which the task can be launched, if there are resources available. If either this
321+
* function is not set, or if the function returns no entries when called, the task is assumed to be able to run
322+
* on any autoscale group.
323+
* @param getter The function that takes a queuable task object and returns a list of autoscale group names
324+
*/
325+
public void setTaskToClusterAutoScalerMapGetter(Func1<QueuableTask, List<String>> getter) {
326+
taskToClusterAutoScalerMapGetter = getter;
327+
}
328+
312329
public final static class Builder {
313330

314331
private TaskScheduler taskScheduler = null;

fenzo-core/src/test/java/com/netflix/fenzo/AutoScalerTest.java

Lines changed: 101 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818

1919
import com.netflix.fenzo.plugins.BinPackingFitnessCalculators;
2020
import com.netflix.fenzo.plugins.HostAttrValueConstraint;
21+
import com.netflix.fenzo.queues.QAttributes;
22+
import com.netflix.fenzo.queues.QueuableTask;
23+
import com.netflix.fenzo.queues.TaskQueue;
24+
import com.netflix.fenzo.queues.TaskQueues;
25+
import com.netflix.fenzo.queues.tiered.QueuableTaskProvider;
2126
import org.apache.mesos.Protos;
2227
import org.junit.After;
2328
import org.junit.Assert;
@@ -26,12 +31,7 @@
2631
import com.netflix.fenzo.functions.Action1;
2732
import com.netflix.fenzo.functions.Func1;
2833

29-
import java.util.ArrayList;
30-
import java.util.Collection;
31-
import java.util.Collections;
32-
import java.util.HashMap;
33-
import java.util.List;
34-
import java.util.Map;
34+
import java.util.*;
3535
import java.util.concurrent.CountDownLatch;
3636
import java.util.concurrent.TimeUnit;
3737
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1091,4 +1091,99 @@ public void testRuleWithMaxSize2() throws Exception {
10911091
}
10921092
Assert.assertEquals(0, scaleUp.get());
10931093
}
1094+
1095+
// Test that aggressive scale up gives a callback only for the cluster, among multiple clusters, that is mapped
1096+
// to by the mapper supplied to task scheduling service. The other cluster should not get scale up request for
1097+
// aggressive scale up.
1098+
@Test
1099+
public void testTask2ClustersGetterAggressiveScaleUp() throws Exception {
1100+
final long cooldownMillis=3000;
1101+
final AutoScaleRule rule1 = AutoScaleRuleProvider.createRule("cluster1", minIdle, maxIdle, cooldownMillis/1000L, 1, 1000);
1102+
final AutoScaleRule rule2 = AutoScaleRuleProvider.createRule("cluster2", minIdle, maxIdle, cooldownMillis/1000L, 1, 1000);
1103+
Map<String, Protos.Attribute> attributes1 = new HashMap<>();
1104+
Protos.Attribute attribute1 = Protos.Attribute.newBuilder().setName(hostAttrName)
1105+
.setType(Protos.Value.Type.TEXT)
1106+
.setText(Protos.Value.Text.newBuilder().setValue("cluster1")).build();
1107+
List<VirtualMachineLease.Range> ports = new ArrayList<>();
1108+
ports.add(new VirtualMachineLease.Range(1, 10));
1109+
attributes1.put(hostAttrName, attribute1);
1110+
Map<String, Protos.Attribute> attributes2 = new HashMap<>();
1111+
Protos.Attribute attribute2 = Protos.Attribute.newBuilder().setName(hostAttrName)
1112+
.setType(Protos.Value.Type.TEXT)
1113+
.setText(Protos.Value.Text.newBuilder().setValue("cluster2")).build();
1114+
attributes2.put(hostAttrName, attribute1);
1115+
final List<VirtualMachineLease> leases = new ArrayList<>();
1116+
for(int l=0; l<minIdle; l++)
1117+
leases.add(LeaseProvider.getLeaseOffer("host"+l, cpus1, memory1, ports, attributes1));
1118+
final Map<String, Integer> scaleUpRequests = new HashMap<>();
1119+
final CountDownLatch initialScaleUpLatch = new CountDownLatch(2);
1120+
final AtomicReference<CountDownLatch> scaleUpLatchRef = new AtomicReference<>();
1121+
Action1<AutoScaleAction> callback = autoScaleAction -> {
1122+
if (autoScaleAction instanceof ScaleUpAction) {
1123+
final ScaleUpAction action = (ScaleUpAction) autoScaleAction;
1124+
System.out.println("**************** scale up by " + action.getScaleUpCount());
1125+
scaleUpRequests.putIfAbsent(action.getRuleName(), 0);
1126+
scaleUpRequests.put(action.getRuleName(), scaleUpRequests.get(action.getRuleName()) + action.getScaleUpCount());
1127+
if (scaleUpLatchRef.get() != null)
1128+
scaleUpLatchRef.get().countDown();
1129+
}
1130+
};
1131+
scaleUpLatchRef.set(initialScaleUpLatch);
1132+
final TaskScheduler scheduler = getScheduler(false, callback, rule1, rule2);
1133+
final TaskQueue queue = TaskQueues.createTieredQueue(2);
1134+
int numTasks = minIdle * cpus1; // minIdle number of hosts each with cpu1 number of cpus
1135+
final CountDownLatch latch = new CountDownLatch(numTasks);
1136+
final AtomicReference<List<Exception>> exceptions = new AtomicReference<>();
1137+
final TaskSchedulingService schedulingService = new TaskSchedulingService.Builder()
1138+
.withMaxDelayMillis(100)
1139+
.withLoopIntervalMillis(20)
1140+
.withTaskQuue(queue)
1141+
.withTaskScheduler(scheduler)
1142+
.withSchedulingResultCallback(schedulingResult -> {
1143+
final List<Exception> elist = schedulingResult.getExceptions();
1144+
if (elist != null && !elist.isEmpty())
1145+
exceptions.set(elist);
1146+
final Map<String, VMAssignmentResult> resultMap = schedulingResult.getResultMap();
1147+
if (resultMap != null && !resultMap.isEmpty()) {
1148+
for (VMAssignmentResult vmar: resultMap.values()) {
1149+
vmar.getTasksAssigned().forEach(t -> latch.countDown());
1150+
}
1151+
}
1152+
})
1153+
.build();
1154+
schedulingService.setTaskToClusterAutoScalerMapGetter(task -> Collections.singletonList("cluster1"));
1155+
schedulingService.start();
1156+
schedulingService.addLeases(leases);
1157+
for (int i=0; i<numTasks; i++)
1158+
queue.queueTask(
1159+
QueuableTaskProvider.wrapTask(
1160+
new QAttributes.QAttributesAdaptor(0, "default"),
1161+
TaskRequestProvider.getTaskRequest(1, 10, 1)
1162+
)
1163+
);
1164+
if (!latch.await(10, TimeUnit.SECONDS))
1165+
Assert.fail("Timeout waiting for tasks to get assigned");
1166+
// wait for scale up to happen
1167+
if (!initialScaleUpLatch.await(cooldownMillis+1000, TimeUnit.MILLISECONDS))
1168+
Assert.fail("Timeout waiting for initial scale up request");
1169+
Assert.assertEquals(2, scaleUpRequests.size());
1170+
scaleUpRequests.clear();
1171+
scaleUpLatchRef.set(null);
1172+
// now submit more tasks for aggressive scale up to trigger
1173+
int laterTasksSize = numTasks*3;
1174+
for (int i=0; i<laterTasksSize; i++) {
1175+
queue.queueTask(
1176+
QueuableTaskProvider.wrapTask(
1177+
new QAttributes.QAttributesAdaptor(0, "default"),
1178+
TaskRequestProvider.getTaskRequest(1, 10, 1)
1179+
)
1180+
);
1181+
}
1182+
// wait for less than cooldown time to get aggressive scale up requests
1183+
Thread.sleep(cooldownMillis/2);
1184+
// expect to get scale up request only for cluster1 VMs
1185+
Assert.assertEquals(1, scaleUpRequests.size());
1186+
Assert.assertEquals("cluster1", scaleUpRequests.keySet().iterator().next());
1187+
Assert.assertEquals(laterTasksSize, scaleUpRequests.values().iterator().next().intValue());
1188+
}
10941189
}

fenzo-core/src/test/java/com/netflix/fenzo/OfferRejectionsTest.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -235,12 +235,9 @@ public void testRejectAllOffersOfVm() throws Exception {
235235
final int leaseExpirySecs=2;
236236
final Set<String> hostsRejectedFrom = new HashSet<>();
237237
final TaskScheduler scheduler = new TaskScheduler.Builder()
238-
.withLeaseRejectAction(new Action1<VirtualMachineLease>() {
239-
@Override
240-
public void call(VirtualMachineLease virtualMachineLease) {
241-
expireCount.incrementAndGet();
242-
hostsRejectedFrom.add(virtualMachineLease.hostname());
243-
}
238+
.withLeaseRejectAction(virtualMachineLease -> {
239+
expireCount.incrementAndGet();
240+
hostsRejectedFrom.add(virtualMachineLease.hostname());
244241
})
245242
.withLeaseOfferExpirySecs(leaseExpirySecs)
246243
.withMaxOffersToReject(1)

0 commit comments

Comments
 (0)