From 3424601e2a90fa38a527b69714bfe3626de4092e Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 8 Apr 2013 21:44:36 +0200 Subject: [PATCH 1/7] Initial implementation for interval (#55). Needs a test scheduler... --- .../java/rx/operators/OperationInterval.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationInterval.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java new file mode 100644 index 0000000000..b971da45ba --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -0,0 +1,83 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static rx.operators.Tester.UnitTest.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Returns an observable sequence that produces a value after each period. + * The value starts at 0 and counts up each period. + */ +public final class OperationInterval { + + /** + * Creates an event each time interval. + */ + public static Func1, Subscription> interval(long interval, TimeUnit unit) { + return new Interval(interval, unit, Schedulers.currentThread()); + } + + /** + * Creates an event each time interval. + */ + public static Func1, Subscription> interval(long interval, TimeUnit unit, Scheduler scheduler) { + return new Interval(interval, unit, scheduler); + } + + private static class Interval implements Func1, Subscription> { + private final long interval; + private final TimeUnit unit; + private final Scheduler scheduler; + + private long currentValue; + + private Interval(long interval, TimeUnit unit, Scheduler scheduler) { + this.interval = interval; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + return scheduler.schedule(new Func0() { + @Override + public Subscription call() { + observer.onNext(currentValue); + currentValue++; + return Interval.this.call(observer); + } + }, interval, unit); + } + } + + public static class UnitTest { + // TODO + } +} From e1c235240e83078e4b3aa6d3fe18b5c77fdfb51c Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 8 Apr 2013 22:33:49 +0200 Subject: [PATCH 2/7] Added a TestScheduler which collects actions in a queue and has adjustable time. --- .../java/rx/concurrency/TestScheduler.java | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 rxjava-core/src/test/java/rx/concurrency/TestScheduler.java diff --git a/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java new file mode 100644 index 0000000000..145018dcc9 --- /dev/null +++ b/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java @@ -0,0 +1,89 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import java.util.Comparator; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func0; + +public class TestScheduler extends AbstractScheduler { + private final Queue queue = new PriorityQueue(11, new CompareActionsByTime()); + + private static class TimedAction { + private final long time; + private final Func0 action; + + private TimedAction(long time, Func0 action) { + this.time = time; + this.action = action; + } + } + + private static class CompareActionsByTime implements Comparator { + @Override + public int compare(TimedAction action1, TimedAction action2) { + return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); + } + } + + private long time; + + @Override + public Subscription schedule(Func0 action) { + return schedule(action, 0L, TimeUnit.NANOSECONDS); + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + queue.add(new TimedAction(now() + unit.toNanos(dueTime), action)); + return Subscriptions.empty(); + } + + @Override + public long now() { + return time; + } + + public void advanceTimeBy(long dueTime, TimeUnit unit) { + advanceTimeTo(time + unit.toNanos(dueTime), TimeUnit.NANOSECONDS); + } + + public void advanceTimeTo(long dueTime, TimeUnit unit) { + long targetTime = unit.toNanos(dueTime); + triggerActions(targetTime); + } + + public void triggerActions() { + triggerActions(time); + } + + private void triggerActions(long targetTimeInNanos) { + while (! queue.isEmpty()) { + TimedAction current = queue.peek(); + if (current.time > targetTimeInNanos) { + break; + } + time = current.time; + queue.remove(); + current.action.call(); + } + } +} From 0181f0ef2b7564fbca01435449c0cbb879deaaaa Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 8 Apr 2013 23:18:50 +0200 Subject: [PATCH 3/7] Added a unit test. Fixed the implementation. Maybe still a bit naive when it comes to intervals that are too small to keep up with? --- .../java/rx/operators/OperationInterval.java | 74 ++++++++++++++++--- .../java/rx/concurrency/TestScheduler.java | 5 ++ 2 files changed, 70 insertions(+), 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index b971da45ba..20e73a7521 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -15,20 +15,24 @@ */ package rx.operators; -import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.Tester.UnitTest.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; +import rx.Observable; import rx.Observer; import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; -import rx.util.functions.Func0; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func1; /** @@ -57,6 +61,7 @@ private static class Interval implements Func1, Subscription> { private final Scheduler scheduler; private long currentValue; + private final AtomicBoolean complete = new AtomicBoolean(); private Interval(long interval, TimeUnit unit, Scheduler scheduler) { this.interval = interval; @@ -66,18 +71,69 @@ private Interval(long interval, TimeUnit unit, Scheduler scheduler) { @Override public Subscription call(final Observer observer) { - return scheduler.schedule(new Func0() { - @Override - public Subscription call() { + scheduler.schedule(new IntervalAction(observer), interval, unit); + return Subscriptions.create(new Action0() { + @Override + public void call() { + complete.set(true); + } + }); + } + + private class IntervalAction implements Action0 { + private final Observer observer; + + private IntervalAction(Observer observer) { + this.observer = observer; + } + + @Override + public void call() { + if (complete.get()) { + observer.onCompleted(); + } else { observer.onNext(currentValue); currentValue++; - return Interval.this.call(observer); + scheduler.schedule(this, interval, unit); } - }, interval, unit); + } } } public static class UnitTest { - // TODO + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") // due to mocking + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testInterval() { + Observable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)); + Subscription sub = w.subscribe(observer); + + verify(observer, never()).onNext(0L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(2, TimeUnit.SECONDS); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder.verify(observer, times(1)).onNext(1L); + inOrder.verify(observer, never()).onNext(2L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + sub.unsubscribe(); + scheduler.advanceTimeTo(4, TimeUnit.SECONDS); + verify(observer, never()).onNext(2L); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + } } } diff --git a/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java index 145018dcc9..ae49ae17d4 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java @@ -35,6 +35,11 @@ private TimedAction(long time, Func0 action) { this.time = time; this.action = action; } + + @Override + public String toString() { + return String.format("TimedAction(time = %d, action = %s)", time, action.toString()); + } } private static class CompareActionsByTime implements Comparator { From 70bd7099ec082958928b821e89fbb7c76b1ed01c Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 8 Apr 2013 23:58:15 +0200 Subject: [PATCH 4/7] Switched to a default scheduler that actually works together with this operator. --- rxjava-core/src/main/java/rx/operators/OperationInterval.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index 20e73a7521..0e03b7fe75 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -18,6 +18,7 @@ import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,7 +46,7 @@ public final class OperationInterval { * Creates an event each time interval. */ public static Func1, Subscription> interval(long interval, TimeUnit unit) { - return new Interval(interval, unit, Schedulers.currentThread()); + return new Interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); } /** From b316700b76a311ad7821811838f7ba27cd088e87 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 8 Apr 2013 23:59:33 +0200 Subject: [PATCH 5/7] Fixed indentation. --- .../src/main/java/rx/operators/OperationInterval.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index 0e03b7fe75..19a3736826 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -74,10 +74,10 @@ private Interval(long interval, TimeUnit unit, Scheduler scheduler) { public Subscription call(final Observer observer) { scheduler.schedule(new IntervalAction(observer), interval, unit); return Subscriptions.create(new Action0() { - @Override - public void call() { - complete.set(true); - } + @Override + public void call() { + complete.set(true); + } }); } From 394fb92a0b41be1d6d98a19c2588987f0d3d3af1 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Tue, 9 Apr 2013 00:06:07 +0200 Subject: [PATCH 6/7] Fixed yet more indentation issues. --- .../src/test/java/rx/concurrency/TestScheduler.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java index ae49ae17d4..27d3c18bd4 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java @@ -45,7 +45,7 @@ public String toString() { private static class CompareActionsByTime implements Comparator { @Override public int compare(TimedAction action1, TimedAction action2) { - return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); + return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time)); } } @@ -53,13 +53,13 @@ public int compare(TimedAction action1, TimedAction action2) { @Override public Subscription schedule(Func0 action) { - return schedule(action, 0L, TimeUnit.NANOSECONDS); + return schedule(action, 0L, TimeUnit.NANOSECONDS); } @Override public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { - queue.add(new TimedAction(now() + unit.toNanos(dueTime), action)); - return Subscriptions.empty(); + queue.add(new TimedAction(now() + unit.toNanos(dueTime), action)); + return Subscriptions.empty(); } @Override @@ -68,7 +68,7 @@ public long now() { } public void advanceTimeBy(long dueTime, TimeUnit unit) { - advanceTimeTo(time + unit.toNanos(dueTime), TimeUnit.NANOSECONDS); + advanceTimeTo(time + unit.toNanos(dueTime), TimeUnit.NANOSECONDS); } public void advanceTimeTo(long dueTime, TimeUnit unit) { From 61fb9244c525138e921ce93d2c9fe98ef8054fde Mon Sep 17 00:00:00 2001 From: jmhofer Date: Tue, 9 Apr 2013 00:32:04 +0200 Subject: [PATCH 7/7] Oops, the test scheduler has to go into the main sources. --- .../src/{test => main}/java/rx/concurrency/TestScheduler.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename rxjava-core/src/{test => main}/java/rx/concurrency/TestScheduler.java (100%) diff --git a/rxjava-core/src/test/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java similarity index 100% rename from rxjava-core/src/test/java/rx/concurrency/TestScheduler.java rename to rxjava-core/src/main/java/rx/concurrency/TestScheduler.java