diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketEndpoint.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketEndpoint.java index 1da72909b5c..021320427d1 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketEndpoint.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketEndpoint.java @@ -39,4 +39,15 @@ public interface IntegrationRSocketEndpoint extends ReactiveMessageHandler { */ String[] getPath(); + /** + * Obtain {@link RSocketInteractionModel}s + * this {@link ReactiveMessageHandler} is going to be mapped onto. + * Defaults to all the {@link RSocketInteractionModel}s. + * @return the interaction models for mapping. + * @since 5.2.2 + */ + default RSocketInteractionModel[] getInteractionModels() { + return RSocketInteractionModel.values(); + } + } diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java index 235ebaf7471..eb3984dd8bc 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java @@ -17,6 +17,7 @@ package org.springframework.integration.rsocket; import java.lang.reflect.Method; +import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -79,13 +80,19 @@ public boolean detectEndpoints() { } public void addEndpoint(IntegrationRSocketEndpoint endpoint) { + RSocketFrameTypeMessageCondition frameTypeMessageCondition = RSocketFrameTypeMessageCondition.EMPTY_CONDITION; + + RSocketInteractionModel[] interactionModels = endpoint.getInteractionModels(); + if (interactionModels.length > 0) { + frameTypeMessageCondition = + new RSocketFrameTypeMessageCondition( + Arrays.stream(interactionModels) + .map(RSocketInteractionModel::getFrameType) + .toArray(FrameType[]::new)); + } registerHandlerMethod(endpoint, HANDLE_MESSAGE_METHOD, new CompositeMessageCondition( - new RSocketFrameTypeMessageCondition( - FrameType.REQUEST_FNF, - FrameType.REQUEST_RESPONSE, - FrameType.REQUEST_STREAM, - FrameType.REQUEST_CHANNEL), + frameTypeMessageCondition, new DestinationPatternsMessageCondition(endpoint.getPath(), getRouteMatcher()))); // NOSONAR } diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/RSocketInteractionModel.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/RSocketInteractionModel.java new file mode 100644 index 00000000000..9e4cdb009ba --- /dev/null +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/RSocketInteractionModel.java @@ -0,0 +1,63 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.rsocket; + +import io.rsocket.frame.FrameType; + +/** + * The RSocket protocol interaction models. + * + * @author Artem Bilan + * + * @since 5.2.2 + * + * @see RSocket protocol official site + * @see FrameType + */ +public enum RSocketInteractionModel { + + /** + * The model for {@link io.rsocket.RSocket#fireAndForget} operation. + */ + fireAndForget(FrameType.REQUEST_FNF), + + /** + * The model for {@link io.rsocket.RSocket#requestResponse} operation. + */ + requestResponse(FrameType.REQUEST_RESPONSE), + + /** + * The model for {@link io.rsocket.RSocket#requestStream} operation. + */ + requestStream(FrameType.REQUEST_STREAM), + + /** + * The model for {@link io.rsocket.RSocket#requestChannel} operation. + */ + requestChannel(FrameType.REQUEST_CHANNEL); + + private final FrameType frameType; + + RSocketInteractionModel(FrameType frameType) { + this.frameType = frameType; + } + + public FrameType getFrameType() { + return this.frameType; + } + +} diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParser.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParser.java index 92d0f832b01..c3e29f26d75 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParser.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,13 +29,15 @@ /** * Parser for the <inbound-gateway/> element of the 'rsocket' namespace. * - * @author Mark Fisher - * @author Gary Russell + * @author Artem Bilan + * + * @since 5.2 */ public class RSocketInboundGatewayParser extends AbstractInboundGatewayParser { private static final List NON_ELIGIBLE_ATTRIBUTES = Arrays.asList("path", + "interaction-models", "rsocket-strategies", "rsocket-connector", "request-element-type"); @@ -59,6 +61,7 @@ protected void doPostProcess(BeanDefinitionBuilder builder, Element element) { "rSocketStrategies"); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "rsocket-connector", "RSocketConnector"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "interaction-models"); } } diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketOutboundGatewayParser.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketOutboundGatewayParser.java index 96159c04c3e..cf6cb7124d6 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketOutboundGatewayParser.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketOutboundGatewayParser.java @@ -49,6 +49,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars builder.addConstructorArgValue(routeExpression); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "client-rsocket-connector", "clientRSocketConnector"); + populateValueOrExpressionIfAny(builder, element, parserContext, "interaction-model"); populateValueOrExpressionIfAny(builder, element, parserContext, "command"); populateValueOrExpressionIfAny(builder, element, parserContext, "publisher-element-type"); populateValueOrExpressionIfAny(builder, element, parserContext, "expected-response-type"); diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketInboundGatewaySpec.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketInboundGatewaySpec.java index ce5eaa5bf60..b0a5d9566ef 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketInboundGatewaySpec.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketInboundGatewaySpec.java @@ -19,6 +19,7 @@ import org.springframework.core.ResolvableType; import org.springframework.integration.dsl.MessagingGatewaySpec; import org.springframework.integration.rsocket.AbstractRSocketConnector; +import org.springframework.integration.rsocket.RSocketInteractionModel; import org.springframework.integration.rsocket.inbound.RSocketInboundGateway; import org.springframework.messaging.rsocket.RSocketStrategies; @@ -36,7 +37,19 @@ public class RSocketInboundGatewaySpec extends MessagingGatewaySpec(command)); + return interactionModel(new ValueExpression<>(command)); } /** - * Configure a {@code Function} to evaluate a {@link RSocketOutboundGateway.Command} - * for RSocket request type at runtime against a request message. + * Configure an {@link RSocketInteractionModel} for the RSocket request type. + * @param interactionModel the {@link RSocketInteractionModel} to use. + * @return the spec + * @see RSocketOutboundGateway#setInteractionModel(RSocketInteractionModel) + * @since 5.2.2 + */ + public RSocketOutboundGatewaySpec interactionModel(RSocketInteractionModel interactionModel) { + return interactionModel(new ValueExpression<>(interactionModel)); + } + + /** + * Configure a {@link Function} to evaluate an {@link RSocketOutboundGateway.Command} + * for the RSocket request type at runtime against a request message. * @param commandFunction the {@code Function} to use. * @param

the expected request message payload type. * @return the spec - * @see RSocketOutboundGateway#setCommandExpression(Expression) + * @see RSocketOutboundGateway#setInteractionModelExpression(Expression) + * @deprecated in favor of {@link #interactionModel(Function)} */ + @Deprecated public

RSocketOutboundGatewaySpec command(Function, ?> commandFunction) { - return command(new FunctionExpression<>(commandFunction)); + return interactionModel(commandFunction); } /** - * Configure a SpEL expression to evaluate a {@link RSocketOutboundGateway.Command} - * for RSocket request type at runtime against a request message. + * Configure a {@link Function} to evaluate an {@link RSocketInteractionModel} + * for the RSocket request type at runtime against a request message. + * @param interactionModelFunction the {@code Function} to use. + * @param

the expected request message payload type. + * @return the spec + * @see RSocketOutboundGateway#setInteractionModelExpression(Expression) + * @since 5.2.2 + */ + public

RSocketOutboundGatewaySpec interactionModel(Function, ?> interactionModelFunction) { + return interactionModel(new FunctionExpression<>(interactionModelFunction)); + } + + /** + * Configure a SpEL expression to evaluate an {@link RSocketOutboundGateway.Command} + * for the RSocket request type at runtime against a request message. * @param commandExpression the SpEL expression to use. * @return the spec - * @see RSocketOutboundGateway#setCommandExpression(Expression) + * @see RSocketOutboundGateway#setInteractionModelExpression(Expression) + * @deprecated in favor of {@link #interactionModel(String)} */ + @Deprecated public RSocketOutboundGatewaySpec command(String commandExpression) { - return command(PARSER.parseExpression(commandExpression)); + return interactionModel(commandExpression); } /** - * Configure a SpEL expression to evaluate a {@link RSocketOutboundGateway.Command} - * for RSocket request type at runtime against a request message. + * Configure a SpEL expression to evaluate an {@link RSocketInteractionModel} + * for the RSocket request type at runtime against a request message. + * @param interactionModelExpression the SpEL expression to use. + * @return the spec + * @see RSocketOutboundGateway#setInteractionModelExpression(Expression) + * @since 5.2.2 + */ + public RSocketOutboundGatewaySpec interactionModel(String interactionModelExpression) { + return interactionModel(PARSER.parseExpression(interactionModelExpression)); + } + + /** + * Configure a SpEL expression to evaluate an {@link RSocketOutboundGateway.Command} + * for the RSocket request type at runtime against a request message. * @param commandExpression the SpEL expression to use. * @return the spec - * @see RSocketOutboundGateway#setCommandExpression(Expression) + * @see RSocketOutboundGateway#setInteractionModelExpression(Expression) + * @deprecated in favor of {@link #interactionModel(Expression)} */ + @Deprecated public RSocketOutboundGatewaySpec command(Expression commandExpression) { - this.target.setCommandExpression(commandExpression); + return interactionModel(commandExpression); + } + + /** + * Configure a SpEL expression to evaluate an {@link RSocketInteractionModel} + * for the RSocket request type at runtime against a request message. + * @param interactionModelExpression the SpEL expression to use. + * @return the spec + * @see RSocketOutboundGateway#setInteractionModelExpression(Expression) + * @since 5.2.2 + */ + public RSocketOutboundGatewaySpec interactionModel(Expression interactionModelExpression) { + this.target.setInteractionModelExpression(interactionModelExpression); return this; } @@ -113,7 +170,7 @@ public RSocketOutboundGatewaySpec publisherElementType(Class publisherElement } /** - * Configure a {@code Function} to evaluate a request {@link org.reactivestreams.Publisher} + * Configure a {@link Function} to evaluate a request {@link org.reactivestreams.Publisher} * elements type at runtime against a request message. * @param publisherElementTypeFunction the {@code Function} to evaluate a type for the request * {@link org.reactivestreams.Publisher} elements. @@ -161,7 +218,7 @@ public RSocketOutboundGatewaySpec expectedResponseType(Class expectedResponse } /** - * Specify the {@code Function} to determine the type for the RSocket response. + * Specify the {@link Function} to determine the type for the RSocket response. * @param expectedResponseTypeFunction The expected response type {@code Function}. * @param

the expected request message payload type. * @return the spec @@ -182,7 +239,7 @@ public RSocketOutboundGatewaySpec expectedResponseType(String expectedResponseTy } /** - * Specify the {@link Expression} to determine the type for the RSocket response. + * Specify an {@link Expression} to determine the type for the RSocket response. * @param expectedResponseTypeExpression The expected response type expression. * @return the spec * @see RSocketOutboundGateway#setExpectedResponseTypeExpression(Expression) @@ -205,8 +262,8 @@ public

RSocketOutboundGatewaySpec metadata(Function, Map} - * for RSocket request against request message. + * Configure a SpEL expression to evaluate a metadata as a {@code Map} + * for the RSocket request against request message. * @param metadataExpression the SpEL expression to use. * @return the spec * @see RSocketOutboundGateway#setMetadataExpression(Expression) @@ -217,8 +274,7 @@ public RSocketOutboundGatewaySpec metadata(String metadataExpression) { /** * Configure a SpEL expression to evaluate a metadata as a {@code Map} - * for RSocket request against request message. - * for RSocket request type at runtime against a request message. + * for the RSocket request type at runtime against a request message. * @param metadataExpression the SpEL expression to use. * @return the spec * @see RSocketOutboundGateway#setMetadataExpression(Expression) diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java index c22d1406455..0abf10024a8 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java @@ -30,6 +30,7 @@ import org.springframework.integration.rsocket.AbstractRSocketConnector; import org.springframework.integration.rsocket.ClientRSocketConnector; import org.springframework.integration.rsocket.IntegrationRSocketEndpoint; +import org.springframework.integration.rsocket.RSocketInteractionModel; import org.springframework.integration.support.MessageBuilder; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -74,6 +75,8 @@ public class RSocketInboundGateway extends MessagingGatewaySupport implements In private final String[] path; + private RSocketInteractionModel[] interactionModels = RSocketInteractionModel.values(); + private RSocketStrategies rsocketStrategies = RSocketStrategies.create(); @Nullable @@ -92,7 +95,7 @@ public RSocketInboundGateway(String... pathArg) { } /** - * Configure {@link RSocketStrategies} instead of a default one. + * Configure an {@link RSocketStrategies} instead of a default one. * Note: if {@link AbstractRSocketConnector} is provided, then its * {@link RSocketStrategies} have a precedence. * @param rsocketStrategies the {@link RSocketStrategies} to use. @@ -112,6 +115,21 @@ public void setRSocketConnector(AbstractRSocketConnector rsocketConnector) { this.rsocketConnector = rsocketConnector; } + /** + * Configure a set of {@link RSocketInteractionModel} this endpoint is mapped onto. + * @param interactionModelsArg the {@link RSocketInteractionModel}s for mapping. + * @since 5.2.2 + */ + public void setInteractionModels(RSocketInteractionModel... interactionModelsArg) { + Assert.notNull(interactionModelsArg, "'interactionModelsArg' must not be null"); + this.interactionModels = Arrays.copyOf(interactionModelsArg, interactionModelsArg.length); + } + + @Override + public RSocketInteractionModel[] getInteractionModels() { + return Arrays.copyOf(this.interactionModels, this.interactionModels.length); + } + /** * Get an array of the path patterns this endpoint is mapped onto. * @return the mapping path @@ -121,7 +139,7 @@ public String[] getPath() { } /** - * Specify the type of payload to be generated when the inbound RSocket request + * Specify a type of payload to be generated when the inbound RSocket request * content is read by the encoders. * By default this value is null which means at runtime any "text" Content-Type will * result in String while all others default to {@code byte[].class}. diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java index 061d5867d88..0cb0e4e294e 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java @@ -28,6 +28,7 @@ import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.rsocket.ClientRSocketConnector; +import org.springframework.integration.rsocket.RSocketInteractionModel; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.rsocket.RSocketRequester; @@ -46,9 +47,9 @@ * {@link RSocketRequesterMethodArgumentResolver#RSOCKET_REQUESTER_HEADER} request message header * on the server side. *

- * An RSocket operation is determined by the configured {@link Command} or respective SpEL + * An RSocket operation is determined by the configured {@link RSocketInteractionModel} or respective SpEL * expression to be evaluated at runtime against the request message. - * By default the {@link Command#requestResponse} operation is used. + * By default the {@link RSocketInteractionModel#requestResponse} operation is used. *

* For a {@link Publisher}-based requests, it must be present in the request message {@code payload}. * The flattening via upstream {@link org.springframework.integration.channel.FluxMessageChannel} will work, too, @@ -65,7 +66,7 @@ * * @since 5.2 * - * @see Command + * @see RSocketInteractionModel * @see RSocketRequester */ public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler { @@ -77,7 +78,7 @@ public class RSocketOutboundGateway extends AbstractReplyProducingMessageHandler @Nullable private ClientRSocketConnector clientRSocketConnector; - private Expression commandExpression = new ValueExpression<>(Command.requestResponse); + private Expression interactionModelExpression = new ValueExpression<>(RSocketInteractionModel.requestResponse); private Expression publisherElementTypeExpression; @@ -130,23 +131,47 @@ public void setClientRSocketConnector(ClientRSocketConnector clientRSocketConnec } /** - * Configure a {@link Command} for RSocket request type. + * Configure a {@link Command} for the RSocket request type. * @param command the {@link Command} to use. + * @deprecated in favor of {@link #setInteractionModel(RSocketInteractionModel)} */ + @Deprecated public void setCommand(Command command) { - setCommandExpression(new ValueExpression<>(command)); + setInteractionModelExpression(new ValueExpression<>(command.interactionModel)); } /** - * Configure a SpEL expression to evaluate a {@link Command} for RSocket request type at runtime + * Configure an {@link RSocketInteractionModel} for the RSocket request type. + * @param interactionModel the {@link RSocketInteractionModel} to use. + * @since 5.2.2 + */ + public void setInteractionModel(RSocketInteractionModel interactionModel) { + setInteractionModelExpression(new ValueExpression<>(interactionModel)); + } + + /** + * Configure a SpEL expression to evaluate a {@link Command} for the RSocket request type at runtime * against a request message. * @param commandExpression the SpEL expression to use. + * @deprecated in favor of {@link #setInteractionModelExpression(Expression)} */ + @Deprecated public void setCommandExpression(Expression commandExpression) { - Assert.notNull(commandExpression, "'commandExpression' must not be null"); - this.commandExpression = commandExpression; + setInteractionModelExpression(commandExpression); + } + + /** + * Configure a SpEL expression to evaluate an {@link RSocketInteractionModel} + * for the RSocket request type at runtime against a request message. + * @param interactionModelExpression the SpEL expression to use. + * @since 5.2.2 + */ + public void setInteractionModelExpression(Expression interactionModelExpression) { + Assert.notNull(interactionModelExpression, "'interactionModelExpression' must not be null"); + this.interactionModelExpression = interactionModelExpression; } + /** * Configure a type for a request {@link Publisher} elements. * @param publisherElementType the type of the request {@link Publisher} elements. @@ -169,7 +194,7 @@ public void setPublisherElementTypeExpression(Expression publisherElementTypeExp } /** - * Specify the expected response type for the RSocket response. + * Specify an response type for the RSocket response. * @param expectedResponseType The expected type. * @see #setExpectedResponseTypeExpression(Expression) * @see RSocketRequester.RequestSpec#retrieveMono @@ -180,7 +205,7 @@ public void setExpectedResponseType(Class expectedResponseType) { } /** - * Specify the {@link Expression} to determine the type for the RSocket response. + * Specify an {@link Expression} to determine the type for the RSocket response. * @param expectedResponseTypeExpression The expected response type expression. * @see RSocketRequester.RequestSpec#retrieveMono * @see RSocketRequester.RequestSpec#retrieveFlux @@ -190,8 +215,8 @@ public void setExpectedResponseTypeExpression(Expression expectedResponseTypeExp } /** - * Specify a SpEL expression to evaluate a metadata for RSocket request - * as {@code Map} against request message. + * Specify a SpEL expression to evaluate a metadata for the RSocket request + * as {@code Map} against a request message. * @param metadataExpression the expression for metadata. */ public void setMetadataExpression(Expression metadataExpression) { @@ -226,7 +251,7 @@ protected Object handleRequestMessage(Message requestMessage) { return requesterMono .map((rSocketRequester) -> createRequestSpec(rSocketRequester, requestMessage)) .map((requestSpec) -> prepareRetrieveSpec(requestSpec, requestMessage)) - .flatMap((responseSpec) -> performRequest(responseSpec, requestMessage)); + .flatMap((retrieveSpec) -> performRetrieve(retrieveSpec, requestMessage)); } @SuppressWarnings("unchecked") @@ -273,36 +298,55 @@ private RSocketRequester.RetrieveSpec prepareRequestSpecForPublisher(RSocketRequ } } - private Mono performRequest(RSocketRequester.RetrieveSpec requestSpec, Message requestMessage) { - Command command = this.commandExpression.getValue(this.evaluationContext, requestMessage, Command.class); - Assert.notNull(command, - () -> "The 'command' [" + this.commandExpression + "] must not evaluate to null"); + private Mono performRetrieve(RSocketRequester.RetrieveSpec retrieveSpec, Message requestMessage) { + RSocketInteractionModel interactionModel = evaluateInteractionModel(requestMessage); + Assert.notNull(interactionModel, + () -> "The 'interactionModelExpression' [" + this.interactionModelExpression + "] must not evaluate to null"); Object expectedResponseType = null; - if (!Command.fireAndForget.equals(command)) { + if (!RSocketInteractionModel.fireAndForget.equals(interactionModel)) { expectedResponseType = evaluateExpressionForType(requestMessage, this.expectedResponseTypeExpression, "expectedResponseType"); } - switch (command) { + switch (interactionModel) { case fireAndForget: - return requestSpec.send(); + return retrieveSpec.send(); case requestResponse: if (expectedResponseType instanceof Class) { - return requestSpec.retrieveMono((Class) expectedResponseType); + return retrieveSpec.retrieveMono((Class) expectedResponseType); } else { - return requestSpec.retrieveMono((ParameterizedTypeReference) expectedResponseType); + return retrieveSpec.retrieveMono((ParameterizedTypeReference) expectedResponseType); } - case requestStreamOrChannel: + case requestStream: + case requestChannel: if (expectedResponseType instanceof Class) { - return Mono.just(requestSpec.retrieveFlux((Class) expectedResponseType)); + return Mono.just(retrieveSpec.retrieveFlux((Class) expectedResponseType)); } else { - return Mono.just(requestSpec.retrieveFlux((ParameterizedTypeReference) expectedResponseType)); + return Mono.just(retrieveSpec.retrieveFlux((ParameterizedTypeReference) expectedResponseType)); } default: - throw new UnsupportedOperationException("Unsupported command: " + command); + throw new UnsupportedOperationException("Unsupported interaction model: " + interactionModel); + } + } + + private RSocketInteractionModel evaluateInteractionModel(Message requestMessage) { + Object value = this.interactionModelExpression.getValue(this.evaluationContext, requestMessage); + if (value instanceof RSocketInteractionModel) { + return (RSocketInteractionModel) value; + } + else if (value instanceof Command) { + return ((Command) value).interactionModel; + } + else if (value instanceof String) { + return RSocketInteractionModel.valueOf((String) value); + } + else { + throw new IllegalStateException("The 'interactionModelExpression' [" + + this.interactionModelExpression + + "] must evaluate to 'RSocketInteractionModel' or 'String' type, but not into: '" + value + "'"); } } @@ -330,20 +374,22 @@ private Object evaluateExpressionForType(Message requestMessage, Expression e /** * Enumeration of commands supported by the gateways. + * @deprecated in favor of {@link RSocketInteractionModel} */ + @Deprecated public enum Command { /** * Perform {@link io.rsocket.RSocket#fireAndForget fireAndForget}. * @see RSocketRequester.RequestSpec#send() */ - fireAndForget, + fireAndForget(RSocketInteractionModel.fireAndForget), /** * Perform {@link io.rsocket.RSocket#requestResponse requestResponse}. * @see RSocketRequester.RequestSpec#retrieveMono */ - requestResponse, + requestResponse(RSocketInteractionModel.requestResponse), /** * Perform {@link io.rsocket.RSocket#requestStream requestStream} or @@ -351,7 +397,13 @@ public enum Command { * the request input consists of a single or multiple payloads. * @see RSocketRequester.RequestSpec#retrieveFlux */ - requestStreamOrChannel + requestStreamOrChannel(RSocketInteractionModel.requestStream); + + private final RSocketInteractionModel interactionModel; + + Command(RSocketInteractionModel interactionModel) { + this.interactionModel = interactionModel; + } } diff --git a/spring-integration-rsocket/src/main/resources/org/springframework/integration/rsocket/config/spring-integration-rsocket-5.2.xsd b/spring-integration-rsocket/src/main/resources/org/springframework/integration/rsocket/config/spring-integration-rsocket-5.2.xsd index efdbefbc6fb..5f765b0e97b 100644 --- a/spring-integration-rsocket/src/main/resources/org/springframework/integration/rsocket/config/spring-integration-rsocket-5.2.xsd +++ b/spring-integration-rsocket/src/main/resources/org/springframework/integration/rsocket/config/spring-integration-rsocket-5.2.xsd @@ -36,6 +36,17 @@ + + + + Comma-separated interaction models. + Determines which types of RSocket frames are allowed with this Endpoint. + + + + + + @@ -130,10 +141,11 @@ - + - A 'Command' for RSocket request type. + [DEPRECATED in favor of 'interaction-model'] + A 'Command' for the RSocket request type. Mutually exclusive with 'command-expression'. @@ -141,15 +153,36 @@ + + + + An 'RSocketInteractionModel' for the RSocket request type. + Mutually exclusive with 'interaction-model-expression'. + + + + + + - A SpEL expression to evaluate a 'command' for RSocket request type at runtime + [DEPRECATED in favor of 'interaction-model-expression'] + A SpEL expression to evaluate a 'command' for the RSocket request type at runtime against request message. Mutually exclusive with 'command'. + + + + A SpEL expression to evaluate an 'RSocketInteractionModel' + for the RSocket request type at runtime against request message. + Mutually exclusive with 'interaction-model'. + + + @@ -250,5 +283,13 @@ + + + + + + + + diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests-context.xml b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests-context.xml index cf75431a9e1..3e80e4bb779 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests-context.xml +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests-context.xml @@ -20,6 +20,7 @@ RSocketOutboundGateway.Command.requestStreamOrChannel) + .interactionModel((message) -> RSocketInteractionModel.requestChannel) .expectedResponseType("T(java.lang.String)") .clientRSocketConnector(clientRSocketConnector)) .get(); @@ -89,7 +89,8 @@ public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector client @Bean public IntegrationFlow rsocketUpperCaseFlow() { return IntegrationFlows - .from(RSockets.inboundGateway("/uppercase")) + .from(RSockets.inboundGateway("/uppercase") + .interactionModels(RSocketInteractionModel.requestChannel)) ., Flux>transform((flux) -> flux.map(String::toUpperCase)) .get(); } diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java index ec2d5737f2d..f8c07b9bf7a 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java @@ -38,6 +38,7 @@ import org.springframework.integration.dsl.MessageChannels; import org.springframework.integration.expression.FunctionExpression; import org.springframework.integration.rsocket.ClientRSocketConnector; +import org.springframework.integration.rsocket.RSocketInteractionModel; import org.springframework.integration.rsocket.ServerRSocketMessageHandler; import org.springframework.integration.support.MessageBuilder; import org.springframework.lang.Nullable; @@ -80,7 +81,7 @@ public class RSocketOutboundGatewayIntegrationTests { private static final String ROUTE_HEADER = "rsocket_route"; - private static final String COMMAND_HEADER = "rsocket_command"; + private static final String INTERACTION_MODEL_HEADER = "interaction_model"; private static AnnotationConfigApplicationContext serverContext; @@ -154,7 +155,7 @@ private void fireAndForget(MessageChannel inputChannel, FluxMessageChannel resul inputChannel.send( MessageBuilder.withPayload("Hello") .setHeader(ROUTE_HEADER, "receive") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.fireAndForget) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.fireAndForget) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -192,7 +193,7 @@ private void echo(MessageChannel inputChannel, FluxMessageChannel resultChannel, inputChannel.send( MessageBuilder.withPayload("Hello") .setHeader(ROUTE_HEADER, "echo") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.requestResponse) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -224,7 +225,7 @@ private void echoAsync(MessageChannel inputChannel, FluxMessageChannel resultCha inputChannel.send( MessageBuilder.withPayload("Hello") .setHeader(ROUTE_HEADER, "echo-async") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.requestResponse) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -258,7 +259,7 @@ private void echoStream(MessageChannel inputChannel, FluxMessageChannel resultCh inputChannel.send( MessageBuilder.withPayload("Hello") .setHeader(ROUTE_HEADER, "echo-stream") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.requestStream) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -292,7 +293,7 @@ private void echoChannel(MessageChannel inputChannel, FluxMessageChannel resultC inputChannel.send( MessageBuilder.withPayload(Flux.range(1, 10).map(i -> "Hello " + i)) .setHeader(ROUTE_HEADER, "echo-channel") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.requestChannel) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -323,7 +324,7 @@ private void voidReturnValue(MessageChannel inputChannel, FluxMessageChannel res inputChannel.send( MessageBuilder.withPayload("Hello") .setHeader(ROUTE_HEADER, "void-return-value") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.requestResponse) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -353,7 +354,7 @@ private void voidReturnValueFromExceptionHandler(MessageChannel inputChannel, Fl inputChannel.send( MessageBuilder.withPayload("bad") .setHeader(ROUTE_HEADER, "void-return-value") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.requestResponse) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -385,7 +386,7 @@ private void handleWithThrownException(MessageChannel inputChannel, FluxMessageC inputChannel.send( MessageBuilder.withPayload("a") .setHeader(ROUTE_HEADER, "thrown-exception") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.requestResponse) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -417,7 +418,7 @@ private void handleWithErrorSignal(MessageChannel inputChannel, FluxMessageChann inputChannel.send( MessageBuilder.withPayload("a") .setHeader(ROUTE_HEADER, "error-signal") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.requestResponse) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -441,7 +442,7 @@ private void noMatchingRoute(MessageChannel inputChannel, FluxMessageChannel res inputChannel.send( MessageBuilder.withPayload("anything") .setHeader(ROUTE_HEADER, "invalid") - .setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse) + .setHeader(INTERACTION_MODEL_HEADER, RSocketInteractionModel.requestResponse) .setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester) .build()); @@ -471,8 +472,8 @@ public RSocketOutboundGateway rsocketOutboundGateway() { new RSocketOutboundGateway( new FunctionExpression>((m) -> m.getHeaders().get(ROUTE_HEADER))); - rsocketOutboundGateway.setCommandExpression( - new FunctionExpression>((m) -> m.getHeaders().get(COMMAND_HEADER))); + rsocketOutboundGateway.setInteractionModelExpression( + new FunctionExpression>((m) -> m.getHeaders().get(INTERACTION_MODEL_HEADER))); return rsocketOutboundGateway; } diff --git a/src/reference/asciidoc/rsocket.adoc b/src/reference/asciidoc/rsocket.adoc index 49214fb7e7c..8520d26e1ab 100644 --- a/src/reference/asciidoc/rsocket.adoc +++ b/src/reference/asciidoc/rsocket.adoc @@ -122,6 +122,8 @@ See the next section for more information. The `RSocketInboundGateway` is responsible for receiving RSocket requests and producing responses (if any). It requires an array of `path` mapping which could be as patterns similar to MVC request mapping or `@MessageMapping` semantics. +In addition (since version 5.2.2), a set of interaction models (see `RSocketInteractionModel`) can be configured on the `RSocketInboundGateway` to restrict RSocket requests to this endpoint by the particular frame type. +By default all the interaction models are supported. Such a bean, according its `IntegrationRSocketEndpoint` implementation (extension of a `ReactiveMessageHandler`), is auto detected either by the `ServerRSocketConnector` or `ClientRSocketConnector` for a routing logic in the internal `IntegrationRSocketMessageHandler` for incoming requests. An `AbstractRSocketConnector` can be provided to the `RSocketInboundGateway` for explicit endpoint registration. This way, the auto-detection option is disabled on that `AbstractRSocketConnector`. @@ -132,7 +134,7 @@ In this case, an `RSocketInboundGateway` performs a plain `send` operation into Otherwise a `MonoProcessor` value from the `RSocketPayloadReturnValueHandler.RESPONSE_HEADER` header is used for sending a reply to the RSocket. For this purpose, an `RSocketInboundGateway` performs a `sendAndReceiveMessageReactive` operation on the `outputChannel`. The `payload` of the message to send downstream is always a `Flux` according to `MessagingRSocket` logic. -When in a `fireAndForget` RSocket interaction model, the messsage has a plain converted `payload`. +When in a `fireAndForget` RSocket interaction model, the message has a plain converted `payload`. The reply `payload` could be a plain object or a `Publisher` - the `RSocketInboundGateway` converts both of them properly into an RSocket response according to the encoders provided in the `RSocketStrategies`. See <> for samples how to configure an `RSocketInboundGateway` endpoint and deal with payloads downstream. @@ -147,7 +149,7 @@ See `ServerRSocketConnector` JavaDocs for more information. The `route` to send request has to be configured explicitly (together with path variables) or via a SpEL expression which is evaluated against request message. -The RSocket communication command can be provided via `RSocketOutboundGateway.Command` option or respective expression setting. +The RSocket interaction model can be provided via `RSocketInteractionModel` option or respective expression setting. By default a `requestResponse` is used for common gateway use-cases. When request message payload is a `Publisher`, a `publisherElementType` option can be provided to encode its elements according an `RSocketStrategies` supplied in the target `RSocketRequester`. @@ -158,14 +160,14 @@ An RSocket request can also be enhanced with a `metadata`. For this purpose a `metadataExpression` against request message can be configured on the `RSocketOutboundGateway`. Such an expression must evaluate to a `Map`. -When `command` is not `fireAndForget`, an `expectedResponseType` must be supplied. +When `interactionModel` is not `fireAndForget`, an `expectedResponseType` must be supplied. It is a `String.class` by default. An expression for this option can evaluate to a `ParameterizedTypeReference`. -See the `RSocketRequester.RequestSpec.retrieveMono()` and `RSocketRequester.RequestSpec.retrieveFlux()` JavaDocs for more information about reply data and its type. +See the `RSocketRequester.RetrieveSpec.retrieveMono()` and `RSocketRequester.RetrieveSpec.retrieveFlux()` JavaDocs for more information about reply data and its type. -A reply `payload` from the `RSocketOutboundGateway` is always `Mono` (even for a `fireAndForget` command it is `Mono`) always making this component as `async`. +A reply `payload` from the `RSocketOutboundGateway` is a `Mono` (even for a `fireAndForget` interaction model it is `Mono`) always making this component as `async`. Such a `Mono` is subscribed before producing into the `outputChannel` for regular channels or processed on demand by the `FluxMessageChannel`. -A `Flux` response for the `requestStreamOrChannel` command is also wrapped into a reply `Mono`. +A `Flux` response for the `requestStream` or `requestChannel` interaction model is also wrapped into a reply `Mono`. It can be flattened downstream by the `FluxMessageChannel` with a passthrough service activator: ==== @@ -218,6 +220,7 @@ The following example shows how to configure it: ---- , Mono>transform((flux) -> flux.next().map(String::toUpperCase)) .get(); } ---- ==== -A `ClientRSocketConnector` or `ServerRSocketConnector` is assumed in this configuration with meaning for auto-detection of such an endpoint on the "`/uppercase`" path. +A `ClientRSocketConnector` or `ServerRSocketConnector` is assumed in this configuration with meaning for auto-detection of such an endpoint on the "`/uppercase`" path and expected interaction model as "`request channel`". The following example shows how to configure a RSocket outbound gateway with Java: @@ -300,8 +304,8 @@ public RSocketOutboundGateway rsocketOutboundGateway() { new RSocketOutboundGateway( new FunctionExpression>((m) -> m.getHeaders().get("route_header"))); - rsocketOutboundGateway.setCommandExpression( - new FunctionExpression>((m) -> m.getHeaders().get("rsocket_command"))); + rsocketOutboundGateway.setInteractionModelExpression( + new FunctionExpression>((m) -> m.getHeaders().get("rsocket_interaction_model"))); rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector()); return rsocketOutboundGateway; } @@ -322,7 +326,7 @@ public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector client return IntegrationFlows .from(Function.class) .handle(RSockets.outboundGateway("/uppercase") - .command(RSocketOutboundGateway.Command.requestResponse) + .interactionModel(RSocketInteractionModel.requestResponse) .expectedResponseType(String.class) .clientRSocketConnector(clientRSocketConnector)) .get();