Skip to content

Commit 3ddc004

Browse files
committed
Fix RSocket adapters for routing metadata support
1 parent 24b8e58 commit 3ddc004

File tree

7 files changed

+118
-38
lines changed

7 files changed

+118
-38
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

+17
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public abstract class AbstractRSocketConnector
5252

5353
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
5454

55+
private MimeType metadataMimeType = IntegrationRSocket.COMPOSITE_METADATA;
56+
5557
private RSocketStrategies rsocketStrategies =
5658
RSocketStrategies.builder()
5759
.decoder(StringDecoder.allMimeTypes())
@@ -80,6 +82,20 @@ protected MimeType getDataMimeType() {
8082
return this.dataMimeType;
8183
}
8284

85+
/**
86+
* Configure a {@link MimeType} for metadata exchanging.
87+
* Default to {@code "message/x.rsocket.composite-metadata.v0"}.
88+
* @param metadataMimeType the {@link MimeType} to use.
89+
*/
90+
public void setMetadataMimeType(MimeType metadataMimeType) {
91+
Assert.notNull(metadataMimeType, "'metadataMimeType' must not be null");
92+
this.metadataMimeType = metadataMimeType;
93+
}
94+
95+
protected MimeType getMetadataMimeType() {
96+
return this.metadataMimeType;
97+
}
98+
8399
/**
84100
* Configure a {@link RSocketStrategies} for data encoding/decoding.
85101
* @param rsocketStrategies the {@link RSocketStrategies} to use.
@@ -121,6 +137,7 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
121137
@Override
122138
public void afterPropertiesSet() {
123139
this.rsocketAcceptor.setDefaultDataMimeType(this.dataMimeType);
140+
this.rsocketAcceptor.setDefaultMetadataMimeType(this.metadataMimeType);
124141
this.rsocketAcceptor.setRSocketStrategies(this.rsocketStrategies);
125142
this.rsocketAcceptor.afterPropertiesSet();
126143
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ public void afterPropertiesSet() {
122122
super.afterPropertiesSet();
123123
RSocketFactory.ClientRSocketFactory clientFactory =
124124
RSocketFactory.connect()
125-
.dataMimeType(getDataMimeType().toString());
125+
.dataMimeType(getDataMimeType().toString())
126+
.metadataMimeType(getMetadataMimeType().toString());
126127
this.factoryConfigurer.accept(clientFactory);
127128
clientFactory.acceptor(this.rsocketAcceptor);
128129
Payload connectPayload = EmptyPayload.INSTANCE;
@@ -161,7 +162,9 @@ public void connect() {
161162

162163
public Mono<RSocketRequester> getRSocketRequester() {
163164
return this.rsocketMono
164-
.map((rsocket) -> RSocketRequester.wrap(rsocket, getDataMimeType(), getRSocketStrategies()))
165+
.map((rsocket) ->
166+
RSocketRequester
167+
.wrap(rsocket, getDataMimeType(), getMetadataMimeType(), getRSocketStrategies()))
165168
.cache();
166169
}
167170

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java

+51-17
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package org.springframework.integration.rsocket;
1818

19+
import java.nio.charset.StandardCharsets;
20+
import java.util.Arrays;
21+
import java.util.List;
1922
import java.util.concurrent.atomic.AtomicBoolean;
2023
import java.util.function.Function;
2124

@@ -38,10 +41,12 @@
3841
import org.springframework.messaging.support.MessageHeaderAccessor;
3942
import org.springframework.util.Assert;
4043
import org.springframework.util.MimeType;
44+
import org.springframework.util.RouteMatcher;
4145

4246
import io.netty.buffer.ByteBuf;
4347
import io.rsocket.AbstractRSocket;
4448
import io.rsocket.Payload;
49+
import io.rsocket.metadata.CompositeMetadata;
4550
import reactor.core.publisher.Flux;
4651
import reactor.core.publisher.Mono;
4752
import reactor.core.publisher.MonoProcessor;
@@ -63,28 +68,44 @@
6368
*/
6469
class IntegrationRSocket extends AbstractRSocket {
6570

71+
static final MimeType COMPOSITE_METADATA = new MimeType("message", "x.rsocket.composite-metadata.v0");
72+
73+
static final MimeType ROUTING = new MimeType("message", "x.rsocket.routing.v0");
74+
75+
static final List<MimeType> METADATA_MIME_TYPES = Arrays.asList(COMPOSITE_METADATA, ROUTING);
76+
77+
6678
private final Function<Message<?>, Mono<Void>> handler;
6779

80+
private final RouteMatcher routeMatcher;
81+
6882
private final RSocketRequester requester;
6983

7084
private final DataBufferFactory bufferFactory;
7185

72-
@Nullable
73-
private MimeType dataMimeType;
86+
private final MimeType dataMimeType;
7487

75-
IntegrationRSocket(Function<Message<?>, Mono<Void>> handler, RSocketRequester requester,
76-
@Nullable MimeType defaultDataMimeType, DataBufferFactory bufferFactory) {
88+
private final MimeType metadataMimeType;
89+
90+
IntegrationRSocket(Function<Message<?>, Mono<Void>> handler, RouteMatcher routeMatcher,
91+
RSocketRequester requester, MimeType dataMimeType, MimeType metadataMimeType,
92+
DataBufferFactory bufferFactory) {
7793

7894
Assert.notNull(handler, "'handler' is required");
95+
Assert.notNull(routeMatcher, "'routeMatcher' is required");
7996
Assert.notNull(requester, "'requester' is required");
97+
Assert.notNull(dataMimeType, "'dataMimeType' is required");
98+
Assert.notNull(metadataMimeType, "'metadataMimeType' is required");
99+
100+
Assert.isTrue(METADATA_MIME_TYPES.contains(metadataMimeType),
101+
() -> "Unexpected metadatata mime type: '" + metadataMimeType + "'");
102+
80103
this.handler = handler;
104+
this.routeMatcher = routeMatcher;
81105
this.requester = requester;
82-
this.dataMimeType = defaultDataMimeType;
83-
this.bufferFactory = bufferFactory;
84-
}
85-
86-
public void setDataMimeType(MimeType dataMimeType) {
87106
this.dataMimeType = dataMimeType;
107+
this.metadataMimeType = metadataMimeType;
108+
this.bufferFactory = bufferFactory;
88109
}
89110

90111
public RSocketRequester getRequester() {
@@ -165,17 +186,34 @@ private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payload
165186
: Mono.error(new IllegalStateException("Something went wrong: reply Mono not set"))));
166187
}
167188

189+
String getDestination(Payload payload) {
190+
if (this.metadataMimeType.equals(COMPOSITE_METADATA)) {
191+
CompositeMetadata metadata = new CompositeMetadata(payload.metadata(), false);
192+
for (CompositeMetadata.Entry entry : metadata) {
193+
String mimeType = entry.getMimeType();
194+
if (ROUTING.toString().equals(mimeType)) {
195+
return entry.getContent().toString(StandardCharsets.UTF_8);
196+
}
197+
}
198+
return "";
199+
}
200+
else if (this.metadataMimeType.equals(ROUTING)) {
201+
return payload.getMetadataUtf8();
202+
}
203+
// Should not happen (given constructor assertions)
204+
throw new IllegalArgumentException("Unexpected metadata MimeType");
205+
}
206+
168207
private DataBuffer retainDataAndReleasePayload(Payload payload) {
169208
return payloadToDataBuffer(payload, this.bufferFactory);
170209
}
171210

172211
private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor<?> replyMono) {
173212
MessageHeaderAccessor headers = new MessageHeaderAccessor();
174213
headers.setLeaveMutable(true);
175-
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, destination);
176-
if (this.dataMimeType != null) {
177-
headers.setContentType(this.dataMimeType);
178-
}
214+
RouteMatcher.Route route = this.routeMatcher.parseRoute(destination);
215+
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, route);
216+
headers.setContentType(this.dataMimeType);
179217
headers.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, this.requester);
180218
if (replyMono != null) {
181219
headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono);
@@ -184,10 +222,6 @@ private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor
184222
return headers.getMessageHeaders();
185223
}
186224

187-
static String getDestination(Payload payload) {
188-
return payload.getMetadataUtf8();
189-
}
190-
191225
static DataBuffer payloadToDataBuffer(Payload payload, DataBufferFactory bufferFactory) {
192226
payload.retain();
193227
try {

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketAcceptor.java

+38-10
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.lang.reflect.Method;
2020
import java.util.Collections;
2121
import java.util.List;
22-
import java.util.function.Function;
22+
import java.util.function.BiFunction;
2323
import java.util.function.Predicate;
2424

2525
import org.springframework.context.ApplicationContext;
@@ -34,9 +34,13 @@
3434
import org.springframework.messaging.rsocket.RSocketMessageHandler;
3535
import org.springframework.messaging.rsocket.RSocketRequester;
3636
import org.springframework.messaging.rsocket.RSocketStrategies;
37+
import org.springframework.util.Assert;
3738
import org.springframework.util.MimeType;
39+
import org.springframework.util.MimeTypeUtils;
3840
import org.springframework.util.ReflectionUtils;
41+
import org.springframework.util.StringUtils;
3942

43+
import io.rsocket.ConnectionSetupPayload;
4044
import io.rsocket.RSocket;
4145

4246
/**
@@ -54,14 +58,17 @@
5458
*
5559
* @see org.springframework.messaging.rsocket.MessageHandlerAcceptor
5660
*/
57-
class IntegrationRSocketAcceptor extends RSocketMessageHandler implements Function<RSocket, RSocket> {
61+
class IntegrationRSocketAcceptor extends RSocketMessageHandler
62+
implements BiFunction<ConnectionSetupPayload, RSocket, RSocket> {
5863

5964
private static final Method HANDLE_MESSAGE_METHOD =
6065
ReflectionUtils.findMethod(ReactiveMessageHandler.class, "handleMessage", Message.class);
6166

6267
@Nullable
6368
private MimeType defaultDataMimeType;
6469

70+
private MimeType defaultMetadataMimeType = IntegrationRSocket.COMPOSITE_METADATA;
71+
6572
/**
6673
* Configure the default content type to use for data payloads.
6774
* <p>By default this is not set. However a server acceptor will use the
@@ -73,6 +80,17 @@ public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
7380
this.defaultDataMimeType = defaultDataMimeType;
7481
}
7582

83+
/**
84+
* Configure the default {@code MimeType} for payload data if the
85+
* {@code SETUP} frame did not specify one.
86+
* <p>By default this is set to {@code "message/x.rsocket.composite-metadata.v0"}
87+
* @param mimeType the MimeType to use
88+
*/
89+
public void setDefaultMetadataMimeType(MimeType mimeType) {
90+
Assert.notNull(mimeType, "'metadataMimeType' is required");
91+
this.defaultMetadataMimeType = mimeType;
92+
}
93+
7694
public boolean detectEndpoints() {
7795
ApplicationContext applicationContext = getApplicationContext();
7896
if (applicationContext != null && getHandlerMethods().isEmpty()) {
@@ -91,7 +109,7 @@ public boolean detectEndpoints() {
91109
public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
92110
registerHandlerMethod(endpoint, HANDLE_MESSAGE_METHOD,
93111
new CompositeMessageCondition(
94-
new DestinationPatternsMessageCondition(endpoint.getPath(), getPathMatcher())));
112+
new DestinationPatternsMessageCondition(endpoint.getPath(), getRouteMatcher())));
95113
}
96114

97115
@Override
@@ -105,16 +123,26 @@ protected Predicate<Class<?>> initHandlerPredicate() {
105123
}
106124

107125
@Override
108-
public RSocket apply(RSocket sendingRSocket) {
109-
return createRSocket(sendingRSocket);
126+
public RSocket apply(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) {
127+
return createRSocket(setupPayload, sendingRSocket);
110128
}
111129

112-
protected IntegrationRSocket createRSocket(RSocket rsocket) {
130+
protected IntegrationRSocket createRSocket(ConnectionSetupPayload setupPayload, RSocket rsocket) {
113131
RSocketStrategies rsocketStrategies = getRSocketStrategies();
114-
return new IntegrationRSocket(this::handleMessage,
115-
RSocketRequester.wrap(rsocket, this.defaultDataMimeType, rsocketStrategies),
116-
this.defaultDataMimeType,
117-
rsocketStrategies.dataBufferFactory());
132+
MimeType dataMimeType =
133+
StringUtils.hasText(setupPayload.dataMimeType())
134+
? MimeTypeUtils.parseMimeType(setupPayload.dataMimeType())
135+
: this.defaultDataMimeType;
136+
Assert.notNull(dataMimeType, "No `dataMimeType` in the ConnectionSetupPayload and no default value");
137+
138+
MimeType metadataMimeType =
139+
StringUtils.hasText(setupPayload.dataMimeType())
140+
? MimeTypeUtils.parseMimeType(setupPayload.metadataMimeType())
141+
: this.defaultMetadataMimeType;
142+
Assert.notNull(dataMimeType, "No `metadataMimeType` in the ConnectionSetupPayload and no default value");
143+
return new IntegrationRSocket(this::handleMessage, getRouteMatcher(),
144+
RSocketRequester.wrap(rsocket, dataMimeType, metadataMimeType, rsocketStrategies),
145+
dataMimeType, metadataMimeType, rsocketStrategies.dataBufferFactory());
118146
}
119147

120148
private static final class MessageHandlerMethodArgumentResolver implements SyncHandlerMethodArgumentResolver {

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ServerRSocketConnector.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
import org.springframework.lang.Nullable;
3333
import org.springframework.messaging.rsocket.RSocketRequester;
3434
import org.springframework.util.Assert;
35-
import org.springframework.util.MimeTypeUtils;
36-
import org.springframework.util.StringUtils;
3735

3836
import io.rsocket.Closeable;
3937
import io.rsocket.ConnectionSetupPayload;
@@ -162,7 +160,7 @@ public void destroy() {
162160

163161
private static class ServerRSocketAcceptor extends IntegrationRSocketAcceptor implements SocketAcceptor {
164162

165-
private static final Log LOGGER = LogFactory.getLog(IntegrationRSocket.class);
163+
private static final Log LOGGER = LogFactory.getLog(ServerRSocketAcceptor.class);
166164

167165
private final Map<Object, RSocketRequester> clientRSocketRequesters = new HashMap<>();
168166

@@ -172,16 +170,12 @@ private static class ServerRSocketAcceptor extends IntegrationRSocketAcceptor im
172170

173171
@Override
174172
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) {
175-
String destination = IntegrationRSocket.getDestination(setupPayload);
176173
DataBuffer dataBuffer =
177174
IntegrationRSocket.payloadToDataBuffer(setupPayload, getRSocketStrategies().dataBufferFactory());
178175
int refCount = IntegrationRSocket.refCount(dataBuffer);
179-
return Mono.just(sendingRSocket)
180-
.map(this::createRSocket)
176+
return Mono.just(createRSocket(setupPayload, sendingRSocket))
181177
.doOnNext((rsocket) -> {
182-
if (StringUtils.hasText(setupPayload.dataMimeType())) {
183-
rsocket.setDataMimeType(MimeTypeUtils.parseMimeType(setupPayload.dataMimeType()));
184-
}
178+
String destination = rsocket.getDestination(setupPayload);
185179
Object rsocketRequesterKey = this.clientRSocketKeyStrategy.apply(destination, dataBuffer);
186180
this.clientRSocketRequesters.put(rsocketRequesterKey, rsocket.getRequester());
187181
RSocketConnectedEvent rSocketConnectedEvent =

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/inbound/RSocketInboundGatewayIntegrationTests.java

+3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.springframework.messaging.rsocket.RSocketStrategies;
4848
import org.springframework.test.annotation.DirtiesContext;
4949
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
50+
import org.springframework.util.MimeType;
5051

5152
import io.netty.buffer.PooledByteBufAllocator;
5253
import io.rsocket.frame.decoder.PayloadDecoder;
@@ -218,6 +219,7 @@ public ServerRSocketConnector serverRSocketConnector() {
218219
ServerRSocketConnector serverRSocketConnector =
219220
new ServerRSocketConnector(TcpServerTransport.create(tcpServer));
220221
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
222+
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
221223
serverRSocketConnector.setFactoryConfigurer((factory) -> factory.frameDecoder(PayloadDecoder.ZERO_COPY));
222224
return serverRSocketConnector;
223225
}
@@ -231,6 +233,7 @@ public static class ClientConfig extends CommonConfig {
231233
@Bean
232234
public ClientRSocketConnector clientRSocketConnector() {
233235
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", port);
236+
clientRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
234237
clientRSocketConnector.setFactoryConfigurer((factory) -> factory.frameDecoder(PayloadDecoder.ZERO_COPY));
235238
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
236239
clientRSocketConnector.setConnectRoute("clientConnect");

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,7 @@ public RSocket rsocketForServerRequests() {
532532
return RSocketFactory.connect()
533533
.setupPayload(DefaultPayload.create("", "clientConnect"))
534534
.dataMimeType("text/plain")
535+
.metadataMimeType("message/x.rsocket.routing.v0")
535536
.frameDecoder(PayloadDecoder.ZERO_COPY)
536537
.acceptor(clientAcceptor())
537538
.transport(TcpClientTransport.create("localhost", port))

0 commit comments

Comments
 (0)