Scatter Gather is getting hung when Nested Scatter Gathers are used #3152

Closed
sirimamilla opened this issue Jan 23, 2020 · 20 comments
@sirimamilla
Contributor

sirimamilla commented Jan 23, 2020

5.2.3.log
5.1.3.log

Affects Version(s): <Spring Integration version>
5.1.7 and above
5.2.3

Hi,

Scatter-gather hangs when nested scatter-gathers are used with recent versions of Spring Integration. We tried 5.1.7, 5.1.9 and 5.2.3. The same flow works fine in 5.1.6 and below.
Here is the sample project:
https://github.com/sirimamilla/ScatterGatherTest

The test case ScatterGatherTestSuccess runs indefinitely in this project.

On analysis, the scatter-gather uses the originalReplyChannel and originalErrorChannel headers to preserve the previous (original) reply and error channels.

The inner scatter-gather responds correctly to the outer scatter-gather. But the outer scatter-gather fails to respond: it finds its own gather channel in the originalReplyChannel header, sends the response to that gather channel, and then waits indefinitely.

This issue is blocking our upgrade from Spring Boot 2.1.3 to 2.2.4. Please assist.

Regards,
Jayadev

@sirimamilla
Contributor Author

sirimamilla commented Jan 23, 2020

Hi,

On further analysis: in version 5.1.3, the reply channels are handled in handleRequestMessage. They are taken directly from the request message headers, and no additional headers are added to the request sent to the sub-flows.

@Override
	protected Object handleRequestMessage(Message<?> requestMessage) {
		PollableChannel gatherResultChannel = new QueueChannel();

		Object gatherResultChannelName = this.replyChannelRegistry.channelToChannelName(gatherResultChannel);

		Message<?> scatterMessage = getMessageBuilderFactory()
				.fromMessage(requestMessage)
				.setHeader(GATHER_RESULT_CHANNEL, gatherResultChannelName)
				.setReplyChannel(this.gatherChannel)
				.build();

		this.messagingTemplate.send(this.scatterChannel, scatterMessage);

		Message<?> gatherResult = gatherResultChannel.receive(this.gatherTimeout);
		if (gatherResult != null) {
			return getMessageBuilderFactory()
					.fromMessage(gatherResult)
					.removeHeader(GATHER_RESULT_CHANNEL)
					.setHeader(MessageHeaders.REPLY_CHANNEL, requestMessage.getHeaders().getReplyChannel())
					.build();
		}

		return null;
	}

In version 5.2.3, the reply channel and error channel are copied to the ORIGINAL_REPLY_CHANNEL and ORIGINAL_ERROR_CHANNEL headers. This step does not account for ORIGINAL_REPLY_CHANNEL and ORIGINAL_ERROR_CHANNEL values that are already present in the request headers: if they are present, they are overwritten instead of being preserved for the reply path.

This causes the issue with nested scatter-gathers.

@Override
	protected Object handleRequestMessage(Message<?> requestMessage) {
		MessageHeaders requestMessageHeaders = requestMessage.getHeaders();
		PollableChannel gatherResultChannel = new QueueChannel();

		Message<?> scatterMessage =
				getMessageBuilderFactory()
						.fromMessage(requestMessage)
						.setHeader(GATHER_RESULT_CHANNEL, gatherResultChannel)
						.setHeader(ORIGINAL_REPLY_CHANNEL, requestMessageHeaders.getReplyChannel())
						.setHeader(ORIGINAL_ERROR_CHANNEL, requestMessageHeaders.getErrorChannel())
						.setReplyChannel(this.gatherChannel)
						.setErrorChannelName(this.errorChannelName)
						.build();

		this.messagingTemplate.send(this.scatterChannel, scatterMessage);

		return gatherResultChannel.receive(this.gatherTimeout);
	}

private Message<?> enhanceScatterReplyMessage(Message<?> message) {
		MessageHeaders headers = message.getHeaders();
		return getMessageBuilderFactory()
				.fromMessage(message)
				.setHeader(MessageHeaders.REPLY_CHANNEL, headers.get(ORIGINAL_REPLY_CHANNEL))
				.setHeader(MessageHeaders.ERROR_CHANNEL, headers.get(ORIGINAL_ERROR_CHANNEL))
				.removeHeaders(ORIGINAL_REPLY_CHANNEL, ORIGINAL_ERROR_CHANNEL)
				.build();
	}
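
The overwrite described above can be illustrated with a minimal sketch. This is a simplified model, not Spring Integration code: plain maps stand in for message headers, and the class name, the `scatter` helper, and the channel names are hypothetical. It only shows that writing a fixed header key at every nesting level loses the value stored by the level above.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (assumption): a Map stands in for MessageHeaders.
final class HeaderOverwriteSketch {

    static final String REPLY_CHANNEL = "replyChannel";
    static final String ORIGINAL_REPLY_CHANNEL = "originalReplyChannel";

    // Mimics the header handling of one scatter-gather level as quoted above:
    // the current reply channel is stored under a fixed key, then replaced
    // by this level's gather channel.
    static Map<String, Object> scatter(Map<String, Object> request, String gatherChannel) {
        Map<String, Object> headers = new HashMap<>(request);
        // Overwrites whatever an outer level stored under the same key.
        headers.put(ORIGINAL_REPLY_CHANNEL, request.get(REPLY_CHANNEL));
        headers.put(REPLY_CHANNEL, gatherChannel);
        return headers;
    }

    public static void main(String[] args) {
        Map<String, Object> request = new HashMap<>();
        request.put(REPLY_CHANNEL, "callerReplyChannel");

        Map<String, Object> outer = scatter(request, "outerGatherChannel");
        Map<String, Object> inner = scatter(outer, "innerGatherChannel");

        System.out.println(outer.get(ORIGINAL_REPLY_CHANNEL)); // callerReplyChannel
        System.out.println(inner.get(ORIGINAL_REPLY_CHANNEL)); // outerGatherChannel
    }
}
```

After the second level, the caller's reply channel is no longer recoverable from the headers alone, which is why the request message (or some per-level storage) is needed on the reply path.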

@sirimamilla
Contributor Author

Hi,

I have a fix for the issue, but I cannot raise a pull request because I don't have access to do so. Please find attached the code that fixes the issue with nested channels.

There is further scope to refine the scatter-gather.

ScatterGatherHandler.zip

@artembilan
Member

@sirimamilla ,

thank you very much for the report!

I'll take a look into this today.

I am not able to raise a pull request as i dont have access to raise pull request

I wonder whether you have forked the project. If you fork it and create a branch for the fix, you should be able to open a pull request from there.

See instructions here: https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.adoc

@artembilan
Member

I'm sorry, would you mind being more specific?
I tried your project with all the versions down to Spring Integration 5.1.3 (5.1.2 doesn't have an errorChannel() on the ScatterGatherSpec yet) and all tests pass for me except ScatterGatherTestAggregatorError().

So, what are we looking for?
Would you mind reducing the project so that it covers only the problem, and pointing out in exactly which version it works as expected?

Meanwhile I'll try to analyze your code...

@artembilan artembilan added the status: waiting-for-reporter Needs a feedback from the reporter label Jan 23, 2020
@artembilan
Member

I have fixed your problem with this sg->sg.errorChannel("scatterGatherErrorChannel").gatherTimeout(1000). Pay attention to the gatherTimeout(1000).

The real problem is that you block the main thread waiting for the gatherer reply in return gatherResultChannel.receive(this.gatherTimeout);.
At the same time, your gatherer (g->g.outputProcessor(this::processGroup)) doesn't finish its work but raises an exception, which goes from its thread to the replyChannel of the gateway that originated the request.
And since the gateway is already blocked waiting for the reply, it never gets to receive from its reply channel.
Essentially, with such a thread shift downstream you create a deadlock: you would like to let it deal with the error, but that error doesn't release the main thread.

I'm not sure yet that we can do anything about it; moreover, we have documented this situation so that you are prepared: https://docs.spring.io/spring-integration/docs/5.2.3.RELEASE/reference/html/message-routing.html#scatter-gather-error-handling

finite gatherTimeout must be configured for the ScatterGatherHandler. Otherwise it is going to be blocked waiting for a reply from the gatherer forever, by default.
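
The effect of a finite timeout can be sketched in plain Java. This is only an illustration under assumptions: a `BlockingQueue` stands in for the gather result channel, and the class and method names are hypothetical. With a bounded wait the caller thread is released even when no reply ever arrives; an unbounded wait (for example `take()`) would keep it blocked.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model (assumption): a BlockingQueue stands in for the gather result channel.
final class GatherTimeoutSketch {

    // Waits up to timeoutMillis for a gather result; returns null if none arrives.
    static String receive(BlockingQueue<String> gatherResultQueue, long timeoutMillis) {
        try {
            return gatherResultQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Nothing is ever put on the queue, e.g. because the gatherer
        // failed on another thread and its error went elsewhere.
        String result = receive(queue, 100);
        System.out.println(result == null ? "timed out, caller thread released" : result);
    }
}
```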

So, I'll wait for your feedback, but sounds like "Works as Designed"

@sirimamilla
Contributor Author

Hi Artem, it seems I didn't explain this properly. The test case ScatterGatherTestSuccess always hangs. I'm not sure how it passes for you.

gatherTimeout is meant for handling timeout scenarios.
In this test case no channel is timing out.

@artembilan
Member

OK. Try to keep only that one test. Please come back to me with a minimal reproducing sample and with consistent versions. In one comment you claimed one version, then you talked about a different one, and in your sample we have a third one.

Thanks for understanding!

@sirimamilla
Contributor Author

Hi @artembilan,

Good Morning.

I have updated the test project so that it contains only the failing test case (it hangs forever) with version 5.2.3.
I have added gatherTimeout as per your suggestion above.

The same test case works perfectly fine with version 5.1.3.

Thanks,
Jayadev

@artembilan
Member

Good. I’ll take a look in my morning in 12 hours 😀

@artembilan
Member

works perfectly fine with version 5.1.3

You are still confusing me. Does it work with version 5.1.6? In the linked PR you claim that it stopped working in 5.1.7.

@sirimamilla
Contributor Author

It works with 5.1.4, 5.1.5, 5.1.6 as well

@artembilan
Member

OK. Thanks. BTW, you can override dependency versions in your Spring Boot application using a specific property. See more here: https://docs.spring.io/spring-boot/docs/2.2.4.RELEASE/reference/html/howto.html#howto-customize-dependency-versions

I mean, you have overridden the version for spring-integration-core but left the managed one for spring-integration-test, which may cause version conflicts at runtime.

@artembilan
Member

Caused by: java.lang.IllegalStateException: Cannot convert value of type 'java.util.ArrayList' to required type 'java.lang.String': no matching editors or conversion strategy found

Please give me a working sample.
And you still provide 3 test methods.

Please give me code that is as simple as possible, with a single test case that fails with the current version but works with 5.1.6, the latest working version as we have now established.

@sirimamilla
Contributor Author

Hi @artembilan

I corrected the versions and tried all possible options before creating the issue.
If you analyze the debug log correctly, it shows that in the second scatter-gather the response is sent back to the scatter-gather itself rather than to the right reply channel.

@artembilan
Member

M-m-m. We are going to sit here forever...

One more time: please give me a simple project (only the flow you worry about) with a single test case which fails in version 5.1.7 but still works in version 5.1.6.
That's all. We don't need other tests and flows that work well and only confuse us.
Then I'll tell you what is wrong, on our side or on yours.

If you analyze the Debug log correctly

Might be the case 😉 . That's why I'm asking about single tests and simple single flow (even if it is with a nested scatterGather()).

Thanks for understanding.

@sirimamilla
Contributor Author

Hi @artembilan,

I have updated the project to contain a single test case. Please test with versions 5.1.6 and 5.1.7.

With 5.1.6 the test case succeeds, but with 5.1.7 the same test case fails.

https://github.com/sirimamilla/ScatterGatherTest

sirimamilla added a commit to sirimamilla/spring-integration that referenced this issue Jan 24, 2020
@artembilan artembilan added backport 5.1.x type: bug and removed status: waiting-for-reporter Needs a feedback from the reporter labels Jan 24, 2020
@artembilan artembilan added this to the 5.3 M2 milestone Jan 24, 2020
@artembilan
Member

So, I have confirmed the issue.
It was introduced after #2967.
Once again we have missed test coverage, for nested Scatter-Gather in this case.

Previously we had the requestMessage context, so we could easily access the original headers at every level.
But moving the logic out of handleRequestMessage made us lose the requestMessage context.

I'm looking into providing a PR with a possible fix.

@sirimamilla ,

thank you for your report and the fix!

sirimamilla added a commit to sirimamilla/spring-integration that referenced this issue Jan 25, 2020
sirimamilla added a commit to sirimamilla/spring-integration that referenced this issue Jan 25, 2020
sirimamilla added a commit to sirimamilla/spring-integration that referenced this issue Jan 26, 2020
…eaders.

Added additional not to be executed line of code in test case.

spring-projects#3152
sirimamilla added a commit to sirimamilla/spring-integration that referenced this issue Jan 27, 2020
artembilan pushed a commit that referenced this issue Jan 27, 2020
Fixes #3152

The upstream `gatherResultChannel` header has been missed when we produced a reply from nested scatter-gather

Added Test Case for Nested Scatter Gather test

Simplified the the test cases and added author in changed cases

Corrected codestyle issue in Travis CI

Removed additional OriginalReplyChannel and originalErrorChannel in Headers.
Added additional not to be executed line of code in test case.

Restored OriginalErrorChannel Header and removed error handling related fixes

* Clean up code style and improve readability

**Cherry-pick to 5.1.x & master**
artembilan pushed a commit that referenced this issue Jan 27, 2020
# Conflicts:
#	spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java
@sirimamilla
Contributor Author

Hi @artembilan,

Thanks a ton for patiently working through this fix. I have learnt a lot from you in the process.

When will 5.1.x or 5.2.x be released with this fix?

@artembilan
Member

5.2.4 - February 26; 5.1.10 - April 15. Just in time for respective Spring Boot releases!

Although we may consider not releasing 5.1.x any more.
You have mentioned that you are going to upgrade to Boot 2.2.x, so it might not even make sense for you to wait for 5.1.10.

Let me think about whether we can come up with a workaround so that you don't have to wait for the new version!

@artembilan
Member

@sirimamilla ,

Here is a workaround:

.recipientFlow(inflow -> inflow
		.enrichHeaders(headers ->
				headers.headerFunction("originalGatherResultChannel",
						m -> m.getHeaders().get("gatherResultChannel")))
		.scatterGather(s1 -> s1.applySequence(true)
						.recipientFlow(IntegrationFlowDefinition::bridge),
				g -> g.outputProcessor(MessageGroup::getOne))
		.enrichHeaders(headers ->
				headers.headerFunction("gatherResultChannel",
						m -> m.getHeaders().get("originalGatherResultChannel"), true))),

My point is that I store the top-level gatherResultChannel header in the originalGatherResultChannel one.
And then I restore it after the nested scatterGather in the sub-flow.
This is essentially the same as what we do with .removeHeaders(GATHER_RESULT_CHANNEL) in your fix for ScatterGatherHandler.
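
The save-and-restore idea behind the workaround can be modelled in a few lines of plain Java. This is a sketch under assumptions, not framework code: maps stand in for message headers, `nestedScatterGather` is a hypothetical stand-in for a nested step whose reply still carries its own gatherResultChannel, and the queue names are invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (assumption): a Map stands in for MessageHeaders.
final class GatherResultHeaderSketch {

    static final String GATHER_RESULT_CHANNEL = "gatherResultChannel";
    static final String SAVED_GATHER_RESULT_CHANNEL = "originalGatherResultChannel";

    // Hypothetical nested step: its reply carries the inner result channel,
    // replacing the one set by the outer level.
    static Map<String, Object> nestedScatterGather(Map<String, Object> headers) {
        Map<String, Object> reply = new HashMap<>(headers);
        reply.put(GATHER_RESULT_CHANNEL, "innerResultQueue");
        return reply;
    }

    // Models the workaround: save the outer header, run the nested step, restore it.
    static Map<String, Object> nestedWithWorkaround(Map<String, Object> headers) {
        Map<String, Object> enriched = new HashMap<>(headers);
        enriched.put(SAVED_GATHER_RESULT_CHANNEL, headers.get(GATHER_RESULT_CHANNEL));
        Map<String, Object> reply = nestedScatterGather(enriched);
        // Corresponds to the second enrichHeaders(...) call with overwrite = true.
        reply.put(GATHER_RESULT_CHANNEL, reply.get(SAVED_GATHER_RESULT_CHANNEL));
        return reply;
    }

    public static void main(String[] args) {
        Map<String, Object> outer = new HashMap<>();
        outer.put(GATHER_RESULT_CHANNEL, "outerResultQueue");

        System.out.println(nestedScatterGather(outer).get(GATHER_RESULT_CHANNEL));  // innerResultQueue
        System.out.println(nestedWithWorkaround(outer).get(GATHER_RESULT_CHANNEL)); // outerResultQueue
    }
}
```

Without the restore step the outer gatherer would deliver its result to the inner queue while the outer caller keeps waiting on its own, which matches the hang reported in this issue.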
