Skip to content

Commit a95c76e

Browse files
committed
RSocket requests: Add per message metadata support
* Use convenient `Consumer` API to avoid external iteration for `setupMetadata` in the `ClientRSocketConnector` * Add `routeVars` and `metadata` support into the `RSocketOutboundGateway` * Cover new options in the XML and Java DSL configurations
1 parent 444c1f9 commit a95c76e

File tree

8 files changed

+106
-18
lines changed

8 files changed

+106
-18
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/ClientRSocketConnector.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public void setSetupData(Object setupData) {
153153
public void afterPropertiesSet() {
154154
super.afterPropertiesSet();
155155

156-
RSocketRequester.Builder rsocketRequesterBuilder =
156+
this.rsocketRequesterMono =
157157
RSocketRequester.builder()
158158
.dataMimeType(getDataMimeType())
159159
.metadataMimeType(getMetadataMimeType())
@@ -162,10 +162,8 @@ public void afterPropertiesSet() {
162162
.setupRoute(this.setupRoute, this.setupRouteVars)
163163
.rsocketFactory(this.factoryConfigurer)
164164
.rsocketFactory((rsocketFactory) ->
165-
rsocketFactory.acceptor(this.rSocketMessageHandler.responder()));
166-
this.setupMetadata.forEach(rsocketRequesterBuilder::setupMetadata);
167-
this.rsocketRequesterMono =
168-
rsocketRequesterBuilder
165+
rsocketFactory.acceptor(this.rSocketMessageHandler.responder()))
166+
.apply((builder) -> this.setupMetadata.forEach(builder::setupMetadata))
169167
.connect(this.clientTransport)
170168
.cache();
171169
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketOutboundGatewayParser.java

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
5252
populateValueOrExpressionIfAny(builder, element, parserContext, "command");
5353
populateValueOrExpressionIfAny(builder, element, parserContext, "publisher-element-type");
5454
populateValueOrExpressionIfAny(builder, element, parserContext, "expected-response-type");
55+
populateValueOrExpressionIfAny(builder, element, parserContext, "metadata");
5556
return builder;
5657
}
5758

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketOutboundGatewaySpec.java

+42
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.rsocket.dsl;
1818

19+
import java.util.Map;
1920
import java.util.function.Function;
2021

2122
import org.springframework.expression.Expression;
@@ -25,6 +26,7 @@
2526
import org.springframework.integration.rsocket.ClientRSocketConnector;
2627
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
2728
import org.springframework.messaging.Message;
29+
import org.springframework.util.MimeType;
2830

2931
/**
3032
* The {@link MessageHandlerSpec} implementation for the {@link RSocketOutboundGateway}.
@@ -35,6 +37,10 @@
3537
*/
3638
public class RSocketOutboundGatewaySpec extends MessageHandlerSpec<RSocketOutboundGatewaySpec, RSocketOutboundGateway> {
3739

40+
RSocketOutboundGatewaySpec(String route, Object... routeVariables) {
41+
this.target = new RSocketOutboundGateway(route, routeVariables);
42+
}
43+
3844
RSocketOutboundGatewaySpec(Expression routeExpression) {
3945
this.target = new RSocketOutboundGateway(routeExpression);
4046
}
@@ -186,4 +192,40 @@ public RSocketOutboundGatewaySpec expectedResponseType(Expression expectedRespon
186192
return this;
187193
}
188194

195+
/**
196+
* Configure a {@link Function} to evaluate a metadata as a {@code Map<Object, MimeType>}
197+
* for RSocket request against request message.
198+
* @param metadataFunction the {@code Function} to use.
199+
* @param <P> the expected request message payload type.
200+
* @return the spec
201+
* @see RSocketOutboundGateway#setMetadataExpression(Expression)
202+
*/
203+
public <P> RSocketOutboundGatewaySpec metadata(Function<Message<P>, Map<Object, MimeType>> metadataFunction) {
204+
return metadata(new FunctionExpression<>(metadataFunction));
205+
}
206+
207+
/**
208+
Configure a SpEL expression to evaluate a metadata as a {@code Map<Object, MimeType>}
209+
* for RSocket request against request message.
210+
* @param metadataExpression the SpEL expression to use.
211+
* @return the spec
212+
* @see RSocketOutboundGateway#setMetadataExpression(Expression)
213+
*/
214+
public RSocketOutboundGatewaySpec metadata(String metadataExpression) {
215+
return metadata(PARSER.parseExpression(metadataExpression));
216+
}
217+
218+
/**
219+
* Configure a SpEL expression to evaluate a metadata as a {@code Map<Object, MimeType>}
220+
* for RSocket request against request message.
221+
* for RSocket request type at runtime against a request message.
222+
* @param metadataExpression the SpEL expression to use.
223+
* @return the spec
224+
* @see RSocketOutboundGateway#setMetadataExpression(Expression)
225+
*/
226+
public RSocketOutboundGatewaySpec metadata(Expression metadataExpression) {
227+
this.target.setMetadataExpression(metadataExpression);
228+
return this;
229+
}
230+
189231
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSockets.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.function.Function;
2020

2121
import org.springframework.expression.Expression;
22-
import org.springframework.expression.common.LiteralExpression;
2322
import org.springframework.integration.expression.FunctionExpression;
2423
import org.springframework.messaging.Message;
2524

@@ -34,12 +33,13 @@ public final class RSockets {
3433

3534
/**
3635
* Create an {@link RSocketOutboundGatewaySpec} builder for request-reply gateway
37-
* based on provided {@code route}.
36+
* based on provided {@code route} and optional variables to expand route template.
3837
* @param route the {@code route} to send requests.
38+
* @param routeVariables the variables to expand route template.
3939
* @return the RSocketOutboundGatewaySpec instance
4040
*/
41-
public static RSocketOutboundGatewaySpec outboundGateway(String route) {
42-
return outboundGateway(new LiteralExpression(route));
41+
public static RSocketOutboundGatewaySpec outboundGateway(String route, Object... routeVariables) {
42+
return new RSocketOutboundGatewaySpec(route, routeVariables);
4343
}
4444

4545
/**

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java

+36-6
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.rsocket.outbound;
1818

19+
import java.util.Map;
20+
1921
import org.reactivestreams.Publisher;
2022

2123
import org.springframework.core.ParameterizedTypeReference;
@@ -31,6 +33,8 @@
3133
import org.springframework.messaging.rsocket.annotation.support.RSocketRequesterMethodArgumentResolver;
3234
import org.springframework.util.Assert;
3335
import org.springframework.util.ClassUtils;
36+
import org.springframework.util.CollectionUtils;
37+
import org.springframework.util.MimeType;
3438

3539
import reactor.core.publisher.Mono;
3640

@@ -67,6 +71,8 @@ public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler
6771

6872
private final Expression routeExpression;
6973

74+
private Object[] routeVars;
75+
7076
@Nullable
7177
private ClientRSocketConnector clientRSocketConnector;
7278

@@ -76,22 +82,29 @@ public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler
7682

7783
private Expression expectedResponseTypeExpression = new ValueExpression<>(String.class);
7884

85+
private Expression metadataExpression;
86+
7987
private EvaluationContext evaluationContext;
8088

8189
@Nullable
8290
private Mono<RSocketRequester> rsocketRequesterMono;
8391

8492
/**
85-
* Instantiate based on the provided RSocket endpoint {@code route}.
93+
* Instantiate based on the provided RSocket endpoint {@code route}
94+
* and optional variables to expand route template.
8695
* @param route the RSocket endpoint route to use.
96+
* @param routeVariables the variables to expand route template.
8797
*/
88-
public RSocketOutboundGateway(String route) {
98+
public RSocketOutboundGateway(String route, Object... routeVariables) {
8999
this(new ValueExpression<>(route));
100+
this.routeVars = routeVariables;
90101
}
91102

92103
/**
93104
* Instantiate based on the provided SpEL expression to evaluate an RSocket endpoint {@code route}
94105
* at runtime against a request message.
106+
* If route is a template and variables expansion is required, it is recommended to do that
107+
* in this expression evaluation, for example using some bean with an appropriate logic.
95108
* @param routeExpression the SpEL expression to use.
96109
*/
97110
public RSocketOutboundGateway(Expression routeExpression) {
@@ -173,6 +186,14 @@ public void setExpectedResponseTypeExpression(Expression expectedResponseTypeExp
173186
this.expectedResponseTypeExpression = expectedResponseTypeExpression;
174187
}
175188

189+
/**
190+
* Specify a SpEL expression to evaluate a metadata for RSocket request
191+
* as {@code Map<Object, MimeType>} against request message.
192+
* @param metadataExpression the expression for metadata.
193+
*/
194+
public void setMetadataExpression(Expression metadataExpression) {
195+
this.metadataExpression = metadataExpression;
196+
}
176197

177198
@Override
178199
protected void doInit() {
@@ -205,13 +226,23 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
205226
.flatMap((responseSpec) -> performRequest(responseSpec, requestMessage));
206227
}
207228

229+
@SuppressWarnings("unchecked")
208230
private RSocketRequester.RequestSpec createRequestSpec(RSocketRequester rsocketRequester,
209231
Message<?> requestMessage) {
210232

211233
String route = this.routeExpression.getValue(this.evaluationContext, requestMessage, String.class);
212234
Assert.notNull(route, () -> "The 'routeExpression' [" + this.routeExpression + "] must not evaluate to null");
213235

214-
return rsocketRequester.route(route);
236+
RSocketRequester.RequestSpec requestSpec = rsocketRequester.route(route, this.routeVars);
237+
if (this.metadataExpression != null) {
238+
Map<Object, MimeType> metadata =
239+
this.metadataExpression.getValue(this.evaluationContext, requestMessage, Map.class);
240+
if (!CollectionUtils.isEmpty(metadata)) {
241+
requestSpec.metadata((spec) -> metadata.forEach(spec::metadata));
242+
}
243+
}
244+
245+
return requestSpec;
215246
}
216247

217248
private RSocketRequester.ResponseSpec createResponseSpec(RSocketRequester.RequestSpec requestSpec,
@@ -228,15 +259,14 @@ private RSocketRequester.ResponseSpec createResponseSpec(RSocketRequester.Reques
228259
}
229260
}
230261

231-
@SuppressWarnings({ "rawtypes", "unchecked" })
232262
private RSocketRequester.ResponseSpec responseSpecForPublisher(RSocketRequester.RequestSpec requestSpec,
233263
Publisher<?> payload, Object publisherElementType) {
234264

235265
if (publisherElementType instanceof Class<?>) {
236-
return requestSpec.data(payload, (Class) publisherElementType);
266+
return requestSpec.data(payload, (Class<?>) publisherElementType);
237267
}
238268
else {
239-
return requestSpec.data(payload, (ParameterizedTypeReference) publisherElementType);
269+
return requestSpec.data(payload, (ParameterizedTypeReference<?>) publisherElementType);
240270
}
241271
}
242272

spring-integration-rsocket/src/main/resources/org/springframework/integration/rsocket/config/spring-integration-rsocket-5.2.xsd

+9-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@
180180
<xsd:annotation>
181181
<xsd:documentation>
182182
A SpEL expression to evaluate a 'Class' or 'ParameterizedTypeReference'
183-
for for an RSocket response at runtime
183+
for an RSocket response at runtime
184184
against request message.
185185
Mutually exclusive with 'expected-response-type'.
186186
</xsd:documentation>
@@ -199,6 +199,14 @@
199199
</xsd:appinfo>
200200
</xsd:annotation>
201201
</xsd:attribute>
202+
<xsd:attribute name="metadata-expression" type="xsd:string">
203+
<xsd:annotation>
204+
<xsd:documentation>
205+
A SpEL expression to evaluate a 'Map' representing a metadata
206+
for an RSocket request at runtime against request message.
207+
</xsd:documentation>
208+
</xsd:annotation>
209+
</xsd:attribute>
202210
</xsd:extension>
203211
</xsd:complexContent>
204212
</xsd:complexType>

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketOutboundGatewayParserTests-context.xml

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
route-expression="'testRoute'"
2222
request-channel="requestChannel"
2323
publisher-element-type="byte[]"
24-
expected-response-type="java.util.Date"/>
24+
expected-response-type="java.util.Date"
25+
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
2526

2627
</beans>

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketOutboundGatewayParserTests.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.Collections;
22+
2123
import org.junit.jupiter.api.Test;
2224

2325
import org.springframework.beans.factory.annotation.Autowired;
26+
import org.springframework.expression.Expression;
2427
import org.springframework.integration.rsocket.ClientRSocketConnector;
2528
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
2629
import org.springframework.integration.test.util.TestUtils;
2730
import org.springframework.test.annotation.DirtiesContext;
2831
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
32+
import org.springframework.util.MimeType;
2933

3034
/**
3135
* @author Artem Bilan
@@ -34,7 +38,7 @@
3438
*/
3539
@SpringJUnitConfig
3640
@DirtiesContext
37-
public class RSocketOutboundGatewayParserTests {
41+
class RSocketOutboundGatewayParserTests {
3842

3943
@Autowired
4044
private ClientRSocketConnector clientRSocketConnector;
@@ -54,6 +58,10 @@ void testOutboundGatewayParser() {
5458
.isEqualTo("byte[]");
5559
assertThat(TestUtils.getPropertyValue(this.outboundGateway, "expectedResponseTypeExpression.literalValue"))
5660
.isEqualTo("java.util.Date");
61+
Expression metadataExpression =
62+
TestUtils.getPropertyValue(this.outboundGateway, "metadataExpression", Expression.class);
63+
assertThat(metadataExpression.getValue())
64+
.isEqualTo(Collections.singletonMap("metadata", new MimeType("*")));
5765
}
5866

5967
}

0 commit comments

Comments
 (0)