Skip to content

Commit df2d21a

Browse files
committed
SIGH-2958: Add source record to common SI Header
Resolves spring-projects/spring-integration#2958 Enables applications to be agnostic regarding the source of a message. e.g Kafka Vs. AMQP.
1 parent ea486d2 commit df2d21a

File tree

8 files changed

+113
-46
lines changed

8 files changed

+113
-46
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ ext {
5959
junitJupiterVersion = '5.4.2'
6060
junitPlatformVersion = '1.4.2'
6161
log4jVersion = '2.11.2'
62-
springIntegrationVersion = '5.2.0.M2'
62+
springIntegrationVersion = '5.2.0.BUILD-SNAPSHOT'
6363
springKafkaVersion = '2.3.0.M2'
6464

6565
idPrefix = 'kafka'

src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport implem
8383

8484
private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
8585

86+
private boolean bindSourceRecord;
87+
8688
/**
8789
* Construct an instance with the provided container.
8890
* @param messageListenerContainer the container.
@@ -155,6 +157,16 @@ public void setOnPartitionsAssignedSeekCallback(
155157
this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
156158
}
157159

160+
/**
161+
* Set to true to bind the source consumer record in the header named
162+
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}.
163+
* @param bindSourceRecord true to bind.
164+
* @since 3.1.4
165+
*/
166+
public void setBindSourceRecord(boolean bindSourceRecord) {
167+
this.bindSourceRecord = bindSourceRecord;
168+
}
169+
158170
@Override
159171
protected void onInit() {
160172
super.onInit();
@@ -245,10 +257,7 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
245257
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
246258
Message<?> message = null;
247259
try {
248-
message = toMessagingMessage(record, acknowledgment, consumer);
249-
if (KafkaInboundGateway.this.retryTemplate != null) {
250-
message = addDeliveryAttemptHeader(message);
251-
}
260+
message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record);
252261
setAttributesIfNecessary(record, message);
253262
}
254263
catch (RuntimeException e) {
@@ -277,18 +286,30 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
277286
}
278287
}
279288

280-
private Message<?> addDeliveryAttemptHeader(Message<?> message) {
289+
private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> record) {
281290
Message<?> messageToReturn = message;
282-
AtomicInteger deliveryAttempt =
283-
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
284291
if (message.getHeaders() instanceof KafkaMessageHeaders) {
285-
((KafkaMessageHeaders) message.getHeaders()).getRawHeaders()
286-
.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
292+
Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
293+
if (KafkaInboundGateway.this.retryTemplate != null) {
294+
AtomicInteger deliveryAttempt =
295+
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
296+
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
297+
}
298+
if (KafkaInboundGateway.this.bindSourceRecord) {
299+
rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
300+
}
287301
}
288302
else {
289-
messageToReturn = MessageBuilder.fromMessage(message)
290-
.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt)
291-
.build();
303+
MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
304+
if (KafkaInboundGateway.this.retryTemplate != null) {
305+
AtomicInteger deliveryAttempt =
306+
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
307+
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
308+
}
309+
if (KafkaInboundGateway.this.bindSourceRecord) {
310+
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
311+
}
312+
messageToReturn = builder.build();
292313
}
293314
return messageToReturn;
294315
}

src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo
9696

9797
private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;
9898

99+
private boolean bindSourceRecord;
100+
99101
/**
100102
* Construct an instance with mode {@link ListenerMode#record}.
101103
* @param messageListenerContainer the container.
@@ -247,6 +249,17 @@ public void setOnPartitionsAssignedSeekCallback(
247249
this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
248250
}
249251

252+
/**
253+
* Set to true to bind the source consumer record in the header named
254+
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}.
255+
* Does not apply to batch listeners.
256+
* @param bindSourceRecord true to bind.
257+
* @since 3.1.4
258+
*/
259+
public void setBindSourceRecord(boolean bindSourceRecord) {
260+
this.bindSourceRecord = bindSourceRecord;
261+
}
262+
250263
@Override
251264
public String getComponentType() {
252265
return "kafka:message-driven-channel-adapter";
@@ -264,10 +277,10 @@ protected void onInit() {
264277
if (this.mode.equals(ListenerMode.record)) {
265278
MessageListener<K, V> listener = this.recordListener;
266279

267-
boolean filterInRetry = this.filterInRetry && this.retryTemplate != null
280+
boolean doFilterInRetry = this.filterInRetry && this.retryTemplate != null
268281
&& this.recordFilterStrategy != null;
269282

270-
if (filterInRetry) {
283+
if (doFilterInRetry) {
271284
listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy,
272285
this.ackDiscarded);
273286
listener = new RetryingMessageListenerAdapter<>(listener, this.retryTemplate,
@@ -418,10 +431,7 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
418431
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
419432
Message<?> message = null;
420433
try {
421-
message = toMessagingMessage(record, acknowledgment, consumer);
422-
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
423-
message = addDeliveryAttemptHeader(message);
424-
}
434+
message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record);
425435
setAttributesIfNecessary(record, message);
426436
}
427437
catch (RuntimeException e) {
@@ -432,18 +442,30 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
432442
sendMessageIfAny(message, record);
433443
}
434444

435-
private Message<?> addDeliveryAttemptHeader(Message<?> message) {
445+
private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> record) {
436446
Message<?> messageToReturn = message;
437-
AtomicInteger deliveryAttempt =
438-
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
439447
if (message.getHeaders() instanceof KafkaMessageHeaders) {
440-
((KafkaMessageHeaders) message.getHeaders()).getRawHeaders()
441-
.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
448+
Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
449+
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
450+
AtomicInteger deliveryAttempt =
451+
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
452+
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
453+
}
454+
if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
455+
rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
456+
}
442457
}
443458
else {
444-
messageToReturn = MessageBuilder.fromMessage(message)
445-
.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt)
446-
.build();
459+
MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
460+
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
461+
AtomicInteger deliveryAttempt =
462+
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
463+
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
464+
}
465+
if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
466+
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
467+
}
468+
messageToReturn = builder.build();
447469
}
448470
return messageToReturn;
449471
}

src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -310,9 +310,10 @@ protected boolean isRawMessageHeader() {
310310
}
311311

312312
/**
313-
* Set to true to include the raw {@link ConsumerRecord} as a header
314-
* with key {@link KafkaHeaders#RAW_DATA},
315-
* enabling callers to have access to the record to process errors.
313+
* Set to true to include the raw {@link ConsumerRecord} as headers with keys
314+
* {@link KafkaHeaders#RAW_DATA} and
315+
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. enabling callers to have
316+
* access to the record to process errors.
316317
* @param rawMessageHeader true to include the header.
317318
*/
318319
public void setRawMessageHeader(boolean rawMessageHeader) {
@@ -458,6 +459,7 @@ record = nextRecord();
458459
rawHeaders.put(REMAINING_RECORDS, this.remainingCount.get());
459460
if (this.rawMessageHeader) {
460461
rawHeaders.put(KafkaHeaders.RAW_DATA, record);
462+
rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
461463
}
462464
return message;
463465
}
@@ -467,6 +469,7 @@ record = nextRecord();
467469
.setHeader(REMAINING_RECORDS, this.remainingCount.get());
468470
if (this.rawMessageHeader) {
469471
builder.setHeader(KafkaHeaders.RAW_DATA, record);
472+
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
470473
}
471474
return builder;
472475
}

src/main/java/org/springframework/integration/kafka/support/RawRecordHeaderErrorMessageStrategy.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616

1717
package org.springframework.integration.kafka.support;
1818

19-
import java.util.Collections;
19+
import java.util.HashMap;
2020
import java.util.Map;
2121

2222
import org.springframework.core.AttributeAccessor;
23+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2324
import org.springframework.integration.support.ErrorMessageStrategy;
2425
import org.springframework.integration.support.ErrorMessageUtils;
2526
import org.springframework.kafka.support.KafkaHeaders;
@@ -41,9 +42,13 @@ public class RawRecordHeaderErrorMessageStrategy implements ErrorMessageStrategy
4142

4243
@Override
4344
public ErrorMessage buildErrorMessage(Throwable throwable, @Nullable AttributeAccessor context) {
44-
Object inputMessage = context.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY);
45-
Map<String, Object> headers =
46-
Collections.singletonMap(KafkaHeaders.RAW_DATA, context.getAttribute(KafkaHeaders.RAW_DATA));
45+
Object inputMessage = null;
46+
Map<String, Object> headers = new HashMap<>();
47+
if (context != null) {
48+
inputMessage = context.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY);
49+
headers.put(KafkaHeaders.RAW_DATA, context.getAttribute(KafkaHeaders.RAW_DATA));
50+
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, context.getAttribute(KafkaHeaders.RAW_DATA));
51+
}
4752
if (inputMessage instanceof Message) {
4853
return new ErrorMessage(throwable, headers, (Message<?>) inputMessage);
4954
}

src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.junit.Test;
3535

3636
import org.springframework.beans.factory.BeanFactory;
37+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3738
import org.springframework.integration.channel.DirectChannel;
3839
import org.springframework.integration.channel.QueueChannel;
3940
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
@@ -170,20 +171,20 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
170171

171172
@Test
172173
public void testInboundErrorRecover() throws Exception {
173-
EmbeddedKafkaBroker embeddedKafka = InboundGatewayTests.embeddedKafka.getEmbeddedKafka();
174-
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("replyHandler2", "false", embeddedKafka);
174+
EmbeddedKafkaBroker broker = InboundGatewayTests.embeddedKafka.getEmbeddedKafka();
175+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("replyHandler2", "false", broker);
175176
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
176177
ConsumerFactory<Integer, String> cf2 = new DefaultKafkaConsumerFactory<>(consumerProps);
177178
Consumer<Integer, String> consumer = cf2.createConsumer();
178-
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic4);
179+
broker.consumeFromAnEmbeddedTopic(consumer, topic4);
179180

180-
Map<String, Object> props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka);
181+
Map<String, Object> props = KafkaTestUtils.consumerProps("test2", "false", broker);
181182
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
182183
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
183184
ContainerProperties containerProps = new ContainerProperties(topic3);
184185
KafkaMessageListenerContainer<Integer, String> container =
185186
new KafkaMessageListenerContainer<>(cf, containerProps);
186-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
187+
Map<String, Object> senderProps = KafkaTestUtils.producerProps(broker);
187188
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
188189
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
189190
template.setDefaultTopic(topic3);
@@ -215,15 +216,19 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
215216

216217
});
217218
gateway.setReplyTimeout(30_000);
219+
gateway.setBindSourceRecord(true);
218220
gateway.afterPropertiesSet();
219221
gateway.start();
220222
ContainerTestUtils.waitForAssignment(container, 2);
221223

222224
template.sendDefault(0, 1487694048607L, 1, "foo");
223225
ErrorMessage em = (ErrorMessage) errors.receive(30_000);
224226
assertThat(em).isNotNull();
227+
assertThat(em.getHeaders().get(KafkaHeaders.RAW_DATA)).isNotNull();
225228
Message<?> failed = ((MessagingException) em.getPayload()).getFailedMessage();
226229
assertThat(failed).isNotNull();
230+
assertThat(failed.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
231+
.isSameAs(em.getHeaders().get(KafkaHeaders.RAW_DATA));
227232
MessageChannel reply = (MessageChannel) em.getHeaders().getReplyChannel();
228233
MessageHeaders headers = failed.getHeaders();
229234
reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build());
@@ -308,6 +313,7 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
308313
assertThat(em).isNotNull();
309314
Message<?> failed = ((MessagingException) em.getPayload()).getFailedMessage();
310315
assertThat(failed).isNotNull();
316+
assertThat(failed.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)).isNull();
311317
MessageChannel reply = (MessageChannel) em.getHeaders().getReplyChannel();
312318
MessageHeaders headers = failed.getHeaders();
313319
reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build());

src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.junit.ClassRule;
5050
import org.junit.Test;
5151

52+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
5253
import org.springframework.integration.StaticMessageHeaderAccessor;
5354
import org.springframework.integration.channel.DirectChannel;
5455
import org.springframework.integration.channel.QueueChannel;
@@ -238,14 +239,15 @@ protected boolean doSend(Message<?> message, long timeout) {
238239
assertThat(received).isInstanceOf(ErrorMessage.class);
239240
MessageHeaders headers = received.getHeaders();
240241
assertThat(headers.get(KafkaHeaders.RAW_DATA)).isNotNull();
241-
received = ((ErrorMessage) received).getOriginalMessage();
242-
assertThat(received).isNotNull();
243-
headers = received.getHeaders();
242+
Message<?> originalMessage = ((ErrorMessage) received).getOriginalMessage();
243+
assertThat(originalMessage).isNotNull();
244+
assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)).isNull();
245+
headers = originalMessage.getHeaders();
244246
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
245247
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic4);
246248
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
247249
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
248-
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(received).get()).isEqualTo(2);
250+
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(2);
249251

250252
adapter.stop();
251253
}
@@ -272,6 +274,7 @@ protected boolean doSend(Message<?> message, long timeout) {
272274
adapter.setErrorChannel(errorChannel);
273275
adapter.setRecoveryCallback(
274276
new ErrorMessageSendingRecoverer(errorChannel, new RawRecordHeaderErrorMessageStrategy()));
277+
adapter.setBindSourceRecord(true);
275278
adapter.afterPropertiesSet();
276279
adapter.start();
277280
ContainerTestUtils.waitForAssignment(container, 2);
@@ -286,9 +289,13 @@ protected boolean doSend(Message<?> message, long timeout) {
286289
assertThat(received).isInstanceOf(ErrorMessage.class);
287290
MessageHeaders headers = received.getHeaders();
288291
assertThat(headers.get(KafkaHeaders.RAW_DATA)).isNotNull();
289-
received = ((ErrorMessage) received).getOriginalMessage();
290-
assertThat(received).isNotNull();
291-
headers = received.getHeaders();
292+
assertThat(headers.get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
293+
.isSameAs(headers.get(KafkaHeaders.RAW_DATA));
294+
Message<?> originalMessage = ((ErrorMessage) received).getOriginalMessage();
295+
assertThat(originalMessage).isNotNull();
296+
assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
297+
.isSameAs(headers.get(KafkaHeaders.RAW_DATA));
298+
headers = originalMessage.getHeaders();
292299
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
293300
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic5);
294301
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);

src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.mockito.InOrder;
5757

5858
import org.springframework.beans.DirectFieldAccessor;
59+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
5960
import org.springframework.integration.StaticMessageHeaderAccessor;
6061
import org.springframework.integration.acks.AcknowledgmentCallback;
6162
import org.springframework.integration.test.util.TestUtils;
@@ -116,6 +117,8 @@ public void testAck() {
116117
Message<?> received = source.receive();
117118
assertThat(received).isNotNull();
118119
assertThat(received.getHeaders().get(KafkaHeaders.RAW_DATA)).isInstanceOf(ConsumerRecord.class);
120+
assertThat(received.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
121+
.isSameAs(received.getHeaders().get(KafkaHeaders.RAW_DATA));
119122
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
120123
.acknowledge(AcknowledgmentCallback.Status.ACCEPT);
121124
received = source.receive();

0 commit comments

Comments
 (0)