Skip to content

Commit cc87c1b

Browse files
committed
Optimize ServerRSocketConnector connection
When RSocket client connects to the server there is no reason to wrap a `ConnectionSetupPayload` into a `Message` since we are not going to send it downstream * Refactor `IntegrationRSocket.handleConnectionSetupPayload()` just return a `Mono<DataBuffer>` for converted `ConnectionSetupPayload` * Ask for a `destination` and `RSocketRequester` from the `IntegrationRSocket` instead of message headers
1 parent 0675fdf commit cc87c1b

File tree

2 files changed

+9
-18
lines changed

2 files changed

+9
-18
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -110,18 +110,20 @@ class IntegrationRSocket extends AbstractRSocket {
110110
this.bufferFactory = bufferFactory;
111111
}
112112

113+
RSocketRequester getRequester() {
114+
return this.requester;
115+
}
116+
113117
/**
114118
* Wrap the {@link ConnectionSetupPayload} with a {@link Message} and
115119
* delegate to {@link #handle(Payload)} for handling.
116120
* @param payload the connection payload
117121
* @return completion handle for success or error
118122
*/
119-
Mono<Message<DataBuffer>> handleConnectionSetupPayload(ConnectionSetupPayload payload) {
120-
String destination = getDestination(payload);
121-
MessageHeaders headers = createHeaders(destination, null);
123+
Mono<DataBuffer> handleConnectionSetupPayload(ConnectionSetupPayload payload) {
122124
DataBuffer dataBuffer = retainDataAndReleasePayload(payload);
123125
int refCount = refCount(dataBuffer);
124-
return Mono.just(MessageBuilder.createMessage(dataBuffer, headers))
126+
return Mono.just(dataBuffer)
125127
.doFinally(s -> {
126128
if (refCount(dataBuffer) == refCount) {
127129
DataBufferUtils.release(dataBuffer);

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ServerRSocketConnector.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,8 @@
2929
import org.springframework.context.ApplicationEventPublisherAware;
3030
import org.springframework.core.io.buffer.DataBuffer;
3131
import org.springframework.lang.Nullable;
32-
import org.springframework.messaging.MessageHeaders;
33-
import org.springframework.messaging.handler.DestinationPatternsMessageCondition;
3432
import org.springframework.messaging.rsocket.RSocketRequester;
35-
import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver;
3633
import org.springframework.util.Assert;
37-
import org.springframework.util.RouteMatcher;
3834

3935
import io.rsocket.RSocketFactory;
4036
import io.rsocket.SocketAcceptor;
@@ -181,17 +177,10 @@ public SocketAcceptor serverAcceptor() {
181177
return (setupPayload, sendingRSocket) -> {
182178
IntegrationRSocket rsocket = createRSocket(setupPayload, sendingRSocket);
183179
return rsocket.handleConnectionSetupPayload(setupPayload)
184-
.doOnNext((message) -> {
185-
MessageHeaders messageHeaders = message.getHeaders();
186-
DataBuffer dataBuffer = message.getPayload();
187-
String destination =
188-
messageHeaders.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER,
189-
RouteMatcher.Route.class)
190-
.value();
180+
.doOnNext((dataBuffer) -> {
181+
String destination = rsocket.getDestination(setupPayload);
191182
Object rsocketRequesterKey = this.clientRSocketKeyStrategy.apply(destination, dataBuffer);
192-
RSocketRequester rsocketRequester =
193-
messageHeaders.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER,
194-
RSocketRequester.class);
183+
RSocketRequester rsocketRequester = rsocket.getRequester();
195184
this.clientRSocketRequesters.put(rsocketRequesterKey, rsocketRequester);
196185
RSocketConnectedEvent rSocketConnectedEvent =
197186
new RSocketConnectedEvent(rsocket, destination, dataBuffer, rsocketRequester);

0 commit comments

Comments
 (0)