diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java index 8a099b81c33..978d4c3f3e5 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,10 +19,12 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.Executor; import org.springframework.integration.MessageDispatchingException; import org.springframework.integration.support.utils.IntegrationUtils; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.support.MessageHandlingRunnable; @@ -47,25 +49,26 @@ * @author Gary Russell * @author Oleg Zhurakousky * @author Artem Bilan + * * @since 1.0.2 */ public class UnicastingDispatcher extends AbstractDispatcher { - private final MessageHandler dispatchHandler = message -> doDispatch(message); + private final MessageHandler dispatchHandler = this::doDispatch; private final Executor executor; - private volatile boolean failover = true; + private boolean failover = true; - private volatile LoadBalancingStrategy loadBalancingStrategy; + private LoadBalancingStrategy loadBalancingStrategy; - private volatile MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task; + private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task; public UnicastingDispatcher() { this.executor = null; } - public UnicastingDispatcher(Executor executor) { + public UnicastingDispatcher(@Nullable Executor executor) { this.executor = executor; } @@ -74,7 +77,6 @@ public UnicastingDispatcher(Executor executor) { * Specify whether this dispatcher should failover when a single * {@link MessageHandler} throws an Exception. The default value is * true. - * * @param failover The failover boolean. */ public void setFailover(boolean failover) { @@ -83,10 +85,9 @@ public void setFailover(boolean failover) { /** * Provide a {@link LoadBalancingStrategy} for this dispatcher. - * * @param loadBalancingStrategy The load balancing strategy implementation. */ - public void setLoadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy) { + public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalancingStrategy) { this.loadBalancingStrategy = loadBalancingStrategy; } @@ -133,22 +134,30 @@ private boolean doDispatch(Message message) { return true; } boolean success = false; - Iterator handlerIterator = this.getHandlerIterator(message); + Iterator handlerIterator = getHandlerIterator(message); if (!handlerIterator.hasNext()) { throw new MessageDispatchingException(message, "Dispatcher has no subscribers"); } - List exceptions = new ArrayList(); + List exceptions = null; while (!success && handlerIterator.hasNext()) { MessageHandler handler = handlerIterator.next(); try { handler.handleMessage(message); success = true; // we have a winner. } - catch (Exception e) { - RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, - () -> "Dispatcher failed to deliver Message", e); + catch (Exception ex) { + RuntimeException runtimeException = + IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message, + () -> "Dispatcher failed to deliver Message", ex); + if (exceptions == null) { + exceptions = new ArrayList<>(); + } exceptions.add(runtimeException); - this.handleExceptions(exceptions, message, !handlerIterator.hasNext()); + boolean isLast = !handlerIterator.hasNext(); + if (!isLast && this.failover) { + logExceptionBeforeFailOver(ex, handler, message); + } + handleExceptions(exceptions, message, isLast); } } return success; @@ -160,10 +169,22 @@ private boolean doDispatch(Message message) { * it simply returns the Iterator for the existing handler List. */ private Iterator getHandlerIterator(Message message) { + Set handlers = getHandlers(); if (this.loadBalancingStrategy != null) { - return this.loadBalancingStrategy.getHandlerIterator(message, this.getHandlers()); + return this.loadBalancingStrategy.getHandlerIterator(message, handlers); + } + return handlers.iterator(); + } + + private void logExceptionBeforeFailOver(Exception ex, MessageHandler handler, Message message) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("An exception was thrown by '" + handler + "' while handling '" + message + + "'. Failing over to the next subscriber.", ex); + } + else if (this.logger.isInfoEnabled()) { + this.logger.info("An exception was thrown by '" + handler + "' while handling '" + message + "': " + + ex.getMessage() + ". Failing over to the next subscriber."); } - return this.getHandlers().iterator(); } /** @@ -176,10 +197,10 @@ private Iterator getHandlerIterator(Message message) { */ private void handleExceptions(List allExceptions, Message message, boolean isLast) { if (isLast || !this.failover) { - if (allExceptions != null && allExceptions.size() == 1) { + if (allExceptions.size() == 1) { throw allExceptions.get(0); } - throw new AggregateMessageDeliveryException(message, //NOSONAR - false positive + throw new AggregateMessageDeliveryException(message, "All attempts to deliver Message to MessageHandlers failed.", allExceptions); } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java index 9ffca587e8b..2dab8e9e144 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java @@ -17,14 +17,19 @@ package org.springframework.integration.dispatcher; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.startsWith; +import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.logging.Log; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -33,6 +38,7 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; @@ -41,6 +47,7 @@ * @author Iwein Fuld * @author Mark Fisher * @author Gary Russell + * @author Artem Bilan */ @RunWith(MockitoJUnitRunner.class) public class RoundRobinDispatcherTests { @@ -63,44 +70,44 @@ public void setupDispatcher() { @Test - public void dispatchMessageWithSingleHandler() throws Exception { - dispatcher.addHandler(handler); - dispatcher.dispatch(message); + public void dispatchMessageWithSingleHandler() { + this.dispatcher.addHandler(this.handler); + this.dispatcher.dispatch(this.message); + verify(this.handler).handleMessage(this.message); } @Test - public void differentHandlerInvokedOnSecondMessage() throws Exception { - dispatcher.addHandler(handler); - dispatcher.addHandler(differentHandler); - dispatcher.dispatch(message); - dispatcher.dispatch(message); - verify(handler).handleMessage(message); - verify(differentHandler).handleMessage(message); + public void differentHandlerInvokedOnSecondMessage() { + this.dispatcher.addHandler(this.handler); + this.dispatcher.addHandler(this.differentHandler); + this.dispatcher.dispatch(this.message); + this.dispatcher.dispatch(this.message); + verify(this.handler).handleMessage(this.message); + verify(this.differentHandler).handleMessage(this.message); } @Test - public void multipleCyclesThroughHandlers() throws Exception { - dispatcher.addHandler(handler); - dispatcher.addHandler(differentHandler); + public void multipleCyclesThroughHandlers() { + this.dispatcher.addHandler(this.handler); + this.dispatcher.addHandler(this.differentHandler); for (int i = 0; i < 7; i++) { - dispatcher.dispatch(message); + this.dispatcher.dispatch(this.message); } - verify(handler, times(4)).handleMessage(message); - verify(differentHandler, times(3)).handleMessage(message); + verify(this.handler, times(4)).handleMessage(this.message); + verify(this.differentHandler, times(3)).handleMessage(this.message); } @Test - public void currentHandlerIndexOverFlow() throws Exception { - dispatcher.addHandler(handler); - dispatcher.addHandler(differentHandler); - DirectFieldAccessor accessor = new DirectFieldAccessor( - new DirectFieldAccessor(dispatcher).getPropertyValue("loadBalancingStrategy")); - ((AtomicInteger) accessor.getPropertyValue("currentHandlerIndex")).set(Integer.MAX_VALUE - 5); + public void currentHandlerIndexOverFlow() { + this.dispatcher.addHandler(this.handler); + this.dispatcher.addHandler(this.differentHandler); + TestUtils.getPropertyValue(this.dispatcher, "loadBalancingStrategy.currentHandlerIndex", AtomicInteger.class) + .set(Integer.MAX_VALUE - 5); for (int i = 0; i < 40; i++) { - dispatcher.dispatch(message); + this.dispatcher.dispatch(this.message); } - verify(handler, atLeast(18)).handleMessage(message); - verify(differentHandler, atLeast(18)).handleMessage(message); + verify(this.handler, atLeast(18)).handleMessage(this.message); + verify(this.differentHandler, atLeast(18)).handleMessage(this.message); } /** @@ -109,16 +116,14 @@ public void currentHandlerIndexOverFlow() throws Exception { */ @Test public void testExceptionEnhancement() { - dispatcher.addHandler(handler); - doThrow(new MessagingException("Mock Exception")). - when(handler).handleMessage(message); - try { - dispatcher.dispatch(message); - fail("Expected Exception"); - } - catch (MessagingException e) { - assertThat(e.getFailedMessage()).isEqualTo(message); - } + this.dispatcher.addHandler(this.handler); + doThrow(new MessagingException("Mock Exception")) + .when(this.handler) + .handleMessage(this.message); + + assertThatExceptionOfType(MessagingException.class) + .isThrownBy(() -> this.dispatcher.dispatch(this.message)) + .satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(this.message)); } /** @@ -127,16 +132,37 @@ public void testExceptionEnhancement() { */ @Test public void testNoExceptionEnhancement() { - dispatcher.addHandler(handler); + this.dispatcher.addHandler(this.handler); Message dontReplaceThisMessage = MessageBuilder.withPayload("x").build(); - doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception")). - when(handler).handleMessage(message); - try { - dispatcher.dispatch(message); - fail("Expected Exception"); - } - catch (MessagingException e) { - assertThat(e.getFailedMessage()).isEqualTo(dontReplaceThisMessage); - } + doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception")) + .when(this.handler) + .handleMessage(this.message); + + assertThatExceptionOfType(MessagingException.class) + .isThrownBy(() -> this.dispatcher.dispatch(this.message)) + .satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(dontReplaceThisMessage)); } + + @Test + public void testFailOverAndLogging() { + RuntimeException testException = new RuntimeException("intentional"); + doThrow(testException) + .when(this.handler) + .handleMessage(this.message); + this.dispatcher.addHandler(this.handler); + this.dispatcher.addHandler(this.differentHandler); + + DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(this.dispatcher); + Log log = (Log) spy(directFieldAccessor.getPropertyType("logger")); + given(log.isDebugEnabled()).willReturn(true); + directFieldAccessor.setPropertyValue("logger", log); + + this.dispatcher.dispatch(this.message); + + verify(this.handler).handleMessage(this.message); + verify(this.differentHandler).handleMessage(this.message); + + verify(log).debug(startsWith("An exception was thrown by '"), eq(testException)); + } + } diff --git a/src/reference/asciidoc/channel.adoc b/src/reference/asciidoc/channel.adoc index 49eb32ce481..b5b9a0586db 100644 --- a/src/reference/asciidoc/channel.adoc +++ b/src/reference/asciidoc/channel.adoc @@ -175,7 +175,7 @@ However, since version 3.0, you can provide your own implementation of the `Load Note that the `load-balancer` and `load-balancer-ref` attributes are mutually exclusive. The load-balancing also works in conjunction with a boolean `failover` property. -If the "`failover`" value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions. +If the `failover` value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions. The order is determined by an optional order value defined on the handlers themselves or, if no such value exists, the order in which the handlers subscribed. If a certain situation requires that the dispatcher always try to invoke the first handler and then fall back in the same fixed order sequence every time an error occurs, no load-balancing strategy should be provided. @@ -187,6 +187,8 @@ When using the namespace support, the `order` attribute on any endpoint determin NOTE: Keep in mind that load-balancing and `failover` apply only when a channel has more than one subscribed message handler. When using the namespace support, this means that more than one endpoint shares the same channel reference defined in the `input-channel` attribute. +Starting with version 5.2, when `failover` is true, a failure of the current handler together with the failed message is logged under `debug` or `info` if configured respectively. + [[executor-channel]] ===== `ExecutorChannel`