Skip to content

Introduce RSocketInteractionModel #3110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,15 @@ public interface IntegrationRSocketEndpoint extends ReactiveMessageHandler {
*/
String[] getPath();

/**
* Obtain {@link RSocketInteractionModel}s
* this {@link ReactiveMessageHandler} is going to be mapped onto.
* Defaults to all the {@link RSocketInteractionModel}s.
* @return the interaction models for mapping.
* @since 5.2.2
*/
default RSocketInteractionModel[] getInteractionModels() {
return RSocketInteractionModel.values();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.integration.rsocket;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down Expand Up @@ -79,13 +80,19 @@ public boolean detectEndpoints() {
}

public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
RSocketFrameTypeMessageCondition frameTypeMessageCondition = RSocketFrameTypeMessageCondition.EMPTY_CONDITION;

RSocketInteractionModel[] interactionModels = endpoint.getInteractionModels();
if (interactionModels.length > 0) {
frameTypeMessageCondition =
new RSocketFrameTypeMessageCondition(
Arrays.stream(interactionModels)
.map(RSocketInteractionModel::getFrameType)
.toArray(FrameType[]::new));
}
registerHandlerMethod(endpoint, HANDLE_MESSAGE_METHOD,
new CompositeMessageCondition(
new RSocketFrameTypeMessageCondition(
FrameType.REQUEST_FNF,
FrameType.REQUEST_RESPONSE,
FrameType.REQUEST_STREAM,
FrameType.REQUEST_CHANNEL),
frameTypeMessageCondition,
new DestinationPatternsMessageCondition(endpoint.getPath(), getRouteMatcher()))); // NOSONAR
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.rsocket;

import io.rsocket.frame.FrameType;

/**
* The RSocket protocol interaction models.
*
* @author Artem Bilan
*
* @since 5.2.2
*
* @see <a href="https://rsocket.io/">RSocket protocol official site</a>
* @see FrameType
*/
public enum RSocketInteractionModel {

/**
* The model for {@link io.rsocket.RSocket#fireAndForget} operation.
*/
fireAndForget(FrameType.REQUEST_FNF),

/**
* The model for {@link io.rsocket.RSocket#requestResponse} operation.
*/
requestResponse(FrameType.REQUEST_RESPONSE),

/**
* The model for {@link io.rsocket.RSocket#requestStream} operation.
*/
requestStream(FrameType.REQUEST_STREAM),

/**
* The model for {@link io.rsocket.RSocket#requestChannel} operation.
*/
requestChannel(FrameType.REQUEST_CHANNEL);

private final FrameType frameType;

RSocketInteractionModel(FrameType frameType) {
this.frameType = frameType;
}

public FrameType getFrameType() {
return this.frameType;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,13 +29,15 @@
/**
* Parser for the &lt;inbound-gateway/&gt; element of the 'rsocket' namespace.
*
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*
* @since 5.2
*/
public class RSocketInboundGatewayParser extends AbstractInboundGatewayParser {

private static final List<String> NON_ELIGIBLE_ATTRIBUTES =
Arrays.asList("path",
"interaction-models",
"rsocket-strategies",
"rsocket-connector",
"request-element-type");
Expand All @@ -59,6 +61,7 @@ protected void doPostProcess(BeanDefinitionBuilder builder, Element element) {
"rSocketStrategies");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "rsocket-connector",
"RSocketConnector");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "interaction-models");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
builder.addConstructorArgValue(routeExpression);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "client-rsocket-connector",
"clientRSocketConnector");
populateValueOrExpressionIfAny(builder, element, parserContext, "interaction-model");
populateValueOrExpressionIfAny(builder, element, parserContext, "command");
populateValueOrExpressionIfAny(builder, element, parserContext, "publisher-element-type");
populateValueOrExpressionIfAny(builder, element, parserContext, "expected-response-type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.springframework.core.ResolvableType;
import org.springframework.integration.dsl.MessagingGatewaySpec;
import org.springframework.integration.rsocket.AbstractRSocketConnector;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.inbound.RSocketInboundGateway;
import org.springframework.messaging.rsocket.RSocketStrategies;

Expand All @@ -36,7 +37,19 @@ public class RSocketInboundGatewaySpec extends MessagingGatewaySpec<RSocketInbou
}

/**
* Configure {@link RSocketStrategies} instead of a default one.
* Configure a set of {@link RSocketInteractionModel} the endpoint is going to be mapped onto.
* @param interactionModels the {@link RSocketInteractionModel}s for mapping.
* @return the spec.
* @since 5.2.2
* @see RSocketInboundGateway#setInteractionModels(RSocketInteractionModel...)
*/
public RSocketInboundGatewaySpec interactionModels(RSocketInteractionModel... interactionModels) {
this.target.setInteractionModels(interactionModels);
return this;
}

/**
* Configure an {@link RSocketStrategies} instead of a default one.
* @param rsocketStrategies the {@link RSocketStrategies} to use.
* @return the spec
* @see RSocketInboundGateway#setRSocketStrategies(RSocketStrategies)
Expand All @@ -58,7 +71,7 @@ public RSocketInboundGatewaySpec rsocketConnector(AbstractRSocketConnector rsock
}

/**
* Specify the type of payload to be generated when the inbound RSocket request
* Specify a type of payload to be generated when the inbound RSocket request
* content is read by the converters/encoders.
* @param requestElementType The payload type.
* @return the spec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
import org.springframework.messaging.Message;
import org.springframework.util.MimeType;
Expand Down Expand Up @@ -58,47 +59,103 @@ public RSocketOutboundGatewaySpec clientRSocketConnector(ClientRSocketConnector
}

/**
* Configure a {@link RSocketOutboundGateway.Command} for RSocket request type.
* Configure an {@link RSocketOutboundGateway.Command} for the RSocket request type.
* @param command the {@link RSocketOutboundGateway.Command} to use.
* @return the spec
* @see RSocketOutboundGateway#setCommand(RSocketOutboundGateway.Command)
* @deprecated in favor of {@link #interactionModel(RSocketInteractionModel)}
*/
@Deprecated
public RSocketOutboundGatewaySpec command(RSocketOutboundGateway.Command command) {
return command(new ValueExpression<>(command));
return interactionModel(new ValueExpression<>(command));
}

/**
* Configure a {@code Function} to evaluate a {@link RSocketOutboundGateway.Command}
* for RSocket request type at runtime against a request message.
* Configure an {@link RSocketInteractionModel} for the RSocket request type.
* @param interactionModel the {@link RSocketInteractionModel} to use.
* @return the spec
* @see RSocketOutboundGateway#setInteractionModel(RSocketInteractionModel)
* @since 5.2.2
*/
public RSocketOutboundGatewaySpec interactionModel(RSocketInteractionModel interactionModel) {
return interactionModel(new ValueExpression<>(interactionModel));
}

/**
* Configure a {@link Function} to evaluate an {@link RSocketOutboundGateway.Command}
* for the RSocket request type at runtime against a request message.
* @param commandFunction the {@code Function} to use.
* @param <P> the expected request message payload type.
* @return the spec
* @see RSocketOutboundGateway#setCommandExpression(Expression)
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
* @deprecated in favor of {@link #interactionModel(Function)}
*/
@Deprecated
public <P> RSocketOutboundGatewaySpec command(Function<Message<P>, ?> commandFunction) {
return command(new FunctionExpression<>(commandFunction));
return interactionModel(commandFunction);
}

/**
* Configure a SpEL expression to evaluate a {@link RSocketOutboundGateway.Command}
* for RSocket request type at runtime against a request message.
* Configure a {@link Function} to evaluate an {@link RSocketInteractionModel}
* for the RSocket request type at runtime against a request message.
* @param interactionModelFunction the {@code Function} to use.
* @param <P> the expected request message payload type.
* @return the spec
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
* @since 5.2.2
*/
public <P> RSocketOutboundGatewaySpec interactionModel(Function<Message<P>, ?> interactionModelFunction) {
return interactionModel(new FunctionExpression<>(interactionModelFunction));
}

/**
* Configure a SpEL expression to evaluate an {@link RSocketOutboundGateway.Command}
* for the RSocket request type at runtime against a request message.
* @param commandExpression the SpEL expression to use.
* @return the spec
* @see RSocketOutboundGateway#setCommandExpression(Expression)
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
* @deprecated in favor of {@link #interactionModel(String)}
*/
@Deprecated
public RSocketOutboundGatewaySpec command(String commandExpression) {
return command(PARSER.parseExpression(commandExpression));
return interactionModel(commandExpression);
}

/**
* Configure a SpEL expression to evaluate a {@link RSocketOutboundGateway.Command}
* for RSocket request type at runtime against a request message.
* Configure a SpEL expression to evaluate an {@link RSocketInteractionModel}
* for the RSocket request type at runtime against a request message.
* @param interactionModelExpression the SpEL expression to use.
* @return the spec
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
* @since 5.2.2
*/
public RSocketOutboundGatewaySpec interactionModel(String interactionModelExpression) {
return interactionModel(PARSER.parseExpression(interactionModelExpression));
}

/**
* Configure a SpEL expression to evaluate an {@link RSocketOutboundGateway.Command}
* for the RSocket request type at runtime against a request message.
* @param commandExpression the SpEL expression to use.
* @return the spec
* @see RSocketOutboundGateway#setCommandExpression(Expression)
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
* @deprecated in favor of {@link #interactionModel(Expression)}
*/
@Deprecated
public RSocketOutboundGatewaySpec command(Expression commandExpression) {
this.target.setCommandExpression(commandExpression);
return interactionModel(commandExpression);
}

/**
* Configure a SpEL expression to evaluate an {@link RSocketInteractionModel}
* for the RSocket request type at runtime against a request message.
* @param interactionModelExpression the SpEL expression to use.
* @return the spec
* @see RSocketOutboundGateway#setInteractionModelExpression(Expression)
* @since 5.2.2
*/
public RSocketOutboundGatewaySpec interactionModel(Expression interactionModelExpression) {
this.target.setInteractionModelExpression(interactionModelExpression);
return this;
}

Expand All @@ -113,7 +170,7 @@ public RSocketOutboundGatewaySpec publisherElementType(Class<?> publisherElement
}

/**
* Configure a {@code Function} to evaluate a request {@link org.reactivestreams.Publisher}
* Configure a {@link Function} to evaluate a request {@link org.reactivestreams.Publisher}
* elements type at runtime against a request message.
* @param publisherElementTypeFunction the {@code Function} to evaluate a type for the request
* {@link org.reactivestreams.Publisher} elements.
Expand Down Expand Up @@ -161,7 +218,7 @@ public RSocketOutboundGatewaySpec expectedResponseType(Class<?> expectedResponse
}

/**
* Specify the {@code Function} to determine the type for the RSocket response.
* Specify the {@link Function} to determine the type for the RSocket response.
* @param expectedResponseTypeFunction The expected response type {@code Function}.
* @param <P> the expected request message payload type.
* @return the spec
Expand All @@ -182,7 +239,7 @@ public RSocketOutboundGatewaySpec expectedResponseType(String expectedResponseTy
}

/**
* Specify the {@link Expression} to determine the type for the RSocket response.
* Specify an {@link Expression} to determine the type for the RSocket response.
* @param expectedResponseTypeExpression The expected response type expression.
* @return the spec
* @see RSocketOutboundGateway#setExpectedResponseTypeExpression(Expression)
Expand All @@ -205,8 +262,8 @@ public <P> RSocketOutboundGatewaySpec metadata(Function<Message<P>, Map<Object,
}

/**
Configure a SpEL expression to evaluate a metadata as a {@code Map<Object, MimeType>}
* for RSocket request against request message.
* Configure a SpEL expression to evaluate a metadata as a {@code Map<Object, MimeType>}
* for the RSocket request against request message.
* @param metadataExpression the SpEL expression to use.
* @return the spec
* @see RSocketOutboundGateway#setMetadataExpression(Expression)
Expand All @@ -217,8 +274,7 @@ public RSocketOutboundGatewaySpec metadata(String metadataExpression) {

/**
* Configure a SpEL expression to evaluate a metadata as a {@code Map<Object, MimeType>}
* for RSocket request against request message.
* for RSocket request type at runtime against a request message.
* for the RSocket request type at runtime against a request message.
* @param metadataExpression the SpEL expression to use.
* @return the spec
* @see RSocketOutboundGateway#setMetadataExpression(Expression)
Expand Down
Loading