
Commit 054ff57

GH-9801: Kafka: generate ID & TIMESTAMP headers by default
Fixes: #9801

Spring Integration Apache Kafka inbound channel adapters have always produced messages without `ID` & `TIMESTAMP` headers; see the `MessagingMessageConverter` used there by default. However, for consistency with the rest of the Spring Integration channel adapters, it is better to have the Kafka-specific ones behave the same way.

* Configure the default `MessagingMessageConverter` in the `KafkaMessageSource` & `KafkaMessageDrivenChannelAdapter` to generate `ID` & `TIMESTAMP` headers.
1 parent c44363a commit 054ff57
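
In effect, both inbound adapters now apply the converter settings shown in this minimal sketch to their default converter; only the `MessagingMessageConverter` setters come from the change itself, the wrapping class and method are illustrative:

import org.springframework.kafka.support.converter.MessagingMessageConverter;

class DefaultConverterSketch {

	static MessagingMessageConverter defaultConverter() {
		MessagingMessageConverter messageConverter = new MessagingMessageConverter();
		// For consistency with the rest of Spring Integration channel adapters:
		// populate MessageHeaders.ID and MessageHeaders.TIMESTAMP on each produced message
		messageConverter.setGenerateMessageId(true);
		messageConverter.setGenerateTimestamp(true);
		return messageConverter;
	}

}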

File tree: 5 files changed, +37 −19 lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java
+5 −3

@@ -1,5 +1,5 @@
 /*
- * Copyright 2015-2023 the original author or authors.
+ * Copyright 2015-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -133,6 +133,9 @@ public KafkaMessageDrivenChannelAdapter(AbstractMessageListenerContainer<K, V> m

 		if (JacksonPresent.isJackson2Present()) {
 			MessagingMessageConverter messageConverter = new MessagingMessageConverter();
+			// For consistency with the rest of Spring Integration channel adapters
+			messageConverter.setGenerateMessageId(true);
+			messageConverter.setGenerateTimestamp(true);
 			DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
 			headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
 			messageConverter.setHeaderMapper(headerMapper);
@@ -217,8 +220,7 @@ public void setRetryTemplate(RetryTemplate retryTemplate) {
 	 * channel is configured). Only used if a
 	 * {@link #setRetryTemplate(RetryTemplate)} is specified. Default is an
 	 * {@link ErrorMessageSendingRecoverer} if an error channel has been provided. Set to
-	 * null if you wish to throw the exception back to the container after retries are
-	 * exhausted.
+	 * null if you wish to throw the exception back to the container after retries are exhausted.
 	 * @param recoveryCallback the recovery callback.
 	 */
 	public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java
+8 −3

@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2024 the original author or authors.
+ * Copyright 2018-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -256,10 +256,15 @@ public KafkaMessageSource(ConsumerFactory<K, V> consumerFactory,
 				Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)); // NOSONAR - magic
 		this.commitTimeout = consumerProperties.getSyncCommitTimeout();

+		MessagingMessageConverter messagingMessageConverter = (MessagingMessageConverter) this.messageConverter;
+		// For consistency with the rest of Spring Integration channel adapters
+		messagingMessageConverter.setGenerateMessageId(true);
+		messagingMessageConverter.setGenerateTimestamp(true);
+
 		if (JacksonPresent.isJackson2Present()) {
 			DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper();
 			headerMapper.addTrustedPackages(JacksonJsonUtils.DEFAULT_TRUSTED_PACKAGES.toArray(new String[0]));
-			((MessagingMessageConverter) this.messageConverter).setHeaderMapper(headerMapper);
+			messagingMessageConverter.setHeaderMapper(headerMapper);
 		}
 	}

@@ -843,7 +848,7 @@ private void rollback(ConsumerRecord<K, V> record) {
 					i.setRolledBack(true);
 					return i.getRecord().offset();
 				})
-				.collect(Collectors.toList());
+				.toList();
 		if (!rewound.isEmpty()) {
 			this.logger.warn(() -> "Rolled back " + KafkaUtils.format(record)
 					+ " later in-flight offsets "

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java
+9 −7

@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2024 the original author or authors.
+ * Copyright 2016-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -166,12 +166,14 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
 		assertThat(received).isNotNull();

 		MessageHeaders headers = received.getHeaders();
-		assertThat(headers.get(KafkaHeaders.RECEIVED_KEY)).isEqualTo(1);
-		assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic1);
-		assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION)).isEqualTo(0);
-		assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
-		assertThat(headers.get(KafkaHeaders.RECEIVED_TIMESTAMP)).isEqualTo(1487694048607L);
-		assertThat(headers.get(KafkaHeaders.TIMESTAMP_TYPE)).isEqualTo("CREATE_TIME");
+		assertThat(headers)
+				.containsEntry(KafkaHeaders.RECEIVED_KEY, 1)
+				.containsEntry(KafkaHeaders.RECEIVED_TOPIC, topic1)
+				.containsEntry(KafkaHeaders.RECEIVED_PARTITION, 0)
+				.containsEntry(KafkaHeaders.OFFSET, 0L)
+				.containsEntry(KafkaHeaders.RECEIVED_TIMESTAMP, 1487694048607L)
+				.containsEntry(KafkaHeaders.TIMESTAMP_TYPE, "CREATE_TIME")
+				.containsKeys(MessageHeaders.TIMESTAMP, MessageHeaders.ID);

 		assertThat(headers.get("testHeader")).isEqualTo("testValue");
spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java
+3 −1

@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2024 the original author or authors.
+ * Copyright 2018-2025 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -69,6 +69,7 @@
 import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.messaging.Message;
+import org.springframework.messaging.MessageHeaders;

 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
@@ -297,6 +298,7 @@ private void testAckCommon(boolean sync, boolean timeout) {
 		assertThat(received.getHeaders().get(KafkaHeaders.RAW_DATA)).isInstanceOf(ConsumerRecord.class);
 		assertThat(received.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
 				.isSameAs(received.getHeaders().get(KafkaHeaders.RAW_DATA));
+		assertThat(received.getHeaders()).containsKeys(MessageHeaders.TIMESTAMP, MessageHeaders.ID);
 		StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
 				.acknowledge(AcknowledgmentCallback.Status.ACCEPT);
 		received = source.receive();

src/reference/antora/modules/ROOT/pages/whats-new.adoc
+12 −5

@@ -27,27 +27,34 @@ The `AbstractCorrelatingMessageHandler` does not throw an `IllegalArgumentExcept
 Instead, such a collection is wrapped into a single reply message.
 See xref:aggregator.adoc[Aggregator] for more information.

-[[x6.4-correlation-changes]]
+[[x6.5-correlation-changes]]
 == The `discardIndividuallyOnExpiry` Option For Correlation Handlers

 The aggregator and resequencer can now discard the whole expired group as a single message via setting `discardIndividuallyOnExpiry` to `false`.
 See xref:aggregator.adoc#releasestrategy[ReleaseStrategy] for more information.

-[[x6.4-message-store-with-locks]]
+[[x6.5-message-store-with-locks]]
 == The `LockRegistry` in the `MessageStore`

 The `AbstractMessageGroupStore` now can be configured with a `LockRegistry` to perform series of persistent operation atomically.
 See xref:message-store.adoc#use-lock-registry[Use LockRegistry] for more information.

-[[x6.4-observation-changes]]
+[[x6.5-observation-changes]]
 == Micrometer Observation Changes

 The `SourcePollingChannelAdapter` endpoint now starts a `CONSUMER` kind observation for the received message.
 The `MessageReceiverContext` now distinguishes between `handler`, `message-source` and `message-producer` values for the `spring.integration.type` low cardinality tag.
 See xref:metrics.adoc#micrometer-observation[Micrometer Observation] for more information.

-[[x6.4-mqtt-changes]]
+[[x6.5-mqtt-changes]]
 == Optional Paho MQTT Dependencies

 The `org.eclipse.paho:org.eclipse.paho.client.mqttv3` dependency for `spring-integration-mqtt` is now also optional as `org.eclipse.paho:org.eclipse.paho.mqttv5.client` always was.
-See xref:mqtt.adoc[MQTT Support] for more information.
+See xref:mqtt.adoc[MQTT Support] for more information.
+
+[[x6.5-kafka-changes]]
+== Apache Kafka support Changes
+
+The `KafkaMessageSource` and `KafkaMessageDrivenChannelAdapter` now generate `MessageHeaders.ID` and `MessageHeaders.TIMESTAMP` headers by default as the rest of Spring Integration channel adapters.
+The behavior can be restored to the previous with injection of the `MessagingMessageConverter` with default settings.
+See xref:kafka.adoc[Apache Kafka Support] for more information.
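
Restoring the previous behavior for the polling source, as the documentation note above suggests, might look like the following sketch; the topic name, the consumer factory wiring, and the use of a `setMessageConverter` setter are illustrative assumptions, not taken from this commit:

import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerProperties;
import org.springframework.kafka.support.converter.MessagingMessageConverter;

class RestorePreviousBehaviorSketch {

	static KafkaMessageSource<String, String> kafkaMessageSource(ConsumerFactory<String, String> consumerFactory) {
		KafkaMessageSource<String, String> source =
				new KafkaMessageSource<>(consumerFactory, new ConsumerProperties("someTopic"));
		// A MessagingMessageConverter with default settings does not generate
		// MessageHeaders.ID and MessageHeaders.TIMESTAMP, i.e. the pre-change behavior.
		source.setMessageConverter(new MessagingMessageConverter());
		return source;
	}

}

A converter configured the same way can be provided to the `KafkaMessageDrivenChannelAdapter` if the pre-change behavior is desired for the message-driven adapter as well.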
