Skip to content

Commit 67f7038

Browse files
committed
Add RSocketInboundGateway; refactoring
* Extract an `AbstractRSocketConnector` for common client and server connectors logic * Introduce an `IntegrationRSocketAcceptor` and `IntegrationRSocket` for the mapping and handling logic of RSockets and messages in between * Introduce an `IntegrationRSocketEndpoint` marker interface for Inbound Gateway mappings * Add `RSocketInboundGateway` implementation, which is called from the `IntegrationRSocketAcceptor` by the `IntegrationRSocketEndpoint` mapping * Add `RSocketConnectedEvent` to emit when the client is connected to the server. It does not make sense in Spring Integration to delegate such a logic into the `RSocketInboundGateway`
1 parent bcff9d4 commit 67f7038

File tree

8 files changed

+929
-38
lines changed

8 files changed

+929
-38
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.rsocket;
18+
19+
import org.springframework.beans.BeansException;
20+
import org.springframework.beans.factory.DisposableBean;
21+
import org.springframework.beans.factory.InitializingBean;
22+
import org.springframework.beans.factory.SmartInitializingSingleton;
23+
import org.springframework.context.ApplicationContext;
24+
import org.springframework.context.ApplicationContextAware;
25+
import org.springframework.messaging.rsocket.RSocketStrategies;
26+
import org.springframework.util.Assert;
27+
import org.springframework.util.MimeType;
28+
import org.springframework.util.MimeTypeUtils;
29+
30+
/**
31+
* A base connector container for common RSocket client and server functionality.
32+
* <p>
33+
* It accepts {@link IntegrationRSocketEndpoint} instances for mapping registration via an internal
34+
* {@link IntegrationRSocketAcceptor} or performs an auto-detection otherwise, when all bean are ready
35+
* in the application context.
36+
*
37+
* @author Artem Bilan
38+
*
39+
* @since 5.2
40+
*
41+
* @see IntegrationRSocketAcceptor
42+
*/
43+
public class AbstractRSocketConnector
44+
implements ApplicationContextAware, InitializingBean, DisposableBean, SmartInitializingSingleton {
45+
46+
protected final IntegrationRSocketAcceptor rsocketAcceptor = new IntegrationRSocketAcceptor(); // NOSONAR - final
47+
48+
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
49+
50+
private RSocketStrategies rsocketStrategies = RSocketStrategies.builder().build();
51+
52+
public void setDataMimeType(MimeType dataMimeType) {
53+
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
54+
this.dataMimeType = dataMimeType;
55+
}
56+
57+
protected MimeType getDataMimeType() {
58+
return this.dataMimeType;
59+
}
60+
61+
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
62+
Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
63+
this.rsocketStrategies = rsocketStrategies;
64+
}
65+
66+
public RSocketStrategies getRSocketStrategies() {
67+
return this.rsocketStrategies;
68+
}
69+
70+
public void setEndpoints(IntegrationRSocketEndpoint... endpoints) {
71+
Assert.notNull(endpoints, "'endpoints' must not be null");
72+
for (IntegrationRSocketEndpoint endpoint : endpoints) {
73+
addEndpoint(endpoint);
74+
}
75+
}
76+
77+
public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
78+
this.rsocketAcceptor.addEndpoint(endpoint);
79+
}
80+
81+
@Override
82+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
83+
this.rsocketAcceptor.setApplicationContext(applicationContext);
84+
}
85+
86+
@Override
87+
public void afterPropertiesSet() {
88+
this.rsocketAcceptor.setDefaultDataMimeType(this.dataMimeType);
89+
this.rsocketAcceptor.setRSocketStrategies(this.rsocketStrategies);
90+
}
91+
92+
@Override
93+
public void afterSingletonsInstantiated() {
94+
this.rsocketAcceptor.detectEndpoints();
95+
}
96+
97+
@Override
98+
public void destroy() {
99+
100+
}
101+
102+
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

+65-32
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,11 @@
1818

1919
import java.net.URI;
2020
import java.util.function.Consumer;
21+
import java.util.function.Function;
2122

22-
import org.springframework.beans.factory.DisposableBean;
23-
import org.springframework.beans.factory.InitializingBean;
23+
import org.springframework.context.SmartLifecycle;
2424
import org.springframework.messaging.rsocket.RSocketRequester;
25-
import org.springframework.messaging.rsocket.RSocketStrategies;
2625
import org.springframework.util.Assert;
27-
import org.springframework.util.MimeType;
28-
import org.springframework.util.MimeTypeUtils;
2926

3027
import io.rsocket.Payload;
3128
import io.rsocket.RSocket;
@@ -39,7 +36,11 @@
3936
import reactor.core.publisher.Mono;
4037

4138
/**
42-
* A client connector to the RSocket server.
39+
* A client {@link AbstractRSocketConnector} extension to the RSocket server.
40+
* <p>
41+
* Note: the {@link RSocketFactory.ClientRSocketFactory#acceptor(Function)}
42+
* in the provided {@link #factoryConfigurer} is overridden with an internal {@link IntegrationRSocketAcceptor}
43+
* for the proper Spring Integration channel adapter mappings.
4344
*
4445
* @author Artem Bilan
4546
*
@@ -48,20 +49,22 @@
4849
* @see RSocketFactory.ClientRSocketFactory
4950
* @see RSocketRequester
5051
*/
51-
public class ClientRSocketConnector implements InitializingBean, DisposableBean {
52+
public class ClientRSocketConnector extends AbstractRSocketConnector implements SmartLifecycle {
5253

5354
private final ClientTransport clientTransport;
5455

55-
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
56-
57-
private Payload connectPayload = EmptyPayload.INSTANCE;
56+
private Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = (clientRSocketFactory) -> { };
5857

59-
private RSocketStrategies rsocketStrategies = RSocketStrategies.builder().build();
58+
private String connectRoute;
6059

61-
private Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = (clientRSocketFactory) -> { };
60+
private String connectData = "";
6261

6362
private Mono<RSocket> rsocketMono;
6463

64+
private volatile boolean running;
65+
66+
private boolean autoConnect;
67+
6568
public ClientRSocketConnector(String host, int port) {
6669
this(TcpClientTransport.create(host, port));
6770
}
@@ -75,50 +78,80 @@ public ClientRSocketConnector(ClientTransport clientTransport) {
7578
this.clientTransport = clientTransport;
7679
}
7780

78-
public void setDataMimeType(MimeType dataMimeType) {
79-
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
80-
this.dataMimeType = dataMimeType;
81-
}
82-
8381
public void setFactoryConfigurer(Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer) {
8482
Assert.notNull(factoryConfigurer, "'factoryConfigurer' must not be null");
8583
this.factoryConfigurer = factoryConfigurer;
8684
}
8785

88-
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
89-
Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
90-
this.rsocketStrategies = rsocketStrategies;
86+
public void setConnectRoute(String connectRoute) {
87+
this.connectRoute = connectRoute;
9188
}
9289

93-
public void setConnectRoute(String connectRoute) {
94-
this.connectPayload = DefaultPayload.create("", connectRoute);
90+
public void setConnectData(String connectData) {
91+
Assert.notNull(connectData, "'connectData' must not be null");
92+
this.connectData = connectData;
9593
}
9694

9795
@Override
9896
public void afterPropertiesSet() {
97+
super.afterPropertiesSet();
9998
RSocketFactory.ClientRSocketFactory clientFactory =
10099
RSocketFactory.connect()
101-
.dataMimeType(this.dataMimeType.toString());
100+
.dataMimeType(getDataMimeType().toString());
102101
this.factoryConfigurer.accept(clientFactory);
103-
clientFactory.setupPayload(this.connectPayload);
102+
clientFactory.acceptor(this.rsocketAcceptor);
103+
Payload connectPayload = EmptyPayload.INSTANCE;
104+
if (this.connectRoute != null) {
105+
connectPayload = DefaultPayload.create(this.connectData, this.connectRoute);
106+
}
107+
clientFactory.setupPayload(connectPayload);
104108
this.rsocketMono = clientFactory.transport(this.clientTransport).start().cache();
105109
}
106110

107-
public void connect() {
108-
this.rsocketMono.subscribe();
109-
}
110-
111-
public Mono<RSocketRequester> getRSocketRequester() {
112-
return this.rsocketMono
113-
.map(rsocket -> RSocketRequester.wrap(rsocket, this.dataMimeType, this.rsocketStrategies))
114-
.cache();
111+
@Override
112+
public void afterSingletonsInstantiated() {
113+
this.autoConnect = this.rsocketAcceptor.detectEndpoints();
115114
}
116115

117116
@Override
118117
public void destroy() {
118+
super.destroy();
119119
this.rsocketMono
120120
.doOnNext(Disposable::dispose)
121121
.subscribe();
122122
}
123123

124+
@Override
125+
public void start() {
126+
if (!this.running) {
127+
this.running = true;
128+
if (this.autoConnect) {
129+
connect();
130+
}
131+
}
132+
}
133+
134+
@Override
135+
public void stop() {
136+
this.running = false;
137+
}
138+
139+
@Override
140+
public boolean isRunning() {
141+
return this.running;
142+
}
143+
144+
/**
145+
* Perform subscription into the RSocket server for incoming requests.
146+
*/
147+
public void connect() {
148+
this.rsocketMono.subscribe();
149+
}
150+
151+
public Mono<RSocketRequester> getRSocketRequester() {
152+
return this.rsocketMono
153+
.map(rsocket -> RSocketRequester.wrap(rsocket, getDataMimeType(), getRSocketStrategies()))
154+
.cache();
155+
}
156+
124157
}

0 commit comments

Comments
 (0)