Skip to content

Commit ab5a5b5

Browse files
committed
Introduce RSocketInteractionModel
* Deprecate `RSocketOutboundGateway.Command` in favor of newly introduced top level `RSocketInteractionModel` * Deprecate setter, DSL and XML configurations for the deprecated `RSocketOutboundGateway.Command` in favor of newly introduced configurators for the mentioned `RSocketInteractionModel` * Add `IntegrationRSocketEndpoint.getInteractionModels()` contract for inbound endpoints. This way we can restrict mapped endpoints to the particular interaction model(s) * Add DSL and XML configuration for inbound interaction model option * Document changes
1 parent 25e9459 commit ab5a5b5

17 files changed

+340
-65
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketEndpoint.java

+11
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,15 @@ public interface IntegrationRSocketEndpoint extends ReactiveMessageHandler {
3939
*/
4040
String[] getPath();
4141

42+
/**
43+
* Obtain {@link RSocketInteractionModel}s
44+
* this {@link ReactiveMessageHandler} is going to be mapped onto.
45+
* Defaults to all the {@link RSocketInteractionModel}s.
46+
* @return the interaction models for mapping.
47+
* @since 5.2.2
48+
*/
49+
default RSocketInteractionModel[] getInteractionModels() {
50+
return RSocketInteractionModel.values();
51+
}
52+
4253
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java

+12-5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.rsocket;
1818

1919
import java.lang.reflect.Method;
20+
import java.util.Arrays;
2021
import java.util.Collections;
2122
import java.util.List;
2223

@@ -79,13 +80,19 @@ public boolean detectEndpoints() {
7980
}
8081

8182
public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
83+
RSocketFrameTypeMessageCondition frameTypeMessageCondition = RSocketFrameTypeMessageCondition.EMPTY_CONDITION;
84+
85+
RSocketInteractionModel[] interactionModels = endpoint.getInteractionModels();
86+
if (interactionModels.length > 0) {
87+
frameTypeMessageCondition =
88+
new RSocketFrameTypeMessageCondition(
89+
Arrays.stream(interactionModels)
90+
.map(RSocketInteractionModel::getFrameType)
91+
.toArray(FrameType[]::new));
92+
}
8293
registerHandlerMethod(endpoint, HANDLE_MESSAGE_METHOD,
8394
new CompositeMessageCondition(
84-
new RSocketFrameTypeMessageCondition(
85-
FrameType.REQUEST_FNF,
86-
FrameType.REQUEST_RESPONSE,
87-
FrameType.REQUEST_STREAM,
88-
FrameType.REQUEST_CHANNEL),
95+
frameTypeMessageCondition,
8996
new DestinationPatternsMessageCondition(endpoint.getPath(), getRouteMatcher()))); // NOSONAR
9097
}
9198

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.rsocket;
18+
19+
import io.rsocket.frame.FrameType;
20+
21+
/**
22+
* The RSocket protocol interaction models.
23+
*
24+
* @author Artem Bilan
25+
*
26+
* @since 5.2.2
27+
*
28+
* @see <a href="https://rsocket.io/">RSocket protocol official site</a>
29+
* @see FrameType
30+
*/
31+
public enum RSocketInteractionModel {
32+
33+
/**
34+
* The model for {@link io.rsocket.RSocket#fireAndForget} operation.
35+
*/
36+
fireAndForget(FrameType.REQUEST_FNF),
37+
38+
/**
39+
* The model for {@link io.rsocket.RSocket#requestResponse} operation.
40+
*/
41+
requestResponse(FrameType.REQUEST_RESPONSE),
42+
43+
/**
44+
* The model for {@link io.rsocket.RSocket#requestStream} operation.
45+
*/
46+
requestStream(FrameType.REQUEST_STREAM),
47+
48+
/**
49+
* The model for {@link io.rsocket.RSocket#requestChannel} operation.
50+
*/
51+
requestChannel(FrameType.REQUEST_CHANNEL);
52+
53+
private final FrameType frameType;
54+
55+
RSocketInteractionModel(FrameType frameType) {
56+
this.frameType = frameType;
57+
}
58+
59+
public FrameType getFrameType() {
60+
return this.frameType;
61+
}
62+
63+
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParser.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,13 +29,15 @@
2929
/**
3030
* Parser for the &lt;inbound-gateway/&gt; element of the 'rsocket' namespace.
3131
*
32-
* @author Mark Fisher
33-
* @author Gary Russell
32+
* @author Artem Bilan
33+
*
34+
* @since 5.2
3435
*/
3536
public class RSocketInboundGatewayParser extends AbstractInboundGatewayParser {
3637

3738
private static final List<String> NON_ELIGIBLE_ATTRIBUTES =
3839
Arrays.asList("path",
40+
"interaction-models",
3941
"rsocket-strategies",
4042
"rsocket-connector",
4143
"request-element-type");
@@ -59,6 +61,7 @@ protected void doPostProcess(BeanDefinitionBuilder builder, Element element) {
5961
"rSocketStrategies");
6062
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "rsocket-connector",
6163
"RSocketConnector");
64+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "interaction-models");
6265
}
6366

6467
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketOutboundGatewayParser.java

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
4949
builder.addConstructorArgValue(routeExpression);
5050
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "client-rsocket-connector",
5151
"clientRSocketConnector");
52+
populateValueOrExpressionIfAny(builder, element, parserContext, "interaction-model");
5253
populateValueOrExpressionIfAny(builder, element, parserContext, "command");
5354
populateValueOrExpressionIfAny(builder, element, parserContext, "publisher-element-type");
5455
populateValueOrExpressionIfAny(builder, element, parserContext, "expected-response-type");

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketInboundGatewaySpec.java

+13
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.springframework.core.ResolvableType;
2020
import org.springframework.integration.dsl.MessagingGatewaySpec;
2121
import org.springframework.integration.rsocket.AbstractRSocketConnector;
22+
import org.springframework.integration.rsocket.RSocketInteractionModel;
2223
import org.springframework.integration.rsocket.inbound.RSocketInboundGateway;
2324
import org.springframework.messaging.rsocket.RSocketStrategies;
2425

@@ -35,6 +36,18 @@ public class RSocketInboundGatewaySpec extends MessagingGatewaySpec<RSocketInbou
3536
super(new RSocketInboundGateway(path));
3637
}
3738

39+
/**
40+
* Configure a set of {@link RSocketInteractionModel} the endpoint is going to be mapped onto.
41+
* @param interactionModels the {@link RSocketInteractionModel}s for mapping.
42+
* @return the spec.
43+
* @since 5.2.2
44+
* @see RSocketInboundGateway#setInteractionModels(RSocketInteractionModel...)
45+
*/
46+
public RSocketInboundGatewaySpec interactionModels(RSocketInteractionModel... interactionModels) {
47+
this.target.setInteractionModels(interactionModels);
48+
return this;
49+
}
50+
3851
/**
3952
* Configure {@link RSocketStrategies} instead of a default one.
4053
* @param rsocketStrategies the {@link RSocketStrategies} to use.

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketOutboundGatewaySpec.java

+64-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.integration.expression.FunctionExpression;
2525
import org.springframework.integration.expression.ValueExpression;
2626
import org.springframework.integration.rsocket.ClientRSocketConnector;
27+
import org.springframework.integration.rsocket.RSocketInteractionModel;
2728
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
2829
import org.springframework.messaging.Message;
2930
import org.springframework.util.MimeType;
@@ -62,9 +63,22 @@ public RSocketOutboundGatewaySpec clientRSocketConnector(ClientRSocketConnector
6263
* @param command the {@link RSocketOutboundGateway.Command} to use.
6364
* @return the spec
6465
* @see RSocketOutboundGateway#setCommand(RSocketOutboundGateway.Command)
66+
* @deprecated in favor of {@link #interactionModel(RSocketInteractionModel)}
6567
*/
68+
@Deprecated
6669
public RSocketOutboundGatewaySpec command(RSocketOutboundGateway.Command command) {
67-
return command(new ValueExpression<>(command));
70+
return interactionModel(new ValueExpression<>(command));
71+
}
72+
73+
/**
74+
* Configure a {@link RSocketInteractionModel} for RSocket request type.
75+
* @param interactionModel the {@link RSocketInteractionModel} to use.
76+
* @return the spec
77+
* @see RSocketOutboundGateway#setInteractionModel(RSocketInteractionModel)
78+
* @since 5.2.2
79+
*/
80+
public RSocketOutboundGatewaySpec interactionModel(RSocketInteractionModel interactionModel) {
81+
return interactionModel(new ValueExpression<>(interactionModel));
6882
}
6983

7084
/**
@@ -73,32 +87,75 @@ public RSocketOutboundGatewaySpec command(RSocketOutboundGateway.Command command
7387
* @param commandFunction the {@code Function} to use.
7488
* @param <P> the expected request message payload type.
7589
* @return the spec
76-
* @see RSocketOutboundGateway#setCommandExpression(Expression)
90+
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
91+
* @deprecated in favor of {@link #interactionModel(Function)}
7792
*/
93+
@Deprecated
7894
public <P> RSocketOutboundGatewaySpec command(Function<Message<P>, ?> commandFunction) {
79-
return command(new FunctionExpression<>(commandFunction));
95+
return interactionModel(commandFunction);
96+
}
97+
98+
/**
99+
* Configure a {@code Function} to evaluate a {@link RSocketInteractionModel}
100+
* for RSocket request type at runtime against a request message.
101+
* @param interactionModelFunction the {@code Function} to use.
102+
* @param <P> the expected request message payload type.
103+
* @return the spec
104+
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
105+
* @since 5.2.2
106+
*/
107+
public <P> RSocketOutboundGatewaySpec interactionModel(Function<Message<P>, ?> interactionModelFunction) {
108+
return interactionModel(new FunctionExpression<>(interactionModelFunction));
80109
}
81110

82111
/**
83112
* Configure a SpEL expression to evaluate a {@link RSocketOutboundGateway.Command}
84113
* for RSocket request type at runtime against a request message.
85114
* @param commandExpression the SpEL expression to use.
86115
* @return the spec
87-
* @see RSocketOutboundGateway#setCommandExpression(Expression)
116+
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
117+
* @deprecated in favor of {@link #interactionModel(String)}
88118
*/
119+
@Deprecated
89120
public RSocketOutboundGatewaySpec command(String commandExpression) {
90-
return command(PARSER.parseExpression(commandExpression));
121+
return interactionModel(commandExpression);
122+
}
123+
124+
/**
125+
* Configure a SpEL expression to evaluate a {@link RSocketInteractionModel}
126+
* for RSocket request type at runtime against a request message.
127+
* @param interactionModelExpression the SpEL expression to use.
128+
* @return the spec
129+
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
130+
* @since 5.2.2
131+
*/
132+
public RSocketOutboundGatewaySpec interactionModel(String interactionModelExpression) {
133+
return interactionModel(PARSER.parseExpression(interactionModelExpression));
91134
}
92135

93136
/**
94137
* Configure a SpEL expression to evaluate a {@link RSocketOutboundGateway.Command}
95138
* for RSocket request type at runtime against a request message.
96139
* @param commandExpression the SpEL expression to use.
97140
* @return the spec
98-
* @see RSocketOutboundGateway#setCommandExpression(Expression)
141+
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
142+
* @deprecated in favor of {@link #interactionModel(Expression)}
99143
*/
144+
@Deprecated
100145
public RSocketOutboundGatewaySpec command(Expression commandExpression) {
101-
this.target.setCommandExpression(commandExpression);
146+
return interactionModel(commandExpression);
147+
}
148+
149+
/**
150+
* Configure a SpEL expression to evaluate a {@link RSocketInteractionModel}
151+
* for RSocket request type at runtime against a request message.
152+
* @param interactionModelExpression the SpEL expression to use.
153+
* @return the spec
154+
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
155+
* @since 5.2.2
156+
*/
157+
public RSocketOutboundGatewaySpec interactionModel(Expression interactionModelExpression) {
158+
this.target.setInteractionModelExpression(interactionModelExpression);
102159
return this;
103160
}
104161

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java

+18
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.integration.rsocket.AbstractRSocketConnector;
3131
import org.springframework.integration.rsocket.ClientRSocketConnector;
3232
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
33+
import org.springframework.integration.rsocket.RSocketInteractionModel;
3334
import org.springframework.integration.support.MessageBuilder;
3435
import org.springframework.lang.Nullable;
3536
import org.springframework.messaging.Message;
@@ -74,6 +75,8 @@ public class RSocketInboundGateway extends MessagingGatewaySupport implements In
7475

7576
private final String[] path;
7677

78+
private RSocketInteractionModel[] interactionModels = RSocketInteractionModel.values();
79+
7780
private RSocketStrategies rsocketStrategies = RSocketStrategies.create();
7881

7982
@Nullable
@@ -112,6 +115,21 @@ public void setRSocketConnector(AbstractRSocketConnector rsocketConnector) {
112115
this.rsocketConnector = rsocketConnector;
113116
}
114117

118+
/**
119+
* Configure a set of {@link RSocketInteractionModel} this endpoint is mapped onto.
120+
* @param interactionModelsArg the {@link RSocketInteractionModel}s for mapping.
121+
* @since 5.2.2
122+
*/
123+
public void setInteractionModels(RSocketInteractionModel... interactionModelsArg) {
124+
Assert.notNull(interactionModelsArg, "'interactionModelsArg' must not be null");
125+
this.interactionModels = Arrays.copyOf(interactionModelsArg, interactionModelsArg.length);
126+
}
127+
128+
@Override
129+
public RSocketInteractionModel[] getInteractionModels() {
130+
return Arrays.copyOf(this.interactionModels, this.interactionModels.length);
131+
}
132+
115133
/**
116134
* Get an array of the path patterns this endpoint is mapped onto.
117135
* @return the mapping path

0 commit comments

Comments
 (0)