Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.2
Describe the bug
I followed this part of the documentation to integrate Micrometer into the Kafka listener container:
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MyPayload>> myPayloadKafkaListenerContainerFactory(
        ConsumerFactory<String, MyPayload> myPayloadKafkaConsumerFactory,
        ObservationRegistry observationRegistry
) {
    ConcurrentKafkaListenerContainerFactory<String, MyPayload> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(myPayloadKafkaConsumerFactory);
    factory.setConcurrency(1);
    // the relevant part
    factory.getContainerProperties().setObservationEnabled(true);
    factory.getContainerProperties().setObservationRegistry(observationRegistry);
    return factory;
}
Then I throw an exception from the consumer:
@RetryableTopic(
        kafkaTemplate = "myProcessRecoverKafkaTemplate",
        attempts = "3",
        backoff = @Backoff(
                delay = 2000, maxDelay = 10000, multiplier = 5
        )
)
@KafkaListener(
        id = "processmyProcess",
        topics = "my-process",
        containerFactory = "myProcessKafkaListenerContainerFactory"
)
public void processPayload(
        ConsumerRecord<String, MyPayload> data
) {
    if (1 == 1) {
        throw new CustomException();
    }
}
This is what I see in the metric:
spring_kafka_listener_seconds_count{error="ListenerExecutionFailedException",messaging_kafka_consumer_group="processMethod-retry-10000",messaging_operation="receive",messaging_source_kind="topic",messaging_source_name="my-method-retry-10000",messaging_system="kafka",spring_kafka_listener_id="processMethod-retry-10000-0"} 6
Without configuring Micrometer observation, I get:
spring_kafka_listener_seconds_sum{exception="ListenerExecutionFailedException",name="my-process-retry-4000-0",result="failure"} 0.02504
I tracked it down to this piece of code:
However, I don't know whether this is exclusive to the Micrometer observation version of the metric.
I've filed this as a bug because the metric is not particularly insightful when every error is counted under the same tag value, regardless of what actually went wrong in the consumer.
To Reproduce
Create a Kafka consumer, configure Micrometer observability (see above), throw a custom exception from within the consumer, and check which metric is generated.
Expected behavior
If a custom exception (say, MyException) is thrown while the message is being consumed, that is the exception I'd like to see in the metric.
Basically:
catch (ListenerExecutionFailedException e) {
    listenerError = e;
    if (e.getCause() != null) {
        currentObservation.error(e.getCause());
    } else {
        currentObservation.error(e);
    }
    handleException(records, acknowledgment, consumer, message, e);
}
Maybe I'm oversimplifying and the actual solution is something else, but it makes explicit what I mean.
And then that's what I would like to see in the metric:
spring_kafka_listener_seconds_count{error="MyException",messaging_kafka_consumer_group="processMethod-retry-10000",messaging_operation="receive",messaging_source_kind="topic",messaging_source_name="my-method-retry-10000",messaging_system="kafka",spring_kafka_listener_id="processMethod-retry-10000-0"} 6
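To show the unwrapping idea in isolation, here is a minimal plain-Java sketch that runs without Spring or Micrometer on the classpath. Everything in it is hypothetical: `WrapperException` stands in for ListenerExecutionFailedException, `MyException` for the exception thrown by the listener, and `errorTagFor` is a made-up helper, not a Spring Kafka or Micrometer API. It also assumes the error tag value is the simple class name of the reported throwable, which matches the metrics shown above but is not something I verified in the Micrometer source.

```java
public class ErrorTagSketch {

    // Hypothetical stand-in for ListenerExecutionFailedException, so the sketch runs without Spring.
    static class WrapperException extends RuntimeException {
        WrapperException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for the exception thrown inside the listener method.
    static class MyException extends RuntimeException {
    }

    // Picks the throwable to report: the cause when the wrapper has one, otherwise the wrapper itself.
    static Throwable unwrap(WrapperException e) {
        return e.getCause() != null ? e.getCause() : e;
    }

    // Assumption: the error tag value is the simple class name of the reported throwable.
    static String errorTagFor(WrapperException e) {
        return unwrap(e).getClass().getSimpleName();
    }

    public static void main(String[] args) {
        WrapperException wrapped = new WrapperException("Listener failed", new MyException());
        WrapperException bare = new WrapperException("Listener failed", null);
        System.out.println(errorTagFor(wrapped)); // tag derived from the cause
        System.out.println(errorTagFor(bare));    // no cause, so the wrapper itself is reported
    }
}
```

The fallback to the wrapper keeps the tag populated when there is no cause, which mirrors the else branch in the snippet above.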
Sample
I haven't prepared one, but I can provide one if needed.