Skip to content

Commit f256974

Browse files
garyrussellartembilan
authored andcommitted
AMQP OB Adapter - option to wait for confirms
- add an option to block the caller until a confirm is received * Resolve PR comments re exceptions, default timeout etc
1 parent fab4c45 commit f256974

File tree

10 files changed

+241
-14
lines changed

10 files changed

+241
-14
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AmqpOutboundChannelAdapterParser.java

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ protected AbstractBeanDefinition parseConsumer(Element element, ParserContext pa
8181
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "confirm-ack-channel");
8282
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "confirm-nack-channel");
8383
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "confirm-timeout");
84+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "wait-for-confirm");
8485
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "return-channel");
8586
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-message-strategy");
8687
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "delay-expression",

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpOutboundEndpointSpec.java

+12
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,16 @@ public AmqpOutboundEndpointSpec mappedReplyHeaders(String... headers) {
4545
return super.mappedReplyHeaders(headers);
4646
}
4747

48+
/**
49+
* Wait for a publisher confirm.
50+
* @param waitForConfirm true to wait.
51+
* @return the spec.
52+
* @since 5.2
53+
* @see AmqpOutboundEndpoint#setWaitForConfirm(boolean)
54+
*/
55+
public AmqpOutboundEndpointSpec waitForConfirm(boolean waitForConfirm) {
56+
this.target.setWaitForConfirm(waitForConfirm);
57+
return this;
58+
}
59+
4860
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java

+1
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,7 @@ protected boolean isHeadersMappedLast() {
402402
return this.headersMappedLast;
403403
}
404404

405+
@Nullable
405406
protected Duration getConfirmTimeout() {
406407
return this.confirmTimeout;
407408
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpoint.java

+60-3
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@
1616

1717
package org.springframework.integration.amqp.outbound;
1818

19+
import java.time.Duration;
20+
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
23+
24+
import org.springframework.amqp.AmqpException;
1925
import org.springframework.amqp.core.AmqpTemplate;
2026
import org.springframework.amqp.rabbit.connection.CorrelationData;
27+
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
2128
import org.springframework.amqp.rabbit.core.RabbitTemplate;
29+
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
2230
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
2331
import org.springframework.amqp.support.converter.MessageConverter;
2432
import org.springframework.context.Lifecycle;
33+
import org.springframework.integration.MessageTimeoutException;
2534
import org.springframework.integration.amqp.support.MappingUtils;
2635
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
2736
import org.springframework.messaging.Message;
@@ -38,14 +47,20 @@
3847
* @since 2.1
3948
*/
4049
public class AmqpOutboundEndpoint extends AbstractAmqpOutboundEndpoint
41-
implements RabbitTemplate.ConfirmCallback, ReturnCallback {
50+
implements ConfirmCallback, ReturnCallback {
51+
52+
private static final Duration DEFAULT_CONFIRM_TIMEOUT = Duration.ofSeconds(5);
4253

4354
private final AmqpTemplate amqpTemplate;
4455

4556
private final RabbitTemplate rabbitTemplate;
4657

4758
private boolean expectReply;
4859

60+
private boolean waitForConfirm;
61+
62+
private Duration waitForConfirmTimeout = DEFAULT_CONFIRM_TIMEOUT;
63+
4964
public AmqpOutboundEndpoint(AmqpTemplate amqpTemplate) {
5065
Assert.notNull(amqpTemplate, "amqpTemplate must not be null");
5166
this.amqpTemplate = amqpTemplate;
@@ -62,6 +77,19 @@ public void setExpectReply(boolean expectReply) {
6277
this.expectReply = expectReply;
6378
}
6479

80+
/**
81+
* Set to true if you want to block the calling thread until a publisher confirm has
82+
* been received. Requires a template configured for returns. If a confirm is not
83+
* received within the confirm timeout or a negative acknowledgment or returned
84+
* message is received, an exception will be thrown. Does not apply to the gateway
85+
* since it blocks awaiting the reply.
86+
* @param waitForConfirm true to block until the confirmation or timeout is received.
87+
* @since 5.2
88+
* @see #setConfirmTimeout(long)
89+
*/
90+
public void setWaitForConfirm(boolean waitForConfirm) {
91+
this.waitForConfirm = waitForConfirm;
92+
}
6593

6694
@Override
6795
public String getComponentType() {
@@ -86,6 +114,10 @@ protected void endpointInit() {
86114
"RabbitTemplate implementation is required for publisher confirms");
87115
this.rabbitTemplate.setReturnCallback(this);
88116
}
117+
Duration confirmTimeout = getConfirmTimeout();
118+
if (confirmTimeout != null) {
119+
this.waitForConfirmTimeout = confirmTimeout;
120+
}
89121
}
90122

91123
@Override
@@ -101,14 +133,39 @@ protected Object handleRequestMessage(Message<?> requestMessage) {
101133
String exchangeName = generateExchangeName(requestMessage);
102134
String routingKey = generateRoutingKey(requestMessage);
103135
if (this.expectReply) {
104-
return this.sendAndReceive(exchangeName, routingKey, requestMessage, correlationData);
136+
return sendAndReceive(exchangeName, routingKey, requestMessage, correlationData);
105137
}
106138
else {
107-
this.send(exchangeName, routingKey, requestMessage, correlationData);
139+
send(exchangeName, routingKey, requestMessage, correlationData);
140+
if (this.waitForConfirm && correlationData != null) {
141+
waitForConfirm(requestMessage, correlationData);
142+
}
108143
return null;
109144
}
110145
}
111146

147+
private void waitForConfirm(Message<?> requestMessage, CorrelationData correlationData) {
148+
try {
149+
Confirm confirm = correlationData.getFuture().get(this.waitForConfirmTimeout.toMillis(),
150+
TimeUnit.MILLISECONDS);
151+
if (!confirm.isAck()) {
152+
throw new AmqpException("Negative publisher confirm received: " + confirm);
153+
}
154+
if (correlationData.getReturnedMessage() != null) {
155+
throw new AmqpException("Message was returned by the broker");
156+
}
157+
}
158+
catch (@SuppressWarnings("unused") InterruptedException e) {
159+
Thread.currentThread().interrupt();
160+
}
161+
catch (ExecutionException e) {
162+
throw new AmqpException("Failed to get publisher confirm", e);
163+
}
164+
catch (TimeoutException e) {
165+
throw new MessageTimeoutException(requestMessage, this + ": Timed out awaiting publisher confirm", e);
166+
}
167+
}
168+
112169
private void send(String exchangeName, String routingKey,
113170
final Message<?> requestMessage, CorrelationData correlationData) {
114171
if (this.rabbitTemplate != null) {

spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp-5.2.xsd

+15
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,21 @@
5858
</xsd:appinfo>
5959
</xsd:annotation>
6060
</xsd:attribute>
61+
<xsd:attribute name="wait-for-confirm">
62+
<xsd:annotation>
63+
<xsd:appinfo>
64+
<xsd:documentation>
65+
Set to true if you want to block the calling thread until a publisher confirm has
66+
been received. Requires a template configured for returns. If a confirm is not
67+
received within the confirm timeout or a negative acknowledgment or returned
68+
message is received, an exception will be thrown.
69+
</xsd:documentation>
70+
<xsd:simpleType>
71+
<xsd:union memberTypes="xsd:boolean xsd:string" />
72+
</xsd:simpleType>
73+
</xsd:appinfo>
74+
</xsd:annotation>
75+
</xsd:attribute>
6176
</xsd:extension>
6277
</xsd:complexContent>
6378
</xsd:complexType>

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpOutboundChannelAdapterParserTests-context.xml

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
confirm-ack-channel="ackChannel"
6666
confirm-nack-channel="nackChannel"
6767
confirm-timeout="2000"
68+
wait-for-confirm="true"
6869
error-message-strategy="ems"/>
6970

7071
<bean id="ems" class="org.springframework.integration.support.DefaultErrorMessageStrategy" />

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpOutboundChannelAdapterParserTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ public void parseWithPublisherConfirms() {
178178
assertThat(TestUtils.getPropertyValue(endpoint, "confirmAckChannel")).isSameAs(ackChannel);
179179
assertThat(TestUtils.getPropertyValue(endpoint, "confirmNackChannel")).isSameAs(nullChannel);
180180
assertThat(TestUtils.getPropertyValue(endpoint, "errorMessageStrategy")).isSameAs(context.getBean("ems"));
181+
assertThat(TestUtils.getPropertyValue(endpoint, "waitForConfirm", Boolean.class)).isFalse();
181182
}
182183

183184
@Test
@@ -191,6 +192,7 @@ public void parseWithPublisherConfirms2() {
191192
assertThat(TestUtils.getPropertyValue(endpoint, "confirmNackChannel")).isSameAs(nackChannel);
192193
assertThat(TestUtils.getPropertyValue(endpoint, "confirmTimeout")).isEqualTo(Duration.ofMillis(2000));
193194
assertThat(TestUtils.getPropertyValue(endpoint, "errorMessageStrategy")).isSameAs(context.getBean("ems"));
195+
assertThat(TestUtils.getPropertyValue(endpoint, "waitForConfirm", Boolean.class)).isTrue();
194196
}
195197

196198
@SuppressWarnings("rawtypes")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.outbound;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
21+
22+
import java.util.Collections;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.amqp.AmqpException;
27+
import org.springframework.amqp.core.Queue;
28+
import org.springframework.amqp.core.QueueBuilder;
29+
import org.springframework.amqp.core.QueueBuilder.Overflow;
30+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
31+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
32+
import org.springframework.amqp.rabbit.core.RabbitAdmin;
33+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
34+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
35+
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
36+
import org.springframework.beans.factory.annotation.Autowired;
37+
import org.springframework.context.annotation.Bean;
38+
import org.springframework.context.annotation.Configuration;
39+
import org.springframework.integration.amqp.dsl.Amqp;
40+
import org.springframework.integration.config.EnableIntegration;
41+
import org.springframework.integration.dsl.IntegrationFlow;
42+
import org.springframework.messaging.MessageHandlingException;
43+
import org.springframework.messaging.support.GenericMessage;
44+
import org.springframework.test.annotation.DirtiesContext;
45+
import org.springframework.test.context.junit.jupiter.DisabledIf;
46+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
47+
48+
/**
49+
* @author Gary Russell
50+
* @since 5.2
51+
*
52+
*/
53+
@SpringJUnitConfig
54+
@RabbitAvailable(queues = "testConfirmOk")
55+
@DirtiesContext
56+
public class AmqpOutboundEndpointTests2 {
57+
58+
@Test
59+
void testConfirmOk(@Autowired IntegrationFlow flow, @Autowired RabbitTemplate template) {
60+
flow.getInputChannel().send(new GenericMessage<>("test", Collections.singletonMap("rk", "testConfirmOk")));
61+
assertThat(template.receive("testConfirmOk")).isNotNull();
62+
}
63+
64+
@Test
65+
void testWithReturn(@Autowired IntegrationFlow flow) {
66+
assertThatThrownBy(() -> flow.getInputChannel()
67+
.send(new GenericMessage<>("test", Collections.singletonMap("rk", "junkjunk"))))
68+
.isInstanceOf(MessageHandlingException.class)
69+
.hasCauseInstanceOf(AmqpException.class)
70+
.extracting(ex -> ex.getCause())
71+
.extracting(ex -> ex.getMessage())
72+
.isEqualTo("Message was returned by the broker");
73+
}
74+
75+
@Test
76+
@DisabledIf("#{systemEnvironment['TRAVIS'] ?: false}") // needs RabbitMQ 3.7
77+
void testWithReject(@Autowired IntegrationFlow flow, @Autowired RabbitAdmin admin,
78+
@Autowired RabbitTemplate template) {
79+
80+
Queue queue = QueueBuilder.nonDurable().autoDelete().maxLength(1).overflow(Overflow.rejectPublish).build();
81+
admin.declareQueue(queue);
82+
flow.getInputChannel().send(new GenericMessage<>("test", Collections.singletonMap("rk", queue.getName())));
83+
assertThatThrownBy(() -> flow.getInputChannel()
84+
.send(new GenericMessage<>("test", Collections.singletonMap("rk", queue.getName()))))
85+
.hasCauseInstanceOf(AmqpException.class)
86+
.extracting(ex -> ex.getCause())
87+
.extracting(ex -> ex.getMessage())
88+
.matches(msg -> msg.matches("Negative publisher confirm received: .*"));
89+
assertThat(template.receive(queue.getName())).isNotNull();
90+
admin.deleteQueue(queue.getName());
91+
}
92+
93+
@Configuration(proxyBeanMethods = false)
94+
@EnableIntegration
95+
public static class Config {
96+
97+
@Bean
98+
public IntegrationFlow flow(RabbitTemplate template) {
99+
return f -> f.handle(Amqp.outboundAdapter(template)
100+
.exchangeName("")
101+
.routingKeyFunction(msg -> msg.getHeaders().get("rk", String.class))
102+
.confirmCorrelationFunction(msg -> msg)
103+
.waitForConfirm(true));
104+
}
105+
106+
@Bean
107+
public CachingConnectionFactory cf() {
108+
CachingConnectionFactory ccf = new CachingConnectionFactory(
109+
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
110+
ccf.setPublisherConfirms(true);
111+
ccf.setPublisherReturns(true);
112+
return ccf;
113+
}
114+
115+
@Bean
116+
public RabbitTemplate template(ConnectionFactory cf) {
117+
RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
118+
rabbitTemplate.setMandatory(true);
119+
rabbitTemplate.setReceiveTimeout(10_000);
120+
return rabbitTemplate;
121+
}
122+
123+
@Bean
124+
public RabbitAdmin admin(ConnectionFactory cf) {
125+
return new RabbitAdmin(cf);
126+
}
127+
128+
}
129+
130+
}

src/reference/asciidoc/amqp.adoc

+16-11
Original file line numberDiff line numberDiff line change
@@ -547,12 +547,12 @@ The following example shows the available properties for an AMQP outbound channe
547547
confirm-ack-channel="" <11>
548548
confirm-nack-channel="" <12>
549549
confirm-timeout="" <13>
550-
return-channel="" <14>
551-
error-message-strategy="" <15>
552-
header-mapper="" <16>
553-
mapped-request-headers="" <17>
554-
lazy-connect="true" /> <18>
555-
550+
wait-for-confirm="" <14>
551+
return-channel="" <15>
552+
error-message-strategy="" <16>
553+
header-mapper="" <17>
554+
mapped-request-headers="" <18>
555+
lazy-connect="true" /> <19>
556556
----
557557
558558
<1> The unique ID for this adapter.
@@ -609,21 +609,26 @@ Optional (the default is `nullChannel`).
609609
<13> When set, the adapter will synthesize a negative acknowledgment (nack) if a publisher confirm is not received within this time in milliseconds.
610610
Pending confirms are checked every 50% of this value, so the actual time a nack is sent will be between 1x and 1.5x this value.
611611
Default none (nacks will not be generated).
612-
<14> The channel to which returned messages are sent.
612+
<14> When set to true, the calling thread will block, waiting for a publisher confirmation.
613+
This requires a `RabbitTemplate` configured for confirms as well as a `confirm-correlation-expression`.
614+
The thread will block for up to `confirm-timeout` (or 5 seconds by default).
615+
If a timeout occurs, a `MessageTimeoutException` will be thrown.
616+
If returns are enabled and a message is returned, or any other exception occurs while awaiting the confirm, a `MessageHandlingException` will be thrown, with an appropriate message.
617+
<15> The channel to which returned messages are sent.
613618
When provided, the underlying AMQP template is configured to return undeliverable messages to the adapter.
614619
When there is no `ErrorMessageStrategy` configured, the message is constructed from the data received from AMQP, with the following additional headers: `amqp_returnReplyCode`, `amqp_returnReplyText`, `amqp_returnExchange`, `amqp_returnRoutingKey`.
615620
When there is an `ErrorMessageStrategy`, the message is an `ErrorMessage` with a `ReturnedAmqpMessageException` payload.
616621
Optional.
617-
<15> A reference to an `ErrorMessageStrategy` implementation used to build `ErrorMessage` instances when sending returned or negatively acknowledged messages.
618-
<16> A reference to an `AmqpHeaderMapper` to use when sending AMQP Messages.
622+
<16> A reference to an `ErrorMessageStrategy` implementation used to build `ErrorMessage` instances when sending returned or negatively acknowledged messages.
623+
<17> A reference to an `AmqpHeaderMapper` to use when sending AMQP Messages.
619624
By default, only standard AMQP properties (such as `contentType`) are copied to the Spring Integration `MessageHeaders`.
620625
Any user-defined headers is not copied to the message by the default`DefaultAmqpHeaderMapper`.
621626
Not allowed if 'request-header-names' is provided.
622627
Optional.
623-
<17> Comma-separated list of names of AMQP Headers to be mapped from the `MessageHeaders` to the AMQP Message.
628+
<18> Comma-separated list of names of AMQP Headers to be mapped from the `MessageHeaders` to the AMQP Message.
624629
Not allowed if the 'header-mapper' reference is provided.
625630
The values in this list can also be simple patterns to be matched against the header names (e.g. `"\*"` or `"thing1*, thing2"` or `"*thing1"`).
626-
<18> When set to `false`, the endpoint attempts to connect to the broker during application context initialization.
631+
<19> When set to `false`, the endpoint attempts to connect to the broker during application context initialization.
627632
This allows "`fail fast`" detection of bad configuration but also causes initialization to fail if the broker is down.
628633
When `true` (the default), the connection is established (if it does not already exist because some other component established it) when the first message is sent.
629634
====

src/reference/asciidoc/whats-new.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ See <<./amqp.adoc#amqp-outbound-endpoints,Outbound Channel Adapter>> for more in
8989
The inbound channel adapter can now receive batched messages as a `List<?>` payload instead of receiving a discrete message for each batch fragment.
9090
See <<./amqp.adoc#amqp-debatching,Batched Messages>> for more information.
9191

92+
The outbound channel adapter can now be configured to block the calling thread until a publisher confirm (acknowledgment) is received.
93+
See <<./amqp.adoc#amqp-outbound-channel-adapter,Outbound Channel Adapter>> for more information.
94+
9295
[[x5.2-file]]
9396
==== File Changes
9497

0 commit comments

Comments
 (0)