diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 62828d4788..aac823a5cb 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -39,8 +39,8 @@ import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; -import rx.operators.AtomicObservableSubscription; -import rx.operators.AtomicObserver; +import rx.operators.SafeObservableSubscription; +import rx.operators.SafeObserver; import rx.operators.OperationAll; import rx.operators.OperationBuffer; import rx.operators.OperationCache; @@ -198,8 +198,8 @@ public Subscription subscribe(Observer observer) { return hook.onSubscribeReturn(this, s); } } else { - AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - subscription.wrap(onSubscribeFunction.call(new AtomicObserver(subscription, observer))); + SafeObservableSubscription subscription = new SafeObservableSubscription(); + subscription.wrap(onSubscribeFunction.call(new SafeObserver(subscription, observer))); return hook.onSubscribeReturn(this, subscription); } } catch (OnErrorNotImplementedException e) { @@ -263,8 +263,8 @@ public Subscription subscribe(Observer observer, Scheduler scheduler) { * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ private Subscription protectivelyWrapAndSubscribe(Observer o) { - AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - return subscription.wrap(subscribe(new AtomicObserver(subscription, o))); + SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(subscribe(new SafeObserver(subscription, o))); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -4337,7 +4337,7 @@ private boolean isInternalImplementation(Object o) { return true; } // prevent double-wrapping (yeah it happens) - if (o instanceof AtomicObserver) + if (o instanceof SafeObserver) return true; // we treat the following package as "internal" and don't wrap it Package p = o.getClass().getPackage(); // it can be null diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java index ba3dd0473d..c0c7402455 100644 --- a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -19,7 +19,7 @@ import rx.Scheduler; import rx.Subscription; -import rx.operators.AtomicObservableSubscription; +import rx.operators.SafeObservableSubscription; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -30,7 +30,7 @@ private final Func2 underlying; private final T state; - private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription(); + private final SafeObservableSubscription wrapper = new SafeObservableSubscription(); private final AtomicBoolean ready = new AtomicBoolean(true); public DiscardableAction(T state, Func2 underlying) { diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index 5f0880e855..caf4e5a51d 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -20,7 +20,7 @@ import rx.Scheduler; import rx.Subscription; -import rx.operators.AtomicObservableSubscription; +import rx.operators.SafeObservableSubscription; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; @@ -37,7 +37,7 @@ public static NewThreadScheduler getInstance() { @Override public Subscription schedule(final T state, final Func2 action) { - final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + final SafeObservableSubscription subscription = new SafeObservableSubscription(); final Scheduler _scheduler = this; Thread t = new Thread(new Runnable() { diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 3ef4dbcebc..47d0583cfe 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -15,8 +15,8 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.operators.AtomicObservableSubscription; -import rx.operators.AtomicObserver; +import rx.operators.SafeObservableSubscription; +import rx.operators.SafeObserver; import rx.operators.OperationMostRecent; import rx.operators.OperationNext; import rx.operators.OperationToFuture; @@ -404,8 +404,8 @@ public static Iterable toIterable(final Observable source) { * calls to user code from within an operator" */ private Subscription protectivelyWrapAndSubscribe(Observer o) { - AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - return subscription.wrap(subscribe(new AtomicObserver(subscription, o))); + SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(subscribe(new SafeObserver(subscription, o))); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationAll.java b/rxjava-core/src/main/java/rx/operators/OperationAll.java index a010937e37..8084b7835e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationAll.java +++ b/rxjava-core/src/main/java/rx/operators/OperationAll.java @@ -28,7 +28,7 @@ private static class AllObservable implements Func1, Subscr private final Observable sequence; private final Func1 predicate; - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription subscription = new SafeObservableSubscription(); private AllObservable(Observable sequence, Func1 predicate) { diff --git a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java index 87c7c93023..f4fa22ab18 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationBuffer.java +++ b/rxjava-core/src/main/java/rx/operators/OperationBuffer.java @@ -443,7 +443,7 @@ public void stop() { */ private static class ObservableBasedSingleBufferCreator implements BufferCreator { - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription subscription = new SafeObservableSubscription(); private final Func0> bufferClosingSelector; private final NonOverlappingBuffers buffers; @@ -487,7 +487,7 @@ public void stop() { */ private static class ObservableBasedMultiBufferCreator implements BufferCreator { - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription subscription = new SafeObservableSubscription(); public ObservableBasedMultiBufferCreator(final OverlappingBuffers buffers, Observable bufferOpenings, final Func1> bufferClosingSelector) { subscription.wrap(bufferOpenings.subscribe(new Action1() { @@ -525,7 +525,7 @@ public void stop() { */ private static class TimeBasedBufferCreator implements BufferCreator { - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription subscription = new SafeObservableSubscription(); public TimeBasedBufferCreator(final NonOverlappingBuffers buffers, long time, TimeUnit unit, Scheduler scheduler) { this.subscription.wrap(scheduler.schedulePeriodically(new Action0() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 77996bab67..dfcf8bcaf0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -237,7 +237,7 @@ public Subscription call(Observer observer) { throw new IllegalStateException("Only one Observer can subscribe to this Observable."); } - AtomicObservableSubscription subscription = new AtomicObservableSubscription(new Subscription() { + SafeObservableSubscription subscription = new SafeObservableSubscription(new Subscription() { @Override public void unsubscribe() { stop(); diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 577731bee2..0a2706cca9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -71,7 +71,7 @@ public static Func1, Subscription> concat(final Observable implements Func1, Subscription> { private Observable> sequences; - private AtomicObservableSubscription innerSubscription = null; + private SafeObservableSubscription innerSubscription = null; public Concat(Observable> sequences) { this.sequences = sequences; @@ -81,7 +81,7 @@ public Subscription call(final Observer observer) { final AtomicBoolean completedOrErred = new AtomicBoolean(false); final AtomicBoolean allSequencesReceived = new AtomicBoolean(false); final Queue> nextSequences = new ConcurrentLinkedQueue>(); - final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription(); + final SafeObservableSubscription outerSubscription = new SafeObservableSubscription(); final Observer reusableObserver = new Observer() { @Override @@ -109,7 +109,7 @@ public void onCompleted() { } } else { // Continue on to the next sequence - innerSubscription = new AtomicObservableSubscription(); + innerSubscription = new SafeObservableSubscription(); innerSubscription.wrap(nextSequences.poll().subscribe(this)); } } @@ -122,7 +122,7 @@ public void onNext(Observable nextSequence) { synchronized (nextSequences) { if (innerSubscription == null) { // We are currently not subscribed to any sequence - innerSubscription = new AtomicObservableSubscription(); + innerSubscription = new SafeObservableSubscription(); innerSubscription.wrap(nextSequence.subscribe(reusableObserver)); } else { // Put this sequence at the end of the queue @@ -545,7 +545,7 @@ public void testConcatUnsubscribe() { final Observer aObserver = mock(Observer.class); @SuppressWarnings("unchecked") final Observable concat = Observable.create(concat(w1, w2)); - final AtomicObservableSubscription s1 = new AtomicObservableSubscription(); + final SafeObservableSubscription s1 = new SafeObservableSubscription(); try { // Subscribe diff --git a/rxjava-core/src/main/java/rx/operators/OperationFilter.java b/rxjava-core/src/main/java/rx/operators/OperationFilter.java index 82d692618a..21dddafed7 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationFilter.java +++ b/rxjava-core/src/main/java/rx/operators/OperationFilter.java @@ -48,7 +48,7 @@ public Filter(Observable that, Func1 predicate) { } public Subscription call(final Observer observer) { - final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + final SafeObservableSubscription subscription = new SafeObservableSubscription(); return subscription.wrap(that.subscribe(new Observer() { public void onNext(T value) { try { diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 210fc74fb3..fdfbb1c733 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -71,7 +71,7 @@ private static class GroupBy implements Func1> source; private final ConcurrentHashMap> groupedObservables = new ConcurrentHashMap>(); - private final AtomicObservableSubscription actualParentSubscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription actualParentSubscription = new SafeObservableSubscription(); private final AtomicInteger numGroupSubscriptions = new AtomicInteger(); private final AtomicBoolean unsubscribeRequested = new AtomicBoolean(false); @@ -178,7 +178,7 @@ static GroupedSubject create(final K key, final GroupBy paren return new GroupedSubject(key, new Func1, Subscription>() { - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription subscription = new SafeObservableSubscription(); @Override public Subscription call(Observer observer) { diff --git a/rxjava-core/src/main/java/rx/operators/OperationMap.java b/rxjava-core/src/main/java/rx/operators/OperationMap.java index c5a2854c50..c6435ae081 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMap.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMap.java @@ -15,11 +15,13 @@ */ package rx.operators; +import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Before; import org.junit.Test; @@ -119,11 +121,8 @@ public MapObserver(Observer observer, Func1 func) { Func1 func; public void onNext(T value) { - try { - observer.onNext(func.call(value)); - } catch (Exception ex) { - observer.onError(ex); - } + // let the exception be thrown if func fails as a SafeObserver wrapping this will handle it + observer.onNext(func.call(value)); } public void onError(Exception ex) { @@ -251,6 +250,40 @@ public String call(Map map) { } + @Test + public void testMapWithSynchronousObservableContainingError() { + Observable w = Observable.from("one", "fail", "two", "three", "fail"); + final AtomicInteger c1 = new AtomicInteger(); + final AtomicInteger c2 = new AtomicInteger(); + Observable m = Observable.create(map(w, new Func1() { + public String call(String s) { + if ("fail".equals(s)) + throw new RuntimeException("Forced Failure"); + System.out.println("BadMapper:" + s); + c1.incrementAndGet(); + return s; + } + })).map(new Func1() { + public String call(String s) { + System.out.println("SecondMapper:" + s); + c2.incrementAndGet(); + return s; + } + }); + + m.subscribe(stringObserver); + + verify(stringObserver, times(1)).onNext("one"); + verify(stringObserver, never()).onNext("two"); + verify(stringObserver, never()).onNext("three"); + verify(stringObserver, never()).onCompleted(); + verify(stringObserver, times(1)).onError(any(Exception.class)); + + // we should have only returned 1 value: "one" + assertEquals(1, c1.get()); + assertEquals(1, c2.get()); + } + private Map getMap(String prefix) { Map m = new HashMap(); m.put("firstName", prefix + "First"); diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index 227767930d..2710141603 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -133,7 +133,7 @@ public Subscription call(Observer actualObserver) { *

* Bug report: https://github.com/Netflix/RxJava/issues/200 */ - AtomicObservableSubscription subscription = new AtomicObservableSubscription(ourSubscription); + SafeObservableSubscription subscription = new SafeObservableSubscription(ourSubscription); SynchronizedObserver synchronizedObserver = new SynchronizedObserver(actualObserver, subscription); /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java index 900715c805..c7ae0c3a3e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java @@ -69,10 +69,10 @@ public OnErrorResumeNextViaFunction(Observable originalSequence, Func1 observer) { // AtomicReference since we'll be accessing/modifying this across threads so we can switch it if needed - final AtomicReference subscriptionRef = new AtomicReference(new AtomicObservableSubscription()); + final AtomicReference subscriptionRef = new AtomicReference(new SafeObservableSubscription()); // subscribe to the original Observable and remember the subscription - subscriptionRef.get().wrap(new AtomicObservableSubscription(originalSequence.subscribe(new Observer() { + subscriptionRef.get().wrap(new SafeObservableSubscription(originalSequence.subscribe(new Observer() { public void onNext(T value) { // forward the successful calls observer.onNext(value); @@ -83,13 +83,13 @@ public void onNext(T value) { */ public void onError(Exception ex) { /* remember what the current subscription is so we can determine if someone unsubscribes concurrently */ - AtomicObservableSubscription currentSubscription = subscriptionRef.get(); + SafeObservableSubscription currentSubscription = subscriptionRef.get(); // check that we have not been unsubscribed before we can process the error if (currentSubscription != null) { try { Observable resumeSequence = resumeFunction.call(ex); /* error occurred, so switch subscription to the 'resumeSequence' */ - AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(resumeSequence.subscribe(observer)); + SafeObservableSubscription innerSubscription = new SafeObservableSubscription(resumeSequence.subscribe(observer)); /* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */ if (!subscriptionRef.compareAndSet(currentSubscription, innerSubscription)) { // we failed to set which means 'subscriptionRef' was set to NULL via the unsubscribe below diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java index a673e3f557..e9e460e8f0 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java @@ -65,10 +65,10 @@ public OnErrorResumeNextViaObservable(Observable originalSequence, Observable } public Subscription call(final Observer observer) { - final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + final SafeObservableSubscription subscription = new SafeObservableSubscription(); // AtomicReference since we'll be accessing/modifying this across threads so we can switch it if needed - final AtomicReference subscriptionRef = new AtomicReference(subscription); + final AtomicReference subscriptionRef = new AtomicReference(subscription); // subscribe to the original Observable and remember the subscription subscription.wrap(originalSequence.subscribe(new Observer() { @@ -83,11 +83,11 @@ public void onNext(T value) { */ public void onError(Exception ex) { /* remember what the current subscription is so we can determine if someone unsubscribes concurrently */ - AtomicObservableSubscription currentSubscription = subscriptionRef.get(); + SafeObservableSubscription currentSubscription = subscriptionRef.get(); // check that we have not been unsubscribed and not already resumed before we can process the error if (currentSubscription == subscription) { /* error occurred, so switch subscription to the 'resumeSequence' */ - AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(resumeSequence.subscribe(observer)); + SafeObservableSubscription innerSubscription = new SafeObservableSubscription(resumeSequence.subscribe(observer)); /* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */ if (!subscriptionRef.compareAndSet(currentSubscription, innerSubscription)) { // we failed to set which means 'subscriptionRef' was set to NULL via the unsubscribe below diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java index cca2602db4..ea1c9601bb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java @@ -64,10 +64,10 @@ public OnErrorReturn(Observable originalSequence, Func1 resumeF } public Subscription call(final Observer observer) { - final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + final SafeObservableSubscription subscription = new SafeObservableSubscription(); // AtomicReference since we'll be accessing/modifying this across threads so we can switch it if needed - final AtomicReference subscriptionRef = new AtomicReference(subscription); + final AtomicReference subscriptionRef = new AtomicReference(subscription); // subscribe to the original Observable and remember the subscription subscription.wrap(originalSequence.subscribe(new Observer() { @@ -81,7 +81,7 @@ public void onNext(T value) { */ public void onError(Exception ex) { /* remember what the current subscription is so we can determine if someone unsubscribes concurrently */ - AtomicObservableSubscription currentSubscription = subscriptionRef.get(); + SafeObservableSubscription currentSubscription = subscriptionRef.get(); // check that we have not been unsubscribed before we can process the error if (currentSubscription != null) { try { diff --git a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java index c6ba3d621d..3e2e3fdfac 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java @@ -69,7 +69,7 @@ public Switch(Observable> sequences) { @Override public Subscription call(Observer observer) { - AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + SafeObservableSubscription subscription = new SafeObservableSubscription(); subscription.wrap(sequences.subscribe(new SwitchObserver(observer, subscription))); return subscription; } @@ -78,10 +78,10 @@ public Subscription call(Observer observer) { private static class SwitchObserver implements Observer> { private final Observer observer; - private final AtomicObservableSubscription parent; + private final SafeObservableSubscription parent; private final AtomicReference subsequence = new AtomicReference(); - public SwitchObserver(Observer observer, AtomicObservableSubscription parent) { + public SwitchObserver(Observer observer, SafeObservableSubscription parent) { this.observer = observer; this.parent = parent; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java index cbd547d105..b6998eaa1d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java @@ -71,7 +71,7 @@ public Synchronize(Observable innerObservable) { private SynchronizedObserver atomicObserver; public Subscription call(Observer observer) { - AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + SafeObservableSubscription subscription = new SafeObservableSubscription(); atomicObserver = new SynchronizedObserver(observer, subscription); return subscription.wrap(innerObservable.subscribe(atomicObserver)); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index e5e8dd5c65..43f51239bc 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -77,7 +77,7 @@ public Subscription call(Observer observer) { private static class Take implements Func1, Subscription> { private final Observable items; private final int num; - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription subscription = new SafeObservableSubscription(); private Take(Observable items, int num) { this.items = items; diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index 75c322b1f0..569d4cc603 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -51,7 +51,7 @@ public Subscription call(Observer observer) { private static class TakeLast implements Func1, Subscription> { private final int count; private final Observable items; - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription subscription = new SafeObservableSubscription(); TakeLast(final Observable items, final int count) { this.count = count; diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index 00d505dae5..273e6cac67 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -95,7 +95,7 @@ public Boolean call(T input, Integer index) { private static class TakeWhile implements Func1, Subscription> { private final Observable items; private final Func2 predicate; - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription subscription = new SafeObservableSubscription(); private TakeWhile(Observable items, Func2 predicate) { this.items = items; @@ -116,7 +116,7 @@ public ItemObserver(Observer observer) { // Using AtomicObserver because the unsubscribe, onCompleted, onError and error handling behavior // needs "isFinished" logic to not send duplicated events // The 'testTakeWhile1' and 'testTakeWhile2' tests fail without this. - this.observer = new AtomicObserver(subscription, observer); + this.observer = new SafeObserver(subscription, observer); } @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index b36c59253f..a8989d4d90 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -94,7 +94,7 @@ public static Func1, Subscription> zip(Collection> private static class ZipObserver implements Observer { final Observable w; final Aggregator a; - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + private final SafeObservableSubscription subscription = new SafeObservableSubscription(); private final AtomicBoolean subscribed = new AtomicBoolean(false); public ZipObserver(Aggregator a, Observable w) { @@ -246,7 +246,7 @@ void next(ZipObserver w, Object arg) { @Override public Subscription call(Observer observer) { if (started.compareAndSet(false, true)) { - AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + SafeObservableSubscription subscription = new SafeObservableSubscription(); this.observer = new SynchronizedObserver(observer, subscription); /* start the Observers */ for (ZipObserver rw : observers) { diff --git a/rxjava-core/src/main/java/rx/operators/AtomicObservableSubscription.java b/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java similarity index 89% rename from rxjava-core/src/main/java/rx/operators/AtomicObservableSubscription.java rename to rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java index 11bcc5fabc..b2419b27f1 100644 --- a/rxjava-core/src/main/java/rx/operators/AtomicObservableSubscription.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObservableSubscription.java @@ -34,7 +34,7 @@ *

  • handle both synchronous and asynchronous subscribe() execution flows
  • * */ -public final class AtomicObservableSubscription implements Subscription { +public final class SafeObservableSubscription implements Subscription { private static final Subscription UNSUBSCRIBED = new Subscription() { @@ -45,10 +45,10 @@ public void unsubscribe() }; private final AtomicReference actualSubscription = new AtomicReference(); - public AtomicObservableSubscription() { + public SafeObservableSubscription() { } - public AtomicObservableSubscription(Subscription actualSubscription) { + public SafeObservableSubscription(Subscription actualSubscription) { this.actualSubscription.set(actualSubscription); } @@ -59,7 +59,7 @@ public AtomicObservableSubscription(Subscription actualSubscription) { * @throws IllegalStateException * if trying to set more than once (or use this method after setting via constructor) */ - public AtomicObservableSubscription wrap(Subscription actualSubscription) { + public SafeObservableSubscription wrap(Subscription actualSubscription) { if (!this.actualSubscription.compareAndSet(null, actualSubscription)) { if (this.actualSubscription.get() == UNSUBSCRIBED) { actualSubscription.unsubscribe(); @@ -87,7 +87,7 @@ public boolean isUnsubscribed() { public static class UnitTest { @Test public void testWrapAfterUnsubscribe() { - AtomicObservableSubscription atomicObservableSubscription = new AtomicObservableSubscription(); + SafeObservableSubscription atomicObservableSubscription = new SafeObservableSubscription(); atomicObservableSubscription.unsubscribe(); Subscription innerSubscription = mock(Subscription.class); atomicObservableSubscription.wrap(innerSubscription); diff --git a/rxjava-core/src/main/java/rx/operators/AtomicObserver.java b/rxjava-core/src/main/java/rx/operators/SafeObserver.java similarity index 95% rename from rxjava-core/src/main/java/rx/operators/AtomicObserver.java rename to rxjava-core/src/main/java/rx/operators/SafeObserver.java index 673ddc4c62..2d6055a213 100644 --- a/rxjava-core/src/main/java/rx/operators/AtomicObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SafeObserver.java @@ -40,13 +40,13 @@ * * @param */ -public class AtomicObserver implements Observer { +public class SafeObserver implements Observer { private final Observer actual; private final AtomicBoolean isFinished = new AtomicBoolean(false); - private final AtomicObservableSubscription subscription; + private final SafeObservableSubscription subscription; - public AtomicObserver(AtomicObservableSubscription subscription, Observer actual) { + public SafeObserver(SafeObservableSubscription subscription, Observer actual) { this.subscription = subscription; this.actual = actual; } diff --git a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java index ea647da807..b03073acb9 100644 --- a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java @@ -65,11 +65,11 @@ public final class SynchronizedObserver implements Observer { */ private final Observer observer; - private final AtomicObservableSubscription subscription; + private final SafeObservableSubscription subscription; private volatile boolean finishRequested = false; private volatile boolean finished = false; - public SynchronizedObserver(Observer Observer, AtomicObservableSubscription subscription) { + public SynchronizedObserver(Observer Observer, SafeObservableSubscription subscription) { this.observer = Observer; this.subscription = subscription; } @@ -136,7 +136,7 @@ public void testSingleThreadedBasic() { TestSingleThreadedObservable onSubscribe = new TestSingleThreadedObservable(s, "one", "two", "three"); Observable w = Observable.create(onSubscribe); - AtomicObservableSubscription as = new AtomicObservableSubscription(s); + SafeObservableSubscription as = new SafeObservableSubscription(s); SynchronizedObserver aw = new SynchronizedObserver(aObserver, as); w.subscribe(aw); @@ -158,7 +158,7 @@ public void testMultiThreadedBasic() { TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three"); Observable w = Observable.create(onSubscribe); - AtomicObservableSubscription as = new AtomicObservableSubscription(s); + SafeObservableSubscription as = new SafeObservableSubscription(s); BusyObserver busyObserver = new BusyObserver(); SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as); @@ -184,7 +184,7 @@ public void testMultiThreadedWithNPE() { TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null); Observable w = Observable.create(onSubscribe); - AtomicObservableSubscription as = new AtomicObservableSubscription(s); + SafeObservableSubscription as = new SafeObservableSubscription(s); BusyObserver busyObserver = new BusyObserver(); SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as); @@ -216,7 +216,7 @@ public void testMultiThreadedWithNPEinMiddle() { TestMultiThreadedObservable onSubscribe = new TestMultiThreadedObservable(s, "one", "two", "three", null, "four", "five", "six", "seven", "eight", "nine"); Observable w = Observable.create(onSubscribe); - AtomicObservableSubscription as = new AtomicObservableSubscription(s); + SafeObservableSubscription as = new SafeObservableSubscription(s); BusyObserver busyObserver = new BusyObserver(); SynchronizedObserver aw = new SynchronizedObserver(busyObserver, as); @@ -252,7 +252,7 @@ public void runConcurrencyTest() { ExecutorService tp = Executors.newFixedThreadPool(20); try { TestConcurrencyObserver tw = new TestConcurrencyObserver(); - AtomicObservableSubscription s = new AtomicObservableSubscription(); + SafeObservableSubscription s = new SafeObservableSubscription(); SynchronizedObserver w = new SynchronizedObserver(tw, s); Future f1 = tp.submit(new OnNextThread(w, 12000)); diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index 858241e919..f4978831c2 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -26,7 +26,7 @@ import rx.Observer; import rx.Subscription; -import rx.operators.AtomicObservableSubscription; +import rx.operators.SafeObservableSubscription; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -73,7 +73,7 @@ public static AsyncSubject create() { Func1, Subscription> onSubscribe = new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + final SafeObservableSubscription subscription = new SafeObservableSubscription(); subscription.wrap(new Subscription() { @Override diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 1dcaa0e6c5..a7c369e557 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -26,7 +26,7 @@ import rx.Observer; import rx.Subscription; -import rx.operators.AtomicObservableSubscription; +import rx.operators.SafeObservableSubscription; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -77,7 +77,7 @@ public static BehaviorSubject createWithDefaultValue(T defaultValue) { Func1, Subscription> onSubscribe = new Func1, Subscription>() { @Override public Subscription call(Observer observer) { - final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + final SafeObservableSubscription subscription = new SafeObservableSubscription(); subscription.wrap(new Subscription() { @Override diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 56fd2e75bf..003d839f50 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -37,7 +37,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.operators.AtomicObservableSubscription; +import rx.operators.SafeObservableSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; import rx.util.functions.Func0; @@ -77,7 +77,7 @@ public Subscription call(Observer observer) { Subscription s = checkTerminalState(observer); if(s != null) return s; - final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + final SafeObservableSubscription subscription = new SafeObservableSubscription(); subscription.wrap(new Subscription() { @Override