Kafka ProducerRecord may end up with duplicated trace headers #3095

Closed
GFriedrich opened this issue Mar 7, 2025 · 12 comments

@GFriedrich (Contributor) commented Mar 7, 2025

Describe the issue
I found some odd behaviour when enabling observation for both Spring Integration and the Spring Kafka binder.
In this case Spring Integration already adds a traceparent header (e.g. when using W3C trace propagation) to the internal Message, but later the Spring Kafka binder adds another traceparent header.

To Reproduce
Steps to reproduce the behavior:

  1. Enable observation for Spring Integration via spring.integration.management.observation-patterns
  2. Enable Spring Kafka binder observation via spring.cloud.stream.kafka.binder.enable-observation
  3. Send a message through a StreamBridge (see the sketch below)
  4. See that the produced record on Kafka has two traceparent headers
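
For illustration, a minimal sketch of step 3, assuming constructor injection of the StreamBridge (the class, method, binding name testDestination, and payload are placeholders):

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;

@Component
class TraceHeaderReproducer {

	private final StreamBridge streamBridge;

	TraceHeaderReproducer(StreamBridge streamBridge) {
		this.streamBridge = streamBridge;
	}

	void produce() {
		// With both observations enabled, the record produced here
		// ends up with two traceparent headers on the Kafka topic.
		streamBridge.send("testDestination", "testData");
	}

}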

Version of the framework
4.2.0

Expected behavior
There should be just one traceparent header on a produced record.

Additional context
Even though it is already possible to work around this by suppressing the header from Spring Integration via spring.cloud.stream.kafka.default.producer.header-patterns (see the example after this list), it would be great to see a more permanent solution.
I would suggest to either:

  • add a default exclusion for traceparent headers to the BinderHeaderMapper implementation (other headers would potentially need excluding too, in case people use something other than W3C propagation)
  • or make the KafkaRecordSenderContext first remove an existing header of the same name and replace it, instead of just adding a new one
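
For reference, the workaround mentioned above could look like this (a sketch, assuming the pattern list is evaluated in order and a leading ! negates a pattern, as documented for the binder's producer headerPatterns property, so everything except traceparent is mapped):

spring.cloud.stream.kafka.default.producer.header-patterns=!traceparent,*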
@GFriedrich GFriedrich changed the title Kafka ProducerRecord may end up with two trace headers Kafka ProducerRecord may end up with duplicated trace headers Mar 7, 2025
@artembilan (Member)

Can you please share what those header values are?
I feel like the producer binding should participate in the trace already provided by Spring Integration.

@GFriedrich (Contributor, Author) commented Mar 7, 2025

An example would be (in key - value format):

  • traceparent - 00-b88cd047dc48dd0954e10f1a51895eac-a857be2ab4c1c87d-00
  • traceparent - 00-b88cd047dc48dd0954e10f1a51895eac-b48ae52b577f6259-00

So the trace ID is of course the same, but the span IDs differ, because one comes from the outer Spring Integration span and the other from the Spring Kafka binder observation.
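
For context, a W3C traceparent value has the form version-trace_id-parent_id-trace_flags: in the example above, 00 is the version, b88cd047dc48dd0954e10f1a51895eac is the shared trace ID, and a857be2ab4c1c87d / b48ae52b577f6259 are the two differing span IDs.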

@artembilan (Member)

Thanks for the confirmation!
And I guess this shows up in the ProducerRecord because Kafka headers allow multiple values for the same key.
I believe the header we create in the binding producer has to override the one coming from upstream.
It is already participating in the existing trace, so there is no need to keep the parent one.

Any chance you could share a simple project that we can reproduce and play with on our side?

@GFriedrich (Contributor, Author)

> And I guess this shows up in the ProducerRecord because Kafka headers allow multiple values for the same key.

Yes exactly.

> I believe the header we create in the binding producer has to override the one coming from upstream.
> It is already participating in the existing trace, so there is no need to keep the parent one.

Yeah, that is also what I'm thinking.

> Any chance you could share a simple project that we can reproduce and play with on our side?

I don't have a sample project at hand right now, but if you follow the reproduction steps it should be very easy to reproduce.

@artembilan (Member)

OK. I'll try to make one myself:

spring.integration.management.observation-patterns=*
spring.cloud.stream.kafka.binder.enable-observation=true

And send a message to the Kafka topic via StreamBridge.

Is that correct?

@GFriedrich (Contributor, Author)

> Is that correct?

Yep.

@artembilan (Member)

So, I created a simple Spring Boot project for the Kafka binder.
There is no code in main and no properties.
I was able to cover everything with just a test class and its configuration:

package org.springframework.cloud.stream.sample.gh3095;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.actuate.observability.AutoConfigureObservability;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(properties = {
		"spring.integration.management.observation-patterns=*",
		"spring.cloud.stream.kafka.binder.enable-observation=true",
		"spring.cloud.stream.output-bindings=testDestination"
})
@EmbeddedKafka
@DirtiesContext
@AutoConfigureObservability
class SpringCloudStream3095ApplicationTests {

	@Autowired
	StreamBridge streamBridge;

	@Autowired
	TestConfig testConfig;

	@Test
	void verifyTraceParentHeaderInProducerRecord() {
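		// Send through the pre-provisioned output binding, so the
		// Spring Integration channel observation is applied on the way out.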
		streamBridge.send("testDestination", "testData");

		assertThat(this.testConfig.producerRecordCompletableFuture).succeedsWithin(10, TimeUnit.SECONDS)
				.extracting(ProducerRecord::headers)
				.extracting((headers) -> headers.headers("traceparent"))
				.asInstanceOf(InstanceOfAssertFactories.ITERABLE)
				.hasSize(1);
	}

	@TestConfiguration
	static class TestConfig {

		final CompletableFuture<ProducerRecord<Object, Object>> producerRecordCompletableFuture =
				new CompletableFuture<>();

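		// Called by the binder's KafkaTemplate on every successful send;
		// captures the actual ProducerRecord so the test can inspect its headers.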
		@Bean
		ProducerListener<Object, Object> testProducerListener() {
			return new ProducerListener<>() {

				@Override
				public void onSuccess(ProducerRecord<Object, Object> producerRecord, RecordMetadata recordMetadata) {
					TestConfig.this.producerRecordCompletableFuture.complete(producerRecord);
				}

			};
		}

	}

}

It indeed fails:

Expected size: 1 but was: 2 in:
[RecordHeader(key = traceparent, value = [48, 48, 45, 54, 55, 99, 98, 51, 51, 99, 102, 100, 49, 50, 100, 100, 54, 97, 101, 49, 48, 101, 97, 100, 53, 53, 102, 97, 97, 53, 97, 52, 56, 99, 101, 45, 49, 48, 101, 97, 100, 53, 53, 102, 97, 97, 53, 97, 52, 56, 99, 101, 45, 48, 48]),
    RecordHeader(key = traceparent, value = [48, 48, 45, 54, 55, 99, 98, 51, 51, 99, 102, 100, 49, 50, 100, 100, 54, 97, 101, 49, 48, 101, 97, 100, 53, 53, 102, 97, 97, 53, 97, 52, 56, 99, 101, 45, 52, 48, 99, 49, 54, 57, 97, 102, 57, 102, 51, 57, 49, 56, 56, 48, 45, 48, 48])]
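
Decoded from ASCII, those two byte arrays read 00-67cb33cfd12dd6ae10ead55faa5a48ce-10ead55faa5a48ce-00 and 00-67cb33cfd12dd6ae10ead55faa5a48ce-40c169af9f391880-00: the same trace ID with two different span IDs, exactly as described above.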

The first header is a result of the observation on the MessageChannel for the predefined spring.cloud.stream.output-bindings=testDestination.
If the destination is dynamic, there is no extra header value: https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/producing-and-consuming-messages.html#streambridge-and-dynamic-destinations.

Looking further for the place where we add the extra one...

@artembilan (Member)

So, in debug I see that the first traceparent is populated by the KafkaProducerMessageHandler used in the Kafka binder.
It creates a ProducerRecord and then uses a KafkaTemplate.

The second one is added by the KafkaRecordSenderContext from that KafkaTemplate:

super((carrier, key, value) -> record.headers().add(key,
				value == null ? null : value.getBytes(StandardCharsets.UTF_8)));
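
A minimal sketch of how that setter could replace instead of append (an assumption about a possible fix, not the actual patch; Kafka's Headers API exposes remove(String key)):

super((carrier, key, value) -> {
	Headers headers = record.headers();
	// Drop any same-named header propagated from upstream before adding ours
	headers.remove(key);
	headers.add(key, value == null ? null : value.getBytes(StandardCharsets.UTF_8));
});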

So it looks like a Spring Kafka issue, and Spring Cloud Stream is just suffering the consequences of the components involved.

We cannot transfer issues between GitHub organizations, so we would appreciate it if you could create an issue in https://github.com/spring-projects/spring-kafka with a link to this one.
This one could then be closed in favor of that, simply because there is really nothing to do from the Spring Cloud Stream perspective.

Thank you!

@GFriedrich (Contributor, Author)

Sure, will do. Let's hope this doesn't turn into a ping-pong of the issue between the two projects. 😄

@artembilan (Member)

> this doesn't turn into a ping-pong

No, it won't, because the one who will take care of that issue in Spring Kafka is me 😉

@GFriedrich (Contributor, Author)

> No, it won't, because the one who will take care of that issue in Spring Kafka is me 😉

Ha, great. 😄
The issue is now available at spring-projects/spring-kafka#3786

@artembilan (Member)

Closed in favor of: spring-projects/spring-kafka#3786

@artembilan closed this as not planned Mar 7, 2025