Skip to content

Commit 863a064

Browse files
Better handling of map function errors
- Stop catching the error and passing to onError and instead let the SafeObserver handle it which will then prevent subsequent onNext calls and/or unsubscribe when a failure occurs. - This also solves the OnErrorResumeNext issue fixed in ReactiveX#312 but those changes still seem valid so I'll leave them. Related to ReactiveX#216 and ReactiveX#312
1 parent 6886142 commit 863a064

File tree

1 file changed

+38
-5
lines changed

1 file changed

+38
-5
lines changed

rxjava-core/src/main/java/rx/operators/OperationMap.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515
*/
1616
package rx.operators;
1717

18+
import static org.junit.Assert.*;
1819
import static org.mockito.Matchers.*;
1920
import static org.mockito.Mockito.*;
2021

2122
import java.util.HashMap;
2223
import java.util.Map;
24+
import java.util.concurrent.atomic.AtomicInteger;
2325

2426
import org.junit.Before;
2527
import org.junit.Test;
@@ -119,11 +121,8 @@ public MapObserver(Observer<R> observer, Func1<T, R> func) {
119121
Func1<T, R> func;
120122

121123
public void onNext(T value) {
122-
try {
123-
observer.onNext(func.call(value));
124-
} catch (Exception ex) {
125-
observer.onError(ex);
126-
}
124+
// let the exception be thrown if func fails as a SafeObserver wrapping this will handle it
125+
observer.onNext(func.call(value));
127126
}
128127

129128
public void onError(Exception ex) {
@@ -251,6 +250,40 @@ public String call(Map<String, String> map) {
251250

252251
}
253252

253+
@Test
254+
public void testMapWithSynchronousObservableContainingError() {
255+
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
256+
final AtomicInteger c1 = new AtomicInteger();
257+
final AtomicInteger c2 = new AtomicInteger();
258+
Observable<String> m = Observable.create(map(w, new Func1<String, String>() {
259+
public String call(String s) {
260+
if ("fail".equals(s))
261+
throw new RuntimeException("Forced Failure");
262+
System.out.println("BadMapper:" + s);
263+
c1.incrementAndGet();
264+
return s;
265+
}
266+
})).map(new Func1<String, String>() {
267+
public String call(String s) {
268+
System.out.println("SecondMapper:" + s);
269+
c2.incrementAndGet();
270+
return s;
271+
}
272+
});
273+
274+
m.subscribe(stringObserver);
275+
276+
verify(stringObserver, times(1)).onNext("one");
277+
verify(stringObserver, never()).onNext("two");
278+
verify(stringObserver, never()).onNext("three");
279+
verify(stringObserver, never()).onCompleted();
280+
verify(stringObserver, times(1)).onError(any(Exception.class));
281+
282+
// we should have only returned 1 value: "one"
283+
assertEquals(1, c1.get());
284+
assertEquals(1, c2.get());
285+
}
286+
254287
private Map<String, String> getMap(String prefix) {
255288
Map<String, String> m = new HashMap<String, String>();
256289
m.put("firstName", prefix + "First");

0 commit comments

Comments
 (0)