Skip to content

Commit 62345af

Browse files
author
Aaron Tull
committed
Implemented Observable.x(ConversionFunc) to allow external extensions to Observables.
1 parent 96786bb commit 62345af

File tree

4 files changed

+134
-7
lines changed

4 files changed

+134
-7
lines changed

src/main/java/rx/ConversionFunc.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package rx;
2+
3+
import rx.Observable.OnSubscribe;
4+
5+
/**
6+
* Converts the values emitted by an Observable's OnSubscribe function to a value.
7+
*
8+
* @param <T> the type of values to be consumed
9+
* @param <R> the return type
10+
*/
11+
public interface ConversionFunc<T, R> {
12+
/**
13+
* Converts the data produced by the provided {@code OnSubscribe function} to a value.
14+
*
15+
* @param onSubscribe a function that produces data to a Subscriber, usually wrapped by an Observable.
16+
* @return an instance of {@code R}
17+
*/
18+
public R convert(OnSubscribe<T> onSubscribe);
19+
}

src/main/java/rx/Observable.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,23 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
109109
// cover for generics insanity
110110
}
111111

112+
/**
113+
* Passes all emitted values from {@code this} Observable to the provided {@link ConversionFunc} to be
114+
* collected and returned as a single value. Note that it is legal for a {@link ConversionFunc} to
115+
* return an Observable (enabling chaining).
116+
*
117+
* @param conversion a function that converts from this {@code Observable<T>} to an {@code R}
118+
* @return an instance of R created by the provided Conversion
119+
*/
120+
public <R> R x(ConversionFunc<T, R> conversion) {
121+
final Observable<T> self = this;
122+
return conversion.convert(new OnSubscribe<T>() {
123+
@Override
124+
public void call(Subscriber<? super T> subscriber) {
125+
subscriber.add(Observable.subscribe(subscriber, self));
126+
}});
127+
}
128+
112129
/**
113130
* Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
114131
* the values of the current Observable through the Operator function.
@@ -133,11 +150,23 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
133150
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
134151
*/
135152
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
136-
return new Observable<R>(new OnSubscribe<R>() {
153+
return new Observable<R>(lift(lift, onSubscribe));
154+
}
155+
156+
/**
157+
* Wraps the given OnSubscribe in an OnSubscribe that applies the provided operator implementation. The subscription
158+
* of the provided {@code onSubscribe} will be deferred until the returned OnSubscribe is subscribed.
159+
*
160+
* @param operator
161+
* @param onSubscribe the source OnSubscribe function
162+
* @return an OnSubscribe that delegates the emitted values from {@code onSubscribe} to the {@code operator}.
163+
*/
164+
/*package*/ static <R, T> OnSubscribe<R> lift(final Operator<? extends R, ? super T> operator, final OnSubscribe<T> onSubscribe) {
165+
return new OnSubscribe<R>() {
137166
@Override
138167
public void call(Subscriber<? super R> o) {
139168
try {
140-
Subscriber<? super T> st = hook.onLift(lift).call(o);
169+
Subscriber<? super T> st = hook.onLift(operator).call(o);
141170
try {
142171
// new Subscriber created and being subscribed with so 'onStart' it
143172
st.onStart();
@@ -160,10 +189,9 @@ public void call(Subscriber<? super R> o) {
160189
o.onError(e);
161190
}
162191
}
163-
});
192+
};
164193
}
165194

166-
167195
/**
168196
* Transform an Observable by applying a particular Transformer function to it.
169197
* <p>
@@ -7737,11 +7765,15 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
77377765
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
77387766
*/
77397767
public final Subscription subscribe(Subscriber<? super T> subscriber) {
7740-
// validate and proceed
7768+
return Observable.subscribe(subscriber, this);
7769+
}
7770+
7771+
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
7772+
// validate and proceed
77417773
if (subscriber == null) {
77427774
throw new IllegalArgumentException("observer can not be null");
77437775
}
7744-
if (onSubscribe == null) {
7776+
if (observable.onSubscribe == null) {
77457777
throw new IllegalStateException("onSubscribe function can not be null.");
77467778
/*
77477779
* the subscribe function can also be overridden but generally that's not the appropriate approach
@@ -7765,7 +7797,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
77657797
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
77667798
try {
77677799
// allow the hook to intercept and/or decorate
7768-
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
7800+
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
77697801
return hook.onSubscribeReturn(subscriber);
77707802
} catch (Throwable e) {
77717803
// special handling for certain Throwable/Error/Exception types
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package rx;
2+
3+
import rx.Observable.OnSubscribe;
4+
import rx.Observable.Operator;
5+
6+
public class OperatorConversion<T, R> implements ConversionFunc<T, Observable<R>> {
7+
8+
private Operator<? extends R, ? super T> operator;
9+
10+
public OperatorConversion(Operator<? extends R, ? super T> operator) {
11+
this.operator = operator;
12+
}
13+
14+
@Override
15+
public Observable<R> convert(OnSubscribe<T> onSubscribe) {
16+
return Observable.create(wrapSubscriber(onSubscribe));
17+
}
18+
19+
protected OnSubscribe<R> wrapSubscriber(final OnSubscribe<T> onSubscribe) {
20+
return Observable.lift(operator, onSubscribe);
21+
}
22+
23+
}

src/test/java/rx/ObservableTests.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.CountDownLatch;
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.atomic.AtomicInteger;
40+
import java.util.concurrent.atomic.AtomicLong;
4041
import java.util.concurrent.atomic.AtomicReference;
4142

4243
import org.junit.Before;
@@ -46,13 +47,16 @@
4647
import org.mockito.MockitoAnnotations;
4748

4849
import rx.Observable.OnSubscribe;
50+
import rx.Observable.Operator;
4951
import rx.Observable.Transformer;
5052
import rx.exceptions.OnErrorNotImplementedException;
5153
import rx.functions.Action1;
5254
import rx.functions.Action2;
5355
import rx.functions.Func0;
5456
import rx.functions.Func1;
5557
import rx.functions.Func2;
58+
import rx.internal.operators.OperatorAll;
59+
import rx.internal.operators.OperatorMap;
5660
import rx.observables.ConnectableObservable;
5761
import rx.observers.TestSubscriber;
5862
import rx.schedulers.TestScheduler;
@@ -1138,4 +1142,53 @@ public void testSubscribingSubscriberAsObserverMaintainsSubscriptionChain() {
11381142

11391143
subscriber.assertUnsubscribed();
11401144
}
1145+
1146+
@Test
1147+
public void testExtend() {
1148+
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
1149+
final Object value = new Object();
1150+
Observable.just(value).x(new ConversionFunc<Object,Object>(){
1151+
@Override
1152+
public Object convert(OnSubscribe<Object> onSubscribe) {
1153+
onSubscribe.call(subscriber);
1154+
subscriber.assertNoErrors();
1155+
subscriber.assertCompleted();
1156+
subscriber.assertValue(value);
1157+
return subscriber.getOnNextEvents().get(0);
1158+
}});
1159+
}
1160+
1161+
@Test
1162+
public void testOperatorConversion() {
1163+
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
1164+
final Object value = new Object();
1165+
Observable.just(value).x(new OperatorConversion<Object,Object>(new Operator<Object, Object>(){
1166+
@Override
1167+
public Subscriber<? super Object> call(Subscriber<? super Object> t) {
1168+
return t;
1169+
}})).subscribe(subscriber);
1170+
subscriber.assertNoErrors();
1171+
subscriber.assertCompleted();
1172+
subscriber.assertValue(value);
1173+
}
1174+
1175+
@Test
1176+
public void testOperatorConversionWithMap() {
1177+
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
1178+
final AtomicLong value = new AtomicLong();
1179+
OperatorConversion<Object, Long> toTimeReceived = new OperatorConversion<Object,Long>(
1180+
new OperatorMap<Object, Long>(new Func1<Object, Long>(){
1181+
@Override
1182+
public Long call(Object t) {
1183+
long time = System.currentTimeMillis();
1184+
value.set(time);
1185+
return time;
1186+
}}));
1187+
Observable.just(new Object())
1188+
.x(toTimeReceived)
1189+
.subscribe(subscriber);
1190+
subscriber.assertNoErrors();
1191+
subscriber.assertCompleted();
1192+
subscriber.assertValue(value.get());
1193+
}
11411194
}

0 commit comments

Comments
 (0)