Skip to content

Commit 4199ff5

Browse files
artembilangaryrussell
authored andcommitted
Make RSocketOutboundGateway for server side
* Introduce a `ClientRSocketConnector` to represent a common logic for client side connection and management an obtained `RSocket` * Rework the `RSocketOutboundGateway` to perform similar RSocket request logic on the server side as well. * Refactor `RSocketOutboundGatewayIntegrationTests` to demonstrate that RSocket requests work the same way from the server side as well. For this reason a `CommonConfig` has been extracted with the same `RSocketOutboundGateway` configuration reuse on both server and client sides.
1 parent b069780 commit 4199ff5

File tree

6 files changed

+470
-108
lines changed

6 files changed

+470
-108
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.rsocket;
18+
19+
import java.net.URI;
20+
import java.util.function.Consumer;
21+
22+
import org.springframework.beans.factory.DisposableBean;
23+
import org.springframework.beans.factory.InitializingBean;
24+
import org.springframework.messaging.rsocket.RSocketRequester;
25+
import org.springframework.messaging.rsocket.RSocketStrategies;
26+
import org.springframework.util.Assert;
27+
import org.springframework.util.MimeType;
28+
import org.springframework.util.MimeTypeUtils;
29+
30+
import io.rsocket.Payload;
31+
import io.rsocket.RSocket;
32+
import io.rsocket.RSocketFactory;
33+
import io.rsocket.transport.ClientTransport;
34+
import io.rsocket.transport.netty.client.TcpClientTransport;
35+
import io.rsocket.transport.netty.client.WebsocketClientTransport;
36+
import io.rsocket.util.DefaultPayload;
37+
import io.rsocket.util.EmptyPayload;
38+
import reactor.core.Disposable;
39+
import reactor.core.publisher.Mono;
40+
41+
/**
42+
* A client connector to the RSocket server.
43+
*
44+
* @author Artem Bilan
45+
*
46+
* @since 5.2
47+
*
48+
* @see RSocketFactory.ClientRSocketFactory
49+
* @see RSocketRequester
50+
*/
51+
public class ClientRSocketConnector implements InitializingBean, DisposableBean {
52+
53+
private final ClientTransport clientTransport;
54+
55+
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
56+
57+
private Payload connectPayload = EmptyPayload.INSTANCE;
58+
59+
private RSocketStrategies rsocketStrategies = RSocketStrategies.builder().build();
60+
61+
private Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = (clientRSocketFactory) -> { };
62+
63+
private Mono<RSocket> rsocketMono;
64+
65+
public ClientRSocketConnector(String host, int port) {
66+
this(TcpClientTransport.create(host, port));
67+
}
68+
69+
public ClientRSocketConnector(URI uri) {
70+
this(WebsocketClientTransport.create(uri));
71+
}
72+
73+
public ClientRSocketConnector(ClientTransport clientTransport) {
74+
Assert.notNull(clientTransport, "'clientTransport' must not be null");
75+
this.clientTransport = clientTransport;
76+
}
77+
78+
public void setDataMimeType(MimeType dataMimeType) {
79+
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
80+
this.dataMimeType = dataMimeType;
81+
}
82+
83+
public void setFactoryConfigurer(Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer) {
84+
Assert.notNull(factoryConfigurer, "'factoryConfigurer' must not be null");
85+
this.factoryConfigurer = factoryConfigurer;
86+
}
87+
88+
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
89+
Assert.notNull(rsocketStrategies, "'rsocketStrategies' must not be null");
90+
this.rsocketStrategies = rsocketStrategies;
91+
}
92+
93+
public void setConnectRoute(String connectRoute) {
94+
this.connectPayload = DefaultPayload.create("", connectRoute);
95+
}
96+
97+
@Override
98+
public void afterPropertiesSet() {
99+
RSocketFactory.ClientRSocketFactory clientFactory =
100+
RSocketFactory.connect()
101+
.dataMimeType(this.dataMimeType.toString());
102+
this.factoryConfigurer.accept(clientFactory);
103+
clientFactory.setupPayload(this.connectPayload);
104+
this.rsocketMono = clientFactory.transport(this.clientTransport).start();
105+
}
106+
107+
public void connect() {
108+
this.rsocketMono.cache().subscribe();
109+
}
110+
111+
public Mono<RSocketRequester> getRSocketRequester() {
112+
return this.rsocketMono
113+
.map(rsocket -> RSocketRequester.wrap(rsocket, this.dataMimeType, this.rsocketStrategies))
114+
.cache();
115+
}
116+
117+
@Override
118+
public void destroy() {
119+
this.rsocketMono
120+
.doOnNext(Disposable::dispose)
121+
.subscribe();
122+
}
123+
124+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides classes for RSocket XML namespace parsing and configuration support.
3+
*/
4+
package org.springframework.integration.rsocket.config;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides classes representing inbound RSocket components.
3+
*/
4+
package org.springframework.integration.rsocket.inbound;

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java

+79-46
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.integration.rsocket.outbound;
1818

19-
import java.util.function.Consumer;
20-
2119
import org.reactivestreams.Publisher;
2220

2321
import org.springframework.core.ParameterizedTypeReference;
@@ -26,82 +24,108 @@
2624
import org.springframework.integration.expression.ExpressionUtils;
2725
import org.springframework.integration.expression.ValueExpression;
2826
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
27+
import org.springframework.integration.rsocket.ClientRSocketConnector;
28+
import org.springframework.lang.Nullable;
2929
import org.springframework.messaging.Message;
3030
import org.springframework.messaging.rsocket.RSocketRequester;
31-
import org.springframework.messaging.rsocket.RSocketStrategies;
31+
import org.springframework.messaging.rsocket.RSocketRequesterMethodArgumentResolver;
3232
import org.springframework.util.Assert;
3333
import org.springframework.util.ClassUtils;
34-
import org.springframework.util.MimeType;
35-
import org.springframework.util.MimeTypeUtils;
3634

37-
import io.rsocket.RSocketFactory;
38-
import io.rsocket.transport.ClientTransport;
39-
import reactor.core.Disposable;
4035
import reactor.core.publisher.Mono;
4136

4237
/**
43-
* An Outbound Messaging Gateway for RSocket client requests.
38+
* An Outbound Messaging Gateway for RSocket requests.
39+
* The request logic is fully based on the {@link RSocketRequester}, which can be obtained from the
40+
* {@link ClientRSocketConnector} on the client side or from the
41+
* {@link RSocketRequesterMethodArgumentResolver#RSOCKET_REQUESTER_HEADER} request message header
42+
* on the server side.
43+
* <p>
44+
* An RSocket operation is determined by the configured {@link Command} or respective SpEL
45+
* expression to be evaluated at runtime against the request message.
46+
* By default the {@link Command#requestResponse} operation is used.
47+
* <p>
48+
* For a {@link Publisher}-based requests, it must be present in the request message {@code payload}.
49+
* The flattening via upstream {@link org.springframework.integration.channel.FluxMessageChannel} will work, too,
50+
* but this way we will lose a scope of particular request and every {@link Publisher} event
51+
* will be send in its own plain request.
52+
* <p>
53+
* If reply is a {@link reactor.core.publisher.Flux}, it is wrapped to the {@link Mono} to retain a request scope.
54+
* The downstream flow is responsible to obtain this {@link reactor.core.publisher.Flux} from a message payload
55+
* and subscribe to it by itself. The {@link Mono} reply from this component is subscribed from the downstream
56+
* {@link org.springframework.integration.channel.FluxMessageChannel} or it is adapted to the
57+
* {@link org.springframework.util.concurrent.ListenableFuture} otherwise.
4458
*
4559
* @author Artem Bilan
4660
*
4761
* @since 5.2
4862
*
63+
* @see Command
4964
* @see RSocketRequester
5065
*/
5166
public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler {
5267

53-
private final ClientTransport clientTransport;
54-
5568
private final Expression routeExpression;
5669

57-
private MimeType dataMimeType = MimeTypeUtils.TEXT_PLAIN;
58-
59-
private Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer = (clientRSocketFactory) -> { };
60-
61-
private Consumer<RSocketStrategies.Builder> strategiesConfigurer = (builder) -> { };
70+
@Nullable
71+
private ClientRSocketConnector clientRSocketConnector;
6272

6373
private Expression commandExpression = new ValueExpression<>(Command.requestResponse);
6474

6575
private Expression publisherElementTypeExpression = new ValueExpression<>(String.class);
6676

6777
private Expression expectedResponseTypeExpression = new ValueExpression<>(String.class);
6878

69-
private Mono<RSocketRequester> rSocketRequesterMono;
70-
7179
private EvaluationContext evaluationContext;
7280

73-
public RSocketOutboundGateway(ClientTransport clientTransport, String route) {
74-
this(clientTransport, new ValueExpression<>(route));
81+
@Nullable
82+
private Mono<RSocketRequester> rsocketRequesterMono;
83+
84+
/**
85+
* Instantiate based on the provided RSocket endpoint {@code route}.
86+
* @param route the RSocket endpoint route to use.
87+
*/
88+
public RSocketOutboundGateway(String route) {
89+
this(new ValueExpression<>(route));
7590
}
7691

77-
public RSocketOutboundGateway(ClientTransport clientTransport, Expression routeExpression) {
78-
Assert.notNull(clientTransport, "'clientTransport' must not be null");
92+
/**
93+
* Instantiate based on the provided SpEL expression to evaluate an RSocket endpoint {@code route}
94+
* at runtime against a request message.
95+
* @param routeExpression the SpEL expression to use.
96+
*/
97+
public RSocketOutboundGateway(Expression routeExpression) {
7998
Assert.notNull(routeExpression, "'routeExpression' must not be null");
80-
this.clientTransport = clientTransport;
8199
this.routeExpression = routeExpression;
82100
setAsync(true);
83101
setPrimaryExpression(this.routeExpression);
84102
}
85103

86-
public void setDataMimeType(MimeType dataMimeType) {
87-
Assert.notNull(dataMimeType, "'dataMimeType' must not be null");
88-
this.dataMimeType = dataMimeType;
89-
}
90-
91-
public void setFactoryConfigurer(Consumer<RSocketFactory.ClientRSocketFactory> factoryConfigurer) {
92-
Assert.notNull(factoryConfigurer, "'factoryConfigurer' must not be null");
93-
this.factoryConfigurer = factoryConfigurer;
94-
}
95-
96-
public void setStrategiesConfigurer(Consumer<RSocketStrategies.Builder> strategiesConfigurer) {
97-
Assert.notNull(strategiesConfigurer, "'strategiesConfigurer' must not be null");
98-
this.strategiesConfigurer = strategiesConfigurer;
104+
/**
105+
* Configure a {@link ClientRSocketConnector} for client side requests based on the connection
106+
* provided by the {@link ClientRSocketConnector#getRSocketRequester()}.
107+
* In case of server side, an {@link RSocketRequester} must be provided in the
108+
* {@link RSocketRequesterMethodArgumentResolver#RSOCKET_REQUESTER_HEADER} header of request message.
109+
* @param clientRSocketConnector the {@link ClientRSocketConnector} to use.
110+
*/
111+
public void setClientRSocketConnector(ClientRSocketConnector clientRSocketConnector) {
112+
Assert.notNull(clientRSocketConnector, "'clientRSocketConnector' must not be null");
113+
this.clientRSocketConnector = clientRSocketConnector;
99114
}
100115

116+
/**
117+
* Configure a {@link Command} for RSocket request type.
118+
* @param command the {@link Command} to use.
119+
*/
101120
public void setCommand(Command command) {
102121
setCommandExpression(new ValueExpression<>(command));
103122
}
104123

124+
/**
125+
* Configure a SpEL expression to evaluate a {@link Command} for RSocket request type at runtime
126+
* against a request message.
127+
* @param commandExpression the SpEL expression to use.
128+
*/
105129
public void setCommandExpression(Expression commandExpression) {
106130
Assert.notNull(commandExpression, "'commandExpression' must not be null");
107131
this.commandExpression = commandExpression;
@@ -155,26 +179,35 @@ public void setExpectedResponseTypeExpression(Expression expectedResponseTypeExp
155179
@Override
156180
protected void doInit() {
157181
super.doInit();
158-
this.rSocketRequesterMono =
159-
RSocketRequester.builder()
160-
.rsocketFactory(this.factoryConfigurer)
161-
.rsocketStrategies(this.strategiesConfigurer)
162-
.connect(this.clientTransport, this.dataMimeType);
163-
164182
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
183+
if (this.clientRSocketConnector != null) {
184+
this.rsocketRequesterMono = this.clientRSocketConnector.getRSocketRequester().cache();
185+
}
165186
}
166187

167188
@Override
168189
public void destroy() {
169190
super.destroy();
170-
this.rSocketRequesterMono.map(RSocketRequester::rsocket)
171-
.doOnNext(Disposable::dispose)
172-
.subscribe();
191+
173192
}
174193

175194
@Override
176195
protected Object handleRequestMessage(Message<?> requestMessage) {
177-
return this.rSocketRequesterMono.cache()
196+
RSocketRequester rsocketRequester = requestMessage.getHeaders()
197+
.get(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, RSocketRequester.class);
198+
Mono<RSocketRequester> requesterMono;
199+
if (rsocketRequester != null) {
200+
requesterMono = Mono.just(rsocketRequester);
201+
}
202+
else {
203+
requesterMono = this.rsocketRequesterMono;
204+
}
205+
206+
Assert.notNull(requesterMono, () ->
207+
"The 'RSocketRequester' must be configured via 'ClientRSocketConnector' or provided in the '" +
208+
RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER + "' request message headers.");
209+
210+
return requesterMono
178211
.map((rSocketRequester) -> createRequestSpec(rSocketRequester, requestMessage))
179212
.map((requestSpec) -> createResponseSpec(requestSpec, requestMessage))
180213
.flatMap((responseSpec) -> performRequest(responseSpec, requestMessage));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides common classes for RSocket components.
3+
*/
4+
package org.springframework.integration.rsocket;

0 commit comments

Comments
 (0)