Skip to content

Commit 4190d8e

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
Hardened both cache() and replay() against child-thrown exceptions.
1 parent 96786bb commit 4190d8e

File tree

5 files changed

+384
-45
lines changed

5 files changed

+384
-45
lines changed

src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.*;
2222
import rx.functions.Action1;
2323
import rx.observables.ConnectableObservable;
24+
import rx.observers.Subscribers;
2425

2526
/**
2627
* Wraps a ConnectableObservable and calls its connect() method once
@@ -47,7 +48,7 @@ public OnSubscribeAutoConnect(ConnectableObservable<? extends T> source,
4748
}
4849
@Override
4950
public void call(Subscriber<? super T> child) {
50-
source.unsafeSubscribe(child);
51+
source.unsafeSubscribe(Subscribers.wrap(child));
5152
if (clients.incrementAndGet() == numberOfSubscribers) {
5253
source.connect(connection);
5354
}

src/main/java/rx/internal/operators/OperatorReplay.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.*;
2323
import rx.Observable;
2424
import rx.exceptions.Exceptions;
25+
import rx.exceptions.OnErrorThrowable;
2526
import rx.functions.*;
2627
import rx.observables.ConnectableObservable;
2728
import rx.schedulers.Timestamped;
@@ -813,7 +814,16 @@ public void replay(InnerProducer<T> output) {
813814

814815
while (r != 0L && destIndex < sourceIndex) {
815816
Object o = get(destIndex);
816-
if (nl.accept(output.child, o)) {
817+
try {
818+
if (nl.accept(output.child, o)) {
819+
return;
820+
}
821+
} catch (Throwable err) {
822+
Exceptions.throwIfFatal(err);
823+
output.unsubscribe();
824+
if (!nl.isError(o) && !nl.isCompleted(o)) {
825+
output.child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o)));
826+
}
817827
return;
818828
}
819829
if (output.isUnsubscribed()) {
@@ -969,8 +979,18 @@ public final void replay(InnerProducer<T> output) {
969979
Node v = node.get();
970980
if (v != null) {
971981
Object o = leaveTransform(v.value);
972-
if (nl.accept(output.child, o)) {
982+
try {
983+
if (nl.accept(output.child, o)) {
984+
output.index = null;
985+
return;
986+
}
987+
} catch (Throwable err) {
973988
output.index = null;
989+
Exceptions.throwIfFatal(err);
990+
output.unsubscribe();
991+
if (!nl.isError(o) && !nl.isCompleted(o)) {
992+
output.child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o)));
993+
}
974994
return;
975995
}
976996
e++;

src/main/java/rx/internal/util/CachedObservable.java

Lines changed: 62 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import java.util.concurrent.atomic.*;
1818

1919
import rx.*;
20+
import rx.exceptions.Exceptions;
21+
import rx.exceptions.OnErrorThrowable;
2022
import rx.internal.operators.NotificationLite;
2123
import rx.subscriptions.SerialSubscription;
2224

@@ -170,48 +172,51 @@ public void removeProducer(ReplayProducer<T> p) {
170172
* Make sure this is called only once.
171173
*/
172174
public void connect() {
173-
connection.set(source.subscribe(this));
175+
Subscriber<T> subscriber = new Subscriber<T>() {
176+
@Override
177+
public void onNext(T t) {
178+
CacheState.this.onNext(t);
179+
}
180+
@Override
181+
public void onError(Throwable e) {
182+
CacheState.this.onError(e);
183+
}
184+
@Override
185+
public void onCompleted() {
186+
CacheState.this.onCompleted();
187+
}
188+
};
189+
connection.set(subscriber);
190+
source.unsafeSubscribe(subscriber);
174191
isConnected = true;
175192
}
176193
@Override
177194
public void onNext(T t) {
178-
Object o = nl.next(t);
179-
synchronized (this) {
180-
if (!sourceDone) {
181-
add(o);
182-
} else {
183-
return;
184-
}
195+
if (!sourceDone) {
196+
Object o = nl.next(t);
197+
add(o);
198+
dispatch();
185199
}
186-
dispatch();
187200
}
188201
@Override
189202
public void onError(Throwable e) {
190-
Object o = nl.error(e);
191-
synchronized (this) {
192-
if (!sourceDone) {
193-
sourceDone = true;
194-
add(o);
195-
} else {
196-
return;
197-
}
203+
if (!sourceDone) {
204+
sourceDone = true;
205+
Object o = nl.error(e);
206+
add(o);
207+
connection.unsubscribe();
208+
dispatch();
198209
}
199-
connection.unsubscribe();
200-
dispatch();
201210
}
202211
@Override
203212
public void onCompleted() {
204-
Object o = nl.completed();
205-
synchronized (this) {
206-
if (!sourceDone) {
207-
sourceDone = true;
208-
add(o);
209-
} else {
210-
return;
211-
}
213+
if (!sourceDone) {
214+
sourceDone = true;
215+
Object o = nl.completed();
216+
add(o);
217+
connection.unsubscribe();
218+
dispatch();
212219
}
213-
connection.unsubscribe();
214-
dispatch();
215220
}
216221
/**
217222
* Signals all known children there is work to do.
@@ -352,6 +357,12 @@ public void replay() {
352357
for (;;) {
353358

354359
long r = get();
360+
361+
if (r < 0L) {
362+
skipFinal = true;
363+
return;
364+
}
365+
355366
// read the size, if it is non-zero, we can safely read the head and
356367
// read values up to the given absolute index
357368
int s = state.size();
@@ -385,17 +396,31 @@ public void replay() {
385396
if (r > 0) {
386397
int valuesProduced = 0;
387398

388-
while (j < s && r > 0 && !child.isUnsubscribed()) {
399+
while (j < s && r > 0) {
400+
if (child.isUnsubscribed()) {
401+
skipFinal = true;
402+
return;
403+
}
389404
if (k == n) {
390405
b = (Object[])b[n];
391406
k = 0;
392407
}
393408
Object o = b[k];
394409

395-
if (nl.accept(child, o)) {
410+
try {
411+
if (nl.accept(child, o)) {
412+
skipFinal = true;
413+
unsubscribe();
414+
return;
415+
}
416+
} catch (Throwable err) {
417+
Exceptions.throwIfFatal(err);
396418
skipFinal = true;
397419
unsubscribe();
398-
return;
420+
if (!nl.isError(o) && !nl.isCompleted(o)) {
421+
child.onError(OnErrorThrowable.addValueAsLastCause(err, nl.getValue(o)));
422+
return;
423+
}
399424
}
400425

401426
k++;
@@ -404,6 +429,11 @@ public void replay() {
404429
valuesProduced++;
405430
}
406431

432+
if (child.isUnsubscribed()) {
433+
skipFinal = true;
434+
return;
435+
}
436+
407437
index = j;
408438
currentIndexInBuffer = k;
409439
currentBuffer = b;

0 commit comments

Comments
 (0)