Skip to content

Commit ad96ca4

Browse files
artembilangaryrussell
authored andcommitted
Add Reactive Stream chapter into docs
* Improve `channel.adoc`: polishing, new channels, java config * Make `LoadBalancingStrategy` in the `DirectChannel` ctor as `@Nullable` to reflect the real logic behind * More Dos and polishing according PR comments * Mention WebFlux and RSocket endpoints in the `endpoint-summary.adoc` table * More reactive streams docs * `SourcePollingChannelAdapter` polishing * Wrap multi-value publisher into `Mono.just()` in the `AbstractMessageProducingHandler` instead of emitting just only a first item for the standard reply Doc polishing
1 parent eeb951b commit ad96ca4

File tree

11 files changed

+368
-22
lines changed

11 files changed

+368
-22
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
2121
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
2222
import org.springframework.integration.dispatcher.UnicastingDispatcher;
23+
import org.springframework.lang.Nullable;
2324

2425
/**
2526
* A channel that invokes a single subscriber for each sent Message.
@@ -30,6 +31,7 @@
3031
* @author Iwein Fuld
3132
* @author Oleg Zhurakousky
3233
* @author Gary Russell
34+
* @author Artem Bilan
3335
*/
3436
public class DirectChannel extends AbstractSubscribableChannel {
3537

@@ -45,20 +47,20 @@ public DirectChannel() {
4547
}
4648

4749
/**
48-
* Create a DirectChannel with a {@link LoadBalancingStrategy}. The
49-
* strategy <em>must not</em> be null.
50-
*
50+
* Create a DirectChannel with a {@link LoadBalancingStrategy}.
51+
* Can be {@code null} meaning that no balancing is applied;
52+
* every message is always going to be handled by the first subscriber.
5153
* @param loadBalancingStrategy The load balancing strategy implementation.
54+
* @see #setFailover(boolean)
5255
*/
53-
public DirectChannel(LoadBalancingStrategy loadBalancingStrategy) {
56+
public DirectChannel(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
5457
this.dispatcher.setLoadBalancingStrategy(loadBalancingStrategy);
5558
}
5659

5760

5861
/**
5962
* Specify whether the channel's dispatcher should have failover enabled.
6063
* By default, it will. Set this value to 'false' to disable it.
61-
*
6264
* @param failover The failover boolean.
6365
*/
6466
public void setFailover(boolean failover) {
@@ -68,7 +70,6 @@ public void setFailover(boolean failover) {
6870
/**
6971
* Specify the maximum number of subscribers supported by the
7072
* channel's dispatcher.
71-
*
7273
* @param maxSubscribers The maximum number of subscribers allowed.
7374
*/
7475
public void setMaxSubscribers(int maxSubscribers) {
@@ -85,9 +86,8 @@ protected UnicastingDispatcher getDispatcher() {
8586
protected void onInit() {
8687
super.onInit();
8788
if (this.maxSubscribers == null) {
88-
Integer max = this.getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_UNICAST_SUBSCRIBERS,
89-
Integer.class);
90-
this.setMaxSubscribers(max);
89+
Integer max = getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_UNICAST_SUBSCRIBERS, Integer.class);
90+
setMaxSubscribers(max);
9191
}
9292
}
9393

spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class PollingConsumer extends AbstractPollingEndpoint implements Integrat
6363
public PollingConsumer(PollableChannel inputChannel, MessageHandler handler) {
6464
Assert.notNull(inputChannel, "inputChannel must not be null");
6565
Assert.notNull(handler, "handler must not be null");
66-
if (inputChannel instanceof NullChannel && logger.isWarnEnabled()) {
66+
if (inputChannel instanceof NullChannel) {
6767
logger.warn("The polling from the NullChannel does not have any effects: " +
6868
"it doesn't forward messages sent to it. A NullChannel is the end of the flow.");
6969
}
@@ -134,7 +134,7 @@ protected void handleMessage(Message<?> message) {
134134
try {
135135
if (this.channelInterceptors != null
136136
&& ((ExecutorChannelInterceptorAware) this.inputChannel).hasExecutorInterceptors()) {
137-
interceptorStack = new ArrayDeque<ExecutorChannelInterceptor>();
137+
interceptorStack = new ArrayDeque<>();
138138
theMessage = applyBeforeHandle(theMessage, interceptorStack);
139139
if (theMessage == null) {
140140
return;

spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -147,11 +147,11 @@ protected void applyReceiveOnlyAdviceChain(Collection<Advice> chain) {
147147
if (AopUtils.isAopProxy(this.source)) {
148148
Advised advised = (Advised) this.source;
149149
this.appliedAdvices.forEach(advised::removeAdvice);
150-
chain.stream().forEach(advice -> advised.addAdvisor(adviceToReceiveAdvisor(advice)));
150+
chain.forEach(advice -> advised.addAdvisor(adviceToReceiveAdvisor(advice)));
151151
}
152152
else {
153153
ProxyFactory proxyFactory = new ProxyFactory(this.source);
154-
chain.stream().forEach(advice -> proxyFactory.addAdvisor(adviceToReceiveAdvisor(advice)));
154+
chain.forEach(advice -> proxyFactory.addAdvisor(adviceToReceiveAdvisor(advice)));
155155
this.source = (MessageSource<?>) proxyFactory.getProxy(getBeanClassLoader());
156156
}
157157
this.appliedAdvices.clear();

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import org.reactivestreams.Publisher;
2929

30+
import org.springframework.core.ReactiveAdapter;
31+
import org.springframework.core.ReactiveAdapterRegistry;
3032
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3133
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
3234
import org.springframework.integration.context.IntegrationContextUtils;
@@ -346,8 +348,15 @@ private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, Obje
346348
}
347349
else {
348350
SettableListenableFuture<Object> settableListenableFuture = new SettableListenableFuture<>();
349-
Mono.from((Publisher<?>) reply)
350-
.subscribe(settableListenableFuture::set, settableListenableFuture::setException);
351+
Mono<?> reactiveReply;
352+
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply);
353+
if (adapter != null && adapter.isMultiValue()) {
354+
reactiveReply = Mono.just(reply);
355+
}
356+
else {
357+
reactiveReply = Mono.from((Publisher<?>) reply);
358+
}
359+
reactiveReply.subscribe(settableListenableFuture::set, settableListenableFuture::setException);
351360
future = settableListenableFuture;
352361
}
353362
future.addCallback(new ReplyFutureCallback(requestMessage, replyChannel));

0 commit comments

Comments
 (0)