From 8b039f5b8bdb4dcc18593251a4443f08891e3f9f Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Fri, 6 Dec 2013 16:17:39 +0100 Subject: [PATCH 1/6] add OperationTimer --- .../java/rx/operators/OperationTimer.java | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationTimer.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationTimer.java b/rxjava-core/src/main/java/rx/operators/OperationTimer.java new file mode 100644 index 0000000000..3bb462cac0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationTimer.java @@ -0,0 +1,74 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.TimeUnit; + +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; + +public final class OperationTimer { + + public static OnSubscribeFunc timer(long interval, TimeUnit unit) { + return timer(interval, unit, Schedulers.threadPoolForComputation()); + } + + public static OnSubscribeFunc timer(final long delay, final TimeUnit unit, final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + return new Timer(delay, unit, scheduler, observer).start(); + } + }; + } + + private static class Timer { + private final long period; + private final TimeUnit unit; + private final Scheduler scheduler; + private final Observer observer; + + private Timer(long period, TimeUnit unit, Scheduler scheduler, Observer observer) { + this.period = period; + this.unit = unit; + this.scheduler = scheduler; + this.observer = observer; + } + + public Subscription start() { + final Subscription s = scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(null); + observer.onCompleted(); + } + }, period, unit); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + s.unsubscribe(); + } + }); + } + } + +} From 4deca78d518eb4acf8d33e840bba8b336ad10c6c Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Fri, 6 Dec 2013 16:26:41 +0100 Subject: [PATCH 2/6] add timer methods in Observable.java --- rxjava-core/src/main/java/rx/Observable.java | 28 ++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9482a5f09f..4b8af9c94b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -91,6 +91,7 @@ import rx.operators.OperationThrottleFirst; import rx.operators.OperationTimeInterval; import rx.operators.OperationTimeout; +import rx.operators.OperationTimer; import rx.operators.OperationTimestamp; import rx.operators.OperationToMap; import rx.operators.OperationToMultimap; @@ -1990,6 +1991,33 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler return create(OperationInterval.interval(interval, unit, scheduler)); } + /** + * Emits one item after a given delay, and then completes. + * + * @param interval + * interval size in time units + * @param unit + * time units to use for the interval size + */ + public static Observable timer(long interval, TimeUnit unit) { + return create(OperationTimer.timer(interval, unit)); + } + + /** + * Emits one item after a given delay, and then completes. + * + * @param interval + * interval size in time units + * @param unit + * time units to use for the interval size + * @param scheduler + * the scheduler to use for scheduling the item + */ + public static Observable timer(long interval, TimeUnit unit, + Scheduler scheduler) { + return create(OperationTimer.timer(interval, unit, scheduler)); + } + /** * Drops items emitted by an Observable that are followed by newer items * before a timeout value expires. The timer resets on each emission. From 82641f971e13879b859072752b5db94b008dd1f8 Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Fri, 6 Dec 2013 16:30:08 +0100 Subject: [PATCH 3/6] add OperationDelay --- .../java/rx/operators/OperationDelay.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationDelay.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java new file mode 100644 index 0000000000..9cf44ae5fb --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -0,0 +1,42 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.TimeUnit; + +import rx.Observable; +import rx.Scheduler; +import rx.observables.ConnectableObservable; +import rx.util.functions.Func1; + +public final class OperationDelay { + + public static Observable delay(Observable observable, final long delay, final TimeUnit unit, final Scheduler scheduler) { + // observable.map(x => Observable.timer(t).map(_ => x).startItAlreadyNow()).concat() + Observable> seqs = observable.map(new Func1>() { + public Observable call(final T x) { + ConnectableObservable co = Observable.timer(delay, unit, scheduler).map(new Func1() { + public T call(Void ignored) { + return x; + } + }).replay(); + co.connect(); + return co; + } + }); + return Observable.concat(seqs); + } +} From d5964b1df55b739a4de774a8f64f45467987b99e Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Fri, 6 Dec 2013 16:35:54 +0100 Subject: [PATCH 4/6] copy delay test by @jmhofer source: https://github.com/jmhofer/RxJava/blob/e9027293dadf846b64f62e91da7c5c5850ed76f5/rxjava-core/src/main/java/rx/operators/OperationDelay.java --- .../java/rx/operators/OperationDelayTest.java | 181 ++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 rxjava-core/src/test/java/rx/operators/OperationDelayTest.java diff --git a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java new file mode 100644 index 0000000000..5d6c1f9fe1 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java @@ -0,0 +1,181 @@ +package rx.operators; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.MockitoAnnotations.initMocks; + +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; + +import rx.Observable; +import rx.Observer; +import rx.concurrency.TestScheduler; +import rx.util.functions.Func1; + +public class OperationDelayTest { + @Mock + private Observer observer; + @Mock + private Observer observer2; + + private TestScheduler scheduler; + + @Before + public void before() { + initMocks(this); + scheduler = new TestScheduler(); + } + + @Test + public void testDelay() { + Observable source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3); + Observable delayed = OperationDelay.delay(source, 500L, TimeUnit.MILLISECONDS, scheduler); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(2400L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(3400L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(3500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(2L); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testLongDelay() { + Observable source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3); + Observable delayed = OperationDelay.delay(source, 5L, TimeUnit.SECONDS, scheduler); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(5999L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(6000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + scheduler.advanceTimeTo(6999L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + scheduler.advanceTimeTo(7000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + scheduler.advanceTimeTo(7999L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + scheduler.advanceTimeTo(8000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(2L); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder.verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testDelayWithError() { + Observable source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).map(new Func1() { + @Override + public Long call(Long value) { + if (value == 1L) { + throw new RuntimeException("error!"); + } + return value; + } + }); + Observable delayed = OperationDelay.delay(source, 1L, TimeUnit.SECONDS, scheduler); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(1999L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onError(any(Throwable.class)); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + + scheduler.advanceTimeTo(5000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder.verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + } + + // TODO activate this test once https://github.com/Netflix/RxJava/issues/552 is fixed + @Ignore + @Test + public void testDelayWithMultipleSubscriptions() { + Observable source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3); + Observable delayed = OperationDelay.delay(source, 500L, TimeUnit.MILLISECONDS, scheduler); + delayed.subscribe(observer); + delayed.subscribe(observer2); + + InOrder inOrder = inOrder(observer); + InOrder inOrder2 = inOrder(observer2); + + scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer2, never()).onNext(anyLong()); + + scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder2.verify(observer2, times(1)).onNext(0L); + + scheduler.advanceTimeTo(2499L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder2.verify(observer2, never()).onNext(anyLong()); + + scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + inOrder2.verify(observer2, times(1)).onNext(1L); + + verify(observer, never()).onCompleted(); + verify(observer2, never()).onCompleted(); + + scheduler.advanceTimeTo(3500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(2L); + inOrder2.verify(observer2, times(1)).onNext(2L); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder2.verify(observer2, never()).onNext(anyLong()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder2.verify(observer2, times(1)).onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer2, never()).onError(any(Throwable.class)); + } +} From 84d0b66ef66e04580ce9abf5a9084f42bd7cd14d Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Fri, 6 Dec 2013 16:51:58 +0100 Subject: [PATCH 5/6] add delay methods in Observable.java (copied those by @jmhofer) source: https://github.com/jmhofer/RxJava/blob/18d40522bb19f80c0ff8d4079bcb925742efecf4/rxjava-core/src/main/java/rx/Observable.java --- rxjava-core/src/main/java/rx/Observable.java | 34 ++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 4b8af9c94b..6a4b930b83 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -45,6 +45,7 @@ import rx.operators.OperationDebounce; import rx.operators.OperationDefaultIfEmpty; import rx.operators.OperationDefer; +import rx.operators.OperationDelay; import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; @@ -2013,11 +2014,40 @@ public static Observable timer(long interval, TimeUnit unit) { * @param scheduler * the scheduler to use for scheduling the item */ - public static Observable timer(long interval, TimeUnit unit, - Scheduler scheduler) { + public static Observable timer(long interval, TimeUnit unit, Scheduler scheduler) { return create(OperationTimer.timer(interval, unit, scheduler)); } + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a specified delay. Errors emitted by the source Observable are not delayed. + * @param delay + * the delay to shift the source by + * @param unit + * the {@link TimeUnit} in which period is defined + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(long delay, TimeUnit unit) { + return OperationDelay.delay(this, delay, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a specified delay. Errors emitted by the source Observable are not delayed. + * @param delay + * the delay to shift the source by + * @param unit + * the {@link TimeUnit} in which period is defined + * @param scheduler + * the {@link Scheduler} to use for delaying + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(long delay, TimeUnit unit, Scheduler scheduler) { + return OperationDelay.delay(this, delay, unit, scheduler); + } + /** * Drops items emitted by an Observable that are followed by newer items * before a timeout value expires. The timer resets on each emission. From 90cdbc3a80a8c939976bcbfb5f81e8a76bef275f Mon Sep 17 00:00:00 2001 From: samuelgruetter Date: Fri, 6 Dec 2013 16:54:45 +0100 Subject: [PATCH 6/6] make OperationDelayTest test Observable.delay instead of OperationDelay.delay --- .../src/test/java/rx/operators/OperationDelayTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java index 5d6c1f9fe1..224a82be55 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationDelayTest.java @@ -38,7 +38,7 @@ public void before() { @Test public void testDelay() { Observable source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3); - Observable delayed = OperationDelay.delay(source, 500L, TimeUnit.MILLISECONDS, scheduler); + Observable delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler); delayed.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -78,7 +78,7 @@ public void testDelay() { @Test public void testLongDelay() { Observable source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3); - Observable delayed = OperationDelay.delay(source, 5L, TimeUnit.SECONDS, scheduler); + Observable delayed = source.delay(5L, TimeUnit.SECONDS, scheduler); delayed.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -115,7 +115,7 @@ public Long call(Long value) { return value; } }); - Observable delayed = OperationDelay.delay(source, 1L, TimeUnit.SECONDS, scheduler); + Observable delayed = source.delay(1L, TimeUnit.SECONDS, scheduler); delayed.subscribe(observer); InOrder inOrder = inOrder(observer); @@ -141,7 +141,7 @@ public Long call(Long value) { @Test public void testDelayWithMultipleSubscriptions() { Observable source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3); - Observable delayed = OperationDelay.delay(source, 500L, TimeUnit.MILLISECONDS, scheduler); + Observable delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler); delayed.subscribe(observer); delayed.subscribe(observer2);