Skip to content

Observable.x(ConversionFunc) to allow extensions to Observables #3082

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 7, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,23 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
// cover for generics insanity
}

/**
* Passes all emitted values from {@code this} Observable to the provided {@link ConversionFunc} to be
* collected and returned as a single value. Note that it is legal for a {@link ConversionFunc} to
* return an Observable (enabling chaining).
*
* @param conversion a function that converts from this {@code Observable<T>} to an {@code R}
* @return an instance of R created by the provided Conversion
*/
@Experimental
public <R> R x(Func1<? super OnSubscribe<T>, ? extends R> conversion) {
return conversion.call(new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.add(Observable.subscribe(subscriber, Observable.this));
}});
}

/**
* Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
* the values of the current Observable through the Operator function.
Expand All @@ -127,17 +144,17 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param lift the Operator that implements the Observable-operating function to be applied to the source
* @param operator the Operator that implements the Observable-operating function to be applied to the source
* Observable
* @return an Observable that is the result of applying the lifted Operator to the source Observable
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
*/
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = hook.onLift(lift).call(o);
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
Expand All @@ -163,7 +180,6 @@ public void call(Subscriber<? super R> o) {
});
}


/**
* Transform an Observable by applying a particular Transformer function to it.
* <p>
Expand Down Expand Up @@ -7752,11 +7768,15 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
*/
public final Subscription subscribe(Subscriber<? super T> subscriber) {
// validate and proceed
return Observable.subscribe(subscriber, this);
}

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (onSubscribe == null) {
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
Expand All @@ -7780,7 +7800,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
try {
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Expand Down
234 changes: 234 additions & 0 deletions src/test/java/rx/ObservableConversionTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package rx;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static junit.framework.Assert.*;

import org.junit.Test;

import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.operators.OperatorFilter;
import rx.internal.operators.OperatorMap;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class ObservableConversionTest {

public static class Cylon {}

public static class Jail {
Object cylon;

Jail(Object cylon) {
this.cylon = cylon;
}
}

public static class CylonDetectorObservable<T> {
protected OnSubscribe<T> onSubscribe;

public static <T> CylonDetectorObservable<T> create(OnSubscribe<T> onSubscribe) {
return new CylonDetectorObservable<T>(onSubscribe);
}

protected CylonDetectorObservable(OnSubscribe<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}

public void subscribe(Subscriber<T> subscriber) {
onSubscribe.call(subscriber);
}

public <R> CylonDetectorObservable<R> lift(Operator<? extends R, ? super T> operator) {
return x(new RobotConversionFunc<T, R>(operator));
}

public <R, O> O x(Func1<OnSubscribe<T>, O> operator) {
return operator.call(onSubscribe);
}

public <R> CylonDetectorObservable<? extends R> compose(Func1<CylonDetectorObservable<? super T>, CylonDetectorObservable<? extends R>> transformer) {
return transformer.call(this);
}

public final CylonDetectorObservable<T> beep(Func1<? super T, Boolean> predicate) {
return lift(new OperatorFilter<T>(predicate));
}

public final <R> CylonDetectorObservable<R> boop(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}

public CylonDetectorObservable<String> DESTROY() {
return boop(new Func1<T, String>() {
@Override
public String call(T t) {
Object cylon = ((Jail) t).cylon;
throwOutTheAirlock(cylon);
if (t instanceof Jail) {
String name = cylon.toString();
return "Cylon '" + name + "' has been destroyed";
}
else {
return "Cylon 'anonymous' has been destroyed";
}
}});
}

private static void throwOutTheAirlock(Object cylon) {
// ...
}
}

public static class RobotConversionFunc<T, R> implements Func1<OnSubscribe<T>, CylonDetectorObservable<R>> {
private Operator<? extends R, ? super T> operator;

public RobotConversionFunc(Operator<? extends R, ? super T> operator) {
this.operator = operator;
}

@Override
public CylonDetectorObservable<R> call(final OnSubscribe<T> onSubscribe) {
return CylonDetectorObservable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = operator.call(o);
try {
st.onStart();
onSubscribe.call(st);
} catch (OnErrorNotImplementedException e) {
throw e;
} catch (Throwable e) {
st.onError(e);
}
} catch (OnErrorNotImplementedException e) {
throw e;
} catch (Throwable e) {
o.onError(e);
}

}});
}
}

public static class ConvertToCylonDetector<T> implements Func1<OnSubscribe<T>, CylonDetectorObservable<T>> {
@Override
public CylonDetectorObservable<T> call(final OnSubscribe<T> onSubscribe) {
return CylonDetectorObservable.create(onSubscribe);
}
}

public static class ConvertToObservable<T> implements Func1<OnSubscribe<T>, Observable<T>> {
@Override
public Observable<T> call(final OnSubscribe<T> onSubscribe) {
return Observable.create(onSubscribe);
}
}

@Test
public void testConversionBetweenObservableClasses() {
final TestSubscriber<String> subscriber = new TestSubscriber<String>(new Subscriber<String>(){

@Override
public void onCompleted() {
System.out.println("Complete");
}

@Override
public void onError(Throwable e) {
System.out.println("error: " + e.getMessage());
e.printStackTrace();
}

@Override
public void onNext(String t) {
System.out.println(t);
}});
List<Object> crewOfBattlestarGalactica = Arrays.asList(new Object[] {"William Adama", "Laura Roslin", "Lee Adama", new Cylon()});
Observable.from(crewOfBattlestarGalactica)
.x(new ConvertToCylonDetector<Object>())
.beep(new Func1<Object, Boolean>(){
@Override
public Boolean call(Object t) {
return t instanceof Cylon;
}})
.boop(new Func1<Object, Object>() {
@Override
public Jail call(Object cylon) {
return new Jail(cylon);
}})
.DESTROY()
.x(new ConvertToObservable<String>())
.reduce("Cylon Detector finished. Report:\n", new Func2<String, String, String>() {
@Override
public String call(String a, String n) {
return a + n + "\n";
}})
.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertCompleted();
}

@Test
public void testConvertToConcurrentQueue() {
final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>(null);
final AtomicBoolean isFinished = new AtomicBoolean(false);
ConcurrentLinkedQueue<? extends Integer> queue = Observable.range(0,5)
.flatMap(new Func1<Integer, Observable<Integer>>(){
@Override
public Observable<Integer> call(final Integer i) {
return Observable.range(0, 5)
.observeOn(Schedulers.io())
.map(new Func1<Integer, Integer>(){
@Override
public Integer call(Integer k) {
try {
Thread.sleep(System.currentTimeMillis() % 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i + k;
}});
}})
.x(new Func1<OnSubscribe<Integer>, ConcurrentLinkedQueue<Integer>>() {
@Override
public ConcurrentLinkedQueue<Integer> call(OnSubscribe<Integer> onSubscribe) {
final ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<Integer>();
onSubscribe.call(new Subscriber<Integer>(){
@Override
public void onCompleted() {
isFinished.set(true);
}

@Override
public void onError(Throwable e) {
thrown.set(e);
}

@Override
public void onNext(Integer t) {
q.add(t);
}});
return q;
}});

int x = 0;
while(!isFinished.get()) {
Integer i = queue.poll();
if (i != null) {
x++;
System.out.println(x + " item: " + i);
}
}
assertEquals(null, thrown.get());
}
}
16 changes: 15 additions & 1 deletion src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.Functions;
import rx.observables.ConnectableObservable;
import rx.observers.TestSubscriber;
import rx.schedulers.TestScheduler;
Expand Down Expand Up @@ -1157,4 +1156,19 @@ public void testForEachWithNull() {
//
.forEach(null);
}

@Test
public void testExtend() {
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
final Object value = new Object();
Observable.just(value).x(new Func1<OnSubscribe<Object>,Object>(){
@Override
public Object call(OnSubscribe<Object> onSubscribe) {
onSubscribe.call(subscriber);
subscriber.assertNoErrors();
subscriber.assertCompleted();
subscriber.assertValue(value);
return subscriber.getOnNextEvents().get(0);
}});
}
}