Skip to content

Commit a7fc861

Browse files
committed
Fix PublishSubject from skipping newly added items
Ugly hack, this would better be fixed in the Map implementation instead.
1 parent 52640f5 commit a7fc861

File tree

1 file changed

+35
-9
lines changed

1 file changed

+35
-9
lines changed

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import java.util.Set;
24+
import java.util.HashSet;
2325
import java.util.concurrent.ConcurrentHashMap;
2426
import java.util.concurrent.atomic.AtomicBoolean;
2527
import java.util.concurrent.atomic.AtomicReference;
@@ -94,23 +96,47 @@ protected PublishSubject(Func1<Observer<T>, Subscription> onSubscribe, Concurren
9496

9597
@Override
9698
public void onCompleted() {
97-
for (Observer<T> observer : observers.values()) {
98-
observer.onCompleted();
99-
}
99+
int observersSize;
100+
Set<Observer<T>> notifiedObservers = new HashSet<Observer<T>>();
101+
do {
102+
observersSize = observers.size();
103+
for (Observer<T> observer : observers.values()) {
104+
if (!notifiedObservers.contains(observer)) {
105+
observer.onCompleted();
106+
notifiedObservers.add(observer);
107+
}
108+
}
109+
} while (observers.size() > observersSize);
100110
}
101111

102112
@Override
103113
public void onError(Exception e) {
104-
for (Observer<T> observer : observers.values()) {
105-
observer.onError(e);
106-
}
114+
int observersSize;
115+
Set<Observer<T>> notifiedObservers = new HashSet<Observer<T>>();
116+
do {
117+
observersSize = observers.size();
118+
for (Observer<T> observer : observers.values()) {
119+
if (!notifiedObservers.contains(observer)) {
120+
observer.onError(e);
121+
notifiedObservers.add(observer);
122+
}
123+
}
124+
} while (observers.size() > observersSize);
107125
}
108126

109127
@Override
110128
public void onNext(T args) {
111-
for (Observer<T> observer : observers.values()) {
112-
observer.onNext(args);
113-
}
129+
int observersSize;
130+
Set<Observer<T>> notifiedObservers = new HashSet<Observer<T>>();
131+
do {
132+
observersSize = observers.size();
133+
for (Observer<T> observer : observers.values()) {
134+
if (!notifiedObservers.contains(observer)) {
135+
observer.onNext(args);
136+
notifiedObservers.add(observer);
137+
}
138+
}
139+
} while (observers.size() > observersSize);
114140
}
115141

116142
public static class UnitTest {

0 commit comments

Comments
 (0)