Skip to content

1.x: Add Single.onErrorResumeNext(Func) #3766

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 38 additions & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1430,9 +1430,46 @@ public final Single<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunctio
* @param resumeSingleInCaseOfError a Single that will take control if source Single encounters an error.
* @return the original Single, with appropriately modified behavior.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final Single<T> onErrorResumeNext(Single<? extends T> resumeSingleInCaseOfError) {
return new Single<T>(new SingleOperatorOnErrorResumeNextViaSingle<T>(this, resumeSingleInCaseOfError));
return new Single<T>(SingleOperatorOnErrorResumeNext.withOther(this, resumeSingleInCaseOfError));
}

/**
* Instructs a Single to pass control to another Single rather than invoking
* {@link Observer#onError(Throwable)} if it encounters an error.
* <p/>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorResumeNext.png" alt="">
* <p/>
* By default, when a Single encounters an error that prevents it from emitting the expected item to
* its {@link Observer}, the Single invokes its Observer's {@code onError} method, and then quits
* without invoking any more of its Observer's methods. The {@code onErrorResumeNext} method changes this
* behavior. If you pass a function that will return another Single ({@code resumeFunctionInCaseOfError}) to an Single's
* {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its
* Observer's {@code onError} method, it will instead relinquish control to {@code resumeSingleInCaseOfError} which
* will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case,
* because no Single necessarily invokes {@code onError}, the Observer may never know that an error
* happened.
* <p/>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param resumeFunctionInCaseOfError a function that returns a Single that will take control if source Single encounters an error.
* @return the original Single, with appropriately modified behavior.
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
* @Experimental The behavior of this can change at any time.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final Single<T> onErrorResumeNext(final Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError) {
return new Single<T>(SingleOperatorOnErrorResumeNext.withFunction(this, resumeFunctionInCaseOfError));
}

/**
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/rx/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.*;

import rx.Observer;
import rx.SingleSubscriber;
import rx.annotations.Experimental;

/**
Expand Down Expand Up @@ -188,6 +189,7 @@ public static void throwOrReport(Throwable t, Observer<?> o, Object value) {
Exceptions.throwIfFatal(t);
o.onError(OnErrorThrowable.addValueAsLastCause(t, value));
}

/**
* Forwards a fatal exception or reports it to the given Observer.
* @param t the exception
Expand All @@ -199,4 +201,17 @@ public static void throwOrReport(Throwable t, Observer<?> o) {
Exceptions.throwIfFatal(t);
o.onError(t);
}

/**
* Forwards a fatal exception or reports it to the given Observer.
*
* @param throwable the exception.
* @param subscriber the subscriber to report to.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number).
*/
@Experimental
public static void throwOrReport(Throwable throwable, SingleSubscriber<?> subscriber) {
Exceptions.throwIfFatal(throwable);
subscriber.onError(throwable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rx.internal.operators;

import rx.Single;
import rx.SingleSubscriber;
import rx.exceptions.Exceptions;
import rx.functions.Func1;
import rx.plugins.RxJavaPlugins;

public class SingleOperatorOnErrorResumeNext<T> implements Single.OnSubscribe<T> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just renamed SingleOperatorOnErrorResumeNextViaSingle to something more general and it's now works with function instead of Single directly.


private final Single<? extends T> originalSingle;
private final Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError;

private SingleOperatorOnErrorResumeNext(Single<? extends T> originalSingle, Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError) {
if (originalSingle == null) {
throw new NullPointerException("originalSingle must not be null");
}

if (resumeFunctionInCaseOfError == null) {
throw new NullPointerException("resumeFunctionInCaseOfError must not be null");
}

this.originalSingle = originalSingle;
this.resumeFunctionInCaseOfError = resumeFunctionInCaseOfError;
}

public static <T> SingleOperatorOnErrorResumeNext<T> withFunction(Single<? extends T> originalSingle, Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError) {
return new SingleOperatorOnErrorResumeNext<T>(originalSingle, resumeFunctionInCaseOfError);
}

public static <T> SingleOperatorOnErrorResumeNext<T> withOther(Single<? extends T> originalSingle, final Single<? extends T> resumeSingleInCaseOfError) {
if (resumeSingleInCaseOfError == null) {
throw new NullPointerException("resumeSingleInCaseOfError must not be null");
}

return new SingleOperatorOnErrorResumeNext<T>(originalSingle, new Func1<Throwable, Single<? extends T>>() {
@Override
public Single<? extends T> call(Throwable throwable) {
return resumeSingleInCaseOfError;
}
});
}

@Override
public void call(final SingleSubscriber<? super T> child) {
final SingleSubscriber<? super T> parent = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
child.onSuccess(value);
}

@Override
public void onError(Throwable error) {
try {
resumeFunctionInCaseOfError.call(error).subscribe(child);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd separate catching the error and subscribing to the Single outside the try-catch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove unsubscribe()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's terminal state, we don't do unsubscribe() in onSuccess() too

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If resumeFunctionInCaseOfError returns something like Observable.never.toSingle, we should still unsubscribe the original one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind. I just found SafeSubscriber will do it.

} catch (Throwable innerError) {
Exceptions.throwOrReport(innerError, child);
}
}
};

child.add(parent);
originalSingle.subscribe(parent);
}
}

This file was deleted.

67 changes: 66 additions & 1 deletion src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1220,13 +1220,78 @@ public void onErrorResumeNextViaSingleShouldPreventNullSingle() {
try {
Single
.just("value")
.onErrorResumeNext(null);
.onErrorResumeNext((Single<String>) null);
fail();
} catch (NullPointerException expected) {
assertEquals("resumeSingleInCaseOfError must not be null", expected.getMessage());
}
}

@Test
public void onErrorResumeNextViaFunctionShouldNotInterruptSuccesfulSingle() {
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.just("success")
.onErrorResumeNext(new Func1<Throwable, Single<? extends String>>() {
@Override
public Single<? extends String> call(Throwable throwable) {
return Single.just("fail");
}
})
.subscribe(testSubscriber);

testSubscriber.assertValue("success");
}

@Test
public void onErrorResumeNextViaFunctionShouldResumeWithPassedSingleInCaseOfError() {
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();

Single
.<String> error(new RuntimeException("test exception"))
.onErrorResumeNext(new Func1<Throwable, Single<? extends String>>() {
@Override
public Single<? extends String> call(Throwable throwable) {
return Single.just("fallback");
}
})
.subscribe(testSubscriber);

testSubscriber.assertValue("fallback");
}

@Test
public void onErrorResumeNextViaFunctionShouldPreventNullFunction() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also check/handle if the function returns a null Single.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test, but didn't handle this specifically in the operator, test will make sure that it won't be swallowed

try {
Single
.just("value")
.onErrorResumeNext((Func1<Throwable, ? extends Single<? extends String>>) null);
fail();
} catch (NullPointerException expected) {
assertEquals("resumeFunctionInCaseOfError must not be null", expected.getMessage());
}
}

@Test
public void onErrorResumeNextViaFunctionShouldFailIfFunctionReturnsNull() {
try {
Single
.error(new TestException())
.onErrorResumeNext(new Func1<Throwable, Single<? extends String>>() {
@Override
public Single<? extends String> call(Throwable throwable) {
return null;
}
})
.subscribe();

fail();
} catch (OnErrorNotImplementedException expected) {
assertTrue(expected.getCause() instanceof NullPointerException);
}
}

@Test(expected = NullPointerException.class)
public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() {
Single.iterableToArray(null);
Expand Down