Skip to content

Spring Integraton- Issue with Nested Scatter Gather Errors not Propagated #2735

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
sirimamilla opened this issue Feb 5, 2019 · 8 comments
Closed

Comments

@sirimamilla
Copy link
Contributor

Affects Version(s): <Spring Integration version 5.1.1>


We are facing issues with Spring Integration, when nested Scatter Gather is used, success Response is getting propagated to the caller. Error Response is not being sent back to the caller.

Here is the reference project that I am using to replicate the issue.
ScatterGatherTest.zip

This issue is similar to the issue reported here.#2731

Thanks for your support in advance.

@artembilan
Copy link
Member

@sirimamilla ,

thank you very much for reporting this!

I'll take a look into your ZIP soon, but would you mind in the future share with us really projects on GH?
It's much easier to use, much safer and also we would be able to push some commits to your repo if that.

Thanks for understanding!

@artembilan artembilan self-assigned this Feb 5, 2019
@sirimamilla
Copy link
Contributor Author

Hi Artem,

I will try to route this request through Pivotal support, who will create repository in Git on our behalf.

I am trying to secure required approvals to checkin source code in Github for Tech only projects like this.

Also to contribute to Spring Integration, we are currently in process of securing approvals, which is long pending as it goes through legal & compliance.

Thanks for your understanding.

Regards,
Jayadev Sirimamilla

@artembilan
Copy link
Member

It turns out that nested Scatter-Gather is not guilty. We can simply reproduce an issue with this configuration:

 @Bean
  public IntegrationFlow scatterGatherflow(){
    return flow->flow.log()
        .scatterGather(s->s.applySequence(true).requiresReply(true)
            .recipient("transformer.input")
            .recipient("scatterGatherTransformer.input"))
        .log()
        .bridge();
  }

  @Bean IntegrationFlow transformer(){
    return flow->flow.log()
        .transform(m->"recipient1 Response");
  }

  @Bean
  public IntegrationFlow scatterGatherTransformer(){
    return f->f.channel(channels -> channels.executor(Executors.newWorkStealingPool()))
        .transform(this::MessageTransformer);
  }

  public String MessageTransformer(String msg){
    if ("Test".equals(msg)){
      return msg;
    }else {
      throw new RuntimeException("Throw Error");
    }
  }

The problem in this scenario is similar to what we have with a gateway and an ExecutorChannel downstream.
Pay attention like one of your recipients is really started from the channels.executor(). In this case an error is sent to the errorChannel header from the gateway in the beginning.
However what I see in the debug that we are blocked on the send() and we just don't go to receive() part of the gateway. My best guess that it is indeed something in the ScatterGatherHandler which is blocking us waiting for reply.

So, I guess we need something over there to be sure that we deal with the proper errorChannel/replyChannel headers, really in the context of the current ScatterGatherHandler.

Looking into that today...

@artembilan
Copy link
Member

I found the workaround for you:

.recipientFlow(sub -> sub.gateway("scatterGatherTransformer.input")))

So, we wrap erroneous flow to its own gateway, which will populate its own errorChannel/replyChannel headers and will do the trick with that ExecutorChannel.

The problem is that this is really an expected behavior, where our Scatter-Gather is just a router for downstream flows. And it isn't so clear if that is OK to have just a single error from downstream especially when all recipients are async.

Would be better from design perspective to catch those exceptions is their own branches and convert them to some compensation result which could be filtered by the gatherer upstream or already in the next flow after Scatter-Gather reply.

I think for convenience we will consider to add some errorChannel option into the ScatterGatherHandler exactly for similar use-cases when sub-flows are async. This option is going to be populated into the MessageHeaders.ERROR_CHANNEL for the proper callback from async failure.
Then you can configure some flow for this error channel to return compensation or drop error off. The gatherer releaseStrategy should be configured respectively to honor possible errors in scatterer sub-flows.

How that all make sense...

BTW, this issue is fully not related to what we talked about Gateway and an ExecutorChannel. The behavior and problem are fully different.

Thanks for understanding.

@artembilan artembilan added this to the 5.1.3 milestone Feb 5, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Feb 5, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Feb 8, 2019
garyrussell pushed a commit to garyrussell/spring-integration that referenced this issue Feb 11, 2019
@sirimamilla
Copy link
Contributor Author

Hi @artembilan ,

I have created a local build of Spring-Integration, but the error test case in my project is still failing.
Calling Thread is indefinitely hung waiting for response.

I fear the error is not fixed with the fix provided.

Behavior is not changed yet.

Thanks,
Jayadev

@artembilan
Copy link
Member

The fix provided is not about your problem. It is for a more convenient configuration and some explanation in docs.
It is your configuration issue, not a Framework.
Please, read what we added to the Docs with this fix.

Plus, you need to change your configuration as I explained: with that .gateway() before sending to the sub-flow with an ExecutorChannel.

Anyway the answer is there in the docs.

Please, don't treat this issue as similar to what we have with nested gateways.

@sirimamilla
Copy link
Contributor Author

HI @artembilan ,

Thanks for the explanation. This is a change in behavior of the scatter gather compared to what it use to be prior to this change.

Prior to this Fix, Error use to be thrown back to the caller. This will now has to be explicitly handled in Aggregator.

@artembilan
Copy link
Member

I don't think so. What we did here is just a convenient property for the errorChannel header.
We haven't changed anything else in the logic.
The problem is somewhere in your configuration.

I think you still deal with the nested ExecutorChannel. In general we don't recommend to re-throw exceptions from downstream. Would be better to ignore or return compensation message do not lose other scatter branches.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants