Skip to content

Commit e0b4a68

Browse files
Merge pull request ReactiveX#228 from jmhofer/interval
Interval
2 parents 3eb5602 + f42e9c3 commit e0b4a68

File tree

2 files changed

+234
-0
lines changed

2 files changed

+234
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.concurrency;
17+
18+
import java.util.Comparator;
19+
import java.util.PriorityQueue;
20+
import java.util.Queue;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import rx.Subscription;
24+
import rx.subscriptions.Subscriptions;
25+
import rx.util.functions.Func0;
26+
27+
public class TestScheduler extends AbstractScheduler {
28+
private final Queue<TimedAction> queue = new PriorityQueue<TimedAction>(11, new CompareActionsByTime());
29+
30+
private static class TimedAction {
31+
private final long time;
32+
private final Func0<Subscription> action;
33+
34+
private TimedAction(long time, Func0<Subscription> action) {
35+
this.time = time;
36+
this.action = action;
37+
}
38+
39+
@Override
40+
public String toString() {
41+
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
42+
}
43+
}
44+
45+
private static class CompareActionsByTime implements Comparator<TimedAction> {
46+
@Override
47+
public int compare(TimedAction action1, TimedAction action2) {
48+
return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time));
49+
}
50+
}
51+
52+
private long time;
53+
54+
@Override
55+
public Subscription schedule(Func0<Subscription> action) {
56+
return schedule(action, 0L, TimeUnit.NANOSECONDS);
57+
}
58+
59+
@Override
60+
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
61+
queue.add(new TimedAction(now() + unit.toNanos(dueTime), action));
62+
return Subscriptions.empty();
63+
}
64+
65+
@Override
66+
public long now() {
67+
return time;
68+
}
69+
70+
public void advanceTimeBy(long dueTime, TimeUnit unit) {
71+
advanceTimeTo(time + unit.toNanos(dueTime), TimeUnit.NANOSECONDS);
72+
}
73+
74+
public void advanceTimeTo(long dueTime, TimeUnit unit) {
75+
long targetTime = unit.toNanos(dueTime);
76+
triggerActions(targetTime);
77+
}
78+
79+
public void triggerActions() {
80+
triggerActions(time);
81+
}
82+
83+
private void triggerActions(long targetTimeInNanos) {
84+
while (! queue.isEmpty()) {
85+
TimedAction current = queue.peek();
86+
if (current.time > targetTimeInNanos) {
87+
break;
88+
}
89+
time = current.time;
90+
queue.remove();
91+
current.action.call();
92+
}
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
import org.mockito.InOrder;
28+
29+
import rx.Observable;
30+
import rx.Observer;
31+
import rx.Scheduler;
32+
import rx.Subscription;
33+
import rx.concurrency.Schedulers;
34+
import rx.concurrency.TestScheduler;
35+
import rx.subscriptions.Subscriptions;
36+
import rx.util.functions.Action0;
37+
import rx.util.functions.Func1;
38+
39+
/**
40+
* Returns an observable sequence that produces a value after each period.
41+
* The value starts at 0 and counts up each period.
42+
*/
43+
public final class OperationInterval {
44+
45+
/**
46+
* Creates an event each time interval.
47+
*/
48+
public static Func1<Observer<Long>, Subscription> interval(long interval, TimeUnit unit) {
49+
return new Interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
50+
}
51+
52+
/**
53+
* Creates an event each time interval.
54+
*/
55+
public static Func1<Observer<Long>, Subscription> interval(long interval, TimeUnit unit, Scheduler scheduler) {
56+
return new Interval(interval, unit, scheduler);
57+
}
58+
59+
private static class Interval implements Func1<Observer<Long>, Subscription> {
60+
private final long interval;
61+
private final TimeUnit unit;
62+
private final Scheduler scheduler;
63+
64+
private long currentValue;
65+
private final AtomicBoolean complete = new AtomicBoolean();
66+
67+
private Interval(long interval, TimeUnit unit, Scheduler scheduler) {
68+
this.interval = interval;
69+
this.unit = unit;
70+
this.scheduler = scheduler;
71+
}
72+
73+
@Override
74+
public Subscription call(final Observer<Long> observer) {
75+
scheduler.schedule(new IntervalAction(observer), interval, unit);
76+
return Subscriptions.create(new Action0() {
77+
@Override
78+
public void call() {
79+
complete.set(true);
80+
}
81+
});
82+
}
83+
84+
private class IntervalAction implements Action0 {
85+
private final Observer<Long> observer;
86+
87+
private IntervalAction(Observer<Long> observer) {
88+
this.observer = observer;
89+
}
90+
91+
@Override
92+
public void call() {
93+
if (complete.get()) {
94+
observer.onCompleted();
95+
} else {
96+
observer.onNext(currentValue);
97+
currentValue++;
98+
scheduler.schedule(this, interval, unit);
99+
}
100+
}
101+
}
102+
}
103+
104+
public static class UnitTest {
105+
private TestScheduler scheduler;
106+
private Observer<Long> observer;
107+
108+
@Before
109+
@SuppressWarnings("unchecked") // due to mocking
110+
public void before() {
111+
scheduler = new TestScheduler();
112+
observer = mock(Observer.class);
113+
}
114+
115+
@Test
116+
public void testInterval() {
117+
Observable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler));
118+
Subscription sub = w.subscribe(observer);
119+
120+
verify(observer, never()).onNext(0L);
121+
verify(observer, never()).onCompleted();
122+
verify(observer, never()).onError(any(Exception.class));
123+
124+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
125+
126+
InOrder inOrder = inOrder(observer);
127+
inOrder.verify(observer, times(1)).onNext(0L);
128+
inOrder.verify(observer, times(1)).onNext(1L);
129+
inOrder.verify(observer, never()).onNext(2L);
130+
verify(observer, never()).onCompleted();
131+
verify(observer, never()).onError(any(Exception.class));
132+
133+
sub.unsubscribe();
134+
scheduler.advanceTimeTo(4, TimeUnit.SECONDS);
135+
verify(observer, never()).onNext(2L);
136+
verify(observer, times(1)).onCompleted();
137+
verify(observer, never()).onError(any(Exception.class));
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)