Skip to content

Commit 7c04f6b

Browse files
Merge pull request #314 from benjchristensen/map-error-handling
Map Error Handling
2 parents 69b46ce + 863a064 commit 7c04f6b

27 files changed

+109
-76
lines changed

rxjava-core/src/main/java/rx/Observable.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@
3939
import rx.observables.BlockingObservable;
4040
import rx.observables.ConnectableObservable;
4141
import rx.observables.GroupedObservable;
42-
import rx.operators.AtomicObservableSubscription;
43-
import rx.operators.AtomicObserver;
42+
import rx.operators.SafeObservableSubscription;
43+
import rx.operators.SafeObserver;
4444
import rx.operators.OperationAll;
4545
import rx.operators.OperationBuffer;
4646
import rx.operators.OperationCache;
@@ -198,8 +198,8 @@ public Subscription subscribe(Observer<T> observer) {
198198
return hook.onSubscribeReturn(this, s);
199199
}
200200
} else {
201-
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
202-
subscription.wrap(onSubscribeFunction.call(new AtomicObserver<T>(subscription, observer)));
201+
SafeObservableSubscription subscription = new SafeObservableSubscription();
202+
subscription.wrap(onSubscribeFunction.call(new SafeObserver<T>(subscription, observer)));
203203
return hook.onSubscribeReturn(this, subscription);
204204
}
205205
} catch (OnErrorNotImplementedException e) {
@@ -263,8 +263,8 @@ public Subscription subscribe(Observer<T> observer, Scheduler scheduler) {
263263
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
264264
*/
265265
private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
266-
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
267-
return subscription.wrap(subscribe(new AtomicObserver<T>(subscription, o)));
266+
SafeObservableSubscription subscription = new SafeObservableSubscription();
267+
return subscription.wrap(subscribe(new SafeObserver<T>(subscription, o)));
268268
}
269269

270270
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -4337,7 +4337,7 @@ private boolean isInternalImplementation(Object o) {
43374337
return true;
43384338
}
43394339
// prevent double-wrapping (yeah it happens)
4340-
if (o instanceof AtomicObserver)
4340+
if (o instanceof SafeObserver)
43414341
return true;
43424342
// we treat the following package as "internal" and don't wrap it
43434343
Package p = o.getClass().getPackage(); // it can be null

rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import rx.Scheduler;
2121
import rx.Subscription;
22-
import rx.operators.AtomicObservableSubscription;
22+
import rx.operators.SafeObservableSubscription;
2323
import rx.util.functions.Func1;
2424
import rx.util.functions.Func2;
2525

@@ -30,7 +30,7 @@
3030
private final Func2<Scheduler, T, Subscription> underlying;
3131
private final T state;
3232

33-
private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription();
33+
private final SafeObservableSubscription wrapper = new SafeObservableSubscription();
3434
private final AtomicBoolean ready = new AtomicBoolean(true);
3535

3636
public DiscardableAction(T state, Func2<Scheduler, T, Subscription> underlying) {

rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import rx.Scheduler;
2222
import rx.Subscription;
23-
import rx.operators.AtomicObservableSubscription;
23+
import rx.operators.SafeObservableSubscription;
2424
import rx.subscriptions.CompositeSubscription;
2525
import rx.subscriptions.Subscriptions;
2626
import rx.util.functions.Func2;
@@ -37,7 +37,7 @@ public static NewThreadScheduler getInstance() {
3737

3838
@Override
3939
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
40-
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
40+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
4141
final Scheduler _scheduler = this;
4242

4343
Thread t = new Thread(new Runnable() {

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
import rx.Observable;
1616
import rx.Observer;
1717
import rx.Subscription;
18-
import rx.operators.AtomicObservableSubscription;
19-
import rx.operators.AtomicObserver;
18+
import rx.operators.SafeObservableSubscription;
19+
import rx.operators.SafeObserver;
2020
import rx.operators.OperationMostRecent;
2121
import rx.operators.OperationNext;
2222
import rx.operators.OperationToFuture;
@@ -404,8 +404,8 @@ public static <T> Iterable<T> toIterable(final Observable<T> source) {
404404
* calls to user code from within an operator"
405405
*/
406406
private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
407-
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
408-
return subscription.wrap(subscribe(new AtomicObserver<T>(subscription, o)));
407+
SafeObservableSubscription subscription = new SafeObservableSubscription();
408+
return subscription.wrap(subscribe(new SafeObserver<T>(subscription, o)));
409409
}
410410

411411
/**

rxjava-core/src/main/java/rx/operators/OperationAll.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscr
2828
private final Observable<T> sequence;
2929
private final Func1<T, Boolean> predicate;
3030

31-
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
31+
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
3232

3333

3434
private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ public void stop() {
443443
*/
444444
private static class ObservableBasedSingleBufferCreator<T> implements BufferCreator<T> {
445445

446-
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
446+
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
447447
private final Func0<Observable<BufferClosing>> bufferClosingSelector;
448448
private final NonOverlappingBuffers<T> buffers;
449449

@@ -487,7 +487,7 @@ public void stop() {
487487
*/
488488
private static class ObservableBasedMultiBufferCreator<T> implements BufferCreator<T> {
489489

490-
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
490+
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
491491

492492
public ObservableBasedMultiBufferCreator(final OverlappingBuffers<T> buffers, Observable<BufferOpening> bufferOpenings, final Func1<BufferOpening, Observable<BufferClosing>> bufferClosingSelector) {
493493
subscription.wrap(bufferOpenings.subscribe(new Action1<BufferOpening>() {
@@ -525,7 +525,7 @@ public void stop() {
525525
*/
526526
private static class TimeBasedBufferCreator<T> implements BufferCreator<T> {
527527

528-
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
528+
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
529529

530530
public TimeBasedBufferCreator(final NonOverlappingBuffers<T> buffers, long time, TimeUnit unit, Scheduler scheduler) {
531531
this.subscription.wrap(scheduler.schedulePeriodically(new Action0() {

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public Subscription call(Observer<? super R> observer) {
237237
throw new IllegalStateException("Only one Observer can subscribe to this Observable.");
238238
}
239239

240-
AtomicObservableSubscription subscription = new AtomicObservableSubscription(new Subscription() {
240+
SafeObservableSubscription subscription = new SafeObservableSubscription(new Subscription() {
241241
@Override
242242
public void unsubscribe() {
243243
stop();

rxjava-core/src/main/java/rx/operators/OperationConcat.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public static <T> Func1<Observer<T>, Subscription> concat(final Observable<Obser
7171

7272
private static class Concat<T> implements Func1<Observer<T>, Subscription> {
7373
private Observable<Observable<T>> sequences;
74-
private AtomicObservableSubscription innerSubscription = null;
74+
private SafeObservableSubscription innerSubscription = null;
7575

7676
public Concat(Observable<Observable<T>> sequences) {
7777
this.sequences = sequences;
@@ -81,7 +81,7 @@ public Subscription call(final Observer<T> observer) {
8181
final AtomicBoolean completedOrErred = new AtomicBoolean(false);
8282
final AtomicBoolean allSequencesReceived = new AtomicBoolean(false);
8383
final Queue<Observable<T>> nextSequences = new ConcurrentLinkedQueue<Observable<T>>();
84-
final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription();
84+
final SafeObservableSubscription outerSubscription = new SafeObservableSubscription();
8585

8686
final Observer<T> reusableObserver = new Observer<T>() {
8787
@Override
@@ -109,7 +109,7 @@ public void onCompleted() {
109109
}
110110
} else {
111111
// Continue on to the next sequence
112-
innerSubscription = new AtomicObservableSubscription();
112+
innerSubscription = new SafeObservableSubscription();
113113
innerSubscription.wrap(nextSequences.poll().subscribe(this));
114114
}
115115
}
@@ -122,7 +122,7 @@ public void onNext(Observable<T> nextSequence) {
122122
synchronized (nextSequences) {
123123
if (innerSubscription == null) {
124124
// We are currently not subscribed to any sequence
125-
innerSubscription = new AtomicObservableSubscription();
125+
innerSubscription = new SafeObservableSubscription();
126126
innerSubscription.wrap(nextSequence.subscribe(reusableObserver));
127127
} else {
128128
// Put this sequence at the end of the queue
@@ -545,7 +545,7 @@ public void testConcatUnsubscribe() {
545545
final Observer<String> aObserver = mock(Observer.class);
546546
@SuppressWarnings("unchecked")
547547
final Observable<String> concat = Observable.create(concat(w1, w2));
548-
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
548+
final SafeObservableSubscription s1 = new SafeObservableSubscription();
549549

550550
try {
551551
// Subscribe

rxjava-core/src/main/java/rx/operators/OperationFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public Filter(Observable<T> that, Func1<T, Boolean> predicate) {
4848
}
4949

5050
public Subscription call(final Observer<T> observer) {
51-
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
51+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
5252
return subscription.wrap(that.subscribe(new Observer<T>() {
5353
public void onNext(T value) {
5454
try {

rxjava-core/src/main/java/rx/operators/OperationGroupBy.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ private static class GroupBy<K, V> implements Func1<Observer<GroupedObservable<K
7171

7272
private final Observable<KeyValue<K, V>> source;
7373
private final ConcurrentHashMap<K, GroupedSubject<K, V>> groupedObservables = new ConcurrentHashMap<K, GroupedSubject<K, V>>();
74-
private final AtomicObservableSubscription actualParentSubscription = new AtomicObservableSubscription();
74+
private final SafeObservableSubscription actualParentSubscription = new SafeObservableSubscription();
7575
private final AtomicInteger numGroupSubscriptions = new AtomicInteger();
7676
private final AtomicBoolean unsubscribeRequested = new AtomicBoolean(false);
7777

@@ -178,7 +178,7 @@ static <K, T> GroupedSubject<K, T> create(final K key, final GroupBy<K, T> paren
178178

179179
return new GroupedSubject<K, T>(key, new Func1<Observer<T>, Subscription>() {
180180

181-
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
181+
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
182182

183183
@Override
184184
public Subscription call(Observer<T> observer) {

rxjava-core/src/main/java/rx/operators/OperationMap.java

+38-5
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
*/
1616
package rx.operators;
1717

18+
import static org.junit.Assert.*;
1819
import static org.mockito.Matchers.*;
1920
import static org.mockito.Mockito.*;
2021

2122
import java.util.HashMap;
2223
import java.util.Map;
24+
import java.util.concurrent.atomic.AtomicInteger;
2325

2426
import org.junit.Before;
2527
import org.junit.Test;
@@ -119,11 +121,8 @@ public MapObserver(Observer<R> observer, Func1<T, R> func) {
119121
Func1<T, R> func;
120122

121123
public void onNext(T value) {
122-
try {
123-
observer.onNext(func.call(value));
124-
} catch (Exception ex) {
125-
observer.onError(ex);
126-
}
124+
// let the exception be thrown if func fails as a SafeObserver wrapping this will handle it
125+
observer.onNext(func.call(value));
127126
}
128127

129128
public void onError(Exception ex) {
@@ -251,6 +250,40 @@ public String call(Map<String, String> map) {
251250

252251
}
253252

253+
@Test
254+
public void testMapWithSynchronousObservableContainingError() {
255+
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
256+
final AtomicInteger c1 = new AtomicInteger();
257+
final AtomicInteger c2 = new AtomicInteger();
258+
Observable<String> m = Observable.create(map(w, new Func1<String, String>() {
259+
public String call(String s) {
260+
if ("fail".equals(s))
261+
throw new RuntimeException("Forced Failure");
262+
System.out.println("BadMapper:" + s);
263+
c1.incrementAndGet();
264+
return s;
265+
}
266+
})).map(new Func1<String, String>() {
267+
public String call(String s) {
268+
System.out.println("SecondMapper:" + s);
269+
c2.incrementAndGet();
270+
return s;
271+
}
272+
});
273+
274+
m.subscribe(stringObserver);
275+
276+
verify(stringObserver, times(1)).onNext("one");
277+
verify(stringObserver, never()).onNext("two");
278+
verify(stringObserver, never()).onNext("three");
279+
verify(stringObserver, never()).onCompleted();
280+
verify(stringObserver, times(1)).onError(any(Exception.class));
281+
282+
// we should have only returned 1 value: "one"
283+
assertEquals(1, c1.get());
284+
assertEquals(1, c2.get());
285+
}
286+
254287
private Map<String, String> getMap(String prefix) {
255288
Map<String, String> m = new HashMap<String, String>();
256289
m.put("firstName", prefix + "First");

rxjava-core/src/main/java/rx/operators/OperationMerge.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public Subscription call(Observer<T> actualObserver) {
133133
* <p>
134134
* Bug report: https://github.com/Netflix/RxJava/issues/200
135135
*/
136-
AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription);
136+
SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription);
137137
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);
138138

139139
/**

rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ public OnErrorResumeNextViaFunction(Observable<T> originalSequence, Func1<Except
6969

7070
public Subscription call(final Observer<T> observer) {
7171
// AtomicReference since we'll be accessing/modifying this across threads so we can switch it if needed
72-
final AtomicReference<AtomicObservableSubscription> subscriptionRef = new AtomicReference<AtomicObservableSubscription>(new AtomicObservableSubscription());
72+
final AtomicReference<SafeObservableSubscription> subscriptionRef = new AtomicReference<SafeObservableSubscription>(new SafeObservableSubscription());
7373

7474
// subscribe to the original Observable and remember the subscription
75-
subscriptionRef.get().wrap(new AtomicObservableSubscription(originalSequence.subscribe(new Observer<T>() {
75+
subscriptionRef.get().wrap(new SafeObservableSubscription(originalSequence.subscribe(new Observer<T>() {
7676
public void onNext(T value) {
7777
// forward the successful calls
7878
observer.onNext(value);
@@ -83,13 +83,13 @@ public void onNext(T value) {
8383
*/
8484
public void onError(Exception ex) {
8585
/* remember what the current subscription is so we can determine if someone unsubscribes concurrently */
86-
AtomicObservableSubscription currentSubscription = subscriptionRef.get();
86+
SafeObservableSubscription currentSubscription = subscriptionRef.get();
8787
// check that we have not been unsubscribed before we can process the error
8888
if (currentSubscription != null) {
8989
try {
9090
Observable<T> resumeSequence = resumeFunction.call(ex);
9191
/* error occurred, so switch subscription to the 'resumeSequence' */
92-
AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(resumeSequence.subscribe(observer));
92+
SafeObservableSubscription innerSubscription = new SafeObservableSubscription(resumeSequence.subscribe(observer));
9393
/* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */
9494
if (!subscriptionRef.compareAndSet(currentSubscription, innerSubscription)) {
9595
// we failed to set which means 'subscriptionRef' was set to NULL via the unsubscribe below

rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@ public OnErrorResumeNextViaObservable(Observable<T> originalSequence, Observable
6565
}
6666

6767
public Subscription call(final Observer<T> observer) {
68-
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
68+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
6969

7070
// AtomicReference since we'll be accessing/modifying this across threads so we can switch it if needed
71-
final AtomicReference<AtomicObservableSubscription> subscriptionRef = new AtomicReference<AtomicObservableSubscription>(subscription);
71+
final AtomicReference<SafeObservableSubscription> subscriptionRef = new AtomicReference<SafeObservableSubscription>(subscription);
7272

7373
// subscribe to the original Observable and remember the subscription
7474
subscription.wrap(originalSequence.subscribe(new Observer<T>() {
@@ -83,11 +83,11 @@ public void onNext(T value) {
8383
*/
8484
public void onError(Exception ex) {
8585
/* remember what the current subscription is so we can determine if someone unsubscribes concurrently */
86-
AtomicObservableSubscription currentSubscription = subscriptionRef.get();
86+
SafeObservableSubscription currentSubscription = subscriptionRef.get();
8787
// check that we have not been unsubscribed and not already resumed before we can process the error
8888
if (currentSubscription == subscription) {
8989
/* error occurred, so switch subscription to the 'resumeSequence' */
90-
AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(resumeSequence.subscribe(observer));
90+
SafeObservableSubscription innerSubscription = new SafeObservableSubscription(resumeSequence.subscribe(observer));
9191
/* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */
9292
if (!subscriptionRef.compareAndSet(currentSubscription, innerSubscription)) {
9393
// we failed to set which means 'subscriptionRef' was set to NULL via the unsubscribe below

0 commit comments

Comments
 (0)