diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 17937609a0..246e54f023 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -109,6 +109,23 @@ public interface Operator extends Func1, Subscriber< // cover for generics insanity } + /** + * Passes all emitted values from {@code this} Observable to the provided {@link ConversionFunc} to be + * collected and returned as a single value. Note that it is legal for a {@link ConversionFunc} to + * return an Observable (enabling chaining). + * + * @param conversion a function that converts from this {@code Observable} to an {@code R} + * @return an instance of R created by the provided Conversion + */ + @Experimental + public R x(Func1, ? extends R> conversion) { + return conversion.call(new OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + subscriber.add(Observable.subscribe(subscriber, Observable.this)); + }}); + } + /** * Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass * the values of the current Observable through the Operator function. @@ -127,17 +144,17 @@ public interface Operator extends Func1, Subscriber< *
{@code lift} does not operate by default on a particular {@link Scheduler}.
* * - * @param lift the Operator that implements the Observable-operating function to be applied to the source + * @param operator the Operator that implements the Observable-operating function to be applied to the source * Observable * @return an Observable that is the result of applying the lifted Operator to the source Observable * @see RxJava wiki: Implementing Your Own Operators */ - public final Observable lift(final Operator lift) { + public final Observable lift(final Operator operator) { return new Observable(new OnSubscribe() { @Override public void call(Subscriber o) { try { - Subscriber st = hook.onLift(lift).call(o); + Subscriber st = hook.onLift(operator).call(o); try { // new Subscriber created and being subscribed with so 'onStart' it st.onStart(); @@ -163,7 +180,6 @@ public void call(Subscriber o) { }); } - /** * Transform an Observable by applying a particular Transformer function to it. *

@@ -7752,11 +7768,15 @@ public final Subscription unsafeSubscribe(Subscriber subscriber) { * @see ReactiveX operators documentation: Subscribe */ public final Subscription subscribe(Subscriber subscriber) { - // validate and proceed + return Observable.subscribe(subscriber, this); + } + + private static Subscription subscribe(Subscriber subscriber, Observable observable) { + // validate and proceed if (subscriber == null) { throw new IllegalArgumentException("observer can not be null"); } - if (onSubscribe == null) { + if (observable.onSubscribe == null) { throw new IllegalStateException("onSubscribe function can not be null."); /* * the subscribe function can also be overridden but generally that's not the appropriate approach @@ -7780,7 +7800,7 @@ public final Subscription subscribe(Subscriber subscriber) { // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks. try { // allow the hook to intercept and/or decorate - hook.onSubscribeStart(this, onSubscribe).call(subscriber); + hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { // special handling for certain Throwable/Error/Exception types diff --git a/src/test/java/rx/ObservableConversionTest.java b/src/test/java/rx/ObservableConversionTest.java new file mode 100644 index 0000000000..543c44780b --- /dev/null +++ b/src/test/java/rx/ObservableConversionTest.java @@ -0,0 +1,234 @@ +package rx; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static junit.framework.Assert.*; + +import org.junit.Test; + +import rx.Observable.OnSubscribe; +import rx.Observable.Operator; +import rx.exceptions.OnErrorNotImplementedException; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.internal.operators.OperatorFilter; +import rx.internal.operators.OperatorMap; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; + +public class ObservableConversionTest { + + public static class Cylon {} + + public static class Jail { + Object cylon; + + Jail(Object cylon) { + this.cylon = cylon; + } + } + + public static class CylonDetectorObservable { + protected OnSubscribe onSubscribe; + + public static CylonDetectorObservable create(OnSubscribe onSubscribe) { + return new CylonDetectorObservable(onSubscribe); + } + + protected CylonDetectorObservable(OnSubscribe onSubscribe) { + this.onSubscribe = onSubscribe; + } + + public void subscribe(Subscriber subscriber) { + onSubscribe.call(subscriber); + } + + public CylonDetectorObservable lift(Operator operator) { + return x(new RobotConversionFunc(operator)); + } + + public O x(Func1, O> operator) { + return operator.call(onSubscribe); + } + + public CylonDetectorObservable compose(Func1, CylonDetectorObservable> transformer) { + return transformer.call(this); + } + + public final CylonDetectorObservable beep(Func1 predicate) { + return lift(new OperatorFilter(predicate)); + } + + public final CylonDetectorObservable boop(Func1 func) { + return lift(new OperatorMap(func)); + } + + public CylonDetectorObservable DESTROY() { + return boop(new Func1() { + @Override + public String call(T t) { + Object cylon = ((Jail) t).cylon; + throwOutTheAirlock(cylon); + if (t instanceof Jail) { + String name = cylon.toString(); + return "Cylon '" + name + "' has been destroyed"; + } + else { + return "Cylon 'anonymous' has been destroyed"; + } + }}); + } + + private static void throwOutTheAirlock(Object cylon) { + // ... + } + } + + public static class RobotConversionFunc implements Func1, CylonDetectorObservable> { + private Operator operator; + + public RobotConversionFunc(Operator operator) { + this.operator = operator; + } + + @Override + public CylonDetectorObservable call(final OnSubscribe onSubscribe) { + return CylonDetectorObservable.create(new OnSubscribe() { + @Override + public void call(Subscriber o) { + try { + Subscriber st = operator.call(o); + try { + st.onStart(); + onSubscribe.call(st); + } catch (OnErrorNotImplementedException e) { + throw e; + } catch (Throwable e) { + st.onError(e); + } + } catch (OnErrorNotImplementedException e) { + throw e; + } catch (Throwable e) { + o.onError(e); + } + + }}); + } + } + + public static class ConvertToCylonDetector implements Func1, CylonDetectorObservable> { + @Override + public CylonDetectorObservable call(final OnSubscribe onSubscribe) { + return CylonDetectorObservable.create(onSubscribe); + } + } + + public static class ConvertToObservable implements Func1, Observable> { + @Override + public Observable call(final OnSubscribe onSubscribe) { + return Observable.create(onSubscribe); + } + } + + @Test + public void testConversionBetweenObservableClasses() { + final TestSubscriber subscriber = new TestSubscriber(new Subscriber(){ + + @Override + public void onCompleted() { + System.out.println("Complete"); + } + + @Override + public void onError(Throwable e) { + System.out.println("error: " + e.getMessage()); + e.printStackTrace(); + } + + @Override + public void onNext(String t) { + System.out.println(t); + }}); + List crewOfBattlestarGalactica = Arrays.asList(new Object[] {"William Adama", "Laura Roslin", "Lee Adama", new Cylon()}); + Observable.from(crewOfBattlestarGalactica) + .x(new ConvertToCylonDetector()) + .beep(new Func1(){ + @Override + public Boolean call(Object t) { + return t instanceof Cylon; + }}) + .boop(new Func1() { + @Override + public Jail call(Object cylon) { + return new Jail(cylon); + }}) + .DESTROY() + .x(new ConvertToObservable()) + .reduce("Cylon Detector finished. Report:\n", new Func2() { + @Override + public String call(String a, String n) { + return a + n + "\n"; + }}) + .subscribe(subscriber); + subscriber.assertNoErrors(); + subscriber.assertCompleted(); + } + + @Test + public void testConvertToConcurrentQueue() { + final AtomicReference thrown = new AtomicReference(null); + final AtomicBoolean isFinished = new AtomicBoolean(false); + ConcurrentLinkedQueue queue = Observable.range(0,5) + .flatMap(new Func1>(){ + @Override + public Observable call(final Integer i) { + return Observable.range(0, 5) + .observeOn(Schedulers.io()) + .map(new Func1(){ + @Override + public Integer call(Integer k) { + try { + Thread.sleep(System.currentTimeMillis() % 100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return i + k; + }}); + }}) + .x(new Func1, ConcurrentLinkedQueue>() { + @Override + public ConcurrentLinkedQueue call(OnSubscribe onSubscribe) { + final ConcurrentLinkedQueue q = new ConcurrentLinkedQueue(); + onSubscribe.call(new Subscriber(){ + @Override + public void onCompleted() { + isFinished.set(true); + } + + @Override + public void onError(Throwable e) { + thrown.set(e); + } + + @Override + public void onNext(Integer t) { + q.add(t); + }}); + return q; + }}); + + int x = 0; + while(!isFinished.get()) { + Integer i = queue.poll(); + if (i != null) { + x++; + System.out.println(x + " item: " + i); + } + } + assertEquals(null, thrown.get()); + } +} diff --git a/src/test/java/rx/ObservableTests.java b/src/test/java/rx/ObservableTests.java index 5f1667deb6..55e43896d3 100644 --- a/src/test/java/rx/ObservableTests.java +++ b/src/test/java/rx/ObservableTests.java @@ -53,7 +53,6 @@ import rx.functions.Func0; import rx.functions.Func1; import rx.functions.Func2; -import rx.functions.Functions; import rx.observables.ConnectableObservable; import rx.observers.TestSubscriber; import rx.schedulers.TestScheduler; @@ -1157,4 +1156,19 @@ public void testForEachWithNull() { // .forEach(null); } + + @Test + public void testExtend() { + final TestSubscriber subscriber = new TestSubscriber(); + final Object value = new Object(); + Observable.just(value).x(new Func1,Object>(){ + @Override + public Object call(OnSubscribe onSubscribe) { + onSubscribe.call(subscriber); + subscriber.assertNoErrors(); + subscriber.assertCompleted(); + subscriber.assertValue(value); + return subscriber.getOnNextEvents().get(0); + }}); + } }