From 61a5acd9efcb25b15f0b9343f6534eea4df8b296 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 15 Jul 2019 15:47:39 +0200 Subject: [PATCH] 3.x: fix switchMaps inconsistency swallowing errors when cancelled --- .../operators/flowable/FlowableFlatMap.java | 5 +- .../operators/flowable/FlowableSwitchMap.java | 2 + .../mixed/FlowableSwitchMapCompletable.java | 4 +- .../mixed/FlowableSwitchMapMaybe.java | 1 + .../mixed/FlowableSwitchMapSingle.java | 1 + .../mixed/ObservableSwitchMapCompletable.java | 4 +- .../mixed/ObservableSwitchMapMaybe.java | 1 + .../mixed/ObservableSwitchMapSingle.java | 1 + .../observable/ObservableFlatMap.java | 5 +- .../observable/ObservableSwitchMap.java | 2 + .../internal/util/AtomicThrowable.java | 15 ++++++ .../flowable/FlowableSwitchTest.java | 31 +++++++++++ .../FlowableSwitchMapCompletableTest.java | 32 ++++++++++- .../mixed/FlowableSwitchMapMaybeTest.java | 30 +++++++++++ .../mixed/FlowableSwitchMapSingleTest.java | 30 +++++++++++ .../ObservableSwitchMapCompletableTest.java | 32 ++++++++++- .../mixed/ObservableSwitchMapMaybeTest.java | 30 +++++++++++ .../mixed/ObservableSwitchMapSingleTest.java | 30 +++++++++++ .../observable/ObservableSwitchTest.java | 31 +++++++++++ .../internal/util/AtomicThrowableTest.java | 54 +++++++++++++++++++ 20 files changed, 329 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java index 099b3f6d52..52dd347b4b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java @@ -585,10 +585,7 @@ void disposeAll() { for (InnerSubscriber inner : a) { inner.dispose(); } - Throwable ex = errs.terminate(); - if (ex != null && ex != ExceptionHelper.TERMINATED) { - RxJavaPlugins.onError(ex); - } + errs.tryTerminateAndReport(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java index 9730bd38b2..146ee3f413 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java @@ -173,6 +173,8 @@ public void cancel() { upstream.cancel(); disposeInner(); + + error.tryTerminateAndReport(); } } diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletable.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletable.java index 70294ff3d3..1fae1bab6f 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletable.java @@ -162,6 +162,7 @@ void disposeInner() { public void dispose() { upstream.cancel(); disposeInner(); + errors.tryTerminateAndReport(); } @Override @@ -178,7 +179,8 @@ void innerError(SwitchMapInnerObserver sender, Throwable error) { downstream.onError(ex); } } else { - dispose(); + upstream.cancel(); + disposeInner(); Throwable ex = errors.terminate(); if (ex != ExceptionHelper.TERMINATED) { downstream.onError(ex); diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybe.java index 7bb3941d93..9ea99b8d7e 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybe.java @@ -177,6 +177,7 @@ public void cancel() { cancelled = true; upstream.cancel(); disposeInner(); + errors.tryTerminateAndReport(); } void innerError(SwitchMapMaybeObserver sender, Throwable ex) { diff --git a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java index 752ee852b9..f176322fde 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java @@ -177,6 +177,7 @@ public void cancel() { cancelled = true; upstream.cancel(); disposeInner(); + errors.tryTerminateAndReport(); } void innerError(SwitchMapSingleObserver sender, Throwable ex) { diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletable.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletable.java index 1d4e8d247d..84b3a82950 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletable.java @@ -160,6 +160,7 @@ void disposeInner() { public void dispose() { upstream.dispose(); disposeInner(); + errors.tryTerminateAndReport(); } @Override @@ -176,7 +177,8 @@ void innerError(SwitchMapInnerObserver sender, Throwable error) { downstream.onError(ex); } } else { - dispose(); + upstream.dispose(); + disposeInner(); Throwable ex = errors.terminate(); if (ex != ExceptionHelper.TERMINATED) { downstream.onError(ex); diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybe.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybe.java index 89086255e6..4bc76c15dc 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybe.java @@ -164,6 +164,7 @@ public void dispose() { cancelled = true; upstream.dispose(); disposeInner(); + errors.tryTerminateAndReport(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingle.java b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingle.java index f9871aa6f9..8371d7773f 100644 --- a/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingle.java @@ -164,6 +164,7 @@ public void dispose() { cancelled = true; upstream.dispose(); disposeInner(); + errors.tryTerminateAndReport(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index 48cdb8f55e..b920fa57a7 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -307,10 +307,7 @@ public void dispose() { if (!cancelled) { cancelled = true; if (disposeAll()) { - Throwable ex = errors.terminate(); - if (ex != null && ex != ExceptionHelper.TERMINATED) { - RxJavaPlugins.onError(ex); - } + errors.tryTerminateAndReport(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java index 8c5aa371dc..795e7cae50 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java @@ -157,6 +157,8 @@ public void dispose() { cancelled = true; upstream.dispose(); disposeInner(); + + errors.tryTerminateAndReport(); } } diff --git a/src/main/java/io/reactivex/internal/util/AtomicThrowable.java b/src/main/java/io/reactivex/internal/util/AtomicThrowable.java index 60c19155c5..6c7c06b54c 100644 --- a/src/main/java/io/reactivex/internal/util/AtomicThrowable.java +++ b/src/main/java/io/reactivex/internal/util/AtomicThrowable.java @@ -15,6 +15,8 @@ import java.util.concurrent.atomic.AtomicReference; +import io.reactivex.plugins.RxJavaPlugins; + /** * Atomic container for Throwables including combining and having a * terminal state via ExceptionHelper. @@ -46,4 +48,17 @@ public Throwable terminate() { public boolean isTerminated() { return get() == ExceptionHelper.TERMINATED; } + + /** + * Tries to terminate this atomic throwable (by swapping in the TERMINATED indicator) + * and calls {@link RxJavaPlugins#onError(Throwable)} if there was a non-null, non-indicator + * exception contained within before. + * @since 3.0.0 + */ + public void tryTerminateAndReport() { + Throwable ex = terminate(); + if (ex != null && ex != ExceptionHelper.TERMINATED) { + RxJavaPlugins.onError(ex); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java index 401e341656..5af6277459 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; @@ -1202,4 +1203,34 @@ public Object apply(Integer w) throws Exception { .assertNoErrors() .assertComplete(); } + + @Test + public void undeliverableUponCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + final TestSubscriberEx ts = new TestSubscriberEx(); + + Flowable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + ts.cancel(); + throw new TestException(); + } + }) + .switchMap(new Function>() { + @Override + public Publisher apply(Integer v) throws Throwable { + return Flowable.just(v).hide(); + } + }) + .subscribe(ts); + + ts.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletableTest.java index c309197fc0..d6f8248f3e 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletableTest.java @@ -29,7 +29,7 @@ import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.PublishProcessor; import io.reactivex.subjects.CompletableSubject; -import io.reactivex.testsupport.TestHelper; +import io.reactivex.testsupport.*; public class FlowableSwitchMapCompletableTest { @@ -387,4 +387,34 @@ public void mainErrorDelayed() { to.assertFailure(TestException.class); } + + @Test + public void undeliverableUponCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + final TestObserverEx to = new TestObserverEx(); + + Flowable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + to.dispose(); + throw new TestException(); + } + }) + .switchMapCompletable(new Function() { + @Override + public Completable apply(Integer v) throws Throwable { + return Completable.complete().hide(); + } + }) + .subscribe(to); + + to.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybeTest.java index 72aa7c6016..8e5a460626 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybeTest.java @@ -646,4 +646,34 @@ public void onNext(Integer t) { ts.assertResult(1, 1, 1, 1, 1); } + + @Test + public void undeliverableUponCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + final TestSubscriberEx ts = new TestSubscriberEx(); + + Flowable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + ts.cancel(); + throw new TestException(); + } + }) + .switchMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) throws Throwable { + return Maybe.just(v).hide(); + } + }) + .subscribe(ts); + + ts.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingleTest.java index 506deb8517..80be5e1079 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingleTest.java @@ -603,4 +603,34 @@ public void backpressured() { .requestMore(1) .assertResult(1); } + + @Test + public void undeliverableUponCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + final TestSubscriberEx ts = new TestSubscriberEx(); + + Flowable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + ts.cancel(); + throw new TestException(); + } + }) + .switchMapSingle(new Function>() { + @Override + public Single apply(Integer v) throws Throwable { + return Single.just(v).hide(); + } + }) + .subscribe(ts); + + ts.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletableTest.java index e791c024ad..6e183cc5b3 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletableTest.java @@ -27,7 +27,7 @@ import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.*; -import io.reactivex.testsupport.TestHelper; +import io.reactivex.testsupport.*; public class ObservableSwitchMapCompletableTest { @@ -429,4 +429,34 @@ public void scalarSource() { to.assertResult(); } + + @Test + public void undeliverableUponCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + final TestObserverEx to = new TestObserverEx(); + + Observable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + to.dispose(); + throw new TestException(); + } + }) + .switchMapCompletable(new Function() { + @Override + public Completable apply(Integer v) throws Throwable { + return Completable.complete().hide(); + } + }) + .subscribe(to); + + to.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybeTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybeTest.java index 82e891ab5b..4264af01f4 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybeTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybeTest.java @@ -686,4 +686,34 @@ public void scalarSource() { to.assertResult(2); } + + @Test + public void undeliverableUponCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + final TestObserverEx to = new TestObserverEx(); + + Observable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + to.dispose(); + throw new TestException(); + } + }) + .switchMapMaybe(new Function>() { + @Override + public Maybe apply(Integer v) throws Throwable { + return Maybe.just(v).hide(); + } + }) + .subscribe(to); + + to.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingleTest.java b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingleTest.java index 99c533c826..bd50bcffc0 100644 --- a/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingleTest.java @@ -654,4 +654,34 @@ public void scalarSource() { to.assertResult(2); } + + @Test + public void undeliverableUponCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + final TestObserverEx to = new TestObserverEx(); + + Observable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + to.dispose(); + throw new TestException(); + } + }) + .switchMapSingle(new Function>() { + @Override + public Single apply(Integer v) throws Throwable { + return Single.just(v).hide(); + } + }) + .subscribe(to); + + to.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java index 15d031c120..84b7d28702 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.List; @@ -1195,4 +1196,34 @@ public Object apply(Integer w) throws Exception { assertNotEquals(thread, o); } } + + @Test + public void undeliverableUponCancel() { + List errors = TestHelper.trackPluginErrors(); + try { + final TestObserverEx to = new TestObserverEx(); + + Observable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Throwable { + to.dispose(); + throw new TestException(); + } + }) + .switchMap(new Function>() { + @Override + public Observable apply(Integer v) throws Throwable { + return Observable.just(v).hide(); + } + }) + .subscribe(to); + + to.assertEmpty(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } } diff --git a/src/test/java/io/reactivex/internal/util/AtomicThrowableTest.java b/src/test/java/io/reactivex/internal/util/AtomicThrowableTest.java index 55575228b8..696d183e4f 100644 --- a/src/test/java/io/reactivex/internal/util/AtomicThrowableTest.java +++ b/src/test/java/io/reactivex/internal/util/AtomicThrowableTest.java @@ -15,8 +15,14 @@ import static org.junit.Assert.*; +import java.util.List; + import org.junit.Test; +import io.reactivex.exceptions.TestException; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.testsupport.TestHelper; + public class AtomicThrowableTest { @Test @@ -29,4 +35,52 @@ public void isTerminated() { assertTrue(ex.isTerminated()); } + + @Test + public void tryTerminateAndReportNull() { + List errors = TestHelper.trackPluginErrors(); + try { + + AtomicThrowable ex = new AtomicThrowable(); + ex.tryTerminateAndReport(); + + assertTrue("" + errors, errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void tryTerminateAndReportAlreadyTerminated() { + List errors = TestHelper.trackPluginErrors(); + try { + + AtomicThrowable ex = new AtomicThrowable(); + ex.terminate(); + + ex.tryTerminateAndReport(); + + assertTrue("" + errors, errors.isEmpty()); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void tryTerminateAndReportHasError() { + List errors = TestHelper.trackPluginErrors(); + try { + + AtomicThrowable ex = new AtomicThrowable(); + ex.set(new TestException()); + + ex.tryTerminateAndReport(); + + TestHelper.assertUndeliverable(errors, 0, TestException.class); + + assertEquals(1, errors.size()); + } finally { + RxJavaPlugins.reset(); + } + } }