diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParser.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParser.java index c3e29f26d75..a06dd11c396 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParser.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,6 @@ public class RSocketInboundGatewayParser extends AbstractInboundGatewayParser { private static final List NON_ELIGIBLE_ATTRIBUTES = Arrays.asList("path", - "interaction-models", "rsocket-strategies", "rsocket-connector", "request-element-type"); @@ -61,7 +60,6 @@ protected void doPostProcess(BeanDefinitionBuilder builder, Element element) { "rSocketStrategies"); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "rsocket-connector", "RSocketConnector"); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "interaction-models"); } } diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketInboundGatewaySpec.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketInboundGatewaySpec.java index 0c7cd14909d..46479e7476a 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketInboundGatewaySpec.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/dsl/RSocketInboundGatewaySpec.java @@ -23,6 +23,8 @@ import org.springframework.integration.rsocket.inbound.RSocketInboundGateway; import org.springframework.messaging.rsocket.RSocketStrategies; +import reactor.core.publisher.Flux; + /** * The {@link MessagingGatewaySpec} implementation for the {@link RSocketInboundGateway}. * @@ -82,4 +84,16 @@ public RSocketInboundGatewaySpec requestElementType(ResolvableType requestElemen return this; } + /** + * Configure an option to decode an incoming {@link Flux} as a single unit or each its event separately. + * @param decodeFluxAsUnit decode incoming {@link Flux} as a single unit or each event separately. + * @return the spec + * @since 5.3 + * @see RSocketInboundGateway#setDecodeFluxAsUnit(boolean) + */ + public RSocketInboundGatewaySpec decodeFluxAsUnit(boolean decodeFluxAsUnit) { + this.target.setDecodeFluxAsUnit(decodeFluxAsUnit); + return this; + } + } diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java index 0abf10024a8..899c6e927e0 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,6 +85,8 @@ public class RSocketInboundGateway extends MessagingGatewaySupport implements In @Nullable private ResolvableType requestElementType; + private boolean decodeFluxAsUnit; + /** * Instantiate based on the provided path patterns to map this endpoint for incoming RSocket requests. * @param pathArg the mapping patterns to use. @@ -160,6 +162,20 @@ public void setRequestElementType(ResolvableType requestElementType) { this.requestElementType = requestElementType; } + /** + * Configure an option to decode an incoming {@link Flux} as a single unit or each its event separately. + * Defaults to {@code false} for consistency with Spring Messaging {@code @MessageMapping}. + * The target {@link Flux} decoding logic depends on the {@link Decoder} selected. + * For example a {@link org.springframework.core.codec.StringDecoder} requires a new line separator to + * be present in the stream to indicate a byte buffer end. + * @param decodeFluxAsUnit decode incoming {@link Flux} as a single unit or each event separately. + * @since 5.3 + * @see Decoder#decode(Publisher, ResolvableType, MimeType, java.util.Map) + */ + public void setDecodeFluxAsUnit(boolean decodeFluxAsUnit) { + this.decodeFluxAsUnit = decodeFluxAsUnit; + } + @Override protected void onInit() { super.onInit(); @@ -219,14 +235,17 @@ private Mono> decodeRequestMessage(Message requestMessage) { @SuppressWarnings("unchecked") @Nullable private Object decodePayload(Message requestMessage) { - ResolvableType elementType = this.requestElementType; + ResolvableType elementType; MimeType mimeType = requestMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class); - if (elementType == null) { + if (this.requestElementType == null) { elementType = mimeType != null && "text".equals(mimeType.getType()) ? ResolvableType.forClass(String.class) : ResolvableType.forClass(byte[].class); } + else { + elementType = this.requestElementType; + } Object payload = requestMessage.getPayload(); @@ -235,9 +254,18 @@ private Object decodePayload(Message requestMessage) { if (payload instanceof DataBuffer) { return decoder.decode((DataBuffer) payload, elementType, mimeType, null); } - else { + else if (this.decodeFluxAsUnit) { return decoder.decode((Publisher) payload, elementType, mimeType, null); } + else { + return Flux.from((Publisher) payload) + .handle((buffer, synchronousSink) -> { + Object value = decoder.decode(buffer, elementType, mimeType, null); + if (value != null) { + synchronousSink.next(value); + } + }); + } } private Flux createReply(Object reply, Message requestMessage) { diff --git a/spring-integration-rsocket/src/main/resources/org/springframework/integration/rsocket/config/spring-integration-rsocket.xsd b/spring-integration-rsocket/src/main/resources/org/springframework/integration/rsocket/config/spring-integration-rsocket.xsd index cbf18080c63..59a17afff71 100644 --- a/spring-integration-rsocket/src/main/resources/org/springframework/integration/rsocket/config/spring-integration-rsocket.xsd +++ b/spring-integration-rsocket/src/main/resources/org/springframework/integration/rsocket/config/spring-integration-rsocket.xsd @@ -93,6 +93,16 @@ + + + + Decode incoming Flux as a single unit or each event separately. + + + + + + diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests-context.xml b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests-context.xml index 3e80e4bb779..a19da953cc2 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests-context.xml +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests-context.xml @@ -25,6 +25,7 @@ auto-startup="false" request-channel="requestChannel" rsocket-strategies="rsocketStrategies" - request-element-type="byte[]"/> + request-element-type="byte[]" + decode-flux-as-unit="true"/> diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests.java index f76776a7fd7..64d6452e1dd 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/config/RSocketInboundGatewayParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,7 +44,7 @@ class RSocketInboundGatewayParserTests { private RSocketInboundGateway inboundGateway; @Test - void testOutboundGatewayParser() { + void testInboundGatewayParser() { assertThat(TestUtils.getPropertyValue(this.inboundGateway, "rsocketConnector")) .isSameAs(this.clientRSocketConnector); assertThat(TestUtils.getPropertyValue(this.inboundGateway, "rsocketStrategies")) @@ -54,6 +54,7 @@ void testOutboundGatewayParser() { .isEqualTo(byte[].class); assertThat(this.inboundGateway.getInteractionModels()) .containsExactly(RSocketInteractionModel.fireAndForget, RSocketInteractionModel.requestChannel); + assertThat(TestUtils.getPropertyValue(this.inboundGateway, "decodeFluxAsUnit", Boolean.class)).isTrue(); } } diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java index de6383623c0..f198d21f333 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java @@ -31,6 +31,8 @@ import org.springframework.integration.rsocket.ClientRSocketConnector; import org.springframework.integration.rsocket.RSocketInteractionModel; import org.springframework.integration.rsocket.ServerRSocketConnector; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -48,17 +50,30 @@ public class RSocketDslTests { @Autowired @Qualifier("rsocketUpperCaseRequestFlow.gateway") - private Function, Flux> rsocketUpperCaseFlowFunction; + private Function> rsocketUpperCaseFlowFunction; @Test void testRsocketUpperCaseFlows() { - Flux result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a\n", "b\n", "c\n")); + Flux result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c")); StepVerifier.create(result) .expectNext("A", "B", "C") .verifyComplete(); } + @Test + void testRsocketUpperCaseWholeFlows() { + Message> testMessage = + MessageBuilder.withPayload(Flux.just("a", "b", "c", "\n")) + .setHeader("route", "/uppercaseWhole") + .build(); + Flux result = this.rsocketUpperCaseFlowFunction.apply(testMessage); + + StepVerifier.create(result) + .expectNext("ABC") + .verifyComplete(); + } + @Configuration @EnableIntegration public static class TestConfiguration { @@ -80,7 +95,8 @@ public ClientRSocketConnector clientRSocketConnector(ServerRSocketConnector serv public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) { return IntegrationFlows .from(Function.class) - .handle(RSockets.outboundGateway("/uppercase") + .handle(RSockets.outboundGateway(message -> + message.getHeaders().getOrDefault("route", "/uppercase")) .interactionModel((message) -> RSocketInteractionModel.requestChannel) .expectedResponseType("T(java.lang.String)") .clientRSocketConnector(clientRSocketConnector), @@ -100,6 +116,16 @@ public IntegrationFlow rsocketUpperCaseFlow() { .get(); } + @Bean + public IntegrationFlow rsocketUpperCaseWholeFlow() { + return IntegrationFlows + .from(RSockets.inboundGateway("/uppercaseWhole") + .interactionModels(RSocketInteractionModel.requestChannel) + .decodeFluxAsUnit(true)) + ., Flux>transform((flux) -> flux.map(String::toUpperCase)) + .get(); + } + } } diff --git a/src/reference/asciidoc/rsocket.adoc b/src/reference/asciidoc/rsocket.adoc index 8520d26e1ab..b78038a1eb3 100644 --- a/src/reference/asciidoc/rsocket.adoc +++ b/src/reference/asciidoc/rsocket.adoc @@ -137,6 +137,12 @@ The `payload` of the message to send downstream is always a `Flux` according to When in a `fireAndForget` RSocket interaction model, the message has a plain converted `payload`. The reply `payload` could be a plain object or a `Publisher` - the `RSocketInboundGateway` converts both of them properly into an RSocket response according to the encoders provided in the `RSocketStrategies`. +Starting with version 5.3, a `decodeFluxAsUnit` option (default `false`) is added to the `RSocketInboundGateway`. +By default incoming `Flux` is transformed the way that each its event is decoded separately. +This is an exact behavior present currently with `@MessageMapping` semantics. +To restore a previous behavior or decode the whole `Flux` as single unit according application requirements, the `decodeFluxAsUnit` has to be set to `true`. +However the target decoding logic depends on the `Decoder` selected, e.g. a `StringDecoder` requires a new line separator (by default) to be present in the stream to indicate a byte buffer end. + See <> for samples how to configure an `RSocketInboundGateway` endpoint and deal with payloads downstream. [[rsocket-outbound]] diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 66f5ddbc3f5..5375e9d61d1 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -95,3 +95,9 @@ See <<./ws.adoc#ws,Web Services Support>> for more information. The `FailoverClientConnectionFactory` no longer fails back, by default, until the current connection fails. See <<./ip.adoc#failover-cf,TCP Failover Client Connection Factory>> for more information. + +[[x5.3-rsocket]] +=== RSocket Changes + +A `decodeFluxAsUnit` option has been added to the `RSocketInboundGateway` with the meaning to decode incoming `Flux` as a single unit or apply decoding for each event in it. +See <<./rsocket.adoc#rsocket-inbound,RSocket Inbound Gateway>> for more information.