Skip to content

Commit 60fc9b5

Browse files
Change Scheduler implementations to use primary methods
Work done in ReactiveX#229 added the following methods: - Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)} - Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)} These are in fact the primary methods from RxNet (http://msdn.microsoft.com/en-us/library/hh211963(v=vs.103).aspx) and the others are just helper overloads. It seems it is better to set the precedent to use these 2 methods for actual implementation logic while all other methods are just decorating and forwarding from AbstractScheduler to these methods. I have updated the various implementations to achieve this. Unit tests are passing … but we don't have enough unit test coverage so I won't be surprised if bugs are found.
1 parent e18d821 commit 60fc9b5

9 files changed

+293
-156
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

+34-24
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,38 @@ public interface Scheduler {
3030
/**
3131
* Schedules a cancelable action to be executed.
3232
*
33-
* @param state State to pass into the action.
34-
* @param action Action to schedule.
33+
* @param state
34+
* State to pass into the action.
35+
* @param action
36+
* Action to schedule.
3537
* @return a subscription to be able to unsubscribe from action.
3638
*/
3739
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);
3840

41+
/**
42+
* Schedules a cancelable action to be executed in delayTime.
43+
*
44+
* @param state
45+
* State to pass into the action.
46+
* @param action
47+
* Action to schedule.
48+
* @param delayTime
49+
* Time the action is to be delayed before executing.
50+
* @param unit
51+
* Time unit of the delay time.
52+
* @return a subscription to be able to unsubscribe from action.
53+
*/
54+
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);
55+
3956
/**
4057
* Schedules a cancelable action to be executed.
4158
*
42-
* @param action Action to schedule.
59+
* @param action
60+
* Action to schedule.
4361
* @return a subscription to be able to unsubscribe from action.
4462
*/
4563
Subscription schedule(Func1<Scheduler, Subscription> action);
46-
64+
4765
/**
4866
* Schedules a cancelable action to be executed.
4967
*
@@ -63,43 +81,35 @@ public interface Scheduler {
6381
Subscription schedule(Action0 action);
6482

6583
/**
66-
* Schedules a cancelable action to be executed in dueTime.
84+
* Schedules a cancelable action to be executed in delayTime.
6785
*
68-
* @param state State to pass into the action.
69-
* @param action Action to schedule.
70-
* @param dueTime Time the action is due for executing.
71-
* @param unit Time unit of the due time.
86+
* @param action
87+
* Action to schedule.
88+
* @param delayTime
89+
* Time the action is to be delayed before executing.
90+
* @param unit
91+
* Time unit of the delay time.
7292
* @return a subscription to be able to unsubscribe from action.
7393
*/
74-
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit);
94+
Subscription schedule(Func1<Scheduler, Subscription> action, long delayTime, TimeUnit unit);
7595

7696
/**
77-
* Schedules a cancelable action to be executed in dueTime.
78-
*
79-
* @param action Action to schedule.
80-
* @param dueTime Time the action is due for executing.
81-
* @param unit Time unit of the due time.
82-
* @return a subscription to be able to unsubscribe from action.
83-
*/
84-
Subscription schedule(Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit);
85-
86-
/**
87-
* Schedules an action to be executed in dueTime.
97+
* Schedules an action to be executed in delayTime.
8898
*
8999
* @param action
90100
* action
91101
* @return a subscription to be able to unsubscribe from action.
92102
*/
93-
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);
103+
Subscription schedule(Action0 action, long delayTime, TimeUnit unit);
94104

95105
/**
96-
* Schedules a cancelable action to be executed in dueTime.
106+
* Schedules a cancelable action to be executed in delayTime.
97107
*
98108
* @param action
99109
* action
100110
* @return a subscription to be able to unsubscribe from action.
101111
*/
102-
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);
112+
Subscription schedule(Func0<Subscription> action, long delayTime, TimeUnit unit);
103113

104114
/**
105115
* Returns the scheduler's notion of current time.

rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java

+57-38
Original file line numberDiff line numberDiff line change
@@ -25,71 +25,90 @@
2525
import rx.util.functions.Func1;
2626
import rx.util.functions.Func2;
2727

28-
/* package */abstract class AbstractScheduler implements Scheduler {
28+
/**
29+
* Default implementations of various convenience overload methods on the Scheduler.
30+
* <p>
31+
* The methods left to implement are:
32+
* <ul>
33+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)}</li>
34+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)}</li>
35+
* </ul>
36+
* <p>
37+
* This is a utility class expected to be used by all {@link Scheduler} implementations since we can't yet rely on Java 8 default methods on the {@link Scheduler}.
38+
*/
39+
public abstract class AbstractScheduler implements Scheduler {
2940

3041
@Override
31-
public Subscription schedule(Action0 action) {
32-
return schedule(asFunc0(action));
42+
public Subscription schedule(final Action0 action) {
43+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
44+
45+
@Override
46+
public Subscription call(Scheduler scheduler, Void t2) {
47+
action.call();
48+
return Subscriptions.empty();
49+
}
50+
});
3351
}
3452

3553
@Override
3654
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
37-
return schedule(new Func0<Subscription>() {
55+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
56+
3857
@Override
39-
public Subscription call() {
40-
return action.call(AbstractScheduler.this);
58+
public Subscription call(Scheduler scheduler, Void t2) {
59+
return action.call(scheduler);
4160
}
4261
});
4362
}
4463

4564
@Override
46-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
47-
return schedule(new Func0<Subscription>() {
48-
@Override
49-
public Subscription call() {
50-
return action.call(AbstractScheduler.this, state);
51-
}
52-
});
65+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
66+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
67+
68+
@Override
69+
public Subscription call(Scheduler scheduler, Void t2) {
70+
action.call();
71+
return Subscriptions.empty();
72+
}
73+
}, delayTime, unit);
5374
}
5475

5576
@Override
56-
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
57-
return schedule(asFunc0(action), dueTime, unit);
77+
public Subscription schedule(final Func1<Scheduler, Subscription> action, long delayTime, TimeUnit unit) {
78+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
79+
80+
@Override
81+
public Subscription call(Scheduler scheduler, Void t2) {
82+
return action.call(scheduler);
83+
}
84+
}, delayTime, unit);
5885
}
5986

6087
@Override
61-
public Subscription schedule(final Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit) {
62-
return schedule(new Func0<Subscription>() {
63-
@Override
64-
public Subscription call() {
65-
return action.call(AbstractScheduler.this);
66-
}
67-
}, dueTime, unit);
88+
public Subscription schedule(final Func0<Subscription> action) {
89+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
90+
91+
@Override
92+
public Subscription call(Scheduler scheduler, Void t2) {
93+
return action.call();
94+
}
95+
});
6896
}
6997

7098
@Override
71-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
72-
return schedule(new Func0<Subscription>() {
99+
public Subscription schedule(final Func0<Subscription> action, long delayTime, TimeUnit unit) {
100+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
101+
73102
@Override
74-
public Subscription call() {
75-
return action.call(AbstractScheduler.this, state);
103+
public Subscription call(Scheduler scheduler, Void t2) {
104+
return action.call();
76105
}
77-
}, dueTime, unit);
106+
}, delayTime, unit);
78107
}
79-
108+
80109
@Override
81110
public long now() {
82111
return System.nanoTime();
83112
}
84113

85-
private static Func0<Subscription> asFunc0(final Action0 action) {
86-
return new Func0<Subscription>() {
87-
@Override
88-
public Subscription call() {
89-
action.call();
90-
return Subscriptions.empty();
91-
}
92-
};
93-
}
94-
95114
}

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
import org.junit.Test;
2525
import org.mockito.InOrder;
2626

27+
import rx.Scheduler;
2728
import rx.Subscription;
2829
import rx.util.functions.Action0;
29-
import rx.util.functions.Func0;
30+
import rx.util.functions.Func2;
3031

3132
/**
3233
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
@@ -38,37 +39,40 @@ public static CurrentThreadScheduler getInstance() {
3839
return INSTANCE;
3940
}
4041

41-
private static final ThreadLocal<Queue<DiscardableAction>> QUEUE = new ThreadLocal<Queue<DiscardableAction>>();
42+
private static final ThreadLocal<Queue<DiscardableAction<?>>> QUEUE = new ThreadLocal<Queue<DiscardableAction<?>>>();
4243

4344
private CurrentThreadScheduler() {
4445
}
4546

4647
@Override
47-
public Subscription schedule(Func0<Subscription> action) {
48-
DiscardableAction discardableAction = new DiscardableAction(action);
48+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
49+
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
4950
enqueue(discardableAction);
5051
return discardableAction;
5152
}
5253

5354
@Override
54-
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
55-
return schedule(new SleepingAction(action, this, dueTime, unit));
55+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
56+
// since we are executing immediately on this thread we must cause this thread to sleep
57+
// TODO right now the 'enqueue' does not take delay into account so if another task is enqueued after this it will
58+
// wait behind the sleeping action ... should that be the case or should it be allowed to proceed ahead of the delayed action?
59+
return schedule(state, new SleepingAction<T>(action, this, dueTime, unit));
5660
}
5761

58-
private void enqueue(DiscardableAction action) {
59-
Queue<DiscardableAction> queue = QUEUE.get();
62+
private void enqueue(DiscardableAction<?> action) {
63+
Queue<DiscardableAction<?>> queue = QUEUE.get();
6064
boolean exec = queue == null;
6165

6266
if (exec) {
63-
queue = new LinkedList<DiscardableAction>();
67+
queue = new LinkedList<DiscardableAction<?>>();
6468
QUEUE.set(queue);
6569
}
6670

6771
queue.add(action);
6872

6973
if (exec) {
7074
while (!queue.isEmpty()) {
71-
queue.poll().call();
75+
queue.poll().call(this);
7276
}
7377

7478
QUEUE.set(null);
@@ -143,4 +147,5 @@ public void testSequenceOfActions() {
143147
}
144148

145149
}
150+
146151
}

rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,31 @@
1717

1818
import java.util.concurrent.atomic.AtomicBoolean;
1919

20+
import rx.Scheduler;
2021
import rx.Subscription;
2122
import rx.util.AtomicObservableSubscription;
22-
import rx.util.functions.Func0;
23+
import rx.util.functions.Func1;
24+
import rx.util.functions.Func2;
2325

2426
/**
2527
* Combines standard {@link Subscription#unsubscribe()} functionality with ability to skip execution if an unsubscribe occurs before the {@link #call()} method is invoked.
2628
*/
27-
/* package */class DiscardableAction implements Func0<Subscription>, Subscription {
28-
private final Func0<Subscription> underlying;
29+
/* package */class DiscardableAction<T> implements Func1<Scheduler, Subscription>, Subscription {
30+
private final Func2<Scheduler, T, Subscription> underlying;
31+
private final T state;
2932

3033
private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription();
3134
private final AtomicBoolean ready = new AtomicBoolean(true);
3235

33-
public DiscardableAction(Func0<Subscription> underlying) {
36+
public DiscardableAction(T state, Func2<Scheduler, T, Subscription> underlying) {
37+
this.state = state;
3438
this.underlying = underlying;
3539
}
3640

3741
@Override
38-
public Subscription call() {
42+
public Subscription call(Scheduler scheduler) {
3943
if (ready.compareAndSet(true, false)) {
40-
Subscription subscription = underlying.call();
44+
Subscription subscription = underlying.call(scheduler, state);
4145
wrapper.wrap(subscription);
4246
return subscription;
4347
}
@@ -49,4 +53,5 @@ public void unsubscribe() {
4953
ready.set(false);
5054
wrapper.unsubscribe();
5155
}
56+
5257
}

0 commit comments

Comments
 (0)