Skip to content

Map Error Handling #314

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 31, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import rx.observables.BlockingObservable;
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.operators.AtomicObservableSubscription;
import rx.operators.AtomicObserver;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.operators.OperationAll;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
Expand Down Expand Up @@ -198,8 +198,8 @@ public Subscription subscribe(Observer<T> observer) {
return hook.onSubscribeReturn(this, s);
}
} else {
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
subscription.wrap(onSubscribeFunction.call(new AtomicObserver<T>(subscription, observer)));
SafeObservableSubscription subscription = new SafeObservableSubscription();
subscription.wrap(onSubscribeFunction.call(new SafeObserver<T>(subscription, observer)));
return hook.onSubscribeReturn(this, subscription);
}
} catch (OnErrorNotImplementedException e) {
Expand Down Expand Up @@ -263,8 +263,8 @@ public Subscription subscribe(Observer<T> observer, Scheduler scheduler) {
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
*/
private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
return subscription.wrap(subscribe(new AtomicObserver<T>(subscription, o)));
SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(subscribe(new SafeObserver<T>(subscription, o)));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand Down Expand Up @@ -4337,7 +4337,7 @@ private boolean isInternalImplementation(Object o) {
return true;
}
// prevent double-wrapping (yeah it happens)
if (o instanceof AtomicObserver)
if (o instanceof SafeObserver)
return true;
// we treat the following package as "internal" and don't wrap it
Package p = o.getClass().getPackage(); // it can be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import rx.Scheduler;
import rx.Subscription;
import rx.operators.AtomicObservableSubscription;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

Expand All @@ -30,7 +30,7 @@
private final Func2<Scheduler, T, Subscription> underlying;
private final T state;

private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription();
private final SafeObservableSubscription wrapper = new SafeObservableSubscription();
private final AtomicBoolean ready = new AtomicBoolean(true);

public DiscardableAction(T state, Func2<Scheduler, T, Subscription> underlying) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import rx.Scheduler;
import rx.Subscription;
import rx.operators.AtomicObservableSubscription;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;
Expand All @@ -37,7 +37,7 @@ public static NewThreadScheduler getInstance() {

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
final SafeObservableSubscription subscription = new SafeObservableSubscription();
final Scheduler _scheduler = this;

Thread t = new Thread(new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.operators.AtomicObservableSubscription;
import rx.operators.AtomicObserver;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationToFuture;
Expand Down Expand Up @@ -404,8 +404,8 @@ public static <T> Iterable<T> toIterable(final Observable<T> source) {
* calls to user code from within an operator"
*/
private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
return subscription.wrap(subscribe(new AtomicObserver<T>(subscription, o)));
SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(subscribe(new SafeObserver<T>(subscription, o)));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/operators/OperationAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscr
private final Observable<T> sequence;
private final Func1<T, Boolean> predicate;

private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
private final SafeObservableSubscription subscription = new SafeObservableSubscription();


private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {
Expand Down
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperationBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ public void stop() {
*/
private static class ObservableBasedSingleBufferCreator<T> implements BufferCreator<T> {

private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
private final Func0<Observable<BufferClosing>> bufferClosingSelector;
private final NonOverlappingBuffers<T> buffers;

Expand Down Expand Up @@ -487,7 +487,7 @@ public void stop() {
*/
private static class ObservableBasedMultiBufferCreator<T> implements BufferCreator<T> {

private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
private final SafeObservableSubscription subscription = new SafeObservableSubscription();

public ObservableBasedMultiBufferCreator(final OverlappingBuffers<T> buffers, Observable<BufferOpening> bufferOpenings, final Func1<BufferOpening, Observable<BufferClosing>> bufferClosingSelector) {
subscription.wrap(bufferOpenings.subscribe(new Action1<BufferOpening>() {
Expand Down Expand Up @@ -525,7 +525,7 @@ public void stop() {
*/
private static class TimeBasedBufferCreator<T> implements BufferCreator<T> {

private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
private final SafeObservableSubscription subscription = new SafeObservableSubscription();

public TimeBasedBufferCreator(final NonOverlappingBuffers<T> buffers, long time, TimeUnit unit, Scheduler scheduler) {
this.subscription.wrap(scheduler.schedulePeriodically(new Action0() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public Subscription call(Observer<? super R> observer) {
throw new IllegalStateException("Only one Observer can subscribe to this Observable.");
}

AtomicObservableSubscription subscription = new AtomicObservableSubscription(new Subscription() {
SafeObservableSubscription subscription = new SafeObservableSubscription(new Subscription() {
@Override
public void unsubscribe() {
stop();
Expand Down
10 changes: 5 additions & 5 deletions rxjava-core/src/main/java/rx/operators/OperationConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static <T> Func1<Observer<T>, Subscription> concat(final Observable<Obser

private static class Concat<T> implements Func1<Observer<T>, Subscription> {
private Observable<Observable<T>> sequences;
private AtomicObservableSubscription innerSubscription = null;
private SafeObservableSubscription innerSubscription = null;

public Concat(Observable<Observable<T>> sequences) {
this.sequences = sequences;
Expand All @@ -81,7 +81,7 @@ public Subscription call(final Observer<T> observer) {
final AtomicBoolean completedOrErred = new AtomicBoolean(false);
final AtomicBoolean allSequencesReceived = new AtomicBoolean(false);
final Queue<Observable<T>> nextSequences = new ConcurrentLinkedQueue<Observable<T>>();
final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription();
final SafeObservableSubscription outerSubscription = new SafeObservableSubscription();

final Observer<T> reusableObserver = new Observer<T>() {
@Override
Expand Down Expand Up @@ -109,7 +109,7 @@ public void onCompleted() {
}
} else {
// Continue on to the next sequence
innerSubscription = new AtomicObservableSubscription();
innerSubscription = new SafeObservableSubscription();
innerSubscription.wrap(nextSequences.poll().subscribe(this));
}
}
Expand All @@ -122,7 +122,7 @@ public void onNext(Observable<T> nextSequence) {
synchronized (nextSequences) {
if (innerSubscription == null) {
// We are currently not subscribed to any sequence
innerSubscription = new AtomicObservableSubscription();
innerSubscription = new SafeObservableSubscription();
innerSubscription.wrap(nextSequence.subscribe(reusableObserver));
} else {
// Put this sequence at the end of the queue
Expand Down Expand Up @@ -545,7 +545,7 @@ public void testConcatUnsubscribe() {
final Observer<String> aObserver = mock(Observer.class);
@SuppressWarnings("unchecked")
final Observable<String> concat = Observable.create(concat(w1, w2));
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
final SafeObservableSubscription s1 = new SafeObservableSubscription();

try {
// Subscribe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Filter(Observable<T> that, Func1<T, Boolean> predicate) {
}

public Subscription call(final Observer<T> observer) {
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
final SafeObservableSubscription subscription = new SafeObservableSubscription();
return subscription.wrap(that.subscribe(new Observer<T>() {
public void onNext(T value) {
try {
Expand Down
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperationGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ private static class GroupBy<K, V> implements Func1<Observer<GroupedObservable<K

private final Observable<KeyValue<K, V>> source;
private final ConcurrentHashMap<K, GroupedSubject<K, V>> groupedObservables = new ConcurrentHashMap<K, GroupedSubject<K, V>>();
private final AtomicObservableSubscription actualParentSubscription = new AtomicObservableSubscription();
private final SafeObservableSubscription actualParentSubscription = new SafeObservableSubscription();
private final AtomicInteger numGroupSubscriptions = new AtomicInteger();
private final AtomicBoolean unsubscribeRequested = new AtomicBoolean(false);

Expand Down Expand Up @@ -178,7 +178,7 @@ static <K, T> GroupedSubject<K, T> create(final K key, final GroupBy<K, T> paren

return new GroupedSubject<K, T>(key, new Func1<Observer<T>, Subscription>() {

private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
private final SafeObservableSubscription subscription = new SafeObservableSubscription();

@Override
public Subscription call(Observer<T> observer) {
Expand Down
43 changes: 38 additions & 5 deletions rxjava-core/src/main/java/rx/operators/OperationMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
*/
package rx.operators;

import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -119,11 +121,8 @@ public MapObserver(Observer<R> observer, Func1<T, R> func) {
Func1<T, R> func;

public void onNext(T value) {
try {
observer.onNext(func.call(value));
} catch (Exception ex) {
observer.onError(ex);
}
// let the exception be thrown if func fails as a SafeObserver wrapping this will handle it
observer.onNext(func.call(value));
}

public void onError(Exception ex) {
Expand Down Expand Up @@ -251,6 +250,40 @@ public String call(Map<String, String> map) {

}

@Test
public void testMapWithSynchronousObservableContainingError() {
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
final AtomicInteger c1 = new AtomicInteger();
final AtomicInteger c2 = new AtomicInteger();
Observable<String> m = Observable.create(map(w, new Func1<String, String>() {
public String call(String s) {
if ("fail".equals(s))
throw new RuntimeException("Forced Failure");
System.out.println("BadMapper:" + s);
c1.incrementAndGet();
return s;
}
})).map(new Func1<String, String>() {
public String call(String s) {
System.out.println("SecondMapper:" + s);
c2.incrementAndGet();
return s;
}
});

m.subscribe(stringObserver);

verify(stringObserver, times(1)).onNext("one");
verify(stringObserver, never()).onNext("two");
verify(stringObserver, never()).onNext("three");
verify(stringObserver, never()).onCompleted();
verify(stringObserver, times(1)).onError(any(Exception.class));

// we should have only returned 1 value: "one"
assertEquals(1, c1.get());
assertEquals(1, c2.get());
}

private Map<String, String> getMap(String prefix) {
Map<String, String> m = new HashMap<String, String>();
m.put("firstName", prefix + "First");
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/operators/OperationMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public Subscription call(Observer<T> actualObserver) {
* <p>
* Bug report: https://github.com/Netflix/RxJava/issues/200
*/
AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription);
SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription);
SynchronizedObserver<T> synchronizedObserver = new SynchronizedObserver<T>(actualObserver, subscription);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public OnErrorResumeNextViaFunction(Observable<T> originalSequence, Func1<Except

public Subscription call(final Observer<T> observer) {
// AtomicReference since we'll be accessing/modifying this across threads so we can switch it if needed
final AtomicReference<AtomicObservableSubscription> subscriptionRef = new AtomicReference<AtomicObservableSubscription>(new AtomicObservableSubscription());
final AtomicReference<SafeObservableSubscription> subscriptionRef = new AtomicReference<SafeObservableSubscription>(new SafeObservableSubscription());

// subscribe to the original Observable and remember the subscription
subscriptionRef.get().wrap(new AtomicObservableSubscription(originalSequence.subscribe(new Observer<T>() {
subscriptionRef.get().wrap(new SafeObservableSubscription(originalSequence.subscribe(new Observer<T>() {
public void onNext(T value) {
// forward the successful calls
observer.onNext(value);
Expand All @@ -83,13 +83,13 @@ public void onNext(T value) {
*/
public void onError(Exception ex) {
/* remember what the current subscription is so we can determine if someone unsubscribes concurrently */
AtomicObservableSubscription currentSubscription = subscriptionRef.get();
SafeObservableSubscription currentSubscription = subscriptionRef.get();
// check that we have not been unsubscribed before we can process the error
if (currentSubscription != null) {
try {
Observable<T> resumeSequence = resumeFunction.call(ex);
/* error occurred, so switch subscription to the 'resumeSequence' */
AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(resumeSequence.subscribe(observer));
SafeObservableSubscription innerSubscription = new SafeObservableSubscription(resumeSequence.subscribe(observer));
/* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */
if (!subscriptionRef.compareAndSet(currentSubscription, innerSubscription)) {
// we failed to set which means 'subscriptionRef' was set to NULL via the unsubscribe below
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ public OnErrorResumeNextViaObservable(Observable<T> originalSequence, Observable
}

public Subscription call(final Observer<T> observer) {
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
final SafeObservableSubscription subscription = new SafeObservableSubscription();

// AtomicReference since we'll be accessing/modifying this across threads so we can switch it if needed
final AtomicReference<AtomicObservableSubscription> subscriptionRef = new AtomicReference<AtomicObservableSubscription>(subscription);
final AtomicReference<SafeObservableSubscription> subscriptionRef = new AtomicReference<SafeObservableSubscription>(subscription);

// subscribe to the original Observable and remember the subscription
subscription.wrap(originalSequence.subscribe(new Observer<T>() {
Expand All @@ -83,11 +83,11 @@ public void onNext(T value) {
*/
public void onError(Exception ex) {
/* remember what the current subscription is so we can determine if someone unsubscribes concurrently */
AtomicObservableSubscription currentSubscription = subscriptionRef.get();
SafeObservableSubscription currentSubscription = subscriptionRef.get();
// check that we have not been unsubscribed and not already resumed before we can process the error
if (currentSubscription == subscription) {
/* error occurred, so switch subscription to the 'resumeSequence' */
AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(resumeSequence.subscribe(observer));
SafeObservableSubscription innerSubscription = new SafeObservableSubscription(resumeSequence.subscribe(observer));
/* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */
if (!subscriptionRef.compareAndSet(currentSubscription, innerSubscription)) {
// we failed to set which means 'subscriptionRef' was set to NULL via the unsubscribe below
Expand Down
Loading