Skip to content

Commit 2572fa7

Browse files
authored
3.x: Fix many operators swallowing undeliverable exceptions (#6612)
* 3.x: Fix many operators swallowing undeliverable exceptions * Add Completable.merge variants too
1 parent 28e24dc commit 2572fa7

File tree

101 files changed

+1853
-776
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+1853
-776
lines changed

src/main/java/io/reactivex/internal/operators/completable/CompletableMerge.java

+7-16
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ static final class CompletableMergeSubscriber
7070
public void dispose() {
7171
upstream.cancel();
7272
set.dispose();
73+
error.tryTerminateAndReport();
7374
}
7475

7576
@Override
@@ -106,15 +107,15 @@ public void onError(Throwable t) {
106107

107108
if (error.addThrowable(t)) {
108109
if (getAndSet(0) > 0) {
109-
downstream.onError(error.terminate());
110+
error.tryTerminateConsumer(downstream);
110111
}
111112
} else {
112113
RxJavaPlugins.onError(t);
113114
}
114115
} else {
115116
if (error.addThrowable(t)) {
116117
if (decrementAndGet() == 0) {
117-
downstream.onError(error.terminate());
118+
error.tryTerminateConsumer(downstream);
118119
}
119120
} else {
120121
RxJavaPlugins.onError(t);
@@ -125,12 +126,7 @@ public void onError(Throwable t) {
125126
@Override
126127
public void onComplete() {
127128
if (decrementAndGet() == 0) {
128-
Throwable ex = error.get();
129-
if (ex != null) {
130-
downstream.onError(error.terminate());
131-
} else {
132-
downstream.onComplete();
133-
}
129+
error.tryTerminateConsumer(downstream);
134130
}
135131
}
136132

@@ -142,15 +138,15 @@ void innerError(MergeInnerObserver inner, Throwable t) {
142138

143139
if (error.addThrowable(t)) {
144140
if (getAndSet(0) > 0) {
145-
downstream.onError(error.terminate());
141+
error.tryTerminateConsumer(downstream);
146142
}
147143
} else {
148144
RxJavaPlugins.onError(t);
149145
}
150146
} else {
151147
if (error.addThrowable(t)) {
152148
if (decrementAndGet() == 0) {
153-
downstream.onError(error.terminate());
149+
error.tryTerminateConsumer(downstream);
154150
} else {
155151
if (maxConcurrency != Integer.MAX_VALUE) {
156152
upstream.request(1);
@@ -165,12 +161,7 @@ void innerError(MergeInnerObserver inner, Throwable t) {
165161
void innerComplete(MergeInnerObserver inner) {
166162
set.delete(inner);
167163
if (decrementAndGet() == 0) {
168-
Throwable ex = error.get();
169-
if (ex != null) {
170-
downstream.onError(ex);
171-
} else {
172-
downstream.onComplete();
173-
}
164+
error.tryTerminateConsumer(downstream);
174165
} else {
175166
if (maxConcurrency != Integer.MAX_VALUE) {
176167
upstream.request(1);

src/main/java/io/reactivex/internal/operators/completable/CompletableMergeArray.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void subscribeActual(final CompletableObserver observer) {
3232
final AtomicBoolean once = new AtomicBoolean();
3333

3434
InnerCompletableObserver shared = new InnerCompletableObserver(observer, once, set, sources.length + 1);
35-
observer.onSubscribe(set);
35+
observer.onSubscribe(shared);
3636

3737
for (CompletableSource c : sources) {
3838
if (set.isDisposed()) {
@@ -52,7 +52,7 @@ public void subscribeActual(final CompletableObserver observer) {
5252
shared.onComplete();
5353
}
5454

55-
static final class InnerCompletableObserver extends AtomicInteger implements CompletableObserver {
55+
static final class InnerCompletableObserver extends AtomicInteger implements CompletableObserver, Disposable {
5656
private static final long serialVersionUID = -8360547806504310570L;
5757

5858
final CompletableObserver downstream;
@@ -91,5 +91,16 @@ public void onComplete() {
9191
}
9292
}
9393
}
94+
95+
@Override
96+
public void dispose() {
97+
set.dispose();
98+
once.set(true);
99+
}
100+
101+
@Override
102+
public boolean isDisposed() {
103+
return set.isDisposed();
104+
}
94105
}
95106
}

src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorArray.java

+20-12
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public void subscribeActual(final CompletableObserver observer) {
3434
final AtomicInteger wip = new AtomicInteger(sources.length + 1);
3535

3636
final AtomicThrowable error = new AtomicThrowable();
37+
set.add(new TryTerminateAndReportDisposable(error));
3738

3839
observer.onSubscribe(set);
3940

@@ -53,12 +54,24 @@ public void subscribeActual(final CompletableObserver observer) {
5354
}
5455

5556
if (wip.decrementAndGet() == 0) {
56-
Throwable ex = error.terminate();
57-
if (ex == null) {
58-
observer.onComplete();
59-
} else {
60-
observer.onError(ex);
61-
}
57+
error.tryTerminateConsumer(observer);
58+
}
59+
}
60+
61+
static final class TryTerminateAndReportDisposable implements Disposable {
62+
final AtomicThrowable errors;
63+
TryTerminateAndReportDisposable(AtomicThrowable errors) {
64+
this.errors = errors;
65+
}
66+
67+
@Override
68+
public void dispose() {
69+
errors.tryTerminateAndReport();
70+
}
71+
72+
@Override
73+
public boolean isDisposed() {
74+
return errors.isTerminated();
6275
}
6376
}
6477

@@ -98,12 +111,7 @@ public void onComplete() {
98111

99112
void tryTerminate() {
100113
if (wip.decrementAndGet() == 0) {
101-
Throwable ex = error.terminate();
102-
if (ex == null) {
103-
downstream.onComplete();
104-
} else {
105-
downstream.onError(ex);
106-
}
114+
error.tryTerminateConsumer(downstream);
107115
}
108116
}
109117
}

src/main/java/io/reactivex/internal/operators/completable/CompletableMergeDelayErrorIterable.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.disposables.CompositeDisposable;
2121
import io.reactivex.exceptions.Exceptions;
2222
import io.reactivex.internal.functions.ObjectHelper;
23-
import io.reactivex.internal.operators.completable.CompletableMergeDelayErrorArray.MergeInnerCompletableObserver;
23+
import io.reactivex.internal.operators.completable.CompletableMergeDelayErrorArray.*;
2424
import io.reactivex.internal.util.AtomicThrowable;
2525

2626
public final class CompletableMergeDelayErrorIterable extends Completable {
@@ -50,6 +50,7 @@ public void subscribeActual(final CompletableObserver observer) {
5050
final AtomicInteger wip = new AtomicInteger(1);
5151

5252
final AtomicThrowable error = new AtomicThrowable();
53+
set.add(new TryTerminateAndReportDisposable(error));
5354

5455
for (;;) {
5556
if (set.isDisposed()) {
@@ -93,12 +94,7 @@ public void subscribeActual(final CompletableObserver observer) {
9394
}
9495

9596
if (wip.decrementAndGet() == 0) {
96-
Throwable ex = error.terminate();
97-
if (ex == null) {
98-
observer.onComplete();
99-
} else {
100-
observer.onError(ex);
101-
}
97+
error.tryTerminateConsumer(observer);
10298
}
10399
}
104100
}

src/main/java/io/reactivex/internal/operators/completable/CompletableMergeIterable.java

+16-5
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,11 @@ public CompletableMergeIterable(Iterable<? extends CompletableSource> sources) {
3232
@Override
3333
public void subscribeActual(final CompletableObserver observer) {
3434
final CompositeDisposable set = new CompositeDisposable();
35+
final AtomicInteger wip = new AtomicInteger(1);
36+
37+
MergeCompletableObserver shared = new MergeCompletableObserver(observer, set, wip);
3538

36-
observer.onSubscribe(set);
39+
observer.onSubscribe(shared);
3740

3841
Iterator<? extends CompletableSource> iterator;
3942

@@ -45,9 +48,6 @@ public void subscribeActual(final CompletableObserver observer) {
4548
return;
4649
}
4750

48-
final AtomicInteger wip = new AtomicInteger(1);
49-
50-
MergeCompletableObserver shared = new MergeCompletableObserver(observer, set, wip);
5151
for (;;) {
5252
if (set.isDisposed()) {
5353
return;
@@ -94,7 +94,7 @@ public void subscribeActual(final CompletableObserver observer) {
9494
shared.onComplete();
9595
}
9696

97-
static final class MergeCompletableObserver extends AtomicBoolean implements CompletableObserver {
97+
static final class MergeCompletableObserver extends AtomicBoolean implements CompletableObserver, Disposable {
9898

9999
private static final long serialVersionUID = -7730517613164279224L;
100100

@@ -133,5 +133,16 @@ public void onComplete() {
133133
}
134134
}
135135
}
136+
137+
@Override
138+
public void dispose() {
139+
set.dispose();
140+
set(true);
141+
}
142+
143+
@Override
144+
public boolean isDisposed() {
145+
return set.isDisposed();
146+
}
136147
}
137148
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java

+20-18
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public void onError(Throwable t) {
200200
inner.cancel();
201201

202202
if (getAndIncrement() == 0) {
203-
downstream.onError(errors.terminate());
203+
errors.tryTerminateConsumer(downstream);
204204
}
205205
} else {
206206
RxJavaPlugins.onError(t);
@@ -214,7 +214,7 @@ public void innerNext(R value) {
214214
if (compareAndSet(1, 0)) {
215215
return;
216216
}
217-
downstream.onError(errors.terminate());
217+
errors.tryTerminateConsumer(downstream);
218218
}
219219
}
220220

@@ -224,7 +224,7 @@ public void innerError(Throwable e) {
224224
upstream.cancel();
225225

226226
if (getAndIncrement() == 0) {
227-
downstream.onError(errors.terminate());
227+
errors.tryTerminateConsumer(downstream);
228228
}
229229
} else {
230230
RxJavaPlugins.onError(e);
@@ -243,6 +243,8 @@ public void cancel() {
243243

244244
inner.cancel();
245245
upstream.cancel();
246+
247+
errors.tryTerminateAndReport();
246248
}
247249
}
248250

@@ -265,7 +267,7 @@ void drain() {
265267
Exceptions.throwIfFatal(e);
266268
upstream.cancel();
267269
errors.addThrowable(e);
268-
downstream.onError(errors.terminate());
270+
errors.tryTerminateConsumer(downstream);
269271
return;
270272
}
271273

@@ -286,7 +288,7 @@ void drain() {
286288

287289
upstream.cancel();
288290
errors.addThrowable(e);
289-
downstream.onError(errors.terminate());
291+
errors.tryTerminateConsumer(downstream);
290292
return;
291293
}
292294

@@ -312,7 +314,7 @@ void drain() {
312314
Exceptions.throwIfFatal(e);
313315
upstream.cancel();
314316
errors.addThrowable(e);
315-
downstream.onError(errors.terminate());
317+
errors.tryTerminateConsumer(downstream);
316318
return;
317319
}
318320

@@ -324,7 +326,7 @@ void drain() {
324326
if (get() == 0 && compareAndSet(0, 1)) {
325327
downstream.onNext(vr);
326328
if (!compareAndSet(1, 0)) {
327-
downstream.onError(errors.terminate());
329+
errors.tryTerminateConsumer(downstream);
328330
return;
329331
}
330332
}
@@ -437,6 +439,8 @@ public void cancel() {
437439

438440
inner.cancel();
439441
upstream.cancel();
442+
443+
errors.tryTerminateAndReport();
440444
}
441445
}
442446

@@ -456,7 +460,7 @@ void drain() {
456460
if (d && !veryEnd) {
457461
Throwable ex = errors.get();
458462
if (ex != null) {
459-
downstream.onError(errors.terminate());
463+
errors.tryTerminateConsumer(downstream);
460464
return;
461465
}
462466
}
@@ -468,20 +472,18 @@ void drain() {
468472
} catch (Throwable e) {
469473
Exceptions.throwIfFatal(e);
470474
upstream.cancel();
471-
errors.addThrowable(e);
472-
downstream.onError(errors.terminate());
475+
if (errors.addThrowable(e)) {
476+
errors.tryTerminateConsumer(downstream);
477+
} else {
478+
RxJavaPlugins.onError(e);
479+
}
473480
return;
474481
}
475482

476483
boolean empty = v == null;
477484

478485
if (d && empty) {
479-
Throwable ex = errors.terminate();
480-
if (ex != null) {
481-
downstream.onError(ex);
482-
} else {
483-
downstream.onComplete();
484-
}
486+
errors.tryTerminateConsumer(downstream);
485487
return;
486488
}
487489

@@ -495,7 +497,7 @@ void drain() {
495497

496498
upstream.cancel();
497499
errors.addThrowable(e);
498-
downstream.onError(errors.terminate());
500+
errors.tryTerminateConsumer(downstream);
499501
return;
500502
}
501503

@@ -522,7 +524,7 @@ void drain() {
522524
errors.addThrowable(e);
523525
if (!veryEnd) {
524526
upstream.cancel();
525-
downstream.onError(errors.terminate());
527+
errors.tryTerminateConsumer(downstream);
526528
return;
527529
}
528530
vr = null;

0 commit comments

Comments
 (0)