-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add initial support for RSockets #2902
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
* Add `spring-integration-rsocket` module and respective dependencies * Implement `RSocketOutboundGateway` based on the Spring Messaging `RSocketRequester`. This component supports dynamic RSocket properties via expressions against request message. to handle `Publisher` for requests, it must be present in the request message `payload` instead of `FluxMessageChannel` upstream, since the last one just flattens events to be handled in the `MessageHandler` one by one. The result `Mono` is subscribed downstream in the `FluxMessageChannel` or directly by the `AbstractReplyProducingMessageHandler`. If result is a `Flux` it is just wrapped into the `Mono` to be processed downstream by end-user code. The point is that these request/replies are volatile and live in the particular context meanwhile a `FluxMessageChannel` is long living publisher in the application context boundaries. * The `RSocketOutboundGatewayIntegrationTests` is an adapted copy of `RSocketClientToServerIntegrationTests` from Spring Messaging * Add `doOnError()` into the `Flux` created in the `AbstractMessageProducingHandler` for `Publisher` replies
You can merge if it is OK: other features and Docs can be added in the upcoming PRs. Thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work! Just a few issues.
build.gradle
Outdated
@@ -596,6 +598,18 @@ project('spring-integration-rmi') { | |||
} | |||
} | |||
|
|||
project('spring-integration-rsocket') { | |||
description = 'Spring Integration RSockets Support' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It appears this should be singular RSocket
.
protected void doInit() { | ||
super.doInit(); | ||
RSocketRequester.Builder builder = RSocketRequester.builder(); | ||
if (this.factoryConfigurer != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably cleaner to default to a no-op configurers and assert not null in setters.
...t/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java
Show resolved
Hide resolved
@DirtiesContext | ||
public class RSocketOutboundGatewayIntegrationTests { | ||
|
||
private static final int PORT = SocketUtils.findAvailableTcpPort(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there no support for port 0 so the OS allocates the port?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just can't swallow code complexity in Netty, so it isn't clear if it is there.
@rstoyanchev just uses a static 7000
port on the matter 😢
/CC @smaldini
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just that we went through a process of converting all our TCP module tests to use port 0 so the OS chooses the port. These "findAvailablePort" schemes, while they work most of the time, we occasionally saw failures (BindException) on CI builds. A hard-wired port # doesn't sound good either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I have figured out how to use 0
for port.
See latest commit.
Thanks
* Use no-op `Consumer` for the `strategiesConfigurer` and `factoryConfigurer` in the `RSocketOutboundGateway` and also `Assert.notNull()` in the appropriate setters to avoid null check during `RSocketRequester.builder()` initialization * Use `TcpServer.create().port(0)` in the `RSocketOutboundGatewayIntegrationTests` to allow to select free OS port and bind into it. The selected port is used later for client configuration in the `RSocketOutboundGateway` bean definition
@@ -74,7 +74,7 @@ | |||
@DirtiesContext | |||
public class RSocketOutboundGatewayIntegrationTests { | |||
|
|||
private static final int PORT = SocketUtils.findAvailableTcpPort(); | |||
private static int PORT; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should no longer be static.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
??? It is initialized from the static @BeforeAll
and it won't be available in the ClientConfig
below which has to be a static
as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry; I meant it should no longer be upper case.
spring-integration-rsocket
module and respective dependenciesRSocketOutboundGateway
based on the Spring MessagingRSocketRequester
.This component supports dynamic RSocket properties via expressions
against request message.
to handle
Publisher
for requests, it must be present in the requestmessage
payload
instead ofFluxMessageChannel
upstream, since thelast one just flattens events to be handled in the
MessageHandler
oneby one.
The result
Mono
is subscribed downstream in theFluxMessageChannel
or directly by the
AbstractReplyProducingMessageHandler
.If result is a
Flux
it is just wrapped into theMono
to be processeddownstream by end-user code.
The point is that these request/replies are volatile and live in the
particular context meanwhile a
FluxMessageChannel
is long livingpublisher in the application context boundaries.
RSocketOutboundGatewayIntegrationTests
is an adapted copy ofRSocketClientToServerIntegrationTests
from Spring MessagingdoOnError()
into theFlux
created in theAbstractMessageProducingHandler
forPublisher
replies