18
18
19
19
import org .reactivestreams .*;
20
20
21
- import io .reactivex .NbpObservable .*;
21
+ import io .reactivex .Observable .*;
22
22
import io .reactivex .Single .*;
23
23
import io .reactivex .annotations .*;
24
24
import io .reactivex .disposables .*;
@@ -353,7 +353,7 @@ public static Completable concat(Iterable<? extends Completable> sources) {
353
353
* @throws NullPointerException if sources is null
354
354
*/
355
355
@ SchedulerSupport (SchedulerKind .NONE )
356
- public static Completable concat (Observable <? extends Completable > sources ) {
356
+ public static Completable concat (Flowable <? extends Completable > sources ) {
357
357
return concat (sources , 2 );
358
358
}
359
359
@@ -365,7 +365,7 @@ public static Completable concat(Observable<? extends Completable> sources) {
365
365
* @throws NullPointerException if sources is null
366
366
*/
367
367
@ SchedulerSupport (SchedulerKind .NONE )
368
- public static Completable concat (Observable <? extends Completable > sources , int prefetch ) {
368
+ public static Completable concat (Flowable <? extends Completable > sources , int prefetch ) {
369
369
Objects .requireNonNull (sources , "sources is null" );
370
370
if (prefetch < 1 ) {
371
371
throw new IllegalArgumentException ("prefetch > 0 required but it was " + prefetch );
@@ -517,7 +517,7 @@ public void accept(CompletableSubscriber s) {
517
517
* @throws NullPointerException if flowable is null
518
518
*/
519
519
@ SchedulerSupport (SchedulerKind .NONE )
520
- public static <T > Completable fromFlowable (final Observable <T > flowable ) {
520
+ public static <T > Completable fromFlowable (final Flowable <T > flowable ) {
521
521
Objects .requireNonNull (flowable , "flowable is null" );
522
522
return create (new CompletableOnSubscribe () {
523
523
@ Override
@@ -559,12 +559,12 @@ public void onSubscribe(Subscription s) {
559
559
* @throws NullPointerException if flowable is null
560
560
*/
561
561
@ SchedulerSupport (SchedulerKind .NONE )
562
- public static <T > Completable fromNbpObservable (final NbpObservable <T > observable ) {
562
+ public static <T > Completable fromNbpObservable (final Observable <T > observable ) {
563
563
Objects .requireNonNull (observable , "observable is null" );
564
564
return create (new CompletableOnSubscribe () {
565
565
@ Override
566
566
public void accept (final CompletableSubscriber s ) {
567
- observable .subscribe (new NbpSubscriber <T >() {
567
+ observable .subscribe (new Observer <T >() {
568
568
569
569
@ Override
570
570
public void onComplete () {
@@ -695,7 +695,7 @@ public static Completable merge(Iterable<? extends Completable> sources) {
695
695
* @return the new Completable instance
696
696
* @throws NullPointerException if sources is null
697
697
*/
698
- public static Completable merge (Observable <? extends Completable > sources ) {
698
+ public static Completable merge (Flowable <? extends Completable > sources ) {
699
699
return merge0 (sources , Integer .MAX_VALUE , false );
700
700
}
701
701
@@ -708,7 +708,7 @@ public static Completable merge(Observable<? extends Completable> sources) {
708
708
* @throws NullPointerException if sources is null
709
709
* @throws IllegalArgumentException if maxConcurrency is less than 1
710
710
*/
711
- public static Completable merge (Observable <? extends Completable > sources , int maxConcurrency ) {
711
+ public static Completable merge (Flowable <? extends Completable > sources , int maxConcurrency ) {
712
712
return merge0 (sources , maxConcurrency , false );
713
713
714
714
}
@@ -724,7 +724,7 @@ public static Completable merge(Observable<? extends Completable> sources, int m
724
724
* @throws NullPointerException if sources is null
725
725
* @throws IllegalArgumentException if maxConcurrency is less than 1
726
726
*/
727
- protected static Completable merge0 (Observable <? extends Completable > sources , int maxConcurrency , boolean delayErrors ) {
727
+ protected static Completable merge0 (Flowable <? extends Completable > sources , int maxConcurrency , boolean delayErrors ) {
728
728
Objects .requireNonNull (sources , "sources is null" );
729
729
if (maxConcurrency < 1 ) {
730
730
throw new IllegalArgumentException ("maxConcurrency > 0 required but it was " + maxConcurrency );
@@ -767,7 +767,7 @@ public static Completable mergeDelayError(Iterable<? extends Completable> source
767
767
* @return the new Completable instance
768
768
* @throws NullPointerException if sources is null
769
769
*/
770
- public static Completable mergeDelayError (Observable <? extends Completable > sources ) {
770
+ public static Completable mergeDelayError (Flowable <? extends Completable > sources ) {
771
771
return merge0 (sources , Integer .MAX_VALUE , true );
772
772
}
773
773
@@ -781,7 +781,7 @@ public static Completable mergeDelayError(Observable<? extends Completable> sour
781
781
* @return the new Completable instance
782
782
* @throws NullPointerException if sources is null
783
783
*/
784
- public static Completable mergeDelayError (Observable <? extends Completable > sources , int maxConcurrency ) {
784
+ public static Completable mergeDelayError (Flowable <? extends Completable > sources , int maxConcurrency ) {
785
785
return merge0 (sources , maxConcurrency , true );
786
786
}
787
787
@@ -1390,7 +1390,7 @@ public final Completable endWith(Completable other) {
1390
1390
* @throws NullPointerException if next is null
1391
1391
*/
1392
1392
@ SchedulerSupport (SchedulerKind .CUSTOM )
1393
- public final <T > NbpObservable <T > endWith (NbpObservable <T > next ) {
1393
+ public final <T > Observable <T > endWith (Observable <T > next ) {
1394
1394
return next .startWith (this .<T >toNbpObservable ());
1395
1395
}
1396
1396
@@ -1403,7 +1403,7 @@ public final <T> NbpObservable<T> endWith(NbpObservable<T> next) {
1403
1403
* @throws NullPointerException if next is null
1404
1404
*/
1405
1405
@ SchedulerSupport (SchedulerKind .CUSTOM )
1406
- public final <T > Observable <T > endWith (Observable <T > next ) {
1406
+ public final <T > Flowable <T > endWith (Flowable <T > next ) {
1407
1407
return next .startWith (this .<T >toFlowable ());
1408
1408
}
1409
1409
@@ -1787,7 +1787,7 @@ public final Completable repeatUntil(BooleanSupplier stop) {
1787
1787
* FIXME the Observable<Void> type doesn't make sense here because nulls are not allowed
1788
1788
* FIXME add unit test once the type has been fixed
1789
1789
*/
1790
- public final Completable repeatWhen (Function <? super Observable <Object >, ? extends Publisher <Object >> handler ) {
1790
+ public final Completable repeatWhen (Function <? super Flowable <Object >, ? extends Publisher <Object >> handler ) {
1791
1791
return fromFlowable (toFlowable ().repeatWhen (handler ));
1792
1792
}
1793
1793
@@ -1847,7 +1847,7 @@ public final Completable retry(Predicate<? super Throwable> predicate) {
1847
1847
* @throws NullPointerException if handler is null
1848
1848
*/
1849
1849
@ SchedulerSupport (SchedulerKind .NONE )
1850
- public final Completable retryWhen (Function <? super Observable <? extends Throwable >, ? extends Publisher <Object >> handler ) {
1850
+ public final Completable retryWhen (Function <? super Flowable <? extends Throwable >, ? extends Publisher <Object >> handler ) {
1851
1851
return fromFlowable (toFlowable ().retryWhen (handler ));
1852
1852
}
1853
1853
@@ -1873,7 +1873,7 @@ public final Completable startWith(Completable other) {
1873
1873
* @throws NullPointerException if other is null
1874
1874
*/
1875
1875
@ SchedulerSupport (SchedulerKind .NONE )
1876
- public final <T > NbpObservable <T > startWith (NbpObservable <T > other ) {
1876
+ public final <T > Observable <T > startWith (Observable <T > other ) {
1877
1877
Objects .requireNonNull (other , "other is null" );
1878
1878
return other .endWith (this .<T >toNbpObservable ());
1879
1879
}
@@ -1886,7 +1886,7 @@ public final <T> NbpObservable<T> startWith(NbpObservable<T> other) {
1886
1886
* @throws NullPointerException if other is null
1887
1887
*/
1888
1888
@ SchedulerSupport (SchedulerKind .NONE )
1889
- public final <T > Observable <T > startWith (Observable <T > other ) {
1889
+ public final <T > Flowable <T > startWith (Flowable <T > other ) {
1890
1890
Objects .requireNonNull (other , "other is null" );
1891
1891
return other .endWith (this .<T >toFlowable ());
1892
1892
}
@@ -1988,7 +1988,7 @@ public void onSubscribe(Disposable d) {
1988
1988
* @throws NullPointerException if s is null
1989
1989
*/
1990
1990
@ SchedulerSupport (SchedulerKind .NONE )
1991
- public final void subscribe (final NbpSubscriber <?> s ) {
1991
+ public final void subscribe (final Observer <?> s ) {
1992
1992
Objects .requireNonNull (s , "s is null" );
1993
1993
try {
1994
1994
// TODO plugin wrapping the subscriber
@@ -2221,8 +2221,8 @@ public final <U> U to(Function<? super Completable, U> converter) {
2221
2221
* @return the new Observable created
2222
2222
*/
2223
2223
@ SchedulerSupport (SchedulerKind .NONE )
2224
- public final <T > Observable <T > toFlowable () {
2225
- return Observable .create (new Publisher <T >() {
2224
+ public final <T > Flowable <T > toFlowable () {
2225
+ return Flowable .create (new Publisher <T >() {
2226
2226
@ Override
2227
2227
public void subscribe (Subscriber <? super T > s ) {
2228
2228
Completable .this .subscribe (s );
@@ -2237,10 +2237,10 @@ public void subscribe(Subscriber<? super T> s) {
2237
2237
* @return the new NbpObservable created
2238
2238
*/
2239
2239
@ SchedulerSupport (SchedulerKind .NONE )
2240
- public final <T > NbpObservable <T > toNbpObservable () {
2241
- return NbpObservable .create (new NbpOnSubscribe <T >() {
2240
+ public final <T > Observable <T > toNbpObservable () {
2241
+ return Observable .create (new NbpOnSubscribe <T >() {
2242
2242
@ Override
2243
- public void accept (NbpSubscriber <? super T > s ) {
2243
+ public void accept (Observer <? super T > s ) {
2244
2244
subscribe (s );
2245
2245
}
2246
2246
});
0 commit comments