
Use EmitterProcessor for Channels adaptation #3100


Merged

Conversation

artembilan
Member

Related spring-cloud/spring-cloud-stream#1835

To honor back-pressure after `MessageChannel` adaptation, it is better
to use `EmitterProcessor.create(1)` instead of `Flux.create()`.
This way, whenever the emitter buffer is full, we block the upstream
producer and don't allow it to produce more messages.

**Cherry-pick to 5.1.x**
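
For context, a minimal standalone sketch (not the actual Spring Integration code; the class name, scheduler, and timings are made up for illustration) of why `EmitterProcessor.create(1)` honors back-pressure: `onNext()` blocks the producing thread while the single-slot buffer is full.

```java
import java.time.Duration;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.scheduler.Schedulers;

public class EmitterBackPressureSketch {

    public static void main(String[] args) throws InterruptedException {
        // Buffer of a single element, as in this PR.
        EmitterProcessor<String> emitter = EmitterProcessor.create(1);

        // Consume on another thread with a small prefetch and simulate slow processing.
        emitter.publishOn(Schedulers.single(), 1)
                .subscribe(msg -> {
                    sleep(Duration.ofMillis(500));
                    System.out.println("consumed " + msg);
                });

        for (int i = 0; i < 5; i++) {
            // onNext() blocks this producing thread while the one-slot buffer
            // is full, so the producer cannot outrun the slow consumer.
            emitter.onNext("message-" + i);
            System.out.println("produced message-" + i);
        }
        emitter.onComplete();
        Thread.sleep(3000);
    }

    private static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

}
```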

@artembilan
Member Author

/CC @bsideup, @olegz

Comment on lines 64 to 69
EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
@SuppressWarnings("unchecked")
MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
return publisher
		.doOnSubscribe((sub) -> inputChannel.subscribe(messageHandler))
		.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
Contributor

Consider using `Flux.defer`, so that you return a cold `Publisher` that can be subscribed multiple times.
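
For reference, roughly what that suggestion looks like when applied to the snippet above (the enclosing class name here is hypothetical; the body mirrors the code under review):

```java
import org.reactivestreams.Publisher;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;

final class ChannelAdapterSketch {

    @SuppressWarnings("unchecked")
    static <T> Publisher<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
        // Flux.defer() re-runs this supplier for every subscriber, so each
        // subscription gets its own EmitterProcessor and its own channel
        // subscription, yielding a cold Publisher that can be re-subscribed.
        return Flux.defer(() -> {
            EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
            MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
            return publisher
                    .doOnSubscribe((sub) -> inputChannel.subscribe(messageHandler))
                    .doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
        });
    }

}
```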

@@ -60,37 +61,18 @@ else if (messageChannel instanceof PollableChannel) {
 	}

 	private static <T> Publisher<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
-		return new SubscribableChannelPublisherAdapter<>(inputChannel);
+		EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
Contributor

hint: for performance reasons, you may want to allocate a bigger buffer (e.g. 32 or 256), or even use the default one from `EmitterProcessor.create()`.
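
For illustration, the buffer-size options being discussed side by side (the field names are only for this sketch; the PR keeps `create(1)`):

```java
import reactor.core.publisher.EmitterProcessor;

class BufferSizeOptions {

    EmitterProcessor<String> smallest = EmitterProcessor.create(1);       // strictest back-pressure, fewest in-flight messages
    EmitterProcessor<String> larger = EmitterProcessor.create(256);       // higher throughput, more messages buffered in memory
    EmitterProcessor<String> reactorDefault = EmitterProcessor.create();  // Reactor's default buffer size

}
```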

Member Author

Yeah... We had such a discussion with @olegz, and our position is to not lose messages in case of a crash.
When messages sit blocked upstream, there is a better guarantee that they survive a restart.
When we buffer them at the emitter level, it looks like there is a possibility that we lose them.
No? What am I missing?

Contributor

prefetch will keep 1 message in memory anyways (mmm... I think)

Let me test it...

Contributor

Ok, so, the thing is:
A processor is okay, but the downstream will most probably prefetch (e.g. `flatMap` or `concatMap`), so it will request more than the user is ready to process.

Even if they set the prefetch to 1 (the lowest possible value), it still means that there will be 1 element dangling in memory, and it will be lost on a crash.

An acknowledgment mechanism may help here.
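
A small sketch of that point (the class and pipeline are hypothetical): even with the lowest prefetch of 1, an operator such as `concatMap` pulls one element ahead of the user's processing, and that element exists only in memory:

```java
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PrefetchSketch {

    Flux<String> pipeline(EmitterProcessor<String> emitter) {
        // prefetch = 1 is the lowest possible value, yet concatMap still keeps
        // one element queued ahead of the subscriber; that element is lost if
        // the process crashes before it is handled.
        return emitter.concatMap(msg -> Mono.just(msg.toUpperCase()), 1);
    }

}
```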

Contributor

That is true, and we also discussed it with Artem; we'll need to deal with it at some point. But in the science of sensible defaults, it is my strong belief that there are only two: the lowest and the highest ;), so we chose the lowest.

Member Author

I think we will take that risk 😄
At least we have only one message dangling around.
We have many ways to ack messages.
Different story though...

Thanks so far.

I have a failing test in `ReactiveStreamsConsumerTests`. It looks like I can't subscribe a second time, even if I use `Flux.defer()`.

Dare to Zoom? 😉

* Fix `ReactiveStreamsConsumerTests` to use a new `Subscription` after
each `stop()/start()` on the `ReactiveStreamsConsumer`
@artembilan
Member Author

OK. Looks like Travis is green.
Ready for final review and merge.

Thanks

@garyrussell merged commit 36c9f72 into spring-projects:master on Nov 1, 2019
@artembilan
Member Author

@garyrussell , are you going to back-port this into 5.1.x or should I do that myself?
I believe there are going to be some conflicts... 😢

@garyrussell
Contributor

Sorry; I forgot; please do so if you think there will be many.

artembilan added a commit that referenced this pull request Nov 1, 2019
* Use `EmitterProcessor` for Channels adaptation

Related spring-cloud/spring-cloud-stream#1835

To honor back-pressure after `MessageChannel` adaptation, it is better
to use `EmitterProcessor.create(1)` instead of `Flux.create()`.
This way, whenever the emitter buffer is full, we block the upstream
producer and don't allow it to produce more messages.

**Cherry-pick to 5.1.x**

* * Wrap every new subscription into a `Flux.defer()`
* Fix `ReactiveStreamsConsumerTests` to use a new `Subscription` after
each `stop()/start()` on the `ReactiveStreamsConsumer`

* * Remove unused imports

# Conflicts:
#	spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java
#	spring-integration-core/src/test/java/org/springframework/integration/channel/MessageChannelReactiveUtilsTests.java
#	spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java

* Fixing conflicts in tests
@artembilan
Member Author

Cherry-picked to 5.1.x after fixing conflicts in tests.
