Skip to content

Commit d8181f5

Browse files
Merge pull request #239 from mairbek/sched-priorityqueue
CurrentThreadScheduler Delayed Execution Fix
2 parents ce65cca + c978e36 commit d8181f5

File tree

3 files changed

+106
-27
lines changed

3 files changed

+106
-27
lines changed

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

Lines changed: 92 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,9 +17,9 @@
1717

1818
import static org.mockito.Mockito.*;
1919

20-
import java.util.LinkedList;
21-
import java.util.Queue;
20+
import java.util.PriorityQueue;
2221
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicInteger;
2323

2424
import org.junit.Test;
2525
import org.mockito.InOrder;
@@ -39,46 +39,70 @@ public static CurrentThreadScheduler getInstance() {
3939
return INSTANCE;
4040
}
4141

42-
private static final ThreadLocal<Queue<DiscardableAction<?>>> QUEUE = new ThreadLocal<Queue<DiscardableAction<?>>>();
42+
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>();
4343

4444
private CurrentThreadScheduler() {
4545
}
4646

47+
private final AtomicInteger counter = new AtomicInteger(0);
48+
4749
@Override
4850
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
4951
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
50-
enqueue(discardableAction);
52+
enqueue(discardableAction, now());
5153
return discardableAction;
5254
}
5355

5456
@Override
5557
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
56-
// since we are executing immediately on this thread we must cause this thread to sleep
57-
// TODO right now the 'enqueue' does not take delay into account so if another task is enqueued after this it will
58-
// wait behind the sleeping action ... should that be the case or should it be allowed to proceed ahead of the delayed action?
59-
return schedule(state, new SleepingAction<T>(action, this, dueTime, unit));
58+
long execTime = now() + unit.toMillis(dueTime);
59+
60+
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime));
61+
enqueue(discardableAction, execTime);
62+
return discardableAction;
6063
}
6164

62-
private void enqueue(DiscardableAction<?> action) {
63-
Queue<DiscardableAction<?>> queue = QUEUE.get();
65+
private void enqueue(DiscardableAction<?> action, long execTime) {
66+
PriorityQueue<TimedAction> queue = QUEUE.get();
6467
boolean exec = queue == null;
6568

6669
if (exec) {
67-
queue = new LinkedList<DiscardableAction<?>>();
70+
queue = new PriorityQueue<TimedAction>();
6871
QUEUE.set(queue);
6972
}
7073

71-
queue.add(action);
74+
queue.add(new TimedAction(action, execTime, counter.incrementAndGet()));
7275

7376
if (exec) {
7477
while (!queue.isEmpty()) {
75-
queue.poll().call(this);
78+
queue.poll().action.call(this);
7679
}
7780

7881
QUEUE.set(null);
7982
}
8083
}
8184

85+
private static class TimedAction implements Comparable<TimedAction> {
86+
final DiscardableAction<?> action;
87+
final Long execTime;
88+
final Integer count; // In case if time between enqueueing took less than 1ms
89+
90+
private TimedAction(DiscardableAction<?> action, Long execTime, Integer count) {
91+
this.action = action;
92+
this.execTime = execTime;
93+
this.count = count;
94+
}
95+
96+
@Override
97+
public int compareTo(TimedAction that) {
98+
int result = execTime.compareTo(that.execTime);
99+
if (result == 0) {
100+
return count.compareTo(that.count);
101+
}
102+
return result;
103+
}
104+
}
105+
82106
public static class UnitTest {
83107

84108
@Test
@@ -146,6 +170,58 @@ public void testSequenceOfActions() {
146170

147171
}
148172

173+
@Test
174+
public void testSequenceOfDelayedActions() {
175+
final CurrentThreadScheduler scheduler = new CurrentThreadScheduler();
176+
177+
final Action0 first = mock(Action0.class);
178+
final Action0 second = mock(Action0.class);
179+
180+
scheduler.schedule(new Action0() {
181+
@Override
182+
public void call() {
183+
scheduler.schedule(first, 30, TimeUnit.MILLISECONDS);
184+
scheduler.schedule(second, 10, TimeUnit.MILLISECONDS);
185+
}
186+
});
187+
188+
InOrder inOrder = inOrder(first, second);
189+
190+
inOrder.verify(second, times(1)).call();
191+
inOrder.verify(first, times(1)).call();
192+
193+
194+
}
195+
196+
@Test
197+
public void testMixOfDelayedAndNonDelayedActions() {
198+
final CurrentThreadScheduler scheduler = new CurrentThreadScheduler();
199+
200+
final Action0 first = mock(Action0.class);
201+
final Action0 second = mock(Action0.class);
202+
final Action0 third = mock(Action0.class);
203+
final Action0 fourth = mock(Action0.class);
204+
205+
scheduler.schedule(new Action0() {
206+
@Override
207+
public void call() {
208+
scheduler.schedule(first);
209+
scheduler.schedule(second, 300, TimeUnit.MILLISECONDS);
210+
scheduler.schedule(third, 100, TimeUnit.MILLISECONDS);
211+
scheduler.schedule(fourth);
212+
}
213+
});
214+
215+
InOrder inOrder = inOrder(first, second, third, fourth);
216+
217+
inOrder.verify(first, times(1)).call();
218+
inOrder.verify(fourth, times(1)).call();
219+
inOrder.verify(third, times(1)).call();
220+
inOrder.verify(second, times(1)).call();
221+
222+
223+
}
224+
149225
}
150226

151227
}

rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> acti
4848
@Override
4949
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
5050
// since we are executing immediately on this thread we must cause this thread to sleep
51-
return schedule(state, new SleepingAction<T>(action, this, dueTime, unit));
51+
long execTime = now() + unit.toMillis(dueTime);
52+
53+
return schedule(state, new SleepingAction<T>(action, this, execTime));
5254
}
5355

5456
public static class UnitTest {

rxjava-core/src/main/java/rx/concurrency/SleepingAction.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,24 +26,25 @@
2626
private final Scheduler scheduler;
2727
private final long execTime;
2828

29-
public SleepingAction(Func2<Scheduler, T, Subscription> underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) {
29+
public SleepingAction(Func2<Scheduler, T, Subscription> underlying, Scheduler scheduler, long execTime) {
3030
this.underlying = underlying;
3131
this.scheduler = scheduler;
32-
this.execTime = scheduler.now() + timeUnit.toMillis(timespan);
32+
this.execTime = execTime;
3333
}
3434

35+
3536
@Override
3637
public Subscription call(Scheduler s, T state) {
3738
if (execTime > scheduler.now()) {
3839
long delay = execTime - scheduler.now();
3940
if (delay> 0) {
40-
try {
41+
try {
4142
Thread.sleep(delay);
4243
}
43-
catch (InterruptedException e) {
44-
Thread.currentThread().interrupt();
45-
throw new RuntimeException(e);
46-
}
44+
catch (InterruptedException e) {
45+
Thread.currentThread().interrupt();
46+
throw new RuntimeException(e);
47+
}
4748
}
4849
}
4950

0 commit comments

Comments
 (0)