Skip to content

Commit f049623

Browse files
committed
Fix Rsocket module according latest SF changes
1 parent 3ec6b56 commit f049623

File tree

5 files changed

+17
-14
lines changed

5 files changed

+17
-14
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java

+9-8
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,12 @@
3232
import org.springframework.lang.Nullable;
3333
import org.springframework.messaging.Message;
3434
import org.springframework.messaging.MessageHeaders;
35+
import org.springframework.messaging.ReactiveMessageHandler;
3536
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
3637
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
37-
import org.springframework.messaging.rsocket.RSocketPayloadReturnValueHandler;
3838
import org.springframework.messaging.rsocket.RSocketRequester;
39-
import org.springframework.messaging.rsocket.RSocketRequesterMethodArgumentResolver;
39+
import org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler;
40+
import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver;
4041
import org.springframework.messaging.support.MessageBuilder;
4142
import org.springframework.messaging.support.MessageHeaderAccessor;
4243
import org.springframework.util.Assert;
@@ -57,14 +58,14 @@
5758
* obtains the response from a "reply" header.
5859
* <p>
5960
* Essentially, this is an adapted for Spring Integration copy
60-
* of the {@link org.springframework.messaging.rsocket.MessagingRSocket} because
61+
* of the {@link org.springframework.messaging.rsocket.annotation.support.MessagingRSocket} because
6162
* that one is not public.
6263
*
6364
* @author Artem Bilan
6465
*
6566
* @since 5.2
6667
*
67-
* @see org.springframework.messaging.rsocket.MessagingRSocket
68+
* @see org.springframework.messaging.rsocket.annotation.support.MessagingRSocket
6869
*/
6970
class IntegrationRSocket extends AbstractRSocket {
7071

@@ -75,7 +76,7 @@ class IntegrationRSocket extends AbstractRSocket {
7576
static final List<MimeType> METADATA_MIME_TYPES = Arrays.asList(COMPOSITE_METADATA, ROUTING);
7677

7778

78-
private final Function<Message<?>, Mono<Void>> handler;
79+
private final ReactiveMessageHandler handler;
7980

8081
private final RouteMatcher routeMatcher;
8182

@@ -87,7 +88,7 @@ class IntegrationRSocket extends AbstractRSocket {
8788

8889
private final MimeType metadataMimeType;
8990

90-
IntegrationRSocket(Function<Message<?>, Mono<Void>> handler, RouteMatcher routeMatcher,
91+
IntegrationRSocket(ReactiveMessageHandler handler, RouteMatcher routeMatcher,
9192
RSocketRequester requester, MimeType dataMimeType, MimeType metadataMimeType,
9293
DataBufferFactory bufferFactory) {
9394

@@ -149,7 +150,7 @@ private Mono<Void> handle(Payload payload) {
149150
DataBuffer dataBuffer = retainDataAndReleasePayload(payload);
150151
int refCount = refCount(dataBuffer);
151152
Message<?> message = MessageBuilder.createMessage(dataBuffer, headers);
152-
return Mono.defer(() -> this.handler.apply(message))
153+
return Mono.defer(() -> this.handler.handleMessage(message))
153154
.doFinally((signal) -> {
154155
if (refCount(dataBuffer) == refCount) {
155156
DataBufferUtils.release(dataBuffer);
@@ -173,7 +174,7 @@ private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payload
173174
.doOnSubscribe((subscription) -> read.set(true));
174175
Message<Flux<DataBuffer>> message = MessageBuilder.createMessage(buffers, headers);
175176

176-
return Mono.defer(() -> this.handler.apply(message))
177+
return Mono.defer(() -> this.handler.handleMessage(message))
177178
.doFinally((signal) -> {
178179
// Subscription should have happened by now due to ChannelSendOperator
179180
if (!read.get()) {

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketAcceptor.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
3131
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
3232
import org.springframework.messaging.handler.invocation.reactive.SyncHandlerMethodArgumentResolver;
33-
import org.springframework.messaging.rsocket.RSocketMessageHandler;
3433
import org.springframework.messaging.rsocket.RSocketRequester;
3534
import org.springframework.messaging.rsocket.RSocketStrategies;
35+
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
3636
import org.springframework.util.Assert;
3737
import org.springframework.util.MimeType;
3838
import org.springframework.util.MimeTypeUtils;
@@ -79,6 +79,7 @@ class IntegrationRSocketAcceptor extends RSocketMessageHandler
7979
* required for clients but can also be used on servers as a fallback.
8080
* @param defaultDataMimeType the MimeType to use
8181
*/
82+
@Override
8283
public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
8384
this.defaultDataMimeType = defaultDataMimeType;
8485
}
@@ -89,6 +90,7 @@ public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) {
8990
* <p>By default this is set to {@code "message/x.rsocket.composite-metadata.v0"}
9091
* @param mimeType the MimeType to use
9192
*/
93+
@Override
9294
public void setDefaultMetadataMimeType(MimeType mimeType) {
9395
Assert.notNull(mimeType, "'metadataMimeType' is required");
9496
this.defaultMetadataMimeType = mimeType;
@@ -138,7 +140,7 @@ protected IntegrationRSocket createRSocket(ConnectionSetupPayload setupPayload,
138140
? MimeTypeUtils.parseMimeType(setupPayload.metadataMimeType())
139141
: this.defaultMetadataMimeType;
140142
Assert.notNull(dataMimeType, "No `metadataMimeType` in the ConnectionSetupPayload and no default value");
141-
return new IntegrationRSocket(this::handleMessage, getRouteMatcher(),
143+
return new IntegrationRSocket(this, getRouteMatcher(),
142144
RSocketRequester.wrap(rsocket, dataMimeType, metadataMimeType, rsocketStrategies),
143145
dataMimeType, metadataMimeType, rsocketStrategies.dataBufferFactory());
144146
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
import org.springframework.messaging.MessageDeliveryException;
4242
import org.springframework.messaging.MessageHeaders;
4343
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler;
44-
import org.springframework.messaging.rsocket.RSocketPayloadReturnValueHandler;
4544
import org.springframework.messaging.rsocket.RSocketStrategies;
45+
import org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler;
4646
import org.springframework.util.Assert;
4747
import org.springframework.util.MimeType;
4848

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.springframework.lang.Nullable;
2929
import org.springframework.messaging.Message;
3030
import org.springframework.messaging.rsocket.RSocketRequester;
31-
import org.springframework.messaging.rsocket.RSocketRequesterMethodArgumentResolver;
31+
import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver;
3232
import org.springframework.util.Assert;
3333
import org.springframework.util.ClassUtils;
3434

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@
5050
import org.springframework.messaging.PollableChannel;
5151
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
5252
import org.springframework.messaging.handler.annotation.MessageMapping;
53-
import org.springframework.messaging.rsocket.RSocketMessageHandler;
5453
import org.springframework.messaging.rsocket.RSocketRequester;
55-
import org.springframework.messaging.rsocket.RSocketRequesterMethodArgumentResolver;
5654
import org.springframework.messaging.rsocket.RSocketStrategies;
55+
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
56+
import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver;
5757
import org.springframework.messaging.support.ErrorMessage;
5858
import org.springframework.stereotype.Controller;
5959
import org.springframework.test.annotation.DirtiesContext;

0 commit comments

Comments
 (0)