Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

Commit e9527cb

Browse files
authored
Merge pull request #98 from spodila/rmTaskFromTrackrFromQ
move the removeTask function from TaskQueue to TaskSchedulingService
2 parents a470586 + f388606 commit e9527cb

File tree

7 files changed

+169
-52
lines changed

7 files changed

+169
-52
lines changed

fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@
4949
* assigned from within this scheduling service. This service assigns the tasks before making the result
5050
* available to you via the callback. To mark tasks as running for those tasks that were running from
5151
* before this service was created, use {@link #initializeRunningTask(QueuableTask, String)}. Later, call
52-
* {@link com.netflix.fenzo.queues.TaskQueue#remove(String, QAttributes)} when tasks complete or they no
53-
* longer need resource assignments.
52+
* {@link #removeTask(QueuableTask, String)} when tasks complete or they no longer need resource assignments.
5453
* </LI>
5554
* </UL>
5655
*/
@@ -64,6 +63,7 @@ public class TaskSchedulingService {
6463
private final Action0 preHook;
6564
private final BlockingQueue<VirtualMachineLease> leaseBlockingQueue = new LinkedBlockingQueue<>();
6665
private final BlockingQueue<Map<String, QueuableTask>> addRunningTasksQueue = new LinkedBlockingQueue<>();
66+
private final BlockingQueue<Map<QueuableTask, String>> removeTasksQueue = new LinkedBlockingQueue<>();
6767
private final BlockingQueue<Action1<Map<TaskQueue.TaskState, Collection<QueuableTask>>>> taskMapRequest = new LinkedBlockingQueue<>(10);
6868
private final BlockingQueue<Action1<Map<String, Map<VMResource, Double[]>>>> resStatusRequest = new LinkedBlockingQueue<>(10);
6969
private final BlockingQueue<Action1<List<VirtualMachineCurrentState>>> vmCurrStateRequest = new LinkedBlockingQueue<>(10);
@@ -123,6 +123,7 @@ private void scheduleOnce() {
123123
// check if next scheduling iteration is actually needed right away
124124
final boolean qModified = taskQueue.reset();
125125
addPendingRunningTasks();
126+
removeTasks();
126127
final boolean newLeaseExists = leaseBlockingQueue.peek() != null;
127128
if ( qModified || newLeaseExists || doNextIteration()) {
128129
lastSchedIterationAt.set(System.currentTimeMillis());
@@ -158,6 +159,26 @@ private void addPendingRunningTasks() {
158159
}
159160
}
160161

162+
private void removeTasks() {
163+
if (removeTasksQueue.peek() != null) {
164+
List<Map<QueuableTask, String>> r = new LinkedList<>();
165+
removeTasksQueue.drainTo(r);
166+
for (Map<QueuableTask, String> m: r) {
167+
for (Map.Entry<QueuableTask, String> e: m.entrySet()) {
168+
// remove it from the queue and call taskScheduler to unassign it if hostname is not null
169+
try {
170+
taskQueue.getUsageTracker().removeTask(e.getKey().getId(), e.getKey().getQAttributes());
171+
} catch (TaskQueueException e1) {
172+
// shouldn't happen since we're calling outside of scheduling iteration
173+
logger.warn("Unexpected to get exception outside of scheduling iteration: " + e1.getMessage(), e1);
174+
}
175+
if (e.getValue() != null)
176+
taskScheduler.getTaskUnAssigner().call(e.getKey().getId(), e.getValue());
177+
}
178+
}
179+
}
180+
}
181+
161182
private void doPendingActions() {
162183
final Action1<Map<TaskQueue.TaskState, Collection<QueuableTask>>> action = taskMapRequest.poll();
163184
try {
@@ -262,6 +283,19 @@ public void initializeRunningTask(QueuableTask task, String hostname) {
262283
addRunningTasksQueue.offer(Collections.singletonMap(hostname, task));
263284
}
264285

286+
/**
287+
* Mark the task to be removed. This is expected to be called for all tasks that were added to the queue, whether or
288+
* not the task is already running. If the task is running, the <code>hostname</code> parameter must be set, otherwise,
289+
* it can be <code>null</code>. The actual remove operation is performed before the start of the next scheduling
290+
* iteration.
291+
* @param task The task to be removed.
292+
* @param hostname The name of the VM where the task was assigned resources from, or, <code>null</code> if it was
293+
* not assigned any resources.
294+
*/
295+
public void removeTask(QueuableTask task, String hostname) {
296+
removeTasksQueue.offer(Collections.singletonMap(task, hostname));
297+
}
298+
265299
public final static class Builder {
266300

267301
private TaskScheduler taskScheduler = null;

fenzo-core/src/main/java/com/netflix/fenzo/functions/Action0.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package com.netflix.fenzo.functions;
218

319
/**

fenzo-core/src/main/java/com/netflix/fenzo/queues/TaskQueue.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,4 @@ enum TaskState { QUEUED, LAUNCHED }
5454
* @param task A task to add to the queue.
5555
*/
5656
void queueTask(QueuableTask task);
57-
58-
/**
59-
* Remove a task from queue with the given {@code taskId} and {@code qAttributes}. The task is removed from the
60-
* queue, whether it is marked as queued for assignments or already marked as running. This must be called for all
61-
* tasks that either no longer need resource assignments or if previously running tasks complete for any reason.
62-
* <P>
63-
* This operation is designed to be performed asynchronously, when it is safe to modify the queue. The queue
64-
* implementations generally do not modify the queue while a scheduling iteration is in progress.
65-
* @param taskId The id of the task to remove.
66-
* @param qAttributes The queue attributes of the task to remove.
67-
*/
68-
void remove(String taskId, QAttributes qAttributes);
6957
}

fenzo-core/src/main/java/com/netflix/fenzo/queues/tiered/QueueBucket.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package com.netflix.fenzo.queues.tiered;
218

319
import com.netflix.fenzo.queues.*;

fenzo-core/src/main/java/com/netflix/fenzo/queues/tiered/TieredQueue.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public class TieredQueue implements InternalTaskQueue {
4040
private Iterator<Tier> iterator = null;
4141
private Tier currTier = null;
4242
private final BlockingQueue<QueuableTask> tasksToQueue;
43-
private final BlockingQueue<QAttributes.TaskIdAttributesTuple> taskIdsToRemove;
4443

4544
/**
4645
* Construct a tiered queue system with the given number of tiers.
@@ -51,7 +50,6 @@ public TieredQueue(int numTiers) {
5150
for ( int i=0; i<numTiers; i++ )
5251
tiers.add(new Tier(i));
5352
tasksToQueue = new LinkedBlockingQueue<>();
54-
taskIdsToRemove = new LinkedBlockingQueue<>();
5553
}
5654

5755
@Override
@@ -73,16 +71,6 @@ private void addRunningInternal(QueuableTask t) throws TaskQueueException {
7371
tiers.get(number).launchTask(t);
7472
}
7573

76-
@Override
77-
public void remove(final String taskId, final QAttributes qAttributes) {
78-
taskIdsToRemove.offer(new QAttributes.TaskIdAttributesTuple(taskId, qAttributes));
79-
}
80-
81-
private boolean removeInternalById(String id, QAttributes qAttributes) throws TaskQueueException {
82-
final Tier tierBuckets = tiers.get(qAttributes.getTierNumber());
83-
return tierBuckets != null && tierBuckets.removeTask(id, qAttributes) != null;
84-
}
85-
8674
/**
8775
* This implementation dynamically picks the next task to consider for resource assignment based on tiers and then
8876
* based on current dominant resource usage. The usage is updated with each resource assignment during the
@@ -134,23 +122,6 @@ public boolean reset() throws TaskQueueMultiException {
134122
}
135123
}
136124
}
137-
if (taskIdsToRemove.peek() != null) {
138-
List<QAttributes.TaskIdAttributesTuple> taskIdTuples = new ArrayList<>();
139-
taskIdsToRemove.drainTo(taskIdTuples);
140-
if (!taskIdTuples.isEmpty()) {
141-
for (QAttributes.TaskIdAttributesTuple tuple : taskIdTuples) {
142-
try {
143-
if (!removeInternalById(tuple.getId(), tuple.getqAttributes())) {
144-
logger.debug("Task with id " + tuple.getId() + " not found to remove");
145-
} else {
146-
queueChanged = true;
147-
}
148-
} catch (TaskQueueException e) {
149-
exceptions.add(e);
150-
}
151-
}
152-
}
153-
}
154125
if (!exceptions.isEmpty())
155126
throw new TaskQueueMultiException(exceptions);
156127
return queueChanged;

fenzo-core/src/main/java/com/netflix/fenzo/samples/SampleQbasedScheduling.java

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package com.netflix.fenzo.samples;
218

319
import com.netflix.fenzo.*;
@@ -15,21 +31,24 @@
1531
import java.util.Collection;
1632
import java.util.List;
1733
import java.util.Map;
34+
import java.util.concurrent.ConcurrentHashMap;
35+
import java.util.concurrent.ConcurrentMap;
1836
import java.util.concurrent.CountDownLatch;
1937
import java.util.concurrent.TimeUnit;
2038
import java.util.concurrent.atomic.AtomicInteger;
39+
import java.util.concurrent.atomic.AtomicReference;
2140

2241
public class SampleQbasedScheduling {
2342

2443
private static class MesosScheduler implements Scheduler {
2544

2645
private final AtomicInteger numTasksCompleted;
27-
private final TaskQueue queue;
46+
private final AtomicReference<TaskSchedulingService> schedSvcGetter;
2847
private Action1<List<Protos.Offer>> leaseAction = null;
2948

30-
MesosScheduler(AtomicInteger numTasksCompleted, TaskQueue queue) {
49+
MesosScheduler(AtomicInteger numTasksCompleted, AtomicReference<TaskSchedulingService> schedSvcGetter) {
3150
this.numTasksCompleted = numTasksCompleted;
32-
this.queue = queue;
51+
this.schedSvcGetter = schedSvcGetter;
3352
}
3453

3554
@Override
@@ -59,7 +78,8 @@ public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) {
5978
case TASK_LOST:
6079
case TASK_FINISHED:
6180
System.out.println("Task status for " + status.getTaskId().getValue() + ": " + status.getState());
62-
queue.remove(status.getTaskId().getValue(), qAttribs);
81+
schedSvcGetter.get().removeTask(allTasks.get(status.getTaskId().getValue()),
82+
tasksToHostnameMap.get(status.getTaskId().getValue()));
6383
numTasksCompleted.incrementAndGet();
6484
}
6585
}
@@ -92,6 +112,9 @@ public void error(SchedulerDriver driver, String message) {
92112

93113
private final static QAttributes qAttribs = new QAttributes.QAttributesAdaptor(0, "onlyBucket");
94114

115+
private final static ConcurrentMap<String, QueuableTask> allTasks = new ConcurrentHashMap<>();
116+
private final static ConcurrentMap<String, String> tasksToHostnameMap = new ConcurrentHashMap<>();
117+
95118
/**
96119
* This is the main method of this sample framework. It showcases how to use Fenzo queues for scheduling. It creates
97120
* some number of tasks and launches them into Mesos using the Mesos built-in command executor. The tasks launched
@@ -123,7 +146,8 @@ public void call(VirtualMachineLease v) {
123146
final TaskQueue queue = TaskQueues.createTieredQueue(2);
124147

125148
// Create our Mesos scheduler callback implementation
126-
final MesosScheduler mesosSchedulerCallback = new MesosScheduler(numTasksCompleted, queue);
149+
AtomicReference<TaskSchedulingService> schedSvcGetter = new AtomicReference<>();
150+
final MesosScheduler mesosSchedulerCallback = new MesosScheduler(numTasksCompleted, schedSvcGetter);
127151

128152
// create Mesos driver
129153
Protos.FrameworkInfo framework = Protos.FrameworkInfo.newBuilder()
@@ -162,12 +186,14 @@ public void call(SchedulingResult schedulingResult) {
162186
for (VirtualMachineLease l: e.getValue().getLeasesUsed())
163187
offers.add(l.getOffer().getId());
164188
List<Protos.TaskInfo> taskInfos = new ArrayList<Protos.TaskInfo>();
165-
for (TaskAssignmentResult r: e.getValue().getTasksAssigned())
189+
for (TaskAssignmentResult r: e.getValue().getTasksAssigned()) {
166190
taskInfos.add(SampleFramework.getTaskInfo(
167191
e.getValue().getLeasesUsed().iterator().next().getOffer().getSlaveId(),
168192
r.getTaskId(),
169193
"sleep 2"
170194
));
195+
tasksToHostnameMap.put(r.getTaskId(), r.getHostname());
196+
}
171197
driver.launchTasks(
172198
offers,
173199
taskInfos
@@ -177,6 +203,7 @@ public void call(SchedulingResult schedulingResult) {
177203
}
178204
})
179205
.build();
206+
schedSvcGetter.set(schedulingService);
180207

181208
// set up action in our scheduler callback to send resource offers into our scheduling service
182209
mesosSchedulerCallback.leaseAction = new Action1<List<Protos.Offer>>() {
@@ -199,8 +226,11 @@ public void run() {
199226
}.start();
200227

201228
// submit some tasks
202-
for (int i=0; i<numTasks; i++)
203-
queue.queueTask(getTask(i));
229+
for (int i=0; i<numTasks; i++) {
230+
final QueuableTask task = getTask(i);
231+
allTasks.put(task.getId(), task);
232+
queue.queueTask(task);
233+
}
204234

205235
// wait for tasks to complete
206236
while (numTasksCompleted.get() < numTasks) {

fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.CountDownLatch;
3030
import java.util.concurrent.LinkedBlockingQueue;
3131
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.atomic.AtomicBoolean;
3233
import java.util.concurrent.atomic.AtomicLong;
3334
import java.util.concurrent.atomic.AtomicReference;
3435

@@ -387,7 +388,68 @@ public void call(Map<TaskQueue.TaskState, Collection<QueuableTask>> stateCollect
387388

388389
@Test
389390
public void testRemoveFromQueue() throws Exception {
390-
// TODO
391+
final CountDownLatch latch = new CountDownLatch(1);
392+
TaskQueue queue = TaskQueues.createTieredQueue(2);
393+
final TaskScheduler scheduler = getScheduler();
394+
Action1<SchedulingResult> resultCallback = new Action1<SchedulingResult>() {
395+
@Override
396+
public void call(SchedulingResult schedulingResult) {
397+
//System.out.println("Got scheduling result with " + schedulingResult.getResultMap().size() + " results");
398+
if (schedulingResult.getResultMap().size() > 0) {
399+
//System.out.println("Assignment on host " + schedulingResult.getResultMap().values().iterator().next().getHostname());
400+
latch.countDown();
401+
}
402+
}
403+
};
404+
final TaskSchedulingService schedulingService = getSchedulingService(queue, scheduler, 100L, 200L, resultCallback);
405+
schedulingService.start();
406+
final List<VirtualMachineLease> leases = LeaseProvider.getLeases(1, 4, 4000, 1, 10);
407+
schedulingService.addLeases(leases);
408+
final QueuableTask task = QueuableTaskProvider.wrapTask(tier1bktA, TaskRequestProvider.getTaskRequest(2, 2000, 1));
409+
queue.queueTask(task);
410+
if (!latch.await(5, TimeUnit.SECONDS))
411+
Assert.fail("Did not assign resources in time");
412+
final CountDownLatch latch2 = new CountDownLatch(1);
413+
final AtomicBoolean found = new AtomicBoolean();
414+
schedulingService.requestVmCurrentStates(new Action1<List<VirtualMachineCurrentState>>() {
415+
@Override
416+
public void call(List<VirtualMachineCurrentState> states) {
417+
for (VirtualMachineCurrentState s: states) {
418+
for (TaskRequest t: s.getRunningTasks()) {
419+
if (t.getId().equals(task.getId())) {
420+
found.set(true);
421+
latch2.countDown();
422+
}
423+
}
424+
}
425+
}
426+
});
427+
if (!latch2.await(5, TimeUnit.SECONDS)) {
428+
Assert.fail("Didn't get vm states in time");
429+
}
430+
Assert.assertTrue("Did not find task on vm", found.get());
431+
schedulingService.removeTask(task, leases.get(0).hostname());
432+
found.set(false);
433+
final CountDownLatch latch3 = new CountDownLatch(1);
434+
schedulingService.requestVmCurrentStates(new Action1<List<VirtualMachineCurrentState>>() {
435+
@Override
436+
public void call(List<VirtualMachineCurrentState> states) {
437+
for (VirtualMachineCurrentState s: states) {
438+
for (TaskRequest t: s.getRunningTasks()) {
439+
if (t.getId().equals(task.getId())) {
440+
found.set(true);
441+
latch3.countDown();
442+
}
443+
}
444+
}
445+
latch3.countDown();
446+
}
447+
});
448+
if (!latch3.await(5, TimeUnit.SECONDS)) {
449+
Assert.fail("Timeout waiting for vm states");
450+
}
451+
Assert.assertFalse("Unexpected to find removed task on vm", found.get());
452+
scheduler.shutdown();
391453
}
392454

393455
@Test

0 commit comments

Comments
 (0)