-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Reactive adapter adapter [PoC Only Do not merge] #2433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reactive adapter adapter [PoC Only Do not merge] #2433
Conversation
PoC - adapt SI message-driven adapters to send one initial message with a flux payload during startup and then send messages via the flux as they arrive. The test case shows 2 services, one that updates the flux and a second one that subscribes to the modified flux.
@@ -182,6 +200,13 @@ protected void onInit() { | |||
*/ | |||
@Override | |||
protected void doStart() { | |||
if (this.reactive && this.flux == null) { | |||
this.flux = | |||
Flux.<Message<?>>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.IGNORE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is not recommended to leak emitter
. Use one of the Processors:
https://projectreactor.io/docs/core/release/reference/#processors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually we've been advised exactly opposite: the processors are going to go away from Reactor Core, so would be better to rely on the similar structure provided by the Flux.create()
.
Anyway this PR is too old and really deserves revisiting one day.
I hope you'll come up soon with the Flux.fromQueue()
as we discussed recently in call, so we would have a natural back-pressure for the constantly emitting source.
Thank you for feedback, @bsideup , anyway!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are not going away, this is a misunderstanding. Flux.fromQueue
won't give you backpressure I think, the processes is the way to go :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah... Let's reconsider that in the next milestone!
We have a lot of similar Flux.create()
here in Spring Integration.
I'll ping you later if needed.
Thank you, Sergey, again!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
np! Feel free to assign me as a reviewer, or ask for more info directly 👍
With the We definitely may think about a Thoughts? Thanks |
Another argument against this PR is with Java DSL's:
|
Superseded with #3503 |
No description provided.