Skip to content

Commit ecee757

Browse files
committed
* Add ServerRSocketConnector to represent an RSocket server and
container for connected `RSocketRequester`s from clients * Extract `accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket)` server logic into an internal `ServerRSocketAcceptor` extension for the `IntegrationRSocketAcceptor` * Address PR comments: - `RSocketConnectedEvent.toString()` - `ApplicationEventPublisherAware` into the `ServerRSocketConnector` - Log RSocket connection if no `this.applicationEventPublisher` * Remove `handleConnectionSetupPayload()` from the `IntegrationRSocket` since it is not delegated to the target handler * Provide reasonable default `RSocketStrategies` for the `AbstractRSocketConnector` and `RSocketInboundGateway` * Add initial `RSocketInboundGatewayIntegrationTests`
1 parent 67f7038 commit ecee757

File tree

9 files changed

+514
-123
lines changed

9 files changed

+514
-123
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/AbstractRSocketConnector.java

+44-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
import org.springframework.beans.factory.SmartInitializingSingleton;
2323
import org.springframework.context.ApplicationContext;
2424
import org.springframework.context.ApplicationContextAware;
25+
import org.springframework.context.SmartLifecycle;
26+
import org.springframework.core.codec.CharSequenceEncoder;
27+
import org.springframework.core.codec.StringDecoder;
28+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
2529
import org.springframework.messaging.rsocket.RSocketStrategies;
2630
import org.springframework.util.Assert;
2731
import org.springframework.util.MimeType;
@@ -40,14 +44,28 @@
4044
*
4145
* @see IntegrationRSocketAcceptor
4246
*/
43-
public class AbstractRSocketConnector
44-
implements ApplicationContextAware, InitializingBean, DisposableBean, SmartInitializingSingleton {
47+
public abstract class AbstractRSocketConnector
48+
implements ApplicationContextAware, InitializingBean, DisposableBean, SmartInitializingSingleton,
49+
SmartLifecycle {
4550

46-
protected final IntegrationRSocketAcceptor rsocketAcceptor = new IntegrationRSocketAcceptor(); // NOSONAR - final
51+
protected final IntegrationRSocketAcceptor rsocketAcceptor; // NOSONAR - final
4752

4853
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
4954

50-
private RSocketStrategies rsocketStrategies = RSocketStrategies.builder().build();
55+
private RSocketStrategies rsocketStrategies =
56+
RSocketStrategies.builder()
57+
.decoder(StringDecoder.allMimeTypes())
58+
.encoder(CharSequenceEncoder.allMimeTypes())
59+
.dataBufferFactory(new DefaultDataBufferFactory())
60+
.build();
61+
62+
private volatile boolean running;
63+
64+
private ApplicationContext applicationContext;
65+
66+
protected AbstractRSocketConnector(IntegrationRSocketAcceptor rsocketAcceptor) {
67+
this.rsocketAcceptor = rsocketAcceptor;
68+
}
5169

5270
public void setDataMimeType(MimeType dataMimeType) {
5371
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
@@ -80,13 +98,19 @@ public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
8098

8199
@Override
82100
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
101+
this.applicationContext = applicationContext;
83102
this.rsocketAcceptor.setApplicationContext(applicationContext);
84103
}
85104

105+
protected ApplicationContext getApplicationContext() {
106+
return this.applicationContext;
107+
}
108+
86109
@Override
87110
public void afterPropertiesSet() {
88111
this.rsocketAcceptor.setDefaultDataMimeType(this.dataMimeType);
89112
this.rsocketAcceptor.setRSocketStrategies(this.rsocketStrategies);
113+
this.rsocketAcceptor.afterPropertiesSet();
90114
}
91115

92116
@Override
@@ -95,8 +119,23 @@ public void afterSingletonsInstantiated() {
95119
}
96120

97121
@Override
98-
public void destroy() {
122+
public void start() {
123+
if (!this.running) {
124+
this.running = true;
125+
doStart();
126+
}
127+
}
128+
129+
protected abstract void doStart();
130+
131+
@Override
132+
public void stop() {
133+
this.running = false;
134+
}
99135

136+
@Override
137+
public boolean isRunning() {
138+
return this.running;
100139
}
101140

102141
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

+11-27
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.function.Consumer;
2121
import java.util.function.Function;
2222

23-
import org.springframework.context.SmartLifecycle;
2423
import org.springframework.messaging.rsocket.RSocketRequester;
2524
import org.springframework.util.Assert;
2625

@@ -49,7 +48,7 @@
4948
* @see RSocketFactory.ClientRSocketFactory
5049
* @see RSocketRequester
5150
*/
52-
public class ClientRSocketConnector extends AbstractRSocketConnector implements SmartLifecycle {
51+
public class ClientRSocketConnector extends AbstractRSocketConnector {
5352

5453
private final ClientTransport clientTransport;
5554

@@ -59,12 +58,10 @@ public class ClientRSocketConnector extends AbstractRSocketConnector implements
5958

6059
private String connectData = "";
6160

62-
private Mono<RSocket> rsocketMono;
63-
64-
private volatile boolean running;
65-
6661
private boolean autoConnect;
6762

63+
private Mono<RSocket> rsocketMono;
64+
6865
public ClientRSocketConnector(String host, int port) {
6966
this(TcpClientTransport.create(host, port));
7067
}
@@ -74,6 +71,7 @@ public ClientRSocketConnector(URI uri) {
7471
}
7572

7673
public ClientRSocketConnector(ClientTransport clientTransport) {
74+
super(new IntegrationRSocketAcceptor());
7775
Assert.notNull(clientTransport, "'clientTransport' must not be null");
7876
this.clientTransport = clientTransport;
7977
}
@@ -114,31 +112,17 @@ public void afterSingletonsInstantiated() {
114112
}
115113

116114
@Override
117-
public void destroy() {
118-
super.destroy();
119-
this.rsocketMono
120-
.doOnNext(Disposable::dispose)
121-
.subscribe();
122-
}
123-
124-
@Override
125-
public void start() {
126-
if (!this.running) {
127-
this.running = true;
128-
if (this.autoConnect) {
129-
connect();
130-
}
115+
protected void doStart() {
116+
if (this.autoConnect) {
117+
connect();
131118
}
132119
}
133120

134121
@Override
135-
public void stop() {
136-
this.running = false;
137-
}
138-
139-
@Override
140-
public boolean isRunning() {
141-
return this.running;
122+
public void destroy() {
123+
this.rsocketMono
124+
.doOnNext(Disposable::dispose)
125+
.subscribe();
142126
}
143127

144128
/**

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java

+27-59
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.reactivestreams.Publisher;
2323

24-
import org.springframework.context.ApplicationEventPublisher;
2524
import org.springframework.core.io.buffer.DataBuffer;
2625
import org.springframework.core.io.buffer.DataBufferFactory;
2726
import org.springframework.core.io.buffer.DataBufferUtils;
@@ -39,12 +38,9 @@
3938
import org.springframework.messaging.support.MessageHeaderAccessor;
4039
import org.springframework.util.Assert;
4140
import org.springframework.util.MimeType;
42-
import org.springframework.util.MimeTypeUtils;
43-
import org.springframework.util.StringUtils;
4441

4542
import io.netty.buffer.ByteBuf;
4643
import io.rsocket.AbstractRSocket;
47-
import io.rsocket.ConnectionSetupPayload;
4844
import io.rsocket.Payload;
4945
import io.rsocket.RSocket;
5046
import reactor.core.publisher.Flux;
@@ -59,9 +55,6 @@
5955
* Essentially, this is an adapted for Spring Integration copy
6056
* of the {@link org.springframework.messaging.rsocket.MessagingRSocket} because
6157
* that one is not public.
62-
* <p>
63-
* Also this class doesn't delegate a {@link #handleConnectionSetupPayload}
64-
* into the target handler {@link Function} and emits a {@link RSocketConnectedEvent} instead.
6558
*
6659
* @author Artem Bilan
6760
*
@@ -80,9 +73,6 @@ class IntegrationRSocket extends AbstractRSocket {
8073
@Nullable
8174
private MimeType dataMimeType;
8275

83-
private ApplicationEventPublisher applicationEventPublisher;
84-
85-
8676
IntegrationRSocket(Function<Message<?>, Mono<Void>> handler, RSocketRequester requester,
8777
@Nullable MimeType defaultDataMimeType, DataBufferFactory bufferFactory) {
8878

@@ -94,38 +84,12 @@ class IntegrationRSocket extends AbstractRSocket {
9484
this.bufferFactory = bufferFactory;
9585
}
9686

97-
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
98-
this.applicationEventPublisher = applicationEventPublisher;
87+
public void setDataMimeType(MimeType dataMimeType) {
88+
this.dataMimeType = dataMimeType;
9989
}
10090

101-
/**
102-
* Wrap the {@link ConnectionSetupPayload} with a {@link Message} and
103-
* delegate to {@link #handle(Payload)} for handling.
104-
* @param payload the connection payload
105-
* @return completion handle for success or error
106-
*/
107-
public Mono<Void> handleConnectionSetupPayload(ConnectionSetupPayload payload) {
108-
if (StringUtils.hasText(payload.dataMimeType())) {
109-
this.dataMimeType = MimeTypeUtils.parseMimeType(payload.dataMimeType());
110-
}
111-
// frameDecoder does not apply to connectionSetupPayload
112-
// so retain here since handle expects it..
113-
payload.retain();
114-
String destination = getDestination(payload);
115-
DataBuffer dataBuffer = retainDataAndReleasePayload(payload);
116-
int refCount = refCount(dataBuffer);
117-
return Mono
118-
.defer(() -> {
119-
this.applicationEventPublisher.publishEvent(
120-
new RSocketConnectedEvent(IntegrationRSocket.this, destination, dataBuffer,
121-
this.requester));
122-
return Mono.<Void>empty();
123-
})
124-
.doFinally((signal) -> {
125-
if (refCount(dataBuffer) == refCount) {
126-
DataBufferUtils.release(dataBuffer);
127-
}
128-
});
91+
public RSocketRequester getRequester() {
92+
return this.requester;
12993
}
13094

13195
@Override
@@ -173,7 +137,7 @@ private Mono<Void> handle(Payload payload) {
173137
});
174138
}
175139

176-
private int refCount(DataBuffer dataBuffer) {
140+
static int refCount(DataBuffer dataBuffer) {
177141
return dataBuffer instanceof NettyDataBuffer ?
178142
((NettyDataBuffer) dataBuffer).getNativeBuffer().refCnt() : 1;
179143
}
@@ -199,25 +163,8 @@ private Flux<Payload> handleAndReply(Payload firstPayload, Flux<Payload> payload
199163
Mono.error(new IllegalStateException("Something went wrong: reply Mono not set"))));
200164
}
201165

202-
private String getDestination(Payload payload) {
203-
return payload.getMetadataUtf8();
204-
}
205-
206166
private DataBuffer retainDataAndReleasePayload(Payload payload) {
207-
try {
208-
if (this.bufferFactory instanceof NettyDataBufferFactory) {
209-
ByteBuf byteBuf = payload.sliceData().retain();
210-
return ((NettyDataBufferFactory) this.bufferFactory).wrap(byteBuf);
211-
}
212-
else {
213-
return this.bufferFactory.wrap(payload.getData());
214-
}
215-
}
216-
finally {
217-
if (payload.refCnt() > 0) {
218-
payload.release();
219-
}
220-
}
167+
return payloadToDataBuffer(payload, this.bufferFactory);
221168
}
222169

223170
private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor<?> replyMono) {
@@ -235,4 +182,25 @@ private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor
235182
return headers.getMessageHeaders();
236183
}
237184

185+
static String getDestination(Payload payload) {
186+
return payload.getMetadataUtf8();
187+
}
188+
189+
static DataBuffer payloadToDataBuffer(Payload payload, DataBufferFactory bufferFactory) {
190+
try {
191+
if (bufferFactory instanceof NettyDataBufferFactory) {
192+
ByteBuf byteBuf = payload.sliceData().retain();
193+
return ((NettyDataBufferFactory) bufferFactory).wrap(byteBuf);
194+
}
195+
else {
196+
return bufferFactory.wrap(payload.getData());
197+
}
198+
}
199+
finally {
200+
if (payload.refCnt() > 0) {
201+
payload.release();
202+
}
203+
}
204+
}
205+
238206
}

0 commit comments

Comments
 (0)