Skip to content

Timeout added to WebFluxMessageHandlerSpec.java & WebFluxRequestExecutingMessageHandler.java #3187

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.integration.webflux.dsl;

import java.net.URI;
import java.time.Duration;
import java.util.function.Function;

import org.springframework.core.ParameterizedTypeReference;
Expand Down Expand Up @@ -144,6 +145,41 @@ public WebFluxMessageHandlerSpec publisherElementTypeExpression(Expression publi
return this;
}

/**
* Specify the timeout value for receiving the response form the server.
* If the response is not received with in timeout value, will result in Timeout Exception
* @param timeoutInMillis the timeout in milliseconds
* @return the spec
* @since 5.3
*/
public WebFluxMessageHandlerSpec timeout(long timeoutInMillis) {
return timeout(Duration.ofMillis(timeoutInMillis));
}

/**
* Specify the timeout value for receiving the response form the server.
* If the response is not received with in timeout value, will result in Timeout Exception
* @param timeout accepts {@link java.time.Duration}
* @return the spec
* @since 5.3
*/
public WebFluxMessageHandlerSpec timeout(Duration timeout) {
this.target.setTimeout(timeout);
return this;
}

/**
* @param timeout accepts {@link Function}. The function accepts a Message as input and returns Duration.
* Function is evaluated on each request basis and the {@link Duration} is applied to Mono
* If the response is not received with in timeout value, will result in Timeout Exception
* @return the spec
* @since 5.3
*/
public WebFluxMessageHandlerSpec timeout(Function<Message<?>, Duration> timeout) {
this.target.setTimeoutFunction(timeout);
return this;
}

@Override
protected boolean isClientSet() {
return this.webClient != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.function.Function;

import org.reactivestreams.Publisher;

Expand Down Expand Up @@ -80,6 +82,8 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx

private Expression publisherElementTypeExpression;

Function<Message<?>, Duration> timeoutFunction;

/**
* Create a handler that will send requests to the provided URI.
* @param uri The URI.
Expand Down Expand Up @@ -198,6 +202,26 @@ public void setPublisherElementTypeExpression(Expression publisherElementTypeExp
this.publisherElementTypeExpression = publisherElementTypeExpression;
}

/**
* Specify the timeout value for receiving the response form the server.
* If the response is not received with in timeout value, will result in Timeout Exception
* @param timeout accepts {@link java.time.Duration}
* @since 5.3
*/
public void setTimeout(Duration timeout) {
this.timeoutFunction = m -> timeout;
}

/**
* @param timeoutFunction accepts {@link Function}. The function accepts a Message as input and returns Duration.
* Function is evaluated on each request basis and the {@link Duration} is applied to Mono
* If the response is not received with in timeout value, will result in Timeout Exception
* @since 5.3
*/
public void setTimeoutFunction(Function<Message<?>, Duration> timeoutFunction) {
this.timeoutFunction = timeoutFunction;
}

@Override
public String getComponentType() {
return (isExpectReply() ? "webflux:outbound-gateway" : "webflux:outbound-channel-adapter");
Expand All @@ -213,6 +237,10 @@ protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity<?> httpR

Mono<ClientResponse> responseMono = exchangeForResponseMono(requestSpec);

if (this.timeoutFunction != null) {
responseMono = responseMono.timeout(this.timeoutFunction.apply(requestMessage));
}

if (isExpectReply()) {
return createReplyFromResponse(expectedResponseType, responseMono);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import java.security.Principal;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.TimeoutException;

import javax.annotation.Resource;

Expand Down Expand Up @@ -56,6 +57,7 @@
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.security.access.AccessDecisionManager;
import org.springframework.security.access.vote.AffirmativeBased;
Expand Down Expand Up @@ -101,7 +103,7 @@
* @author Shiliang Li
* @author Abhijit Sarkar
* @author Gary Russell
*
* @author Jayadev Sirimamilla
* @since 5.0
*/
@RunWith(SpringRunner.class)
Expand All @@ -119,13 +121,29 @@ public class WebFluxDslTests {
@Qualifier("webFluxWithReplyPayloadToFlux.handler")
private WebFluxRequestExecutingMessageHandler webFluxWithReplyPayloadToFlux;

@Autowired
@Qualifier("webFluxWithTimeout.handler")
private WebFluxRequestExecutingMessageHandler webFluxFlowWithTimeout;

@Autowired
@Qualifier("webFluxWithTimeoutFunction.handler")
private WebFluxRequestExecutingMessageHandler webFluxFlowWithTimeoutFunction;

@Resource(name = "httpReactiveProxyFlow.webflux:outbound-gateway#0")
private WebFluxRequestExecutingMessageHandler httpReactiveProxyFlow;

@Autowired
@Qualifier("webFluxFlowWithReplyPayloadToFlux.input")
private MessageChannel webFluxFlowWithReplyPayloadToFluxInput;

@Autowired
@Qualifier("webFluxFlowWithTimeout.input")
private MessageChannel webFluxFlowWithTimeoutInput;

@Autowired
@Qualifier("webFluxFlowWithTimeoutFunction.input")
private MessageChannel webFluxFlowWithTimeoutFunctionInput;

private MockMvc mockMvc;

private WebTestClient webTestClient;
Expand All @@ -145,6 +163,82 @@ public void setup() {
.build();
}

@Test
public void testTimeoutForWebFluxOutboundGateway() {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);


DataBufferFactory bufferFactory = response.bufferFactory();
return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes()))
.delayElement(Duration.ofMillis(11000)))
.then(Mono.defer(response::setComplete));
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

new DirectFieldAccessor(this.webFluxFlowWithTimeout)
.setPropertyValue("webClient", webClient);

QueueChannel replyChannel = new QueueChannel();

Message<String> testMessage =
MessageBuilder.withPayload("test")
.setReplyChannel(replyChannel)
.setErrorChannel(replyChannel)
.build();

this.webFluxFlowWithTimeoutInput.send(testMessage);

Message<?> receive = replyChannel.receive(10_000);

assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isInstanceOf(MessageHandlingException.class);
assertThat(((MessageHandlingException) receive.getPayload()).getCause()).isInstanceOf(TimeoutException.class);

}

@Test
public void testTimeoutFunctionForWebFluxOutboundGateway() {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);

DataBufferFactory bufferFactory = response.bufferFactory();
return response.writeWith(Mono.just(bufferFactory.wrap("FOO\nBAR\n".getBytes()))
.delayElement(Duration.ofMillis(15000)))
.then(Mono.defer(response::setComplete));
});

WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.build();

new DirectFieldAccessor(this.webFluxFlowWithTimeoutFunction)
.setPropertyValue("webClient", webClient);

QueueChannel replyChannel = new QueueChannel();

Message<String> testMessage =
MessageBuilder.withPayload("test")
.setReplyChannel(replyChannel)
.setErrorChannel(replyChannel)
.setHeader("timeout", 100L)
.build();

this.webFluxFlowWithTimeoutFunctionInput.send(testMessage);

Message<?> receive = replyChannel.receive(10_000);

assertThat(receive).isNotNull();
assertThat(receive.getPayload()).isInstanceOf(MessageHandlingException.class);
assertThat(((MessageHandlingException) receive.getPayload()).getCause()).isInstanceOf(TimeoutException.class);

}

@Test
public void testWebFluxFlowWithReplyPayloadToFlux() {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
Expand Down Expand Up @@ -386,6 +480,26 @@ public IntegrationFlow webFluxFlowWithReplyPayloadToFlux() {
e -> e.id("webFluxWithReplyPayloadToFlux"));
}

@Bean
public IntegrationFlow webFluxFlowWithTimeout() {
return f -> f
.handle(WebFlux.outboundGateway("https://www.springsource.org/spring-integration")
.httpMethod(HttpMethod.GET)
.timeout(100)
.expectedResponseType(String.class),
e -> e.id("webFluxWithTimeout"));
}

@Bean
public IntegrationFlow webFluxFlowWithTimeoutFunction() {
return f -> f
.handle(WebFlux.outboundGateway("https://www.springsource.org/spring-integration")
.httpMethod(HttpMethod.GET)
.timeout(m -> Duration.ofMillis(m.getHeaders().get("timeout", Long.class)))
.expectedResponseType(String.class),
e -> e.id("webFluxWithTimeoutFunction"));
}

@Bean
public IntegrationFlow httpReactiveProxyFlow() {
return IntegrationFlows
Expand Down
6 changes: 6 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,9 @@ See <<./amqp.adoc#amqp-inbound-channel-adapter,AMQP Inbound Channel Adapter>>
The `encodeUri` property on the `AbstractHttpRequestExecutingMessageHandler` has been deprecated in favor of newly introduced `encodingMode`.
See `DefaultUriBuilderFactory.EncodingMode` JavaDocs and <<./http.adoc#http-uri-encoding,Controlling URI Encoding>> for more information.
This also affects `WebFluxRequestExecutingMessageHandler`, respective Java DSL and XML configuration.

[[x5.3-WebFlux]]
=== WebFlux Changes

The `timeoutFunction` property is added to `WebFluxRequestExecutingMessageHandler`.
Java DSL accessor which accepts long Timeout, Duration Timeout & Timeout Function is provided