Skip to content

Commit ff15d52

Browse files
committed
Align RSocket module with the latest SF
* Upgrade to RSocket API `1.0.0-RC3-SNAPSHOT` * Use the same `rSocketMessageHandler.responder()` callback for client and server rsocket factories configuration * Clean up `ClientRSocketConnector` and `ServerRSocketConnector` JavaDocs not mentioning overriding any more * Use `WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA` constant instead of built-in literal * Fix `RSocketOutboundGatewayIntegrationTests` according the SF changes
1 parent 5073da8 commit ff15d52

File tree

5 files changed

+10
-17
lines changed

5 files changed

+10
-17
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ ext {
8888
reactorVersion = '3.3.0.M3'
8989
resilience4jVersion = '0.16.0'
9090
romeToolsVersion = '1.12.1'
91-
rsocketVersion = '0.12.2-RC4'
91+
rsocketVersion = '1.0.0-RC3-SNAPSHOT'
9292
servletApiVersion = '4.0.1'
9393
smackVersion = '4.3.4'
9494
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.2.0.BUILD-SNAPSHOT'

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.springframework.util.MimeType;
3333
import org.springframework.util.MimeTypeUtils;
3434

35+
import io.rsocket.metadata.WellKnownMimeType;
36+
3537
/**
3638
* A base connector container for common RSocket client and server functionality.
3739
* <p>
@@ -53,7 +55,8 @@ public abstract class AbstractRSocketConnector
5355

5456
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
5557

56-
private MimeType metadataMimeType = new MimeType("message", "x.rsocket.composite-metadata.v0");
58+
private MimeType metadataMimeType =
59+
MimeTypeUtils.parseMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.toString());
5760

5861
private RSocketStrategies rsocketStrategies =
5962
RSocketStrategies.builder()

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

+2-7
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,7 @@
3434
import reactor.core.publisher.Mono;
3535

3636
/**
37-
* A client {@link AbstractRSocketConnector} extension to the RSocket server.
38-
* <p>
39-
* Note: the {@link RSocketFactory.ClientRSocketFactory#acceptor(java.util.function.Function)}
40-
* in the provided {@link #factoryConfigurer} is overridden with an internal
41-
* {@link IntegrationRSocketMessageHandler#clientResponder()}
42-
* for the proper Spring Integration channel adapter mappings.
37+
* A client {@link AbstractRSocketConnector} extension to the RSocket connection.
4338
*
4439
* @author Artem Bilan
4540
*
@@ -126,7 +121,7 @@ public void afterPropertiesSet() {
126121
.dataMimeType(getDataMimeType().toString())
127122
.metadataMimeType(getMetadataMimeType().toString());
128123
this.factoryConfigurer.accept(clientFactory);
129-
clientFactory.acceptor(this.rSocketMessageHandler.clientResponder());
124+
clientFactory.acceptor(this.rSocketMessageHandler.responder());
130125
Payload connectPayload = EmptyPayload.INSTANCE;
131126
if (this.connectRoute != null) {
132127
connectPayload = DefaultPayload.create(this.connectData, this.connectRoute);

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ServerRSocketConnector.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,6 @@
4949

5050
/**
5151
* A server {@link AbstractRSocketConnector} extension to accept and manage client RSocket connections.
52-
* <p>
53-
* Note: the {@link RSocketFactory.ServerRSocketFactory#acceptor(io.rsocket.SocketAcceptor)}
54-
* in the provided {@link #factoryConfigurer} is overridden with an internal
55-
* {@link ServerRSocketMessageHandler#serverResponder()}
56-
* for the proper Spring Integration channel adapter mappings.
5752
*
5853
* @author Artem Bilan
5954
*
@@ -131,7 +126,7 @@ public void afterPropertiesSet() {
131126

132127
this.serverMono =
133128
serverFactory
134-
.acceptor(serverRSocketMessageHandler().serverResponder())
129+
.acceptor(serverRSocketMessageHandler().responder())
135130
.transport(this.serverTransport)
136131
.start()
137132
.cache();

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ static void setup() {
119119
serverContext = new AnnotationConfigApplicationContext(ServerConfig.class);
120120
server = RSocketFactory.receive()
121121
.frameDecoder(PayloadDecoder.ZERO_COPY)
122-
.acceptor(serverContext.getBean(RSocketMessageHandler.class).serverResponder())
122+
.acceptor(serverContext.getBean(RSocketMessageHandler.class).responder())
123123
.transport(TcpServerTransport.create("localhost", 0))
124124
.start()
125125
.block();
@@ -531,7 +531,7 @@ public RSocket rsocketForServerRequests() {
531531
.dataMimeType("text/plain")
532532
.metadataMimeType("message/x.rsocket.routing.v0")
533533
.frameDecoder(PayloadDecoder.ZERO_COPY)
534-
.acceptor(messageHandler().clientResponder())
534+
.acceptor(messageHandler().responder())
535535
.transport(TcpClientTransport.create("localhost", server.address().getPort()))
536536
.start()
537537
.block();

0 commit comments

Comments
 (0)