Skip to content

Commit c68077b

Browse files
dalewkingakarnokd
authored andcommitted
2.x Remove Function from transformer interfaces to allow a single obj… (#4672)
* 2.x Remove Function from transformer interfaces to allow a single object to implement multiple transformers * Fix missing Exception declarations from transformers
1 parent 39a4e42 commit c68077b

10 files changed

+40
-25
lines changed

src/main/java/io/reactivex/Completable.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,12 @@ public final Throwable blockingGet(long timeout, TimeUnit unit) {
942942
*/
943943
@SchedulerSupport(SchedulerSupport.NONE)
944944
public final Completable compose(CompletableTransformer transformer) {
945-
return wrap(to(transformer));
945+
try {
946+
return wrap(transformer.apply(this));
947+
} catch (Throwable ex) {
948+
Exceptions.throwIfFatal(ex);
949+
throw ExceptionHelper.wrapOrThrow(ex);
950+
}
946951
}
947952

948953
/**

src/main/java/io/reactivex/CompletableTransformer.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,10 @@
1313

1414
package io.reactivex;
1515

16-
import io.reactivex.functions.Function;
17-
1816
/**
1917
* Convenience interface and callback used by the compose operator to turn a Completable into another
2018
* Completable fluently.
2119
*/
22-
public interface CompletableTransformer extends Function<Completable, CompletableSource> {
23-
20+
public interface CompletableTransformer {
21+
CompletableSource apply(Completable completable) throws Exception;
2422
}

src/main/java/io/reactivex/Flowable.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -6278,7 +6278,12 @@ public final <U> Single<U> collectInto(final U initialItem, BiConsumer<? super U
62786278
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
62796279
@SchedulerSupport(SchedulerSupport.NONE)
62806280
public final <R> Flowable<R> compose(FlowableTransformer<T, R> composer) {
6281-
return fromPublisher(to(composer));
6281+
try {
6282+
return fromPublisher(composer.apply(this));
6283+
} catch (Throwable ex) {
6284+
Exceptions.throwIfFatal(ex);
6285+
throw ExceptionHelper.wrapOrThrow(ex);
6286+
}
62826287
}
62836288

62846289
/**

src/main/java/io/reactivex/FlowableTransformer.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@
1515

1616
import org.reactivestreams.Publisher;
1717

18-
import io.reactivex.functions.Function;
19-
2018
/**
2119
* Interface to compose Flowables.
2220
*
2321
* @param <Upstream> the upstream value type
2422
* @param <Downstream> the downstream value type
2523
*/
26-
public interface FlowableTransformer<Upstream, Downstream> extends Function<Flowable<Upstream>, Publisher<? extends Downstream>> {
27-
24+
public interface FlowableTransformer<Upstream, Downstream> {
25+
Publisher<? extends Downstream> apply(Flowable<Upstream> flowable) throws Exception;
2826
}

src/main/java/io/reactivex/Maybe.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1974,7 +1974,12 @@ public final <U> Maybe<U> cast(final Class<? extends U> clazz) {
19741974
*/
19751975
@SchedulerSupport(SchedulerSupport.NONE)
19761976
public final <R> Maybe<R> compose(MaybeTransformer<T, R> transformer) {
1977-
return wrap(to(transformer));
1977+
try {
1978+
return wrap(transformer.apply(this));
1979+
} catch (Throwable ex) {
1980+
Exceptions.throwIfFatal(ex);
1981+
throw ExceptionHelper.wrapOrThrow(ex);
1982+
}
19781983
}
19791984

19801985
/**

src/main/java/io/reactivex/MaybeTransformer.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@
1313

1414
package io.reactivex;
1515

16-
import io.reactivex.functions.Function;
17-
1816
/**
1917
* Interface to compose Maybes.
2018
*
2119
* @param <Upstream> the upstream value type
2220
* @param <Downstream> the downstream value type
2321
*/
24-
public interface MaybeTransformer<Upstream, Downstream> extends Function<Maybe<Upstream>, MaybeSource<Downstream>> {
25-
22+
public interface MaybeTransformer<Upstream, Downstream> {
23+
MaybeSource<Downstream> apply(Maybe<Upstream> maybe) throws Exception;
2624
}

src/main/java/io/reactivex/Observable.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -5489,7 +5489,12 @@ public final <U> Single<U> collectInto(final U initialValue, BiConsumer<? super
54895489
*/
54905490
@SchedulerSupport(SchedulerSupport.NONE)
54915491
public final <R> Observable<R> compose(ObservableTransformer<T, R> composer) {
5492-
return wrap(to(composer));
5492+
try {
5493+
return wrap(composer.apply(this));
5494+
} catch (Throwable ex) {
5495+
Exceptions.throwIfFatal(ex);
5496+
throw ExceptionHelper.wrapOrThrow(ex);
5497+
}
54935498
}
54945499

54955500

src/main/java/io/reactivex/ObservableTransformer.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@
1313

1414
package io.reactivex;
1515

16-
import io.reactivex.functions.Function;
17-
1816
/**
1917
* Interface to compose Observables.
2018
*
2119
* @param <Upstream> the upstream value type
2220
* @param <Downstream> the downstream value type
2321
*/
24-
public interface ObservableTransformer<Upstream, Downstream> extends Function<Observable<Upstream>, ObservableSource<Downstream>> {
25-
22+
public interface ObservableTransformer<Upstream, Downstream> {
23+
ObservableSource<Downstream> apply(Observable<Upstream> upstream) throws Exception;
2624
}

src/main/java/io/reactivex/Single.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1473,7 +1473,12 @@ public final Single<T> hide() {
14731473
*/
14741474
@SchedulerSupport(SchedulerSupport.NONE)
14751475
public final <R> Single<R> compose(SingleTransformer<T, R> transformer) {
1476-
return wrap(to(transformer));
1476+
try {
1477+
return wrap(transformer.apply(this));
1478+
} catch (Throwable ex) {
1479+
Exceptions.throwIfFatal(ex);
1480+
throw ExceptionHelper.wrapOrThrow(ex);
1481+
}
14771482
}
14781483

14791484
/**

src/main/java/io/reactivex/SingleTransformer.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,12 @@
1313

1414
package io.reactivex;
1515

16-
import io.reactivex.functions.Function;
17-
1816
/**
1917
* Interface to compose Singles.
2018
*
2119
* @param <Upstream> the upstream value type
2220
* @param <Downstream> the downstream value type
2321
*/
24-
public interface SingleTransformer<Upstream, Downstream> extends Function<Single<Upstream>, SingleSource<Downstream>> {
25-
22+
public interface SingleTransformer<Upstream, Downstream> {
23+
SingleSource<Downstream> apply(Single<Upstream> upstream) throws Exception;
2624
}

0 commit comments

Comments
 (0)