diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java index 03dca208c3..a673e3f557 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java @@ -73,8 +73,9 @@ public Subscription call(final Observer observer) { // subscribe to the original Observable and remember the subscription subscription.wrap(originalSequence.subscribe(new Observer() { public void onNext(T value) { - // forward the successful calls - observer.onNext(value); + // forward the successful calls unless resumed + if (subscriptionRef.get()==subscription) + observer.onNext(value); } /** @@ -83,8 +84,8 @@ public void onNext(T value) { public void onError(Exception ex) { /* remember what the current subscription is so we can determine if someone unsubscribes concurrently */ AtomicObservableSubscription currentSubscription = subscriptionRef.get(); - // check that we have not been unsubscribed before we can process the error - if (currentSubscription != null) { + // check that we have not been unsubscribed and not already resumed before we can process the error + if (currentSubscription == subscription) { /* error occurred, so switch subscription to the 'resumeSequence' */ AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(resumeSequence.subscribe(observer)); /* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */ @@ -97,8 +98,9 @@ public void onError(Exception ex) { } public void onCompleted() { - // forward the successful calls - observer.onCompleted(); + // forward the successful calls unless resumed + if (subscriptionRef.get()==subscription) + observer.onCompleted(); } })); @@ -119,7 +121,8 @@ public static class UnitTest { @Test public void testResumeNext() { Subscription s = mock(Subscription.class); - TestObservable w = new TestObservable(s, "one"); + // Trigger failure on second element + TestObservable w = new TestObservable(s, "one", "fail", "two", "three"); Observable resume = Observable.from("twoResume", "threeResume"); Observable observable = Observable.create(onErrorResumeNextViaObservable(w, resume)); @@ -140,7 +143,46 @@ public void testResumeNext() { verify(aObserver, Mockito.never()).onNext("three"); verify(aObserver, times(1)).onNext("twoResume"); verify(aObserver, times(1)).onNext("threeResume"); + } + + @Test + public void testMapResumeAsyncNext() { + Subscription sr = mock(Subscription.class); + // Trigger multiple failures + Observable w = Observable.from("one", "fail", "two", "three", "fail"); + // Resume Observable is async + TestObservable resume = new TestObservable(sr, "twoResume", "threeResume"); + + // Introduce map function that fails intermittently (Map does not prevent this when the observer is a + // rx.operator incl onErrorResumeNextViaObservable) + w = w.map(new Func1() { + public String call(String s) { + if ("fail".equals(s)) + throw new RuntimeException("Forced Failure"); + System.out.println("BadMapper:" + s); + return s; + } + }); + + Observable observable = Observable.create(onErrorResumeNextViaObservable(w, resume)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + + try { + resume.t.join(); + } catch (InterruptedException e) { + fail(e.getMessage()); + } + verify(aObserver, Mockito.never()).onError(any(Exception.class)); + verify(aObserver, times(1)).onCompleted(); + verify(aObserver, times(1)).onNext("one"); + verify(aObserver, Mockito.never()).onNext("two"); + verify(aObserver, Mockito.never()).onNext("three"); + verify(aObserver, times(1)).onNext("twoResume"); + verify(aObserver, times(1)).onNext("threeResume"); } private static class TestObservable extends Observable { @@ -164,11 +206,15 @@ public void run() { try { System.out.println("running TestObservable thread"); for (String s : values) { + if ("fail".equals(s)) + throw new RuntimeException("Forced Failure"); System.out.println("TestObservable onNext: " + s); observer.onNext(s); } - throw new RuntimeException("Forced Failure"); + System.out.println("TestObservable onCompleted"); + observer.onCompleted(); } catch (Exception e) { + System.out.println("TestObservable onError: " + e); observer.onError(e); } }