Skip to content

Kafka inbound channel adapter no longer adds 'id' and 'timestamp' headers #9801

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
smitsjelle opened this issue Jan 30, 2025 · 1 comment
Closed

Comments

@smitsjelle
Copy link
Contributor

In what version(s) of Spring Integration are you seeing this issue?
Encountered in 6.4.1, seems to be introduced in 6.3.0

Describe the bug

When updating my application from SI 6.2.x to 6.4.1 I found out that whereas the Kafka inbound channel adapter previously produced messages with the MessageHeaders.ID and the MessageHeaders.TIMESTAMP headers, it stopped doing so.

On investigation, the changed behavior comes from GH-7925, which is why I'm posting the issue here. With this PR, MessageHistory#write(..) no longer defaults to execute message = messageBuilderFactory.fromMessage(message).setHeader(HEADER_NAME, history).build();, that would trigger default behavior to add the headers.

The root cause may be deeper in either Spring Framework or Spring Kafka: on conversion from Kafka Message to Spring Message, it is constructed by Spring Kafka eventually through MessagingMessageConverter#toMessage(..) that subsequently calls MessageBuilder.createMessage(..). This method doesn't do a .build() that would normally trigger the creation of the headersToUse that do include the MessageHeaders.ID and MessageHeaders.TIMESTAMP headers.

To Reproduce

In Spring Integration test MessageSourceTests#testAckCommon(), add 2 assertions with the header assertions that start in line 295:

assertThat(received.getHeaders().get(MessageHeaders.TIMESTAMP)).isNotNull();
assertThat(received.getHeaders().get(MessageHeaders.ID)).isNotNull();

This should result in test cases testAckAsyncCommits, testAckSyncCommits and testAckSyncCommitsTimeout to fail.

Expected behavior

I'd expect inbounds to consistently produce SI messages with ID and timestamp header. I tested some other inbound (JMS), that does not seem to have this issue.
Tested JMS by adjusting JmsInboundChannelAdapterTests#testTransactionalReceive() to:

JmsTemplate template = new JmsTemplate(connectionFactory);
template.convertAndSend("incatQ", "bar");
Message<?> message = out.receive(20000);
assertThat(message).isNotNull();
assertThat(message.getHeaders()).containsKeys(MessageHeaders.ID, MessageHeaders.TIMESTAMP);
@smitsjelle smitsjelle added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Jan 30, 2025
@artembilan
Copy link
Member

The behavior is correct.
That MessagingMessageConverter was always like that by default:

public class MessagingMessageConverter implements RecordMessageConverter {

	protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR

	private final Function<Message<?>, Integer> partitionProvider;

	private boolean generateMessageId = false;

	private boolean generateTimestamp = false;

The side effect that you had those headers in previous versions just because indeed the MessageHistory has always created a new message on track.
Right now that does not happen and the message produced from the KafkaMessageSource comes without those headers by default.

The easy fix for you is to inject MessagingMessageConverter into this KafkaMessageSource with those properties set to true.

At the same time I agree with you about consistency between Spring Integration channel adapters.
So, I'm treating this issue as an improvement and will make those generateMessageId and generateTimestamp as true by default in this KafkaMessageSource.
But this is going to be as a feature for the current 6.5 version since it is going to be a slight breaking change in the behavior.

I don't treat it as a bug and won't back-port it into the mentioned version since the behavior is really expected according to the default Spring for Apache Kafka settings and there is an easy workaround.

@artembilan artembilan added this to the 6.5.0-M2 milestone Jan 30, 2025
@artembilan artembilan added type: enhancement in: Kafka and removed type: bug status: waiting-for-triage The issue need to be evaluated and its future decided labels Jan 30, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants