Skip to content

Timeout added to WebFluxMessageHandlerSpec.java & WebFluxRequestExecutingMessageHandler.java #3188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.integration.webflux.dsl;

import java.net.URI;
import java.time.Duration;
import java.util.function.Function;

import org.springframework.core.ParameterizedTypeReference;
Expand Down Expand Up @@ -144,6 +145,41 @@ public WebFluxMessageHandlerSpec publisherElementTypeExpression(Expression publi
return this;
}

/**
* Specify the timeout value for receiving the response form the server.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: from ? And that has to be an "a timeout"

* If the response is not received with in timeout value, will result in Timeout Exception
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"within"

* @param timeoutInMillis the timeout in milliseconds
* @return the spec
* @since 5.3
*/
public WebFluxMessageHandlerSpec timeout(long timeoutInMillis) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's bad name. There are many aspects in Spring Integration components and a lot of different timeout options.
Would be great to be more specific here. How about a requestTimeout ?

Also: I wouldn't expose this long option. It is not too hard to have that Duration.ofMillis() in the target application.
Plus with Spring Boot configuration properties we can provide an option in the duration style and we'll get exactly Duration instance in the POJO

return timeout(Duration.ofMillis(timeoutInMillis));
}

/**
* Specify the timeout value for receiving the response form the server.
* If the response is not received with in timeout value, will result in Timeout Exception
* @param timeout accepts {@link java.time.Duration}
* @return the spec
* @since 5.3
*/
public WebFluxMessageHandlerSpec timeout(Duration timeout) {
this.target.setTimeout(timeout);
return this;
}

/**
* @param timeout accepts {@link Function}. The function accepts a Message as input and returns Duration.
* Function is evaluated on each request basis and the {@link Duration} is applied to Mono
* If the response is not received with in timeout value, will result in Timeout Exception
* @return the spec
* @since 5.3
*/
public WebFluxMessageHandlerSpec timeout(Function<Message<?>, Duration> timeout) {
this.target.setTimeoutFunction(timeout);
return this;
}

@Override
protected boolean isClientSet() {
return this.webClient != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.function.Function;

import org.reactivestreams.Publisher;

Expand Down Expand Up @@ -80,6 +82,8 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx

private Expression publisherElementTypeExpression;

Function<Message<?>, Duration> timeoutFunction;

/**
* Create a handler that will send requests to the provided URI.
* @param uri The URI.
Expand Down Expand Up @@ -198,6 +202,26 @@ public void setPublisherElementTypeExpression(Expression publisherElementTypeExp
this.publisherElementTypeExpression = publisherElementTypeExpression;
}

/**
* Specify the timeout value for receiving the response form the server.
* If the response is not received with in timeout value, will result in Timeout Exception
* @param timeout accepts {@link java.time.Duration}
* @since 5.3
*/
public void setTimeout(Duration timeout) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that this is simple one, but would be better to not have it since a Function can simple cover the static value as well.

So, please, remove

this.timeoutFunction = m -> timeout;
}

/**
* @param timeoutFunction accepts {@link Function}. The function accepts a Message as input and returns Duration.
* Function is evaluated on each request basis and the {@link Duration} is applied to Mono
* If the response is not received with in timeout value, will result in Timeout Exception
* @since 5.3
*/
public void setTimeoutFunction(Function<Message<?>, Duration> timeoutFunction) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we prefer an Expression variant instead of function. This way a timeout could be configured in the XML an easy way.
There is a FunctionExpression to cover this case. And exactly Java DSL should expose a function-based option. Of course, alongside with an expression. See publisherElementTypeFunction in the modified by you WebFluxMessageHandlerSpec

this.timeoutFunction = timeoutFunction;
}

@Override
public String getComponentType() {
return (isExpectReply() ? "webflux:outbound-gateway" : "webflux:outbound-channel-adapter");
Expand All @@ -213,6 +237,10 @@ protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity<?> httpR

Mono<ClientResponse> responseMono = exchangeForResponseMono(requestSpec);

if (this.timeoutFunction != null) {
responseMono = responseMono.timeout(this.timeoutFunction.apply(requestMessage));
}

if (isExpectReply()) {
return createReplyFromResponse(expectedResponseType, responseMono);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import java.security.Principal;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeoutException;

import javax.annotation.Resource;

Expand Down Expand Up @@ -56,6 +57,7 @@
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.security.access.AccessDecisionManager;
import org.springframework.security.access.vote.AffirmativeBased;
Expand Down Expand Up @@ -101,7 +103,7 @@
* @author Shiliang Li
* @author Abhijit Sarkar
* @author Gary Russell
*
* @author Jayadev Sirimamilla
* @since 5.0
*/
@RunWith(SpringRunner.class)
Expand All @@ -119,13 +121,29 @@ public class WebFluxDslTests {
@Qualifier("webFluxWithReplyPayloadToFlux.handler")
private WebFluxRequestExecutingMessageHandler webFluxWithReplyPayloadToFlux;

@Autowired
@Qualifier("webFluxWithTimeout.handler")
private WebFluxRequestExecutingMessageHandler webFluxFlowWithTimeout;

@Autowired
@Qualifier("webFluxWithTimeoutFunction.handler")
private WebFluxRequestExecutingMessageHandler webFluxFlowWithTimeoutFunction;

@Resource(name = "httpReactiveProxyFlow.webflux:outbound-gateway#0")
private WebFluxRequestExecutingMessageHandler httpReactiveProxyFlow;

@Autowired
@Qualifier("webFluxFlowWithReplyPayloadToFlux.input")
private MessageChannel webFluxFlowWithReplyPayloadToFluxInput;

@Autowired
@Qualifier("webFluxFlowWithTimeout.input")
private MessageChannel webFluxFlowWithTimeoutInput;

@Autowired
@Qualifier("webFluxFlowWithTimeoutFunction.input")
private MessageChannel webFluxFlowWithTimeoutFunctionInput;

private MockMvc mockMvc;

private WebTestClient webTestClient;
Expand All @@ -145,6 +163,82 @@ public void setup() {
.build();
}

@Test
public void testTimeoutForWebFluxOutboundGateway() {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);


DataBufferFactory bufferFactory = response.bufferFactory();
return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes()))
.delayElement(Duration.ofMillis(11000)))
.then(Mono.defer(response::setComplete));
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

new DirectFieldAccessor(this.webFluxFlowWithTimeout)
.setPropertyValue("webClient", webClient);

QueueChannel replyChannel = new QueueChannel();

Message<String> testMessage =
MessageBuilder.withPayload("test")
.setReplyChannel(replyChannel)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shouldn't use a setReplyChannel for this kind of test. Since you use the same instance for both reply and error responses, I was confused first about exception assertion in the end. I was expected from "error channel", but I saw a replyChannel. So, your test-case mislead me to investigate how do we process exceptions like that timeout 😢

.setErrorChannel(replyChannel)
.build();

this.webFluxFlowWithTimeoutInput.send(testMessage);

Message<?> receive = replyChannel.receive(10_000);

assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isInstanceOf(MessageHandlingException.class);
assertThat(((MessageHandlingException) receive.getPayload()).getCause()).isInstanceOf(TimeoutException.class);

}

@Test
public void testTimeoutFunctionForWebFluxOutboundGateway() {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);

DataBufferFactory bufferFactory = response.bufferFactory();
return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes()))
.delayElement(Duration.ofMillis(15000)))
.then(Mono.defer(response::setComplete));
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

new DirectFieldAccessor(this.webFluxFlowWithTimeoutFunction)
.setPropertyValue("webClient", webClient);

QueueChannel replyChannel = new QueueChannel();

Message<String> testMessage =
MessageBuilder.withPayload("test")
.setReplyChannel(replyChannel)
.setErrorChannel(replyChannel)
.setHeader("timeout", 100L)
.build();

this.webFluxFlowWithTimeoutFunctionInput.send(testMessage);

Message<?> receive = replyChannel.receive(10_000);

assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isInstanceOf(MessageHandlingException.class);
assertThat(((MessageHandlingException) receive.getPayload()).getCause()).isInstanceOf(TimeoutException.class);

}

@Test
public void testWebFluxFlowWithReplyPayloadToFlux() {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
Expand Down Expand Up @@ -386,6 +480,26 @@ public IntegrationFlow webFluxFlowWithReplyPayloadToFlux() {
e -> e.id("webFluxWithReplyPayloadToFlux"));
}

@Bean
public IntegrationFlow webFluxFlowWithTimeout() {
return f -> f
.handle(WebFlux.outboundGateway("https://www.springsource.org/spring-integration")
.httpMethod(HttpMethod.GET)
.timeout(100)
.expectedResponseType(String.class),
e -> e.id("webFluxWithTimeout"));
}

@Bean
public IntegrationFlow webFluxFlowWithTimeoutFunction() {
return f -> f
.handle(WebFlux.outboundGateway("https://www.springsource.org/spring-integration")
.httpMethod(HttpMethod.GET)
.timeout(m -> Duration.ofMillis(m.getHeaders().get("timeout", Long.class)))
.expectedResponseType(String.class),
e -> e.id("webFluxWithTimeoutFunction"));
}

@Bean
public IntegrationFlow httpReactiveProxyFlow() {
return IntegrationFlows
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,9 @@ See <<./amqp.adoc#amqp-inbound-channel-adapter,AMQP Inbound Channel Adapter>>
The `encodeUri` property on the `AbstractHttpRequestExecutingMessageHandler` has been deprecated in favor of newly introduced `encodingMode`.
See `DefaultUriBuilderFactory.EncodingMode` JavaDocs and <<./http.adoc#http-uri-encoding,Controlling URI Encoding>> for more information.
This also affects `WebFluxRequestExecutingMessageHandler`, respective Java DSL and XML configuration.

[[x5.3-WebFlux]]
=== WebFlux Changes

The `timeoutFunction` property is added to `WebFluxRequestExecutingMessageHandler`.
Java DSL accessor which accepts long Timeout, Duration Timeout & Timeout Function is provided
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The good sentence ends with period.
I don't think that word "timeout" deserves to be capitalized.
I would even say if your don't talk about code, then even function word doesn't deserve to be capitalized.

Would be great to have here a brief description for the reason of this new option. See something already existed in this doc.
Then we definitely need a link to the target docs.
And what is very important we need a description in the final docs - webflux.adoc.