
Non-blocking retries: partition rebalance causes emergency consumer stop #2576

Closed
@v-chernyshev

Description


In what version(s) of Spring for Apache Kafka are you seeing this issue?

2.9.4, 3.0.2.

Describe the bug

When the "seek after error" recovery mode is disabled, KafkaMessageListenerContainer stashes the records returned from the most recent poll call into a member variable:

if (!records.isEmpty()) {
    this.remainingRecords = new ConsumerRecords<>(records);
    this.pauseForPending = true;
}
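For context, "seek after error" is controlled on the error handler; with non-blocking retries the framework configures this internally, but a hand-wired equivalent might look like the sketch below. The bean shape, recoverer, and back-off values are illustrative assumptions, not code from the example repository.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

public class ErrorHandlerConfig {

    // Configuration sketch only: with seekAfterError = false, the container
    // stashes the unprocessed records in remainingRecords (as shown above)
    // instead of seeking the consumer back to the failed offset.
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<String, String> template) {
        DefaultErrorHandler handler = new DefaultErrorHandler(
                new DeadLetterPublishingRecoverer(template), new FixedBackOff(0L, 2L));
        handler.setSeekAfterError(false);
        return handler;
    }
}
```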

If a partition rebalance happens before they are processed, the affected consumer instance terminates with an emergency stop error:

if (howManyRecords > 0) {
    this.logger.error(() -> String.format("Poll returned %d record(s) while consumer was paused "
            + "after an error; emergency stop invoked to avoid message loss", howManyRecords));
    KafkaMessageListenerContainer.this.emergencyStop.run();
}

The only plausible explanation seems to be that the newly assigned partitions are not paused after the rebalance, so the next poll returns fresh records while remainingRecords is still pending.
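If that diagnosis is correct, the expected behavior would be for the container's rebalance listener to re-pause any partitions gained while remainingRecords is pending. Roughly (a pseudocode sketch, not the actual framework code):

```java
// Pseudocode sketch of the expected behavior, not actual framework code:
// partitions gained in a rebalance while records are stashed after an error
// should be paused too, so the next poll() cannot return new records.
public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
    if (this.remainingRecords != null) {
        this.consumer.pause(assigned); // keep the consumer quiet until the pending records are handled
    }
}
```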

To Reproduce

I've uploaded a minimal example to this GitHub repository. Unfortunately, it lacks a Docker Compose setup: we are not allowed to use Docker Hub at work and therefore run a custom Kafka container that is not compatible with the open-source one. Anyone wishing to reproduce this bug should create the following Kafka topics, each with 4 partitions:

  • input
  • input-retry-0
  • input-dlt
  • output
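Assuming the standard Kafka CLI tools are available and the broker listens on the same address used in the kcat command, the topics can be created like this (the port and replication factor are assumptions; adjust them to your setup):

```shell
# Assumes kafka-topics.sh is on PATH and the broker listens on localhost:9192
# (match the bootstrap servers in application.yaml).
for topic in input input-retry-0 input-dlt output; do
  kafka-topics.sh --bootstrap-server localhost:9192 \
    --create --topic "$topic" --partitions 4 --replication-factor 1
done
```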

You may also have to adjust the bootstrap servers configuration in application.yaml. Assuming that this is done, the steps to trigger the problem are:

  • Send about 10 messages to the input topic, e.g. for i in {1..10}; do echo "test_$i" | kcat -b localhost:9192 -t input -P; done.
  • Clone the above example repository.
  • Start one instance of the service via mvn spring-boot:run. Wait for the partitions to be assigned and for the first Processing message= to appear in the console log.
  • Start another instance of the service and wait for the rebalance.

The first instance then fails with the emergency stop error, sometimes for multiple topics at once:

2023-02-10T17:48:49.149Z INFO 40483 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : rebalance-issue-consumer: partitions revoked: [input-0, input-1, input-2, input-3]
2023-02-10T17:48:49.149Z INFO 40483 --- [0-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : rebalance-issue-consumer: partitions revoked: [input-retry-0-0, input-retry-0-1, input-retry-0-2, input-retry-0-3]
2023-02-10T17:48:49.149Z INFO 40483 --- [ner#0-dlt-0-C-1] o.s.k.l.KafkaMessageListenerContainer : rebalance-issue-consumer: partitions revoked: [input-dlt-0, input-dlt-1, input-dlt-2, input-dlt-3]
2023-02-10T17:48:49.173Z INFO 40483 --- [0-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : rebalance-issue-consumer: partitions assigned: [input-retry-0-2, input-retry-0-3]
2023-02-10T17:48:49.173Z INFO 40483 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : rebalance-issue-consumer: partitions assigned: [input-2, input-3]
2023-02-10T17:48:49.179Z ERROR 40483 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Poll returned 7 record(s) while consumer was paused after an error; emergency stop invoked to avoid message loss
<...>
2023-02-10T17:48:52.831Z ERROR 40483 --- [0-retry-0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Poll returned 1 record(s) while consumer was paused after an error; emergency stop invoked to avoid message loss

The second service instance may fail too, effectively causing a full outage, since afterwards only the DLT topic still has consumers attached.

Expected behavior

I expect the rebalance events to be handled by the framework in a way that does not cause partial termination of the service.
