diff --git a/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java b/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java new file mode 100644 index 0000000000..62c015c027 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/AbstractSubject.java @@ -0,0 +1,152 @@ +package rx.subjects; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +import rx.Notification; +import rx.Observer; +import rx.Subscription; +import rx.operators.SafeObservableSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action2; + +public abstract class AbstractSubject extends Subject { + + protected AbstractSubject(rx.Observable.OnSubscribeFunc onSubscribe) { + super(onSubscribe); + } + + protected static class SubjectState { + protected final ConcurrentHashMap> observers = new ConcurrentHashMap>(); + protected final AtomicReference> currentValue = new AtomicReference>(); + protected final AtomicBoolean completed = new AtomicBoolean(); + protected final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock(); + } + + protected static OnSubscribeFunc getOnSubscribeFunc(final SubjectState state, final Action2, Observer> onEach) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + /* + * Subscription needs to be synchronized with terminal states to ensure + * race conditions are handled. When subscribing we must make sure + * onComplete/onError is correctly emitted to all observers, even if it + * comes in while the onComplete/onError is being propagated. + */ + state.SUBSCRIPTION_LOCK.lock(); + try { + if (state.completed.get()) { + emitNotification(state.currentValue.get(), observer); + if (onEach != null) { + onEach.call(state, observer); + } + return Subscriptions.empty(); + } else { + // the subject is not completed so we subscribe + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + + subscription.wrap(new Subscription() { + @Override + public void unsubscribe() { + // on unsubscribe remove it from the map of outbound observers to notify + state.observers.remove(subscription); + } + }); + + // on subscribe add it to the map of outbound observers to notify + state.observers.put(subscription, observer); + + // invoke onSubscribe logic + if (onEach != null) { + onEach.call(state, observer); + } + + return subscription; + } + } finally { + state.SUBSCRIPTION_LOCK.unlock(); + } + + } + + }; + } + + protected static void emitNotification(Notification value, Observer observer) { + // if null that means onNext was never invoked (no Notification set) + if (value != null) { + if (value.isOnNext()) { + observer.onNext(value.getValue()); + } else if (value.isOnError()) { + observer.onError(value.getThrowable()); + } else if (value.isOnCompleted()) { + observer.onCompleted(); + } + } + } + + /** + * Emit the current value. + * + * @param state + */ + protected static void emitNotification(final SubjectState state, final Action2, Observer> onEach) { + for (Subscription s : snapshotOfObservers(state)) { + Observer o = state.observers.get(s); + // emit notifications to this observer + emitNotification(state.currentValue.get(), o); + // onEach action if applicable + if (onEach != null) { + onEach.call(state, o); + } + } + } + + /** + * Emit the current value to all observers and remove their subscription. + * + * @param state + */ + protected void emitNotificationAndTerminate(final SubjectState state, final Action2, Observer> onEach) { + /* + * We can not allow new subscribers to be added while we execute the terminal state. + */ + state.SUBSCRIPTION_LOCK.lock(); + try { + if (state.completed.compareAndSet(false, true)) { + for (Subscription s : snapshotOfObservers(state)) { + Observer o = state.observers.get(s); + // emit notifications to this observer + emitNotification(state.currentValue.get(), o); + // onEach action if applicable + if (onEach != null) { + onEach.call(state, o); + } + + // remove the subscription as it is completed + state.observers.remove(s); + } + } + } finally { + state.SUBSCRIPTION_LOCK.unlock(); + } + } + + /** + * Current snapshot of 'state.observers.keySet()' so that concurrent modifications aren't included. + * + * This makes it behave deterministically in a single-threaded execution when nesting subscribes. + * + * In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead + * of possibly being included in the current onNext iteration. + * + * @return List> + */ + private static Collection snapshotOfObservers(final SubjectState state) { + return new ArrayList(state.observers.keySet()); + } +} diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index 3f7f5d0a0f..a9c18be805 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -15,16 +15,9 @@ */ package rx.subjects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; - import rx.Notification; import rx.Observer; -import rx.Subscription; -import rx.operators.SafeObservableSubscription; -import rx.subscriptions.Subscriptions; +import rx.util.functions.Action2; /** * Subject that publishes only the last event to each {@link Observer} that has subscribed when the @@ -55,7 +48,7 @@ * * @param */ -public class AsyncSubject extends Subject { +public class AsyncSubject extends AbstractSubject { /** * Create a new AsyncSubject @@ -63,115 +56,60 @@ public class AsyncSubject extends Subject { * @return a new AsyncSubject */ public static AsyncSubject create() { - final AsyncSubjectState state = new AsyncSubjectState(); + final SubjectState state = new SubjectState(); + OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, new Action2, Observer>() { - OnSubscribeFunc onSubscribe = new OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - /* - * Subscription needs to be synchronized with terminal states to ensure - * race conditions are handled. When subscribing we must make sure - * onComplete/onError is correctly emitted to all observers, even if it - * comes in while the onComplete/onError is being propagated. - */ - state.SUBSCRIPTION_LOCK.lock(); - try { - if (state.completed.get()) { - emitNotificationToObserver(state, observer); - return Subscriptions.empty(); - } else { - // the subject is not completed so we subscribe - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - - subscription.wrap(new Subscription() { - @Override - public void unsubscribe() { - // on unsubscribe remove it from the map of outbound observers to notify - state.observers.remove(subscription); - } - }); - - // on subscribe add it to the map of outbound observers to notify - state.observers.put(subscription, observer); - - return subscription; + public void call(SubjectState state, Observer o) { + // we want the last value + completed so add this extra logic + // to send onCompleted if the last value is an onNext + if (state.completed.get()) { + Notification value = state.currentValue.get(); + if (value != null && value.isOnNext()) { + o.onCompleted(); } - } finally { - state.SUBSCRIPTION_LOCK.unlock(); } - } - - }; - + }); return new AsyncSubject(onSubscribe, state); } - private static void emitNotificationToObserver(final AsyncSubjectState state, Observer observer) { - Notification finalValue = state.currentValue.get(); - - // if null that means onNext was never invoked (no Notification set) - if (finalValue != null) { - if (finalValue.isOnNext()) { - observer.onNext(finalValue.getValue()); - } else if (finalValue.isOnError()) { - observer.onError(finalValue.getThrowable()); - } - } - observer.onCompleted(); - } - - /** - * State externally constructed and passed in so the onSubscribe function has access to it. - * - * @param - */ - private static class AsyncSubjectState { - private final ConcurrentHashMap> observers = new ConcurrentHashMap>(); - private final AtomicReference> currentValue = new AtomicReference>(); - private final AtomicBoolean completed = new AtomicBoolean(); - private final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock(); - } - - private final AsyncSubjectState state; + private final SubjectState state; - protected AsyncSubject(OnSubscribeFunc onSubscribe, AsyncSubjectState state) { + protected AsyncSubject(OnSubscribeFunc onSubscribe, SubjectState state) { super(onSubscribe); this.state = state; } @Override public void onCompleted() { - terminalState(); + /** + * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers + */ + emitNotificationAndTerminate(state, new Action2, Observer>() { + + @Override + public void call(SubjectState state, Observer o) { + o.onCompleted(); + } + }); } @Override public void onError(Throwable e) { + /** + * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers + */ state.currentValue.set(new Notification(e)); - terminalState(); + emitNotificationAndTerminate(state, null); } @Override public void onNext(T v) { + /** + * Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs. + */ state.currentValue.set(new Notification(v)); } - private void terminalState() { - /* - * We can not allow new subscribers to be added while we execute the terminal state. - */ - state.SUBSCRIPTION_LOCK.lock(); - try { - if (state.completed.compareAndSet(false, true)) { - for (Subscription s : state.observers.keySet()) { - // emit notifications to this observer - emitNotificationToObserver(state, state.observers.get(s)); - // remove the subscription as it is completed - state.observers.remove(s); - } - } - } finally { - state.SUBSCRIPTION_LOCK.unlock(); - } - } } diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 55c91ed122..053a51f0cf 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -15,12 +15,9 @@ */ package rx.subjects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; - +import rx.Notification; import rx.Observer; -import rx.Subscription; -import rx.operators.SafeObservableSubscription; +import rx.util.functions.Action2; /** * Subject that publishes the most recent and all subsequent events to each subscribed {@link Observer}. @@ -50,7 +47,7 @@ * * @param */ -public class BehaviorSubject extends Subject { +public class BehaviorSubject extends AbstractSubject { /** * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it. @@ -58,64 +55,68 @@ public class BehaviorSubject extends Subject { * @param defaultValue * The value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events. * @return the constructed {@link BehaviorSubject}. + * @deprecated Use {@link create()} instead. */ public static BehaviorSubject createWithDefaultValue(T defaultValue) { - final ConcurrentHashMap> observers = new ConcurrentHashMap>(); + return create(defaultValue); + } - final AtomicReference currentValue = new AtomicReference(defaultValue); + /** + * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it. + * + * @param defaultValue + * The value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events. + * @return the constructed {@link BehaviorSubject}. + */ + public static BehaviorSubject create(T defaultValue) { + final SubjectState state = new SubjectState(); + // set a default value so subscriptions will immediately receive this until a new notification is received + state.currentValue.set(new Notification(defaultValue)); + OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, new Action2, Observer>() { - OnSubscribeFunc onSubscribe = new OnSubscribeFunc() { @Override - public Subscription onSubscribe(Observer observer) { - final SafeObservableSubscription subscription = new SafeObservableSubscription(); - - subscription.wrap(new Subscription() { - @Override - public void unsubscribe() { - // on unsubscribe remove it from the map of outbound observers to notify - observers.remove(subscription); - } - }); - - observer.onNext(currentValue.get()); - - // on subscribe add it to the map of outbound observers to notify - observers.put(subscription, observer); - return subscription; + public void call(SubjectState state, Observer o) { + /** + * When we subscribe we always emit the latest value to the observer, including + * terminal states which are recorded as the last value. + */ + emitNotification(state.currentValue.get(), o); } - }; - - return new BehaviorSubject(currentValue, onSubscribe, observers); + }); + return new BehaviorSubject(onSubscribe, state); } - private final ConcurrentHashMap> observers; - private final AtomicReference currentValue; + private final SubjectState state; - protected BehaviorSubject(AtomicReference currentValue, OnSubscribeFunc onSubscribe, ConcurrentHashMap> observers) { + protected BehaviorSubject(OnSubscribeFunc onSubscribe, SubjectState state) { super(onSubscribe); - this.currentValue = currentValue; - this.observers = observers; + this.state = state; } @Override public void onCompleted() { - for (Observer observer : observers.values()) { - observer.onCompleted(); - } + /** + * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers + */ + state.currentValue.set(new Notification()); + emitNotificationAndTerminate(state, null); } @Override public void onError(Throwable e) { - for (Observer observer : observers.values()) { - observer.onError(e); - } + /** + * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers + */ + state.currentValue.set(new Notification(e)); + emitNotificationAndTerminate(state, null); } @Override - public void onNext(T args) { - currentValue.set(args); - for (Observer observer : observers.values()) { - observer.onNext(args); - } + public void onNext(T v) { + /** + * Store the latest value and send it to all observers; + */ + state.currentValue.set(new Notification(v)); + emitNotification(state, null); } } diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index fd32bd6e42..8cf2d75747 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -15,13 +15,8 @@ */ package rx.subjects; -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.ConcurrentHashMap; - +import rx.Notification; import rx.Observer; -import rx.Subscription; -import rx.operators.SafeObservableSubscription; /** * Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber. @@ -32,7 +27,7 @@ *

*

 {@code
 
- * ublishSubject subject = PublishSubject.create();
+ * PublishSubject subject = PublishSubject.create();
   // observer1 will receive all onNext and onCompleted events
   subject.subscribe(observer1);
   subject.onNext("one");
@@ -46,75 +41,44 @@
  * 
  * @param 
  */
-public class PublishSubject extends Subject {
+public class PublishSubject extends AbstractSubject {
     public static  PublishSubject create() {
-        final ConcurrentHashMap> observers = new ConcurrentHashMap>();
-
-        OnSubscribeFunc onSubscribe = new OnSubscribeFunc() {
-            @Override
-            public Subscription onSubscribe(Observer observer) {
-                final SafeObservableSubscription subscription = new SafeObservableSubscription();
-
-                subscription.wrap(new Subscription() {
-                    @Override
-                    public void unsubscribe() {
-                        // on unsubscribe remove it from the map of outbound observers to notify
-                        observers.remove(subscription);
-                    }
-                });
-
-                // on subscribe add it to the map of outbound observers to notify
-                observers.put(subscription, observer);
-
-                return subscription;
-            }
-
-        };
-
-        return new PublishSubject(onSubscribe, observers);
+        final SubjectState state = new SubjectState();
+        OnSubscribeFunc onSubscribe = getOnSubscribeFunc(state, null);
+        return new PublishSubject(onSubscribe, state);
     }
 
-    private final ConcurrentHashMap> observers;
+    private final SubjectState state;
 
-    protected PublishSubject(OnSubscribeFunc onSubscribe, ConcurrentHashMap> observers) {
+    protected PublishSubject(OnSubscribeFunc onSubscribe, SubjectState state) {
         super(onSubscribe);
-        this.observers = observers;
+        this.state = state;
     }
 
     @Override
     public void onCompleted() {
-        for (Observer observer : snapshotOfValues()) {
-            observer.onCompleted();
-        }
-        observers.clear();
+        /**
+         * Mark this subject as completed and emit latest value + 'onCompleted' to all Observers
+         */
+        state.currentValue.set(new Notification());
+        emitNotificationAndTerminate(state, null);
     }
 
     @Override
     public void onError(Throwable e) {
-        for (Observer observer : snapshotOfValues()) {
-            observer.onError(e);
-        }
-        observers.clear();
+        /**
+         * Mark this subject as completed with an error as the last value and emit 'onError' to all Observers
+         */
+        state.currentValue.set(new Notification(e));
+        emitNotificationAndTerminate(state, null);
     }
 
     @Override
-    public void onNext(T args) {
-        for (Observer observer : snapshotOfValues()) {
-            observer.onNext(args);
-        }
-    }
-
-    /**
-     * Current snapshot of 'values()' so that concurrent modifications aren't included.
-     * 
-     * This makes it behave deterministically in a single-threaded execution when nesting subscribes.
-     * 
-     * In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
-     * of possibly being included in the current onNext iteration.
-     * 
-     * @return List>
-     */
-    private Collection> snapshotOfValues() {
-        return new ArrayList>(observers.values());
+    public void onNext(T v) {
+        /**
+         * Store the latest value and send it to all observers;
+         */
+        state.currentValue.set(new Notification(v));
+        emitNotification(state, null);
     }
 }
diff --git a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java
index ac543b7a18..57958c108f 100644
--- a/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java
+++ b/rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java
@@ -1,12 +1,12 @@
 /**
  * Copyright 2013 Netflix, Inc.
- *
+ * 
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,9 +19,11 @@
 import static org.mockito.Mockito.*;
 
 import org.junit.Test;
+import org.mockito.InOrder;
 import org.mockito.Mockito;
 
 import rx.Observer;
+import rx.Subscription;
 import rx.util.functions.Action1;
 import rx.util.functions.Func0;
 
@@ -41,10 +43,6 @@ public void testThatObserverReceivesDefaultValueIfNothingWasPublished() {
         subject.onNext("two");
         subject.onNext("three");
 
-        assertReceivedAllEvents(aObserver);
-    }
-
-    private void assertReceivedAllEvents(Observer aObserver) {
         verify(aObserver, times(1)).onNext("default");
         verify(aObserver, times(1)).onNext("one");
         verify(aObserver, times(1)).onNext("two");
@@ -55,7 +53,7 @@ private void assertReceivedAllEvents(Observer aObserver) {
 
     @Test
     public void testThatObserverDoesNotReceiveDefaultValueIfSomethingWasPublished() {
-        BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+        BehaviorSubject subject = BehaviorSubject.create("default");
 
         subject.onNext("one");
 
@@ -66,10 +64,6 @@ public void testThatObserverDoesNotReceiveDefaultValueIfSomethingWasPublished()
         subject.onNext("two");
         subject.onNext("three");
 
-        assertDidNotReceiveTheDefaultValue(aObserver);
-    }
-
-    private void assertDidNotReceiveTheDefaultValue(Observer aObserver) {
         verify(aObserver, Mockito.never()).onNext("default");
         verify(aObserver, times(1)).onNext("one");
         verify(aObserver, times(1)).onNext("two");
@@ -80,7 +74,7 @@ private void assertDidNotReceiveTheDefaultValue(Observer aObserver) {
 
     @Test
     public void testCompleted() {
-        BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+        BehaviorSubject subject = BehaviorSubject.create("default");
 
         @SuppressWarnings("unchecked")
         Observer aObserver = mock(Observer.class);
@@ -89,19 +83,61 @@ public void testCompleted() {
         subject.onNext("one");
         subject.onCompleted();
 
-        assertCompletedObserver(aObserver);
-    }
-
-    private void assertCompletedObserver(Observer aObserver) {
         verify(aObserver, times(1)).onNext("default");
         verify(aObserver, times(1)).onNext("one");
         verify(aObserver, Mockito.never()).onError(any(Throwable.class));
         verify(aObserver, times(1)).onCompleted();
     }
 
+    @Test
+    public void testCompletedStopsEmittingData() {
+        BehaviorSubject channel = BehaviorSubject.create(2013);
+        @SuppressWarnings("unchecked")
+        Observer observerA = mock(Observer.class);
+        @SuppressWarnings("unchecked")
+        Observer observerB = mock(Observer.class);
+        @SuppressWarnings("unchecked")
+        Observer observerC = mock(Observer.class);
+
+        Subscription a = channel.subscribe(observerA);
+        Subscription b = channel.subscribe(observerB);
+
+        InOrder inOrderA = inOrder(observerA);
+        InOrder inOrderB = inOrder(observerB);
+        InOrder inOrderC = inOrder(observerC);
+
+        inOrderA.verify(observerA).onNext(2013);
+        inOrderB.verify(observerB).onNext(2013);
+
+        channel.onNext(42);
+
+        inOrderA.verify(observerA).onNext(42);
+        inOrderB.verify(observerB).onNext(42);
+
+        a.unsubscribe();
+        inOrderA.verifyNoMoreInteractions();
+
+        channel.onNext(4711);
+
+        inOrderB.verify(observerB).onNext(4711);
+
+        channel.onCompleted();
+
+        inOrderB.verify(observerB).onCompleted();
+
+        Subscription c = channel.subscribe(observerC);
+
+        inOrderC.verify(observerC).onCompleted();
+
+        channel.onNext(13);
+
+        inOrderB.verifyNoMoreInteractions();
+        inOrderC.verifyNoMoreInteractions();
+    }
+
     @Test
     public void testCompletedAfterError() {
-        BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+        BehaviorSubject subject = BehaviorSubject.create("default");
 
         @SuppressWarnings("unchecked")
         Observer aObserver = mock(Observer.class);
@@ -112,10 +148,6 @@ public void testCompletedAfterError() {
         subject.onNext("two");
         subject.onCompleted();
 
-        assertErrorObserver(aObserver);
-    }
-
-    private void assertErrorObserver(Observer aObserver) {
         verify(aObserver, times(1)).onNext("default");
         verify(aObserver, times(1)).onNext("one");
         verify(aObserver, times(1)).onError(testException);
@@ -127,7 +159,7 @@ public void testUnsubscribe() {
                 new Func0>() {
                     @Override
                     public BehaviorSubject call() {
-                        return BehaviorSubject.createWithDefaultValue("default");
+                        return BehaviorSubject.create("default");
                     }
                 }, new Action1>() {
                     @Override
diff --git a/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java b/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java
index 9e93c89e01..5e9cc2790f 100644
--- a/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java
+++ b/rxjava-core/src/test/java/rx/subjects/PublishSubjectTest.java
@@ -1,12 +1,12 @@
 /**
  * Copyright 2013 Netflix, Inc.
- *
+ * 
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,18 +20,12 @@
 import static org.mockito.Mockito.*;
 
 import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import junit.framework.Assert;
 
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
 
-import rx.Notification;
 import rx.Observable;
 import rx.Observer;
 import rx.Subscription;
@@ -41,57 +35,6 @@
 
 public class PublishSubjectTest {
 
-    @Test
-    public void test() {
-        PublishSubject subject = PublishSubject.create();
-        final AtomicReference>> actualRef = new AtomicReference>>();
-
-        Observable>> wNotificationsList = subject.materialize().toList();
-        wNotificationsList.subscribe(new Action1>>() {
-            @Override
-            public void call(List> actual) {
-                actualRef.set(actual);
-            }
-        });
-
-        Subscription sub = Observable.create(new Observable.OnSubscribeFunc() {
-            @Override
-            public Subscription onSubscribe(final Observer observer) {
-                final AtomicBoolean stop = new AtomicBoolean(false);
-                new Thread() {
-                    @Override
-                    public void run() {
-                        int i = 1;
-                        while (!stop.get()) {
-                            observer.onNext(i++);
-                        }
-                        observer.onCompleted();
-                    }
-                }.start();
-                return new Subscription() {
-                    @Override
-                    public void unsubscribe() {
-                        stop.set(true);
-                    }
-                };
-            }
-        }).subscribe(subject);
-        // the subject has received an onComplete from the first subscribe because
-        // it is synchronous and the next subscribe won't do anything.
-        Observable.from(-1, -2, -3).subscribe(subject);
-
-        List> expected = new ArrayList>();
-        expected.add(new Notification(-1));
-        expected.add(new Notification(-2));
-        expected.add(new Notification(-3));
-        expected.add(new Notification());
-        Assert.assertTrue(actualRef.get().containsAll(expected));
-
-        sub.unsubscribe();
-    }
-
-    private final Throwable testException = new Throwable();
-
     @Test
     public void testCompleted() {
         PublishSubject subject = PublishSubject.create();
@@ -117,6 +60,49 @@ public void testCompleted() {
         // todo bug?            assertNeverObserver(anotherObserver);
     }
 
+    @Test
+    public void testCompletedStopsEmittingData() {
+        PublishSubject channel = PublishSubject.create();
+        @SuppressWarnings("unchecked")
+        Observer observerA = mock(Observer.class);
+        @SuppressWarnings("unchecked")
+        Observer observerB = mock(Observer.class);
+        @SuppressWarnings("unchecked")
+        Observer observerC = mock(Observer.class);
+
+        Subscription a = channel.subscribe(observerA);
+        Subscription b = channel.subscribe(observerB);
+
+        InOrder inOrderA = inOrder(observerA);
+        InOrder inOrderB = inOrder(observerB);
+        InOrder inOrderC = inOrder(observerC);
+
+        channel.onNext(42);
+
+        inOrderA.verify(observerA).onNext(42);
+        inOrderB.verify(observerB).onNext(42);
+
+        a.unsubscribe();
+        inOrderA.verifyNoMoreInteractions();
+
+        channel.onNext(4711);
+
+        inOrderB.verify(observerB).onNext(4711);
+
+        channel.onCompleted();
+
+        inOrderB.verify(observerB).onCompleted();
+
+        Subscription c = channel.subscribe(observerC);
+
+        inOrderC.verify(observerC).onCompleted();
+
+        channel.onNext(13);
+
+        inOrderB.verifyNoMoreInteractions();
+        inOrderC.verifyNoMoreInteractions();
+    }
+
     private void assertCompletedObserver(Observer aObserver) {
         verify(aObserver, times(1)).onNext("one");
         verify(aObserver, times(1)).onNext("two");
@@ -339,43 +325,7 @@ public void testReSubscribe() {
         s2.unsubscribe();
     }
 
-    /**
-     * Even if subject received an onError/onCompleted, new subscriptions should be able to restart it.
-     */
-    @Test
-    public void testReSubscribeAfterTerminalState() {
-        final PublishSubject ps = PublishSubject.create();
-
-        Observer o1 = mock(Observer.class);
-        Subscription s1 = ps.subscribe(o1);
-
-        // emit
-        ps.onNext(1);
-
-        // validate we got it
-        InOrder inOrder1 = inOrder(o1);
-        inOrder1.verify(o1, times(1)).onNext(1);
-        inOrder1.verifyNoMoreInteractions();
-
-        // unsubscribe
-        s1.unsubscribe();
 
-        ps.onCompleted();
-
-        // emit again but nothing will be there to receive it
-        ps.onNext(2);
-
-        Observer o2 = mock(Observer.class);
-        Subscription s2 = ps.subscribe(o2);
-
-        // emit
-        ps.onNext(3);
-
-        // validate we got it
-        InOrder inOrder2 = inOrder(o2);
-        inOrder2.verify(o2, times(1)).onNext(3);
-        inOrder2.verifyNoMoreInteractions();
+    private final Throwable testException = new Throwable();
 
-        s2.unsubscribe();
-    }
 }