Add hooks for reactive reply customizations: timeout(), retry(), tag() etc. #3183

Closed
sirimamilla opened this issue Feb 16, 2020 · 8 comments · Fixed by #3197

Comments

@sirimamilla
Contributor

Affects Version(s): 5.2.3


Enhancement

Consider adding Timeout to Webflux.outboundGateway in DSL.

Mono & Flux support timeout. If the timeout is exposed in the Outbound Gateway, it will simplify the configuration of services in a fluent API style.

Happy to contribute if accepted for enhancement.

@artembilan
Member

The request is not clear.
What timeout are you talking about?
The Webflux.outboundGateway() in DSL is backed by the WebFluxRequestExecutingMessageHandler, and there are no options to apply to the Mono & Flux.

Please be more specific about the use case and what exactly you would like to have customized.

Also, please consider not raising a concern against the Java DSL if it really doesn't belong to that high-level API.
In this case we have to do something in the target WebFluxRequestExecutingMessageHandler or its superclass. Then, once that property is exposed there, we are good to go with the Java DSL fixes.
Also, don't forget that we have an XML DSL as well, which is still supported anyway.

So requests raised only against the DSL are confusing.

Thanks for understanding

@artembilan added the `status: waiting-for-reporter` (Needs a feedback from the reporter) label Feb 17, 2020
@sirimamilla
Contributor Author

Hi @artembilan,

The requirement is to set the timeout at the per-request level, so the application can control the timeout according to the use case.

In our case we make calls to multiple backends and, based on the request type, we need to set a different timeout.

As per my tests, adding a timeout to Mono in WebFluxRequestExecutingMessageHandler will actually cancel the underlying HttpConnection when the response is not received in time.

```java
@Override
@Nullable
protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity<?> httpRequest,
		Object expectedResponseType, Message<?> requestMessage, Map<String, ?> uriVariables) {

	WebClient.RequestBodySpec requestSpec =
			createRequestBodySpec(uri, httpMethod, httpRequest, requestMessage, uriVariables);

	Mono<ClientResponse> responseMono = exchangeForResponseMono(requestSpec);

	// Proposed addition: apply the configured timeout, if any, to the response Mono
	if (this.timeout != null) {
		responseMono = responseMono.timeout(this.timeout);
	}

	if (isExpectReply()) {
		return createReplyFromResponse(expectedResponseType, responseMono);
	}
	else {
		responseMono.subscribe(v -> { }, ex -> sendErrorMessage(requestMessage, ex));
		return null;
	}
}
```

Hope this makes sense.

I requested this enhancement for the DSL, but it can be added to the XML and Java configuration as well.

This is a nice-to-have feature.

@artembilan
Member

OK. Got it!

I wonder if we should consider a more general solution for any possible post-response customization. Moreover, since you say it is "per request", it might look like this:

BiFunction<Message<?>, Mono<?>, Publisher<?>> responsePostProcessor;

It would be called from Mono.transform().

This way you can apply a requested timeout() or any other possible operators, e.g. retry() or even further response conversion.
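
To make the shape of such a hook concrete, here is a rough, JDK-only analogy rather than the actual handler code. It assumes `CompletableFuture` as a stand-in for `Mono` and a plain `Map` of headers as a stand-in for `Message<?>`; the class name, the `timeout` header, and the `"fallback"` value are all hypothetical.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

public class ReplyPostProcessorSketch {

    // Hypothetical hook: (request headers, pending reply) -> customized reply.
    // CompletableFuture stands in for Mono; the Map stands in for Message<?>.
    static final BiFunction<Map<String, Object>, CompletableFuture<String>, CompletableFuture<String>>
            RESPONSE_POST_PROCESSOR = (headers, reply) -> {
                // Per-request timeout read from an assumed "timeout" header, 5 seconds otherwise.
                Duration timeout = (Duration) headers.getOrDefault("timeout", Duration.ofSeconds(5));
                return reply
                        .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                        // Any failure, including the timeout, is replaced by a fallback value.
                        .exceptionally(ex -> "fallback");
            };

    // Applies the hook to a pending reply and waits for the customized result.
    static String call(Map<String, Object> headers, CompletableFuture<String> reply) {
        return RESPONSE_POST_PROCESSOR.apply(headers, reply).join();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = Map.of("timeout", Duration.ofMillis(50));
        System.out.println(call(headers, CompletableFuture.completedFuture("ok"))); // prints "ok"
        System.out.println(call(headers, new CompletableFuture<>()));               // prints "fallback"
    }
}
```

The analogy is loose in one important respect: `orTimeout` only fails the future and does not stop the underlying work, whereas `Mono.timeout()` cancels its upstream subscription.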

How does that sound to you?

@sirimamilla
Contributor Author

Hi @artembilan,

Using transform sounds interesting.

But a timeout expressed as a simple Function<Message<?>, Duration>, or as a plain Duration, would be easier to adopt.
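
As an illustration of the simpler option, a per-request timeout resolver could be little more than a lookup. The sketch below is JDK-only and makes several assumptions: a `Map` of headers stands in for `Message<?>`, and the `backend` header, the backend names, and the durations are invented for the example.

```java
import java.time.Duration;
import java.util.Map;
import java.util.function.Function;

public class TimeoutResolverSketch {

    // Hypothetical per-backend time budgets.
    static final Map<String, Duration> TIMEOUTS = Map.of(
            "legacy-billing", Duration.ofSeconds(10),
            "inventory", Duration.ofSeconds(2));

    static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(5);

    // Maps request headers to a timeout; unknown or missing backends get the default.
    static final Function<Map<String, Object>, Duration> TIMEOUT_RESOLVER =
            headers -> TIMEOUTS.getOrDefault(String.valueOf(headers.get("backend")), DEFAULT_TIMEOUT);

    static Duration resolve(Map<String, Object> headers) {
        return TIMEOUT_RESOLVER.apply(headers);
    }

    public static void main(String[] args) {
        Map<String, Object> inventoryRequest = Map.of("backend", "inventory");
        Map<String, Object> unknownRequest = Map.of("backend", "reporting");
        System.out.println(resolve(inventoryRequest)); // prints "PT2S"
        System.out.println(resolve(unknownRequest));   // prints "PT5S"
    }
}
```

The handler would then only need to pass the resolved `Duration` to a single `timeout` call on the reply.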

That's just my thought.

Maybe we can get an opinion from @garyrussell or @dturanski

sirimamilla added a commit to sirimamilla/spring-integration that referenced this issue Feb 20, 2020
sirimamilla added a commit to sirimamilla/spring-integration that referenced this issue Feb 20, 2020
…-3183

# Conflicts:
#	src/reference/asciidoc/whats-new.adoc
sirimamilla added a commit to sirimamilla/spring-integration that referenced this issue Feb 20, 2020
@sirimamilla
Contributor Author

Hi @garyrussell,

Can you please review the issue and confirm whether this enhancement makes sense?

This simple enhancement gives us a lot of flexibility when designing applications, especially those that involve varied legacy backends that often time out and have different SLAs.

@garyrussell
Contributor

@artembilan is more familiar with Reactor than I am, so I defer to him on this one.

@artembilan
Member

OK. After sleeping on this some more, I decided to accept just a timeout option, which is a much more common requirement than anything else that could be done with Mono.transform().

Therefore, let's continue the discussion on the Pull Request!

We can come back to the BiFunction<Message<?>, Mono<?>, Publisher<?>> responsePostProcessor; sometime in the future when the requirement appears. 😄

@artembilan
Member

See my comment on the related PR: #3188 (comment)

artembilan added a commit to artembilan/spring-integration that referenced this issue Feb 27, 2020
Fixes spring-projects#3183

* Introduce a `BaseReplyProducingMessageHandler` with common advices logic for target message handler
* Implement `BaseReplyProducingMessageHandler` from the `AbstractReplyProducingMessageHandler`
and newly introduced `AbstractReactiveReplyProducingMessageHandler` for message handlers with `Mono` replies
* Introduce a `ReactiveRequestHandlerAdvice` with a `BiFunction<Message<?>, Mono<?>, Publisher<?>>`
logic to apply a `Mono.transform()` operator for a returned from the handler `Mono` reply
* Make `RSocketOutboundGateway` implementing an `AbstractReactiveReplyProducingMessageHandler`
* Fix `WebFluxRequestExecutingMessageHandler` to return a `Mono.then()` instead of an explicit subscription -
it happens downstream anyway during reply producing with a proper error handling, too
* Fix `ConsumerEndpointSpec` & `ConsumerEndpointFactoryBean` to deal with an
`AbstractReactiveReplyProducingMessageHandler` as well
* Demonstrate `ReactiveRequestHandlerAdvice` in the `RSocketDslTests` - without `retry()` it fails
artembilan added a commit to artembilan/spring-integration that referenced this issue Feb 28, 2020
artembilan added a commit to artembilan/spring-integration that referenced this issue Mar 3, 2020
Fixes spring-projects#3183

* Introduce a `ReactiveRequestHandlerAdvice` with a `BiFunction<Message<?>, Mono<?>, Publisher<?>>`
logic to apply a `Mono.transform()` operator for a returned from the handler `Mono` reply
* Fix `WebFluxRequestExecutingMessageHandler` to return a `Mono.then()` instead of an explicit subscription -
it happens downstream anyway during reply producing with a proper error handling, too
* Demonstrate `ReactiveRequestHandlerAdvice` in the `RSocketDslTests` - without `retry()` it fails
* Add `ConsumerEndpointSpec.customizeMonoReply()` for convenience
* Document `ReactiveRequestHandlerAdvice` feature
@artembilan changed the title from "Consider adding Timeout to Webflux.outboundGateway in DSL" to "Add hooks for reactive reply customizations: timeout(), retry(), tag() etc." on Mar 4, 2020
artembilan added a commit to artembilan/spring-integration that referenced this issue Mar 5, 2020
garyrussell added a commit that referenced this issue Mar 5, 2020
* GH-3183: Add ReactiveRequestHandlerAdvice

* * Fix language in docs