Skip to content

GH-3207: RSocket inbound: decode each flux item #3208

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,7 +37,6 @@ public class RSocketInboundGatewayParser extends AbstractInboundGatewayParser {

private static final List<String> NON_ELIGIBLE_ATTRIBUTES =
Arrays.asList("path",
"interaction-models",
"rsocket-strategies",
"rsocket-connector",
"request-element-type");
Expand All @@ -61,7 +60,6 @@ protected void doPostProcess(BeanDefinitionBuilder builder, Element element) {
"rSocketStrategies");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "rsocket-connector",
"RSocketConnector");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "interaction-models");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.springframework.integration.rsocket.inbound.RSocketInboundGateway;
import org.springframework.messaging.rsocket.RSocketStrategies;

import reactor.core.publisher.Flux;

/**
* The {@link MessagingGatewaySpec} implementation for the {@link RSocketInboundGateway}.
*
Expand Down Expand Up @@ -82,4 +84,16 @@ public RSocketInboundGatewaySpec requestElementType(ResolvableType requestElemen
return this;
}

/**
* Configure an option to decode an incoming {@link Flux} as a single unit or each its event separately.
* @param decodeFluxAsUnit decode incoming {@link Flux} as a single unit or each event separately.
* @return the spec
* @since 5.3
* @see RSocketInboundGateway#setDecodeFluxAsUnit(boolean)
*/
public RSocketInboundGatewaySpec decodeFluxAsUnit(boolean decodeFluxAsUnit) {
this.target.setDecodeFluxAsUnit(decodeFluxAsUnit);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,6 +85,8 @@ public class RSocketInboundGateway extends MessagingGatewaySupport implements In
@Nullable
private ResolvableType requestElementType;

private boolean decodeFluxAsUnit;

/**
* Instantiate based on the provided path patterns to map this endpoint for incoming RSocket requests.
* @param pathArg the mapping patterns to use.
Expand Down Expand Up @@ -160,6 +162,20 @@ public void setRequestElementType(ResolvableType requestElementType) {
this.requestElementType = requestElementType;
}

/**
* Configure an option to decode an incoming {@link Flux} as a single unit or each its event separately.
* Defaults to {@code false} for consistency with Spring Messaging {@code @MessageMapping}.
* The target {@link Flux} decoding logic depends on the {@link Decoder} selected.
* For example a {@link org.springframework.core.codec.StringDecoder} requires a new line separator to
* be present in the stream to indicate a byte buffer end.
* @param decodeFluxAsUnit decode incoming {@link Flux} as a single unit or each event separately.
* @since 5.3
* @see Decoder#decode(Publisher, ResolvableType, MimeType, java.util.Map)
*/
public void setDecodeFluxAsUnit(boolean decodeFluxAsUnit) {
this.decodeFluxAsUnit = decodeFluxAsUnit;
}

@Override
protected void onInit() {
super.onInit();
Expand Down Expand Up @@ -219,14 +235,17 @@ private Mono<Message<?>> decodeRequestMessage(Message<?> requestMessage) {
@SuppressWarnings("unchecked")
@Nullable
private Object decodePayload(Message<?> requestMessage) {
ResolvableType elementType = this.requestElementType;
ResolvableType elementType;
MimeType mimeType = requestMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class);
if (elementType == null) {
if (this.requestElementType == null) {
elementType =
mimeType != null && "text".equals(mimeType.getType())
? ResolvableType.forClass(String.class)
: ResolvableType.forClass(byte[].class);
}
else {
elementType = this.requestElementType;
}

Object payload = requestMessage.getPayload();

Expand All @@ -235,9 +254,18 @@ private Object decodePayload(Message<?> requestMessage) {
if (payload instanceof DataBuffer) {
return decoder.decode((DataBuffer) payload, elementType, mimeType, null);
}
else {
else if (this.decodeFluxAsUnit) {
return decoder.decode((Publisher<DataBuffer>) payload, elementType, mimeType, null);
}
else {
return Flux.from((Publisher<DataBuffer>) payload)
.handle((buffer, synchronousSink) -> {
Object value = decoder.decode(buffer, elementType, mimeType, null);
if (value != null) {
synchronousSink.next(value);
}
});
}
}

private Flux<DataBuffer> createReply(Object reply, Message<?> requestMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="decode-flux-as-unit" default="false">
<xsd:annotation>
<xsd:documentation>
Decode incoming Flux as a single unit or each event separately.
</xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:union memberTypes="xsd:boolean xsd:string"/>
</xsd:simpleType>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
auto-startup="false"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
request-element-type="byte[]"
decode-flux-as-unit="true"/>

</beans>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,7 +44,7 @@ class RSocketInboundGatewayParserTests {
private RSocketInboundGateway inboundGateway;

@Test
void testOutboundGatewayParser() {
void testInboundGatewayParser() {
assertThat(TestUtils.getPropertyValue(this.inboundGateway, "rsocketConnector"))
.isSameAs(this.clientRSocketConnector);
assertThat(TestUtils.getPropertyValue(this.inboundGateway, "rsocketStrategies"))
Expand All @@ -54,6 +54,7 @@ void testOutboundGatewayParser() {
.isEqualTo(byte[].class);
assertThat(this.inboundGateway.getInteractionModels())
.containsExactly(RSocketInteractionModel.fireAndForget, RSocketInteractionModel.requestChannel);
assertThat(TestUtils.getPropertyValue(this.inboundGateway, "decodeFluxAsUnit", Boolean.class)).isTrue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.RSocketInteractionModel;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

Expand All @@ -48,17 +50,30 @@ public class RSocketDslTests {

@Autowired
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
private Function<Object, Flux<String>> rsocketUpperCaseFlowFunction;

@Test
void testRsocketUpperCaseFlows() {
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a\n", "b\n", "c\n"));
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c"));

StepVerifier.create(result)
.expectNext("A", "B", "C")
.verifyComplete();
}

@Test
void testRsocketUpperCaseWholeFlows() {
Message<Flux<String>> testMessage =
MessageBuilder.withPayload(Flux.just("a", "b", "c", "\n"))
.setHeader("route", "/uppercaseWhole")
.build();
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(testMessage);

StepVerifier.create(result)
.expectNext("ABC")
.verifyComplete();
}

@Configuration
@EnableIntegration
public static class TestConfiguration {
Expand All @@ -80,7 +95,8 @@ public ClientRSocketConnector clientRSocketConnector(ServerRSocketConnector serv
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.handle(RSockets.outboundGateway(message ->
message.getHeaders().getOrDefault("route", "/uppercase"))
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
.expectedResponseType("T(java.lang.String)")
.clientRSocketConnector(clientRSocketConnector),
Expand All @@ -100,6 +116,16 @@ public IntegrationFlow rsocketUpperCaseFlow() {
.get();
}

@Bean
public IntegrationFlow rsocketUpperCaseWholeFlow() {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercaseWhole")
.interactionModels(RSocketInteractionModel.requestChannel)
.decodeFluxAsUnit(true))
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}

}

}
6 changes: 6 additions & 0 deletions src/reference/asciidoc/rsocket.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ The `payload` of the message to send downstream is always a `Flux` according to
When in a `fireAndForget` RSocket interaction model, the message has a plain converted `payload`.
The reply `payload` could be a plain object or a `Publisher` - the `RSocketInboundGateway` converts both of them properly into an RSocket response according to the encoders provided in the `RSocketStrategies`.

Starting with version 5.3, a `decodeFluxAsUnit` option (default `false`) is added to the `RSocketInboundGateway`.
By default incoming `Flux` is transformed the way that each its event is decoded separately.
This is an exact behavior present currently with `@MessageMapping` semantics.
To restore a previous behavior or decode the whole `Flux` as single unit according application requirements, the `decodeFluxAsUnit` has to be set to `true`.
However the target decoding logic depends on the `Decoder` selected, e.g. a `StringDecoder` requires a new line separator (by default) to be present in the stream to indicate a byte buffer end.

See <<rsocket-java-config>> for samples how to configure an `RSocketInboundGateway` endpoint and deal with payloads downstream.

[[rsocket-outbound]]
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,9 @@ See <<./ws.adoc#ws,Web Services Support>> for more information.

The `FailoverClientConnectionFactory` no longer fails back, by default, until the current connection fails.
See <<./ip.adoc#failover-cf,TCP Failover Client Connection Factory>> for more information.

[[x5.3-rsocket]]
=== RSocket Changes

A `decodeFluxAsUnit` option has been added to the `RSocketInboundGateway` with the meaning to decode incoming `Flux` as a single unit or apply decoding for each event in it.
See <<./rsocket.adoc#rsocket-inbound,RSocket Inbound Gateway>> for more information.