Skip to content

Commit ea9b73a

Browse files
Merge pull request #894 from benjchristensen/synchronize-isTerminated
Synchronized Operator Check for isTerminated
2 parents e4bc210 + f74bb41 commit ea9b73a

File tree

1 file changed

+13
-4
lines changed

1 file changed

+13
-4
lines changed

rxjava-core/src/main/java/rx/observers/SynchronizedObserver.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public final class SynchronizedObserver<T> implements Observer<T> {
4141
*/
4242

4343
private final Observer<? super T> observer;
44-
private volatile Object lock;
44+
private final Object lock;
45+
private boolean isTerminated = false;
4546

4647
public SynchronizedObserver(Observer<? super T> subscriber) {
4748
this.observer = subscriber;
@@ -55,19 +56,27 @@ public SynchronizedObserver(Observer<? super T> subscriber, Object lock) {
5556

5657
public void onNext(T arg) {
5758
synchronized (lock) {
58-
observer.onNext(arg);
59+
if (!isTerminated) {
60+
observer.onNext(arg);
61+
}
5962
}
6063
}
6164

6265
public void onError(Throwable e) {
6366
synchronized (lock) {
64-
observer.onError(e);
67+
if (!isTerminated) {
68+
isTerminated = true;
69+
observer.onError(e);
70+
}
6571
}
6672
}
6773

6874
public void onCompleted() {
6975
synchronized (lock) {
70-
observer.onCompleted();
76+
if (!isTerminated) {
77+
isTerminated = true;
78+
observer.onCompleted();
79+
}
7180
}
7281
}
7382
}

0 commit comments

Comments
 (0)