Skip to content

Commit 7db0752

Browse files
committed
fix race conditions in OnSubscribeRedo, particularly related to failure of OperatorRetry.testRetryWithBackpressureParallel
1 parent 2532484 commit 7db0752

File tree

3 files changed

+350
-169
lines changed

3 files changed

+350
-169
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 224 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535

3636
import java.util.concurrent.atomic.AtomicBoolean;
3737
import java.util.concurrent.atomic.AtomicLong;
38-
import java.util.concurrent.atomic.AtomicReference;
3938

4039
import rx.Notification;
4140
import rx.Observable;
@@ -47,13 +46,14 @@
4746
import rx.functions.Action0;
4847
import rx.functions.Func1;
4948
import rx.functions.Func2;
49+
import rx.observers.Subscribers;
5050
import rx.schedulers.Schedulers;
51-
import rx.subjects.PublishSubject;
51+
import rx.subjects.BehaviorSubject;
5252
import rx.subscriptions.SerialSubscription;
5353

5454
public final class OnSubscribeRedo<T> implements OnSubscribe<T> {
5555

56-
static final Func1<Observable<? extends Notification<?>>, Observable<?>> REDO_INIFINITE = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
56+
static final Func1<Observable<? extends Notification<?>>, Observable<?>> REDO_INFINITE = new Func1<Observable<? extends Notification<?>>, Observable<?>>() {
5757
@Override
5858
public Observable<?> call(Observable<? extends Notification<?>> ts) {
5959
return ts.map(new Func1<Notification<?>, Notification<?>>() {
@@ -120,7 +120,7 @@ public Notification<Integer> call(Notification<Integer> n, Notification<?> term)
120120
}
121121

122122
public static <T> Observable<T> retry(Observable<T> source) {
123-
return retry(source, REDO_INIFINITE);
123+
return retry(source, REDO_INFINITE);
124124
}
125125

126126
public static <T> Observable<T> retry(Observable<T> source, final long count) {
@@ -144,7 +144,7 @@ public static <T> Observable<T> repeat(Observable<T> source) {
144144
}
145145

146146
public static <T> Observable<T> repeat(Observable<T> source, Scheduler scheduler) {
147-
return repeat(source, REDO_INIFINITE, scheduler);
147+
return repeat(source, REDO_INFINITE, scheduler);
148148
}
149149

150150
public static <T> Observable<T> repeat(Observable<T> source, final long count) {
@@ -172,10 +172,10 @@ public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observa
172172
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
173173
}
174174

175-
private Observable<T> source;
175+
private final Observable<T> source;
176176
private final Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> controlHandlerFunction;
177-
private boolean stopOnComplete;
178-
private boolean stopOnError;
177+
private final boolean stopOnComplete;
178+
private final boolean stopOnError;
179179
private final Scheduler scheduler;
180180

181181
private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,
@@ -189,20 +189,31 @@ private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends
189189

190190
@Override
191191
public void call(final Subscriber<? super T> child) {
192-
final AtomicBoolean isLocked = new AtomicBoolean(true);
192+
193+
// when true is a marker to say we are ready to resubscribe to source
193194
final AtomicBoolean resumeBoundary = new AtomicBoolean(true);
195+
194196
// incremented when requests are made, decremented when requests are fulfilled
195197
final AtomicLong consumerCapacity = new AtomicLong(0l);
196-
final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
197198

198199
final Scheduler.Worker worker = scheduler.createWorker();
199200
child.add(worker);
200201

201202
final SerialSubscription sourceSubscriptions = new SerialSubscription();
202203
child.add(sourceSubscriptions);
203204

204-
final PublishSubject<Notification<?>> terminals = PublishSubject.create();
205-
205+
// use a subject to receive terminals (onCompleted and onError signals) from
206+
// the source observable. We use a BehaviorSubject because subscribeToSource
207+
// may emit a terminal before the restarts observable (transformed terminals)
208+
// is subscribed
209+
final BehaviorSubject<Notification<?>> terminals = BehaviorSubject.create();
210+
final Subscriber<Notification<?>> dummySubscriber = Subscribers.empty();
211+
// subscribe immediately so the last emission will be replayed to the next
212+
// subscriber (which is the one we care about)
213+
terminals.subscribe(dummySubscriber);
214+
215+
final ProducerArbiter arbiter = new ProducerArbiter();
216+
206217
final Action0 subscribeToSource = new Action0() {
207218
@Override
208219
public void call() {
@@ -212,11 +223,11 @@ public void call() {
212223

213224
Subscriber<T> terminalDelegatingSubscriber = new Subscriber<T>() {
214225
boolean done;
226+
215227
@Override
216228
public void onCompleted() {
217229
if (!done) {
218230
done = true;
219-
currentProducer.set(null);
220231
unsubscribe();
221232
terminals.onNext(Notification.createOnCompleted());
222233
}
@@ -226,7 +237,6 @@ public void onCompleted() {
226237
public void onError(Throwable e) {
227238
if (!done) {
228239
done = true;
229-
currentProducer.set(null);
230240
unsubscribe();
231241
terminals.onNext(Notification.createOnError(e));
232242
}
@@ -235,20 +245,30 @@ public void onError(Throwable e) {
235245
@Override
236246
public void onNext(T v) {
237247
if (!done) {
238-
if (consumerCapacity.get() != Long.MAX_VALUE) {
239-
consumerCapacity.decrementAndGet();
240-
}
241248
child.onNext(v);
249+
decrementConsumerCapacity();
250+
arbiter.produced(1);
251+
}
252+
}
253+
254+
private void decrementConsumerCapacity() {
255+
// use a CAS loop because we don't want to decrement the
256+
// value if it is Long.MAX_VALUE
257+
while (true) {
258+
long cc = consumerCapacity.get();
259+
if (cc != Long.MAX_VALUE) {
260+
if (consumerCapacity.compareAndSet(cc, cc - 1)) {
261+
break;
262+
}
263+
} else {
264+
break;
265+
}
242266
}
243267
}
244268

245269
@Override
246270
public void setProducer(Producer producer) {
247-
currentProducer.set(producer);
248-
long c = consumerCapacity.get();
249-
if (c > 0) {
250-
producer.request(c);
251-
}
271+
arbiter.setProducer(producer);
252272
}
253273
};
254274
// new subscription each time so if it unsubscribes itself it does not prevent retries
@@ -278,12 +298,11 @@ public void onError(Throwable e) {
278298

279299
@Override
280300
public void onNext(Notification<?> t) {
281-
if (t.isOnCompleted() && stopOnComplete)
282-
child.onCompleted();
283-
else if (t.isOnError() && stopOnError)
284-
child.onError(t.getThrowable());
285-
else {
286-
isLocked.set(false);
301+
if (t.isOnCompleted() && stopOnComplete) {
302+
filteredTerminals.onCompleted();
303+
} else if (t.isOnError() && stopOnError) {
304+
filteredTerminals.onError(t.getThrowable());
305+
} else {
287306
filteredTerminals.onNext(t);
288307
}
289308
}
@@ -313,10 +332,15 @@ public void onError(Throwable e) {
313332

314333
@Override
315334
public void onNext(Object t) {
316-
if (!isLocked.get() && !child.isUnsubscribed()) {
335+
if (!child.isUnsubscribed()) {
336+
// perform a best endeavours check on consumerCapacity
337+
// with the intent of only resubscribing immediately
338+
// if there is outstanding capacity
317339
if (consumerCapacity.get() > 0) {
318340
worker.schedule(subscribeToSource);
319341
} else {
342+
// set this to true so that on next request
343+
// subscribeToSource will be scheduled
320344
resumeBoundary.compareAndSet(false, true);
321345
}
322346
}
@@ -334,16 +358,180 @@ public void setProducer(Producer producer) {
334358

335359
@Override
336360
public void request(final long n) {
337-
long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
338-
Producer producer = currentProducer.get();
339-
if (producer != null) {
340-
producer.request(n);
341-
} else
342-
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
343-
worker.schedule(subscribeToSource);
361+
if (n > 0) {
362+
BackpressureUtils.getAndAddRequest(consumerCapacity, n);
363+
arbiter.request(n);
364+
if (resumeBoundary.compareAndSet(true, false))
365+
worker.schedule(subscribeToSource);
344366
}
345367
}
346368
});
347369

348370
}
371+
372+
/**
373+
* Between when the source subscription finishes and the next subscription starts requests may arrive.
374+
* ProducerArbiter keeps track of all requests made and all arriving emissions so that when setProducer
375+
* is called for a new subscription the appropriate number of requests are made to the new producer.
376+
*/
377+
static final class ProducerArbiter implements Producer {
378+
/** Guarded by this. */
379+
boolean emitting;
380+
/** The current producer. Accessed while emitting. */
381+
Producer currentProducer;
382+
/** The current requested count. */
383+
long requested;
384+
385+
long missedRequested;
386+
Producer missedProducer;
387+
long missedProd;
388+
389+
@Override
390+
public void request(long n) {
391+
if (n <= 0) {
392+
return;
393+
}
394+
Producer mp;
395+
long mprod;
396+
synchronized (this) {
397+
if (emitting) {
398+
missedRequested += n;
399+
return;
400+
}
401+
emitting = true;
402+
mp = missedProducer;
403+
mprod = missedProd;
404+
405+
missedProducer = null;
406+
missedProd = 0L;
407+
}
408+
409+
boolean skipFinal = false;
410+
try {
411+
emit(n, mp, mprod);
412+
drainLoop();
413+
skipFinal = true;
414+
} finally {
415+
if (!skipFinal) {
416+
synchronized (this) {
417+
emitting = false;
418+
}
419+
}
420+
}
421+
}
422+
void setProducer(Producer p) {
423+
if (p == null) {
424+
throw new NullPointerException();
425+
}
426+
427+
long mreq;
428+
long mprod;
429+
synchronized (this) {
430+
if (emitting) {
431+
missedProducer = p;
432+
return;
433+
}
434+
emitting = true;
435+
mreq = missedRequested;
436+
mprod = missedProd;
437+
438+
missedRequested = 0L;
439+
missedProd = 0L;
440+
}
441+
442+
boolean skipFinal = false;
443+
try {
444+
emit(mreq, p, mprod);
445+
drainLoop();
446+
skipFinal = true;
447+
} finally {
448+
if (!skipFinal) {
449+
synchronized (this) {
450+
emitting = false;
451+
}
452+
}
453+
}
454+
}
455+
void produced(long n) {
456+
if (n <= 0) {
457+
throw new IllegalArgumentException(n + " produced?!");
458+
}
459+
460+
long mreq;
461+
Producer mp;
462+
synchronized (this) {
463+
if (emitting) {
464+
missedProd += n;
465+
return;
466+
}
467+
emitting = true;
468+
mreq = missedRequested;
469+
mp = missedProducer;
470+
471+
missedRequested = 0L;
472+
missedProducer = null;
473+
}
474+
475+
boolean skipFinal = false;
476+
try {
477+
emit(mreq, mp, n);
478+
drainLoop();
479+
skipFinal = true;
480+
} finally {
481+
if (!skipFinal) {
482+
synchronized (this) {
483+
emitting = false;
484+
}
485+
}
486+
}
487+
}
488+
void drainLoop() {
489+
for (;;) {
490+
long mreq;
491+
long mprod;
492+
Producer mp;
493+
synchronized (this) {
494+
mreq = missedRequested;
495+
mprod = missedProd;
496+
mp = missedProducer;
497+
if (mreq == 0L && mp == null && mprod == 0L) {
498+
emitting = false;
499+
return;
500+
}
501+
missedRequested = 0L;
502+
missedProd = 0L;
503+
missedProducer = null;
504+
}
505+
emit(mreq, mp, mprod);
506+
}
507+
}
508+
void emit(long mreq, Producer mp, long mprod) {
509+
boolean newMp = false;
510+
if (mp != null) {
511+
newMp = true;
512+
currentProducer = mp;
513+
} else {
514+
mp = currentProducer;
515+
}
516+
517+
long u = requested + mreq;
518+
if (u < 0) {
519+
u = Long.MAX_VALUE;
520+
} else
521+
if (u != Long.MAX_VALUE) {
522+
u -= mprod;
523+
if (u < 0) {
524+
throw new IllegalStateException("More produced than requested");
525+
}
526+
}
527+
requested = u;
528+
529+
if (mreq > 0 && mp != null) {
530+
mp.request(mreq);
531+
} else
532+
if (newMp && u > 0) {
533+
mp.request(u);
534+
}
535+
}
536+
}
349537
}

0 commit comments

Comments
 (0)