Skip to content

Commit 9f07803

Browse files
sirimamillaartembilan
authored andcommitted
GH-3152: Fix for nested Scatter Gather
Fixes #3152 The upstream `gatherResultChannel` header has been missed when we produced a reply from nested scatter-gather Added Test Case for Nested Scatter Gather test Simplified the the test cases and added author in changed cases Corrected codestyle issue in Travis CI Removed additional OriginalReplyChannel and originalErrorChannel in Headers. Added additional not to be executed line of code in test case. Restored OriginalErrorChannel Header and removed error handling related fixes * Clean up code style and improve readability **Cherry-pick to 5.1.x & master** * Migrate `RouterTests` to JUnit 5
1 parent 290c803 commit 9f07803

File tree

2 files changed

+50
-14
lines changed

2 files changed

+50
-14
lines changed

spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,15 +49,14 @@
4949
*
5050
* @author Artem Bilan
5151
* @author Abdul Zaheer
52+
* @author Jayadev Sirimamilla
5253
*
5354
* @since 4.1
5455
*/
5556
public class ScatterGatherHandler extends AbstractReplyProducingMessageHandler implements Lifecycle {
5657

5758
private static final String GATHER_RESULT_CHANNEL = "gatherResultChannel";
5859

59-
private static final String ORIGINAL_REPLY_CHANNEL = "originalReplyChannel";
60-
6160
private static final String ORIGINAL_ERROR_CHANNEL = "originalErrorChannel";
6261

6362
private final MessageChannel scatterChannel;
@@ -178,9 +177,7 @@ private Message<?> enhanceScatterReplyMessage(Message<?> message) {
178177
MessageHeaders headers = message.getHeaders();
179178
return getMessageBuilderFactory()
180179
.fromMessage(message)
181-
.setHeader(MessageHeaders.REPLY_CHANNEL, headers.get(ORIGINAL_REPLY_CHANNEL))
182180
.setHeader(MessageHeaders.ERROR_CHANNEL, headers.get(ORIGINAL_ERROR_CHANNEL))
183-
.removeHeaders(ORIGINAL_REPLY_CHANNEL, ORIGINAL_ERROR_CHANNEL)
184181
.build();
185182
}
186183

@@ -193,15 +190,22 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
193190
getMessageBuilderFactory()
194191
.fromMessage(requestMessage)
195192
.setHeader(GATHER_RESULT_CHANNEL, gatherResultChannel)
196-
.setHeader(ORIGINAL_REPLY_CHANNEL, requestMessageHeaders.getReplyChannel())
197193
.setHeader(ORIGINAL_ERROR_CHANNEL, requestMessageHeaders.getErrorChannel())
198194
.setReplyChannel(this.gatherChannel)
199195
.setErrorChannelName(this.errorChannelName)
200196
.build();
201197

202198
this.messagingTemplate.send(this.scatterChannel, scatterMessage);
203199

204-
return gatherResultChannel.receive(this.gatherTimeout);
200+
Message<?> gatherResult = gatherResultChannel.receive(this.gatherTimeout);
201+
if (gatherResult != null) {
202+
return getMessageBuilderFactory()
203+
.fromMessage(gatherResult)
204+
.removeHeaders(GATHER_RESULT_CHANNEL, ORIGINAL_ERROR_CHANNEL,
205+
MessageHeaders.REPLY_CHANNEL, MessageHeaders.ERROR_CHANNEL);
206+
}
207+
208+
return null;
205209
}
206210

207211
@Override

spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,8 +26,7 @@
2626
import java.util.function.Function;
2727
import java.util.stream.Collectors;
2828

29-
import org.junit.Test;
30-
import org.junit.runner.RunWith;
29+
import org.junit.jupiter.api.Test;
3130

3231
import org.springframework.beans.factory.ListableBeanFactory;
3332
import org.springframework.beans.factory.annotation.Autowired;
@@ -42,9 +41,11 @@
4241
import org.springframework.integration.config.EnableIntegration;
4342
import org.springframework.integration.config.EnableMessageHistory;
4443
import org.springframework.integration.dsl.IntegrationFlow;
44+
import org.springframework.integration.dsl.IntegrationFlowDefinition;
4545
import org.springframework.integration.dsl.IntegrationFlows;
4646
import org.springframework.integration.dsl.MessageChannels;
4747
import org.springframework.integration.expression.FunctionExpression;
48+
import org.springframework.integration.store.MessageGroup;
4849
import org.springframework.integration.support.MessageBuilder;
4950
import org.springframework.messaging.Message;
5051
import org.springframework.messaging.MessageChannel;
@@ -57,15 +58,16 @@
5758
import org.springframework.messaging.support.ErrorMessage;
5859
import org.springframework.messaging.support.GenericMessage;
5960
import org.springframework.test.annotation.DirtiesContext;
60-
import org.springframework.test.context.junit4.SpringRunner;
61+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
6162

6263
/**
6364
* @author Artem Bilan
6465
* @author Gary Russell
66+
* @author Jayadev Sirimamilla
6567
*
6668
* @since 5.0
6769
*/
68-
@RunWith(SpringRunner.class)
70+
@SpringJUnitConfig
6971
@DirtiesContext
7072
public class RouterTests {
7173

@@ -531,7 +533,6 @@ public void testExceptionTypeRouteFlow() {
531533
private MessageChannel nestedScatterGatherFlowInput;
532534

533535
@Test
534-
@SuppressWarnings("unchecked")
535536
public void testNestedScatterGather() {
536537
QueueChannel replyChannel = new QueueChannel();
537538
Message<String> request = MessageBuilder.withPayload("this is a test")
@@ -578,7 +579,7 @@ public void testScatterGatherWithExecutorChannelSubFlow() {
578579
assertThat(receive).isNotNull();
579580
Object payload = receive.getPayload();
580581
assertThat(payload).isInstanceOf(List.class);
581-
assertThat(((List) payload).get(1)).isInstanceOf(RuntimeException.class);
582+
assertThat(((List<?>) payload).get(1)).isInstanceOf(RuntimeException.class);
582583
}
583584

584585
@Autowired
@@ -592,6 +593,25 @@ public void propagateErrorFromGatherer() {
592593
.withMessage("intentional");
593594
}
594595

596+
@Autowired
597+
@Qualifier("scatterGatherInSubFlow.input")
598+
MessageChannel scatterGatherInSubFlowChannel;
599+
600+
601+
@Test
602+
public void testNestedScatterGatherSuccess() {
603+
PollableChannel replyChannel = new QueueChannel();
604+
this.scatterGatherInSubFlowChannel.send(
605+
org.springframework.integration.support.MessageBuilder.withPayload("baz")
606+
.setReplyChannel(replyChannel)
607+
.build());
608+
609+
Message<?> receive = replyChannel.receive(10000);
610+
assertThat(receive).isNotNull();
611+
assertThat(receive.getPayload()).isEqualTo("baz");
612+
613+
}
614+
595615
@Configuration
596616
@EnableIntegration
597617
@EnableMessageHistory({ "recipientListOrder*", "recipient1*", "recipient2*" })
@@ -896,9 +916,21 @@ public IntegrationFlow propagateErrorFromGatherer(TaskExecutor taskExecutor) {
896916
throw new RuntimeException("intentional");
897917
}),
898918
sg -> sg.gatherTimeout(100))
919+
.transform(m -> "This should not be executed, results must have been propagated to Error Channel")
899920
.get();
900921
}
901922

923+
@Bean
924+
public IntegrationFlow scatterGatherInSubFlow() {
925+
return flow -> flow.scatterGather(s -> s.applySequence(true)
926+
.recipientFlow(inflow -> inflow
927+
.scatterGather(s1 -> s1.applySequence(true)
928+
.recipientFlow(IntegrationFlowDefinition::bridge),
929+
g -> g.outputProcessor(MessageGroup::getOne)
930+
)),
931+
g -> g.outputProcessor(MessageGroup::getOne));
932+
}
933+
902934
}
903935

904936
private static class RoutingTestBean {

0 commit comments

Comments
 (0)