This repository was archived by the owner on Mar 30, 2023. It is now read-only.

SIGH-2958: Add source record to common SI Header #270

Merged
merged 1 commit on Jun 11, 2019
2 changes: 1 addition & 1 deletion build.gradle
@@ -59,7 +59,7 @@ ext {
junitJupiterVersion = '5.4.2'
junitPlatformVersion = '1.4.2'
log4jVersion = '2.11.2'
springIntegrationVersion = '5.2.0.M2'
springIntegrationVersion = '5.2.0.BUILD-SNAPSHOT'
springKafkaVersion = '2.3.0.M2'

idPrefix = 'kafka'
@@ -83,6 +83,8 @@ public class KafkaInboundGateway<K, V, R> extends MessagingGatewaySupport implem

private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

private boolean bindSourceRecord;

/**
* Construct an instance with the provided container.
* @param messageListenerContainer the container.
@@ -155,6 +157,16 @@ public void setOnPartitionsAssignedSeekCallback(
this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
}

/**
* Set to true to bind the source consumer record in the header named
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}.
* @param bindSourceRecord true to bind.
* @since 3.1.4
*/
public void setBindSourceRecord(boolean bindSourceRecord) {
this.bindSourceRecord = bindSourceRecord;
}

@Override
protected void onInit() {
super.onInit();
@@ -245,10 +257,7 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
Message<?> message = null;
try {
message = toMessagingMessage(record, acknowledgment, consumer);
if (KafkaInboundGateway.this.retryTemplate != null) {
message = addDeliveryAttemptHeader(message);
}
message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record);
setAttributesIfNecessary(record, message);
}
catch (RuntimeException e) {
@@ -277,18 +286,30 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
}
}

private Message<?> addDeliveryAttemptHeader(Message<?> message) {
private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> record) {
Message<?> messageToReturn = message;
AtomicInteger deliveryAttempt =
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
if (message.getHeaders() instanceof KafkaMessageHeaders) {
((KafkaMessageHeaders) message.getHeaders()).getRawHeaders()
.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
if (KafkaInboundGateway.this.retryTemplate != null) {
AtomicInteger deliveryAttempt =
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
}
if (KafkaInboundGateway.this.bindSourceRecord) {
rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
}
}
else {
messageToReturn = MessageBuilder.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt)
.build();
MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
if (KafkaInboundGateway.this.retryTemplate != null) {
AtomicInteger deliveryAttempt =
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
}
if (KafkaInboundGateway.this.bindSourceRecord) {
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
}
messageToReturn = builder.build();
}
return messageToReturn;
}
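
For readers of this PR, a minimal usage sketch of the new gateway option (not part of the diff): the (container, template) constructor, the channel wiring, and the "someTopic" name are assumptions patterned on the tests further down, not prescribed by this change.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.kafka.inbound.KafkaInboundGateway;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.messaging.Message;

public class BindSourceRecordGatewaySketch {

    // Hypothetical wiring; only setBindSourceRecord(true) is the behavior added here.
    KafkaInboundGateway<Integer, String, String> gateway(ConsumerFactory<Integer, String> cf,
            KafkaTemplate<Integer, String> replyTemplate) {

        KafkaMessageListenerContainer<Integer, String> container =
                new KafkaMessageListenerContainer<>(cf, new ContainerProperties("someTopic"));
        KafkaInboundGateway<Integer, String, String> gateway =
                new KafkaInboundGateway<>(container, replyTemplate);
        gateway.setRequestChannel(new DirectChannel());
        gateway.setBindSourceRecord(true); // new option added by this commit
        return gateway;
    }

    // Downstream, the original ConsumerRecord travels in the common SI header.
    ConsumerRecord<?, ?> sourceOf(Message<?> message) {
        return (ConsumerRecord<?, ?>) message.getHeaders()
                .get(IntegrationMessageHeaderAccessor.SOURCE_DATA);
    }
}
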
@@ -96,6 +96,8 @@ public class KafkaMessageDrivenChannelAdapter<K, V> extends MessageProducerSuppo

private BiConsumer<Map<TopicPartition, Long>, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback;

private boolean bindSourceRecord;

/**
* Construct an instance with mode {@link ListenerMode#record}.
* @param messageListenerContainer the container.
@@ -247,6 +249,17 @@ public void setOnPartitionsAssignedSeekCallback(
this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback;
}

/**
* Set to true to bind the source consumer record in the header named
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}.
* Does not apply to batch listeners.
* @param bindSourceRecord true to bind.
* @since 3.1.4
*/
public void setBindSourceRecord(boolean bindSourceRecord) {
this.bindSourceRecord = bindSourceRecord;
}

@Override
public String getComponentType() {
return "kafka:message-driven-channel-adapter";
@@ -264,10 +277,10 @@ protected void onInit() {
if (this.mode.equals(ListenerMode.record)) {
MessageListener<K, V> listener = this.recordListener;

boolean filterInRetry = this.filterInRetry && this.retryTemplate != null
boolean doFilterInRetry = this.filterInRetry && this.retryTemplate != null
&& this.recordFilterStrategy != null;

if (filterInRetry) {
if (doFilterInRetry) {
listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy,
this.ackDiscarded);
listener = new RetryingMessageListenerAdapter<>(listener, this.retryTemplate,
@@ -418,10 +431,7 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
Message<?> message = null;
try {
message = toMessagingMessage(record, acknowledgment, consumer);
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
message = addDeliveryAttemptHeader(message);
}
message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record);
setAttributesIfNecessary(record, message);
}
catch (RuntimeException e) {
@@ -432,18 +442,30 @@ public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment
sendMessageIfAny(message, record);
}

private Message<?> addDeliveryAttemptHeader(Message<?> message) {
private Message<?> enhanceHeaders(Message<?> message, ConsumerRecord<K, V> record) {
Message<?> messageToReturn = message;
AtomicInteger deliveryAttempt =
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
if (message.getHeaders() instanceof KafkaMessageHeaders) {
((KafkaMessageHeaders) message.getHeaders()).getRawHeaders()
.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
Map<String, Object> rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders();
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
AtomicInteger deliveryAttempt =
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
}
if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
}
}
else {
messageToReturn = MessageBuilder.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt)
.build();
MessageBuilder<?> builder = MessageBuilder.fromMessage(message);
if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) {
AtomicInteger deliveryAttempt =
new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1);
builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt);
}
if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) {
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
}
messageToReturn = builder.build();
}
return messageToReturn;
}
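
The same option on the channel adapter, as a hedged sketch (record mode only, since the javadoc above notes it does not apply to batch listeners); the container wiring, output channel, and topic name are assumptions.

import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.ListenerMode;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

public class BindSourceRecordAdapterSketch {

    // Hypothetical wiring; only setBindSourceRecord(true) is the behavior under review.
    KafkaMessageDrivenChannelAdapter<Integer, String> adapter(ConsumerFactory<Integer, String> cf) {
        KafkaMessageListenerContainer<Integer, String> container =
                new KafkaMessageListenerContainer<>(cf, new ContainerProperties("someTopic"));
        KafkaMessageDrivenChannelAdapter<Integer, String> adapter =
                new KafkaMessageDrivenChannelAdapter<>(container, ListenerMode.record);
        adapter.setOutputChannel(new DirectChannel());
        adapter.setBindSourceRecord(true); // each record message gets a SOURCE_DATA header
        return adapter;
    }
}
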
@@ -310,9 +310,10 @@ protected boolean isRawMessageHeader() {
}

/**
* Set to true to include the raw {@link ConsumerRecord} as a header
* with key {@link KafkaHeaders#RAW_DATA},
* enabling callers to have access to the record to process errors.
* Set to true to include the raw {@link ConsumerRecord} as headers with keys
* {@link KafkaHeaders#RAW_DATA} and
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}, enabling callers to have
* access to the record to process errors.
* @param rawMessageHeader true to include the header.
*/
public void setRawMessageHeader(boolean rawMessageHeader) {
@@ -458,6 +459,7 @@ record = nextRecord();
rawHeaders.put(REMAINING_RECORDS, this.remainingCount.get());
if (this.rawMessageHeader) {
rawHeaders.put(KafkaHeaders.RAW_DATA, record);
rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
}
return message;
}
@@ -467,6 +469,7 @@ record = nextRecord();
.setHeader(REMAINING_RECORDS, this.remainingCount.get());
if (this.rawMessageHeader) {
builder.setHeader(KafkaHeaders.RAW_DATA, record);
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record);
}
return builder;
}
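
A sketch of the polled-source side of this change, assuming the KafkaMessageSource constructor that takes a consumer factory plus topic names: with setRawMessageHeader(true), each received message now carries the record under both KafkaHeaders.RAW_DATA and IntegrationMessageHeaderAccessor.SOURCE_DATA.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

public class RawMessageHeaderSketch {

    void pollOnce(ConsumerFactory<Integer, String> cf) {
        KafkaMessageSource<Integer, String> source = new KafkaMessageSource<>(cf, "someTopic");
        source.setRawMessageHeader(true);

        Message<?> received = source.receive(); // null when no record is available
        if (received != null) {
            // Both headers reference the same ConsumerRecord instance.
            ConsumerRecord<?, ?> raw = (ConsumerRecord<?, ?>)
                    received.getHeaders().get(KafkaHeaders.RAW_DATA);
            ConsumerRecord<?, ?> sourceData = (ConsumerRecord<?, ?>)
                    received.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA);
        }
    }
}
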
@@ -16,10 +16,11 @@

package org.springframework.integration.kafka.support;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.kafka.support.KafkaHeaders;
@@ -41,9 +42,13 @@ public class RawRecordHeaderErrorMessageStrategy implements ErrorMessageStrategy

@Override
public ErrorMessage buildErrorMessage(Throwable throwable, @Nullable AttributeAccessor context) {
Object inputMessage = context.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY);
Map<String, Object> headers =
Collections.singletonMap(KafkaHeaders.RAW_DATA, context.getAttribute(KafkaHeaders.RAW_DATA));
Object inputMessage = null;
Map<String, Object> headers = new HashMap<>();
if (context != null) {
inputMessage = context.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY);
headers.put(KafkaHeaders.RAW_DATA, context.getAttribute(KafkaHeaders.RAW_DATA));
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, context.getAttribute(KafkaHeaders.RAW_DATA));
}
if (inputMessage instanceof Message) {
return new ErrorMessage(throwable, headers, (Message<?>) inputMessage);
}
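
To illustrate the strategy change (the null-context guard plus the extra SOURCE_DATA entry), a hedged example; ErrorMessageUtils.getAttributeAccessor is used here only to build a context the way the adapters' recoverers do, and the record and payload values are placeholders.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;

public class ErrorMessageStrategySketch {

    void build() {
        RawRecordHeaderErrorMessageStrategy strategy = new RawRecordHeaderErrorMessageStrategy();

        // A null context no longer throws; the error message simply gets no extra headers.
        ErrorMessage bare = strategy.buildErrorMessage(new RuntimeException("boom"), null);

        // With the attributes the adapters capture, RAW_DATA and SOURCE_DATA both hold the record.
        ConsumerRecord<Integer, String> record = new ConsumerRecord<>("someTopic", 0, 0L, 1, "payload");
        AttributeAccessor context =
                ErrorMessageUtils.getAttributeAccessor(new GenericMessage<>("payload"), null);
        context.setAttribute(KafkaHeaders.RAW_DATA, record);
        ErrorMessage withRecord = strategy.buildErrorMessage(new RuntimeException("boom"), context);
        ConsumerRecord<?, ?> bound = (ConsumerRecord<?, ?>)
                withRecord.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA);
    }
}
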
@@ -34,6 +34,7 @@
import org.junit.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
@@ -170,20 +171,20 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle

@Test
public void testInboundErrorRecover() throws Exception {
EmbeddedKafkaBroker embeddedKafka = InboundGatewayTests.embeddedKafka.getEmbeddedKafka();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("replyHandler2", "false", embeddedKafka);
EmbeddedKafkaBroker broker = InboundGatewayTests.embeddedKafka.getEmbeddedKafka();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("replyHandler2", "false", broker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf2 = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf2.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic4);
broker.consumeFromAnEmbeddedTopic(consumer, topic4);

Map<String, Object> props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka);
Map<String, Object> props = KafkaTestUtils.consumerProps("test2", "false", broker);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
ContainerProperties containerProps = new ContainerProperties(topic3);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
Map<String, Object> senderProps = KafkaTestUtils.producerProps(broker);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic3);
@@ -215,15 +216,19 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle

});
gateway.setReplyTimeout(30_000);
gateway.setBindSourceRecord(true);
gateway.afterPropertiesSet();
gateway.start();
ContainerTestUtils.waitForAssignment(container, 2);

template.sendDefault(0, 1487694048607L, 1, "foo");
ErrorMessage em = (ErrorMessage) errors.receive(30_000);
assertThat(em).isNotNull();
assertThat(em.getHeaders().get(KafkaHeaders.RAW_DATA)).isNotNull();
Message<?> failed = ((MessagingException) em.getPayload()).getFailedMessage();
assertThat(failed).isNotNull();
assertThat(failed.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
.isSameAs(em.getHeaders().get(KafkaHeaders.RAW_DATA));
MessageChannel reply = (MessageChannel) em.getHeaders().getReplyChannel();
MessageHeaders headers = failed.getHeaders();
reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build());
@@ -308,6 +313,7 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
assertThat(em).isNotNull();
Message<?> failed = ((MessagingException) em.getPayload()).getFailedMessage();
assertThat(failed).isNotNull();
assertThat(failed.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)).isNull();
MessageChannel reply = (MessageChannel) em.getHeaders().getReplyChannel();
MessageHeaders headers = failed.getHeaders();
reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build());
@@ -49,6 +49,7 @@
import org.junit.ClassRule;
import org.junit.Test;

import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
@@ -238,14 +239,15 @@ protected boolean doSend(Message<?> message, long timeout) {
assertThat(received).isInstanceOf(ErrorMessage.class);
MessageHeaders headers = received.getHeaders();
assertThat(headers.get(KafkaHeaders.RAW_DATA)).isNotNull();
received = ((ErrorMessage) received).getOriginalMessage();
assertThat(received).isNotNull();
headers = received.getHeaders();
Message<?> originalMessage = ((ErrorMessage) received).getOriginalMessage();
assertThat(originalMessage).isNotNull();
assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)).isNull();
headers = originalMessage.getHeaders();
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic4);
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L);
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(received).get()).isEqualTo(2);
assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(2);

adapter.stop();
}
@@ -272,6 +274,7 @@
adapter.setErrorChannel(errorChannel);
adapter.setRecoveryCallback(
new ErrorMessageSendingRecoverer(errorChannel, new RawRecordHeaderErrorMessageStrategy()));
adapter.setBindSourceRecord(true);
adapter.afterPropertiesSet();
adapter.start();
ContainerTestUtils.waitForAssignment(container, 2);
@@ -286,9 +289,13 @@
assertThat(received).isInstanceOf(ErrorMessage.class);
MessageHeaders headers = received.getHeaders();
assertThat(headers.get(KafkaHeaders.RAW_DATA)).isNotNull();
received = ((ErrorMessage) received).getOriginalMessage();
assertThat(received).isNotNull();
headers = received.getHeaders();
assertThat(headers.get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
.isSameAs(headers.get(KafkaHeaders.RAW_DATA));
Message<?> originalMessage = ((ErrorMessage) received).getOriginalMessage();
assertThat(originalMessage).isNotNull();
assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
.isSameAs(headers.get(KafkaHeaders.RAW_DATA));
headers = originalMessage.getHeaders();
assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1);
assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic5);
assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0);
@@ -56,6 +56,7 @@
import org.mockito.InOrder;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.test.util.TestUtils;
@@ -116,6 +117,8 @@ public void testAck() {
Message<?> received = source.receive();
assertThat(received).isNotNull();
assertThat(received.getHeaders().get(KafkaHeaders.RAW_DATA)).isInstanceOf(ConsumerRecord.class);
assertThat(received.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA))
.isSameAs(received.getHeaders().get(KafkaHeaders.RAW_DATA));
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(AcknowledgmentCallback.Status.ACCEPT);
received = source.receive();