Skip to content

Commit 7f45017

Browse files
Igor Macedo Quintanilhaigormq
authored andcommitted
support per-record observations in batch listeners
Signed-off-by: Igor Macedo Quintanilha <[email protected]>
1 parent 6425682 commit 7f45017

File tree

6 files changed

+491
-3
lines changed

6 files changed

+491
-3
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
= Micrometer Support
2+
3+
Spring for Apache Kafka provides support for Micrometer.
4+
5+
[[listener-obs]]
6+
== Listener Observations
7+
8+
When you have `micrometer-observation` on the class path, and you configure a
9+
`ObservationRegistry` in the application context, the container will automatically create
10+
`Observation`s for consumed records.
11+
You can disable this feature by setting the `observationEnabled` container property to
12+
`false`.
13+
14+
The default `KafkaListenerObservationConvention` is `DefaultKafkaListenerObservationConvention`
15+
which provides the following observation names and key/values.
16+
17+
Listener observation names:
18+
19+
* `spring.kafka.listener`
20+
21+
Key Values:
22+
23+
* `spring.kafka.listener.id` - the listener id.
24+
* `spring.kafka.group.id` - the consumer group id.
25+
* `spring.kafka.client.id` - the consumer client id.
26+
* `spring.kafka.cluster.id` - the cluster id.
27+
28+
When using a record listener (`@KafkaListener` method), a single observation is
29+
created for each record.
30+
31+
[[batch-listener-obs]]
32+
=== Batch Listener Observations
33+
34+
When using a batch listener, by default, no observations are created, even if a
35+
`ObservationRegistry` is present.
36+
This is because the scope of an observation is tied to the thread, and with a batch
37+
listener, there is no one-to-one mapping between an observation and a record.
38+
39+
To enable per-record observations in a batch listener, set the container factory
40+
property `recordObservationsInBatch` to `true`.
41+
42+
[source,java]
43+
----
44+
@Bean
45+
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
46+
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
47+
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
48+
49+
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
50+
configurer.configure(factory, kafkaConsumerFactory);
51+
factory.getContainerProperties().setRecordObservationsInBatch(true);
52+
return factory;
53+
}
54+
----
55+
56+
When this property is `true`, an observation will be created for each record in the
57+
batch, similar to a record listener.
58+
This allows you to have visibility into the processing of each record, even within a
59+
batch context.
60+
No spans will be created for the batch itself, only for each record.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,10 @@ For details, see xref:kafka/receiving-messages/rebalance-listeners.adoc#new-reba
7676

7777
The `DefaultKafkaHeaderMapper` and `SimpleKafkaHeaderMapper` support multi-value header mapping for Kafka records.
7878
More details are available in xref:kafka/headers.adoc#multi-value-header[Support multi-value header mapping].
79+
80+
[[x40-batch-observability]]
81+
=== Per-Record Observation in Batch Listeners
82+
83+
It is now possible to get an observation for each record when using a batch listener.
84+
Previously, only a single observation was created for the entire batch.
85+
See xref:micrometer.adoc#batch-listener-obs[Observability for Batch Listeners] for more information.

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,8 @@ public enum EOSMode {
310310

311311
private boolean restartAfterAuthExceptions;
312312

313+
private boolean recordObservationsInBatch;
314+
313315
/**
314316
* Create properties for a container that will subscribe to the specified topics.
315317
* @param topics the topics.
@@ -1091,6 +1093,27 @@ public void setRestartAfterAuthExceptions(boolean restartAfterAuthExceptions) {
10911093
this.restartAfterAuthExceptions = restartAfterAuthExceptions;
10921094
}
10931095

1096+
/**
1097+
* When true, and a batch listener is configured with observation enabled, an observation
1098+
* will be started for each record in the batch.
1099+
* @return recordObservationsInBatch.
1100+
* @since 4.0
1101+
*/
1102+
public boolean isRecordObservationsInBatch() {
1103+
return this.recordObservationsInBatch;
1104+
}
1105+
1106+
/**
1107+
* Set whether to enable individual record observations in a batch.
1108+
* When true, and a batch listener is configured with observation enabled, an observation
1109+
* will be started for each record in the batch. Default false.
1110+
* @param recordObservationsInBatch true to enable individual record observations.
1111+
* @since 4.0
1112+
*/
1113+
public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {
1114+
this.recordObservationsInBatch = recordObservationsInBatch;
1115+
}
1116+
10941117
@Override
10951118
public String toString() {
10961119
return "ContainerProperties ["
@@ -1141,6 +1164,7 @@ public String toString() {
11411164
? "\n observationRegistry=" + this.observationRegistry
11421165
: "")
11431166
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
1167+
+ "\n recordObservationsInBatch=" + this.recordObservationsInBatch
11441168
+ "\n]";
11451169
}
11461170

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -898,7 +898,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
898898
this.isBatchListener = true;
899899
this.wantsFullRecords = this.batchListener.wantsPollResult();
900900
this.pollThreadStateProcessor = setUpPollProcessor(true);
901-
this.observationEnabled = false;
901+
this.observationEnabled = this.containerProperties.isObservationEnabled() && this.containerProperties.isRecordObservationsInBatch();
902902
}
903903
else if (listener instanceof MessageListener) {
904904
this.listener = (MessageListener<K, V>) listener;
@@ -2443,7 +2443,13 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
24432443
}
24442444
}
24452445
Object sample = startMicrometerSample();
2446+
2447+
24462448
try {
2449+
if (this.observationEnabled) {
2450+
invokeBatchWithIndividualRecordObservation(recordList);
2451+
}
2452+
24472453
if (this.wantsFullRecords) {
24482454
Objects.requireNonNull(this.batchListener).onMessage(records, // NOSONAR
24492455
this.isAnyManualAck
@@ -4005,6 +4011,21 @@ private Long computeBackwardWhereTo(long offset, boolean toCurrent, TopicPartiti
40054011

40064012
}
40074013

4014+
private void invokeBatchWithIndividualRecordObservation(List<ConsumerRecord<K, V>> recordList) {
4015+
// Create individual observations for each record without scopes
4016+
for (ConsumerRecord<K, V> record : recordList) {
4017+
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
4018+
this.containerProperties.getObservationConvention(),
4019+
DefaultKafkaListenerObservationConvention.INSTANCE,
4020+
() -> new KafkaRecordReceiverContext(record, getListenerId(), getClientId(), this.consumerGroupId,
4021+
this::clusterId),
4022+
this.observationRegistry);
4023+
observation.observe(() -> {
4024+
logger.debug(() -> "Observing record in batch: " + KafkaUtils.format(record));
4025+
});
4026+
}
4027+
}
4028+
40084029
}
40094030

40104031

0 commit comments

Comments
 (0)