Skip to content

INT-3459: Log exceptions in case of failOver #2790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,10 +19,12 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;

import org.springframework.integration.MessageDispatchingException;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.MessageHandlingRunnable;
Expand All @@ -47,25 +49,26 @@
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Artem Bilan
*
* @since 1.0.2
*/
public class UnicastingDispatcher extends AbstractDispatcher {

private final MessageHandler dispatchHandler = message -> doDispatch(message);
private final MessageHandler dispatchHandler = this::doDispatch;

private final Executor executor;

private volatile boolean failover = true;
private boolean failover = true;

private volatile LoadBalancingStrategy loadBalancingStrategy;
private LoadBalancingStrategy loadBalancingStrategy;

private volatile MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;
private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;

public UnicastingDispatcher() {
this.executor = null;
}

public UnicastingDispatcher(Executor executor) {
public UnicastingDispatcher(@Nullable Executor executor) {
this.executor = executor;
}

Expand All @@ -74,7 +77,6 @@ public UnicastingDispatcher(Executor executor) {
* Specify whether this dispatcher should failover when a single
* {@link MessageHandler} throws an Exception. The default value is
* <code>true</code>.
*
* @param failover The failover boolean.
*/
public void setFailover(boolean failover) {
Expand All @@ -83,10 +85,9 @@ public void setFailover(boolean failover) {

/**
* Provide a {@link LoadBalancingStrategy} for this dispatcher.
*
* @param loadBalancingStrategy The load balancing strategy implementation.
*/
public void setLoadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy) {
public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
this.loadBalancingStrategy = loadBalancingStrategy;
}

Expand Down Expand Up @@ -133,22 +134,30 @@ private boolean doDispatch(Message<?> message) {
return true;
}
boolean success = false;
Iterator<MessageHandler> handlerIterator = this.getHandlerIterator(message);
Iterator<MessageHandler> handlerIterator = getHandlerIterator(message);
if (!handlerIterator.hasNext()) {
throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
}
List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
List<RuntimeException> exceptions = null;
while (!success && handlerIterator.hasNext()) {
MessageHandler handler = handlerIterator.next();
try {
handler.handleMessage(message);
success = true; // we have a winner.
}
catch (Exception e) {
RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", e);
catch (Exception ex) {
RuntimeException runtimeException =
IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
() -> "Dispatcher failed to deliver Message", ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this line wrap? It was previously 112.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, my own preferences: I find it is much readable when variable initialization is multi-line. So, I start initialization block fully from a new line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

if (exceptions == null) {
exceptions = new ArrayList<>();
}
exceptions.add(runtimeException);
this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
boolean isLast = !handlerIterator.hasNext();
if (!isLast && this.failover) {
logExceptionBeforeFailOver(ex, handler, message);
}
handleExceptions(exceptions, message, isLast);
}
}
return success;
Expand All @@ -160,10 +169,22 @@ private boolean doDispatch(Message<?> message) {
* it simply returns the Iterator for the existing handler List.
*/
private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
Set<MessageHandler> handlers = getHandlers();
if (this.loadBalancingStrategy != null) {
return this.loadBalancingStrategy.getHandlerIterator(message, this.getHandlers());
return this.loadBalancingStrategy.getHandlerIterator(message, handlers);
}
return handlers.iterator();
}

private void logExceptionBeforeFailOver(Exception ex, MessageHandler handler, Message<?> message) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("An exception was thrown by '" + handler + "' while handling '" + message +
"'. Failing over to the next subscriber.", ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug has to be detected first otherwise we'll always take the INFO branch.

}
else if (this.logger.isInfoEnabled()) {
this.logger.info("An exception was thrown by '" + handler + "' while handling '" + message + "': " +
ex.getMessage() + ". Failing over to the next subscriber.");
}
return this.getHandlers().iterator();
}

/**
Expand All @@ -176,10 +197,10 @@ private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
*/
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
if (isLast || !this.failover) {
if (allExceptions != null && allExceptions.size() == 1) {
if (allExceptions.size() == 1) {
throw allExceptions.get(0);
}
throw new AggregateMessageDeliveryException(message, //NOSONAR - false positive
throw new AggregateMessageDeliveryException(message,
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
package org.springframework.integration.dispatcher;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -33,6 +38,7 @@

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
Expand All @@ -41,6 +47,7 @@
* @author Iwein Fuld
* @author Mark Fisher
* @author Gary Russell
* @author Artem Bilan
*/
@RunWith(MockitoJUnitRunner.class)
public class RoundRobinDispatcherTests {
Expand All @@ -63,44 +70,44 @@ public void setupDispatcher() {


@Test
public void dispatchMessageWithSingleHandler() throws Exception {
dispatcher.addHandler(handler);
dispatcher.dispatch(message);
public void dispatchMessageWithSingleHandler() {
this.dispatcher.addHandler(this.handler);
this.dispatcher.dispatch(this.message);
verify(this.handler).handleMessage(this.message);
}

@Test
public void differentHandlerInvokedOnSecondMessage() throws Exception {
dispatcher.addHandler(handler);
dispatcher.addHandler(differentHandler);
dispatcher.dispatch(message);
dispatcher.dispatch(message);
verify(handler).handleMessage(message);
verify(differentHandler).handleMessage(message);
public void differentHandlerInvokedOnSecondMessage() {
this.dispatcher.addHandler(this.handler);
this.dispatcher.addHandler(this.differentHandler);
this.dispatcher.dispatch(this.message);
this.dispatcher.dispatch(this.message);
verify(this.handler).handleMessage(this.message);
verify(this.differentHandler).handleMessage(this.message);
}

@Test
public void multipleCyclesThroughHandlers() throws Exception {
dispatcher.addHandler(handler);
dispatcher.addHandler(differentHandler);
public void multipleCyclesThroughHandlers() {
this.dispatcher.addHandler(this.handler);
this.dispatcher.addHandler(this.differentHandler);
for (int i = 0; i < 7; i++) {
dispatcher.dispatch(message);
this.dispatcher.dispatch(this.message);
}
verify(handler, times(4)).handleMessage(message);
verify(differentHandler, times(3)).handleMessage(message);
verify(this.handler, times(4)).handleMessage(this.message);
verify(this.differentHandler, times(3)).handleMessage(this.message);
}

@Test
public void currentHandlerIndexOverFlow() throws Exception {
dispatcher.addHandler(handler);
dispatcher.addHandler(differentHandler);
DirectFieldAccessor accessor = new DirectFieldAccessor(
new DirectFieldAccessor(dispatcher).getPropertyValue("loadBalancingStrategy"));
((AtomicInteger) accessor.getPropertyValue("currentHandlerIndex")).set(Integer.MAX_VALUE - 5);
public void currentHandlerIndexOverFlow() {
this.dispatcher.addHandler(this.handler);
this.dispatcher.addHandler(this.differentHandler);
TestUtils.getPropertyValue(this.dispatcher, "loadBalancingStrategy.currentHandlerIndex", AtomicInteger.class)
.set(Integer.MAX_VALUE - 5);
for (int i = 0; i < 40; i++) {
dispatcher.dispatch(message);
this.dispatcher.dispatch(this.message);
}
verify(handler, atLeast(18)).handleMessage(message);
verify(differentHandler, atLeast(18)).handleMessage(message);
verify(this.handler, atLeast(18)).handleMessage(this.message);
verify(this.differentHandler, atLeast(18)).handleMessage(this.message);
}

/**
Expand All @@ -109,16 +116,14 @@ public void currentHandlerIndexOverFlow() throws Exception {
*/
@Test
public void testExceptionEnhancement() {
dispatcher.addHandler(handler);
doThrow(new MessagingException("Mock Exception")).
when(handler).handleMessage(message);
try {
dispatcher.dispatch(message);
fail("Expected Exception");
}
catch (MessagingException e) {
assertThat(e.getFailedMessage()).isEqualTo(message);
}
this.dispatcher.addHandler(this.handler);
doThrow(new MessagingException("Mock Exception"))
.when(this.handler)
.handleMessage(this.message);

assertThatExceptionOfType(MessagingException.class)
.isThrownBy(() -> this.dispatcher.dispatch(this.message))
.satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(this.message));
}

/**
Expand All @@ -127,16 +132,37 @@ public void testExceptionEnhancement() {
*/
@Test
public void testNoExceptionEnhancement() {
dispatcher.addHandler(handler);
this.dispatcher.addHandler(this.handler);
Message<String> dontReplaceThisMessage = MessageBuilder.withPayload("x").build();
doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception")).
when(handler).handleMessage(message);
try {
dispatcher.dispatch(message);
fail("Expected Exception");
}
catch (MessagingException e) {
assertThat(e.getFailedMessage()).isEqualTo(dontReplaceThisMessage);
}
doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception"))
.when(this.handler)
.handleMessage(this.message);

assertThatExceptionOfType(MessagingException.class)
.isThrownBy(() -> this.dispatcher.dispatch(this.message))
.satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(dontReplaceThisMessage));
}

@Test
public void testFailOverAndLogging() {
RuntimeException testException = new RuntimeException("intentional");
doThrow(testException)
.when(this.handler)
.handleMessage(this.message);
this.dispatcher.addHandler(this.handler);
this.dispatcher.addHandler(this.differentHandler);

DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(this.dispatcher);
Log log = (Log) spy(directFieldAccessor.getPropertyType("logger"));
given(log.isDebugEnabled()).willReturn(true);
directFieldAccessor.setPropertyValue("logger", log);

this.dispatcher.dispatch(this.message);

verify(this.handler).handleMessage(this.message);
verify(this.differentHandler).handleMessage(this.message);

verify(log).debug(startsWith("An exception was thrown by '"), eq(testException));
}

}
4 changes: 3 additions & 1 deletion src/reference/asciidoc/channel.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ However, since version 3.0, you can provide your own implementation of the `Load
Note that the `load-balancer` and `load-balancer-ref` attributes are mutually exclusive.

The load-balancing also works in conjunction with a boolean `failover` property.
If the "`failover`" value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions.
If the `failover` value is true (the default), the dispatcher falls back to any subsequent handlers (as necessary) when preceding handlers throw exceptions.
The order is determined by an optional order value defined on the handlers themselves or, if no such value exists, the order in which the handlers subscribed.

If a certain situation requires that the dispatcher always try to invoke the first handler and then fall back in the same fixed order sequence every time an error occurs, no load-balancing strategy should be provided.
Expand All @@ -187,6 +187,8 @@ When using the namespace support, the `order` attribute on any endpoint determin
NOTE: Keep in mind that load-balancing and `failover` apply only when a channel has more than one subscribed message handler.
When using the namespace support, this means that more than one endpoint shares the same channel reference defined in the `input-channel` attribute.

Starting with version 5.2, when `failover` is true, a failure of the current handler together with the failed message is logged under `debug` or `info` if configured respectively.

[[executor-channel]]
===== `ExecutorChannel`

Expand Down