Skip to content

Commit 25e9459

Browse files
committed
Fix RSocket module according changes in SF
Related to spring-projects/spring-framework#23999 * Since Spring Integration inbound endpoints are generic in their method signature we can't rely on a new `EMPTY_CONDITION` because it turns on configuration merge into just `FrameType.REQUEST_FNF` & `FrameType.REQUEST_RESPONSE`. So, use `FrameType.REQUEST_FNF`, `FrameType.REQUEST_RESPONSE`, `FrameType.REQUEST_STREAM` & `FrameType.REQUEST_CHANNEL` explicitly to cover all the valid request-response models for SI endpoints * Rework `RSocketOutboundGatewayIntegrationTests` according new logic. Plus refactor to earlier subscription into `FluxMessageChannel` to avoid potential race conditions
1 parent 0f5bd4a commit 25e9459

File tree

2 files changed

+89
-88
lines changed

2 files changed

+89
-88
lines changed

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocketMessageHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
3333
import org.springframework.util.ReflectionUtils;
3434

35+
import io.rsocket.frame.FrameType;
36+
3537
/**
3638
* The {@link RSocketMessageHandler} extension for Spring Integration needs.
3739
* <p>
@@ -79,7 +81,11 @@ public boolean detectEndpoints() {
7981
public void addEndpoint(IntegrationRSocketEndpoint endpoint) {
8082
registerHandlerMethod(endpoint, HANDLE_MESSAGE_METHOD,
8183
new CompositeMessageCondition(
82-
RSocketFrameTypeMessageCondition.REQUEST_CONDITION,
84+
new RSocketFrameTypeMessageCondition(
85+
FrameType.REQUEST_FNF,
86+
FrameType.REQUEST_RESPONSE,
87+
FrameType.REQUEST_STREAM,
88+
FrameType.REQUEST_CHANNEL),
8389
new DestinationPatternsMessageCondition(endpoint.getPath(), getRouteMatcher()))); // NOSONAR
8490
}
8591

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGatewayIntegrationTests.java

Lines changed: 82 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -180,20 +180,23 @@ void serverEcho() {
180180
private void echo(MessageChannel inputChannel, FluxMessageChannel resultChannel,
181181
RSocketRequester rsocketRequester) {
182182

183+
StepVerifier verifier =
184+
StepVerifier.create(
185+
Flux.from(resultChannel)
186+
.map(Message::getPayload)
187+
.cast(String.class))
188+
.expectNext("Hello")
189+
.thenCancel()
190+
.verifyLater();
191+
183192
inputChannel.send(
184193
MessageBuilder.withPayload("Hello")
185194
.setHeader(ROUTE_HEADER, "echo")
186195
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse)
187196
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
188197
.build());
189198

190-
StepVerifier.create(
191-
Flux.from(resultChannel)
192-
.map(Message::getPayload)
193-
.cast(String.class))
194-
.expectNext("Hello")
195-
.thenCancel()
196-
.verify();
199+
verifier.verify();
197200
}
198201

199202
@Test
@@ -209,20 +212,23 @@ void serverEchoAsync() {
209212
private void echoAsync(MessageChannel inputChannel, FluxMessageChannel resultChannel,
210213
RSocketRequester rsocketRequester) {
211214

215+
StepVerifier verifier =
216+
StepVerifier.create(
217+
Flux.from(resultChannel)
218+
.map(Message::getPayload)
219+
.cast(String.class))
220+
.expectNext("Hello async")
221+
.thenCancel()
222+
.verifyLater();
223+
212224
inputChannel.send(
213225
MessageBuilder.withPayload("Hello")
214226
.setHeader(ROUTE_HEADER, "echo-async")
215227
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse)
216228
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
217229
.build());
218230

219-
StepVerifier.create(
220-
Flux.from(resultChannel)
221-
.map(Message::getPayload)
222-
.cast(String.class))
223-
.expectNext("Hello async")
224-
.thenCancel()
225-
.verify();
231+
verifier.verify();
226232
}
227233

228234
@Test
@@ -238,29 +244,25 @@ void serverEchoStream() {
238244
private void echoStream(MessageChannel inputChannel, FluxMessageChannel resultChannel,
239245
RSocketRequester rsocketRequester) {
240246

247+
@SuppressWarnings("unchecked")
248+
StepVerifier verifier =
249+
StepVerifier.create(
250+
Flux.from(resultChannel)
251+
.next()
252+
.map(Message::getPayload)
253+
.flatMapMany((payload) -> (Flux<String>) payload))
254+
.expectNext("Hello 0").expectNextCount(6).expectNext("Hello 7")
255+
.thenCancel()
256+
.verifyLater();
257+
241258
inputChannel.send(
242259
MessageBuilder.withPayload("Hello")
243260
.setHeader(ROUTE_HEADER, "echo-stream")
244261
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel)
245262
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
246263
.build());
247264

248-
Message<?> resultMessage =
249-
Flux.from(resultChannel)
250-
.blockFirst();
251-
252-
assertThat(resultMessage)
253-
.isNotNull()
254-
.extracting(Message::getPayload)
255-
.isInstanceOf(Flux.class);
256-
257-
@SuppressWarnings("unchecked")
258-
Flux<String> resultStream = (Flux<String>) resultMessage.getPayload();
259-
StepVerifier.create(resultStream)
260-
.expectNext("Hello 0").expectNextCount(6).expectNext("Hello 7")
261-
.thenCancel()
262-
.verify();
263-
265+
verifier.verify();
264266
}
265267

266268
@Test
@@ -276,28 +278,25 @@ void serverEchoChannel() {
276278
private void echoChannel(MessageChannel inputChannel, FluxMessageChannel resultChannel,
277279
RSocketRequester rsocketRequester) {
278280

281+
@SuppressWarnings("unchecked")
282+
StepVerifier verifier =
283+
StepVerifier.create(
284+
Flux.from(resultChannel)
285+
.next()
286+
.map(Message::getPayload)
287+
.flatMapMany((payload) -> (Flux<String>) payload))
288+
.expectNext("Hello 1 async").expectNextCount(8).expectNext("Hello 10 async")
289+
.thenCancel()
290+
.verifyLater();
291+
279292
inputChannel.send(
280293
MessageBuilder.withPayload(Flux.range(1, 10).map(i -> "Hello " + i))
281294
.setHeader(ROUTE_HEADER, "echo-channel")
282295
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel)
283296
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
284297
.build());
285298

286-
Message<?> resultMessage =
287-
Flux.from(resultChannel)
288-
.blockFirst();
289-
290-
assertThat(resultMessage)
291-
.isNotNull()
292-
.extracting(Message::getPayload)
293-
.isInstanceOf(Flux.class);
294-
295-
@SuppressWarnings("unchecked")
296-
Flux<String> resultStream = (Flux<String>) resultMessage.getPayload();
297-
StepVerifier.create(resultStream)
298-
.expectNext("Hello 1 async").expectNextCount(8).expectNext("Hello 10 async")
299-
.thenCancel()
300-
.verify();
299+
verifier.verify();
301300
}
302301

303302

@@ -314,26 +313,21 @@ void serverVoidReturnValue() {
314313
private void voidReturnValue(MessageChannel inputChannel, FluxMessageChannel resultChannel,
315314
RSocketRequester rsocketRequester) {
316315

316+
StepVerifier verifier =
317+
StepVerifier.create(resultChannel)
318+
.expectSubscription()
319+
.expectNoEvent(Duration.ofMillis(100))
320+
.thenCancel()
321+
.verifyLater();
322+
317323
inputChannel.send(
318324
MessageBuilder.withPayload("Hello")
319325
.setHeader(ROUTE_HEADER, "void-return-value")
320-
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel)
326+
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse)
321327
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
322328
.build());
323329

324-
Message<?> resultMessage =
325-
Flux.from(resultChannel)
326-
.blockFirst();
327-
328-
assertThat(resultMessage)
329-
.isNotNull()
330-
.extracting(Message::getPayload)
331-
.isInstanceOf(Flux.class);
332-
333-
Flux<?> resultStream = (Flux<?>) resultMessage.getPayload();
334-
StepVerifier.create(resultStream)
335-
.expectComplete()
336-
.verify();
330+
verifier.verify();
337331
}
338332

339333
@Test
@@ -349,26 +343,21 @@ void serverVoidReturnValueFromExceptionHandler() {
349343
private void voidReturnValueFromExceptionHandler(MessageChannel inputChannel, FluxMessageChannel resultChannel,
350344
RSocketRequester rsocketRequester) {
351345

346+
StepVerifier verifier =
347+
StepVerifier.create(resultChannel)
348+
.expectSubscription()
349+
.expectNoEvent(Duration.ofMillis(100))
350+
.thenCancel()
351+
.verifyLater();
352+
352353
inputChannel.send(
353354
MessageBuilder.withPayload("bad")
354355
.setHeader(ROUTE_HEADER, "void-return-value")
355-
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestStreamOrChannel)
356+
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse)
356357
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
357358
.build());
358359

359-
Message<?> resultMessage =
360-
Flux.from(resultChannel)
361-
.blockFirst();
362-
363-
assertThat(resultMessage)
364-
.isNotNull()
365-
.extracting(Message::getPayload)
366-
.isInstanceOf(Flux.class);
367-
368-
Flux<?> resultStream = (Flux<?>) resultMessage.getPayload();
369-
StepVerifier.create(resultStream)
370-
.expectComplete()
371-
.verify();
360+
verifier.verify();
372361
}
373362

374363
@Test
@@ -384,20 +373,23 @@ void serverHandleWithThrownException() {
384373
private void handleWithThrownException(MessageChannel inputChannel, FluxMessageChannel resultChannel,
385374
RSocketRequester rsocketRequester) {
386375

376+
StepVerifier verifier =
377+
StepVerifier.create(
378+
Flux.from(resultChannel)
379+
.map(Message::getPayload)
380+
.cast(String.class))
381+
.expectNext("Invalid input error handled")
382+
.thenCancel()
383+
.verifyLater();
384+
387385
inputChannel.send(
388386
MessageBuilder.withPayload("a")
389387
.setHeader(ROUTE_HEADER, "thrown-exception")
390388
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse)
391389
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
392390
.build());
393391

394-
StepVerifier.create(
395-
Flux.from(resultChannel)
396-
.map(Message::getPayload)
397-
.cast(String.class))
398-
.expectNext("Invalid input error handled")
399-
.thenCancel()
400-
.verify();
392+
verifier.verify();
401393
}
402394

403395
@Test
@@ -413,20 +405,23 @@ void serverHandleWithErrorSignal() {
413405
private void handleWithErrorSignal(MessageChannel inputChannel, FluxMessageChannel resultChannel,
414406
RSocketRequester rsocketRequester) {
415407

408+
StepVerifier verifier =
409+
StepVerifier.create(
410+
Flux.from(resultChannel)
411+
.map(Message::getPayload)
412+
.cast(String.class))
413+
.expectNext("Invalid input error handled")
414+
.thenCancel()
415+
.verifyLater();
416+
416417
inputChannel.send(
417418
MessageBuilder.withPayload("a")
418419
.setHeader(ROUTE_HEADER, "error-signal")
419420
.setHeader(COMMAND_HEADER, RSocketOutboundGateway.Command.requestResponse)
420421
.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, rsocketRequester)
421422
.build());
422423

423-
StepVerifier.create(
424-
Flux.from(resultChannel)
425-
.map(Message::getPayload)
426-
.cast(String.class))
427-
.expectNext("Invalid input error handled")
428-
.thenCancel()
429-
.verify();
424+
verifier.verify();
430425
}
431426

432427
@Test

0 commit comments

Comments
 (0)