Skip to content

GH-2744: ScatterGather: reinstate request headers #2750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from

Conversation

artembilan
Copy link
Member

@artembilan artembilan commented Feb 14, 2019

Fixes #2744

When we get scattering results, there is no reason to keep internal
headers any more.

  • Fix ScatterGatherHandler to modify scattering result messages to
    reinstate headers from original request message.
    This way we are able to re-throw an exception from the gatherer to
    the caller

Fixes spring-projects#2744

When we get scattering results, there is no reason to keep internal
headers any more.
* Fix `ScatterGatherHandler` to modify scattering result messages to
reinstate headers from original request message.
This way we are able to re-throw an exception from the gatherer to
the caller.
Copy link
Contributor

@garyrussell garyrussell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few minor nit-picks

}
MessageChannel gatherResultChannel = headers.get(GATHER_RESULT_CHANNEL, MessageChannel.class);
if (gatherResultChannel != null) {
this.messagingTemplate.send(gatherResultChannel, message);
}
else {
throw new MessageDeliveryException(message,
"The 'gatherResultChannel' header is required to delivery gather result.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/delivery/deliver the/

@@ -206,3 +206,8 @@ public Message<?> processAsyncScatterError(MessagingException payload) {
To produce a proper reply, we have to copy headers (including `replyChannel` and `errorChannel`) from the `failedMessage` of the `MessagingException` that has been sent to the `scatterGatherErrorChannel` by the `MessagePublishingErrorHandler`.
This way the target exception is returned to the gatherer of the `ScatterGatherHandler` for reply messages group completion.
Such an exception `payload` can be filtered out in the `MessageGroupProcessor` of the gatherer or processed other way downstream, after the scatter-gather endpoint.

NOTE: Before sending scattering results to the gatherer, `ScatterGatherHandler` reinstates the request message headers, including reply and error channels if any.
This way errors from the `AggregatingMessageHandler` are going to be propagated to the caller, even if async hand off is applied for scatter recipients.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/if async/if an async/

s/applied for scatter recipients/applied in scatter recipient subflows/ ?

NOTE: Before sending scattering results to the gatherer, `ScatterGatherHandler` reinstates the request message headers, including reply and error channels if any.
This way errors from the `AggregatingMessageHandler` are going to be propagated to the caller, even if async hand off is applied for scatter recipients.
In this case a reasonable, finite `gatherTimeout` must be configured for the `ScatterGatherHandler`.
Otherwise it is going to be blocked waiting for reply from the gatherer forever by default.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waiting for a reply ... forever, by default

@artembilan
Copy link
Member Author

Gary, requested changes have been pushed.
There is a ChannelInterceptorAware point in the ScatterGatherHandler, which has to be fixed after merging #2751 .
The fix plan depends of the order of merging 😄

Thanks

@@ -208,6 +208,6 @@ This way the target exception is returned to the gatherer of the `ScatterGatherH
Such an exception `payload` can be filtered out in the `MessageGroupProcessor` of the gatherer or processed other way downstream, after the scatter-gather endpoint.

NOTE: Before sending scattering results to the gatherer, `ScatterGatherHandler` reinstates the request message headers, including reply and error channels if any.
This way errors from the `AggregatingMessageHandler` are going to be propagated to the caller, even if async hand off is applied for scatter recipients.
This way errors from the `AggregatingMessageHandler` are going to be propagated to the caller, even if async an hand off is applied in scatter recipient subflows.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

an async - will fix on merge

@garyrussell
Copy link
Contributor

Merged as 0d37566

garyrussell added a commit to garyrussell/spring-integration that referenced this pull request Mar 16, 2022
artembilan pushed a commit that referenced this pull request Mar 16, 2022
Resolves #3750

**5.5.x only; removed in 6.0**
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants