From b4fdd28be7a81334459302ff22d27d919a0f5471 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 14 Sep 2013 11:40:29 +0200 Subject: [PATCH 1/7] added initial delay operation implementation --- .../java/rx/operators/OperationDelay.java | 157 ++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationDelay.java diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java new file mode 100644 index 0000000000..9f572c9c0b --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -0,0 +1,157 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; +import static rx.Observable.interval; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.util.functions.Action0; + +/** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a specified delay. + */ +public final class OperationDelay { + + /** + * Delays the observable sequence by the given time interval. + */ + public static OnSubscribeFunc delay(final Observable source, long delay, TimeUnit unit) { + return new Delay(source, delay, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + } + + /** + * Delays the observable sequence by the given time interval. + */ + public static OnSubscribeFunc delay(final Observable source, long period, TimeUnit unit, Scheduler scheduler) { + return new Delay(source, period, unit, scheduler); + } + + private static class Delay implements OnSubscribeFunc { + private final Observable source; + private final long delay; + private final TimeUnit unit; + private final Scheduler scheduler; + + private Delay(Observable source, long delay, TimeUnit unit, Scheduler scheduler) { + this.source = source; + this.delay = delay; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + return source.subscribe(new Observer() { + @Override + public void onCompleted() { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, unit); + } + + @Override + public void onError(Throwable e) { + // errors get propagated without delay + observer.onError(e); + } + + @Override + public void onNext(final T value) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, unit); + } + }); + } + } + + public static class UnitTest { + @Mock + private Observer observer; + + private TestScheduler scheduler; + + @Before + public void before() { + initMocks(this); + scheduler = new TestScheduler(); + } + + @Test + public void testDelay() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).take(3); + Observable delayed = Observable.create(OperationDelay.delay(source, 500L, TimeUnit.MILLISECONDS, scheduler)); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(any(Long.class)); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(2400L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(3400L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(3500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(2L); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + } +} From 9789720e31ba70952dfc2bc23d4bf3a1f996c487 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 14 Sep 2013 12:16:31 +0200 Subject: [PATCH 2/7] more tests against delay, found and fixed a bug with error handling in interval, too --- .../java/rx/operators/OperationDelay.java | 100 +++++++++++++++--- .../java/rx/operators/OperationInterval.java | 51 +++++++-- 2 files changed, 131 insertions(+), 20 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java index 9f572c9c0b..f3e60bcfb2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDelay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -22,6 +22,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Test; @@ -36,6 +37,7 @@ import rx.concurrency.Schedulers; import rx.concurrency.TestScheduler; import rx.util.functions.Action0; +import rx.util.functions.Func1; /** * Returns an Observable that emits the results of shifting the items emitted by the source @@ -73,14 +75,18 @@ private Delay(Observable source, long delay, TimeUnit unit, Schedul @Override public Subscription onSubscribe(final Observer observer) { return source.subscribe(new Observer() { + private AtomicBoolean errorOccurred = new AtomicBoolean(); + @Override public void onCompleted() { - scheduler.schedule(new Action0() { - @Override - public void call() { - observer.onCompleted(); - } - }, delay, unit); + if (!errorOccurred.get()) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, unit); + } } @Override @@ -91,12 +97,19 @@ public void onError(Throwable e) { @Override public void onNext(final T value) { - scheduler.schedule(new Action0() { - @Override - public void call() { - observer.onNext(value); - } - }, delay, unit); + if (!errorOccurred.get()) { + scheduler.schedule(new Action0() { + @Override + public void call() { + try { + observer.onNext(value); + } catch (Throwable t) { + errorOccurred.set(true); + observer.onError(t); + } + } + }, delay, unit); + } } }); } @@ -122,7 +135,7 @@ public void testDelay() { InOrder inOrder = inOrder(observer); scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS); - verify(observer, never()).onNext(any(Long.class)); + verify(observer, never()).onNext(anyLong()); verify(observer, never()).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); @@ -153,5 +166,66 @@ public void testDelay() { verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); } + + @Test + public void testLongDelay() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).take(3); + Observable delayed = Observable.create(OperationDelay.delay(source, 5L, TimeUnit.SECONDS, scheduler)); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(5999L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(6000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + scheduler.advanceTimeTo(6999L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + scheduler.advanceTimeTo(7000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + scheduler.advanceTimeTo(7999L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + scheduler.advanceTimeTo(8000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(2L); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder.verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testDelayWithError() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).map(new Func1() { + @Override + public Long call(Long value) { + if (value == 1L) { + throw new RuntimeException("error!"); + } + return value; + } + }); + Observable delayed = Observable.create(OperationDelay.delay(source, 1L, TimeUnit.SECONDS, scheduler)); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(1999L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(2000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onError(any(Throwable.class)); + inOrder.verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + + scheduler.advanceTimeTo(5000L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder.verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index 0f53c884ba..0d2ded62b1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -35,6 +35,7 @@ import rx.observables.ConnectableObservable; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; +import rx.util.functions.Func1; /** * Returns an observable sequence that produces a value after each period. @@ -68,6 +69,7 @@ private static class Interval implements OnSubscribeFunc { private final Scheduler scheduler; private long currentValue; + private boolean errorOccurred; private Interval(long period, TimeUnit unit, Scheduler scheduler) { this.period = period; @@ -80,8 +82,15 @@ public Subscription onSubscribe(final Observer observer) { final Subscription wrapped = scheduler.schedulePeriodically(new Action0() { @Override public void call() { - observer.onNext(currentValue); - currentValue++; + if (!errorOccurred) { + try { + observer.onNext(currentValue); + currentValue++; + } catch (Throwable t) { + errorOccurred = true; + observer.onError(t); + } + } } }, period, period, unit); @@ -89,7 +98,9 @@ public void call() { @Override public void call() { wrapped.unsubscribe(); - observer.onCompleted(); + if (!errorOccurred) { + observer.onCompleted(); + } } }); } @@ -110,7 +121,7 @@ public void before() { @Test public void testInterval() { - Observable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)); + Observable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)); Subscription sub = w.subscribe(observer); verify(observer, never()).onNext(0L); @@ -133,9 +144,35 @@ public void testInterval() { verify(observer, never()).onError(any(Throwable.class)); } + @Test + public void testIntervalWithError() { + Observable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)).map(new Func1() { + @Override + public Long call(Long value) { + if (value == 2L) { + throw new RuntimeException("error!"); + } + return value; + } + }); + w.subscribe(observer); + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(2, TimeUnit.SECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder.verify(observer, times(1)).onNext(1L); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + + scheduler.advanceTimeTo(3, TimeUnit.SECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder.verify(observer, times(1)).onError(any(RuntimeException.class)); + verify(observer, never()).onCompleted(); + } + @Test public void testWithMultipleSubscribersStartingAtSameTime() { - Observable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)); + Observable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)); Subscription sub1 = w.subscribe(observer); Subscription sub2 = w.subscribe(observer2); @@ -174,7 +211,7 @@ public void testWithMultipleSubscribersStartingAtSameTime() { @Test public void testWithMultipleStaggeredSubscribers() { - Observable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)); + Observable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)); Subscription sub1 = w.subscribe(observer); verify(observer, never()).onNext(anyLong()); @@ -214,7 +251,7 @@ public void testWithMultipleStaggeredSubscribers() { @Test public void testWithMultipleStaggeredSubscribersAndPublish() { - ConnectableObservable w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)).publish(); + ConnectableObservable w = Observable.create(interval(1, TimeUnit.SECONDS, scheduler)).publish(); Subscription sub1 = w.subscribe(observer); w.connect(); From 50336563b81329830ee18b9f3bfd306dfe5c7649 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 14 Sep 2013 12:40:12 +0200 Subject: [PATCH 3/7] added test against multiple subscriptions --- .../java/rx/operators/OperationDelay.java | 46 ++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java index f3e60bcfb2..c9b25d9bcc 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDelay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -55,7 +55,7 @@ public static OnSubscribeFunc delay(final Observable source, /** * Delays the observable sequence by the given time interval. */ - public static OnSubscribeFunc delay(final Observable source, long period, TimeUnit unit, Scheduler scheduler) { + public static OnSubscribeFunc delay(final Observable source, final long period, final TimeUnit unit, final Scheduler scheduler) { return new Delay(source, period, unit, scheduler); } @@ -92,6 +92,7 @@ public void call() { @Override public void onError(Throwable e) { // errors get propagated without delay + errorOccurred.set(true); observer.onError(e); } @@ -118,6 +119,8 @@ public void call() { public static class UnitTest { @Mock private Observer observer; + @Mock + private Observer observer2; private TestScheduler scheduler; @@ -227,5 +230,46 @@ public Long call(Long value) { inOrder.verify(observer, never()).onError(any(Throwable.class)); verify(observer, never()).onCompleted(); } + + @Test + public void testDelayWithMultipleSubscriptions() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).take(3); + Observable delayed = Observable.create(OperationDelay.delay(source, 500L, TimeUnit.MILLISECONDS, scheduler)); + delayed.subscribe(observer); + delayed.subscribe(observer2); + + InOrder inOrder = inOrder(observer); + InOrder inOrder2 = inOrder(observer2); + + scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer2, never()).onNext(anyLong()); + + scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder2.verify(observer2, times(1)).onNext(0L); + + scheduler.advanceTimeTo(2499L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder2.verify(observer2, never()).onNext(anyLong()); + + scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(1L); + inOrder2.verify(observer2, times(1)).onNext(1L); + + verify(observer, never()).onCompleted(); + verify(observer2, never()).onCompleted(); + + scheduler.advanceTimeTo(3500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(2L); + inOrder2.verify(observer2, times(1)).onNext(2L); + inOrder.verify(observer, never()).onNext(anyLong()); + inOrder2.verify(observer2, never()).onNext(anyLong()); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder2.verify(observer2, times(1)).onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer2, never()).onError(any(Throwable.class)); + } } } From 26c200f612e11cc1bf75af586f5ae237f6d7ecba Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 14 Sep 2013 12:50:20 +0200 Subject: [PATCH 4/7] added due time delay and test against that --- .../java/rx/operators/OperationDelay.java | 43 ++++++++++++++++++- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java index c9b25d9bcc..a8605148a8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDelay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -20,6 +20,7 @@ import static org.mockito.MockitoAnnotations.initMocks; import static rx.Observable.interval; +import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,7 +50,26 @@ public final class OperationDelay { * Delays the observable sequence by the given time interval. */ public static OnSubscribeFunc delay(final Observable source, long delay, TimeUnit unit) { - return new Delay(source, delay, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + return delay(source, delay, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + } + + /** + * Delays the observable sequence by a time interval so that it starts at the given due time. + */ + public static OnSubscribeFunc delay(final Observable source, Date dueTime) { + return delay(source, dueTime, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + } + + /** + * Delays the observable sequence by a time interval so that it starts at the given due time. + */ + public static OnSubscribeFunc delay(final Observable source, Date dueTime, final Scheduler scheduler) { + long scheduledTime = dueTime.getTime(); + long delay = scheduledTime - scheduler.now(); + if (delay < 0L) { + delay = 0L; + } + return new Delay(source, delay, TimeUnit.MILLISECONDS, scheduler); } /** @@ -58,7 +78,7 @@ public static OnSubscribeFunc delay(final Observable source, public static OnSubscribeFunc delay(final Observable source, final long period, final TimeUnit unit, final Scheduler scheduler) { return new Delay(source, period, unit, scheduler); } - + private static class Delay implements OnSubscribeFunc { private final Observable source; private final long delay; @@ -170,6 +190,25 @@ public void testDelay() { verify(observer, never()).onError(any(Throwable.class)); } + @Test + public void testDelayWithDueTime() { + Observable source = interval(1L, TimeUnit.SECONDS, scheduler).first(); + Observable delayed = Observable.create(OperationDelay.delay(source, new Date(1500L), scheduler)); + delayed.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(2499L, TimeUnit.MILLISECONDS); + verify(observer, never()).onNext(anyLong()); + verify(observer, never()).onCompleted(); + + scheduler.advanceTimeTo(2500L, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext(0L); + inOrder.verify(observer, times(1)).onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + } + @Test public void testLongDelay() { Observable source = interval(1L, TimeUnit.SECONDS, scheduler).take(3); From 18d40522bb19f80c0ff8d4079bcb925742efecf4 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 14 Sep 2013 12:59:43 +0200 Subject: [PATCH 5/7] added delay methods to Observable and documented them --- rxjava-core/src/main/java/rx/Observable.java | 60 ++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 85c0359b3f..0142f9837b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; +import java.util.Date; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -35,6 +36,7 @@ import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; import rx.operators.OperationDefer; +import rx.operators.OperationDelay; import rx.operators.OperationDematerialize; import rx.operators.OperationDistinctUntilChanged; import rx.operators.OperationDistinct; @@ -3551,6 +3553,64 @@ public Observable scan(Func2 accumulator) { return create(OperationScan.scan(this, accumulator)); } + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a specified delay. Only errors emitted by the source Observable are not delayed. + * @param delay + * the delay to shift the source by + * @param unit + * the {@link TimeUnit} in which period is defined + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(long delay, TimeUnit unit) { + return create(OperationDelay.delay(this, delay, unit)); + } + + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a specified delay. Only errors emitted by the source Observable are not delayed. + * @param delay + * the delay to shift the source by + * @param unit + * the {@link TimeUnit} in which period is defined + * @param scheduler + * the {@link Scheduler} to use for delaying + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(long delay, TimeUnit unit, Scheduler scheduler) { + return create(OperationDelay.delay(this, delay, unit, scheduler)); + } + + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a delay specified by the due time at which to begin emitting. + * Only errors emitted by the source Observable are not delayed. + * @param dueTime + * the due time at which to start emitting + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(Date dueTime) { + return create(OperationDelay.delay(this, dueTime)); + } + + /** + * Returns an Observable that emits the results of shifting the items emitted by the source + * Observable by a delay specified by the due time at which to begin emitting. + * Only errors emitted by the source Observable are not delayed. + * @param dueTime + * the due time at which to start emitting + * @param scheduler + * the {@link Scheduler} to use for delaying + * @return the source Observable, but shifted by the specified delay + * @see MSDN: Observable.Delay + */ + public Observable delay(Date dueTime, Scheduler scheduler) { + return create(OperationDelay.delay(this, dueTime, scheduler)); + } + /** * Returns an Observable that emits the results of sampling the items emitted by the source * Observable at a specified time interval. From b2e755cd509da4c0599ac9404dc00266bb38d21a Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 15 Sep 2013 09:46:36 +0200 Subject: [PATCH 6/7] now defaults to thread pool scheduling --- rxjava-core/src/main/java/rx/operators/OperationDelay.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationDelay.java b/rxjava-core/src/main/java/rx/operators/OperationDelay.java index a8605148a8..603be7080a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDelay.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDelay.java @@ -50,14 +50,14 @@ public final class OperationDelay { * Delays the observable sequence by the given time interval. */ public static OnSubscribeFunc delay(final Observable source, long delay, TimeUnit unit) { - return delay(source, delay, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + return delay(source, delay, unit, Schedulers.threadPoolForComputation()); } /** * Delays the observable sequence by a time interval so that it starts at the given due time. */ public static OnSubscribeFunc delay(final Observable source, Date dueTime) { - return delay(source, dueTime, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + return delay(source, dueTime, Schedulers.threadPoolForComputation()); } /** From e9027293dadf846b64f62e91da7c5c5850ed76f5 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 15 Sep 2013 10:29:21 +0200 Subject: [PATCH 7/7] added a delayed scheduler --- .../java/rx/concurrency/DelayedScheduler.java | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/concurrency/DelayedScheduler.java diff --git a/rxjava-core/src/main/java/rx/concurrency/DelayedScheduler.java b/rxjava-core/src/main/java/rx/concurrency/DelayedScheduler.java new file mode 100644 index 0000000000..86aff00da0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/DelayedScheduler.java @@ -0,0 +1,114 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.initMocks; + +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func2; + +/** + * Scheduler that delays the underlying scheduler by a fixed time delay. + */ +public class DelayedScheduler extends Scheduler { + private final Scheduler underlying; + private final long delay; + private final TimeUnit unit; + + public DelayedScheduler(Scheduler underlying, long delay, TimeUnit unit) { + this.underlying = underlying; + this.delay = delay; + this.unit = unit; + } + + @Override + public Subscription schedule(T state, Func2 action) { + return underlying.schedule(state, action, delay, unit); + } + + @Override + public Subscription schedule(T state, Func2 action, long delay, TimeUnit unit) { + long newDelay = unit.toNanos(delay) + this.unit.toNanos(this.delay); + return underlying.schedule(state, action, newDelay, TimeUnit.NANOSECONDS); + } + + public static class UnitTest { + @Mock + Action0 action; + + private TestScheduler scheduler = new TestScheduler(); + + @Before + public void before() { + initMocks(this); + } + + @Test + public void testNotDelayingAnAction() { + Scheduler delayed = new DelayedScheduler(scheduler, 0, TimeUnit.SECONDS); + delayed.schedule(action); + delayed.schedule(action, 1L, TimeUnit.SECONDS); + + InOrder inOrder = inOrder(action); + + scheduler.triggerActions(); + inOrder.verify(action, times(1)).call(); + + scheduler.advanceTimeTo(999L, TimeUnit.MILLISECONDS); + inOrder.verify(action, never()).call(); + + scheduler.advanceTimeTo(1L, TimeUnit.SECONDS); + inOrder.verify(action, times(1)).call(); + + scheduler.advanceTimeTo(5L, TimeUnit.SECONDS); + inOrder.verify(action, never()).call(); + } + + @Test + public void testdelayingAnAction() { + Scheduler delayed = new DelayedScheduler(scheduler, 500, TimeUnit.MILLISECONDS); + delayed.schedule(action); + delayed.schedule(action, 1L, TimeUnit.SECONDS); + + InOrder inOrder = inOrder(action); + + scheduler.advanceTimeTo(499L, TimeUnit.MILLISECONDS); + inOrder.verify(action, never()).call(); + + scheduler.advanceTimeTo(500L, TimeUnit.MILLISECONDS); + inOrder.verify(action, times(1)).call(); + + scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS); + inOrder.verify(action, never()).call(); + + scheduler.advanceTimeTo(1500L, TimeUnit.MILLISECONDS); + inOrder.verify(action, times(1)).call(); + + scheduler.advanceTimeTo(5L, TimeUnit.SECONDS); + inOrder.verify(action, never()).call(); + } + } +}