OperatorObserveOn onComplete can be emitted despite onError being called #2929


Merged: 3 commits merged into ReactiveX:1.x, May 13, 2015

Conversation

davidmoten
Collaborator

This is a fix for a race condition in OperatorObserveOn: if thread A gets to L164 while thread B starts the pollQueue loop, thread B will act as if the stream had completed normally instead of with an error.

The effect is that a stream could appear to complete normally when in fact an error had occurred.

Using two volatile booleans, completed and failed, that were not atomically updated or read as a pair exposed us to this race condition.

The fix is to replace completed and failed with a single volatile integer status representing the states ACTIVE, COMPLETED, and ERRORED.
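A minimal sketch of the single-state idea (field and method names here are illustrative, not the PR's exact code): because Rx serializes terminal events, onError and onCompleted never run concurrently, so a plain error field written before the volatile status write is safely published to the drain loop.

```java
// Illustrative sketch of replacing two volatiles (completed, failed) with a
// single volatile status; names are hypothetical, not the PR's actual code.
class ObserveOnStateSketch {
    static final int ACTIVE = 0;
    static final int COMPLETED = 1;
    static final int ERRORED = 2;

    // Terminal events are serialized in Rx, so a plain field written before
    // the volatile status write is safely published to the reading thread.
    Throwable error;
    volatile int status = ACTIVE;

    void onError(Throwable t) {
        error = t;
        status = ERRORED;   // one volatile write: readers see state and error together
    }

    void onCompleted() {
        status = COMPLETED;
    }

    // The drain loop reads a single value and cannot confuse the two outcomes.
    boolean completedNormally() { return status == COMPLETED; }
    boolean failed()            { return status == ERRORED; }
}
```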

@davidmoten
Collaborator Author

Seeing as I don't use AtomicInteger methods on status, I may as well just use a byte to store these states. I'll update the PR.

@davidmoten
Collaborator Author

I also expect that the lines below in the onCompleted and onError methods are not required, because those methods will not be run at the same time and are not reentrant (I hope that's the right term):

 if (error != null) {
    return;
 }

I'll remove them in an update to the PR now

@davidmoten davidmoten force-pushed the observe-on-race branch 2 times, most recently from e44d6c4 to 9b40386, May 1, 2015 01:27
@akarnokd
Member

akarnokd commented May 1, 2015

Nice catch, but you don't need to introduce that state variable. Remove failed and make error volatile. In pollQueue, if completed is true, read error and, if it is non-null, report it and return. Otherwise, check whether the queue is empty and, if so, send onCompleted(). I'm also not sure why the requested == 0 check is there; I think it will just make the loop spin until the request value reaches zero via the getAndDecrement below it.
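The suggested check can be sketched like this (a hedged illustration, not the PR's actual code): error is assigned before completed is set, so once the drain loop observes completed it can trust the value of error, report it eagerly, or complete normally only after the queue drains.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the suggested drain-side check: `error` (volatile) is assigned
// before `completed` is set, so observing completed == true guarantees error
// is visible. Names are illustrative, not the PR's exact code.
class DrainCheckSketch {
    volatile boolean completed;
    volatile Throwable error;
    final Queue<Object> queue = new ArrayDeque<>();

    // Returns "error", "completed", or null when not yet terminated.
    String terminalState() {
        if (completed) {
            Throwable e = error;        // read the volatile once
            if (e != null) {
                return "error";         // report the error eagerly and return
            }
            if (queue.isEmpty()) {
                return "completed";     // complete only once the queue drains
            }
        }
        return null;
    }
}
```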

@davidmoten
Collaborator Author

OK, I'll have a look at that. I agree that the pollQueue logic is weird and needs review; I wanted to leave it basically untouched so this PR is just the race-condition fix.

@davidmoten
Collaborator Author

Righto, I've run with your suggestion. I also removed the requested == 0 check because it seems pointless; apart from that, the logic is equivalent. I'm not very keen on the second if (finished) block (I'd prefer it to loop around again), but it might be slightly more performant.

By the way I renamed completed to finished so it could carry the sense of either onComplete or onError. If that is inconsistent with naming elsewhere I can change it back.

@akarnokd
Member

akarnokd commented May 1, 2015

Looks good. Could you do a perf comparison?

@davidmoten
Collaborator Author

Sure

@davidmoten
Collaborator Author

Benchmarks improved a few percent in general:

1.x branch:

Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5    70535.751    15423.954    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5    10852.111     3038.402    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       29.313        3.500    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5  8477545.925   290748.417    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   130930.008     8774.301    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      119.666        2.875    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5     9045.354     1739.331    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     5090.723      359.544    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       27.364        1.785    ops/s

observe-on-race branch:


Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5    73963.037    20044.651    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5    11155.981     7426.208    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       29.820        1.742    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5  9136674.653   428898.037    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   136164.981     3340.247    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      119.388        2.231    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5     8476.745     1586.126    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     5129.316      586.046    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       31.284        2.868    ops/s

Ran using

./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorObserveOn.*'

@davidmoten
Collaborator Author

I'll just fix a comment, and I don't think onNext needs to check finished. I might rerun the perfs after that change.

@davidmoten
Collaborator Author

The variability in perfs is pretty large. I'll run the full benchmarks and report back on those.

@akarnokd
Member

akarnokd commented May 1, 2015

I usually run the observeOn perf with -r 5.

@davidmoten
Collaborator Author

Thanks, I used

./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 5 .*OperatorObserveOn.*'

I'd call it a draw:

1.x branch:

Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5    79456.426    36108.380    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5    11094.927     2290.069    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       28.735        3.461    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5  8387544.528   589105.152    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   134932.398     3153.000    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      118.703       13.585    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5     8418.924      423.795    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     4879.297      213.291    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       29.267        0.362    ops/s

observe-on-race branch:

Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5    71650.362     7807.887    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5    10936.904     1868.829    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5       29.325        2.208    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5  8297987.180   437741.592    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5   135453.628     2471.897    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5      121.683       12.504    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5     9304.443      381.064    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     4957.975     1897.498    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5       27.492        0.897    ops/s

@akarnokd
Member

akarnokd commented May 1, 2015

I think the main cause of the fluctuation is the thread hopping of the source's emissions. If it hops to the observation thread, there is less traffic, I guess. If you have time, you could modify the perf by adding subscribeOn, which guarantees that a thread boundary is always crossed.

@davidmoten
Collaborator Author

I added subscribeOn(computation()) to the existing ObserveOn perfs on the 1.x and observe-on-race branches (did you want modified perfs or extra perfs?), and these are the results:

1.x branch:

Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5    39023.468    10714.207    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     5815.699      607.484    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        8.863        0.779    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5    73664.638    15363.977    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5    28392.278     2229.074    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5       75.900        9.994    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5     9634.822     5069.338    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     5222.449     2559.225    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5        8.931        1.078    ops/s

observe-on-race branch:

Benchmark                                         (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation         1  thrpt         5    35758.208     2014.403    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation      1000  thrpt         5     5797.158      455.773    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation   1000000  thrpt         5        9.390        0.509    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate           1  thrpt         5    73051.368     5076.367    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate        1000  thrpt         5    20619.434     3587.966    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate     1000000  thrpt         5       77.932        6.377    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread           1  thrpt         5     9311.025      542.003    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread        1000  thrpt         5     3754.341      271.638    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread     1000000  thrpt         5        9.473        0.219    ops/s

@akarnokd
Member

akarnokd commented May 3, 2015

Extra perfs, so they can be compared with the old ones at any time.

@davidmoten
Collaborator Author

Righto, here are the perfs including the new subscribeOn ones:

1.x branch:

Benchmark                                                                (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation                                1  thrpt         5    70251.688     8961.723    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation                             1000  thrpt         5     9280.779     8214.742    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation                          1000000  thrpt         5       28.990        3.447    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation         1  thrpt         5    36056.035     2511.357    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation      1000  thrpt         5     5677.334      735.949    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation   1000000  thrpt         5        9.674        0.382    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                                  1  thrpt         5  8480014.275   172973.770    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                               1000  thrpt         5   135104.525     1579.495    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                            1000000  thrpt         5      122.858       13.194    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation           1  thrpt         5    72785.494     2701.185    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation        1000  thrpt         5    19612.208     4703.085    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation     1000000  thrpt         5       73.333       13.677    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                                  1  thrpt         5     9007.205      571.124    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                               1000  thrpt         5     5971.174      192.781    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                            1000000  thrpt         5       31.180        1.258    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation           1  thrpt         5     9540.089      363.964    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation        1000  thrpt         5     3790.601      188.191    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation     1000000  thrpt         5       10.008        0.328    ops/s

observe-on-race branch:

Benchmark                                                                (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation                                1  thrpt         5    39214.570   158345.016    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation                             1000  thrpt         5    10669.899     1337.330    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation                          1000000  thrpt         5       29.230        1.805    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation         1  thrpt         5    36179.648     6840.388    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation      1000  thrpt         5     5844.341      185.774    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation   1000000  thrpt         5        9.566        0.294    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                                  1  thrpt         5  8283063.707   317079.693    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                               1000  thrpt         5   134060.544     5573.792    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                            1000000  thrpt         5      120.713       15.779    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation           1  thrpt         5   106156.191    43942.564    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation        1000  thrpt         5    27984.472     3232.294    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation     1000000  thrpt         5       76.865        8.196    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                                  1  thrpt         5     8890.780     1062.994    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                               1000  thrpt         5     5024.608      180.168    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                            1000000  thrpt         5       32.990        2.758    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation           1  thrpt         5     9114.271      314.387    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation        1000  thrpt         5     4271.852      977.703    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation     1000000  thrpt         5        9.647        0.831    ops/s

@davidmoten
Collaborator Author

@akarnokd I can maybe get a few percent improvement in most of the benchmarks by touching the volatile requested less in the loop, but I'm thinking I should make a new PR out of that if we are happy with the correctness and performance of the current PR.

observe-on-race branch:

Benchmark                                                                (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation                                1  thrpt         5    39214.570   158345.016    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation                             1000  thrpt         5    10669.899     1337.330    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation                          1000000  thrpt         5       29.230        1.805    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation         1  thrpt         5    36179.648     6840.388    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation      1000  thrpt         5     5844.341      185.774    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation   1000000  thrpt         5        9.566        0.294    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                                  1  thrpt         5  8283063.707   317079.693    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                               1000  thrpt         5   134060.544     5573.792    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                            1000000  thrpt         5      120.713       15.779    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation           1  thrpt         5   106156.191    43942.564    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation        1000  thrpt         5    27984.472     3232.294    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation     1000000  thrpt         5       76.865        8.196    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                                  1  thrpt         5     8890.780     1062.994    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                               1000  thrpt         5     5024.608      180.168    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                            1000000  thrpt         5       32.990        2.758    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation           1  thrpt         5     9114.271      314.387    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation        1000  thrpt         5     4271.852      977.703    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation     1000000  thrpt         5        9.647        0.831    ops/s

observe-on-race branch, touch requested less:

Benchmark                                                                (size)   Mode   Samples        Score  Score error    Units
r.o.OperatorObserveOnPerf.observeOnComputation                                1  thrpt         5    72839.461    11872.361    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation                             1000  thrpt         5    10953.971     3684.071    ops/s
r.o.OperatorObserveOnPerf.observeOnComputation                          1000000  thrpt         5       30.553        1.772    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation         1  thrpt         5    34296.824     2049.160    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation      1000  thrpt         5     6084.331      577.669    ops/s
r.o.OperatorObserveOnPerf.observeOnComputationSubscribedOnComputation   1000000  thrpt         5       10.296        0.272    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                                  1  thrpt         5  8622340.209    78646.722    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                               1000  thrpt         5   136452.092     3432.061    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediate                            1000000  thrpt         5      122.923       15.274    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation           1  thrpt         5    72631.043     3254.035    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation        1000  thrpt         5    27904.731     5882.644    ops/s
r.o.OperatorObserveOnPerf.observeOnImmediateSubscribedOnComputation     1000000  thrpt         5       79.129        7.015    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                                  1  thrpt         5     8623.913      898.953    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                               1000  thrpt         5     5256.389      197.299    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThread                            1000000  thrpt         5       33.302        1.333    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation           1  thrpt         5     9287.645      468.941    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation        1000  thrpt         5     3916.492      176.669    ops/s
r.o.OperatorObserveOnPerf.observeOnNewThreadSubscribedOnComputation     1000000  thrpt         5       10.401        0.743    ops/s

} else {
if (requested == 0 && completed && queue.isEmpty()) {
if (finished) {
if (error != null) {
Member

Avoid reading volatile twice here.

@akarnokd
Member

akarnokd commented May 4, 2015

Almost good and the performance is mostly okay:

[image: benchmark comparison chart]

if (_finished) {
if (error != null) {
//even if there are onNext in the queue we eagerly notify of error
child.onError(error);
Member

I meant error.

@davidmoten
Collaborator Author

@akarnokd Seeing as the error bypasses the queue, do you think I should add queue.clear() just before calling child.onError() so that the entries in the queue can be garbage collected?

@davidmoten
Collaborator Author

In fact, we could clear the queue in the onError method so that GC can happen before the next scheduled pollQueue.

@akarnokd
Member

akarnokd commented May 5, 2015

That would break the queue. Instead, clear the queue on the pollQueue side before emitting onError.

@davidmoten
Collaborator Author

PR updated with clearing the queue on the pollQueue side before emitting onError.
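A small sketch of the agreed approach (hypothetical names, assuming a single-consumer queue): only the drain loop consumes from the queue, so clearing it there, just before emitting the error, cannot race with a poll() and lets the queued items become garbage-collectable.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustration of clearing the queue on the consumer (pollQueue) side before
// emitting onError; clearing from the producer's onError would race with the
// single consumer's poll(). Names are hypothetical, not the PR's exact code.
class ErrorDrainSketch {
    final Queue<Object> queue = new ArrayDeque<>();
    volatile Throwable error;
    Throwable delivered;   // stands in for the child subscriber receiving onError

    void drain() {
        Throwable e = error;
        if (e != null) {
            queue.clear();  // consumer-side clear: no concurrent queue access
            delivered = e;  // stands in for child.onError(e)
            return;
        }
        // ... otherwise poll items and emit them downstream ...
    }
}
```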

@benjchristensen
Member

I've got this on my list to review.

davidmoten added a commit to davidmoten/RxJava that referenced this pull request May 8, 2015
@davidmoten
Collaborator Author

One more race condition. We need to increment requested with care: if it is Long.MAX_VALUE and gets decremented, then a request for one comes in (which uses BackpressureUtils to take us back up to Long.MAX_VALUE), and then the increment on requested happens, it overflows. The fix is to use BackpressureUtils to increment.
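The overflow can be avoided with a saturating add in the spirit of BackpressureUtils (this is a sketch of the idea, not RxJava's actual implementation): requests cap at Long.MAX_VALUE instead of wrapping around to a negative value.

```java
import java.util.concurrent.atomic.AtomicLong;

// Saturating request accounting, sketched after the BackpressureUtils idea:
// additions cap at Long.MAX_VALUE rather than overflowing to a negative value.
final class CappedRequestSketch {
    static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return current;            // already unbounded; nothing to add
            }
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE;     // overflow detected: saturate instead
            }
            if (requested.compareAndSet(current, next)) {
                return current;            // return the pre-add value, CAS-loop style
            }
        }
    }
}
```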

davidmoten added a commit to davidmoten/RxJava that referenced this pull request May 9, 2015
@akarnokd
Member

akarnokd commented May 9, 2015

I never understood why it does that many request updates. Here is a simpler version:

int emitted = 0;
do {
    counter = 1;
    long produced = 0;
    long r = requested;
    while (!child.isUnsubscribed()) {
        Throwable error;
        if (completed) {
            if ((error = this.error) != null) {
                child.onError(error);
                return;
            } else
            if (queue.isEmpty()) {
                child.onCompleted();
                return;
            }
        }
        if (r > 0) {
            Object o = queue.poll();
            if (o != null) {
                child.onNext(on.getValue(o));
                r--;
                emitted++;
                produced++;
            } else {
                break;
            }
        } else {
            break;
        }
    }
    if (produced > 0) {
        REQUESTED.addAndGet(this, -produced);
    }
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);
if (emitted > 0) {
    request(emitted);
}

@davidmoten
Collaborator Author

Nice, David. I was going to do that in a later PR, but now seems good!


@davidmoten
Collaborator Author

I've updated the PR with the new pollQueue implementation from @akarnokd and added the queue.clear() before calling child.onError().

@davidmoten
Collaborator Author

Rebased into 2 commits

davidmoten added a commit to davidmoten/RxJava that referenced this pull request May 10, 2015
@davidmoten
Collaborator Author

@akarnokd I was thinking of changing the last lines in the pollQueue() method from

if (emitted > 0)
    request(emitted);

to

if (emitted > 0) {
    request(Math.min(RxRingBuffer.SIZE - queue.size(), emitted));
}

This was just to reduce the probability of an overflow of the RingBuffer. I haven't seen it happen, but it looks to me like emitted can reach an arbitrarily large value, so we may as well be defensive about it.

@davidmoten
Collaborator Author

In pollQueue we should only reduce requested by produced if it is not already at Long.MAX_VALUE.

@akarnokd
Member

There is no need for that, because the queue is bounded: you'd produce up to the capacity and request only replacements afterwards. Even though the downstream requests more, that isn't directly translated into requests to the upstream.

For example, imagine the queue is full and there are plenty of requests available. Now the inner loop can drain the queue completely and exit when it becomes empty. Since the upstream honors backpressure, there won't be any new enqueueing of values during this time, and you get emitted equal to the queue capacity.

davidmoten added a commit to davidmoten/RxJava that referenced this pull request May 13, 2015
@davidmoten
Collaborator Author

Thanks. I forgot about the counter = 1 line!


@benjchristensen
Member

I never understood why it does that many request updates

I think that's my fault. I had a version that did batching at one point but it was buggy so I got it "working" and left that for later optimization. It is far better if we can request in batches.

@benjchristensen
Member

Ah, wrong request piece. I thought it was about batching of requests, which this still doesn't fully do.

Changes look good.

benjchristensen added a commit that referenced this pull request May 13, 2015
OperatorObserveOn onComplete can be emitted despite onError being called
@benjchristensen benjchristensen merged commit 47b098c into ReactiveX:1.x May 13, 2015
@benjchristensen benjchristensen mentioned this pull request May 19, 2015