Skip to content

Commit 5ee54a0

Browse files
author
jmhofer
committed
Initial implementation for interval (ReactiveX#55). Needs a test scheduler...
1 parent 1fa96db commit 5ee54a0

File tree

1 file changed

+83
-0
lines changed

1 file changed

+83
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Matchers.*;
20+
import static org.mockito.Mockito.*;
21+
import static rx.operators.Tester.UnitTest.*;
22+
23+
import java.util.concurrent.TimeUnit;
24+
25+
import org.junit.Test;
26+
27+
import rx.Observer;
28+
import rx.Scheduler;
29+
import rx.Subscription;
30+
import rx.concurrency.Schedulers;
31+
import rx.util.functions.Func0;
32+
import rx.util.functions.Func1;
33+
34+
/**
35+
* Returns an observable sequence that produces a value after each period.
36+
* The value starts at 0 and counts up each period.
37+
*/
38+
public final class OperationInterval {
39+
40+
/**
41+
* Creates an event each time interval.
42+
*/
43+
public static Func1<Observer<Long>, Subscription> interval(long interval, TimeUnit unit) {
44+
return new Interval(interval, unit, Schedulers.currentThread());
45+
}
46+
47+
/**
48+
* Creates an event each time interval.
49+
*/
50+
public static Func1<Observer<Long>, Subscription> interval(long interval, TimeUnit unit, Scheduler scheduler) {
51+
return new Interval(interval, unit, scheduler);
52+
}
53+
54+
private static class Interval implements Func1<Observer<Long>, Subscription> {
55+
private final long interval;
56+
private final TimeUnit unit;
57+
private final Scheduler scheduler;
58+
59+
private long currentValue;
60+
61+
private Interval(long interval, TimeUnit unit, Scheduler scheduler) {
62+
this.interval = interval;
63+
this.unit = unit;
64+
this.scheduler = scheduler;
65+
}
66+
67+
@Override
68+
public Subscription call(final Observer<Long> observer) {
69+
return scheduler.schedule(new Func0<Subscription>() {
70+
@Override
71+
public Subscription call() {
72+
observer.onNext(currentValue);
73+
currentValue++;
74+
return Interval.this.call(observer);
75+
}
76+
}, interval, unit);
77+
}
78+
}
79+
80+
public static class UnitTest {
81+
// TODO
82+
}
83+
}

0 commit comments

Comments
 (0)