Skip to content

Add Reactive mode for AbstractPollingEndpoint #2429

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 14, 2018

Conversation

artembilan
Copy link
Member

  • When SourcePollingChannelAdapter.outputChannel is a
    ReactiveStreamsSubscribableChannel, use Flux.generate() for polling
  • Refactor AbstractPollingEndpoint to remove redundant Poller class
    in favor of lambda
  • Extract pollForMessage() method to handle TX states instead of
    Poller class previously

@artembilan
Copy link
Member Author

There is still need to implement such a mode for the PollingConsumer, but that can be addressed in the separate PR. As well as Docs.

@artembilan
Copy link
Member Author

Would be great to see your opinion @smaldini ! 😄

@artembilan
Copy link
Member Author

@garyrussell ,

any chances that this can make it into our current 5.1?
Then I will shade your one on the matter: #2433 . With slightly different approach since our FluxMessageChannel have some attraction already, especially after adoption into the Spring Cloud Stream.

I mean that I had recently a question about Reactive JMS on Gitter: https://gitter.im/spring-projects/spring-integration?at=5b9644fef3c26b08f65608d4

This Reactive polling adaption will do for us a trick when we won't ask broker for more messages until downstream demand.

I believe we can do something similar with the event-driven channel adapters when we will block consuming thread until demand from the downstream. Different story though...

Thanks

@garyrussell
Copy link
Contributor

Sounds like a plan if it can make the RC1 on 9/20.

* When `SourcePollingChannelAdapter.outputChannel` is a
`ReactiveStreamsSubscribableChannel`, use `Flux.generate()` for polling
* Refactor `AbstractPollingEndpoint` to remove redundant `Poller` class
in favor of lambda
* Extract `pollForMessage()` method to handle TX states instead of
`Poller` class previously
@artembilan
Copy link
Member Author

So, Gary, can we merge it now or you would like to see anything else here?

I would address Docs in the separate PR somewhere in between RC and GA.

Thanks

Copy link
Contributor

@garyrussell garyrussell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one issue.

@@ -66,23 +74,27 @@

private boolean syncExecutor = true;

private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();

private Trigger trigger = new PeriodicTrigger(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default 10ms? Really?

If we must have a default, it should be much larger than this.

Why wouldn't we just say it's invalid to configure a reactive poller without a trigger?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh? This is an existing code. I just reformatted it a bit to remove some volatile.
We can reconsider the default in other issue, although I think Mark did this way to make as close to the event-driven behavior as possible

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doh; merging.

@garyrussell garyrussell merged commit 27353ed into spring-projects:master Sep 14, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants