Skip to content

Commit dd15ff7

Browse files
committed
Register annotated handlers in RSocketRequester
Prior to this commit, there would be no easy way to register client RSocket handlers against an `RSocketRequester` instance. The only solution was to gather all handlers and wrap them in a `RSocketMessageHandler` and configure it as an acceptor on the client RSocket. This commit adds a convenience method on the `RSocketRequester` builder to tkae care of this part of the infrastructure. Closes gh-23170
1 parent d5a2bde commit dd15ff7

File tree

3 files changed

+45
-33
lines changed

3 files changed

+45
-33
lines changed

spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,20 @@
1818

1919
import java.net.URI;
2020
import java.util.ArrayList;
21+
import java.util.Arrays;
2122
import java.util.List;
2223
import java.util.function.Consumer;
2324
import java.util.stream.Stream;
2425

2526
import io.rsocket.RSocketFactory;
27+
import io.rsocket.frame.decoder.PayloadDecoder;
2628
import io.rsocket.transport.ClientTransport;
2729
import io.rsocket.transport.netty.client.TcpClientTransport;
2830
import io.rsocket.transport.netty.client.WebsocketClientTransport;
2931
import reactor.core.publisher.Mono;
3032

3133
import org.springframework.lang.Nullable;
34+
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
3235
import org.springframework.util.Assert;
3336
import org.springframework.util.MimeType;
3437

@@ -53,6 +56,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
5356

5457
private List<Consumer<RSocketStrategies.Builder>> strategiesConfigurers = new ArrayList<>();
5558

59+
private List<Object> handlers = new ArrayList<>();
5660

5761
@Override
5862
public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) {
@@ -79,6 +83,12 @@ public RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies st
7983
return this;
8084
}
8185

86+
@Override
87+
public RSocketRequester.Builder annotatedHandlers(Object... handlers) {
88+
this.handlers.addAll(Arrays.asList(handlers));
89+
return this;
90+
}
91+
8292
@Override
8393
public RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer) {
8494
this.strategiesConfigurers.add(configurer);
@@ -101,7 +111,6 @@ public Mono<RSocketRequester> connect(ClientTransport transport) {
101111
}
102112

103113
private Mono<RSocketRequester> doConnect(ClientTransport transport) {
104-
105114
RSocketStrategies rsocketStrategies = getRSocketStrategies();
106115
Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders");
107116
Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders");
@@ -110,6 +119,15 @@ private Mono<RSocketRequester> doConnect(ClientTransport transport) {
110119
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
111120
rsocketFactory.dataMimeType(dataMimeType.toString());
112121
rsocketFactory.metadataMimeType(this.metadataMimeType.toString());
122+
123+
if (!this.handlers.isEmpty()) {
124+
RSocketMessageHandler messageHandler = new RSocketMessageHandler();
125+
messageHandler.setHandlers(this.handlers);
126+
messageHandler.setRSocketStrategies(rsocketStrategies);
127+
messageHandler.afterPropertiesSet();
128+
rsocketFactory.acceptor(messageHandler.clientAcceptor());
129+
}
130+
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
113131
this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory));
114132

115133
return rsocketFactory.transport(transport)

spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

+12
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.core.ParameterizedTypeReference;
3131
import org.springframework.core.ReactiveAdapterRegistry;
3232
import org.springframework.lang.Nullable;
33+
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
3334
import org.springframework.util.MimeType;
3435

3536
/**
@@ -158,6 +159,17 @@ interface Builder {
158159
*/
159160
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
160161

162+
/**
163+
* Add handlers for processing requests sent by the server.
164+
* <p>This is a shortcut for registering client handlers (i.e. annotated controllers)
165+
* to a {@link RSocketMessageHandler} and configuring it as an acceptor.
166+
* You can take full control by manually registering an acceptor on the
167+
* {@link RSocketFactory.ClientRSocketFactory} using {@link #rsocketFactory(Consumer)}
168+
* instead.
169+
* @param handlers the client handlers to configure on the requester
170+
*/
171+
RSocketRequester.Builder annotatedHandlers(Object... handlers);
172+
161173
/**
162174
* Connect to the RSocket server over TCP.
163175
* @param host the server host

spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

+14-32
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@
1717
package org.springframework.messaging.rsocket;
1818

1919
import java.time.Duration;
20-
import java.util.Collections;
2120

2221
import io.netty.buffer.PooledByteBufAllocator;
23-
import io.rsocket.Closeable;
24-
import io.rsocket.RSocket;
2522
import io.rsocket.RSocketFactory;
2623
import io.rsocket.frame.decoder.PayloadDecoder;
27-
import io.rsocket.transport.netty.client.TcpClientTransport;
24+
import io.rsocket.transport.netty.server.CloseableChannel;
2825
import io.rsocket.transport.netty.server.TcpServerTransport;
29-
import io.rsocket.util.DefaultPayload;
3026
import org.junit.AfterClass;
3127
import org.junit.BeforeClass;
3228
import org.junit.Test;
@@ -51,12 +47,13 @@
5147
* Client-side handling of requests initiated from the server side.
5248
*
5349
* @author Rossen Stoyanchev
50+
* @author Brian Clozel
5451
*/
5552
public class RSocketServerToClientIntegrationTests {
5653

5754
private static AnnotationConfigApplicationContext context;
5855

59-
private static Closeable server;
56+
private static CloseableChannel server;
6057

6158

6259
@BeforeClass
@@ -66,8 +63,8 @@ public static void setupOnce() {
6663

6764
server = RSocketFactory.receive()
6865
.frameDecoder(PayloadDecoder.ZERO_COPY)
69-
.acceptor(context.getBean("serverMessageHandler", RSocketMessageHandler.class).serverAcceptor())
70-
.transport(TcpServerTransport.create("localhost", 7000))
66+
.acceptor(context.getBean(RSocketMessageHandler.class).serverAcceptor())
67+
.transport(TcpServerTransport.create("localhost", 0))
7168
.start()
7269
.block();
7370
}
@@ -104,23 +101,21 @@ private static void connectAndVerify(String destination) {
104101
ServerController serverController = context.getBean(ServerController.class);
105102
serverController.reset();
106103

107-
RSocket rsocket = null;
104+
RSocketRequester requester = null;
108105
try {
109-
rsocket = RSocketFactory.connect()
110-
.metadataMimeType("message/x.rsocket.routing.v0")
111-
.dataMimeType("text/plain")
112-
.setupPayload(DefaultPayload.create("", destination))
113-
.frameDecoder(PayloadDecoder.ZERO_COPY)
114-
.acceptor(context.getBean("clientMessageHandler", RSocketMessageHandler.class).clientAcceptor())
115-
.transport(TcpClientTransport.create("localhost", 7000))
116-
.start()
106+
requester = RSocketRequester.builder()
107+
.annotatedHandlers(new ClientHandler())
108+
.rsocketStrategies(context.getBean(RSocketStrategies.class))
109+
.connectTcp("localhost", server.address().getPort())
117110
.block();
118111

112+
requester.route(destination).data("").send().block();
113+
119114
serverController.await(Duration.ofSeconds(5));
120115
}
121116
finally {
122-
if (rsocket != null) {
123-
rsocket.dispose();
117+
if (requester != null) {
118+
requester.rsocket().dispose();
124119
}
125120
}
126121
}
@@ -250,24 +245,11 @@ Flux<String> echoChannel(Flux<String> payloads) {
250245
@Configuration
251246
static class RSocketConfig {
252247

253-
@Bean
254-
public ClientHandler clientHandler() {
255-
return new ClientHandler();
256-
}
257-
258248
@Bean
259249
public ServerController serverController() {
260250
return new ServerController();
261251
}
262252

263-
@Bean
264-
public RSocketMessageHandler clientMessageHandler() {
265-
RSocketMessageHandler handler = new RSocketMessageHandler();
266-
handler.setHandlers(Collections.singletonList(clientHandler()));
267-
handler.setRSocketStrategies(rsocketStrategies());
268-
return handler;
269-
}
270-
271253
@Bean
272254
public RSocketMessageHandler serverMessageHandler() {
273255
RSocketMessageHandler handler = new RSocketMessageHandler();

0 commit comments

Comments
 (0)