Skip to content

Commit 845a7e0

Browse files
garyrussellartembilan
authored andcommitted
GH-1034: DMLC: Cancel consumer after failed ack
Resolves #1034 The monitor task now cancels the consumer after a failed ack/nack, whether or not the channel `isOpen()` returns true. Test with a mock channel that stays open after a failed ack. **cherry-pick to 2.1.x, 2.0.x**
1 parent eb85078 commit 845a7e0

File tree

2 files changed

+77
-11
lines changed

2 files changed

+77
-11
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -472,11 +472,11 @@ private void checkConsumers(long now) {
472472
final List<SimpleConsumer> consumersToCancel;
473473
synchronized (this.consumersMonitor) {
474474
consumersToCancel = this.consumers.stream()
475-
.filter(c -> {
476-
boolean open = c.getChannel().isOpen();
475+
.filter(consumer -> {
476+
boolean open = consumer.getChannel().isOpen() && !consumer.isAckFailed();
477477
if (open && this.messagesPerAck > 1) {
478478
try {
479-
c.ackIfNecessary(now);
479+
consumer.ackIfNecessary(now);
480480
}
481481
catch (IOException e) {
482482
this.logger.error("Exception while sending delayed ack", e);
@@ -487,18 +487,18 @@ private void checkConsumers(long now) {
487487
.collect(Collectors.toList());
488488
}
489489
consumersToCancel
490-
.forEach(c -> {
490+
.forEach(consumer -> {
491491
try {
492-
RabbitUtils.closeMessageConsumer(c.getChannel(),
493-
Collections.singletonList(c.getConsumerTag()), isChannelTransacted());
492+
RabbitUtils.closeMessageConsumer(consumer.getChannel(),
493+
Collections.singletonList(consumer.getConsumerTag()), isChannelTransacted());
494494
}
495495
catch (Exception e) {
496496
if (logger.isDebugEnabled()) {
497-
logger.debug("Error closing consumer " + c, e);
497+
logger.debug("Error closing consumer " + consumer, e);
498498
}
499499
}
500-
this.logger.error("Consumer canceled - channel closed " + c);
501-
c.cancelConsumer("Consumer " + c + " channel closed");
500+
this.logger.error("Consumer canceled - channel closed " + consumer);
501+
consumer.cancelConsumer("Consumer " + consumer + " channel closed");
502502
});
503503
}
504504

@@ -813,7 +813,7 @@ private void cancelConsumer(SimpleConsumer consumer) {
813813
this.logger.debug("Canceling " + consumer);
814814
}
815815
synchronized (consumer) {
816-
consumer.canceled = true;
816+
consumer.setCanceled(true);
817817
if (this.messagesPerAck > 1) {
818818
consumer.ackIfNecessary(0L);
819819
}
@@ -887,6 +887,8 @@ final class SimpleConsumer extends DefaultConsumer {
887887

888888
private volatile boolean canceled;
889889

890+
private volatile boolean ackFailed;
891+
890892
private SimpleConsumer(Connection connection, Channel channel, String queue) {
891893
super(channel);
892894
this.connection = connection;
@@ -911,6 +913,23 @@ int getEpoch() {
911913
return this.epoch;
912914
}
913915

916+
/**
917+
* Set to true to indicate this consumer is canceled and should send any pending
918+
* acks.
919+
* @param canceled the canceled to set
920+
*/
921+
void setCanceled(boolean canceled) {
922+
this.canceled = canceled;
923+
}
924+
925+
/**
926+
* True if an ack/nack failed (probably due to a closed channel).
927+
* @return the ackFailed
928+
*/
929+
boolean isAckFailed() {
930+
return this.ackFailed;
931+
}
932+
914933
/**
915934
* Increment and return the current epoch for this consumer; consumersMonitor must
916935
* be held.
@@ -1069,6 +1088,7 @@ else if (!isChannelTransacted() || isLocallyTransacted) {
10691088
}
10701089
}
10711090
catch (Exception e) {
1091+
this.ackFailed = true;
10721092
this.logger.error("Error acking", e);
10731093
}
10741094
}
@@ -1079,7 +1099,7 @@ else if (!isChannelTransacted() || isLocallyTransacted) {
10791099
* @param now the current time.
10801100
* @throws IOException if one occurs.
10811101
*/
1082-
private synchronized void ackIfNecessary(long now) throws IOException {
1102+
synchronized void ackIfNecessary(long now) throws IOException {
10831103
if (this.pendingAcks >= this.messagesPerAck || (
10841104
this.pendingAcks > 0 && (now - this.lastAck > this.ackTimeout || this.canceled))) {
10851105
sendAck(now);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainerMockTests.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.mockito.ArgumentMatchers.eq;
2727
import static org.mockito.BDDMockito.given;
2828
import static org.mockito.BDDMockito.willAnswer;
29+
import static org.mockito.BDDMockito.willThrow;
2930
import static org.mockito.Mockito.mock;
3031
import static org.mockito.Mockito.times;
3132
import static org.mockito.Mockito.verify;
@@ -40,6 +41,7 @@
4041
import org.junit.Test;
4142
import org.mockito.Mockito;
4243

44+
import org.springframework.amqp.core.MessageListener;
4345
import org.springframework.amqp.rabbit.connection.ChannelProxy;
4446
import org.springframework.amqp.rabbit.connection.Connection;
4547
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -263,6 +265,50 @@ public void testRemoveQueuesWhileNotConnected() throws Exception {
263265
container.stop();
264266
}
265267

268+
@Test
269+
public void testMonitorCancelsAfterBadAckEvenIfChannelReportsOpen() throws Exception {
270+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
271+
Connection connection = mock(Connection.class);
272+
ChannelProxy channel = mock(ChannelProxy.class);
273+
Channel rabbitChannel = mock(Channel.class);
274+
given(channel.getTargetChannel()).willReturn(rabbitChannel);
275+
276+
given(connectionFactory.createConnection()).willReturn(connection);
277+
given(connection.createChannel(anyBoolean())).willReturn(channel);
278+
given(channel.isOpen()).willReturn(true);
279+
given(channel.queueDeclarePassive(Mockito.anyString()))
280+
.willAnswer(invocation -> mock(AMQP.Queue.DeclareOk.class));
281+
AtomicReference<Consumer> consumer = new AtomicReference<>();
282+
final CountDownLatch latch1 = new CountDownLatch(1);
283+
final CountDownLatch latch2 = new CountDownLatch(1);
284+
willAnswer(inv -> {
285+
consumer.set(inv.getArgument(6));
286+
latch1.countDown();
287+
return "consumerTag";
288+
}).given(channel).basicConsume(anyString(), anyBoolean(), anyString(), anyBoolean(), anyBoolean(),
289+
anyMap(), any(Consumer.class));
290+
291+
willThrow(new RuntimeException("bad ack")).given(channel).basicAck(1L, false);
292+
willAnswer(inv -> {
293+
consumer.get().handleCancelOk("consumerTag");
294+
latch2.countDown();
295+
return null;
296+
}).given(channel).basicCancel("consumerTag");
297+
298+
DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
299+
container.setQueueNames("test");
300+
container.setPrefetchCount(2);
301+
container.setMonitorInterval(100);
302+
container.setMessageListener(mock(MessageListener.class));
303+
container.afterPropertiesSet();
304+
container.start();
305+
306+
assertThat(latch1.await(10, TimeUnit.SECONDS)).isTrue();
307+
consumer.get().handleDelivery("consumerTag", envelope(1L), new BasicProperties(), new byte[1]);
308+
assertThat(latch2.await(10, TimeUnit.SECONDS)).isTrue();
309+
container.stop();
310+
}
311+
266312
private Envelope envelope(long tag) {
267313
return new Envelope(tag, false, "", "");
268314
}

0 commit comments

Comments
 (0)