Skip to content

Commit e32b26a

Browse files
committed
GH-3207: RSocket inbound: decode each flux item
Fixes #3207 Previously an incoming RSocket Publisher has been decoded as a single unit leading to extra work on the client side, e.g. a delimiter has to be provided to treat each payload item as independent * To have a consistency with Spring Messaging and its `PayloadMethodArgumentResolver` change an `RSocketInboundGateway` to process inbound payloads as `Flux` and decode each item independently. * Change `RSocketDslTests` to remove delimiters and make it consistent with the regular `RSocketRequester` client
1 parent e71e250 commit e32b26a

File tree

2 files changed

+16
-6
lines changed

2 files changed

+16
-6
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -219,14 +219,17 @@ private Mono<Message<?>> decodeRequestMessage(Message<?> requestMessage) {
219219
@SuppressWarnings("unchecked")
220220
@Nullable
221221
private Object decodePayload(Message<?> requestMessage) {
222-
ResolvableType elementType = this.requestElementType;
222+
ResolvableType elementType;
223223
MimeType mimeType = requestMessage.getHeaders().get(MessageHeaders.CONTENT_TYPE, MimeType.class);
224-
if (elementType == null) {
224+
if (this.requestElementType == null) {
225225
elementType =
226226
mimeType != null && "text".equals(mimeType.getType())
227227
? ResolvableType.forClass(String.class)
228228
: ResolvableType.forClass(byte[].class);
229229
}
230+
else {
231+
elementType = this.requestElementType;
232+
}
230233

231234
Object payload = requestMessage.getPayload();
232235

@@ -236,7 +239,14 @@ private Object decodePayload(Message<?> requestMessage) {
236239
return decoder.decode((DataBuffer) payload, elementType, mimeType, null);
237240
}
238241
else {
239-
return decoder.decode((Publisher<DataBuffer>) payload, elementType, mimeType, null);
242+
return Flux.from((Publisher<DataBuffer>) payload)
243+
.handle((buffer, synchronousSink) -> {
244+
Object value = decoder.decode(buffer, elementType, mimeType, null);
245+
if (value == null) {
246+
value = buffer;
247+
}
248+
synchronousSink.next(value);
249+
});
240250
}
241251
}
242252

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,7 +51,7 @@ public class RSocketDslTests {
5151

5252
@Test
5353
void testRsocketUpperCaseFlows() {
54-
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a\n", "b\n", "c\n"));
54+
Flux<String> result = this.rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c"));
5555

5656
StepVerifier.create(result)
5757
.expectNext("A", "B", "C")

0 commit comments

Comments
 (0)