Skip to content

Commit b7ee269

Browse files
committed
Use EmitterProcessor for Channels adaptation (#3100)
* Use `EmitterProcessor` for Channels adaptation Related spring-cloud/spring-cloud-stream#1835 To honor a back-pressure after `MessageChannel` adaptation it is better to use an `EmitterProcessor.create(1)` instead of `Flux.create()`. This way whenever an emitter buffer is full, we block upstream producer and don't allow it to produce more messages **Cherry-pick to 5.1.x** * * Wrap every new subscription into a `Flux.defer()` * Fix `ReactiveStreamsConsumerTests` to use a new `Subscription` after each `stop()/start()` on the `ReactiveStreamsConsumer` * * Remove unused imports # Conflicts: # spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java # spring-integration-core/src/test/java/org/springframework/integration/channel/MessageChannelReactiveUtilsTests.java # spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java * Fixing conflicts in tests
1 parent 54de7a2 commit b7ee269

File tree

3 files changed

+36
-46
lines changed

3 files changed

+36
-46
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/MessageChannelReactiveUtils.java

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.messaging.PollableChannel;
2626
import org.springframework.messaging.SubscribableChannel;
2727

28+
import reactor.core.publisher.EmitterProcessor;
2829
import reactor.core.publisher.Flux;
2930
import reactor.core.publisher.FluxSink;
3031
import reactor.core.scheduler.Schedulers;
@@ -60,37 +61,20 @@ else if (messageChannel instanceof PollableChannel) {
6061
}
6162

6263
private static <T> Publisher<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
63-
return new SubscribableChannelPublisherAdapter<>(inputChannel);
64+
return Flux.defer(() -> {
65+
EmitterProcessor<Message<T>> publisher = EmitterProcessor.create(1);
66+
@SuppressWarnings("unchecked")
67+
MessageHandler messageHandler = (message) -> publisher.onNext((Message<T>) message);
68+
inputChannel.subscribe(messageHandler);
69+
return publisher
70+
.doOnCancel(() -> inputChannel.unsubscribe(messageHandler));
71+
});
6472
}
6573

6674
private static <T> Publisher<Message<T>> adaptPollableChannelToPublisher(PollableChannel inputChannel) {
6775
return new PollableChannelPublisherAdapter<>(inputChannel);
6876
}
6977

70-
71-
private static final class SubscribableChannelPublisherAdapter<T> implements Publisher<Message<T>> {
72-
73-
private final SubscribableChannel channel;
74-
75-
SubscribableChannelPublisherAdapter(SubscribableChannel channel) {
76-
this.channel = channel;
77-
}
78-
79-
@Override
80-
@SuppressWarnings("unchecked")
81-
public void subscribe(Subscriber<? super Message<T>> subscriber) {
82-
Flux.
83-
<Message<?>>create(emitter -> {
84-
MessageHandler messageHandler = emitter::next;
85-
this.channel.subscribe(messageHandler);
86-
emitter.onCancel(() -> this.channel.unsubscribe(messageHandler));
87-
},
88-
FluxSink.OverflowStrategy.IGNORE)
89-
.subscribe((Subscriber<? super Message<?>>) subscriber);
90-
}
91-
92-
}
93-
9478
private static final class PollableChannelPublisherAdapter<T> implements Publisher<Message<T>> {
9579

9680
private final PollableChannel channel;

spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ protected void doStart() {
140140

141141
@Override
142142
public void hookOnSubscribe(Subscription s) {
143-
this.delegate.onSubscribe(s);
144143
ReactiveStreamsConsumer.this.subscription = s;
144+
this.delegate.onSubscribe(s);
145145
}
146146

147147
@Override

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/ReactiveStreamsConsumerTests.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -108,35 +108,43 @@ public void testReactiveStreamsConsumerFluxMessageChannel() throws InterruptedEx
108108

109109

110110
@Test
111-
@SuppressWarnings("unchecked")
112111
public void testReactiveStreamsConsumerDirectChannel() throws InterruptedException {
113112
DirectChannel testChannel = new DirectChannel();
114113

115-
Subscriber<Message<?>> testSubscriber = (Subscriber<Message<?>>) Mockito.mock(Subscriber.class);
116-
117114
BlockingQueue<Message<?>> messages = new LinkedBlockingQueue<>();
118115

119-
willAnswer(i -> {
120-
messages.put(i.getArgument(0));
121-
return null;
122-
})
123-
.given(testSubscriber)
124-
.onNext(any(Message.class));
116+
Subscriber<Message<?>> testSubscriber = Mockito.spy(new Subscriber<Message<?>>() {
117+
118+
@Override
119+
public void onSubscribe(Subscription subscription) {
120+
subscription.request(1);
121+
}
122+
123+
@Override
124+
public void onNext(Message<?> message) {
125+
messages.offer(message);
126+
}
127+
128+
@Override
129+
public void onError(Throwable t) {
130+
131+
}
132+
133+
@Override
134+
public void onComplete() {
135+
136+
}
137+
138+
});
125139

126140
ReactiveStreamsConsumer reactiveConsumer = new ReactiveStreamsConsumer(testChannel, testSubscriber);
127141
reactiveConsumer.setBeanFactory(mock(BeanFactory.class));
128142
reactiveConsumer.afterPropertiesSet();
129143
reactiveConsumer.start();
130144

131-
Message<?> testMessage = new GenericMessage<>("test");
145+
final Message<?> testMessage = new GenericMessage<>("test");
132146
testChannel.send(testMessage);
133147

134-
ArgumentCaptor<Subscription> subscriptionArgumentCaptor = ArgumentCaptor.forClass(Subscription.class);
135-
verify(testSubscriber).onSubscribe(subscriptionArgumentCaptor.capture());
136-
Subscription subscription = subscriptionArgumentCaptor.getValue();
137-
138-
subscription.request(1);
139-
140148
Message<?> message = messages.poll(10, TimeUnit.SECONDS);
141149
assertSame(testMessage, message);
142150

@@ -152,10 +160,6 @@ public void testReactiveStreamsConsumerDirectChannel() throws InterruptedExcepti
152160

153161
reactiveConsumer.start();
154162

155-
subscription.request(1);
156-
157-
testMessage = new GenericMessage<>("test2");
158-
159163
testChannel.send(testMessage);
160164

161165
message = messages.poll(10, TimeUnit.SECONDS);
@@ -165,6 +169,8 @@ public void testReactiveStreamsConsumerDirectChannel() throws InterruptedExcepti
165169
verify(testSubscriber, never()).onComplete();
166170

167171
assertTrue(messages.isEmpty());
172+
173+
reactiveConsumer.stop();
168174
}
169175

170176
@Test

0 commit comments

Comments
 (0)