Skip to content

Add support for DLQ error handling within functional programming model #1729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
boonware opened this issue Jun 10, 2019 · 9 comments
Closed
Assignees
Milestone

Comments

@boonware
Copy link

I'm been reading available documentation and I am very confused about the correct approach to get an error topic working when using the Kafka binder.

The current documentation states that the property spring.cloud.stream.bindings.error.destination can be used to configure a global error topic, but this support appears to have been removed in this commit.

I have also tried to implement using properties for the Kafka binder specifically; the code below demonstrates my attempt, including the configuration.

My Maven BOM is org.springframework.cloud:spring-cloud-dependencies:Greenwich.SR1 and my dependencies are:

    org.springframework.boot:spring-boot-starter-web
    org.springframework.cloud:spring-cloud-stream
    org.springframework.cloud:spring-cloud-starter-stream-kafka

It would be great if it could be clarified how exactly a global error topic can be set up. So far, with the code below, I can see the message being handled multiple times via retries, but nothing is sent to the error topic (DLQ) that I have configured.

@SpringBootApplication
@EnableBinding(Processor.class)
public class Application {

    public static void main(final String[] args) {
        SpringApplication.run(Application.class, args);
    }

}
@Configuration
public class AppConfig {

    @Bean
    public ServiceImpl service() {
        return new ServiceImpl();
    }

    @Bean
    public Function<String, String> execute(ServiceImpl service) {
        return service::doBusinessLogic;
    }
}
public class ServiceImpl {

    public String doBusinessLogic(String message) {
        System.out.println("Handling message");
        if (message.startsWith("foo")) {
            throw new RuntimeException("This is a test");
        }
        return message;
    }

}
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9091
          autoCreateTopics: false
        bindings:
          input:
            consumer:
              enableDlq: true
              dlqName: myErrorTopic

      bindings:
        input:
          destination: myInputTopic
          group: myConsumerGroup
        output:
          destination: myOutputTopic

      function:
        definition: execute
@olegz
Copy link
Contributor

olegz commented Jun 10, 2019

I believe this is related to https://stackoverflow.com/questions/56528263/sending-errors-to-kafka-retry-topic-with-spring-cloud-stream

This is indeed a bug that would require some enhancements in upstream projects (e.g, spring-kafka/integration/rabbit etc).
We should be able to address it shortly and make it available in the next SR, hopefully next week.

Thanks for reporting it.

@olegz
Copy link
Contributor

olegz commented Jun 10, 2019

Tagging @garyrussell @artembilan

@garyrussell
Copy link
Contributor

The spring.cloud.stream.bindings.error.destination was a legacy implementation that was naaive and has been replaced by the current error handling. Which, as Oleg said, is broken for spring-cloud-function support.

The documentation should be fixed to remove that legacy stuff if it's still there.

That commit did remove one reference: a82c906#diff-369a0fe7a77fddec73a04336c6c349bd

@boonware
Copy link
Author

Thank you for the rapid response.
Is there a version I can fall back to, preferably compatible with latest Spring Boot, that includes the new error topic mechanism?

@olegz
Copy link
Contributor

olegz commented Jun 11, 2019

I'll fix the documentation as part of the commit for this issue once Gary/Artem will signal that their snapshots with relevant enhancements ready

@artembilan
Copy link
Member

The feature has been implemented in Spring Integration: spring-projects/spring-integration#2958.

We we always add IntegrationMessageHeaderAccessor.SOURCE_DATA header into the message emitted by the AMQP Inbound Channel Adapter (Gateway/Source).
This must be yet implemented in the Spring Integration Kafka and AWS, but at least we have something to play with from the Rabbit Binder perspective.

@olegz olegz changed the title Unclear how global error topic is set up with Kafka binder Add support for DLQ error handling within functional programming model Jun 11, 2019
@olegz olegz added enhancement and removed bug labels Jun 11, 2019
@olegz
Copy link
Contributor

olegz commented Jun 11, 2019

I just changed the title and also reclassified it as an enhancement rather then a bug given we never really had it working

olegz added a commit to olegz/spring-cloud-stream that referenced this issue Jun 12, 2019
…odel

- Change FunctionInvoker.onError() to send to binding error channel instead of global one
- Binders contain relevant changes as well

Resolves spring-cloud#1729
@olegz
Copy link
Contributor

olegz commented Jun 12, 2019

So the issue is addressed in the corresponding PR(s), however we can't really merge them until the relevant bits on spring-rabbit/kafka/integration are released with subsequent spring-boot release.

That said you can certainly build for now from PR(s) and test it. You'd have to build core first and then corresponding kafka or rabbit PR depending on the binder you are using.

@vasanthapriya-svp
Copy link

Issue exist with Stream Listener as well. Has this been fixed only for functions?
spring-cloud-depencies: Hoxton.M3
spring-cloud-stream
spring-cloud-starter-stream-kafka

sobychacko pushed a commit to sobychacko/spring-cloud-stream that referenced this issue Feb 24, 2022
sobychacko pushed a commit to sobychacko/spring-cloud-stream that referenced this issue Feb 24, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment