Skip to content
This repository was archived by the owner on Mar 31, 2023. It is now read-only.

Commit 8415f06

Browse files
authored
Merge pull request #99 from spodila/chgRmTaskSignature
change signature of TaskSchedulingService.removeTask
2 parents e9527cb + 17a7fc4 commit 8415f06

File tree

4 files changed

+37
-24
lines changed

4 files changed

+37
-24
lines changed

fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.netflix.fenzo.functions.Action0;
2020
import com.netflix.fenzo.functions.Action1;
2121
import com.netflix.fenzo.queues.*;
22+
import com.netflix.fenzo.queues.TaskQueue;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
2425

@@ -49,11 +50,24 @@
4950
* assigned from within this scheduling service. This service assigns the tasks before making the result
5051
* available to you via the callback. To mark tasks as running for those tasks that were running from
5152
* before this service was created, use {@link #initializeRunningTask(QueuableTask, String)}. Later, call
52-
* {@link #removeTask(QueuableTask, String)} when tasks complete or they no longer need resource assignments.
53+
* {@link #removeTask(String, QAttributes, String)} when tasks complete or they no longer need resource assignments.
5354
* </LI>
5455
* </UL>
5556
*/
5657
public class TaskSchedulingService {
58+
59+
private static class RemoveTaskRequest {
60+
private final String taskId;
61+
private final QAttributes qAttributes;
62+
private final String hostname;
63+
64+
public RemoveTaskRequest(String taskId, QAttributes qAttributes, String hostname) {
65+
this.taskId = taskId;
66+
this.qAttributes = qAttributes;
67+
this.hostname = hostname;
68+
}
69+
}
70+
5771
private static final Logger logger = LoggerFactory.getLogger(TaskSchedulingService.class);
5872
private final TaskScheduler taskScheduler;
5973
private final Action1<SchedulingResult> schedulingResultCallback;
@@ -63,7 +77,7 @@ public class TaskSchedulingService {
6377
private final Action0 preHook;
6478
private final BlockingQueue<VirtualMachineLease> leaseBlockingQueue = new LinkedBlockingQueue<>();
6579
private final BlockingQueue<Map<String, QueuableTask>> addRunningTasksQueue = new LinkedBlockingQueue<>();
66-
private final BlockingQueue<Map<QueuableTask, String>> removeTasksQueue = new LinkedBlockingQueue<>();
80+
private final BlockingQueue<RemoveTaskRequest> removeTasksQueue = new LinkedBlockingQueue<>();
6781
private final BlockingQueue<Action1<Map<TaskQueue.TaskState, Collection<QueuableTask>>>> taskMapRequest = new LinkedBlockingQueue<>(10);
6882
private final BlockingQueue<Action1<Map<String, Map<VMResource, Double[]>>>> resStatusRequest = new LinkedBlockingQueue<>(10);
6983
private final BlockingQueue<Action1<List<VirtualMachineCurrentState>>> vmCurrStateRequest = new LinkedBlockingQueue<>(10);
@@ -161,20 +175,18 @@ private void addPendingRunningTasks() {
161175

162176
private void removeTasks() {
163177
if (removeTasksQueue.peek() != null) {
164-
List<Map<QueuableTask, String>> r = new LinkedList<>();
165-
removeTasksQueue.drainTo(r);
166-
for (Map<QueuableTask, String> m: r) {
167-
for (Map.Entry<QueuableTask, String> e: m.entrySet()) {
168-
// remove it from the queue and call taskScheduler to unassign it if hostname is not null
169-
try {
170-
taskQueue.getUsageTracker().removeTask(e.getKey().getId(), e.getKey().getQAttributes());
171-
} catch (TaskQueueException e1) {
172-
// shouldn't happen since we're calling outside of scheduling iteration
173-
logger.warn("Unexpected to get exception outside of scheduling iteration: " + e1.getMessage(), e1);
174-
}
175-
if (e.getValue() != null)
176-
taskScheduler.getTaskUnAssigner().call(e.getKey().getId(), e.getValue());
178+
List<RemoveTaskRequest> l = new LinkedList<>();
179+
removeTasksQueue.drainTo(l);
180+
for (RemoveTaskRequest r: l) {
181+
// remove it from the queue and call taskScheduler to unassign it if hostname is not null
182+
try {
183+
taskQueue.getUsageTracker().removeTask(r.taskId, r.qAttributes);
184+
} catch (TaskQueueException e1) {
185+
// shouldn't happen since we're calling outside of scheduling iteration
186+
logger.warn("Unexpected to get exception outside of scheduling iteration: " + e1.getMessage(), e1);
177187
}
188+
if (r.hostname != null)
189+
taskScheduler.getTaskUnAssigner().call(r.taskId, r.hostname);
178190
}
179191
}
180192
}
@@ -288,12 +300,13 @@ public void initializeRunningTask(QueuableTask task, String hostname) {
288300
* not the task is already running. If the task is running, the <code>hostname</code> parameter must be set, otherwise,
289301
* it can be <code>null</code>. The actual remove operation is performed before the start of the next scheduling
290302
* iteration.
291-
* @param task The task to be removed.
303+
* @param taskId The Id of the task to be removed.
304+
* @param qAttributes The queue attributes of the queue that the task belongs to
292305
* @param hostname The name of the VM where the task was assigned resources from, or, <code>null</code> if it was
293306
* not assigned any resources.
294307
*/
295-
public void removeTask(QueuableTask task, String hostname) {
296-
removeTasksQueue.offer(Collections.singletonMap(task, hostname));
308+
public void removeTask(String taskId, QAttributes qAttributes, String hostname) {
309+
removeTasksQueue.offer(new RemoveTaskRequest(taskId, qAttributes, hostname));
297310
}
298311

299312
public final static class Builder {

fenzo-core/src/main/java/com/netflix/fenzo/queues/InternalTaskQueue.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@
2727
* the queue while a scheduling iteration using this queue is in progress. Implementations must handle this. Note that,
2828
* it may not be sufficient for the implementations to use concurrent versions of collection classes for queue of tasks.
2929
* The queue must be consistent throughout the scheduling iteration. One recommended way to achieve such consistency is
30-
* to place the {@link #queueTask(QueuableTask)} and {@link #remove(String, QAttributes)}
31-
* operations as requests in a holding area within the implementation and return immediately. Later, actually carry
32-
* them out during the {@link #reset()} method.
30+
* to place the {@link #queueTask(QueuableTask)} operations as requests in a holding area within the implementation and
31+
* return immediately. Later, actually carry them out during the {@link #reset()} method.
3332
*/
3433
public interface InternalTaskQueue extends TaskQueue {
3534
/**
@@ -41,7 +40,7 @@ public interface InternalTaskQueue extends TaskQueue {
4140
* adding to or removing from the queue.
4241
* @throws TaskQueueMultiException If any exceptions that may have occurred during resetting the pointer to the head
4342
* of the queue. Or, this may include exceptions that arose when applying any deferred operations from
44-
* {@link #queueTask(QueuableTask)} and {@link #remove(String, QAttributes)} methods.
43+
* {@link #queueTask(QueuableTask)} method.
4544
*/
4645
boolean reset() throws TaskQueueMultiException;
4746

fenzo-core/src/main/java/com/netflix/fenzo/samples/SampleQbasedScheduling.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) {
7878
case TASK_LOST:
7979
case TASK_FINISHED:
8080
System.out.println("Task status for " + status.getTaskId().getValue() + ": " + status.getState());
81-
schedSvcGetter.get().removeTask(allTasks.get(status.getTaskId().getValue()),
81+
schedSvcGetter.get().removeTask(status.getTaskId().getValue(),
82+
allTasks.get(status.getTaskId().getValue()).getQAttributes(),
8283
tasksToHostnameMap.get(status.getTaskId().getValue()));
8384
numTasksCompleted.incrementAndGet();
8485
}

fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ public void call(List<VirtualMachineCurrentState> states) {
428428
Assert.fail("Didn't get vm states in time");
429429
}
430430
Assert.assertTrue("Did not find task on vm", found.get());
431-
schedulingService.removeTask(task, leases.get(0).hostname());
431+
schedulingService.removeTask(task.getId(), task.getQAttributes(), leases.get(0).hostname());
432432
found.set(false);
433433
final CountDownLatch latch3 = new CountDownLatch(1);
434434
schedulingService.requestVmCurrentStates(new Action1<List<VirtualMachineCurrentState>>() {

0 commit comments

Comments
 (0)