-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add RSocketInboundGateway
; refactoring
#2923
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add RSocketInboundGateway
; refactoring
#2923
Conversation
RSocketInboundGateway
; refactoringRSocketInboundGateway
; refactoring
This is not fully ready yet. At least tests are needed to cover an introduced functionality. The server inbound side and docs might be deferred into a separate PR... Right now I just want to share what I have so far to gather any possible feedback. Thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments from first review pass.
...ration-rsocket/src/main/java/org/springframework/integration/rsocket/IntegrationRSocket.java
Show resolved
Hide resolved
int refCount = refCount(dataBuffer); | ||
return Mono | ||
.defer(() -> { | ||
this.applicationEventPublisher.publishEvent( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null check or no-op implementation needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See comment at caller.
@Override | ||
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { | ||
IntegrationRSocket rsocket = createRSocket(sendingRSocket); | ||
rsocket.setApplicationEventPublisher(this.applicationEventPublisher); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null check or no-op default implementation.
...ion-rsocket/src/main/java/org/springframework/integration/rsocket/RSocketConnectedEvent.java
Show resolved
Hide resolved
3001d03
to
ecee757
Compare
Pushed some further implements and implementation. Thanks |
RSocketInboundGateway
; refactoringRSocketInboundGateway
; refactoring
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one issue from this pass.
public String toString() { | ||
return "RSocketConnectedEvent{" + | ||
"destination='" + this.destination + '\'' + | ||
", data=" + Arrays.toString(this.data.asByteBuffer().array()) + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think rendering a byte[]
as a bunch of ints is useful, maybe omit this (or just include the length, perhaps).
OK. Now I think it is very close for merging. Pay attention, please, how |
@@ -67,7 +65,7 @@ public RSocketRequester getRequester() { | |||
public String toString() { | |||
return "RSocketConnectedEvent{" + | |||
"destination='" + this.destination + '\'' + | |||
", data=" + Arrays.toString(this.data.asByteBuffer().array()) + | |||
", data length=" + this.data.asByteBuffer().array().length + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This still seems to be a lot of overhead in a toString()
.
Why do we need this DataBuffer
in the event?
* Extract an `AbstractRSocketConnector` for common client and server connectors logic * Introduce an `IntegrationRSocketAcceptor` and `IntegrationRSocket` for the mapping and handling logic of RSockets and messages in between * Introduce an `IntegrationRSocketEndpoint` marker interface for Inbound Gateway mappings * Add `RSocketInboundGateway` implementation, which is called from the `IntegrationRSocketAcceptor` by the `IntegrationRSocketEndpoint` mapping * Add `RSocketConnectedEvent` to emit when the client is connected to the server. It does not make sense in Spring Integration to delegate such a logic into the `RSocketInboundGateway`
container for connected `RSocketRequester`s from clients * Extract `accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket)` server logic into an internal `ServerRSocketAcceptor` extension for the `IntegrationRSocketAcceptor` * Address PR comments: - `RSocketConnectedEvent.toString()` - `ApplicationEventPublisherAware` into the `ServerRSocketConnector` - Log RSocket connection if no `this.applicationEventPublisher` * Remove `handleConnectionSetupPayload()` from the `IntegrationRSocket` since it is not delegated to the target handler * Provide reasonable default `RSocketStrategies` for the `AbstractRSocketConnector` and `RSocketInboundGateway` * Add initial `RSocketInboundGatewayIntegrationTests`
* Always `payload.retain()` when we convert `Payload` to `DataBuffer` * Fix `IntegrationRSocketAcceptor.detectEndpoints()` stream logic to really iterate over all the `IntegrationRSocketEndpoint` beans * Fix test to use an explicit `ClientConfig` class for the `@SpringJUnitConfig`: looks like JUnit 5 is OK to scan for `package protected` classes as well * Add request/reply tests into the `RSocketInboundGatewayIntegrationTests` for both server and client sides
149f43d
to
18e1d59
Compare
AbstractRSocketConnector
for common client and server connectors logicIntegrationRSocketAcceptor
andIntegrationRSocket
for themapping and handling logic of RSockets and messages in between
IntegrationRSocketEndpoint
marker interface for Inbound Gateway mappingsRSocketInboundGateway
implementation, which is called from theIntegrationRSocketAcceptor
by theIntegrationRSocketEndpoint
mappingRSocketConnectedEvent
to emit when the client is connected to the server.It does not make sense in Spring Integration to delegate such a logic into the
RSocketInboundGateway