
Non-deterministic test: OperatorRetryTest.testRetryWithBackpressureParallel #2863


Closed
davidmoten opened this issue Apr 8, 2015 · 18 comments

@davidmoten
Collaborator

This fails for me 5 out of 10 times when I run all tests in OperatorRetryTest in Eclipse on a fast machine.

Here's the stack trace:

java.lang.AssertionError: Data content mismatch: {349=[beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, 
beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, 
beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, beginningEveryTime, onSuccessOnly]}
    at org.junit.Assert.fail(Assert.java:93)
    at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:752)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:62)
@akarnokd
Member

akarnokd commented Apr 9, 2015

This should be fixed now.

@akarnokd akarnokd closed this as completed Apr 9, 2015
@davidmoten
Collaborator Author

Not yet:

java.lang.AssertionError: Data content mismatch: 1465={beginningEveryTime}
    at org.junit.Assert.fail(Assert.java:93)
    at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:763)

@davidmoten
Collaborator Author

@akarnokd, can you reopen this issue, please?

@akarnokd akarnokd reopened this Apr 15, 2015
@davidmoten
Collaborator Author

Still seeing this failure 50% of the time:

java.lang.AssertionError: Data content mismatch: 2673={beginningEveryTime}, 4222={beginningEveryTime}
    at org.junit.Assert.fail(Assert.java:93)
    at rx.internal.operators.OperatorRetryTest.testRetryWithBackpressureParallel(OperatorRetryTest.java:763)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.FailOnTimeout$StatementThread.run(FailOnTimeout.java:62)

@akarnokd
Member

I've tried reproducing this error, but it never happened. I've run the test 10k times and played with CPU affinity and other thread disruptions, to no avail.
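One way to put a number on a failure rate like the "5 out of 10" reported above, or to repeat a test thousands of times, is to loop over the test body and count assertion failures. This is a minimal stdlib sketch; FlakyRunner and countFailures are hypothetical names, and in practice the Runnable would invoke the real test method.

```java
// Hypothetical helper: runs a test body repeatedly and counts the runs that fail.
final class FlakyRunner {
    private FlakyRunner() {}

    /** Runs body the given number of times and returns how many runs threw an AssertionError. */
    static int countFailures(Runnable body, int runs) {
        int failures = 0;
        for (int i = 0; i < runs; i++) {
            try {
                body.run();
            } catch (AssertionError e) {
                failures++; // a failed assertion counts as one failing run
            }
        }
        return failures;
    }
}
```

Looping inside one JVM only approximates the original report, which came from running all of OperatorRetryTest in Eclipse; thread scheduling in a tight loop can differ from a full suite run.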

@davidmoten
Collaborator Author

Have you got an oldish i5 dual core to try it on? I'll have a look at the failure myself and see what I can find.

(Replying by email to David Karnok's comment of Mon, 27 Apr 2015 21:53.)

@akarnokd
Member

I've tried it on a Core 2 Duo E6600 running 32-bit Windows 7, with no failures.

@davidmoten
Collaborator Author

I've been poking around in OnSubscribeRedo. More documentation would be good, though it's not too bad. I'm not sure exactly what isLocked is being used for, and I'm curious about the lack of symmetry in its use: it gets set to false once and is never set back to true during the resubscribe lifecycle. Do you see any problems here, @akarnokd?

@akarnokd
Member

I wish I understood OnSubscribeRedo better. Despite the many fixes I added, I'm not sure what it does or how it does it. isLocked caught my eye too, and I don't get it either.

I wanted to rewrite the entire operator, but I don't understand the protocol for the supplied Func.

@abersnaze
Contributor

Sorry, that part isn't documented as well. There is a race condition between subscribing to the Observable of restarts and Redo initiating the first subscription. isLocked is there to ignore the onNexts from the output Observable until an onNext has been sent into the input.

@abersnaze
Contributor

The first message was from my phone, so it might not make as much sense as I hoped.

@davidmoten The problem is that this code has to protect itself from the arbitrary code that can exist in the function. The implementations fall into two buckets: those that use the scan operator and those that don't. The scan operator is different in that it always emits an onNext of the seed synchronously when subscribed. Most operators only emit in reaction to upstream events.
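The difference between scan and most other operators can be modelled without RxJava: a scan-like stage emits its seed as soon as it is subscribed, while a map-like stage emits only when a value arrives from upstream. Stage, MapStage and ScanStage below are hypothetical simplifications for illustration, not RxJava types.

```java
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, heavily simplified push model; these are not RxJava types.
abstract class Stage<T, R> {
    protected Consumer<R> downstream;

    // Attach a subscriber; scan-like stages may emit synchronously from onSubscribe().
    void subscribe(Consumer<R> subscriber) {
        this.downstream = subscriber;
        onSubscribe();
    }

    protected void onSubscribe() {
        // Most operators emit nothing until upstream pushes a value.
    }

    abstract void push(T value);
}

// map-like: emits only in reaction to an upstream value.
final class MapStage<T, R> extends Stage<T, R> {
    private final Function<T, R> f;

    MapStage(Function<T, R> f) {
        this.f = f;
    }

    @Override
    void push(T value) {
        downstream.accept(f.apply(value));
    }
}

// scan-like: emits the seed as soon as it is subscribed, then each accumulated value.
final class ScanStage<T> extends Stage<T, T> {
    private final BinaryOperator<T> op;
    private T acc;

    ScanStage(T seed, BinaryOperator<T> op) {
        this.acc = seed;
        this.op = op;
    }

    @Override
    protected void onSubscribe() {
        downstream.accept(acc); // seed emitted synchronously on subscription
    }

    @Override
    void push(T value) {
        acc = op.apply(acc, value);
        downstream.accept(acc);
    }
}
```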

This led us to the question: what should initiate the first subscription to the source Observable? If the first subscription is initiated by the Observable returned by the function, then user functions that don't use scan would never start; but if Redo initiates the first subscription, then functions that use scan will subscribe twice immediately.

The path we picked was to make Redo do the initial subscribe. We didn't want the operator to fail to do what was expected just because the user misunderstood whether their function was supposed to have a scan in it. We handled the double initial subscription by not allowing a re-subscribe until the initial subscription had completed (aka isLocked).
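A single-threaded sketch of the policy described here: Redo makes the initial subscription itself, and a flag modelled on isLocked discards restart signals until the first terminal notification has been fed into the function's input. RedoGate and its methods are hypothetical names; the sketch ignores threading, backpressure and the real Observable types.

```java
// Hypothetical single-threaded model of Redo's initial subscription and the isLocked gate.
final class RedoGate {
    private final boolean useLock;
    private boolean isLocked = true;
    private int subscriptions;

    RedoGate(boolean useLock) {
        this.useLock = useLock;
    }

    // Redo makes the first subscription to the source itself.
    void start(boolean functionEmitsOnSubscribe) {
        subscribeToSource();
        if (functionEmitsOnSubscribe) {
            // A scan-based function emits its seed as soon as it is subscribed.
            onRestartSignal();
        }
    }

    // The source terminated; its terminal notification goes into the function's input.
    void onSourceTerminated(boolean functionEmitsRestart) {
        isLocked = false; // cleared on the first terminal notification, never set again
        if (functionEmitsRestart) {
            onRestartSignal();
        }
    }

    // An onNext from the function's output Observable.
    void onRestartSignal() {
        if (useLock && isLocked) {
            return; // ignored: the initial subscription has not terminated yet
        }
        subscribeToSource();
    }

    private void subscribeToSource() {
        subscriptions++;
    }

    int subscriptions() {
        return subscriptions;
    }
}
```

Without the gate, a scan-based function causes two immediate subscriptions; with it, only Redo's own initial subscription happens, and a function without scan still starts. The flag is cleared once and never set again, which matches the asymmetry in isLocked noted earlier in the thread.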

@akarnokd maybe it would make more sense if it were broken down a bit. The onNexts from the Observable returned by the function are mapped to re-subscriptions of the source Observable. The cool thing about using onNext as the re-subscription trigger is that there are many temporal operators for batching or delaying onNexts. onError and onCompleted are sent downstream as-is.
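The mapping described above (an onNext from the function's output means "resubscribe", while onError and onCompleted go downstream unchanged) can be sketched as a synchronous retry loop. This is a simplified model under stated assumptions: RedoLoop, Signal and the IntPredicate saying which attempt succeeds are hypothetical, and the real operator is asynchronous rather than a loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.IntPredicate;

final class RedoLoop {
    // Hypothetical simplification of what the function's output Observable can emit.
    enum Signal { NEXT, ERROR, COMPLETED }

    private RedoLoop() {}

    /**
     * sourceSucceeds: whether subscription attempt n (1-based) completes instead of failing.
     * handler: reacts to each source error with the signal the function's output would emit.
     * Returns the sequence of events for inspection.
     */
    static List<String> run(IntPredicate sourceSucceeds, Function<Throwable, Signal> handler) {
        List<String> events = new ArrayList<>();
        for (int attempt = 1; ; attempt++) {
            events.add("subscribe#" + attempt);
            if (sourceSucceeds.test(attempt)) {
                events.add("onCompleted");
                return events;
            }
            Signal s = handler.apply(new RuntimeException("failure on attempt " + attempt));
            if (s == Signal.NEXT) {
                continue; // onNext from the function's output: resubscribe to the source
            }
            // onError/onCompleted from the function's output are passed downstream as-is
            events.add(s == Signal.ERROR ? "onError" : "onCompleted");
            return events;
        }
    }
}
```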

The input to the function is the Observable of terminal Notifications (retry filters it down to only the onErrors, and repeat filters it down to only the onCompleted notifications). The terminal notifications can be counted or delayed in arbitrary ways to produce the desired behaviour for when the re-subscription should happen.
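To illustrate the input side: retry's function sees only the onError notifications and repeat's only the onCompleted ones, and a function can, for example, count them to bound the number of restarts. A stdlib sketch in which a hypothetical Kind enum stands in for the kind of a terminal rx.Notification, and a List stands in for the stream of notifications.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class TerminalInputs {
    // Hypothetical stand-in for the kind of a terminal rx.Notification.
    enum Kind { ON_ERROR, ON_COMPLETED }

    private TerminalInputs() {}

    // retry's function only ever sees the onError notifications...
    static List<Kind> retryInput(List<Kind> terminals) {
        return terminals.stream().filter(k -> k == Kind.ON_ERROR).collect(Collectors.toList());
    }

    // ...and repeat's function only the onCompleted ones.
    static List<Kind> repeatInput(List<Kind> terminals) {
        return terminals.stream().filter(k -> k == Kind.ON_COMPLETED).collect(Collectors.toList());
    }

    // Counting: the i-th notification (1-based) maps to a restart (true) while i <= maxRestarts;
    // false marks where the function's output would terminate instead.
    static List<Boolean> restartDecisions(List<Kind> input, int maxRestarts) {
        List<Boolean> out = new ArrayList<>(input.size());
        for (int i = 1; i <= input.size(); i++) {
            out.add(i <= maxRestarts);
        }
        return out;
    }
}
```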

@davidmoten
Collaborator Author

Thanks @abersnaze, that helps my understanding a little bit. Stuff like that would be a great addition to the comments in this class when we get it working.

@davidmoten
Collaborator Author

Given that the test fails reliably (for me) and concerns backpressure in an asynchronous situation, I'm assuming there is a race condition associated with the request(n) method below, perhaps during resubscription. @abersnaze, given that you seem most familiar with the code, can you see problems with this section being interleaved with resubscription, say? I'm a bit curious about currentProducer being changed just before producer.request(n), for instance. Without a great understanding of the class, I'm a bit surprised that locks aren't necessary while resubscription is occurring.

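        child.setProducer(new Producer() {

            @Override
            public void request(final long n) {
                // Record the new demand; c is the demand outstanding before this call.
                long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
                // Producer of the current subscription to the source, if any.
                Producer producer = currentProducer.get();
                if (producer != null) {
                    // Forward the request to the active source subscription.
                    producer.request(n);
                } else if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
                    // No active producer, demand was previously zero, and a resubscription
                    // was pending (resumeBoundary): schedule the subscription to the source.
                    worker.schedule(subscribeToSource);
                }
            }
        });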
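In the excerpt above, c is the demand that was outstanding before this request, so c == 0 means there was none. A stdlib sketch of that accounting, assuming BackpressureUtils.getAndAddRequest behaves as in RxJava 1.x (add n, cap at Long.MAX_VALUE, return the previous value); RequestAccounting is a hypothetical class name.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestAccounting {
    private RequestAccounting() {}

    /**
     * Atomically adds n to requested, capping at Long.MAX_VALUE (treated as "unbounded"),
     * and returns the value before the addition.
     */
    static long getAndAddRequest(AtomicLong requested, long n) {
        while (true) {
            long current = requested.get();
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // overflow: clamp to unbounded
            }
            if (requested.compareAndSet(current, next)) {
                return current; // CAS succeeded; report the previous demand
            }
            // another thread changed the value concurrently; retry with the fresh value
        }
    }
}
```

Because the previous value is returned, in the branch with no current producer only a request that raises demand from zero can schedule subscribeToSource; later requests just accumulate in consumerCapacity.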

@abersnaze
Contributor

The resubscription only happens after the previous subscription to the source has terminated with an onError/onCompleted, so there isn't much need for locking.

@benjchristensen
Member

Is this still an issue? Or did the change in #2930 fix things?

@benjchristensen benjchristensen added this to the 1.0.x milestone Aug 28, 2015
@akarnokd
Copy link
Member

@davidmoten Could you try your machine with 2.x?

@davidmoten
Collaborator Author

Will do

(Replying by email to David Karnok's comment of Fri, 11 Sep 2015 22:31.)

@akarnokd
Member

akarnokd commented Apr 2, 2016

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.

@akarnokd akarnokd closed this as completed Apr 2, 2016

4 participants