Skip to content

Commit 444c1f9

Browse files
committed
RSocket: Add support for RoutingMetadata
Related to spring-projects/spring-framework#23137 The metadata in Spring Messaging for RSockets now supports any arbitrary objects for setup payload, including composition. * Switch the `ClientRSocketConnector` to fully delegate to the `RSocketRequester.Builder` inheriting possible metadata encoding/decoding in the target `RSocketRequester` implementation * Turn off a default `dataMimeType` from the `MimeTypeUtils.TEXT_PLAIN` to the `null` by default relying on the encoder/decoder logic in the target RSocket wrappers * Expose more delegating options in the `ClientRSocketConnector`, like `setupRouteVars`, `setupMetadata`
1 parent fd7e7db commit 444c1f9

File tree

5 files changed

+92
-69
lines changed

5 files changed

+92
-69
lines changed

build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,11 @@ ext {
8484
mysqlVersion = '8.0.16'
8585
pahoMqttClientVersion = '1.2.0'
8686
postgresVersion = '42.2.6'
87-
reactorNettyVersion = '0.9.0.M3'
88-
reactorVersion = '3.3.0.M3'
87+
reactorNettyVersion = '0.9.0.BUILD-SNAPSHOT'
88+
reactorVersion = '3.3.0.BUILD-SNAPSHOT'
8989
resilience4jVersion = '0.16.0'
9090
romeToolsVersion = '1.12.1'
91-
rsocketVersion = '1.0.0-RC3-SNAPSHOT'
91+
rsocketVersion = '1.0.0-RC3'
9292
servletApiVersion = '4.0.1'
9393
smackVersion = '4.3.4'
9494
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.2.0.BUILD-SNAPSHOT'

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public abstract class AbstractRSocketConnector
5353

5454
protected final IntegrationRSocketMessageHandler rSocketMessageHandler; // NOSONAR - final
5555

56-
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
56+
private MimeType dataMimeType;
5757

5858
private MimeType metadataMimeType =
5959
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.toString());

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

Lines changed: 80 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,18 @@
1717
package org.springframework.integration.rsocket;
1818

1919
import java.net.URI;
20-
import java.util.function.Consumer;
20+
import java.util.Arrays;
21+
import java.util.LinkedHashMap;
22+
import java.util.Map;
2123

24+
import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer;
2225
import org.springframework.messaging.rsocket.RSocketRequester;
2326
import org.springframework.util.Assert;
27+
import org.springframework.util.MimeType;
2428

25-
import io.rsocket.Payload;
26-
import io.rsocket.RSocket;
27-
import io.rsocket.RSocketFactory;
2829
import io.rsocket.transport.ClientTransport;
2930
import io.rsocket.transport.netty.client.TcpClientTransport;
3031
import io.rsocket.transport.netty.client.WebsocketClientTransport;
31-
import io.rsocket.util.DefaultPayload;
32-
import io.rsocket.util.EmptyPayload;
3332
import reactor.core.Disposable;
3433
import reactor.core.publisher.Mono;
3534

@@ -40,22 +39,26 @@
4039
*
4140
* @since 5.2
4241
*
43-
* @see RSocketFactory.ClientRSocketFactory
42+
* @see io.rsocket.RSocketFactory.ClientRSocketFactory
4443
* @see RSocketRequester
4544
*/
4645
public class ClientRSocketConnector extends AbstractRSocketConnector {
4746

4847
private final ClientTransport clientTransport;
4948

50-
private Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = (clientRSocketFactory) -> { };
49+
private final Map<Object, MimeType> setupMetadata = new LinkedHashMap<>(4);
5150

52-
private String connectRoute;
51+
private ClientRSocketFactoryConfigurer factoryConfigurer = (clientRSocketFactory) -> { };
5352

54-
private String connectData = "";
53+
private Object setupData;
54+
55+
private String setupRoute;
56+
57+
private Object[] setupRouteVars = new Object[0];
5558

5659
private boolean autoConnect;
5760

58-
private Mono<RSocket> rsocketMono;
61+
private Mono<RSocketRequester> rsocketRequesterMono;
5962

6063
/**
6164
* Instantiate a connector based on the {@link TcpClientTransport}.
@@ -79,6 +82,7 @@ public ClientRSocketConnector(URI uri) {
7982
/**
8083
* Instantiate a connector based on the provided {@link ClientTransport}.
8184
* @param clientTransport the {@link ClientTransport} to use.
85+
* @see RSocketRequester.Builder#connect(ClientTransport)
8286
*/
8387
public ClientRSocketConnector(ClientTransport clientTransport) {
8488
super(new IntegrationRSocketMessageHandler());
@@ -87,47 +91,83 @@ public ClientRSocketConnector(ClientTransport clientTransport) {
8791
}
8892

8993
/**
90-
* Specify a {@link Consumer} for configuring a {@link RSocketFactory.ClientRSocketFactory}.
91-
* @param factoryConfigurer the {@link Consumer} to configure the {@link RSocketFactory.ClientRSocketFactory}.
94+
* Callback to configure the {@code ClientRSocketFactory} directly.
95+
* Note: this class adds extra {@link ClientRSocketFactoryConfigurer} to the
96+
* target {@link RSocketRequester} to populate a reference to an internal
97+
* {@link IntegrationRSocketMessageHandler#responder()}.
98+
* This overrides possible external
99+
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(io.rsocket.SocketAcceptor)}
100+
* @param factoryConfigurer the {@link ClientRSocketFactoryConfigurer} to
101+
* configure the {@link io.rsocket.RSocketFactory.ClientRSocketFactory}.
102+
* @see RSocketRequester.Builder#rsocketFactory(ClientRSocketFactoryConfigurer)
92103
*/
93-
public void setFactoryConfigurer(Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer) {
104+
public void setFactoryConfigurer(ClientRSocketFactoryConfigurer factoryConfigurer) {
94105
Assert.notNull(factoryConfigurer, "'factoryConfigurer' must not be null");
95106
this.factoryConfigurer = factoryConfigurer;
96107
}
97108

98109
/**
99-
* Configure a route for server RSocket endpoint.
100-
* @param connectRoute the route to connect to.
110+
* Set the route for the setup payload.
111+
* @param setupRoute the route to connect to.
112+
* @see RSocketRequester.Builder#setupRoute(String, Object...)
113+
*/
114+
public void setSetupRoute(String setupRoute) {
115+
Assert.notNull(setupRoute, "'setupRoute' must not be null");
116+
this.setupRoute = setupRoute;
117+
}
118+
119+
/**
120+
* Set the variables for route template to expand with.
121+
* @param setupRouteVars the route to connect to.
122+
* @see RSocketRequester.Builder#setupRoute(String, Object...)
101123
*/
102-
public void setConnectRoute(String connectRoute) {
103-
this.connectRoute = connectRoute;
124+
public void setSetupRouteVariables(Object... setupRouteVars) {
125+
Assert.notNull(setupRouteVars, "'setupRouteVars' must not be null");
126+
this.setupRouteVars = Arrays.copyOf(setupRouteVars, setupRouteVars.length);
104127
}
105128

106129
/**
107-
* Configure a data for connect.
108-
* Defaults to empty string.
109-
* @param connectData the data for connect frame.
130+
* Add metadata to the setup payload. Composite metadata must be
131+
* in use if this is called more than once or in addition to
132+
* {@link #setSetupRoute(String)}.
133+
* @param setupMetadata the map of metadata to use.
134+
* @see RSocketRequester.Builder#setupMetadata(Object, MimeType)
110135
*/
111-
public void setConnectData(String connectData) {
112-
Assert.notNull(connectData, "'connectData' must not be null");
113-
this.connectData = connectData;
136+
public void setSetupMetadata(Map<Object, MimeType> setupMetadata) {
137+
Assert.notNull(setupMetadata, "'setupMetadata' must not be null");
138+
this.setupMetadata.clear();
139+
this.setupMetadata.putAll(setupMetadata);
140+
}
141+
142+
/**
143+
* Set the data for the setup payload.
144+
* @param setupData the data for connect frame.
145+
* @see RSocketRequester.Builder#setupData(Object)
146+
*/
147+
public void setSetupData(Object setupData) {
148+
Assert.notNull(setupData, "'setupData' must not be null");
149+
this.setupData = setupData;
114150
}
115151

116152
@Override
117153
public void afterPropertiesSet() {
118154
super.afterPropertiesSet();
119-
RSocketFactory.ClientRSocketFactory clientFactory =
120-
RSocketFactory.connect()
121-
.dataMimeType(getDataMimeType().toString())
122-
.metadataMimeType(getMetadataMimeType().toString());
123-
this.factoryConfigurer.accept(clientFactory);
124-
clientFactory.acceptor(this.rSocketMessageHandler.responder());
125-
Payload connectPayload = EmptyPayload.INSTANCE;
126-
if (this.connectRoute != null) {
127-
connectPayload = DefaultPayload.create(this.connectData, this.connectRoute);
128-
}
129-
clientFactory.setupPayload(connectPayload);
130-
this.rsocketMono = clientFactory.transport(this.clientTransport).start().cache();
155+
156+
RSocketRequester.Builder rsocketRequesterBuilder =
157+
RSocketRequester.builder()
158+
.dataMimeType(getDataMimeType())
159+
.metadataMimeType(getMetadataMimeType())
160+
.rsocketStrategies(getRSocketStrategies())
161+
.setupData(this.setupData)
162+
.setupRoute(this.setupRoute, this.setupRouteVars)
163+
.rsocketFactory(this.factoryConfigurer)
164+
.rsocketFactory((rsocketFactory) ->
165+
rsocketFactory.acceptor(this.rSocketMessageHandler.responder()));
166+
this.setupMetadata.forEach(rsocketRequesterBuilder::setupMetadata);
167+
this.rsocketRequesterMono =
168+
rsocketRequesterBuilder
169+
.connect(this.clientTransport)
170+
.cache();
131171
}
132172

133173
@Override
@@ -144,7 +184,8 @@ protected void doStart() {
144184

145185
@Override
146186
public void destroy() {
147-
this.rsocketMono
187+
this.rsocketRequesterMono
188+
.map(RSocketRequester::rsocket)
148189
.doOnNext(Disposable::dispose)
149190
.subscribe();
150191
}
@@ -153,15 +194,11 @@ public void destroy() {
153194
* Perform subscription into the RSocket server for incoming requests.
154195
*/
155196
public void connect() {
156-
this.rsocketMono.subscribe();
197+
this.rsocketRequesterMono.subscribe();
157198
}
158199

159200
public Mono<RSocketRequester> getRSocketRequester() {
160-
return this.rsocketMono
161-
.map((rsocket) ->
162-
RSocketRequester
163-
.wrap(rsocket, getDataMimeType(), getMetadataMimeType(), getRSocketStrategies()))
164-
.cache();
201+
return this.rsocketRequesterMono;
165202
}
166203

167204
}

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/inbound/RSocketInboundGatewayIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ public ClientRSocketConnector clientRSocketConnector() {
230230
clientRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
231231
clientRSocketConnector.setFactoryConfigurer((factory) -> factory.frameDecoder(PayloadDecoder.ZERO_COPY));
232232
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
233-
clientRSocketConnector.setConnectRoute("clientConnect");
233+
clientRSocketConnector.setSetupRoute("clientConnect");
234234
return clientRSocketConnector;
235235
}
236236

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

2121
import java.time.Duration;
22-
import java.util.Collections;
2322

2423
import org.junit.jupiter.api.AfterAll;
2524
import org.junit.jupiter.api.BeforeAll;
@@ -64,10 +63,8 @@
6463
import io.rsocket.RSocket;
6564
import io.rsocket.RSocketFactory;
6665
import io.rsocket.frame.decoder.PayloadDecoder;
67-
import io.rsocket.transport.netty.client.TcpClientTransport;
6866
import io.rsocket.transport.netty.server.CloseableChannel;
6967
import io.rsocket.transport.netty.server.TcpServerTransport;
70-
import io.rsocket.util.DefaultPayload;
7168
import reactor.core.Disposable;
7269
import reactor.core.publisher.Flux;
7370
import reactor.core.publisher.Mono;
@@ -515,33 +512,22 @@ public PollableChannel errorChannel() {
515512
@EnableIntegration
516513
public static class ClientConfig extends CommonConfig {
517514

518-
@Bean
519-
public RSocketMessageHandler messageHandler() {
520-
RSocketMessageHandler handler = new RSocketMessageHandler();
521-
handler.setRSocketStrategies(rsocketStrategies());
522-
handler.setHandlers(Collections.singletonList(controller()));
523-
return handler;
524-
}
525-
526515
@Bean(destroyMethod = "dispose")
527516
@Nullable
528517
public RSocket rsocketForServerRequests() {
529-
return RSocketFactory.connect()
530-
.setupPayload(DefaultPayload.create("", "clientConnect"))
531-
.dataMimeType("text/plain")
532-
.metadataMimeType("message/x.rsocket.routing.v0")
533-
.frameDecoder(PayloadDecoder.ZERO_COPY)
534-
.acceptor(messageHandler().responder())
535-
.transport(TcpClientTransport.create("localhost", server.address().getPort()))
536-
.start()
537-
.block();
518+
519+
return RSocketRequester.builder()
520+
.setupRoute("clientConnect")
521+
.rsocketFactory(RSocketMessageHandler.clientResponder(rsocketStrategies(), controller()))
522+
.connectTcp("localhost", server.address().getPort())
523+
.block()
524+
.rsocket();
538525
}
539526

540527
@Bean
541528
public ClientRSocketConnector clientRSocketConnector() {
542529
ClientRSocketConnector clientRSocketConnector =
543530
new ClientRSocketConnector("localhost", server.address().getPort());
544-
clientRSocketConnector.setFactoryConfigurer((factory) -> factory.frameDecoder(PayloadDecoder.ZERO_COPY));
545531
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
546532
return clientRSocketConnector;
547533
}

0 commit comments

Comments
 (0)