Skip to content

Commit 89e11f2

Browse files
artembilangaryrussell
authored andcommitted
Add initial support for RSockets (#2902)
* Add initial support for RSockets * Add `spring-integration-rsocket` module and respective dependencies * Implement `RSocketOutboundGateway` based on the Spring Messaging `RSocketRequester`. This component supports dynamic RSocket properties via expressions against request message. to handle `Publisher` for requests, it must be present in the request message `payload` instead of `FluxMessageChannel` upstream, since the last one just flattens events to be handled in the `MessageHandler` one by one. The result `Mono` is subscribed downstream in the `FluxMessageChannel` or directly by the `AbstractReplyProducingMessageHandler`. If result is a `Flux` it is just wrapped into the `Mono` to be processed downstream by end-user code. The point is that these request/replies are volatile and live in the particular context meanwhile a `FluxMessageChannel` is long living publisher in the application context boundaries. * The `RSocketOutboundGatewayIntegrationTests` is an adapted copy of `RSocketClientToServerIntegrationTests` from Spring Messaging * Add `doOnError()` into the `Flux` created in the `AbstractMessageProducingHandler` for `Publisher` replies * * Use singular for the `RSocket` term * Use no-op `Consumer` for the `strategiesConfigurer` and `factoryConfigurer` in the `RSocketOutboundGateway` and also `Assert.notNull()` in the appropriate setters to avoid null check during `RSocketRequester.builder()` initialization * Use `TcpServer.create().port(0)` in the `RSocketOutboundGatewayIntegrationTests` to allow to select free OS port and bind into it. The selected port is used later for client configuration in the `RSocketOutboundGateway` bean definition * * Change `RSocketOutboundGatewayIntegrationTests.PORT` to lower case
1 parent f8f69c9 commit 89e11f2

File tree

14 files changed

+846
-12
lines changed

14 files changed

+846
-12
lines changed

build.gradle

+17-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ allprojects {
3939
if (version.endsWith('BUILD-SNAPSHOT')) {
4040
maven { url 'https://repo.spring.io/libs-snapshot' }
4141
}
42+
maven { url "https://oss.jfrog.org/artifactory/libs-snapshot" } // RSocket
4243
// maven { url 'https://repo.spring.io/libs-staging-local' }
4344
}
4445

@@ -128,10 +129,11 @@ subprojects { subproject ->
128129
mysqlVersion = '8.0.15'
129130
pahoMqttClientVersion = '1.2.0'
130131
postgresVersion = '42.2.5'
131-
reactorNettyVersion = '0.8.6.RELEASE'
132-
reactorVersion = '3.2.8.RELEASE'
132+
reactorNettyVersion = '0.9.0.BUILD-SNAPSHOT'
133+
reactorVersion = '3.3.0.BUILD-SNAPSHOT'
133134
resilience4jVersion = '0.14.1'
134135
romeToolsVersion = '1.12.0'
136+
rsocketVersion = '0.12.2-RC3-SNAPSHOT'
135137
servletApiVersion = '4.0.1'
136138
smackVersion = '4.3.3'
137139
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.2.0.M1'
@@ -141,7 +143,7 @@ subprojects { subproject ->
141143
springGemfireVersion = '2.2.0.M3'
142144
springSecurityVersion = '5.2.0.M2'
143145
springRetryVersion = '1.2.4.RELEASE'
144-
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.2.0.M1'
146+
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.2.0.BUILD-SNAPSHOT'
145147
springWsVersion = '3.0.7.RELEASE'
146148
tomcatVersion = "9.0.17"
147149
xstreamVersion = '1.4.11.1'
@@ -596,6 +598,18 @@ project('spring-integration-rmi') {
596598
}
597599
}
598600

601+
project('spring-integration-rsocket') {
602+
description = 'Spring Integration RSocket Support'
603+
dependencies {
604+
compile project(":spring-integration-core")
605+
compile("io.projectreactor.netty:reactor-netty:$reactorNettyVersion")
606+
compile("io.rsocket:rsocket-core:$rsocketVersion")
607+
compile("io.rsocket:rsocket-transport-netty:$rsocketVersion")
608+
609+
testCompile "io.projectreactor:reactor-test:$reactorVersion"
610+
}
611+
}
612+
599613
project('spring-integration-scripting') {
600614
description = 'Spring Integration Scripting Support'
601615
dependencies {

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,8 @@ public void subscribeTo(Publisher<Message<?>> publisher) {
8282
ConnectableFlux<?> connectableFlux =
8383
Flux.from(publisher)
8484
.handle((message, sink) -> sink.next(send(message)))
85-
.onErrorContinue((throwable, o) -> logger.warn("Error during processing event: " + o, throwable)
86-
)
85+
.onErrorContinue((throwable, event) ->
86+
logger.warn("Error during processing event: " + event, throwable))
8787
.doOnComplete(() -> this.publishers.remove(publisher))
8888
.publish();
8989

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3131
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
32+
import org.springframework.integration.context.IntegrationContextUtils;
3233
import org.springframework.integration.core.MessageProducer;
3334
import org.springframework.integration.core.MessagingTemplate;
3435
import org.springframework.integration.routingslip.RoutingSlipRouteStrategy;
@@ -283,6 +284,7 @@ private void doProduceOutput(final Message<?> requestMessage, final MessageHeade
283284
((ReactiveStreamsSubscribableChannel) messageChannel)
284285
.subscribeTo(
285286
Flux.from((Publisher<?>) reply)
287+
.doOnError((ex) -> sendErrorMessage(requestMessage, ex))
286288
.map(result -> createOutputMessage(result, requestHeaders)));
287289
}
288290
}
@@ -311,25 +313,22 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
311313
}
312314

313315
private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, Object replyChannel) {
314-
315316
ListenableFuture<?> future;
316317
if (reply instanceof ListenableFuture<?>) {
317318
future = (ListenableFuture<?>) reply;
318319
}
319320
else {
320321
SettableListenableFuture<Object> settableListenableFuture = new SettableListenableFuture<>();
321-
322322
Mono.from((Publisher<?>) reply)
323323
.subscribe(settableListenableFuture::set, settableListenableFuture::setException);
324-
325324
future = settableListenableFuture;
326325
}
327-
328326
future.addCallback(new ReplyFutureCallback(requestMessage, replyChannel));
329327
}
330328

331329
private Object getOutputChannelFromRoutingSlip(Object reply, Message<?> requestMessage, List<?> routingSlip,
332330
AtomicInteger routingSlipIndex) {
331+
333332
if (routingSlipIndex.get() >= routingSlip.size()) {
334333
return null;
335334
}
@@ -365,7 +364,7 @@ else if (path instanceof RoutingSlipRouteStrategy) {
365364
}
366365

367366
protected Message<?> createOutputMessage(Object output, MessageHeaders requestHeaders) {
368-
AbstractIntegrationMessageBuilder<?> builder = null;
367+
AbstractIntegrationMessageBuilder<?> builder;
369368
if (output instanceof Message<?>) {
370369
if (this.noHeadersPropagation || !shouldCopyRequestHeaders()) {
371370
return (Message<?>) output;
@@ -449,7 +448,7 @@ protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
449448
}
450449
catch (Exception e) {
451450
Exception exceptionToLog =
452-
IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage, () -> null, e);
451+
IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage, () -> null, e);
453452
logger.error("Failed to send async reply", exceptionToLog);
454453
}
455454
}
@@ -459,7 +458,7 @@ protected Object resolveErrorChannel(final MessageHeaders requestHeaders) {
459458
Object errorChannel = requestHeaders.getErrorChannel();
460459
if (errorChannel == null) {
461460
try {
462-
errorChannel = getChannelResolver().resolveDestination("errorChannel");
461+
errorChannel = getChannelResolver().resolveDestination(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME);
463462
}
464463
catch (DestinationResolutionException e) {
465464
// ignore

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java

+3
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ public void setBeanClassLoader(ClassLoader beanClassLoader) {
8787
this.beanClassLoader = beanClassLoader;
8888
}
8989

90+
protected ClassLoader getBeanClassLoader() {
91+
return this.beanClassLoader;
92+
}
9093

9194
@Override
9295
protected final void onInit() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.rsocket.config;
18+
19+
import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler;
20+
21+
/**
22+
* Namespace handler for Spring Integration's <em>RSocket</em> namespace.
23+
*
24+
* @author Artem Bilan
25+
*/
26+
public class RSocketNamespaceHandler extends AbstractIntegrationNamespaceHandler {
27+
28+
public void init() {
29+
30+
}
31+
32+
}

0 commit comments

Comments
 (0)