Closed
Discussed in #2805
Originally posted by m-hetz September 11, 2023
Hi!
I'm using RecordFilterStrategy to filter records based on headers. This works great on single messages: the filter works and filtered records don't reach the listener (annotated with @KafkaListener).
When using batch mode, filtering also works, but when the entire batch is filtered out, the listener is invoked with an empty list.
Is this expected behavior? According to this, when an empty list is returned, all records are filtered.
Is there a way to configure it so that the listener is not invoked at all for an empty batch?
Thanks!
    val filter = object : RecordFilterStrategy<String, String> {

        override fun filter(consumerRecord: ConsumerRecord<String, String>): Boolean {
            val header = consumerRecord.headers().lastHeader("SKIP")
            if (header != null) {
                logger.info("Filtering header=" + consumerRecord.key())
                return true
            }
            return false
        }

        override fun filterBatch(records: MutableList<ConsumerRecord<String, String>>): MutableList<ConsumerRecord<String, String>> {
            val iterator: MutableIterator<ConsumerRecord<String, String>> = records.iterator()
            while (iterator.hasNext()) {
                if (this.filter(iterator.next())) {
                    iterator.remove()
                }
            }
            if (records.size > 0) return records
            return mutableListOf()
        }
    }

    factory.setBatchListener(isBatchListener)
    factory.setRecordFilterStrategy(filter)

    @KafkaListener(topics = ["topic"], containerFactory = "messageContainerFactory")
    fun listener(messages: List<ConsumerRecord<String, String?>>) {
        logger.info("Consuming messages, batch size=${messages.size}")
    }
Sorry for the unformatted code; I tried editing the message, but the code is still messed up.
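
To make the question concrete, here is a rough sketch in plain Kotlin (no Spring involved) of the behavior I'm after. It is only a model under assumptions: `Record`, `shouldSkip`, and `dispatch` are hypothetical stand-ins I made up for illustration, not Spring Kafka types or APIs. `filterBatch` mirrors the strategy above, and `dispatch` shows the listener being skipped when nothing survives the filter.

```kotlin
// Hypothetical stand-in for ConsumerRecord: only a key and a header map.
data class Record(val key: String, val headers: Map<String, String> = emptyMap())

// Mirrors the per-record filter in the post: true means "discard this record".
fun shouldSkip(record: Record): Boolean = "SKIP" in record.headers

// Mirrors filterBatch: keeps only the records that are not filtered out.
fun filterBatch(records: List<Record>): List<Record> = records.filterNot(::shouldSkip)

// Hypothetical dispatcher (not a Spring Kafka API): filters the batch and
// calls the listener only when at least one record is left.
// Returns true if the listener was invoked.
fun dispatch(records: List<Record>, listener: (List<Record>) -> Unit): Boolean {
    val remaining = filterBatch(records)
    if (remaining.isEmpty()) return false
    listener(remaining)
    return true
}

fun main() {
    val skipped = Record("a", mapOf("SKIP" to "true"))
    val kept = Record("b")

    // One record survives, so the listener runs with a batch of one.
    dispatch(listOf(skipped, kept)) { batch ->
        println("Consuming messages, batch size=${batch.size}")
    }

    // Every record is filtered, so the listener is not called at all.
    dispatch(listOf(skipped)) { batch ->
        println("Consuming messages, batch size=${batch.size}")
    }
}
```

In the real application, an `if (messages.isEmpty()) return` guard at the top of the listener would avoid processing the empty list, but the listener is still invoked in that case, which is what I would like to avoid.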