diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java index 0d18699313..50e9e310a5 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/micrometer/KafkaRecordSenderContext.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2024 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,7 @@ import io.micrometer.observation.transport.SenderContext; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; /** * {@link SenderContext} for {@link ProducerRecord}s. @@ -28,6 +29,7 @@ * @author Gary Russell * @author Christian Mergenthaler * @author Wang Zhiyang + * @author Soby Chacko * * @since 3.0 * @@ -39,8 +41,12 @@ public class KafkaRecordSenderContext extends SenderContext private final ProducerRecord record; public KafkaRecordSenderContext(ProducerRecord record, String beanName, Supplier clusterId) { - super((carrier, key, value) -> record.headers().add(key, - value == null ? null : value.getBytes(StandardCharsets.UTF_8))); + super((carrier, key, value) -> { + Headers headers = record.headers(); + headers.remove(key); + headers.add(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8)); + }); + setCarrier(record); this.beanName = beanName; this.record = record; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java index 99442dd2ba..10f32d2a45 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java @@ -16,6 +16,7 @@ package org.springframework.kafka.support.micrometer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Deque; import java.util.List; @@ -26,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.StreamSupport; import io.micrometer.common.KeyValues; import io.micrometer.core.instrument.MeterRegistry; @@ -56,6 +58,7 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -78,6 +81,7 @@ import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention; import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention; import org.springframework.kafka.test.EmbeddedKafkaBroker; @@ -104,7 +108,7 @@ @SpringJUnitConfig @EmbeddedKafka(topics = { ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2, ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, - ObservationTests.OBSERVATION_ERROR }, partitions = 1) + ObservationTests.OBSERVATION_ERROR, ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE }, partitions = 1) @DirtiesContext public class ObservationTests { @@ -122,6 +126,8 @@ public class ObservationTests { public final static String OBSERVATION_ERROR_MONO = "observation.error.mono"; + public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate"; + @Test void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate template, @Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler, @@ -449,6 +455,62 @@ void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig( assertThat(template.getKafkaAdmin()).isSameAs(kafkaAdmin); } + @Test + void verifyKafkaRecordSenderContextTraceParentHandling() { + String initialTraceParent = "traceparent-from-previous"; + String updatedTraceParent = "traceparent-current"; + ProducerRecord record = new ProducerRecord<>("test-topic", "test-value"); + record.headers().add("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8)); + + // Create the context and update the traceparent + KafkaRecordSenderContext context = new KafkaRecordSenderContext( + record, + "test-bean", + () -> "test-cluster" + ); + context.getSetter().set(record, "traceparent", updatedTraceParent); + + Iterable
traceparentHeaders = record.headers().headers("traceparent"); + + List headerValues = StreamSupport.stream(traceparentHeaders.spliterator(), false) + .map(header -> new String(header.value(), StandardCharsets.UTF_8)) + .toList(); + + // Verify there's only one traceparent header and it contains the updated value + assertThat(headerValues).containsExactly(updatedTraceParent); + } + + @Test + void verifyTraceParentHeader(@Autowired KafkaTemplate template, + @Autowired SimpleTracer tracer) throws Exception { + CompletableFuture> producerRecordFuture = new CompletableFuture<>(); + template.setProducerListener(new ProducerListener<>() { + @Override + public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { + producerRecordFuture.complete(producerRecord); + } + }); + String initialTraceParent = "traceparent-from-previous"; + Header header = new RecordHeader("traceparent", initialTraceParent.getBytes(StandardCharsets.UTF_8)); + ProducerRecord producerRecord = new ProducerRecord<>( + OBSERVATION_TRACEPARENT_DUPLICATE, + null, null, null, + "test-value", + List.of(header) + ); + + template.send(producerRecord).get(10, TimeUnit.SECONDS); + ProducerRecord recordResult = producerRecordFuture.get(10, TimeUnit.SECONDS); + + Iterable
traceparentHeaders = recordResult.headers().headers("traceparent"); + assertThat(traceparentHeaders).hasSize(1); + + String traceparentValue = new String(traceparentHeaders.iterator().next().value(), StandardCharsets.UTF_8); + assertThat(traceparentValue).isEqualTo("traceparent-from-propagator"); + + tracer.getSpans().clear(); + } + @Configuration @EnableKafka public static class Config { @@ -598,6 +660,9 @@ public List fields() { public void inject(TraceContext context, @Nullable C carrier, Setter setter) { setter.set(carrier, "foo", "some foo value"); setter.set(carrier, "bar", "some bar value"); + + // Add a traceparent header to simulate W3C trace context + setter.set(carrier, "traceparent", "traceparent-from-propagator"); } // This is called on the consumer side when the message is consumed @@ -606,7 +671,9 @@ public void inject(TraceContext context, @Nullable C carrier, Setter sett public Span.Builder extract(C carrier, Getter getter) { String foo = getter.get(carrier, "foo"); String bar = getter.get(carrier, "bar"); - return tracer.spanBuilder().tag("foo", foo).tag("bar", bar); + return tracer.spanBuilder() + .tag("foo", foo) + .tag("bar", bar); } }; }