diff --git a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/dsl/WebFluxMessageHandlerSpec.java b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/dsl/WebFluxMessageHandlerSpec.java index 431851d942d..9edb92c064c 100644 --- a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/dsl/WebFluxMessageHandlerSpec.java +++ b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/dsl/WebFluxMessageHandlerSpec.java @@ -17,6 +17,7 @@ package org.springframework.integration.webflux.dsl; import java.net.URI; +import java.time.Duration; import java.util.function.Function; import org.springframework.core.ParameterizedTypeReference; @@ -144,6 +145,41 @@ public WebFluxMessageHandlerSpec publisherElementTypeExpression(Expression publi return this; } + /** + * Specify the timeout value for receiving the response form the server. + * If the response is not received with in timeout value, will result in Timeout Exception + * @param timeoutInMillis the timeout in milliseconds + * @return the spec + * @since 5.3 + */ + public WebFluxMessageHandlerSpec timeout(long timeoutInMillis) { + return timeout(Duration.ofMillis(timeoutInMillis)); + } + + /** + * Specify the timeout value for receiving the response form the server. + * If the response is not received with in timeout value, will result in Timeout Exception + * @param timeout accepts {@link java.time.Duration} + * @return the spec + * @since 5.3 + */ + public WebFluxMessageHandlerSpec timeout(Duration timeout) { + this.target.setTimeout(timeout); + return this; + } + + /** + * @param timeout accepts {@link Function}. The function accepts a Message as input and returns Duration. + * Function is evaluated on each request basis and the {@link Duration} is applied to Mono + * If the response is not received with in timeout value, will result in Timeout Exception + * @return the spec + * @since 5.3 + */ + public WebFluxMessageHandlerSpec timeout(Function, Duration> timeout) { + this.target.setTimeoutFunction(timeout); + return this; + } + @Override protected boolean isClientSet() { return this.webClient != null; diff --git a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java index 525ad8565f6..c105a48649d 100644 --- a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java +++ b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java @@ -18,7 +18,9 @@ import java.net.URI; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Map; +import java.util.function.Function; import org.reactivestreams.Publisher; @@ -80,6 +82,8 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx private Expression publisherElementTypeExpression; + Function, Duration> timeoutFunction; + /** * Create a handler that will send requests to the provided URI. * @param uri The URI. @@ -198,6 +202,26 @@ public void setPublisherElementTypeExpression(Expression publisherElementTypeExp this.publisherElementTypeExpression = publisherElementTypeExpression; } + /** + * Specify the timeout value for receiving the response form the server. + * If the response is not received with in timeout value, will result in Timeout Exception + * @param timeout accepts {@link java.time.Duration} + * @since 5.3 + */ + public void setTimeout(Duration timeout) { + this.timeoutFunction = m -> timeout; + } + + /** + * @param timeoutFunction accepts {@link Function}. The function accepts a Message as input and returns Duration. + * Function is evaluated on each request basis and the {@link Duration} is applied to Mono + * If the response is not received with in timeout value, will result in Timeout Exception + * @since 5.3 + */ + public void setTimeoutFunction(Function, Duration> timeoutFunction) { + this.timeoutFunction = timeoutFunction; + } + @Override public String getComponentType() { return (isExpectReply() ? "webflux:outbound-gateway" : "webflux:outbound-channel-adapter"); @@ -213,6 +237,10 @@ protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity httpR Mono responseMono = exchangeForResponseMono(requestSpec); + if (this.timeoutFunction != null) { + responseMono = responseMono.timeout(this.timeoutFunction.apply(requestMessage)); + } + if (isExpectReply()) { return createReplyFromResponse(expectedResponseType, responseMono); } diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java index 72ac27815d9..1c80c8ccd31 100644 --- a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import java.security.Principal; import java.time.Duration; import java.util.Collections; +import java.util.concurrent.TimeoutException; import javax.annotation.Resource; @@ -56,6 +57,7 @@ import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.PollableChannel; import org.springframework.security.access.AccessDecisionManager; import org.springframework.security.access.vote.AffirmativeBased; @@ -101,7 +103,7 @@ * @author Shiliang Li * @author Abhijit Sarkar * @author Gary Russell - * + * @author Jayadev Sirimamilla * @since 5.0 */ @RunWith(SpringRunner.class) @@ -119,6 +121,14 @@ public class WebFluxDslTests { @Qualifier("webFluxWithReplyPayloadToFlux.handler") private WebFluxRequestExecutingMessageHandler webFluxWithReplyPayloadToFlux; + @Autowired + @Qualifier("webFluxWithTimeout.handler") + private WebFluxRequestExecutingMessageHandler webFluxFlowWithTimeout; + + @Autowired + @Qualifier("webFluxWithTimeoutFunction.handler") + private WebFluxRequestExecutingMessageHandler webFluxFlowWithTimeoutFunction; + @Resource(name = "httpReactiveProxyFlow.webflux:outbound-gateway#0") private WebFluxRequestExecutingMessageHandler httpReactiveProxyFlow; @@ -126,6 +136,14 @@ public class WebFluxDslTests { @Qualifier("webFluxFlowWithReplyPayloadToFlux.input") private MessageChannel webFluxFlowWithReplyPayloadToFluxInput; + @Autowired + @Qualifier("webFluxFlowWithTimeout.input") + private MessageChannel webFluxFlowWithTimeoutInput; + + @Autowired + @Qualifier("webFluxFlowWithTimeoutFunction.input") + private MessageChannel webFluxFlowWithTimeoutFunctionInput; + private MockMvc mockMvc; private WebTestClient webTestClient; @@ -145,6 +163,82 @@ public void setup() { .build(); } + @Test + public void testTimeoutForWebFluxOutboundGateway() { + ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> { + response.setStatusCode(HttpStatus.OK); + response.getHeaders().setContentType(MediaType.TEXT_PLAIN); + + + DataBufferFactory bufferFactory = response.bufferFactory(); + return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes())) + .delayElement(Duration.ofMillis(11000))) + .then(Mono.defer(response::setComplete)); + }); + + WebClient webClient = WebClient.builder() + .clientConnector(httpConnector) + .build(); + + new DirectFieldAccessor(this.webFluxFlowWithTimeout) + .setPropertyValue("webClient", webClient); + + QueueChannel replyChannel = new QueueChannel(); + + Message testMessage = + MessageBuilder.withPayload("test") + .setReplyChannel(replyChannel) + .setErrorChannel(replyChannel) + .build(); + + this.webFluxFlowWithTimeoutInput.send(testMessage); + + Message receive = replyChannel.receive(10_000); + + assertThat(receive).isNotNull(); + assertThat(receive.getPayload()).isInstanceOf(MessageHandlingException.class); + assertThat(((MessageHandlingException) receive.getPayload()).getCause()).isInstanceOf(TimeoutException.class); + + } + + @Test + public void testTimeoutFunctionForWebFluxOutboundGateway() { + ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> { + response.setStatusCode(HttpStatus.OK); + response.getHeaders().setContentType(MediaType.TEXT_PLAIN); + + DataBufferFactory bufferFactory = response.bufferFactory(); + return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes())) + .delayElement(Duration.ofMillis(15000))) + .then(Mono.defer(response::setComplete)); + }); + + WebClient webClient = WebClient.builder() + .clientConnector(httpConnector) + .build(); + + new DirectFieldAccessor(this.webFluxFlowWithTimeoutFunction) + .setPropertyValue("webClient", webClient); + + QueueChannel replyChannel = new QueueChannel(); + + Message testMessage = + MessageBuilder.withPayload("test") + .setReplyChannel(replyChannel) + .setErrorChannel(replyChannel) + .setHeader("timeout", 100L) + .build(); + + this.webFluxFlowWithTimeoutFunctionInput.send(testMessage); + + Message receive = replyChannel.receive(10_000); + + assertThat(receive).isNotNull(); + assertThat(receive.getPayload()).isInstanceOf(MessageHandlingException.class); + assertThat(((MessageHandlingException) receive.getPayload()).getCause()).isInstanceOf(TimeoutException.class); + + } + @Test public void testWebFluxFlowWithReplyPayloadToFlux() { ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> { @@ -386,6 +480,26 @@ public IntegrationFlow webFluxFlowWithReplyPayloadToFlux() { e -> e.id("webFluxWithReplyPayloadToFlux")); } + @Bean + public IntegrationFlow webFluxFlowWithTimeout() { + return f -> f + .handle(WebFlux.outboundGateway("https://www.springsource.org/spring-integration") + .httpMethod(HttpMethod.GET) + .timeout(100) + .expectedResponseType(String.class), + e -> e.id("webFluxWithTimeout")); + } + + @Bean + public IntegrationFlow webFluxFlowWithTimeoutFunction() { + return f -> f + .handle(WebFlux.outboundGateway("https://www.springsource.org/spring-integration") + .httpMethod(HttpMethod.GET) + .timeout(m -> Duration.ofMillis(m.getHeaders().get("timeout", Long.class))) + .expectedResponseType(String.class), + e -> e.id("webFluxWithTimeoutFunction")); + } + @Bean public IntegrationFlow httpReactiveProxyFlow() { return IntegrationFlows diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 8350dfb171a..9642f4ed1c1 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -68,3 +68,9 @@ See <<./amqp.adoc#amqp-inbound-channel-adapter,AMQP Inbound Channel Adapter>> The `encodeUri` property on the `AbstractHttpRequestExecutingMessageHandler` has been deprecated in favor of newly introduced `encodingMode`. See `DefaultUriBuilderFactory.EncodingMode` JavaDocs and <<./http.adoc#http-uri-encoding,Controlling URI Encoding>> for more information. This also affects `WebFluxRequestExecutingMessageHandler`, respective Java DSL and XML configuration. + +[[x5.3-WebFlux]] +=== WebFlux Changes + +The `timeoutFunction` property is added to `WebFluxRequestExecutingMessageHandler`. +Java DSL accessor which accepts long Timeout, Duration Timeout & Timeout Function is provided