Skip to content

Commit 04ff879

Browse files
artembilangaryrussell
authored andcommitted
Improve RSocket support
Related to: spring-projects/spring-boot#18812 * Extract `ServerRSocketMessageHandler` into a `public` class to allow to configure it as top-level bean and bind it into an existing server * Change `ServerRSocketConnector` to accept the mentioned external bean and don't create an internal RSocket server with an assumption that it is create externally * Add `IntegrationRSocketMessageHandler.requestMappingCompatible` option to allow to configure `ServerRSocketMessageHandler` for both Spring Integration RSocket channel adapters and regular `@MessageMapping`. This is useful for Spring Boot auto-configuration These changes give a hook to auto-configure Spring Integration RSocket channel adapters in Spring Boot
1 parent 0ceea84 commit 04ff879

File tree

6 files changed

+249
-109
lines changed

6 files changed

+249
-109
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

+10-23
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323
import org.springframework.context.ApplicationContext;
2424
import org.springframework.context.ApplicationContextAware;
2525
import org.springframework.context.SmartLifecycle;
26+
import org.springframework.lang.Nullable;
2627
import org.springframework.messaging.rsocket.RSocketStrategies;
2728
import org.springframework.util.Assert;
2829
import org.springframework.util.MimeType;
29-
import org.springframework.util.MimeTypeUtils;
30-
31-
import io.rsocket.metadata.WellKnownMimeType;
3230

3331
/**
3432
* A base connector container for common RSocket client and server functionality.
@@ -49,32 +47,26 @@ public abstract class AbstractRSocketConnector
4947

5048
protected final IntegrationRSocketMessageHandler rSocketMessageHandler; // NOSONAR - final
5149

52-
private MimeType dataMimeType;
53-
54-
private MimeType metadataMimeType =
55-
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.toString());
56-
57-
private RSocketStrategies rsocketStrategies = RSocketStrategies.create();
58-
5950
private boolean autoStartup = true;
6051

6152
private volatile boolean running;
6253

6354
protected AbstractRSocketConnector(IntegrationRSocketMessageHandler rSocketMessageHandler) {
55+
Assert.notNull(rSocketMessageHandler, "'rSocketMessageHandler' must not be null");
6456
this.rSocketMessageHandler = rSocketMessageHandler;
6557
}
6658

6759
/**
6860
* Configure a {@link MimeType} for data exchanging.
6961
* @param dataMimeType the {@link MimeType} to use.
7062
*/
71-
public void setDataMimeType(MimeType dataMimeType) {
72-
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
73-
this.dataMimeType = dataMimeType;
63+
public void setDataMimeType(@Nullable MimeType dataMimeType) {
64+
this.rSocketMessageHandler.setDefaultDataMimeType(dataMimeType);
7465
}
7566

67+
@Nullable
7668
protected MimeType getDataMimeType() {
77-
return this.dataMimeType;
69+
return this.rSocketMessageHandler.getDefaultDataMimeType();
7870
}
7971

8072
/**
@@ -83,25 +75,23 @@ protected MimeType getDataMimeType() {
8375
* @param metadataMimeType the {@link MimeType} to use.
8476
*/
8577
public void setMetadataMimeType(MimeType metadataMimeType) {
86-
Assert.notNull(metadataMimeType, "'metadataMimeType' must not be null");
87-
this.metadataMimeType = metadataMimeType;
78+
this.rSocketMessageHandler.setDefaultMetadataMimeType(metadataMimeType);
8879
}
8980

9081
protected MimeType getMetadataMimeType() {
91-
return this.metadataMimeType;
82+
return this.rSocketMessageHandler.getDefaultMetadataMimeType();
9283
}
9384

9485
/**
9586
* Configure a {@link RSocketStrategies} for data encoding/decoding.
9687
* @param rsocketStrategies the {@link RSocketStrategies} to use.
9788
*/
9889
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
99-
Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
100-
this.rsocketStrategies = rsocketStrategies;
90+
this.rSocketMessageHandler.setRSocketStrategies(rsocketStrategies);
10191
}
10292

10393
public RSocketStrategies getRSocketStrategies() {
104-
return this.rsocketStrategies;
94+
return this.rSocketMessageHandler.getRSocketStrategies();
10595
}
10696

10797
/**
@@ -131,9 +121,6 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
131121

132122
@Override
133123
public void afterPropertiesSet() {
134-
this.rSocketMessageHandler.setDefaultDataMimeType(this.dataMimeType);
135-
this.rSocketMessageHandler.setDefaultMetadataMimeType(this.metadataMimeType);
136-
this.rSocketMessageHandler.setRSocketStrategies(this.rsocketStrategies);
137124
this.rSocketMessageHandler.afterPropertiesSet();
138125
}
139126

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,17 @@ class IntegrationRSocketMessageHandler extends RSocketMessageHandler {
4848
private static final Method HANDLE_MESSAGE_METHOD =
4949
ReflectionUtils.findMethod(ReactiveMessageHandler.class, "handleMessage", Message.class);
5050

51+
protected final boolean messageMappingCompatible; // NOSONAR final
52+
5153
IntegrationRSocketMessageHandler() {
52-
setHandlerPredicate((clazz) -> false);
54+
this(false);
55+
}
56+
57+
IntegrationRSocketMessageHandler(boolean requestMappingCompatible) {
58+
this.messageMappingCompatible = requestMappingCompatible;
59+
if (!this.messageMappingCompatible) {
60+
setHandlerPredicate((clazz) -> false);
61+
}
5362
}
5463

5564
public boolean detectEndpoints() {
@@ -76,14 +85,21 @@ public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
7685

7786
@Override
7887
protected List<? extends HandlerMethodArgumentResolver> initArgumentResolvers() {
79-
return Collections.singletonList(new MessageHandlerMethodArgumentResolver());
88+
if (this.messageMappingCompatible) {
89+
// Add argument resolver before parent initializes argument resolution
90+
getArgumentResolverConfigurer().addCustomResolver(new MessageHandlerMethodArgumentResolver());
91+
return super.initArgumentResolvers();
92+
}
93+
else {
94+
return Collections.singletonList(new MessageHandlerMethodArgumentResolver());
95+
}
8096
}
8197

82-
private static final class MessageHandlerMethodArgumentResolver implements SyncHandlerMethodArgumentResolver {
98+
protected static final class MessageHandlerMethodArgumentResolver implements SyncHandlerMethodArgumentResolver {
8399

84100
@Override
85101
public boolean supportsParameter(MethodParameter parameter) {
86-
return true;
102+
return Message.class.equals(parameter.getParameterType());
87103
}
88104

89105
@Override

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ServerRSocketConnector.java

+81-77
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,20 @@
1616

1717
package org.springframework.integration.rsocket;
1818

19-
import java.lang.reflect.Method;
20-
import java.nio.charset.StandardCharsets;
21-
import java.util.Collections;
22-
import java.util.HashMap;
2319
import java.util.Map;
2420
import java.util.function.BiFunction;
2521
import java.util.function.Consumer;
2622

23+
import org.springframework.beans.BeansException;
24+
import org.springframework.context.ApplicationContext;
2725
import org.springframework.context.ApplicationEventPublisher;
2826
import org.springframework.context.ApplicationEventPublisherAware;
2927
import org.springframework.core.io.buffer.DataBuffer;
3028
import org.springframework.lang.Nullable;
31-
import org.springframework.messaging.Message;
32-
import org.springframework.messaging.MessageHeaders;
33-
import org.springframework.messaging.handler.CompositeMessageCondition;
34-
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
3529
import org.springframework.messaging.rsocket.RSocketRequester;
36-
import org.springframework.messaging.rsocket.annotation.support.RSocketFrameTypeMessageCondition;
37-
import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver;
30+
import org.springframework.messaging.rsocket.RSocketStrategies;
3831
import org.springframework.util.Assert;
39-
import org.springframework.util.ReflectionUtils;
32+
import org.springframework.util.MimeType;
4033

4134
import io.rsocket.RSocketFactory;
4235
import io.rsocket.transport.ServerTransport;
@@ -56,15 +49,27 @@
5649
*
5750
* @see RSocketFactory.ServerRSocketFactory
5851
*/
59-
public class ServerRSocketConnector extends AbstractRSocketConnector
60-
implements ApplicationEventPublisherAware {
52+
public class ServerRSocketConnector extends AbstractRSocketConnector implements ApplicationEventPublisherAware {
6153

6254
private final ServerTransport<CloseableChannel> serverTransport;
6355

6456
private Consumer<RSocketFactory.ServerRSocketFactory> factoryConfigurer = (serverRSocketFactory) -> { };
6557

6658
private Mono<CloseableChannel> serverMono;
6759

60+
/**
61+
* Instantiate a server connector based on a provided {@link ServerRSocketMessageHandler}
62+
* with an assumption that RSocket server is created externally as well.
63+
* All other options are ignored in favor of provided {@link ServerRSocketMessageHandler}
64+
* and its external RSocket server configuration.
65+
* @param serverRSocketMessageHandler the {@link ServerRSocketMessageHandler} to rely on.
66+
* @since 5.2.1
67+
*/
68+
public ServerRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
69+
super(serverRSocketMessageHandler);
70+
this.serverTransport = null;
71+
}
72+
6873
/**
6974
* Instantiate a server connector based on the {@link TcpServerTransport}.
7075
* @param bindAddress the local address to bind TCP server onto.
@@ -111,41 +116,79 @@ public void setFactoryConfigurer(Consumer<RSocketFactory.ServerRSocketFactory> f
111116
public void setClientRSocketKeyStrategy(BiFunction<Map<String, Object>,
112117
DataBuffer, Object> clientRSocketKeyStrategy) {
113118

114-
Assert.notNull(clientRSocketKeyStrategy, "'clientRSocketKeyStrategy' must not be null");
115-
serverRSocketMessageHandler().clientRSocketKeyStrategy = clientRSocketKeyStrategy;
119+
if (this.serverTransport != null) {
120+
serverRSocketMessageHandler().setClientRSocketKeyStrategy(clientRSocketKeyStrategy);
121+
}
122+
}
123+
124+
@Override
125+
public void setDataMimeType(@Nullable MimeType dataMimeType) {
126+
if (this.serverTransport != null) {
127+
super.setDataMimeType(dataMimeType);
128+
}
129+
}
130+
131+
@Override
132+
public void setMetadataMimeType(MimeType metadataMimeType) {
133+
if (this.serverTransport != null) {
134+
super.setMetadataMimeType(metadataMimeType);
135+
}
136+
}
137+
138+
@Override
139+
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
140+
if (this.serverTransport != null) {
141+
super.setRSocketStrategies(rsocketStrategies);
142+
}
143+
}
144+
145+
@Override
146+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
147+
if (this.serverTransport != null) {
148+
super.setApplicationContext(applicationContext);
149+
}
116150
}
117151

118152
@Override
119153
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
120-
serverRSocketMessageHandler().applicationEventPublisher = applicationEventPublisher;
154+
if (this.serverTransport != null) {
155+
serverRSocketMessageHandler().setApplicationEventPublisher(applicationEventPublisher);
156+
}
121157
}
122158

123159
@Override
124160
public void afterPropertiesSet() {
125-
super.afterPropertiesSet();
126-
RSocketFactory.ServerRSocketFactory serverFactory = RSocketFactory.receive();
127-
this.factoryConfigurer.accept(serverFactory);
128-
129-
this.serverMono =
130-
serverFactory
131-
.acceptor(serverRSocketMessageHandler().responder())
132-
.transport(this.serverTransport)
133-
.start()
134-
.cache();
161+
if (this.serverTransport != null) {
162+
super.afterPropertiesSet();
163+
RSocketFactory.ServerRSocketFactory serverFactory = RSocketFactory.receive();
164+
this.factoryConfigurer.accept(serverFactory);
165+
166+
this.serverMono =
167+
serverFactory
168+
.acceptor(serverRSocketMessageHandler().responder())
169+
.transport(this.serverTransport)
170+
.start()
171+
.cache();
172+
}
135173
}
136174

137175
public Map<Object, RSocketRequester> getClientRSocketRequesters() {
138-
return Collections.unmodifiableMap(serverRSocketMessageHandler().clientRSocketRequesters);
176+
return serverRSocketMessageHandler().getClientRSocketRequesters();
139177
}
140178

141179
@Nullable
142180
public RSocketRequester getClientRSocketRequester(Object key) {
143-
return serverRSocketMessageHandler().clientRSocketRequesters.get(key);
181+
return getClientRSocketRequesters().get(key);
144182
}
145183

146184
public Mono<Integer> getBoundPort() {
147-
return this.serverMono
148-
.map((server) -> server.address().getPort());
185+
if (this.serverTransport != null) {
186+
return this.serverMono
187+
.map((server) -> server.address().getPort());
188+
}
189+
else {
190+
return Mono.empty();
191+
}
149192
}
150193

151194
private ServerRSocketMessageHandler serverRSocketMessageHandler() {
@@ -154,14 +197,18 @@ private ServerRSocketMessageHandler serverRSocketMessageHandler() {
154197

155198
@Override
156199
protected void doStart() {
157-
this.serverMono.subscribe();
200+
if (this.serverTransport != null) {
201+
this.serverMono.subscribe();
202+
}
158203
}
159204

160205
@Override
161206
public void destroy() {
162-
this.serverMono
163-
.doOnNext(Disposable::dispose)
164-
.subscribe();
207+
if (this.serverTransport != null) {
208+
this.serverMono
209+
.doOnNext(Disposable::dispose)
210+
.subscribe();
211+
}
165212
}
166213

167214
@Override
@@ -170,47 +217,4 @@ public void afterSingletonsInstantiated() {
170217
serverRSocketMessageHandler().registerHandleConnectionSetupMethod();
171218
}
172219

173-
private static class ServerRSocketMessageHandler extends IntegrationRSocketMessageHandler {
174-
175-
private static final Method HANDLE_CONNECTION_SETUP_METHOD =
176-
ReflectionUtils.findMethod(ServerRSocketMessageHandler.class, "handleConnectionSetup", Message.class);
177-
178-
179-
private final Map<Object, RSocketRequester> clientRSocketRequesters = new HashMap<>();
180-
181-
private BiFunction<Map<String, Object>, DataBuffer, Object> clientRSocketKeyStrategy =
182-
(headers, data) -> data.toString(StandardCharsets.UTF_8);
183-
184-
private ApplicationEventPublisher applicationEventPublisher;
185-
186-
private void registerHandleConnectionSetupMethod() {
187-
registerHandlerMethod(this, HANDLE_CONNECTION_SETUP_METHOD,
188-
new CompositeMessageCondition(
189-
RSocketFrameTypeMessageCondition.CONNECT_CONDITION,
190-
new DestinationPatternsMessageCondition(new String[] { "*" }, obtainRouteMatcher())));
191-
}
192-
193-
@SuppressWarnings("unused")
194-
private void handleConnectionSetup(Message<DataBuffer> connectMessage) {
195-
DataBuffer dataBuffer = connectMessage.getPayload();
196-
MessageHeaders messageHeaders = connectMessage.getHeaders();
197-
Object rsocketRequesterKey = this.clientRSocketKeyStrategy.apply(messageHeaders, dataBuffer);
198-
RSocketRequester rsocketRequester =
199-
messageHeaders.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER,
200-
RSocketRequester.class);
201-
this.clientRSocketRequesters.put(rsocketRequesterKey, rsocketRequester);
202-
RSocketConnectedEvent rSocketConnectedEvent =
203-
new RSocketConnectedEvent(this, messageHeaders, dataBuffer, rsocketRequester); // NOSONAR
204-
if (this.applicationEventPublisher != null) {
205-
this.applicationEventPublisher.publishEvent(rSocketConnectedEvent);
206-
}
207-
else {
208-
if (logger.isInfoEnabled()) {
209-
logger.info("The RSocket has been connected: " + rSocketConnectedEvent);
210-
}
211-
}
212-
}
213-
214-
}
215-
216220
}

0 commit comments

Comments
 (0)