Skip to content

Commit a644739

Browse files
authored
3.x: Fix switchMap incorrect sync-fusion & error management (#6616)
1 parent 2572fa7 commit a644739

File tree

4 files changed

+68
-2
lines changed

4 files changed

+68
-2
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableSwitchMap.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ void drain() {
311311
if (r != Long.MAX_VALUE) {
312312
requested.addAndGet(-e);
313313
}
314-
inner.get().request(e);
314+
inner.request(e);
315315
}
316316
}
317317

@@ -395,6 +395,7 @@ public void onError(Throwable t) {
395395
if (index == p.unique && p.errors.addThrowable(t)) {
396396
if (!p.delayErrors) {
397397
p.upstream.cancel();
398+
p.done = true;
398399
}
399400
done = true;
400401
p.drain();
@@ -415,5 +416,11 @@ public void onComplete() {
415416
public void cancel() {
416417
SubscriptionHelper.cancel(this);
417418
}
419+
420+
public void request(long n) {
421+
if (fusionMode != QueueSubscription.SYNC) {
422+
get().request(n);
423+
}
424+
}
418425
}
419426
}

src/main/java/io/reactivex/internal/operators/observable/ObservableSwitchMap.java

+1
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ void innerError(SwitchMapInnerObserver<T, R> inner, Throwable ex) {
316316
if (inner.index == unique && errors.addThrowable(ex)) {
317317
if (!delayErrors) {
318318
upstream.dispose();
319+
done = true;
319320
}
320321
inner.done = true;
321322
drain();

src/test/java/io/reactivex/internal/operators/flowable/FlowableSwitchTest.java

+28
Original file line numberDiff line numberDiff line change
@@ -1201,4 +1201,32 @@ public Publisher<Integer> apply(Integer v) throws Throwable {
12011201
RxJavaPlugins.reset();
12021202
}
12031203
}
1204+
1205+
@Test
1206+
public void switchMapFusedIterable() {
1207+
Flowable.range(1, 2)
1208+
.switchMap(new Function<Integer, Publisher<Integer>>() {
1209+
@Override
1210+
public Publisher<Integer> apply(Integer v)
1211+
throws Throwable {
1212+
return Flowable.fromIterable(Arrays.asList(v * 10));
1213+
}
1214+
})
1215+
.test()
1216+
.assertResult(10, 20);
1217+
}
1218+
1219+
@Test
1220+
public void switchMapHiddenIterable() {
1221+
Flowable.range(1, 2)
1222+
.switchMap(new Function<Integer, Publisher<Integer>>() {
1223+
@Override
1224+
public Publisher<Integer> apply(Integer v)
1225+
throws Throwable {
1226+
return Flowable.fromIterable(Arrays.asList(v * 10)).hide();
1227+
}
1228+
})
1229+
.test()
1230+
.assertResult(10, 20);
1231+
}
12041232
}

src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,16 @@
1717
import static org.mockito.ArgumentMatchers.*;
1818
import static org.mockito.Mockito.*;
1919

20-
import java.util.List;
20+
import java.util.*;
2121
import java.util.concurrent.TimeUnit;
2222
import java.util.concurrent.atomic.*;
2323

2424
import org.junit.*;
2525
import org.mockito.InOrder;
2626

2727
import io.reactivex.*;
28+
import io.reactivex.Observable;
29+
import io.reactivex.Observer;
2830
import io.reactivex.disposables.*;
2931
import io.reactivex.exceptions.*;
3032
import io.reactivex.functions.*;
@@ -1226,4 +1228,32 @@ public Observable<Integer> apply(Integer v) throws Throwable {
12261228
RxJavaPlugins.reset();
12271229
}
12281230
}
1231+
1232+
@Test
1233+
public void switchMapFusedIterable() {
1234+
Observable.range(1, 2)
1235+
.switchMap(new Function<Integer, Observable<Integer>>() {
1236+
@Override
1237+
public Observable<Integer> apply(Integer v)
1238+
throws Throwable {
1239+
return Observable.fromIterable(Arrays.asList(v * 10));
1240+
}
1241+
})
1242+
.test()
1243+
.assertResult(10, 20);
1244+
}
1245+
1246+
@Test
1247+
public void switchMapHiddenIterable() {
1248+
Observable.range(1, 2)
1249+
.switchMap(new Function<Integer, Observable<Integer>>() {
1250+
@Override
1251+
public Observable<Integer> apply(Integer v)
1252+
throws Throwable {
1253+
return Observable.fromIterable(Arrays.asList(v * 10)).hide();
1254+
}
1255+
})
1256+
.test()
1257+
.assertResult(10, 20);
1258+
}
12291259
}

0 commit comments

Comments
 (0)