Skip to content

Commit cfb8f5f

Browse files
Merge pull request #520 from zsxwing/first
Fixed the blocking/non-blocking first
2 parents c3bbd89 + a1191a2 commit cfb8f5f

File tree

12 files changed

+1101
-291
lines changed

12 files changed

+1101
-291
lines changed

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ def class ObservableTests {
278278
assertEquals("one", s)
279279
}
280280

281-
@Test(expected = IllegalStateException.class)
281+
@Test(expected = IllegalArgumentException.class)
282282
public void testSingle2() {
283283
Observable.from("one", "two").toBlockingObservable().single({ x -> x.length() == 3})
284284
}

language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public class BasicKotlinTests {
255255
assertEquals("default", Observable.from("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length > 3 })
256256
}
257257

258-
[Test(expected = javaClass<IllegalStateException>())]
258+
[Test(expected = javaClass<IllegalArgumentException>())]
259259
public fun testSingle() {
260260
assertEquals("one", Observable.from("one")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 })
261261
Observable.from("one", "two")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 }

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 139 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,12 @@
5252
import rx.operators.OperationElementAt;
5353
import rx.operators.OperationFilter;
5454
import rx.operators.OperationFinally;
55-
import rx.operators.OperationFirstOrDefault;
5655
import rx.operators.OperationGroupBy;
5756
import rx.operators.OperationGroupByUntil;
5857
import rx.operators.OperationGroupJoin;
5958
import rx.operators.OperationInterval;
6059
import rx.operators.OperationJoin;
6160
import rx.operators.OperationJoinPatterns;
62-
import rx.operators.OperationLast;
6361
import rx.operators.OperationMap;
6462
import rx.operators.OperationMaterialize;
6563
import rx.operators.OperationMerge;
@@ -78,6 +76,7 @@
7876
import rx.operators.OperationSample;
7977
import rx.operators.OperationScan;
8078
import rx.operators.OperationSequenceEqual;
79+
import rx.operators.OperationSingle;
8180
import rx.operators.OperationSkip;
8281
import rx.operators.OperationSkipLast;
8382
import rx.operators.OperationSkipUntil;
@@ -5091,37 +5090,107 @@ public Observable<T> skip(int num) {
50915090
return create(OperationSkip.skip(this, num));
50925091
}
50935092

5093+
/**
5094+
* If the Observable completes after emitting a single item, return an
5095+
* Observable containing that item. If it emits more than one item or no
5096+
* item, throw an IllegalArgumentException.
5097+
*
5098+
* @return an Observable containing the single item emitted by the source
5099+
* Observable that matches the predicate.
5100+
* @throws IllegalArgumentException
5101+
* if the source emits more than one item or no item
5102+
*/
5103+
public Observable<T> single() {
5104+
return create(OperationSingle.<T> single(this));
5105+
}
5106+
5107+
/**
5108+
* If the Observable completes after emitting a single item that matches a
5109+
* predicate, return an Observable containing that item. If it emits more
5110+
* than one such item or no item, throw an IllegalArgumentException.
5111+
*
5112+
* @param predicate
5113+
* a predicate function to evaluate items emitted by the source
5114+
* Observable
5115+
* @return an Observable containing the single item emitted by the source
5116+
* Observable that matches the predicate.
5117+
* @throws IllegalArgumentException
5118+
* if the source emits more than one item or no item matching
5119+
* the predicate
5120+
*/
5121+
public Observable<T> single(Func1<? super T, Boolean> predicate) {
5122+
return filter(predicate).single();
5123+
}
5124+
5125+
/**
5126+
* If the Observable completes after emitting a single item, return an
5127+
* Observable containing that item. If it's empty, return an Observable
5128+
* containing the defaultValue. If it emits more than one item, throw an
5129+
* IllegalArgumentException.
5130+
*
5131+
* @param defaultValue
5132+
* a default value to return if the Observable emits no item
5133+
* @return an Observable containing the single item emitted by the source
5134+
* Observable, or an Observable containing the defaultValue if no
5135+
* item.
5136+
* @throws IllegalArgumentException
5137+
* if the source emits more than one item
5138+
*/
5139+
public Observable<T> singleOrDefault(T defaultValue) {
5140+
return create(OperationSingle.<T> singleOrDefault(this, defaultValue));
5141+
}
5142+
5143+
/**
5144+
* If the Observable completes after emitting a single item that matches a
5145+
* predicate, return an Observable containing that item. If it emits no such
5146+
* item, return an Observable containing the defaultValue. If it emits more
5147+
* than one such item, throw an IllegalArgumentException.
5148+
*
5149+
* @param defaultValue
5150+
* a default value to return if the {@link Observable} emits no
5151+
* matching items
5152+
* @param predicate
5153+
* a predicate function to evaluate items emitted by the
5154+
* Observable
5155+
* @return an Observable containing the single item emitted by the source
5156+
* Observable that matches the predicate, or an Observable
5157+
* containing the defaultValue if no item matches the predicate
5158+
* @throws IllegalArgumentException
5159+
* if the source emits more than one item matching the predicate
5160+
*/
5161+
public Observable<T> singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
5162+
return filter(predicate).singleOrDefault(defaultValue);
5163+
}
5164+
50945165
/**
50955166
* Returns an Observable that emits only the very first item emitted by the
5096-
* source Observable.
5167+
* source Observable, or an <code>IllegalArgumentException</code> if the source
5168+
* {@link Observable} is empty.
50975169
* <p>
50985170
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
50995171
*
5100-
* @return an Observable that emits only the very first item emitted by the
5101-
* source Observable, or nothing if the source Observable completes
5102-
* without emitting a single item
5172+
* @return an Observable that emits only the very first item from the
5173+
* source, or an <code>IllegalArgumentException</code> if the source {@link Observable} is empty.
51035174
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
5104-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
51055175
*/
51065176
public Observable<T> first() {
5107-
return take(1);
5177+
return take(1).single();
51085178
}
51095179

51105180
/**
51115181
* Returns an Observable that emits only the very first item emitted by the
5112-
* source Observable that satisfies a given condition.
5182+
* source Observable that satisfies a given condition, or an <code>IllegalArgumentException</code>
5183+
* if no such items are emitted.
51135184
* <p>
51145185
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstN.png">
51155186
*
51165187
* @param predicate the condition any source emitted item has to satisfy
51175188
* @return an Observable that emits only the very first item satisfying the
5118-
* given condition from the source, or nothing if the source
5119-
* Observable completes without emitting a single matching item
5189+
* given condition from the source, or an <code>IllegalArgumentException</code> if no such items are emitted.
51205190
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
5121-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
51225191
*/
51235192
public Observable<T> first(Func1<? super T, Boolean> predicate) {
5124-
return skipWhile(not(predicate)).take(1);
5193+
return takeFirst(predicate).single();
51255194
}
51265195

51275196
/**
@@ -5139,7 +5208,7 @@ public Observable<T> first(Func1<? super T, Boolean> predicate) {
51395208
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229320.aspx">MSDN: Observable.FirstOrDefault</a>
51405209
*/
51415210
public Observable<T> firstOrDefault(T defaultValue) {
5142-
return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue));
5211+
return take(1).singleOrDefault(defaultValue);
51435212
}
51445213

51455214
/**
@@ -5157,8 +5226,8 @@ public Observable<T> firstOrDefault(T defaultValue) {
51575226
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#firstordefault">RxJava Wiki: firstOrDefault()</a>
51585227
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229759.aspx">MSDN: Observable.FirstOrDefault</a>
51595228
*/
5160-
public Observable<T> firstOrDefault(Func1<? super T, Boolean> predicate, T defaultValue) {
5161-
return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue));
5229+
public Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
5230+
return takeFirst(predicate).singleOrDefault(defaultValue);
51625231
}
51635232

51645233
/**
@@ -5245,14 +5314,15 @@ public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer,
52455314
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
52465315
*
52475316
* @return an Observable that emits only the very first item from the
5248-
* source, or none if the source Observable completes without
5317+
* source, or an empty Observable if the source Observable completes without
52495318
* emitting a single item
5319+
* @deprecated Use <code>take(1)</code> directly.
52505320
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
52515321
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
5252-
* @see #first()
52535322
*/
5323+
@Deprecated
52545324
public Observable<T> takeFirst() {
5255-
return first();
5325+
return take(1);
52565326
}
52575327

52585328
/**
@@ -5263,14 +5333,13 @@ public Observable<T> takeFirst() {
52635333
*
52645334
* @param predicate the condition any source emitted item has to satisfy
52655335
* @return an Observable that emits only the very first item satisfying the
5266-
* given condition from the source, or none if the source Observable
5336+
* given condition from the source, or an empty Observable if the source Observable
52675337
* completes without emitting a single matching item
52685338
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
52695339
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
5270-
* @see #first(Func1)
52715340
*/
52725341
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
5273-
return first(predicate);
5342+
return filter(predicate).take(1);
52745343
}
52755344

52765345
/**
@@ -5725,7 +5794,7 @@ public <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1<? sup
57255794
public Observable<Boolean> isEmpty() {
57265795
return create(OperationAny.isEmpty(this));
57275796
}
5728-
5797+
57295798
/**
57305799
* Returns an Observable that emits the last item emitted by the source or
57315800
* notifies observers of an <code>IllegalArgumentException</code> if the
@@ -5738,7 +5807,53 @@ public Observable<Boolean> isEmpty() {
57385807
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observable-Operators#last">RxJava Wiki: last()</a>
57395808
*/
57405809
public Observable<T> last() {
5741-
return create(OperationLast.last(this));
5810+
return takeLast(1).single();
5811+
}
5812+
5813+
/**
5814+
* Returns an Observable that emits only the last item emitted by the source
5815+
* Observable that satisfies a given condition, or an
5816+
* IllegalArgumentException if no such items are emitted.
5817+
*
5818+
* @param predicate
5819+
* the condition any source emitted item has to satisfy
5820+
* @return an Observable that emits only the last item satisfying the given
5821+
* condition from the source, or an IllegalArgumentException if no
5822+
* such items are emitted.
5823+
* @throws IllegalArgumentException
5824+
* if no such itmes are emmited
5825+
*/
5826+
public Observable<T> last(Func1<? super T, Boolean> predicate) {
5827+
return filter(predicate).takeLast(1).single();
5828+
}
5829+
5830+
/**
5831+
* Returns an Observable that emits only the last item emitted by the source
5832+
* Observable, or a default item if the source is empty.
5833+
*
5834+
* @param defaultValue
5835+
* the default item to emit if the source Observable is empty
5836+
* @return an Observable that emits only the last item from the source, or a
5837+
* default item if the source is empty
5838+
*/
5839+
public Observable<T> lastOrDefault(T defaultValue) {
5840+
return takeLast(1).singleOrDefault(defaultValue);
5841+
}
5842+
5843+
/**
5844+
* Returns an Observable that emits only the last item emitted by the source
5845+
* Observable that satisfies a given condition, or a default item otherwise.
5846+
*
5847+
* @param defaultValue
5848+
* the default item to emit if the source Observable doesn't emit
5849+
* anything that satisfies the given condition
5850+
* @param predicate
5851+
* the condition any source emitted item has to satisfy
5852+
* @return an Observable that emits only the last item from the source that
5853+
* satisfies the given condition, or a default item otherwise
5854+
*/
5855+
public Observable<T> lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
5856+
return filter(predicate).takeLast(1).singleOrDefault(defaultValue);
57425857
}
57435858

57445859
/**

0 commit comments

Comments
 (0)