diff --git a/build.gradle b/build.gradle index 87d0353f..cf6f7f1f 100644 --- a/build.gradle +++ b/build.gradle @@ -59,7 +59,7 @@ ext { junitJupiterVersion = '5.4.2' junitPlatformVersion = '1.4.2' log4jVersion = '2.11.2' - springIntegrationVersion = '5.2.0.M2' + springIntegrationVersion = '5.2.0.BUILD-SNAPSHOT' springKafkaVersion = '2.3.0.M2' idPrefix = 'kafka' diff --git a/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java b/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java index 37e92ba3..06c75f88 100644 --- a/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java +++ b/src/main/java/org/springframework/integration/kafka/inbound/KafkaInboundGateway.java @@ -83,6 +83,8 @@ public class KafkaInboundGateway extends MessagingGatewaySupport implem private BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback; + private boolean bindSourceRecord; + /** * Construct an instance with the provided container. * @param messageListenerContainer the container. @@ -155,6 +157,16 @@ public void setOnPartitionsAssignedSeekCallback( this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback; } + /** + * Set to true to bind the source consumer record in the header named + * {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. + * @param bindSourceRecord true to bind. + * @since 3.1.4 + */ + public void setBindSourceRecord(boolean bindSourceRecord) { + this.bindSourceRecord = bindSourceRecord; + } + @Override protected void onInit() { super.onInit(); @@ -245,10 +257,7 @@ public void onPartitionsAssigned(Map assignments, Consumer public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) { Message message = null; try { - message = toMessagingMessage(record, acknowledgment, consumer); - if (KafkaInboundGateway.this.retryTemplate != null) { - message = addDeliveryAttemptHeader(message); - } + message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record); setAttributesIfNecessary(record, message); } catch (RuntimeException e) { @@ -277,18 +286,30 @@ public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment } } - private Message addDeliveryAttemptHeader(Message message) { + private Message enhanceHeaders(Message message, ConsumerRecord record) { Message messageToReturn = message; - AtomicInteger deliveryAttempt = - new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1); if (message.getHeaders() instanceof KafkaMessageHeaders) { - ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders() - .put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt); + Map rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders(); + if (KafkaInboundGateway.this.retryTemplate != null) { + AtomicInteger deliveryAttempt = + new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1); + rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt); + } + if (KafkaInboundGateway.this.bindSourceRecord) { + rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record); + } } else { - messageToReturn = MessageBuilder.fromMessage(message) - .setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt) - .build(); + MessageBuilder builder = MessageBuilder.fromMessage(message); + if (KafkaInboundGateway.this.retryTemplate != null) { + AtomicInteger deliveryAttempt = + new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1); + builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt); + } + if (KafkaInboundGateway.this.bindSourceRecord) { + builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record); + } + messageToReturn = builder.build(); } return messageToReturn; } diff --git a/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java b/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java index 317fac6c..43f058ef 100644 --- a/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java +++ b/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageDrivenChannelAdapter.java @@ -96,6 +96,8 @@ public class KafkaMessageDrivenChannelAdapter extends MessageProducerSuppo private BiConsumer, ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedSeekCallback; + private boolean bindSourceRecord; + /** * Construct an instance with mode {@link ListenerMode#record}. * @param messageListenerContainer the container. @@ -247,6 +249,17 @@ public void setOnPartitionsAssignedSeekCallback( this.onPartitionsAssignedSeekCallback = onPartitionsAssignedCallback; } + /** + * Set to true to bind the source consumer record in the header named + * {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. + * Does not apply to batch listeners. + * @param bindSourceRecord true to bind. + * @since 3.1.4 + */ + public void setBindSourceRecord(boolean bindSourceRecord) { + this.bindSourceRecord = bindSourceRecord; + } + @Override public String getComponentType() { return "kafka:message-driven-channel-adapter"; @@ -264,10 +277,10 @@ protected void onInit() { if (this.mode.equals(ListenerMode.record)) { MessageListener listener = this.recordListener; - boolean filterInRetry = this.filterInRetry && this.retryTemplate != null + boolean doFilterInRetry = this.filterInRetry && this.retryTemplate != null && this.recordFilterStrategy != null; - if (filterInRetry) { + if (doFilterInRetry) { listener = new FilteringMessageListenerAdapter<>(listener, this.recordFilterStrategy, this.ackDiscarded); listener = new RetryingMessageListenerAdapter<>(listener, this.retryTemplate, @@ -418,10 +431,7 @@ public void onPartitionsAssigned(Map assignments, Consumer public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer) { Message message = null; try { - message = toMessagingMessage(record, acknowledgment, consumer); - if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) { - message = addDeliveryAttemptHeader(message); - } + message = enhanceHeaders(toMessagingMessage(record, acknowledgment, consumer), record); setAttributesIfNecessary(record, message); } catch (RuntimeException e) { @@ -432,18 +442,30 @@ public void onMessage(ConsumerRecord record, Acknowledgment acknowledgment sendMessageIfAny(message, record); } - private Message addDeliveryAttemptHeader(Message message) { + private Message enhanceHeaders(Message message, ConsumerRecord record) { Message messageToReturn = message; - AtomicInteger deliveryAttempt = - new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1); if (message.getHeaders() instanceof KafkaMessageHeaders) { - ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders() - .put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt); + Map rawHeaders = ((KafkaMessageHeaders) message.getHeaders()).getRawHeaders(); + if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) { + AtomicInteger deliveryAttempt = + new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1); + rawHeaders.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt); + } + if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) { + rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record); + } } else { - messageToReturn = MessageBuilder.fromMessage(message) - .setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt) - .build(); + MessageBuilder builder = MessageBuilder.fromMessage(message); + if (KafkaMessageDrivenChannelAdapter.this.retryTemplate != null) { + AtomicInteger deliveryAttempt = + new AtomicInteger(((RetryContext) attributesHolder.get()).getRetryCount() + 1); + builder.setHeader(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, deliveryAttempt); + } + if (KafkaMessageDrivenChannelAdapter.this.bindSourceRecord) { + builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record); + } + messageToReturn = builder.build(); } return messageToReturn; } diff --git a/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java b/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java index 1346e6f5..15126f35 100644 --- a/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java +++ b/src/main/java/org/springframework/integration/kafka/inbound/KafkaMessageSource.java @@ -310,9 +310,10 @@ protected boolean isRawMessageHeader() { } /** - * Set to true to include the raw {@link ConsumerRecord} as a header - * with key {@link KafkaHeaders#RAW_DATA}, - * enabling callers to have access to the record to process errors. + * Set to true to include the raw {@link ConsumerRecord} as headers with keys + * {@link KafkaHeaders#RAW_DATA} and + * {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. enabling callers to have + * access to the record to process errors. * @param rawMessageHeader true to include the header. */ public void setRawMessageHeader(boolean rawMessageHeader) { @@ -458,6 +459,7 @@ record = nextRecord(); rawHeaders.put(REMAINING_RECORDS, this.remainingCount.get()); if (this.rawMessageHeader) { rawHeaders.put(KafkaHeaders.RAW_DATA, record); + rawHeaders.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, record); } return message; } @@ -467,6 +469,7 @@ record = nextRecord(); .setHeader(REMAINING_RECORDS, this.remainingCount.get()); if (this.rawMessageHeader) { builder.setHeader(KafkaHeaders.RAW_DATA, record); + builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, record); } return builder; } diff --git a/src/main/java/org/springframework/integration/kafka/support/RawRecordHeaderErrorMessageStrategy.java b/src/main/java/org/springframework/integration/kafka/support/RawRecordHeaderErrorMessageStrategy.java index e6b5a755..93f97069 100644 --- a/src/main/java/org/springframework/integration/kafka/support/RawRecordHeaderErrorMessageStrategy.java +++ b/src/main/java/org/springframework/integration/kafka/support/RawRecordHeaderErrorMessageStrategy.java @@ -16,10 +16,11 @@ package org.springframework.integration.kafka.support; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import org.springframework.core.AttributeAccessor; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.kafka.support.KafkaHeaders; @@ -41,9 +42,13 @@ public class RawRecordHeaderErrorMessageStrategy implements ErrorMessageStrategy @Override public ErrorMessage buildErrorMessage(Throwable throwable, @Nullable AttributeAccessor context) { - Object inputMessage = context.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY); - Map headers = - Collections.singletonMap(KafkaHeaders.RAW_DATA, context.getAttribute(KafkaHeaders.RAW_DATA)); + Object inputMessage = null; + Map headers = new HashMap<>(); + if (context != null) { + inputMessage = context.getAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY); + headers.put(KafkaHeaders.RAW_DATA, context.getAttribute(KafkaHeaders.RAW_DATA)); + headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, context.getAttribute(KafkaHeaders.RAW_DATA)); + } if (inputMessage instanceof Message) { return new ErrorMessage(throwable, headers, (Message) inputMessage); } diff --git a/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java b/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java index 9a9b764f..c298017c 100644 --- a/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java +++ b/src/test/java/org/springframework/integration/kafka/inbound/InboundGatewayTests.java @@ -34,6 +34,7 @@ import org.junit.Test; import org.springframework.beans.factory.BeanFactory; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; @@ -170,20 +171,20 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle @Test public void testInboundErrorRecover() throws Exception { - EmbeddedKafkaBroker embeddedKafka = InboundGatewayTests.embeddedKafka.getEmbeddedKafka(); - Map consumerProps = KafkaTestUtils.consumerProps("replyHandler2", "false", embeddedKafka); + EmbeddedKafkaBroker broker = InboundGatewayTests.embeddedKafka.getEmbeddedKafka(); + Map consumerProps = KafkaTestUtils.consumerProps("replyHandler2", "false", broker); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf2 = new DefaultKafkaConsumerFactory<>(consumerProps); Consumer consumer = cf2.createConsumer(); - embeddedKafka.consumeFromAnEmbeddedTopic(consumer, topic4); + broker.consumeFromAnEmbeddedTopic(consumer, topic4); - Map props = KafkaTestUtils.consumerProps("test2", "false", embeddedKafka); + Map props = KafkaTestUtils.consumerProps("test2", "false", broker); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); ContainerProperties containerProps = new ContainerProperties(topic3); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); - Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); + Map senderProps = KafkaTestUtils.producerProps(broker); ProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); KafkaTemplate template = new KafkaTemplate<>(pf); template.setDefaultTopic(topic3); @@ -215,6 +216,7 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle }); gateway.setReplyTimeout(30_000); + gateway.setBindSourceRecord(true); gateway.afterPropertiesSet(); gateway.start(); ContainerTestUtils.waitForAssignment(container, 2); @@ -222,8 +224,11 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle template.sendDefault(0, 1487694048607L, 1, "foo"); ErrorMessage em = (ErrorMessage) errors.receive(30_000); assertThat(em).isNotNull(); + assertThat(em.getHeaders().get(KafkaHeaders.RAW_DATA)).isNotNull(); Message failed = ((MessagingException) em.getPayload()).getFailedMessage(); assertThat(failed).isNotNull(); + assertThat(failed.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)) + .isSameAs(em.getHeaders().get(KafkaHeaders.RAW_DATA)); MessageChannel reply = (MessageChannel) em.getHeaders().getReplyChannel(); MessageHeaders headers = failed.getHeaders(); reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build()); @@ -308,6 +313,7 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle assertThat(em).isNotNull(); Message failed = ((MessagingException) em.getPayload()).getFailedMessage(); assertThat(failed).isNotNull(); + assertThat(failed.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)).isNull(); MessageChannel reply = (MessageChannel) em.getHeaders().getReplyChannel(); MessageHeaders headers = failed.getHeaders(); reply.send(MessageBuilder.withPayload("ERROR").copyHeaders(headers).build()); diff --git a/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java b/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java index bc183977..99908322 100644 --- a/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java +++ b/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java @@ -49,6 +49,7 @@ import org.junit.ClassRule; import org.junit.Test; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; @@ -238,14 +239,15 @@ protected boolean doSend(Message message, long timeout) { assertThat(received).isInstanceOf(ErrorMessage.class); MessageHeaders headers = received.getHeaders(); assertThat(headers.get(KafkaHeaders.RAW_DATA)).isNotNull(); - received = ((ErrorMessage) received).getOriginalMessage(); - assertThat(received).isNotNull(); - headers = received.getHeaders(); + Message originalMessage = ((ErrorMessage) received).getOriginalMessage(); + assertThat(originalMessage).isNotNull(); + assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)).isNull(); + headers = originalMessage.getHeaders(); assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1); assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic4); assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0); assertThat(headers.get(KafkaHeaders.OFFSET)).isEqualTo(0L); - assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(received).get()).isEqualTo(2); + assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(originalMessage).get()).isEqualTo(2); adapter.stop(); } @@ -272,6 +274,7 @@ protected boolean doSend(Message message, long timeout) { adapter.setErrorChannel(errorChannel); adapter.setRecoveryCallback( new ErrorMessageSendingRecoverer(errorChannel, new RawRecordHeaderErrorMessageStrategy())); + adapter.setBindSourceRecord(true); adapter.afterPropertiesSet(); adapter.start(); ContainerTestUtils.waitForAssignment(container, 2); @@ -286,9 +289,13 @@ protected boolean doSend(Message message, long timeout) { assertThat(received).isInstanceOf(ErrorMessage.class); MessageHeaders headers = received.getHeaders(); assertThat(headers.get(KafkaHeaders.RAW_DATA)).isNotNull(); - received = ((ErrorMessage) received).getOriginalMessage(); - assertThat(received).isNotNull(); - headers = received.getHeaders(); + assertThat(headers.get(IntegrationMessageHeaderAccessor.SOURCE_DATA)) + .isSameAs(headers.get(KafkaHeaders.RAW_DATA)); + Message originalMessage = ((ErrorMessage) received).getOriginalMessage(); + assertThat(originalMessage).isNotNull(); + assertThat(originalMessage.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)) + .isSameAs(headers.get(KafkaHeaders.RAW_DATA)); + headers = originalMessage.getHeaders(); assertThat(headers.get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo(1); assertThat(headers.get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo(topic5); assertThat(headers.get(KafkaHeaders.RECEIVED_PARTITION_ID)).isEqualTo(0); diff --git a/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java b/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java index 04660dcf..dbaa04a2 100644 --- a/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java +++ b/src/test/java/org/springframework/integration/kafka/inbound/MessageSourceTests.java @@ -56,6 +56,7 @@ import org.mockito.InOrder; import org.springframework.beans.DirectFieldAccessor; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.acks.AcknowledgmentCallback; import org.springframework.integration.test.util.TestUtils; @@ -116,6 +117,8 @@ public void testAck() { Message received = source.receive(); assertThat(received).isNotNull(); assertThat(received.getHeaders().get(KafkaHeaders.RAW_DATA)).isInstanceOf(ConsumerRecord.class); + assertThat(received.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)) + .isSameAs(received.getHeaders().get(KafkaHeaders.RAW_DATA)); StaticMessageHeaderAccessor.getAcknowledgmentCallback(received) .acknowledge(AcknowledgmentCallback.Status.ACCEPT); received = source.receive();