Skip to content

Commit d327eac

Browse files
cfredri4sobychacko
authored andcommitted
Handle null group id in listener observation (#3778)
This change fixes an NPE when group id is null and observation is enabled. Fixes: #3778 Signed-off-by: cfredri4 <[email protected]> (cherry picked from commit ccf4666)
1 parent 5726123 commit d327eac

File tree

2 files changed

+68
-10
lines changed

2 files changed

+68
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java

+26-10
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
* @author Gary Russell
3333
* @author Christian Mergenthaler
3434
* @author Wang Zhiyang
35+
* @author Christian Fredriksson
3536
*
3637
* @since 3.0
3738
*
@@ -224,33 +225,45 @@ public static class DefaultKafkaListenerObservationConvention implements KafkaLi
224225
new DefaultKafkaListenerObservationConvention();
225226

226227
@Override
228+
@NonNull
227229
public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
228-
229-
return KeyValues.of(
230+
String groupId = context.getGroupId();
231+
KeyValues keyValues = KeyValues.of(
230232
ListenerLowCardinalityTags.LISTENER_ID.withValue(context.getListenerId()),
231233
ListenerLowCardinalityTags.MESSAGING_SYSTEM.withValue("kafka"),
232234
ListenerLowCardinalityTags.MESSAGING_OPERATION.withValue("receive"),
233235
ListenerLowCardinalityTags.MESSAGING_SOURCE_NAME.withValue(context.getSource()),
234-
ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic"),
235-
ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(context.getGroupId())
236+
ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic")
236237
);
238+
239+
if (StringUtils.hasText(groupId)) {
240+
keyValues = keyValues
241+
.and(ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(groupId));
242+
}
243+
244+
return keyValues;
237245
}
238246

239247
@Override
240248
@NonNull
241249
public KeyValues getHighCardinalityKeyValues(KafkaRecordReceiverContext context) {
242250
String clientId = context.getClientId();
251+
String consumerId = getConsumerId(context.getGroupId(), clientId);
243252
KeyValues keyValues = KeyValues.of(
244253
ListenerHighCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()),
245-
ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()),
246-
ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context, clientId))
254+
ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset())
247255
);
248256

249257
if (StringUtils.hasText(clientId)) {
250258
keyValues = keyValues
251259
.and(ListenerHighCardinalityTags.MESSAGING_CLIENT_ID.withValue(clientId));
252260
}
253261

262+
if (StringUtils.hasText(consumerId)) {
263+
keyValues = keyValues
264+
.and(ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(consumerId));
265+
}
266+
254267
return keyValues;
255268
}
256269

@@ -259,11 +272,14 @@ public String getContextualName(KafkaRecordReceiverContext context) {
259272
return context.getSource() + " receive";
260273
}
261274

262-
private static String getConsumerId(KafkaRecordReceiverContext context, @Nullable String clientId) {
263-
if (StringUtils.hasText(clientId)) {
264-
return context.getGroupId() + " - " + clientId;
275+
private static @Nullable String getConsumerId(@Nullable String groupId, @Nullable String clientId) {
276+
if (StringUtils.hasText(groupId)) {
277+
if (StringUtils.hasText(clientId)) {
278+
return groupId + " - " + clientId;
279+
}
280+
return groupId;
265281
}
266-
return context.getGroupId();
282+
return clientId;
267283
}
268284

269285
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2020-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.micrometer;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
import org.junit.jupiter.api.Test;
21+
22+
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
23+
24+
/**
25+
* @author Christian Fredriksson
26+
*/
27+
public class KafkaListenerObservationTests {
28+
29+
@Test
30+
void lowCardinalityKeyValues() {
31+
ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 1, 2, "key", "value");
32+
KafkaRecordReceiverContext context = new KafkaRecordReceiverContext(record, "listener", () -> null);
33+
DefaultKafkaListenerObservationConvention.INSTANCE.getLowCardinalityKeyValues(context);
34+
}
35+
36+
@Test
37+
void highCardinalityKeyValues() {
38+
ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 1, 2, "key", "value");
39+
KafkaRecordReceiverContext context = new KafkaRecordReceiverContext(record, "listener", () -> null);
40+
DefaultKafkaListenerObservationConvention.INSTANCE.getHighCardinalityKeyValues(context);
41+
}
42+
}

0 commit comments

Comments
 (0)