Skip to content

Commit 30750f5

Browse files
artembilangaryrussell
authored andcommitted
Polishing for RSocket channel adapter (#2928)
* Polishing for RSocket channel adapter * Add JavaDocs * Remove unused code * * Fix Sonar smells
1 parent a56116f commit 30750f5

9 files changed

+95
-24
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,14 @@ public abstract class AbstractRSocketConnector
6161

6262
private volatile boolean running;
6363

64-
private ApplicationContext applicationContext;
65-
6664
protected AbstractRSocketConnector(IntegrationRSocketAcceptor rsocketAcceptor) {
6765
this.rsocketAcceptor = rsocketAcceptor;
6866
}
6967

68+
/**
69+
* Configure a {@link MimeType} for data exchanging.
70+
* @param dataMimeType the {@link MimeType} to use.
71+
*/
7072
public void setDataMimeType(MimeType dataMimeType) {
7173
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
7274
this.dataMimeType = dataMimeType;
@@ -76,6 +78,10 @@ protected MimeType getDataMimeType() {
7678
return this.dataMimeType;
7779
}
7880

81+
/**
82+
* Configure a {@link RSocketStrategies} for data encoding/decoding.
83+
* @param rsocketStrategies the {@link RSocketStrategies} to use.
84+
*/
7985
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
8086
Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
8187
this.rsocketStrategies = rsocketStrategies;
@@ -85,27 +91,31 @@ public RSocketStrategies getRSocketStrategies() {
8591
return this.rsocketStrategies;
8692
}
8793

94+
/**
95+
* Configure {@link IntegrationRSocketEndpoint} instances for mapping nad handling requests.
96+
* @param endpoints the {@link IntegrationRSocketEndpoint} instances for handling inbound requests.
97+
* @see #addEndpoint(IntegrationRSocketEndpoint)
98+
*/
8899
public void setEndpoints(IntegrationRSocketEndpoint... endpoints) {
89100
Assert.notNull(endpoints, "'endpoints' must not be null");
90101
for (IntegrationRSocketEndpoint endpoint : endpoints) {
91102
addEndpoint(endpoint);
92103
}
93104
}
94105

106+
/**
107+
* Add an {@link IntegrationRSocketEndpoint} for mapping and handling RSocket requests.
108+
* @param endpoint the {@link IntegrationRSocketEndpoint} to map.
109+
*/
95110
public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
96111
this.rsocketAcceptor.addEndpoint(endpoint);
97112
}
98113

99114
@Override
100115
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
101-
this.applicationContext = applicationContext;
102116
this.rsocketAcceptor.setApplicationContext(applicationContext);
103117
}
104118

105-
protected ApplicationContext getApplicationContext() {
106-
return this.applicationContext;
107-
}
108-
109119
@Override
110120
public void afterPropertiesSet() {
111121
this.rsocketAcceptor.setDefaultDataMimeType(this.dataMimeType);

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import java.net.URI;
2020
import java.util.function.Consumer;
21-
import java.util.function.Function;
2221

2322
import org.springframework.messaging.rsocket.RSocketRequester;
2423
import org.springframework.util.Assert;
@@ -37,7 +36,7 @@
3736
/**
3837
* A client {@link AbstractRSocketConnector} extension to the RSocket server.
3938
* <p>
40-
* Note: the {@link RSocketFactory.ClientRSocketFactory#acceptor(Function)}
39+
* Note: the {@link RSocketFactory.ClientRSocketFactory#acceptor(java.util.function.Function)}
4140
* in the provided {@link #factoryConfigurer} is overridden with an internal {@link IntegrationRSocketAcceptor}
4241
* for the proper Spring Integration channel adapter mappings.
4342
*
@@ -62,29 +61,57 @@ public class ClientRSocketConnector extends AbstractRSocketConnector {
6261

6362
private Mono<RSocket> rsocketMono;
6463

64+
/**
65+
* Instantiate a connector based on the {@link TcpClientTransport}.
66+
* @param host the TCP host to connect.
67+
* @param port the TCP port to connect.
68+
* @see #ClientRSocketConnector(ClientTransport)
69+
*/
6570
public ClientRSocketConnector(String host, int port) {
6671
this(TcpClientTransport.create(host, port));
6772
}
6873

74+
/**
75+
* Instantiate a connector based on the {@link WebsocketClientTransport}.
76+
* @param uri the WebSocket URI to connect.
77+
* @see #ClientRSocketConnector(ClientTransport)
78+
*/
6979
public ClientRSocketConnector(URI uri) {
7080
this(WebsocketClientTransport.create(uri));
7181
}
7282

83+
/**
84+
* Instantiate a connector based on the provided {@link ClientTransport}.
85+
* @param clientTransport the {@link ClientTransport} to use.
86+
*/
7387
public ClientRSocketConnector(ClientTransport clientTransport) {
7488
super(new IntegrationRSocketAcceptor());
7589
Assert.notNull(clientTransport, "'clientTransport' must not be null");
7690
this.clientTransport = clientTransport;
7791
}
7892

93+
/**
94+
* Specify a {@link Consumer} for configuring a {@link RSocketFactory.ClientRSocketFactory}.
95+
* @param factoryConfigurer the {@link Consumer} to configure the {@link RSocketFactory.ClientRSocketFactory}.
96+
*/
7997
public void setFactoryConfigurer(Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer) {
8098
Assert.notNull(factoryConfigurer, "'factoryConfigurer' must not be null");
8199
this.factoryConfigurer = factoryConfigurer;
82100
}
83101

102+
/**
103+
* Configure a route for server RSocket endpoint.
104+
* @param connectRoute the route to connect to.
105+
*/
84106
public void setConnectRoute(String connectRoute) {
85107
this.connectRoute = connectRoute;
86108
}
87109

110+
/**
111+
* Configure a data for connect.
112+
* Defaults to empty string.
113+
* @param connectData the data for connect frame.
114+
*/
88115
public void setConnectData(String connectData) {
89116
Assert.notNull(connectData, "'connectData' must not be null");
90117
this.connectData = connectData;

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,12 @@
4242
import io.netty.buffer.ByteBuf;
4343
import io.rsocket.AbstractRSocket;
4444
import io.rsocket.Payload;
45-
import io.rsocket.RSocket;
4645
import reactor.core.publisher.Flux;
4746
import reactor.core.publisher.Mono;
4847
import reactor.core.publisher.MonoProcessor;
4948

5049
/**
51-
* Implementation of {@link RSocket} that wraps incoming requests with a
50+
* Implementation of {@link io.rsocket.RSocket} that wraps incoming requests with a
5251
* {@link Message}, delegates to a {@link Function} for handling, and then
5352
* obtains the response from a "reply" header.
5453
* <p>

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketAcceptor.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.springframework.util.MimeType;
3838
import org.springframework.util.ReflectionUtils;
3939

40-
import io.rsocket.ConnectionSetupPayload;
4140
import io.rsocket.RSocket;
4241

4342
/**
@@ -66,7 +65,7 @@ class IntegrationRSocketAcceptor extends RSocketMessageHandler implements Functi
6665
/**
6766
* Configure the default content type to use for data payloads.
6867
* <p>By default this is not set. However a server acceptor will use the
69-
* content type from the {@link ConnectionSetupPayload}, so this is typically
68+
* content type from the {@link io.rsocket.ConnectionSetupPayload}, so this is typically
7069
* required for clients but can also be used on servers as a fallback.
7170
* @param defaultDataMimeType the MimeType to use
7271
*/

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketEndpoint.java

+4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@
3333
*/
3434
public interface IntegrationRSocketEndpoint extends ReactiveMessageHandler {
3535

36+
/**
37+
* Obtain path patterns this {@link ReactiveMessageHandler} is going to be mapped onto.
38+
* @return the path patterns for mapping.
39+
*/
3640
String[] getPath();
3741

3842
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ServerRSocketConnector.java

+24
Original file line numberDiff line numberDiff line change
@@ -69,25 +69,49 @@ public class ServerRSocketConnector extends AbstractRSocketConnector
6969

7070
private Mono<? extends Closeable> serverMono;
7171

72+
/**
73+
* Instantiate a server connector based on the {@link TcpServerTransport}.
74+
* @param bindAddress the local address to bind TCP server onto.
75+
* @param port the local TCP port to bind.
76+
* @see #ServerRSocketConnector(ServerTransport)
77+
*/
7278
public ServerRSocketConnector(String bindAddress, int port) {
7379
this(TcpServerTransport.create(bindAddress, port));
7480
}
7581

82+
/**
83+
* Instantiate a server connector based on the {@link WebsocketServerTransport}.
84+
* @param server the {@link HttpServer} to use.
85+
* @see #ServerRSocketConnector(ServerTransport)
86+
*/
7687
public ServerRSocketConnector(HttpServer server) {
7788
this(WebsocketServerTransport.create(server));
7889
}
7990

91+
/**
92+
* Instantiate a server connector based on the provided {@link ServerTransport}.
93+
* @param serverTransport the {@link ServerTransport} to make server based on.
94+
*/
8095
public ServerRSocketConnector(ServerTransport<? extends Closeable> serverTransport) {
8196
super(new ServerRSocketAcceptor());
8297
Assert.notNull(serverTransport, "'serverTransport' must not be null");
8398
this.serverTransport = serverTransport;
8499
}
85100

101+
/**
102+
* Provide a {@link Consumer} to configure the {@link RSocketFactory.ServerRSocketFactory}.
103+
* @param factoryConfigurer the {@link Consumer} to configure the {@link RSocketFactory.ServerRSocketFactory}.
104+
*/
86105
public void setFactoryConfigurer(Consumer<RSocketFactory.ServerRSocketFactory> factoryConfigurer) {
87106
Assert.notNull(factoryConfigurer, "'factoryConfigurer' must not be null");
88107
this.factoryConfigurer = factoryConfigurer;
89108
}
90109

110+
/**
111+
* Configure a strategy to determine a key for the client {@link RSocketRequester} connected.
112+
* Defaults to the {@code destination} the client is connected.
113+
* @param clientRSocketKeyStrategy the {@link BiFunction} to determine a key for client {@link RSocketRequester}s.
114+
*/
91115
public void setClientRSocketKeyStrategy(BiFunction<String, DataBuffer, Object> clientRSocketKeyStrategy) {
92116
Assert.notNull(clientRSocketKeyStrategy, "'clientRSocketKeyStrategy' must not be null");
93117
serverRSocketAcceptor().clientRSocketKeyStrategy = clientRSocketKeyStrategy;

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketNamespaceHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler;
2020

2121
/**
22-
* Namespace handler for Spring Integration's <em>RSocket</em> namespace.
22+
* Namespace handler for Spring Integration XML configuration for <em>RSocket</em> support.
2323
*
2424
* @author Artem Bilan
25+
*
26+
* @since 5.2
2527
*/
2628
public class RSocketNamespaceHandler extends AbstractIntegrationNamespaceHandler {
2729

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555

5656
/**
5757
* The {@link MessagingGatewaySupport} implementation for the {@link IntegrationRSocketEndpoint}.
58+
* Represents an inbound endpoint for RSocket requests.
5859
* <p>
5960
* May be configured with the {@link AbstractRSocketConnector} for mapping registration.
6061
* Or existing {@link AbstractRSocketConnector} bean(s) will perform detection automatically.
@@ -92,6 +93,10 @@ public class RSocketInboundGateway extends MessagingGatewaySupport implements In
9293
@Nullable
9394
private ResolvableType requestElementType;
9495

96+
/**
97+
* Instantiate based on the provided path patterns to map this endpoint for incoming RSocket requests.
98+
* @param path the mapping patterns to use.
99+
*/
95100
public RSocketInboundGateway(String... path) {
96101
Assert.notNull(path, "'path' must not be null");
97102
this.path = path;
@@ -123,7 +128,7 @@ public void setRSocketConnector(AbstractRSocketConnector rsocketConnector) {
123128
* @return the mapping path
124129
*/
125130
public String[] getPath() {
126-
return this.path;
131+
return Arrays.copyOf(this.path, this.path.length);
127132
}
128133

129134
/**
@@ -151,9 +156,10 @@ public void setRequestElementType(ResolvableType requestElementType) {
151156
@Override
152157
protected void onInit() {
153158
super.onInit();
154-
if (this.rsocketConnector != null) {
155-
this.rsocketConnector.addEndpoint(this);
156-
this.rsocketStrategies = this.rsocketConnector.getRSocketStrategies();
159+
AbstractRSocketConnector rsocketConnectorToUse = this.rsocketConnector;
160+
if (rsocketConnectorToUse != null) {
161+
rsocketConnectorToUse.addEndpoint(this);
162+
this.rsocketStrategies = rsocketConnectorToUse.getRSocketStrategies();
157163
}
158164
}
159165

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
197197
requesterMono = this.rsocketRequesterMono;
198198
}
199199

200-
Assert.notNull(requesterMono, () ->
201-
"The 'RSocketRequester' must be configured via 'ClientRSocketConnector' or provided in the '" +
200+
Assert.notNull(requesterMono,
201+
() -> "The 'RSocketRequester' must be configured via 'ClientRSocketConnector' or provided in the '" +
202202
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER + "' request message headers.");
203203

204204
return requesterMono
@@ -207,13 +207,13 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
207207
.flatMap((responseSpec) -> performRequest(responseSpec, requestMessage));
208208
}
209209

210-
private RSocketRequester.RequestSpec createRequestSpec(RSocketRequester rSocketRequester,
210+
private RSocketRequester.RequestSpec createRequestSpec(RSocketRequester rsocketRequester,
211211
Message<?> requestMessage) {
212212

213213
String route = this.routeExpression.getValue(this.evaluationContext, requestMessage, String.class);
214214
Assert.notNull(route, () -> "The 'routeExpression' [" + this.routeExpression + "] must not evaluate to null");
215215

216-
return rSocketRequester.route(route);
216+
return rsocketRequester.route(route);
217217
}
218218

219219
private RSocketRequester.ResponseSpec createResponseSpec(RSocketRequester.RequestSpec requestSpec,
@@ -244,8 +244,8 @@ private RSocketRequester.ResponseSpec responseSpecForPublisher(RSocketRequester.
244244

245245
private Mono<?> performRequest(RSocketRequester.ResponseSpec responseSpec, Message<?> requestMessage) {
246246
Command command = this.commandExpression.getValue(this.evaluationContext, requestMessage, Command.class);
247-
Assert.notNull(command, () -> "The 'commandExpression' [" + this.commandExpression +
248-
"] must not evaluate to null");
247+
Assert.notNull(command,
248+
() -> "The 'commandExpression' [" + this.commandExpression + "] must not evaluate to null");
249249

250250
Object expectedResponseType = null;
251251
if (!Command.fireAndForget.equals(command)) {

0 commit comments

Comments
 (0)