Skip to content

Commit 4335435

Browse files
committed
This change fixes an NPE when group id is null and observation is enabled.
Signed-off-by: cfredri4 <[email protected]>
1 parent cfd7c98 commit 4335435

File tree

1 file changed

+26
-10
lines changed

1 file changed

+26
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaListenerObservation.java

+26-10
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
* @author Gary Russell
3333
* @author Christian Mergenthaler
3434
* @author Wang Zhiyang
35+
* @author Christian Fredriksson
3536
*
3637
* @since 3.0
3738
*
@@ -224,33 +225,45 @@ public static class DefaultKafkaListenerObservationConvention implements KafkaLi
224225
new DefaultKafkaListenerObservationConvention();
225226

226227
@Override
228+
@NonNull
227229
public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context) {
228-
229-
return KeyValues.of(
230+
String groupId = context.getGroupId();
231+
KeyValues keyValues = KeyValues.of(
230232
ListenerLowCardinalityTags.LISTENER_ID.withValue(context.getListenerId()),
231233
ListenerLowCardinalityTags.MESSAGING_SYSTEM.withValue("kafka"),
232234
ListenerLowCardinalityTags.MESSAGING_OPERATION.withValue("receive"),
233235
ListenerLowCardinalityTags.MESSAGING_SOURCE_NAME.withValue(context.getSource()),
234-
ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic"),
235-
ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(context.getGroupId())
236+
ListenerLowCardinalityTags.MESSAGING_SOURCE_KIND.withValue("topic")
236237
);
238+
239+
if (StringUtils.hasText(groupId)) {
240+
keyValues = keyValues
241+
.and(ListenerLowCardinalityTags.MESSAGING_CONSUMER_GROUP.withValue(groupId));
242+
}
243+
244+
return keyValues;
237245
}
238246

239247
@Override
240248
@NonNull
241249
public KeyValues getHighCardinalityKeyValues(KafkaRecordReceiverContext context) {
242250
String clientId = context.getClientId();
251+
String consumerId = getConsumerId(context.getGroupId(), clientId);
243252
KeyValues keyValues = KeyValues.of(
244253
ListenerHighCardinalityTags.MESSAGING_PARTITION.withValue(context.getPartition()),
245-
ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset()),
246-
ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(getConsumerId(context, clientId))
254+
ListenerHighCardinalityTags.MESSAGING_OFFSET.withValue(context.getOffset())
247255
);
248256

249257
if (StringUtils.hasText(clientId)) {
250258
keyValues = keyValues
251259
.and(ListenerHighCardinalityTags.MESSAGING_CLIENT_ID.withValue(clientId));
252260
}
253261

262+
if (StringUtils.hasText(consumerId)) {
263+
keyValues = keyValues
264+
.and(ListenerHighCardinalityTags.MESSAGING_CONSUMER_ID.withValue(consumerId));
265+
}
266+
254267
return keyValues;
255268
}
256269

@@ -259,11 +272,14 @@ public String getContextualName(KafkaRecordReceiverContext context) {
259272
return context.getSource() + " receive";
260273
}
261274

262-
private static @Nullable String getConsumerId(KafkaRecordReceiverContext context, @Nullable String clientId) {
263-
if (StringUtils.hasText(clientId)) {
264-
return context.getGroupId() + " - " + clientId;
275+
private static @Nullable String getConsumerId(@Nullable String groupId, @Nullable String clientId) {
276+
if (StringUtils.hasText(groupId)) {
277+
if (StringUtils.hasText(clientId)) {
278+
return groupId + " - " + clientId;
279+
}
280+
return groupId;
265281
}
266-
return context.getGroupId();
282+
return clientId;
267283
}
268284

269285
}

0 commit comments

Comments
 (0)