Skip to content

Commit 239161b

Browse files
committed
Deleting of expired messages
1 parent 50f3f07 commit 239161b

6 files changed

+69
-1
lines changed

Diff for: pkg/dbal/DbalConsumer.php

+4
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ public function receiveNoWait(): ?Message
7878
{
7979
$redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds
8080

81+
$this->removeExpiredMessages();
8182
$this->redeliverMessages();
8283

8384
// get top message from the queue
@@ -108,6 +109,9 @@ public function reject(Message $message, bool $requeue = false): void
108109
InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);
109110

110111
if ($requeue) {
112+
$message = clone $message;
113+
$message->setRedelivered(false);
114+
111115
$this->getContext()->createProducer()->send($this->queue, $message);
112116

113117
return;

Diff for: pkg/dbal/DbalConsumerHelperTrait.php

+11
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,15 @@ protected function redeliverMessages(): void
9292
->execute()
9393
;
9494
}
95+
96+
protected function removeExpiredMessages(): void
97+
{
98+
$this->getConnection()->createQueryBuilder()
99+
->delete($this->getContext()->getTableName())
100+
->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)')
101+
->setParameter(':now', (int) time(), Type::BIGINT)
102+
->setParameter('redelivered', false, Type::BOOLEAN)
103+
->execute()
104+
;
105+
}
95106
}

Diff for: pkg/dbal/DbalProducer.php

+2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public function send(Destination $destination, Message $message): void
8787
'properties' => JSON::encode($message->getProperties()),
8888
'priority' => $message->getPriority(),
8989
'queue' => $destination->getQueueName(),
90+
'redelivered' => false,
9091
'delivery_id' => null,
9192
'redeliver_after' => null,
9293
];
@@ -134,6 +135,7 @@ public function send(Destination $destination, Message $message): void
134135
'queue' => Type::STRING,
135136
'time_to_live' => Type::INTEGER,
136137
'delayed_until' => Type::INTEGER,
138+
'redelivered' => Type::BOOLEAN,
137139
'delivery_id' => Type::STRING,
138140
'redeliver_after' => Type::BIGINT,
139141
]);

Diff for: pkg/dbal/DbalSubscriptionConsumer.php

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public function consume(int $timeout = 0): void
8484
}
8585

8686
$now = time();
87+
$this->removeExpiredMessages();
8788
$this->redeliverMessages();
8889

8990
if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) {

Diff for: pkg/dbal/Tests/DbalConsumerTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ public function testRejectShouldReSendMessageToSameQueueOnRequeue()
169169
$producerMock
170170
->expects($this->once())
171171
->method('send')
172-
->with($this->identicalTo($queue), $this->identicalTo($message))
172+
->with($this->identicalTo($queue), $this->isInstanceOf($message))
173173
;
174174

175175
$context = $this->createContextMock();

Diff for: pkg/dbal/Tests/Functional/DbalConsumerTest.php

+50
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace Enqueue\Dbal\Tests\Functional;
46

57
use Enqueue\Dbal\DbalContext;
@@ -101,4 +103,52 @@ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate()
101103
$consumer->acknowledge($message);
102104
$this->assertSame($expectedPriority5Body, $message->getBody());
103105
}
106+
107+
public function testShouldDeleteExpiredMessage()
108+
{
109+
$context = $this->context;
110+
$queue = $context->createQueue(__METHOD__);
111+
112+
$consumer = $context->createConsumer($queue);
113+
114+
// guard
115+
$this->assertNull($consumer->receiveNoWait());
116+
117+
$producer = $context->createProducer();
118+
119+
$this->context->getDbalConnection()->insert(
120+
$this->context->getTableName(), [
121+
'id' => 'id',
122+
'human_id' => 'id',
123+
'published_at' => '123',
124+
'body' => 'expiredMessage',
125+
'headers' => json_encode([]),
126+
'properties' => json_encode([]),
127+
'queue' => __METHOD__,
128+
'redelivered' => 0,
129+
'time_to_live' => time() - 10000,
130+
]);
131+
132+
$message = $context->createMessage('notExpiredMessage');
133+
$message->setRedelivered(false);
134+
$producer->send($queue, $message);
135+
136+
$this->assertSame('2', $this->getQuerySize());
137+
138+
$message = $consumer->receive(8000);
139+
140+
$this->assertSame('1', $this->getQuerySize());
141+
142+
$consumer->acknowledge($message);
143+
144+
$this->assertSame('0', $this->getQuerySize());
145+
}
146+
147+
private function getQuerySize(): string
148+
{
149+
return $this->context->getDbalConnection()
150+
->executeQuery('SELECT count(*) FROM '.$this->context->getTableName())
151+
->fetchColumn(0)
152+
;
153+
}
104154
}

0 commit comments

Comments
 (0)