
Backoff breaks recovery on batch listener container in multi-partition environment #4371

@oneacik


In what version(s) of Spring for Apache Kafka are you seeing this issue?
spring-boot = "3.4.4"

Describe the bug

Relevant excerpt from `doInvokeBatchListener` in `KafkaMessageListenerContainer` (truncated):

		@Nullable
		private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> records, // NOSONAR
				List<ConsumerRecord<K, V>> recordList) {
			try {
				invokeBatchOnMessage(records, recordList);
				if (this.batchFailed) {
					this.batchFailed = false;
					if (this.commonErrorHandler != null) {
						this.commonErrorHandler.clearThreadState();
					}
					getAfterRollbackProcessor().clearThreadState();
				}
				if (!this.autoCommit && !this.isRecordAck) {
					processCommits();
				}
			}
			catch (RuntimeException e) {
				if (this.commonErrorHandler == null) {
					throw e;
				}
				try {
					invokeBatchErrorHandler(records, recordList, e);
					commitOffsetsIfNeededAfterHandlingError(records);
				}

`doInvokeBatchListener` is invoked with a non-deterministic record list: which partition's records end up in a given poll varies from poll to poll.
As a result, the following situation happens quite often:

Given:
We have two partitions: A (contains a broken record) and B (contains only valid records).

When:
1. `doInvokeBatchListener` is executed with a record list from A: `batchFailed` is set to `true` in `invokeBatchErrorHandler`, and the back-off is started.
2. `doInvokeBatchListener` is executed with a record list from B: the batch succeeds, so `batchFailed` is reset to `false` and the back-off state is cleared by `this.commonErrorHandler.clearThreadState();`.

Then:
The record from A never gets the chance to go through the full back-off cycle and be sent to the DLQ.
The container stays stuck on this record and never gets past it.
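To make the interaction concrete, here is a small, self-contained Java model of the scenario above. It is not Spring Kafka code: `ThreadWideClearDemo`, `MAX_FAILURES`, and the string partition names are hypothetical, and a simple per-partition failure count stands in for the back-off state. As in the excerpt, a successful batch clears all state, but only when `batchFailed` was set.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the reported behaviour; NOT Spring Kafka code.
// Failure counts are tracked per partition, but a successful batch clears all of
// them at once, mirroring the thread-wide clearThreadState() call in the excerpt.
class ThreadWideClearDemo {

    static final int MAX_FAILURES = 3; // assumed: failures before the record goes to the DLQ

    private final Map<String, Integer> failures = new HashMap<>(); // partition -> failure count
    private boolean batchFailed = false;
    private boolean recoveredToDlq = false;

    void onBatchFailed(String partition) {
        batchFailed = true;
        int count = failures.merge(partition, 1, Integer::sum);
        if (count >= MAX_FAILURES) {
            recoveredToDlq = true;          // back-off exhausted: the recoverer would publish to the DLQ
            failures.remove(partition);
        }
    }

    void onBatchSucceeded() {
        if (batchFailed) {
            batchFailed = false;
            failures.clear();               // thread-wide clear also forgets partition A's count
        }
    }

    // Replays a sequence of polls; each poll returns records from exactly one partition.
    // Partition "A" holds the broken record; every other partition succeeds.
    static boolean simulate(List<String> polls) {
        ThreadWideClearDemo demo = new ThreadWideClearDemo();
        for (String partition : polls) {
            if (partition.equals("A")) {
                demo.onBatchFailed(partition);
            } else {
                demo.onBatchSucceeded();
            }
        }
        return demo.recoveredToDlq;
    }

    public static void main(String[] args) {
        // Interleaved polls: every B batch resets A's count, so it never reaches MAX_FAILURES.
        System.out.println(simulate(List.of("A", "B", "A", "B", "A", "B"))); // prints false
        // Only A is polled: the count reaches MAX_FAILURES and the record is recovered.
        System.out.println(simulate(List.of("A", "A", "A")));                // prints true
    }
}
```

The second call corresponds to the workaround below: when a consumer only ever sees partition A, nothing clears its state early and recovery happens.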

To Reproduce
Create a topic with more than one partition, configure a back-off before DLQ recovery in the error handler, and use a non-concurrent container (a single consumer for all partitions).

Expected behavior
The record from A should be sent to the DLQ once the back-off is exhausted, and processing should continue.

Workaround
Use a concurrent container and set its concurrency to the number of partitions, so that each child container consumes from only one partition (which makes each poll deterministic).

Sample
No sample.

Proposed solution
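Replace the call to `this.commonErrorHandler.clearThreadState();` with a new, per-partition method:
`this.commonErrorHandler.clearTopicPartitionState(TopicPartition tp);`
so that a successful batch only clears the state of its own partitions, and change the surrounding code accordingly.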
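A minimal sketch of how per-partition clearing would change the outcome, written as a plain-Java model rather than Spring Kafka code. `PerPartitionClearDemo` and `MAX_FAILURES` are hypothetical, partitions are plain strings instead of `TopicPartition`, and `clearTopicPartitionState` here only mimics the intent of the proposed method.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the proposed fix; NOT Spring Kafka code. Partitions are
// plain strings here instead of org.apache.kafka.common.TopicPartition, and the
// batchFailed flag is omitted for brevity.
class PerPartitionClearDemo {

    static final int MAX_FAILURES = 3; // assumed: failures before the record goes to the DLQ

    private final Map<String, Integer> failures = new HashMap<>(); // partition -> failure count
    private boolean recoveredToDlq = false;

    void onBatchFailed(String partition) {
        int count = failures.merge(partition, 1, Integer::sum);
        if (count >= MAX_FAILURES) {
            recoveredToDlq = true;   // back-off exhausted: the recoverer would publish to the DLQ
            failures.remove(partition);
        }
    }

    // Stand-in for the proposed clearTopicPartitionState(tp): forget only the
    // state of one partition instead of the whole thread's state.
    void clearTopicPartitionState(String partition) {
        failures.remove(partition);
    }

    // After a successful batch, clear state only for partitions that were in it.
    void onBatchSucceeded(Set<String> partitionsInBatch) {
        for (String partition : partitionsInBatch) {
            clearTopicPartitionState(partition);
        }
    }

    // Replays polls that each return records from one partition; "A" always fails.
    static boolean simulate(List<String> polls) {
        PerPartitionClearDemo demo = new PerPartitionClearDemo();
        for (String partition : polls) {
            if (partition.equals("A")) {
                demo.onBatchFailed(partition);
            } else {
                demo.onBatchSucceeded(Set.of(partition));
            }
        }
        return demo.recoveredToDlq;
    }

    public static void main(String[] args) {
        // Interleaved A/B polls: B's successes no longer reset A's count.
        System.out.println(simulate(List.of("A", "B", "A", "B", "A", "B"))); // prints true
    }
}
```

With the interleaved polls from the scenario above, the record from A now accumulates failures across polls, reaches the limit, and is recovered, while B keeps being processed normally.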
