diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 8fe13c9e88..7427c24bdb 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -23,6 +23,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.operators.OperationLatest; import rx.operators.OperationMostRecent; import rx.operators.OperationNext; import rx.operators.OperationToFuture; @@ -266,6 +267,21 @@ public Iterable next() { return OperationNext.next(o); } + /** + * Returns the latest item emitted by the underlying Observable, waiting if necessary + * for one to become available. + *

+ * If the underlying observable produces items faster than the Iterator.next() takes them + * onNext events might be skipped, but onError or onCompleted events are not. + *

+ * Note also that an onNext() directly followed by onCompleted() might hide the onNext() event. + * + * @return the Iterable sequence + */ + public Iterable latest() { + return OperationLatest.latest(o); + } + /** * If the {@link Observable} completes after emitting a single item, return that item, * otherwise throw an exception. diff --git a/rxjava-core/src/main/java/rx/operators/OperationLatest.java b/rxjava-core/src/main/java/rx/operators/OperationLatest.java new file mode 100644 index 0000000000..556e007a4a --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationLatest.java @@ -0,0 +1,114 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; +import rx.Notification; +import rx.Observable; +import rx.Observer; +import rx.util.Exceptions; + +/** + * Wait for and iterate over the latest values of the source observable. + * If the source works faster than the iterator, values may be skipped, but + * not the onError or onCompleted events. + */ +public final class OperationLatest { + /** Utility class. */ + private OperationLatest() { throw new IllegalStateException("No instances!"); } + + public static Iterable latest(final Observable source) { + return new Iterable() { + @Override + public Iterator iterator() { + LatestObserverIterator lio = new LatestObserverIterator(); + source.materialize().subscribe(lio); + return lio; + } + }; + } + + /** Observer of source, iterator for output. */ + static final class LatestObserverIterator implements Observer>, Iterator { + final Semaphore notify = new Semaphore(0); + // observer's notification + final AtomicReference> reference = new AtomicReference>(); + @Override + public void onNext(Notification args) { + boolean wasntAvailable = reference.getAndSet(args) == null; + if (wasntAvailable) { + notify.release(); + } + } + + @Override + public void onError(Throwable e) { + // not expected + } + + @Override + public void onCompleted() { + // not expected + } + + // iterator's notification + Notification iNotif; + @Override + public boolean hasNext() { + if (iNotif != null && iNotif.isOnError()) { + throw Exceptions.propagate(iNotif.getThrowable()); + } + if (iNotif == null || !iNotif.isOnCompleted()) { + if (iNotif == null) { + try { + notify.acquire(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + iNotif = new Notification(ex); + throw Exceptions.propagate(ex); + } + + iNotif = reference.getAndSet(null); + if (iNotif.isOnError()) { + throw Exceptions.propagate(iNotif.getThrowable()); + } + } + } + return !iNotif.isOnCompleted(); + } + + @Override + public T next() { + if (hasNext()) { + if (iNotif.isOnNext()) { + T v = iNotif.getValue(); + iNotif = null; + return v; + } + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read-only iterator."); + } + + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java index 15406aeece..4e90d3e543 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMostRecent.java @@ -31,16 +31,16 @@ */ public final class OperationMostRecent { - public static Iterable mostRecent(final Observable source, T initialValue) { - - MostRecentObserver mostRecentObserver = new MostRecentObserver(initialValue); - final MostRecentIterator nextIterator = new MostRecentIterator(mostRecentObserver); - - source.subscribe(mostRecentObserver); + public static Iterable mostRecent(final Observable source, final T initialValue) { return new Iterable() { @Override public Iterator iterator() { + MostRecentObserver mostRecentObserver = new MostRecentObserver(initialValue); + final MostRecentIterator nextIterator = new MostRecentIterator(mostRecentObserver); + + source.subscribe(mostRecentObserver); + return nextIterator; } }; diff --git a/rxjava-core/src/main/java/rx/operators/OperationNext.java b/rxjava-core/src/main/java/rx/operators/OperationNext.java index 4f5e9dfd31..82e6587b20 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationNext.java +++ b/rxjava-core/src/main/java/rx/operators/OperationNext.java @@ -34,19 +34,18 @@ public final class OperationNext { public static Iterable next(final Observable items) { - - NextObserver nextObserver = new NextObserver(); - final NextIterator nextIterator = new NextIterator(nextObserver); - - items.materialize().subscribe(nextObserver); - return new Iterable() { @Override public Iterator iterator() { + NextObserver nextObserver = new NextObserver(); + final NextIterator nextIterator = new NextIterator(nextObserver); + + items.materialize().subscribe(nextObserver); + return nextIterator; } }; - + } private static class NextIterator implements Iterator { diff --git a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java index 2fcd51872e..9760805fdc 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java @@ -16,6 +16,7 @@ package rx.operators; import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -63,27 +64,26 @@ public void onNext(Notification args) { return new Iterator() { private Notification buf; - + @Override public boolean hasNext() { if (buf == null) { buf = take(); } + if (buf.isOnError()) { + throw Exceptions.propagate(buf.getThrowable()); + } return !buf.isOnCompleted(); } @Override public T next() { - if (buf == null) { - buf = take(); + if (hasNext()) { + T result = buf.getValue(); + buf = null; + return result; } - if (buf.isOnError()) { - throw Exceptions.propagate(buf.getThrowable()); - } - - T result = buf.getValue(); - buf = null; - return result; + throw new NoSuchElementException(); } private Notification take() { diff --git a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java index f3e05189c1..008aedc6dd 100644 --- a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java +++ b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java @@ -18,6 +18,8 @@ import static org.junit.Assert.*; import java.util.Iterator; +import java.util.NoSuchElementException; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -201,6 +203,58 @@ public void testToIterable() { assertEquals(false, it.hasNext()); } + @Test(expected = NoSuchElementException.class) + public void testToIterableNextOnly() { + BlockingObservable obs = BlockingObservable.from(Observable.from(1, 2, 3)); + + Iterator it = obs.toIterable().iterator(); + + Assert.assertEquals((Integer)1, it.next()); + Assert.assertEquals((Integer)2, it.next()); + Assert.assertEquals((Integer)3, it.next()); + + it.next(); + } + + @Test(expected = NoSuchElementException.class) + public void testToIterableNextOnlyTwice() { + BlockingObservable obs = BlockingObservable.from(Observable.from(1, 2, 3)); + + Iterator it = obs.toIterable().iterator(); + + Assert.assertEquals((Integer)1, it.next()); + Assert.assertEquals((Integer)2, it.next()); + Assert.assertEquals((Integer)3, it.next()); + + boolean exc = false; + try { + it.next(); + } catch (NoSuchElementException ex) { + exc = true; + } + Assert.assertEquals(true, exc); + + it.next(); + } + + @Test + public void testToIterableManyTimes() { + BlockingObservable obs = BlockingObservable.from(Observable.from(1, 2, 3)); + + Iterable iter = obs.toIterable(); + + for (int j = 0; j < 3; j++) { + Iterator it = iter.iterator(); + + Assert.assertTrue(it.hasNext()); + Assert.assertEquals((Integer)1, it.next()); + Assert.assertTrue(it.hasNext()); + Assert.assertEquals((Integer)2, it.next()); + Assert.assertTrue(it.hasNext()); + Assert.assertEquals((Integer)3, it.next()); + Assert.assertFalse(it.hasNext()); + } + } @Test(expected = TestException.class) public void testToIterableWithException() { diff --git a/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java b/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java new file mode 100644 index 0000000000..3dde8610f9 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationLatestTest.java @@ -0,0 +1,162 @@ + /** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; +import junit.framework.Assert; +import org.junit.Test; +import rx.Observable; +import rx.observables.BlockingObservable; +import rx.schedulers.TestScheduler; +import rx.subjects.PublishSubject; + +public class OperationLatestTest { + @Test(timeout = 1000) + public void testSimple() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlockingObservable(); + + Iterable iter = source.latest(); + + Iterator it = iter.iterator(); + + // only 9 because take(10) will immediately call onCompleted when receiving the 10th item + // which onCompleted will overwrite the previous value + for (int i = 0; i < 9; i++) { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(true, it.hasNext()); + + Assert.assertEquals(Long.valueOf(i), it.next()); + } + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + Assert.assertEquals(false, it.hasNext()); + } + @Test(timeout = 1000) + public void testSameSourceMultipleIterators() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlockingObservable(); + + Iterable iter = source.latest(); + + for (int j = 0; j < 3; j++) { + Iterator it = iter.iterator(); + + // only 9 because take(10) will immediately call onCompleted when receiving the 10th item + // which onCompleted will overwrite the previous value + for (int i = 0; i < 9; i++) { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(true, it.hasNext()); + + Assert.assertEquals(Long.valueOf(i), it.next()); + } + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + Assert.assertEquals(false, it.hasNext()); + } + } + @Test(timeout = 1000, expected = NoSuchElementException.class) + public void testEmpty() { + BlockingObservable source = Observable.empty().toBlockingObservable(); + + Iterable iter = source.latest(); + + Iterator it = iter.iterator(); + + Assert.assertEquals(false, it.hasNext()); + + it.next(); + } + @Test(timeout = 1000, expected = NoSuchElementException.class) + public void testSimpleJustNext() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlockingObservable(); + + Iterable iter = source.latest(); + + Iterator it = iter.iterator(); + + // only 9 because take(10) will immediately call onCompleted when receiving the 10th item + // which onCompleted will overwrite the previous value + for (int i = 0; i < 10; i++) { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(Long.valueOf(i), it.next()); + } + } + @Test(/*timeout = 1000, */expected = RuntimeException.class) + public void testHasNextThrows() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.error(new RuntimeException("Forced failure!"), scheduler).toBlockingObservable(); + + Iterable iter = source.latest(); + + Iterator it = iter.iterator(); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + it.hasNext(); + } + @Test(timeout = 1000, expected = RuntimeException.class) + public void testNextThrows() { + TestScheduler scheduler = new TestScheduler(); + + BlockingObservable source = Observable.error(new RuntimeException("Forced failure!"), scheduler).toBlockingObservable(); + + Iterable iter = source.latest(); + Iterator it = iter.iterator(); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + it.next(); + } + @Test(timeout = 1000) + public void testFasterSource() { + PublishSubject source = PublishSubject.create(); + BlockingObservable blocker = source.toBlockingObservable(); + + Iterable iter = blocker.latest(); + Iterator it = iter.iterator(); + + source.onNext(1); + + Assert.assertEquals(Integer.valueOf(1), it.next()); + + source.onNext(2); + source.onNext(3); + + Assert.assertEquals(Integer.valueOf(3), it.next()); + + source.onNext(4); + source.onNext(5); + source.onNext(6); + + Assert.assertEquals(Integer.valueOf(6), it.next()); + + source.onNext(7); + source.onCompleted(); + + Assert.assertEquals(false, it.hasNext()); + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationMostRecentTest.java b/rxjava-core/src/test/java/rx/operators/OperationMostRecentTest.java index d20f79e819..35382273fc 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationMostRecentTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationMostRecentTest.java @@ -19,8 +19,13 @@ import static rx.operators.OperationMostRecent.*; import java.util.Iterator; +import java.util.concurrent.TimeUnit; +import org.junit.Assert; import org.junit.Test; +import rx.Observable; +import rx.observables.BlockingObservable; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; import rx.subjects.Subject; @@ -71,4 +76,29 @@ public void testMostRecentWithException() { private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } + + @Test(timeout = 1000) + public void testSingleSourceManyIterators() { + TestScheduler scheduler = new TestScheduler(); + BlockingObservable source = Observable.interval(1, TimeUnit.SECONDS, scheduler).take(10).toBlockingObservable(); + + Iterable iter = source.mostRecent(-1L); + + for (int j = 0; j < 3; j++) { + Iterator it = iter.iterator(); + + Assert.assertEquals(Long.valueOf(-1), it.next()); + + for (int i = 0; i < 9; i++) { + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(true, it.hasNext()); + Assert.assertEquals(Long.valueOf(i), it.next()); + } + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + Assert.assertEquals(false, it.hasNext()); + } + + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java index 8a5de26d1d..acbf5ed52e 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationNextTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationNextTest.java @@ -24,13 +24,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Assert; import org.junit.Test; import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.observables.BlockingObservable; import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; @@ -293,4 +296,25 @@ public void run() { System.out.println("a: " + a + " b: " + b + " c: " + c); } + @Test(timeout = 8000) + public void testSingleSourceManyIterators() throws InterruptedException { + BlockingObservable source = Observable.interval(200, TimeUnit.MILLISECONDS).take(10).toBlockingObservable(); + + Iterable iter = source.next(); + + for (int j = 0; j < 3; j++) { + Iterator it = iter.iterator(); + + for (int i = 0; i < 9; i++) { + // hasNext has to set the waiting to true, otherwise, all onNext will be skipped + Assert.assertEquals(true, it.hasNext()); + Assert.assertEquals(Long.valueOf(i), it.next()); + } + + Thread.sleep(400); + + Assert.assertEquals(false, it.hasNext()); + } + + } }