Skip to content

Commit f757324

Browse files
Merge pull request #626 from akarnokd/OperationLatestAndFixes
Added: BO.Latest, fixed: BO.next, BO.mostRecent, BO.toIterable
2 parents 6647bd5 + c11900c commit f757324

9 files changed

+422
-23
lines changed

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

+16
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.Observable;
2424
import rx.Observer;
2525
import rx.Subscription;
26+
import rx.operators.OperationLatest;
2627
import rx.operators.OperationMostRecent;
2728
import rx.operators.OperationNext;
2829
import rx.operators.OperationToFuture;
@@ -297,6 +298,21 @@ public Iterable<T> next() {
297298
return OperationNext.next(o);
298299
}
299300

301+
/**
302+
* Returns the latest item emitted by the underlying Observable, waiting if necessary
303+
* for one to become available.
304+
* <p>
305+
* If the underlying observable produces items faster than the Iterator.next() takes them
306+
* onNext events might be skipped, but onError or onCompleted events are not.
307+
* <p>
308+
* Note also that an onNext() directly followed by onCompleted() might hide the onNext() event.
309+
*
310+
* @return the Iterable sequence
311+
*/
312+
public Iterable<T> latest() {
313+
return OperationLatest.latest(o);
314+
}
315+
300316
/**
301317
* If the {@link Observable} completes after emitting a single item, return that item,
302318
* otherwise throw an IllegalArgumentException.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.Iterator;
19+
import java.util.NoSuchElementException;
20+
import java.util.concurrent.Semaphore;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
import rx.Notification;
23+
import rx.Observable;
24+
import rx.Observer;
25+
import rx.util.Exceptions;
26+
27+
/**
28+
* Wait for and iterate over the latest values of the source observable.
29+
* If the source works faster than the iterator, values may be skipped, but
30+
* not the onError or onCompleted events.
31+
*/
32+
public final class OperationLatest {
33+
/** Utility class. */
34+
private OperationLatest() { throw new IllegalStateException("No instances!"); }
35+
36+
public static <T> Iterable<T> latest(final Observable<? extends T> source) {
37+
return new Iterable<T>() {
38+
@Override
39+
public Iterator<T> iterator() {
40+
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
41+
source.materialize().subscribe(lio);
42+
return lio;
43+
}
44+
};
45+
}
46+
47+
/** Observer of source, iterator for output. */
48+
static final class LatestObserverIterator<T> implements Observer<Notification<? extends T>>, Iterator<T> {
49+
final Semaphore notify = new Semaphore(0);
50+
// observer's notification
51+
final AtomicReference<Notification<? extends T>> reference = new AtomicReference<Notification<? extends T>>();
52+
@Override
53+
public void onNext(Notification<? extends T> args) {
54+
boolean wasntAvailable = reference.getAndSet(args) == null;
55+
if (wasntAvailable) {
56+
notify.release();
57+
}
58+
}
59+
60+
@Override
61+
public void onError(Throwable e) {
62+
// not expected
63+
}
64+
65+
@Override
66+
public void onCompleted() {
67+
// not expected
68+
}
69+
70+
// iterator's notification
71+
Notification<? extends T> iNotif;
72+
@Override
73+
public boolean hasNext() {
74+
if (iNotif != null && iNotif.isOnError()) {
75+
throw Exceptions.propagate(iNotif.getThrowable());
76+
}
77+
if (iNotif == null || !iNotif.isOnCompleted()) {
78+
if (iNotif == null) {
79+
try {
80+
notify.acquire();
81+
} catch (InterruptedException ex) {
82+
Thread.currentThread().interrupt();
83+
iNotif = new Notification<T>(ex);
84+
throw Exceptions.propagate(ex);
85+
}
86+
87+
iNotif = reference.getAndSet(null);
88+
if (iNotif.isOnError()) {
89+
throw Exceptions.propagate(iNotif.getThrowable());
90+
}
91+
}
92+
}
93+
return !iNotif.isOnCompleted();
94+
}
95+
96+
@Override
97+
public T next() {
98+
if (hasNext()) {
99+
if (iNotif.isOnNext()) {
100+
T v = iNotif.getValue();
101+
iNotif = null;
102+
return v;
103+
}
104+
}
105+
throw new NoSuchElementException();
106+
}
107+
108+
@Override
109+
public void remove() {
110+
throw new UnsupportedOperationException("Read-only iterator.");
111+
}
112+
113+
}
114+
}

rxjava-core/src/main/java/rx/operators/OperationMostRecent.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@
3131
*/
3232
public final class OperationMostRecent {
3333

34-
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, T initialValue) {
35-
36-
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
37-
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);
38-
39-
source.subscribe(mostRecentObserver);
34+
public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {
4035

4136
return new Iterable<T>() {
4237
@Override
4338
public Iterator<T> iterator() {
39+
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
40+
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);
41+
42+
source.subscribe(mostRecentObserver);
43+
4444
return nextIterator;
4545
}
4646
};

rxjava-core/src/main/java/rx/operators/OperationNext.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,18 @@
3434
public final class OperationNext {
3535

3636
public static <T> Iterable<T> next(final Observable<? extends T> items) {
37-
38-
NextObserver<T> nextObserver = new NextObserver<T>();
39-
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);
40-
41-
items.materialize().subscribe(nextObserver);
42-
4337
return new Iterable<T>() {
4438
@Override
4539
public Iterator<T> iterator() {
40+
NextObserver<T> nextObserver = new NextObserver<T>();
41+
final NextIterator<T> nextIterator = new NextIterator<T>(nextObserver);
42+
43+
items.materialize().subscribe(nextObserver);
44+
4645
return nextIterator;
4746
}
4847
};
49-
48+
5049
}
5150

5251
private static class NextIterator<T> implements Iterator<T> {

rxjava-core/src/main/java/rx/operators/OperationToIterator.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.operators;
1717

1818
import java.util.Iterator;
19+
import java.util.NoSuchElementException;
1920
import java.util.concurrent.BlockingQueue;
2021
import java.util.concurrent.LinkedBlockingQueue;
2122

@@ -63,27 +64,26 @@ public void onNext(Notification<? extends T> args) {
6364

6465
return new Iterator<T>() {
6566
private Notification<? extends T> buf;
66-
67+
6768
@Override
6869
public boolean hasNext() {
6970
if (buf == null) {
7071
buf = take();
7172
}
73+
if (buf.isOnError()) {
74+
throw Exceptions.propagate(buf.getThrowable());
75+
}
7276
return !buf.isOnCompleted();
7377
}
7478

7579
@Override
7680
public T next() {
77-
if (buf == null) {
78-
buf = take();
81+
if (hasNext()) {
82+
T result = buf.getValue();
83+
buf = null;
84+
return result;
7985
}
80-
if (buf.isOnError()) {
81-
throw Exceptions.propagate(buf.getThrowable());
82-
}
83-
84-
T result = buf.getValue();
85-
buf = null;
86-
return result;
86+
throw new NoSuchElementException();
8787
}
8888

8989
private Notification<? extends T> take() {

rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java

+54
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import static org.junit.Assert.*;
1919

2020
import java.util.Iterator;
21+
import java.util.NoSuchElementException;
22+
import org.junit.Assert;
2123

2224
import org.junit.Before;
2325
import org.junit.Test;
@@ -201,6 +203,58 @@ public void testToIterable() {
201203
assertEquals(false, it.hasNext());
202204

203205
}
206+
@Test(expected = NoSuchElementException.class)
207+
public void testToIterableNextOnly() {
208+
BlockingObservable<Integer> obs = BlockingObservable.from(Observable.from(1, 2, 3));
209+
210+
Iterator<Integer> it = obs.toIterable().iterator();
211+
212+
Assert.assertEquals((Integer)1, it.next());
213+
Assert.assertEquals((Integer)2, it.next());
214+
Assert.assertEquals((Integer)3, it.next());
215+
216+
it.next();
217+
}
218+
219+
@Test(expected = NoSuchElementException.class)
220+
public void testToIterableNextOnlyTwice() {
221+
BlockingObservable<Integer> obs = BlockingObservable.from(Observable.from(1, 2, 3));
222+
223+
Iterator<Integer> it = obs.toIterable().iterator();
224+
225+
Assert.assertEquals((Integer)1, it.next());
226+
Assert.assertEquals((Integer)2, it.next());
227+
Assert.assertEquals((Integer)3, it.next());
228+
229+
boolean exc = false;
230+
try {
231+
it.next();
232+
} catch (NoSuchElementException ex) {
233+
exc = true;
234+
}
235+
Assert.assertEquals(true, exc);
236+
237+
it.next();
238+
}
239+
240+
@Test
241+
public void testToIterableManyTimes() {
242+
BlockingObservable<Integer> obs = BlockingObservable.from(Observable.from(1, 2, 3));
243+
244+
Iterable<Integer> iter = obs.toIterable();
245+
246+
for (int j = 0; j < 3; j++) {
247+
Iterator<Integer> it = iter.iterator();
248+
249+
Assert.assertTrue(it.hasNext());
250+
Assert.assertEquals((Integer)1, it.next());
251+
Assert.assertTrue(it.hasNext());
252+
Assert.assertEquals((Integer)2, it.next());
253+
Assert.assertTrue(it.hasNext());
254+
Assert.assertEquals((Integer)3, it.next());
255+
Assert.assertFalse(it.hasNext());
256+
}
257+
}
204258

205259
@Test(expected = TestException.class)
206260
public void testToIterableWithException() {

0 commit comments

Comments
 (0)