Skip to content

Commit e63aae5

Browse files
committed
INT-3459: Log exceptions in case of failOver
JIRA: https://jira.spring.io/browse/INT-3459 When `UnicastingDispatcher` is configured with `failOver` (true by default), it loses exceptions it caught with previous handler when the next one processes message properly * Add INFO logging for exceptions which are caught before going to fail over to the next handler
1 parent e32f877 commit e63aae5

File tree

2 files changed

+94
-62
lines changed

2 files changed

+94
-62
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java

+22-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323

2424
import org.springframework.integration.MessageDispatchingException;
2525
import org.springframework.integration.support.utils.IntegrationUtils;
26+
import org.springframework.lang.Nullable;
2627
import org.springframework.messaging.Message;
2728
import org.springframework.messaging.MessageHandler;
2829
import org.springframework.messaging.support.MessageHandlingRunnable;
@@ -47,25 +48,26 @@
4748
* @author Gary Russell
4849
* @author Oleg Zhurakousky
4950
* @author Artem Bilan
51+
*
5052
* @since 1.0.2
5153
*/
5254
public class UnicastingDispatcher extends AbstractDispatcher {
5355

54-
private final MessageHandler dispatchHandler = message -> doDispatch(message);
56+
private final MessageHandler dispatchHandler = this::doDispatch;
5557

5658
private final Executor executor;
5759

58-
private volatile boolean failover = true;
60+
private boolean failover = true;
5961

60-
private volatile LoadBalancingStrategy loadBalancingStrategy;
62+
private LoadBalancingStrategy loadBalancingStrategy;
6163

62-
private volatile MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;
64+
private MessageHandlingTaskDecorator messageHandlingTaskDecorator = task -> task;
6365

6466
public UnicastingDispatcher() {
6567
this.executor = null;
6668
}
6769

68-
public UnicastingDispatcher(Executor executor) {
70+
public UnicastingDispatcher(@Nullable Executor executor) {
6971
this.executor = executor;
7072
}
7173

@@ -74,7 +76,6 @@ public UnicastingDispatcher(Executor executor) {
7476
* Specify whether this dispatcher should failover when a single
7577
* {@link MessageHandler} throws an Exception. The default value is
7678
* <code>true</code>.
77-
*
7879
* @param failover The failover boolean.
7980
*/
8081
public void setFailover(boolean failover) {
@@ -83,10 +84,9 @@ public void setFailover(boolean failover) {
8384

8485
/**
8586
* Provide a {@link LoadBalancingStrategy} for this dispatcher.
86-
*
8787
* @param loadBalancingStrategy The load balancing strategy implementation.
8888
*/
89-
public void setLoadBalancingStrategy(LoadBalancingStrategy loadBalancingStrategy) {
89+
public void setLoadBalancingStrategy(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
9090
this.loadBalancingStrategy = loadBalancingStrategy;
9191
}
9292

@@ -137,18 +137,24 @@ private boolean doDispatch(Message<?> message) {
137137
if (!handlerIterator.hasNext()) {
138138
throw new MessageDispatchingException(message, "Dispatcher has no subscribers");
139139
}
140-
List<RuntimeException> exceptions = new ArrayList<RuntimeException>();
140+
List<RuntimeException> exceptions = new ArrayList<>();
141141
while (!success && handlerIterator.hasNext()) {
142142
MessageHandler handler = handlerIterator.next();
143143
try {
144144
handler.handleMessage(message);
145145
success = true; // we have a winner.
146146
}
147-
catch (Exception e) {
148-
RuntimeException runtimeException = IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
149-
() -> "Dispatcher failed to deliver Message", e);
147+
catch (Exception ex) {
148+
RuntimeException runtimeException =
149+
IntegrationUtils.wrapInDeliveryExceptionIfNecessary(message,
150+
() -> "Dispatcher failed to deliver Message", ex);
150151
exceptions.add(runtimeException);
151-
this.handleExceptions(exceptions, message, !handlerIterator.hasNext());
152+
boolean isLast = !handlerIterator.hasNext();
153+
if (!isLast && this.failover && this.logger.isInfoEnabled()) {
154+
this.logger.info("An exception thrown from the '" + handler + "' for the '" + message + "'. " +
155+
"Will be in a failover for the next subscriber.", ex);
156+
}
157+
handleExceptions(exceptions, message, isLast);
152158
}
153159
}
154160
return success;
@@ -176,10 +182,10 @@ private Iterator<MessageHandler> getHandlerIterator(Message<?> message) {
176182
*/
177183
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
178184
if (isLast || !this.failover) {
179-
if (allExceptions != null && allExceptions.size() == 1) {
185+
if (allExceptions.size() == 1) {
180186
throw allExceptions.get(0);
181187
}
182-
throw new AggregateMessageDeliveryException(message, //NOSONAR - false positive
188+
throw new AggregateMessageDeliveryException(message,
183189
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
184190
}
185191
}

spring-integration-core/src/test/java/org/springframework/integration/dispatcher/RoundRobinDispatcherTests.java

+72-46
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@
1717
package org.springframework.integration.dispatcher;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.assertj.core.api.Assertions.fail;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.mockito.ArgumentMatchers.eq;
22+
import static org.mockito.ArgumentMatchers.startsWith;
23+
import static org.mockito.BDDMockito.given;
2124
import static org.mockito.Mockito.atLeast;
2225
import static org.mockito.Mockito.doThrow;
26+
import static org.mockito.Mockito.spy;
2327
import static org.mockito.Mockito.times;
2428
import static org.mockito.Mockito.verify;
2529

2630
import java.util.concurrent.atomic.AtomicInteger;
2731

32+
import org.apache.commons.logging.Log;
2833
import org.junit.Before;
2934
import org.junit.Test;
3035
import org.junit.runner.RunWith;
@@ -33,6 +38,7 @@
3338

3439
import org.springframework.beans.DirectFieldAccessor;
3540
import org.springframework.integration.support.MessageBuilder;
41+
import org.springframework.integration.test.util.TestUtils;
3642
import org.springframework.messaging.Message;
3743
import org.springframework.messaging.MessageHandler;
3844
import org.springframework.messaging.MessagingException;
@@ -41,6 +47,7 @@
4147
* @author Iwein Fuld
4248
* @author Mark Fisher
4349
* @author Gary Russell
50+
* @author Artem Bilan
4451
*/
4552
@RunWith(MockitoJUnitRunner.class)
4653
public class RoundRobinDispatcherTests {
@@ -63,44 +70,44 @@ public void setupDispatcher() {
6370

6471

6572
@Test
66-
public void dispatchMessageWithSingleHandler() throws Exception {
67-
dispatcher.addHandler(handler);
68-
dispatcher.dispatch(message);
73+
public void dispatchMessageWithSingleHandler() {
74+
this.dispatcher.addHandler(this.handler);
75+
this.dispatcher.dispatch(this.message);
76+
verify(this.handler).handleMessage(this.message);
6977
}
7078

7179
@Test
72-
public void differentHandlerInvokedOnSecondMessage() throws Exception {
73-
dispatcher.addHandler(handler);
74-
dispatcher.addHandler(differentHandler);
75-
dispatcher.dispatch(message);
76-
dispatcher.dispatch(message);
77-
verify(handler).handleMessage(message);
78-
verify(differentHandler).handleMessage(message);
80+
public void differentHandlerInvokedOnSecondMessage() {
81+
this.dispatcher.addHandler(this.handler);
82+
this.dispatcher.addHandler(this.differentHandler);
83+
this.dispatcher.dispatch(this.message);
84+
this.dispatcher.dispatch(this.message);
85+
verify(this.handler).handleMessage(this.message);
86+
verify(this.differentHandler).handleMessage(this.message);
7987
}
8088

8189
@Test
82-
public void multipleCyclesThroughHandlers() throws Exception {
83-
dispatcher.addHandler(handler);
84-
dispatcher.addHandler(differentHandler);
90+
public void multipleCyclesThroughHandlers() {
91+
this.dispatcher.addHandler(this.handler);
92+
this.dispatcher.addHandler(this.differentHandler);
8593
for (int i = 0; i < 7; i++) {
86-
dispatcher.dispatch(message);
94+
this.dispatcher.dispatch(this.message);
8795
}
88-
verify(handler, times(4)).handleMessage(message);
89-
verify(differentHandler, times(3)).handleMessage(message);
96+
verify(this.handler, times(4)).handleMessage(this.message);
97+
verify(this.differentHandler, times(3)).handleMessage(this.message);
9098
}
9199

92100
@Test
93-
public void currentHandlerIndexOverFlow() throws Exception {
94-
dispatcher.addHandler(handler);
95-
dispatcher.addHandler(differentHandler);
96-
DirectFieldAccessor accessor = new DirectFieldAccessor(
97-
new DirectFieldAccessor(dispatcher).getPropertyValue("loadBalancingStrategy"));
98-
((AtomicInteger) accessor.getPropertyValue("currentHandlerIndex")).set(Integer.MAX_VALUE - 5);
101+
public void currentHandlerIndexOverFlow() {
102+
this.dispatcher.addHandler(this.handler);
103+
this.dispatcher.addHandler(this.differentHandler);
104+
TestUtils.getPropertyValue(this.dispatcher, "loadBalancingStrategy.currentHandlerIndex", AtomicInteger.class)
105+
.set(Integer.MAX_VALUE - 5);
99106
for (int i = 0; i < 40; i++) {
100-
dispatcher.dispatch(message);
107+
this.dispatcher.dispatch(this.message);
101108
}
102-
verify(handler, atLeast(18)).handleMessage(message);
103-
verify(differentHandler, atLeast(18)).handleMessage(message);
109+
verify(this.handler, atLeast(18)).handleMessage(this.message);
110+
verify(this.differentHandler, atLeast(18)).handleMessage(this.message);
104111
}
105112

106113
/**
@@ -109,16 +116,14 @@ public void currentHandlerIndexOverFlow() throws Exception {
109116
*/
110117
@Test
111118
public void testExceptionEnhancement() {
112-
dispatcher.addHandler(handler);
113-
doThrow(new MessagingException("Mock Exception")).
114-
when(handler).handleMessage(message);
115-
try {
116-
dispatcher.dispatch(message);
117-
fail("Expected Exception");
118-
}
119-
catch (MessagingException e) {
120-
assertThat(e.getFailedMessage()).isEqualTo(message);
121-
}
119+
this.dispatcher.addHandler(this.handler);
120+
doThrow(new MessagingException("Mock Exception"))
121+
.when(this.handler)
122+
.handleMessage(this.message);
123+
124+
assertThatExceptionOfType(MessagingException.class)
125+
.isThrownBy(() -> this.dispatcher.dispatch(this.message))
126+
.satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(this.message));
122127
}
123128

124129
/**
@@ -127,16 +132,37 @@ public void testExceptionEnhancement() {
127132
*/
128133
@Test
129134
public void testNoExceptionEnhancement() {
130-
dispatcher.addHandler(handler);
135+
this.dispatcher.addHandler(this.handler);
131136
Message<String> dontReplaceThisMessage = MessageBuilder.withPayload("x").build();
132-
doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception")).
133-
when(handler).handleMessage(message);
134-
try {
135-
dispatcher.dispatch(message);
136-
fail("Expected Exception");
137-
}
138-
catch (MessagingException e) {
139-
assertThat(e.getFailedMessage()).isEqualTo(dontReplaceThisMessage);
140-
}
137+
doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception"))
138+
.when(this.handler)
139+
.handleMessage(this.message);
140+
141+
assertThatExceptionOfType(MessagingException.class)
142+
.isThrownBy(() -> this.dispatcher.dispatch(this.message))
143+
.satisfies(ex -> assertThat(ex.getFailedMessage()).isEqualTo(dontReplaceThisMessage));
141144
}
145+
146+
@Test
147+
public void testFailOverAndLogging() {
148+
RuntimeException testException = new RuntimeException("intentional");
149+
doThrow(testException)
150+
.when(this.handler)
151+
.handleMessage(this.message);
152+
this.dispatcher.addHandler(this.handler);
153+
this.dispatcher.addHandler(this.differentHandler);
154+
155+
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(this.dispatcher);
156+
Log log = (Log) spy(directFieldAccessor.getPropertyType("logger"));
157+
given(log.isInfoEnabled()).willReturn(true);
158+
directFieldAccessor.setPropertyValue("logger", log);
159+
160+
this.dispatcher.dispatch(this.message);
161+
162+
verify(this.handler).handleMessage(this.message);
163+
verify(this.differentHandler).handleMessage(this.message);
164+
165+
verify(log).info(startsWith("An exception thrown from the"), eq(testException));
166+
}
167+
142168
}

0 commit comments

Comments
 (0)