Add a reply timeout hook to TCP inbound gateway #3107

Closed
garyrussell opened this issue Nov 11, 2019 · 2 comments

@garyrussell
Contributor

garyrussell commented Nov 11, 2019

Allow the user to determine the action after a reply times out.

https://stackoverflow.com/questions/58796500/tcpinboundgateway-server-configuration-on-timeout-a-custom-message-to-be-sent

@artembilan
Member

I wonder if this feature could help us here:

/**
 * If errorOnTimeout is true, construct an instance that will send an
 * {@link ErrorMessage} with a {@link MessageTimeoutException} payload to the error
 * channel if a reply is expected but none is received. If no error channel is
 * configured, the {@link MessageTimeoutException} will be thrown.
 * @param errorOnTimeout true to create the error message.
 * @since 4.2
 */
public MessagingGatewaySupport(boolean errorOnTimeout) {

It looks like only the HTTP endpoint implements this logic, when it is in gateway mode:

public BaseHttpInboundEndpoint(boolean expectReply) {
	super(expectReply);
	this.expectReply = expectReply;
}

So, it looks like with that MessageTimeoutException and an errorChannel we have a hook to handle missed replies during timeouts.

@garyrussell
Contributor Author

Yes, it looks like that would work - the "hook" to return some other response can be done on the error flow.
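
To make the idea concrete, here is a minimal plain-Java sketch of the pattern discussed above: wait for a downstream reply, and if the reply timeout expires, hand the timeout to an error flow that supplies some other response. It deliberately does not use the Spring Integration API. The class `ReplyTimeoutHookSketch`, the methods `sendAndReceive` and `slowService`, and the `"TIMEOUT"` fallback payload are all hypothetical names chosen for illustration, and `java.util.concurrent.TimeoutException` merely stands in for the role that `MessageTimeoutException` plays in the framework.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

public class ReplyTimeoutHookSketch {

    // Hypothetical stand-in for a gateway: runs the downstream flow and waits
    // up to replyTimeoutMillis for its reply. On timeout, the exception is
    // passed to the error flow, whose result becomes the reply instead.
    public static String sendAndReceive(Supplier<String> downstream,
                                        long replyTimeoutMillis,
                                        Function<TimeoutException, String> errorFlow) {
        CompletableFuture<String> reply = CompletableFuture.supplyAsync(downstream);
        try {
            return reply.get(replyTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Mark the future as cancelled so a late reply is discarded.
            reply.cancel(true);
            return errorFlow.apply(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a reply", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Downstream flow failed", e.getCause());
        }
    }

    // Hypothetical downstream service that answers far too late.
    public static String slowService() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "late reply";
    }

    public static void main(String[] args) {
        // The "hook": decide what to send back when no reply arrived in time.
        Function<TimeoutException, String> errorFlow = e -> "TIMEOUT";

        // Fast downstream: the real reply is expected to arrive in time.
        System.out.println(sendAndReceive(() -> "pong", 1000, errorFlow));
        // Slow downstream: the wait expires and the error flow supplies the reply.
        System.out.println(sendAndReceive(ReplyTimeoutHookSketch::slowService, 100, errorFlow));
    }
}
```

In the framework itself, the equivalent of `errorFlow` would be whatever is subscribed to the gateway's error channel; the sketch only shows why routing the timeout there is enough to return a custom response to the client.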

@artembilan artembilan self-assigned this Nov 12, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Nov 12, 2019
Fixes spring-projects#3107

The `MessagingGatewaySupport` has an `errorOnTimeout` option to throw
a `MessageTimeoutException` when a downstream reply doesn't come back
within the configured reply timeout

* Expose an `errorOnTimeout` option as a `TcpInboundGateway` ctor
property
* Add new factory methods into a `Tcp` factory for Java DSL
* Ensure a property works as expected in the `IpIntegrationTests`
* Document a new option
artembilan added a commit to artembilan/spring-integration that referenced this issue Nov 13, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Nov 19, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Nov 19, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Nov 19, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Nov 19, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Nov 22, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Nov 22, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Dec 2, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Dec 2, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Dec 3, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Dec 3, 2019
garyrussell pushed a commit that referenced this issue Dec 3, 2019
* Use EmitterProcessor in the FluxMessageChannel

The `EmitterProcessor` has a good logic to block upstream producer
when its downstream subscriber cannot keep up with overproducing.

* Rework `FluxMessageChannel` logic to rely on the `EmitterProcessor`
instead of `Flux.create()`
* Cancel `FluxMessageChannel` internal subscriptions in the `destroy()`
* Fix `ReactiveStreamsTests.testFluxTransform()` for the splitter's
delimiter
* Ensure in the `FluxMessageChannelTests.testFluxMessageChannel`
that we can have several concurrent subscribers to the
`FluxMessageChannel`

* * Use `flux.onComplete()` instead of iteration over subscribers
* Change `subscribers` list into just `AtomicInteger` count marker
* Fix `DefaultSplitterTests` according to the new logic in the `FluxMessageChannel`

* GH-3107: Add errorOnTimeout for TcpInboundGateway

Fixes #3107

The `MessagingGatewaySupport` has an `errorOnTimeout` option to throw
a `MessageTimeoutException` when a downstream reply doesn't come back
within the configured reply timeout

* Expose an `errorOnTimeout` option as a `TcpInboundGateway` ctor
property
* Add new factory methods into a `Tcp` factory for Java DSL
* Ensure a property works as expected in the `IpIntegrationTests`
* Document a new option

* * Use `delaySubscription()` for subscribing publishers in the `FluxMessageChannel`
to wait until this one subscribed.
* Use an `EmitterProcessor` to catch subscriptions and pass them as a
signal to delayed upstream publishers
* Fix `FluxMessageChannelTests.testFluxMessageChannelCleanUp` to
verify an actual property instead of the removed one.
* Fix `RSocketOutboundGatewayIntegrationTests` for the proper subscription
into a `FluxMessageChannel` before actual interaction with an RSocket
gateway.
This should help us also to avoid some race conditions in the future

* Revert "GH-3107: Add errorOnTimeout for TcpInboundGateway"

This reverts commit fa6119d.

* * Refactor `FluxMessageChannel` to use `ReplayProcessor` for `subscribedSignal`.
This one is used in `delaySubscription` for the upstream publishers
* Use an `AtomicBoolean` for subscription state since `doOnSubscribe()`
is called before `EmitterProcessor` adds subscribers for its `downstreams`
* Use `publishOn(Schedulers.boundedElastic())` for upstream publishers
to avoid blocking over there when our `EmitterProcessor` doesn't have
enough demand
* Refactor reactive tests to have a subscription into the `FluxMessageChannel`
earlier than emission happens for it

* * Use `Flux.subscribe(Consumer)` instead of `doOnNext(Consumer).subscribe()`

* * Emit `subscribedSignal` value after `.subscribe(subscriber)`
instead of `doOnSubscribe`
* Check for `this.processor.hasDownstreams()` before emitting such an event

* * Use `this.processor.hasDownstreams()` as a value to emit for `subscribedSignal`.
This way we are less vulnerable to race conditions when subscribers are changed
actively
@artembilan artembilan modified the milestones: Backlog, 5.2.2 Feb 12, 2020