Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

Commit e669631

Browse files
authored
Merge pull request #157 from corindwyer/master
Change autoscaler executor service to only have a single task and add spreading fitness calculator
2 parents f48e471 + 9762cb2 commit e669631

File tree

3 files changed

+221
-3
lines changed

3 files changed

+221
-3
lines changed

fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import java.util.concurrent.ArrayBlockingQueue;
3636
import java.util.concurrent.ConcurrentHashMap;
3737
import java.util.concurrent.ConcurrentMap;
38+
import java.util.concurrent.ExecutorService;
3839
import java.util.concurrent.RejectedExecutionException;
40+
import java.util.concurrent.SynchronousQueue;
3941
import java.util.concurrent.ThreadPoolExecutor;
4042
import java.util.concurrent.TimeUnit;
4143
import java.util.concurrent.atomic.AtomicBoolean;
@@ -94,9 +96,10 @@ private ScalingActivity(long scaleUpAt, long scaleDownAt, int shortfall, int sca
9496
private long delayScaleUpBySecs =0L;
9597
private long delayScaleDownBySecs =0L;
9698
private volatile Func1<QueuableTask, List<String>> taskToClustersGetter = null;
97-
private final ThreadPoolExecutor executor =
98-
new ThreadPoolExecutor(1, 1, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),
99-
new ThreadPoolExecutor.DiscardOldestPolicy());
99+
private final ExecutorService executor =
100+
new ThreadPoolExecutor(1, 1, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<>(),
101+
new ThreadPoolExecutor.DiscardPolicy());
102+
100103
private final AtomicBoolean isShutdown = new AtomicBoolean();
101104
private final ConcurrentMap<String, ScalingActivity> scalingActivityMap = new ConcurrentHashMap<>();
102105
final VMCollection vmCollection;
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.fenzo.plugins;
18+
19+
import com.netflix.fenzo.TaskRequest;
20+
import com.netflix.fenzo.TaskTrackerState;
21+
import com.netflix.fenzo.VMTaskFitnessCalculator;
22+
import com.netflix.fenzo.VirtualMachineCurrentState;
23+
24+
/**
25+
* A collection of spreading fitness calculators.
26+
*/
27+
public class SpreadingFitnessCalculators {
28+
29+
/**
30+
* A CPU spreading fitness calculator. This fitness calculator has the effect of assigning a task to a
31+
* host that has the most CPUs available.
32+
*/
33+
public final static VMTaskFitnessCalculator cpuSpreader = new VMTaskFitnessCalculator() {
34+
@Override
35+
public String getName() {
36+
return "CpuSpreader";
37+
}
38+
39+
@Override
40+
public double calculateFitness(TaskRequest taskRequest, VirtualMachineCurrentState targetVM, TaskTrackerState taskTrackerState) {
41+
VMTaskFitnessCalculator cpuBinPacker = BinPackingFitnessCalculators.cpuBinPacker;
42+
return 1 - cpuBinPacker.calculateFitness(taskRequest, targetVM, taskTrackerState);
43+
}
44+
};
45+
46+
/**
47+
* A memory bin packing fitness calculator. This fitness calculator has the effect of assigning a task to a
48+
* host that has the most memory available.
49+
*/
50+
public final static VMTaskFitnessCalculator memorySpreader = new VMTaskFitnessCalculator() {
51+
@Override
52+
public String getName() {
53+
return "MemorySpreader";
54+
}
55+
56+
@Override
57+
public double calculateFitness(TaskRequest taskRequest, VirtualMachineCurrentState targetVM, TaskTrackerState taskTrackerState) {
58+
VMTaskFitnessCalculator memoryBinPacker = BinPackingFitnessCalculators.memoryBinPacker;
59+
return 1 - memoryBinPacker.calculateFitness(taskRequest, targetVM, taskTrackerState);
60+
}
61+
};
62+
63+
/**
64+
* A network bandwidth spreading fitness calculator. This fitness calculator has the effect of assigning a
65+
* task to a host that has the most amount of available network bandwidth.
66+
*/
67+
public final static VMTaskFitnessCalculator networkSpreader = new VMTaskFitnessCalculator() {
68+
@Override
69+
public String getName() {
70+
return "NetworkSpreader";
71+
}
72+
73+
@Override
74+
public double calculateFitness(TaskRequest taskRequest, VirtualMachineCurrentState targetVM, TaskTrackerState taskTrackerState) {
75+
VMTaskFitnessCalculator networkBinPacker = BinPackingFitnessCalculators.networkBinPacker;
76+
return 1 - networkBinPacker.calculateFitness(taskRequest, targetVM, taskTrackerState);
77+
}
78+
};
79+
80+
/**
81+
* A bin packing fitness calculator that achieves both CPU and Memory spreading with equal weights to
82+
* both goals.
83+
*/
84+
public final static VMTaskFitnessCalculator cpuMemSpreader = new VMTaskFitnessCalculator() {
85+
@Override
86+
public String getName() {
87+
return "CpuAndMemorySpreader";
88+
}
89+
90+
@Override
91+
public double calculateFitness(TaskRequest taskRequest, VirtualMachineCurrentState targetVM, TaskTrackerState taskTrackerState) {
92+
double cpuFitness = cpuSpreader.calculateFitness(taskRequest, targetVM, taskTrackerState);
93+
double memoryFitness = memorySpreader.calculateFitness(taskRequest, targetVM, taskTrackerState);
94+
return (cpuFitness + memoryFitness) / 2.0;
95+
}
96+
};
97+
98+
/**
99+
* A fitness calculator that achieves CPU, Memory, and network bandwidth spreading with equal weights to
100+
* each of the three goals.
101+
*/
102+
public final static VMTaskFitnessCalculator cpuMemNetworkSpreader = new VMTaskFitnessCalculator() {
103+
@Override
104+
public String getName() {
105+
return "CPUAndMemoryAndNetworkBinPacker";
106+
}
107+
108+
@Override
109+
public double calculateFitness(TaskRequest taskRequest, VirtualMachineCurrentState targetVM, TaskTrackerState taskTrackerState) {
110+
double cpuFitness = cpuSpreader.calculateFitness(taskRequest, targetVM, taskTrackerState);
111+
double memFitness = memorySpreader.calculateFitness(taskRequest, targetVM, taskTrackerState);
112+
double networkFitness = networkSpreader.calculateFitness(taskRequest, targetVM, taskTrackerState);
113+
return (cpuFitness + memFitness + networkFitness) / 3.0;
114+
}
115+
};
116+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.netflix.fenzo;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.stream.Collectors;
23+
24+
import com.netflix.fenzo.plugins.SpreadingFitnessCalculators;
25+
import org.junit.Assert;
26+
import org.junit.Test;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public class SpreadingSchedulerTests {
31+
private static final Logger logger = LoggerFactory.getLogger(SpreadingSchedulerTests.class);
32+
33+
private TaskScheduler getScheduler(VMTaskFitnessCalculator fitnessCalculator) {
34+
return new TaskScheduler.Builder()
35+
.withFitnessCalculator(fitnessCalculator)
36+
.withLeaseOfferExpirySecs(1000000)
37+
.withLeaseRejectAction(virtualMachineLease -> logger.info("Rejecting lease on " + virtualMachineLease.hostname()))
38+
.build();
39+
}
40+
41+
/**
42+
* Tests whether or not the tasks will be spread out across the hosts based on cpu such that each host has a single task.
43+
*/
44+
@Test
45+
public void testCPUBinPackingWithSeveralHosts() {
46+
testBinPackingWithSeveralHosts("CPU");
47+
}
48+
49+
/**
50+
* Tests whether or not the tasks will be spread out across the hosts based on memory such that each host has a single task.
51+
*/
52+
@Test
53+
public void testMemoryBinPackingWithSeveralHosts() {
54+
testBinPackingWithSeveralHosts("Memory");
55+
}
56+
57+
/**
58+
* Tests whether or not the tasks will be spread out across the hosts based on network such that each host has a single task.
59+
*/
60+
@Test
61+
public void testNetworkBinPackingWithSeveralHosts() {
62+
testBinPackingWithSeveralHosts("Network");
63+
}
64+
65+
private void testBinPackingWithSeveralHosts(String resource) {
66+
TaskScheduler scheduler = null;
67+
switch (resource) {
68+
case "CPU":
69+
scheduler = getScheduler(SpreadingFitnessCalculators.cpuSpreader);
70+
break;
71+
case "Memory":
72+
scheduler = getScheduler(SpreadingFitnessCalculators.memorySpreader);
73+
break;
74+
case "Network":
75+
scheduler = getScheduler(SpreadingFitnessCalculators.networkSpreader);
76+
break;
77+
default:
78+
Assert.fail("Unknown resource type " + resource);
79+
}
80+
double cpuCores = 4;
81+
double memory = 1024;
82+
double network = 1024;
83+
int numberOfInstances = 10;
84+
85+
List<VirtualMachineLease> leases = LeaseProvider.getLeases(numberOfInstances, cpuCores, memory, network, 1, 100);
86+
List<TaskRequest> taskRequests = new ArrayList<>();
87+
for (int i = 0; i < numberOfInstances; i++) {
88+
taskRequests.add(TaskRequestProvider.getTaskRequest(1, 256, 256, 0));
89+
}
90+
Map<String, VMAssignmentResult> resultMap = scheduler.scheduleOnce(taskRequests, leases).getResultMap();
91+
Assert.assertEquals(resultMap.size(), numberOfInstances);
92+
93+
Map<String, Long> assignmentCountPerHost = resultMap.values().stream().collect(Collectors.groupingBy(
94+
VMAssignmentResult::getHostname, Collectors.counting()));
95+
96+
boolean duplicates = assignmentCountPerHost.entrySet().stream().anyMatch(e -> e.getValue() > 1);
97+
Assert.assertEquals(duplicates, false);
98+
}
99+
}

0 commit comments

Comments
 (0)