Skip to content

Commit b12657c

Browse files
artembilangaryrussell
authored andcommitted
Document RSocket support
* Fix some typos in `webflux.adoc` and `dsl.adoc` * Make `FunctionsTests.kt` more Kotlin friendly * Improve RSocket components and tests for them, especially rely on the default `RSocketStrategies` in the target SF RSocket components * Rework `ServerRSocketConnector.setClientRSocketKeyStrategy` to use the whole `MessageHeaders` for consultation. It turns out that just destination is not enough since it can be used from different clients * Make the key based on the provided `setupData` by default * Include all the `MessageHeaders` into the `RSocketConnectedEvent` * Improve Docs according review feedback and Docs in SF Doc polishing
1 parent 2133581 commit b12657c

File tree

12 files changed

+380
-154
lines changed

12 files changed

+380
-154
lines changed

spring-integration-core/src/test/kotlin/org/springframework/integration/function/FunctionsTests.kt

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,14 +152,7 @@ class FunctionsTests {
152152

153153
@Bean
154154
fun flowFromSupplier() =
155-
IntegrationFlows.from<String>({ "bar" },
156-
{ e ->
157-
e.poller { p ->
158-
p.fixedDelay(10)
159-
.maxMessagesPerPoll(1)
160-
}
161-
162-
})
155+
IntegrationFlows.from<String>({ "bar" }) { e -> e.poller { p -> p.fixedDelay(10).maxMessagesPerPoll(1) } }
163156
.channel { c -> c.queue("fromSupplierQueue") }
164157
.get()
165158
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,6 @@
2323
import org.springframework.context.ApplicationContext;
2424
import org.springframework.context.ApplicationContextAware;
2525
import org.springframework.context.SmartLifecycle;
26-
import org.springframework.core.codec.CharSequenceEncoder;
27-
import org.springframework.core.codec.StringDecoder;
28-
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
29-
import org.springframework.messaging.rsocket.MetadataExtractor;
3026
import org.springframework.messaging.rsocket.RSocketStrategies;
3127
import org.springframework.util.Assert;
3228
import org.springframework.util.MimeType;
@@ -38,7 +34,7 @@
3834
* A base connector container for common RSocket client and server functionality.
3935
* <p>
4036
* It accepts {@link IntegrationRSocketEndpoint} instances for mapping registration via an internal
41-
* {@link IntegrationRSocketMessageHandler} or performs an auto-detection otherwise, when all bean are ready
37+
* {@link IntegrationRSocketMessageHandler} or performs an auto-detection otherwise, when all beans are ready
4238
* in the application context.
4339
*
4440
* @author Artem Bilan
@@ -58,12 +54,7 @@ public abstract class AbstractRSocketConnector
5854
private MimeType metadataMimeType =
5955
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.toString());
6056

61-
private RSocketStrategies rsocketStrategies =
62-
RSocketStrategies.builder()
63-
.decoder(StringDecoder.allMimeTypes())
64-
.encoder(CharSequenceEncoder.allMimeTypes())
65-
.dataBufferFactory(new DefaultDataBufferFactory())
66-
.build();
57+
private RSocketStrategies rsocketStrategies = RSocketStrategies.create();
6758

6859
private boolean autoStartup = true;
6960

@@ -114,7 +105,7 @@ public RSocketStrategies getRSocketStrategies() {
114105
}
115106

116107
/**
117-
* Configure {@link IntegrationRSocketEndpoint} instances for mapping nad handling requests.
108+
* Configure {@link IntegrationRSocketEndpoint} instances for mapping and handling requests.
118109
* @param endpoints the {@link IntegrationRSocketEndpoint} instances for handling inbound requests.
119110
* @see #addEndpoint(IntegrationRSocketEndpoint)
120111
*/
@@ -125,20 +116,6 @@ public void setEndpoints(IntegrationRSocketEndpoint... endpoints) {
125116
}
126117
}
127118

128-
/**
129-
* Configure a {@link MetadataExtractor} to extract the route and possibly
130-
* other metadata from the first payload of incoming requests.
131-
* <p>By default this is a
132-
* {@link org.springframework.messaging.rsocket.DefaultMetadataExtractor}
133-
* with the configured {@link RSocketStrategies} (and decoders), extracting a route
134-
* from {@code "message/x.rsocket.routing.v0"} or {@code "text/plain"}
135-
* metadata entries.
136-
* @param extractor the extractor to use
137-
*/
138-
public void setMetadataExtractor(MetadataExtractor extractor) {
139-
this.rSocketMessageHandler.setMetadataExtractor(extractor);
140-
}
141-
142119
/**
143120
* Add an {@link IntegrationRSocketEndpoint} for mapping and handling RSocket requests.
144121
* @param endpoint the {@link IntegrationRSocketEndpoint} to map.

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/RSocketConnectedEvent.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.rsocket;
1818

19+
import java.util.Map;
20+
1921
import org.springframework.core.io.buffer.DataBuffer;
2022
import org.springframework.integration.events.IntegrationEvent;
2123
import org.springframework.messaging.rsocket.RSocketRequester;
@@ -25,7 +27,7 @@
2527
* to the server.
2628
* <p>
2729
* This event can be used for mapping {@link RSocketRequester} to the client by the
28-
* {@code destination} meta-data or connect payload {@code data}.
30+
* {@code headers} meta-data or connect payload {@code data}.
2931
*
3032
* @author Artem Bilan
3133
*
@@ -36,21 +38,23 @@
3638
@SuppressWarnings("serial")
3739
public class RSocketConnectedEvent extends IntegrationEvent {
3840

39-
private final String destination;
41+
private final Map<String, Object> headers;
4042

4143
private final DataBuffer data;
4244

4345
private final RSocketRequester requester;
4446

45-
public RSocketConnectedEvent(Object source, String destination, DataBuffer data, RSocketRequester requester) {
47+
public RSocketConnectedEvent(Object source, Map<String, Object> headers, DataBuffer data,
48+
RSocketRequester requester) {
49+
4650
super(source);
47-
this.destination = destination;
51+
this.headers = headers;
4852
this.data = data;
4953
this.requester = requester;
5054
}
5155

52-
public String getDestination() {
53-
return this.destination;
56+
public Map<String, Object> getHeaders() {
57+
return this.headers;
5458
}
5559

5660
public DataBuffer getData() {
@@ -64,7 +68,8 @@ public RSocketRequester getRequester() {
6468
@Override
6569
public String toString() {
6670
return "RSocketConnectedEvent{" +
67-
"destination='" + this.destination + '\'' +
71+
"headers=" + this.headers +
72+
", data=" + this.data +
6873
", requester=" + this.requester +
6974
'}';
7075
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ServerRSocketConnector.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.rsocket;
1818

1919
import java.lang.reflect.Method;
20+
import java.nio.charset.StandardCharsets;
2021
import java.util.Collections;
2122
import java.util.HashMap;
2223
import java.util.Map;
@@ -36,7 +37,6 @@
3637
import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver;
3738
import org.springframework.util.Assert;
3839
import org.springframework.util.ReflectionUtils;
39-
import org.springframework.util.RouteMatcher;
4040

4141
import io.rsocket.RSocketFactory;
4242
import io.rsocket.transport.ServerTransport;
@@ -108,7 +108,9 @@ public void setFactoryConfigurer(Consumer<RSocketFactory.ServerRSocketFactory> f
108108
* Defaults to the {@code destination} the client is connected.
109109
* @param clientRSocketKeyStrategy the {@link BiFunction} to determine a key for client {@link RSocketRequester}s.
110110
*/
111-
public void setClientRSocketKeyStrategy(BiFunction<String, DataBuffer, Object> clientRSocketKeyStrategy) {
111+
public void setClientRSocketKeyStrategy(BiFunction<Map<String, Object>,
112+
DataBuffer, Object> clientRSocketKeyStrategy) {
113+
112114
Assert.notNull(clientRSocketKeyStrategy, "'clientRSocketKeyStrategy' must not be null");
113115
serverRSocketMessageHandler().clientRSocketKeyStrategy = clientRSocketKeyStrategy;
114116
}
@@ -176,36 +178,29 @@ private static class ServerRSocketMessageHandler extends IntegrationRSocketMessa
176178

177179
private final Map<Object, RSocketRequester> clientRSocketRequesters = new HashMap<>();
178180

179-
private BiFunction<String, DataBuffer, Object> clientRSocketKeyStrategy = (destination, data) -> destination;
181+
private BiFunction<Map<String, Object>, DataBuffer, Object> clientRSocketKeyStrategy =
182+
(headers, data) -> data.toString(StandardCharsets.UTF_8);
180183

181184
private ApplicationEventPublisher applicationEventPublisher;
182185

183186
private void registerHandleConnectionSetupMethod() {
184187
registerHandlerMethod(this, HANDLE_CONNECTION_SETUP_METHOD,
185188
new CompositeMessageCondition(
186189
RSocketFrameTypeMessageCondition.CONNECT_CONDITION,
187-
new DestinationPatternsMessageCondition(new String[] { "*" }, getRouteMatcher()))); // NOSONAR
190+
new DestinationPatternsMessageCondition(new String[] { "*" }, obtainRouteMatcher())));
188191
}
189192

190193
@SuppressWarnings("unused")
191194
private void handleConnectionSetup(Message<DataBuffer> connectMessage) {
192195
DataBuffer dataBuffer = connectMessage.getPayload();
193196
MessageHeaders messageHeaders = connectMessage.getHeaders();
194-
String destination = "";
195-
RouteMatcher.Route route =
196-
messageHeaders.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER,
197-
RouteMatcher.Route.class);
198-
if (route != null) {
199-
destination = route.value();
200-
}
201-
202-
Object rsocketRequesterKey = this.clientRSocketKeyStrategy.apply(destination, dataBuffer);
197+
Object rsocketRequesterKey = this.clientRSocketKeyStrategy.apply(messageHeaders, dataBuffer);
203198
RSocketRequester rsocketRequester =
204199
messageHeaders.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER,
205200
RSocketRequester.class);
206201
this.clientRSocketRequesters.put(rsocketRequesterKey, rsocketRequester);
207202
RSocketConnectedEvent rSocketConnectedEvent =
208-
new RSocketConnectedEvent(this, destination, dataBuffer, rsocketRequester); // NOSONAR
203+
new RSocketConnectedEvent(this, messageHeaders, dataBuffer, rsocketRequester); // NOSONAR
209204
if (this.applicationEventPublisher != null) {
210205
this.applicationEventPublisher.publishEvent(rSocketConnectedEvent);
211206
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,10 @@
2222

2323
import org.springframework.core.ReactiveAdapter;
2424
import org.springframework.core.ResolvableType;
25-
import org.springframework.core.codec.CharSequenceEncoder;
2625
import org.springframework.core.codec.Decoder;
2726
import org.springframework.core.codec.Encoder;
28-
import org.springframework.core.codec.StringDecoder;
2927
import org.springframework.core.io.buffer.DataBuffer;
3028
import org.springframework.core.io.buffer.DataBufferFactory;
31-
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
3229
import org.springframework.integration.gateway.MessagingGatewaySupport;
3330
import org.springframework.integration.rsocket.AbstractRSocketConnector;
3431
import org.springframework.integration.rsocket.ClientRSocketConnector;
@@ -77,12 +74,7 @@ public class RSocketInboundGateway extends MessagingGatewaySupport implements In
7774

7875
private final String[] path;
7976

80-
private RSocketStrategies rsocketStrategies =
81-
RSocketStrategies.builder()
82-
.decoder(StringDecoder.allMimeTypes())
83-
.encoder(CharSequenceEncoder.allMimeTypes())
84-
.dataBufferFactory(new DefaultDataBufferFactory())
85-
.build();
77+
private RSocketStrategies rsocketStrategies = RSocketStrategies.create();
8678

8779
@Nullable
8880
private AbstractRSocketConnector rsocketConnector;
@@ -91,7 +83,7 @@ public class RSocketInboundGateway extends MessagingGatewaySupport implements In
9183
private ResolvableType requestElementType;
9284

9385
/**
94-
* Instantiate based on the provided Ant-style path patterns to map this endpoint for incoming RSocket requests.
86+
* Instantiate based on the provided path patterns to map this endpoint for incoming RSocket requests.
9587
* @param pathArg the mapping patterns to use.
9688
*/
9789
public RSocketInboundGateway(String... pathArg) {

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,15 @@
2626
import org.springframework.beans.factory.annotation.Qualifier;
2727
import org.springframework.context.annotation.Bean;
2828
import org.springframework.context.annotation.Configuration;
29-
import org.springframework.core.codec.CharSequenceEncoder;
30-
import org.springframework.core.codec.StringDecoder;
31-
import org.springframework.core.io.buffer.NettyDataBufferFactory;
3229
import org.springframework.integration.config.EnableIntegration;
3330
import org.springframework.integration.dsl.IntegrationFlow;
3431
import org.springframework.integration.dsl.IntegrationFlows;
3532
import org.springframework.integration.rsocket.ClientRSocketConnector;
3633
import org.springframework.integration.rsocket.ServerRSocketConnector;
3734
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
38-
import org.springframework.messaging.rsocket.RSocketStrategies;
3935
import org.springframework.test.annotation.DirtiesContext;
4036
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4137

42-
import io.netty.buffer.PooledByteBufAllocator;
43-
import io.rsocket.frame.decoder.PayloadDecoder;
4438
import reactor.core.publisher.Flux;
4539
import reactor.core.publisher.Mono;
4640

@@ -66,29 +60,15 @@ void testRsocketUpperCaseFlows() {
6660
@EnableIntegration
6761
public static class TestConfiguration {
6862

69-
@Bean
70-
public RSocketStrategies rsocketStrategies() {
71-
return RSocketStrategies.builder()
72-
.decoder(StringDecoder.allMimeTypes())
73-
.encoder(CharSequenceEncoder.allMimeTypes())
74-
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
75-
.build();
76-
}
77-
7863
@Bean
7964
public ServerRSocketConnector serverRSocketConnector() {
80-
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
81-
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
82-
serverRSocketConnector.setFactoryConfigurer((factory) -> factory.frameDecoder(PayloadDecoder.ZERO_COPY));
83-
return serverRSocketConnector;
65+
return new ServerRSocketConnector("localhost", 0);
8466
}
8567

8668
@Bean
8769
public ClientRSocketConnector clientRSocketConnector(ServerRSocketConnector serverRSocketConnector) {
8870
int port = serverRSocketConnector.getBoundPort().block();
8971
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", port);
90-
clientRSocketConnector.setFactoryConfigurer((factory) -> factory.frameDecoder(PayloadDecoder.ZERO_COPY));
91-
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
9272
clientRSocketConnector.setAutoStartup(false);
9373
return clientRSocketConnector;
9474
}
@@ -107,8 +87,7 @@ public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector client
10787
@Bean
10888
public IntegrationFlow rsocketUpperCaseFlow() {
10989
return IntegrationFlows
110-
.from(RSockets.inboundGateway("/uppercase")
111-
.rsocketStrategies(rsocketStrategies()))
90+
.from(RSockets.inboundGateway("/uppercase"))
11291
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
11392
.get();
11493
}

0 commit comments

Comments
 (0)