Skip to content

Commit 5245eff

Browse files
garyrussellartembilan
authored andcommitted
GH-2797: AMQP: Add confirm-timeout
Resolves #2797 Avoid runtime casting to `RabbitTemplate`.
1 parent e13aa28 commit 5245eff

File tree

13 files changed

+243
-58
lines changed

13 files changed

+243
-58
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AmqpOutboundChannelAdapterParser.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -80,6 +80,7 @@ protected AbstractBeanDefinition parseConsumer(Element element, ParserContext pa
8080
}
8181
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "confirm-ack-channel");
8282
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "confirm-nack-channel");
83+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "confirm-timeout");
8384
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "return-channel");
8485
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-message-strategy");
8586
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "delay-expression",

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AmqpOutboundGatewayParser.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -94,6 +94,7 @@ protected BeanDefinitionBuilder parseHandler(Element element, ParserContext pars
9494

9595
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "confirm-ack-channel");
9696
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "confirm-nack-channel");
97+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "confirm-timeout");
9798
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-message-strategy");
9899

99100
BeanDefinitionBuilder mapperBuilder = BeanDefinitionBuilder

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java

+49-4
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,18 @@
1616

1717
package org.springframework.integration.amqp.outbound;
1818

19+
import java.time.Duration;
20+
import java.util.Collection;
1921
import java.util.HashMap;
2022
import java.util.Map;
2123
import java.util.UUID;
24+
import java.util.concurrent.ScheduledFuture;
2225

2326
import org.springframework.amqp.core.MessageDeliveryMode;
2427
import org.springframework.amqp.rabbit.connection.Connection;
2528
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
2629
import org.springframework.amqp.rabbit.connection.CorrelationData;
30+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
2731
import org.springframework.amqp.support.AmqpHeaders;
2832
import org.springframework.amqp.support.converter.MessageConverter;
2933
import org.springframework.beans.factory.BeanFactory;
@@ -40,6 +44,7 @@
4044
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
4145
import org.springframework.integration.support.DefaultErrorMessageStrategy;
4246
import org.springframework.integration.support.ErrorMessageStrategy;
47+
import org.springframework.lang.Nullable;
4348
import org.springframework.messaging.Message;
4449
import org.springframework.messaging.MessageChannel;
4550
import org.springframework.util.Assert;
@@ -100,8 +105,12 @@ public abstract class AbstractAmqpOutboundEndpoint extends AbstractReplyProducin
100105

101106
private ErrorMessageStrategy errorMessageStrategy = new DefaultErrorMessageStrategy();
102107

108+
private Duration confirmTimeout;
109+
103110
private volatile boolean running;
104111

112+
private volatile ScheduledFuture<?> confirmChecker;
113+
105114
/**
106115
* Set a custom {@link AmqpHeaderMapper} for mapping request and reply headers.
107116
* Defaults to {@link DefaultAmqpHeaderMapper#outboundMapper()}.
@@ -311,6 +320,18 @@ public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
311320
this.errorMessageStrategy = errorMessageStrategy;
312321
}
313322

323+
/**
324+
* Set a timeout after which a nack will be synthesized if no publisher confirm has
325+
* been received within that time. Missing confirms will be checked every 50% of this
326+
* value so the synthesized nack will be sent between 1x and 1.5x this timeout.
327+
* @param confirmTimeout the approximate timeout.
328+
* @since 5.2
329+
* @see #setConfirmNackChannel(MessageChannel)
330+
*/
331+
public void setConfirmTimeout(long confirmTimeout) {
332+
this.confirmTimeout = Duration.ofMillis(confirmTimeout);
333+
}
334+
314335
protected final synchronized void setConnectionFactory(ConnectionFactory connectionFactory) {
315336
this.connectionFactory = connectionFactory;
316337
}
@@ -381,6 +402,10 @@ protected boolean isHeadersMappedLast() {
381402
return this.headersMappedLast;
382403
}
383404

405+
protected Duration getConfirmTimeout() {
406+
return this.confirmTimeout;
407+
}
408+
384409
@Override
385410
protected final void doInit() {
386411
Assert.state(this.exchangeNameExpression == null || this.exchangeName == null,
@@ -411,7 +436,8 @@ protected final void doInit() {
411436
}
412437
else {
413438
NullChannel nullChannel = extractTypeIfPossible(this.confirmAckChannel, NullChannel.class);
414-
Assert.state((this.confirmAckChannel == null || nullChannel != null) && this.confirmAckChannelName == null,
439+
Assert.state(
440+
(this.confirmAckChannel == null || nullChannel != null) && this.confirmAckChannelName == null,
415441
"A 'confirmCorrelationExpression' is required when specifying a 'confirmAckChannel'");
416442
nullChannel = extractTypeIfPossible(this.confirmNackChannel, NullChannel.class);
417443
Assert.state(
@@ -450,16 +476,35 @@ public synchronized void start() {
450476
}
451477
}
452478
doStart();
479+
if (this.confirmTimeout != null && getConfirmNackChannel() != null && getRabbitTemplate() != null) {
480+
this.confirmChecker = getTaskScheduler()
481+
.scheduleAtFixedRate(checkUnconfirmed(), this.confirmTimeout.dividedBy(2L));
482+
}
453483
this.running = true;
454484
}
455485
}
456486

487+
private Runnable checkUnconfirmed() {
488+
return () -> {
489+
Collection<CorrelationData> unconfirmed =
490+
getRabbitTemplate().getUnconfirmed(getConfirmTimeout().toMillis());
491+
unconfirmed.forEach(correlation -> handleConfirm(correlation, false, "Confirm timed out"));
492+
};
493+
}
494+
495+
@Nullable
496+
protected abstract RabbitTemplate getRabbitTemplate();
497+
457498
@Override
458499
public synchronized void stop() {
459500
if (this.running) {
460501
doStop();
461502
}
462503
this.running = false;
504+
if (this.confirmChecker != null) {
505+
this.confirmChecker.cancel(false);
506+
this.confirmChecker = null;
507+
}
463508
}
464509

465510
protected void doStart() {
@@ -526,7 +571,7 @@ protected AbstractIntegrationMessageBuilder<?> buildReply(MessageConverter conve
526571
}
527572

528573
protected Message<?> buildReturnedMessage(org.springframework.amqp.core.Message message,
529-
int replyCode, String replyText, String exchange, String routingKey, MessageConverter converter) {
574+
int replyCode, String replyText, String exchange, String returnedRoutingKey, MessageConverter converter) {
530575
Object returnedObject = converter.fromMessage(message);
531576
AbstractIntegrationMessageBuilder<?> builder = (returnedObject instanceof Message)
532577
? this.getMessageBuilderFactory().fromMessage((Message<?>) returnedObject)
@@ -537,12 +582,12 @@ protected Message<?> buildReturnedMessage(org.springframework.amqp.core.Message
537582
.setHeader(AmqpHeaders.RETURN_REPLY_CODE, replyCode)
538583
.setHeader(AmqpHeaders.RETURN_REPLY_TEXT, replyText)
539584
.setHeader(AmqpHeaders.RETURN_EXCHANGE, exchange)
540-
.setHeader(AmqpHeaders.RETURN_ROUTING_KEY, routingKey);
585+
.setHeader(AmqpHeaders.RETURN_ROUTING_KEY, returnedRoutingKey);
541586
}
542587
Message<?> returnedMessage = builder.build();
543588
if (this.errorMessageStrategy != null) {
544589
returnedMessage = this.errorMessageStrategy.buildErrorMessage(new ReturnedAmqpMessageException(
545-
returnedMessage, message, replyCode, replyText, exchange, routingKey), null);
590+
returnedMessage, message, replyCode, replyText, exchange, returnedRoutingKey), null);
546591
}
547592
return returnedMessage;
548593
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpoint.java

+28-14
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -42,13 +42,19 @@ public class AmqpOutboundEndpoint extends AbstractAmqpOutboundEndpoint
4242

4343
private final AmqpTemplate amqpTemplate;
4444

45-
private volatile boolean expectReply;
45+
private final RabbitTemplate rabbitTemplate;
46+
47+
private boolean expectReply;
4648

4749
public AmqpOutboundEndpoint(AmqpTemplate amqpTemplate) {
4850
Assert.notNull(amqpTemplate, "amqpTemplate must not be null");
4951
this.amqpTemplate = amqpTemplate;
5052
if (amqpTemplate instanceof RabbitTemplate) {
5153
setConnectionFactory(((RabbitTemplate) amqpTemplate).getConnectionFactory());
54+
this.rabbitTemplate = (RabbitTemplate) amqpTemplate;
55+
}
56+
else {
57+
this.rabbitTemplate = null;
5258
}
5359
}
5460

@@ -62,17 +68,23 @@ public String getComponentType() {
6268
return this.expectReply ? "amqp:outbound-gateway" : "amqp:outbound-channel-adapter";
6369
}
6470

71+
72+
@Override
73+
protected RabbitTemplate getRabbitTemplate() {
74+
return this.rabbitTemplate;
75+
}
76+
6577
@Override
6678
protected void endpointInit() {
6779
if (getConfirmCorrelationExpression() != null) {
68-
Assert.isInstanceOf(RabbitTemplate.class, this.amqpTemplate,
80+
Assert.notNull(this.rabbitTemplate,
6981
"RabbitTemplate implementation is required for publisher confirms");
70-
((RabbitTemplate) this.amqpTemplate).setConfirmCallback(this);
82+
this.rabbitTemplate.setConfirmCallback(this);
7183
}
7284
if (getReturnChannel() != null) {
73-
Assert.isInstanceOf(RabbitTemplate.class, this.amqpTemplate,
85+
Assert.notNull(this.rabbitTemplate,
7486
"RabbitTemplate implementation is required for publisher confirms");
75-
((RabbitTemplate) this.amqpTemplate).setReturnCallback(this);
87+
this.rabbitTemplate.setReturnCallback(this);
7688
}
7789
}
7890

@@ -99,12 +111,12 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
99111

100112
private void send(String exchangeName, String routingKey,
101113
final Message<?> requestMessage, CorrelationData correlationData) {
102-
if (this.amqpTemplate instanceof RabbitTemplate) {
103-
MessageConverter converter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
114+
if (this.rabbitTemplate != null) {
115+
MessageConverter converter = this.rabbitTemplate.getMessageConverter();
104116
org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, converter,
105117
getHeaderMapper(), getDefaultDeliveryMode(), isHeadersMappedLast());
106118
addDelayProperty(requestMessage, amqpMessage);
107-
((RabbitTemplate) this.amqpTemplate).send(exchangeName, routingKey, amqpMessage, correlationData);
119+
this.rabbitTemplate.send(exchangeName, routingKey, amqpMessage, correlationData);
108120
}
109121
else {
110122
this.amqpTemplate.convertAndSend(exchangeName, routingKey, requestMessage.getPayload(),
@@ -118,14 +130,15 @@ private void send(String exchangeName, String routingKey,
118130

119131
private AbstractIntegrationMessageBuilder<?> sendAndReceive(String exchangeName, String routingKey,
120132
Message<?> requestMessage, CorrelationData correlationData) {
121-
Assert.isInstanceOf(RabbitTemplate.class, this.amqpTemplate,
133+
134+
Assert.state(this.rabbitTemplate != null,
122135
"RabbitTemplate implementation is required for publisher confirms");
123-
MessageConverter converter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
136+
MessageConverter converter = this.rabbitTemplate.getMessageConverter();
124137
org.springframework.amqp.core.Message amqpMessage = MappingUtils.mapMessage(requestMessage, converter,
125138
getHeaderMapper(), getDefaultDeliveryMode(), isHeadersMappedLast());
126139
addDelayProperty(requestMessage, amqpMessage);
127140
org.springframework.amqp.core.Message amqpReplyMessage =
128-
((RabbitTemplate) this.amqpTemplate).sendAndReceive(exchangeName, routingKey, amqpMessage,
141+
this.rabbitTemplate.sendAndReceive(exchangeName, routingKey, amqpMessage,
129142
correlationData);
130143

131144
if (amqpReplyMessage == null) {
@@ -142,8 +155,9 @@ public void confirm(CorrelationData correlationData, boolean ack, String cause)
142155
@Override
143156
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
144157
String exchange, String routingKey) {
145-
// safe to cast; we asserted we have a RabbitTemplate in doInit()
146-
MessageConverter converter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
158+
159+
// no need for null check; we asserted we have a RabbitTemplate in doInit()
160+
MessageConverter converter = this.rabbitTemplate.getMessageConverter();
147161
Message<?> returned = buildReturnedMessage(message, replyCode, replyText, exchange,
148162
routingKey, converter);
149163
getReturnChannel().send(returned);

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AsyncAmqpOutboundGateway.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2018 the original author or authors.
2+
* Copyright 2016-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
2222
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitMessageFuture;
2323
import org.springframework.amqp.rabbit.connection.CorrelationData;
24+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
2425
import org.springframework.amqp.support.converter.MessageConverter;
2526
import org.springframework.integration.amqp.support.MappingUtils;
2627
import org.springframework.integration.handler.ReplyRequiredException;
@@ -61,6 +62,11 @@ public String getComponentType() {
6162
return "amqp:outbound-async-gateway";
6263
}
6364

65+
@Override
66+
protected RabbitTemplate getRabbitTemplate() {
67+
return this.template.getRabbitTemplate();
68+
}
69+
6470
@Override
6571
protected void doStart() {
6672
super.doStart();

spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp-5.2.xsd

+8
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,14 @@ property set to TRUE.
544544
</xsd:appinfo>
545545
</xsd:annotation>
546546
</xsd:attribute>
547+
<xsd:attribute name="confirm-timeout" type="xsd:string">
548+
<xsd:annotation>
549+
<xsd:documentation><![CDATA[
550+
Generate a negative acknowledgment (nack) if a publisher confirm is not received within this time
551+
in milliseconds. Default none (nacks will not be generated).
552+
]]></xsd:documentation>
553+
</xsd:annotation>
554+
</xsd:attribute>
547555
<xsd:attribute name="error-message-strategy" type="xsd:string">
548556
<xsd:annotation>
549557
<xsd:documentation><![CDATA[

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpOutboundChannelAdapterParserTests-context.xml

+17
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,19 @@
5454
confirm-ack-channel="ackChannel"
5555
error-message-strategy="ems"/>
5656

57+
<rabbit:template id="amqpTemplateConfirms2" connection-factory="connectionFactory"/>
58+
59+
<amqp:outbound-channel-adapter id="withPublisherConfirms2" channel="pcRequestChannel"
60+
exchange-name="outboundchanneladapter.test.1"
61+
mapped-request-headers="foo*"
62+
auto-startup="false"
63+
amqp-template="amqpTemplateConfirms2"
64+
confirm-correlation-expression="headers['amqp_confirmCorrelationData']"
65+
confirm-ack-channel="ackChannel"
66+
confirm-nack-channel="nackChannel"
67+
confirm-timeout="2000"
68+
error-message-strategy="ems"/>
69+
5770
<bean id="ems" class="org.springframework.integration.support.DefaultErrorMessageStrategy" />
5871

5972
<int:channel id="pcRequestChannel"/>
@@ -62,6 +75,10 @@
6275
<int:queue/>
6376
</int:channel>
6477

78+
<int:channel id="nackChannel">
79+
<int:queue/>
80+
</int:channel>
81+
6582
<amqp:outbound-channel-adapter id="withDefaultAmqpTemplateExchangeAndRoutingKey"/>
6683

6784
<rabbit:template id="amqpTemplateWithSuppliedExchangeAndRoutingKey" connection-factory="connectionFactory"

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpOutboundChannelAdapterParserTests.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import java.io.IOException;
3333
import java.lang.reflect.Field;
34+
import java.time.Duration;
3435
import java.util.List;
3536
import java.util.concurrent.ExecutorService;
3637
import java.util.concurrent.atomic.AtomicBoolean;
@@ -125,9 +126,9 @@ public void withHeaderMapperCustomHeaders() {
125126
.getExpressionString()).isEqualTo("42");
126127
assertThat(TestUtils.getPropertyValue(endpoint, "headersMappedLast", Boolean.class)).isFalse();
127128

128-
Field amqpTemplateField = ReflectionUtils.findField(AmqpOutboundEndpoint.class, "amqpTemplate");
129+
Field amqpTemplateField = ReflectionUtils.findField(AmqpOutboundEndpoint.class, "rabbitTemplate");
129130
amqpTemplateField.setAccessible(true);
130-
RabbitTemplate amqpTemplate = TestUtils.getPropertyValue(endpoint, "amqpTemplate", RabbitTemplate.class);
131+
RabbitTemplate amqpTemplate = TestUtils.getPropertyValue(endpoint, "rabbitTemplate", RabbitTemplate.class);
131132
amqpTemplate = Mockito.spy(amqpTemplate);
132133
final AtomicBoolean shouldBePersistent = new AtomicBoolean();
133134

@@ -179,6 +180,19 @@ public void parseWithPublisherConfirms() {
179180
assertThat(TestUtils.getPropertyValue(endpoint, "errorMessageStrategy")).isSameAs(context.getBean("ems"));
180181
}
181182

183+
@Test
184+
public void parseWithPublisherConfirms2() {
185+
Object eventDrivenConsumer = context.getBean("withPublisherConfirms2");
186+
AmqpOutboundEndpoint endpoint = TestUtils.getPropertyValue(eventDrivenConsumer, "handler",
187+
AmqpOutboundEndpoint.class);
188+
MessageChannel nackChannel = context.getBean("nackChannel", MessageChannel.class);
189+
MessageChannel ackChannel = context.getBean("ackChannel", MessageChannel.class);
190+
assertThat(TestUtils.getPropertyValue(endpoint, "confirmAckChannel")).isSameAs(ackChannel);
191+
assertThat(TestUtils.getPropertyValue(endpoint, "confirmNackChannel")).isSameAs(nackChannel);
192+
assertThat(TestUtils.getPropertyValue(endpoint, "confirmTimeout")).isEqualTo(Duration.ofMillis(2000));
193+
assertThat(TestUtils.getPropertyValue(endpoint, "errorMessageStrategy")).isSameAs(context.getBean("ems"));
194+
}
195+
182196
@SuppressWarnings("rawtypes")
183197
@Test
184198
public void amqpOutboundChannelAdapterWithinChain() {
@@ -189,9 +203,9 @@ public void amqpOutboundChannelAdapterWithinChain() {
189203
AmqpOutboundEndpoint endpoint = (AmqpOutboundEndpoint) chainHandlers.get(0);
190204
assertThat(TestUtils.getPropertyValue(endpoint, "defaultDeliveryMode")).isNull();
191205

192-
Field amqpTemplateField = ReflectionUtils.findField(AmqpOutboundEndpoint.class, "amqpTemplate");
206+
Field amqpTemplateField = ReflectionUtils.findField(AmqpOutboundEndpoint.class, "rabbitTemplate");
193207
amqpTemplateField.setAccessible(true);
194-
RabbitTemplate amqpTemplate = TestUtils.getPropertyValue(endpoint, "amqpTemplate", RabbitTemplate.class);
208+
RabbitTemplate amqpTemplate = TestUtils.getPropertyValue(endpoint, "rabbitTemplate", RabbitTemplate.class);
195209
amqpTemplate = Mockito.spy(amqpTemplate);
196210

197211
Mockito.doAnswer(invocation -> {

0 commit comments

Comments
 (0)