Skip to content

Commit dd336fd

Browse files
authored
3.x: fix switchMaps inconsistency swallowing errors when cancelled (#6572)
1 parent aa8e03f commit dd336fd

20 files changed

+329
-12
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMap.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -585,10 +585,7 @@ void disposeAll() {
585585
for (InnerSubscriber<?, ?> inner : a) {
586586
inner.dispose();
587587
}
588-
Throwable ex = errs.terminate();
589-
if (ex != null && ex != ExceptionHelper.TERMINATED) {
590-
RxJavaPlugins.onError(ex);
591-
}
588+
errs.tryTerminateAndReport();
592589
}
593590
}
594591
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java

+2
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ public void cancel() {
173173
upstream.cancel();
174174

175175
disposeInner();
176+
177+
error.tryTerminateAndReport();
176178
}
177179
}
178180

src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletable.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ void disposeInner() {
162162
public void dispose() {
163163
upstream.cancel();
164164
disposeInner();
165+
errors.tryTerminateAndReport();
165166
}
166167

167168
@Override
@@ -178,7 +179,8 @@ void innerError(SwitchMapInnerObserver sender, Throwable error) {
178179
downstream.onError(ex);
179180
}
180181
} else {
181-
dispose();
182+
upstream.cancel();
183+
disposeInner();
182184
Throwable ex = errors.terminate();
183185
if (ex != ExceptionHelper.TERMINATED) {
184186
downstream.onError(ex);

src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybe.java

+1
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ public void cancel() {
177177
cancelled = true;
178178
upstream.cancel();
179179
disposeInner();
180+
errors.tryTerminateAndReport();
180181
}
181182

182183
void innerError(SwitchMapMaybeObserver<R> sender, Throwable ex) {

src/main/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingle.java

+1
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ public void cancel() {
177177
cancelled = true;
178178
upstream.cancel();
179179
disposeInner();
180+
errors.tryTerminateAndReport();
180181
}
181182

182183
void innerError(SwitchMapSingleObserver<R> sender, Throwable ex) {

src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletable.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ void disposeInner() {
160160
public void dispose() {
161161
upstream.dispose();
162162
disposeInner();
163+
errors.tryTerminateAndReport();
163164
}
164165

165166
@Override
@@ -176,7 +177,8 @@ void innerError(SwitchMapInnerObserver sender, Throwable error) {
176177
downstream.onError(ex);
177178
}
178179
} else {
179-
dispose();
180+
upstream.dispose();
181+
disposeInner();
180182
Throwable ex = errors.terminate();
181183
if (ex != ExceptionHelper.TERMINATED) {
182184
downstream.onError(ex);

src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybe.java

+1
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ public void dispose() {
164164
cancelled = true;
165165
upstream.dispose();
166166
disposeInner();
167+
errors.tryTerminateAndReport();
167168
}
168169

169170
@Override

src/main/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapSingle.java

+1
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ public void dispose() {
164164
cancelled = true;
165165
upstream.dispose();
166166
disposeInner();
167+
errors.tryTerminateAndReport();
167168
}
168169

169170
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -307,10 +307,7 @@ public void dispose() {
307307
if (!cancelled) {
308308
cancelled = true;
309309
if (disposeAll()) {
310-
Throwable ex = errors.terminate();
311-
if (ex != null && ex != ExceptionHelper.TERMINATED) {
312-
RxJavaPlugins.onError(ex);
313-
}
310+
errors.tryTerminateAndReport();
314311
}
315312
}
316313
}

src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java

+2
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ public void dispose() {
157157
cancelled = true;
158158
upstream.dispose();
159159
disposeInner();
160+
161+
errors.tryTerminateAndReport();
160162
}
161163
}
162164

src/main/java/io/reactivex/internal/util/AtomicThrowable.java

+15
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import java.util.concurrent.atomic.AtomicReference;
1717

18+
import io.reactivex.plugins.RxJavaPlugins;
19+
1820
/**
1921
* Atomic container for Throwables including combining and having a
2022
* terminal state via ExceptionHelper.
@@ -46,4 +48,17 @@ public Throwable terminate() {
4648
public boolean isTerminated() {
4749
return get() == ExceptionHelper.TERMINATED;
4850
}
51+
52+
/**
53+
* Tries to terminate this atomic throwable (by swapping in the TERMINATED indicator)
54+
* and calls {@link RxJavaPlugins#onError(Throwable)} if there was a non-null, non-indicator
55+
* exception contained within before.
56+
* @since 3.0.0
57+
*/
58+
public void tryTerminateAndReport() {
59+
Throwable ex = terminate();
60+
if (ex != null && ex != ExceptionHelper.TERMINATED) {
61+
RxJavaPlugins.onError(ex);
62+
}
63+
}
4964
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.flowable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.util.*;
@@ -1202,4 +1203,34 @@ public Object apply(Integer w) throws Exception {
12021203
.assertNoErrors()
12031204
.assertComplete();
12041205
}
1206+
1207+
@Test
1208+
public void undeliverableUponCancel() {
1209+
List<Throwable> errors = TestHelper.trackPluginErrors();
1210+
try {
1211+
final TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>();
1212+
1213+
Flowable.just(1)
1214+
.map(new Function<Integer, Integer>() {
1215+
@Override
1216+
public Integer apply(Integer v) throws Throwable {
1217+
ts.cancel();
1218+
throw new TestException();
1219+
}
1220+
})
1221+
.switchMap(new Function<Integer, Publisher<Integer>>() {
1222+
@Override
1223+
public Publisher<Integer> apply(Integer v) throws Throwable {
1224+
return Flowable.just(v).hide();
1225+
}
1226+
})
1227+
.subscribe(ts);
1228+
1229+
ts.assertEmpty();
1230+
1231+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
1232+
} finally {
1233+
RxJavaPlugins.reset();
1234+
}
1235+
}
12051236
}

src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapCompletableTest.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import io.reactivex.plugins.RxJavaPlugins;
3030
import io.reactivex.processors.PublishProcessor;
3131
import io.reactivex.subjects.CompletableSubject;
32-
import io.reactivex.testsupport.TestHelper;
32+
import io.reactivex.testsupport.*;
3333

3434
public class FlowableSwitchMapCompletableTest {
3535

@@ -387,4 +387,34 @@ public void mainErrorDelayed() {
387387

388388
to.assertFailure(TestException.class);
389389
}
390+
391+
@Test
392+
public void undeliverableUponCancel() {
393+
List<Throwable> errors = TestHelper.trackPluginErrors();
394+
try {
395+
final TestObserverEx<Integer> to = new TestObserverEx<Integer>();
396+
397+
Flowable.just(1)
398+
.map(new Function<Integer, Integer>() {
399+
@Override
400+
public Integer apply(Integer v) throws Throwable {
401+
to.dispose();
402+
throw new TestException();
403+
}
404+
})
405+
.switchMapCompletable(new Function<Integer, Completable>() {
406+
@Override
407+
public Completable apply(Integer v) throws Throwable {
408+
return Completable.complete().hide();
409+
}
410+
})
411+
.subscribe(to);
412+
413+
to.assertEmpty();
414+
415+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
416+
} finally {
417+
RxJavaPlugins.reset();
418+
}
419+
}
390420
}

src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapMaybeTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -646,4 +646,34 @@ public void onNext(Integer t) {
646646

647647
ts.assertResult(1, 1, 1, 1, 1);
648648
}
649+
650+
@Test
651+
public void undeliverableUponCancel() {
652+
List<Throwable> errors = TestHelper.trackPluginErrors();
653+
try {
654+
final TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>();
655+
656+
Flowable.just(1)
657+
.map(new Function<Integer, Integer>() {
658+
@Override
659+
public Integer apply(Integer v) throws Throwable {
660+
ts.cancel();
661+
throw new TestException();
662+
}
663+
})
664+
.switchMapMaybe(new Function<Integer, Maybe<Integer>>() {
665+
@Override
666+
public Maybe<Integer> apply(Integer v) throws Throwable {
667+
return Maybe.just(v).hide();
668+
}
669+
})
670+
.subscribe(ts);
671+
672+
ts.assertEmpty();
673+
674+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
675+
} finally {
676+
RxJavaPlugins.reset();
677+
}
678+
}
649679
}

src/test/java/io/reactivex/internal/operators/mixed/FlowableSwitchMapSingleTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -603,4 +603,34 @@ public void backpressured() {
603603
.requestMore(1)
604604
.assertResult(1);
605605
}
606+
607+
@Test
608+
public void undeliverableUponCancel() {
609+
List<Throwable> errors = TestHelper.trackPluginErrors();
610+
try {
611+
final TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>();
612+
613+
Flowable.just(1)
614+
.map(new Function<Integer, Integer>() {
615+
@Override
616+
public Integer apply(Integer v) throws Throwable {
617+
ts.cancel();
618+
throw new TestException();
619+
}
620+
})
621+
.switchMapSingle(new Function<Integer, Single<Integer>>() {
622+
@Override
623+
public Single<Integer> apply(Integer v) throws Throwable {
624+
return Single.just(v).hide();
625+
}
626+
})
627+
.subscribe(ts);
628+
629+
ts.assertEmpty();
630+
631+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
632+
} finally {
633+
RxJavaPlugins.reset();
634+
}
635+
}
606636
}

src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapCompletableTest.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import io.reactivex.observers.TestObserver;
2828
import io.reactivex.plugins.RxJavaPlugins;
2929
import io.reactivex.subjects.*;
30-
import io.reactivex.testsupport.TestHelper;
30+
import io.reactivex.testsupport.*;
3131

3232
public class ObservableSwitchMapCompletableTest {
3333

@@ -429,4 +429,34 @@ public void scalarSource() {
429429

430430
to.assertResult();
431431
}
432+
433+
@Test
434+
public void undeliverableUponCancel() {
435+
List<Throwable> errors = TestHelper.trackPluginErrors();
436+
try {
437+
final TestObserverEx<Integer> to = new TestObserverEx<Integer>();
438+
439+
Observable.just(1)
440+
.map(new Function<Integer, Integer>() {
441+
@Override
442+
public Integer apply(Integer v) throws Throwable {
443+
to.dispose();
444+
throw new TestException();
445+
}
446+
})
447+
.switchMapCompletable(new Function<Integer, Completable>() {
448+
@Override
449+
public Completable apply(Integer v) throws Throwable {
450+
return Completable.complete().hide();
451+
}
452+
})
453+
.subscribe(to);
454+
455+
to.assertEmpty();
456+
457+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
458+
} finally {
459+
RxJavaPlugins.reset();
460+
}
461+
}
432462
}

src/test/java/io/reactivex/internal/operators/mixed/ObservableSwitchMapMaybeTest.java

+30
Original file line numberDiff line numberDiff line change
@@ -686,4 +686,34 @@ public void scalarSource() {
686686

687687
to.assertResult(2);
688688
}
689+
690+
@Test
691+
public void undeliverableUponCancel() {
692+
List<Throwable> errors = TestHelper.trackPluginErrors();
693+
try {
694+
final TestObserverEx<Integer> to = new TestObserverEx<Integer>();
695+
696+
Observable.just(1)
697+
.map(new Function<Integer, Integer>() {
698+
@Override
699+
public Integer apply(Integer v) throws Throwable {
700+
to.dispose();
701+
throw new TestException();
702+
}
703+
})
704+
.switchMapMaybe(new Function<Integer, Maybe<Integer>>() {
705+
@Override
706+
public Maybe<Integer> apply(Integer v) throws Throwable {
707+
return Maybe.just(v).hide();
708+
}
709+
})
710+
.subscribe(to);
711+
712+
to.assertEmpty();
713+
714+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
715+
} finally {
716+
RxJavaPlugins.reset();
717+
}
718+
}
689719
}

0 commit comments

Comments
 (0)