Skip to content

Commit 1f96246

Browse files
authored
Merge pull request #613 from php-enqueue/dbal-concurent-fetch
[dbal] Use concurrent fetch message approach (no transaction, no pessimistic lock)
2 parents 2fd6cc2 + 2445ef2 commit 1f96246

File tree

8 files changed

+179
-139
lines changed

8 files changed

+179
-139
lines changed

pkg/dbal/DbalConsumer.php

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
namespace Enqueue\Dbal;
66

77
use Doctrine\DBAL\Connection;
8-
use Doctrine\DBAL\Types\Type;
98
use Interop\Queue\Consumer;
109
use Interop\Queue\Exception\InvalidMessageException;
1110
use Interop\Queue\Impl\ConsumerPollingTrait;
@@ -81,14 +80,7 @@ public function receiveNoWait(): ?Message
8180
$this->removeExpiredMessages();
8281
$this->redeliverMessages();
8382

84-
// get top message from the queue
85-
if ($message = $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay)) {
86-
if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) {
87-
return $this->getContext()->convertMessage($message);
88-
}
89-
}
90-
91-
return null;
83+
return $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay);
9284
}
9385

9486
/**
@@ -129,17 +121,4 @@ protected function getConnection(): Connection
129121
{
130122
return $this->dbal;
131123
}
132-
133-
private function deleteMessage(string $deliveryId): void
134-
{
135-
if (empty($deliveryId)) {
136-
throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId));
137-
}
138-
139-
$this->getConnection()->delete(
140-
$this->getContext()->getTableName(),
141-
['delivery_id' => $deliveryId],
142-
['delivery_id' => Type::STRING]
143-
);
144-
}
145124
}

pkg/dbal/DbalConsumerHelperTrait.php

Lines changed: 102 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -11,96 +11,137 @@
1111

1212
trait DbalConsumerHelperTrait
1313
{
14+
private $redeliverMessagesLastExecutedAt;
15+
16+
private $removeExpiredMessagesLastExecutedAt;
17+
1418
abstract protected function getContext(): DbalContext;
1519

1620
abstract protected function getConnection(): Connection;
1721

18-
protected function fetchMessage(array $queues, int $redeliveryDelay): ?array
22+
protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessage
1923
{
24+
if (empty($queues)) {
25+
throw new \LogicException('Queues must not be empty.');
26+
}
27+
2028
$now = time();
21-
$deliveryId = (string) Uuid::uuid1();
22-
23-
$this->getConnection()->beginTransaction();
24-
25-
try {
26-
$query = $this->getConnection()->createQueryBuilder()
27-
->select('*')
28-
->from($this->getContext()->getTableName())
29-
->andWhere('delivery_id IS NULL')
30-
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
31-
->andWhere('queue IN (:queues)')
32-
->addOrderBy('priority', 'desc')
33-
->addOrderBy('published_at', 'asc')
34-
->setMaxResults(1);
35-
36-
// select for update
37-
$message = $this->getConnection()->executeQuery(
38-
$query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(),
39-
['delayedUntil' => $now, 'queues' => array_values($queues)],
40-
['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY]
41-
)->fetch();
42-
43-
if (!$message) {
44-
$this->getConnection()->commit();
29+
$deliveryId = Uuid::uuid4();
30+
31+
$endAt = microtime(true) + 0.2; // add 200ms
32+
33+
$select = $this->getConnection()->createQueryBuilder()
34+
->select('id')
35+
->from($this->getContext()->getTableName())
36+
->andWhere('queue IN (:queues)')
37+
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
38+
->andWhere('delivery_id IS NULL')
39+
->addOrderBy('priority', 'asc')
40+
->addOrderBy('published_at', 'asc')
41+
->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY)
42+
->setParameter('delayedUntil', $now, ParameterType::INTEGER)
43+
->setMaxResults(1);
44+
45+
$update = $this->getConnection()->createQueryBuilder()
46+
->update($this->getContext()->getTableName())
47+
->set('delivery_id', ':deliveryId')
48+
->set('redeliver_after', ':redeliverAfter')
49+
->andWhere('id = :messageId')
50+
->andWhere('delivery_id IS NULL')
51+
->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID)
52+
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
53+
;
4554

55+
while (microtime(true) < $endAt) {
56+
$result = $select->execute()->fetch();
57+
if (empty($result)) {
4658
return null;
4759
}
4860

49-
// mark message as delivered to consumer
50-
$this->getConnection()->createQueryBuilder()
51-
->andWhere('id = :id')
52-
->update($this->getContext()->getTableName())
53-
->set('delivery_id', ':deliveryId')
54-
->set('redeliver_after', ':redeliverAfter')
55-
->setParameter('id', $message['id'], Type::GUID)
56-
->setParameter('deliveryId', $deliveryId, Type::STRING)
57-
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
58-
->execute()
61+
$update
62+
->setParameter('messageId', $result['id'], Type::GUID)
5963
;
6064

61-
$this->getConnection()->commit();
62-
63-
$deliveredMessage = $this->getConnection()->createQueryBuilder()
64-
->select('*')
65-
->from($this->getContext()->getTableName())
66-
->andWhere('delivery_id = :deliveryId')
67-
->setParameter('deliveryId', $deliveryId, Type::STRING)
68-
->setMaxResults(1)
69-
->execute()
70-
->fetch()
71-
;
72-
73-
return $deliveredMessage ?: null;
74-
} catch (\Exception $e) {
75-
$this->getConnection()->rollBack();
76-
77-
throw $e;
65+
if ($update->execute()) {
66+
$deliveredMessage = $this->getConnection()->createQueryBuilder()
67+
->select('*')
68+
->from($this->getContext()->getTableName())
69+
->andWhere('delivery_id = :deliveryId')
70+
->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID)
71+
->setMaxResults(1)
72+
->execute()
73+
->fetch()
74+
;
75+
76+
if (false == $deliveredMessage) {
77+
throw new \LogicException('There must be a message at all times at this stage but there is no a message.');
78+
}
79+
80+
if ($deliveredMessage['redelivered'] || empty($deliveredMessage['time_to_live']) || $deliveredMessage['time_to_live'] > time()) {
81+
return $this->getContext()->convertMessage($deliveredMessage);
82+
}
83+
}
7884
}
85+
86+
return null;
7987
}
8088

8189
protected function redeliverMessages(): void
8290
{
83-
$this->getConnection()->createQueryBuilder()
91+
if (null === $this->redeliverMessagesLastExecutedAt) {
92+
$this->redeliverMessagesLastExecutedAt = microtime(true);
93+
} elseif ((microtime(true) - $this->redeliverMessagesLastExecutedAt) < 1) {
94+
return;
95+
}
96+
97+
$update = $this->getConnection()->createQueryBuilder()
8498
->update($this->getContext()->getTableName())
8599
->set('delivery_id', ':deliveryId')
86100
->set('redelivered', ':redelivered')
87-
->andWhere('delivery_id IS NOT NULL')
88101
->andWhere('redeliver_after < :now')
89-
->setParameter(':now', (int) time(), Type::BIGINT)
90-
->setParameter('deliveryId', null, Type::STRING)
102+
->andWhere('delivery_id IS NOT NULL')
103+
->setParameter(':now', time(), Type::BIGINT)
104+
->setParameter('deliveryId', null, Type::GUID)
91105
->setParameter('redelivered', true, Type::BOOLEAN)
92-
->execute()
93106
;
107+
108+
$update->execute();
109+
110+
$this->redeliverMessagesLastExecutedAt = microtime(true);
94111
}
95112

96113
protected function removeExpiredMessages(): void
97114
{
98-
$this->getConnection()->createQueryBuilder()
115+
if (null === $this->removeExpiredMessagesLastExecutedAt) {
116+
$this->removeExpiredMessagesLastExecutedAt = microtime(true);
117+
} elseif ((microtime(true) - $this->removeExpiredMessagesLastExecutedAt) < 1) {
118+
return;
119+
}
120+
121+
$delete = $this->getConnection()->createQueryBuilder()
99122
->delete($this->getContext()->getTableName())
100123
->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)')
101-
->setParameter(':now', (int) time(), Type::BIGINT)
102-
->setParameter('redelivered', false, Type::BOOLEAN)
103-
->execute()
124+
->andWhere('delivery_id IS NULL')
125+
->andWhere('redelivered = false')
126+
127+
->setParameter(':now', time(), Type::BIGINT)
104128
;
129+
130+
$delete->execute();
131+
132+
$this->removeExpiredMessagesLastExecutedAt = microtime(true);
133+
}
134+
135+
private function deleteMessage(string $deliveryId): void
136+
{
137+
if (empty($deliveryId)) {
138+
throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId));
139+
}
140+
141+
$this->getConnection()->delete(
142+
$this->getContext()->getTableName(),
143+
['delivery_id' => Uuid::fromString($deliveryId)->getBytes()],
144+
['delivery_id' => Type::GUID]
145+
);
105146
}
106147
}

pkg/dbal/DbalContext.php

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Interop\Queue\Queue;
1818
use Interop\Queue\SubscriptionConsumer;
1919
use Interop\Queue\Topic;
20+
use Ramsey\Uuid\Uuid;
2021

2122
class DbalContext implements Context
2223
{
@@ -141,32 +142,38 @@ public function createSubscriptionConsumer(): SubscriptionConsumer
141142
/**
142143
* @internal It must be used here and in the consumer only
143144
*/
144-
public function convertMessage(array $dbalMessage): DbalMessage
145+
public function convertMessage(array $arrayMessage): DbalMessage
145146
{
146-
/** @var DbalMessage $dbalMessageObj */
147-
$dbalMessageObj = $this->createMessage(
148-
$dbalMessage['body'],
149-
$dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [],
150-
$dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : []
147+
/** @var DbalMessage $message */
148+
$message = $this->createMessage(
149+
$arrayMessage['body'],
150+
$arrayMessage['properties'] ? JSON::decode($arrayMessage['properties']) : [],
151+
$arrayMessage['headers'] ? JSON::decode($arrayMessage['headers']) : []
151152
);
152153

153-
if (isset($dbalMessage['redelivered'])) {
154-
$dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']);
154+
if (isset($arrayMessage['id'])) {
155+
$message->setMessageId(Uuid::fromBytes($arrayMessage['id'])->toString());
155156
}
156-
if (isset($dbalMessage['priority'])) {
157-
$dbalMessageObj->setPriority((int) $dbalMessage['priority']);
157+
if (isset($arrayMessage['queue'])) {
158+
$message->setQueue($arrayMessage['queue']);
158159
}
159-
if (isset($dbalMessage['published_at'])) {
160-
$dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']);
160+
if (isset($arrayMessage['redelivered'])) {
161+
$message->setRedelivered((bool) $arrayMessage['redelivered']);
161162
}
162-
if (isset($dbalMessage['delivery_id'])) {
163-
$dbalMessageObj->setDeliveryId((string) $dbalMessage['delivery_id']);
163+
if (isset($arrayMessage['priority'])) {
164+
$message->setPriority((int) (-1 * $arrayMessage['priority']));
164165
}
165-
if (isset($dbalMessage['redeliver_after'])) {
166-
$dbalMessageObj->setRedeliverAfter((int) $dbalMessage['redeliver_after']);
166+
if (isset($arrayMessage['published_at'])) {
167+
$message->setPublishedAt((int) $arrayMessage['published_at']);
168+
}
169+
if (isset($arrayMessage['delivery_id'])) {
170+
$message->setDeliveryId(Uuid::fromBytes($arrayMessage['delivery_id'])->toString());
171+
}
172+
if (isset($arrayMessage['redeliver_after'])) {
173+
$message->setRedeliverAfter((int) $arrayMessage['redeliver_after']);
167174
}
168175

169-
return $dbalMessageObj;
176+
return $message;
170177
}
171178

172179
/**
@@ -218,8 +225,7 @@ public function createDataBaseTable(): void
218225

219226
$table = new Table($this->getTableName());
220227

221-
$table->addColumn('id', Type::BINARY, ['length' => 16, 'fixed' => true]);
222-
$table->addColumn('human_id', Type::STRING, ['length' => 36]);
228+
$table->addColumn('id', Type::GUID, ['length' => 16, 'fixed' => true]);
223229
$table->addColumn('published_at', Type::BIGINT);
224230
$table->addColumn('body', Type::TEXT, ['notnull' => false]);
225231
$table->addColumn('headers', Type::TEXT, ['notnull' => false]);
@@ -229,17 +235,14 @@ public function createDataBaseTable(): void
229235
$table->addColumn('priority', Type::SMALLINT, ['notnull' => false]);
230236
$table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]);
231237
$table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]);
232-
$table->addColumn('delivery_id', Type::STRING, ['notnull' => false]);
238+
$table->addColumn('delivery_id', Type::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]);
233239
$table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]);
234240

235241
$table->setPrimaryKey(['id']);
236-
$table->addIndex(['published_at']);
237-
$table->addIndex(['queue']);
238-
$table->addIndex(['priority']);
239-
$table->addIndex(['delayed_until']);
240-
$table->addIndex(['priority', 'published_at']);
241-
$table->addIndex(['redeliver_after']);
242-
$table->addUniqueIndex(['delivery_id']);
242+
$table->addIndex(['priority', 'published_at', 'queue', 'delivery_id', 'delayed_until', 'id']);
243+
244+
$table->addIndex(['redeliver_after', 'delivery_id']);
245+
$table->addIndex(['time_to_live', 'delivery_id']);
243246

244247
$sm->createTable($table);
245248
}

pkg/dbal/DbalMessage.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ class DbalMessage implements Message
5353
*/
5454
private $deliveryId;
5555

56+
/**
57+
* @var string|null
58+
*/
59+
private $queue;
60+
5661
/**
5762
* Milliseconds, for example 15186054527288.
5863
*
@@ -249,4 +254,14 @@ public function setPublishedAt(int $publishedAt = null): void
249254
{
250255
$this->publishedAt = $publishedAt;
251256
}
257+
258+
public function getQueue(): ?string
259+
{
260+
return $this->queue;
261+
}
262+
263+
public function setQueue(?string $queue): void
264+
{
265+
$this->queue = $queue;
266+
}
252267
}

pkg/dbal/DbalProducer.php

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public function send(Destination $destination, Message $message): void
7171
}
7272

7373
$body = $message->getBody();
74-
$uuid = Uuid::uuid1();
74+
$uuid = Uuid::uuid4();
7575

7676
$publishedAt = null !== $message->getPublishedAt() ?
7777
$message->getPublishedAt() :
@@ -80,12 +80,11 @@ public function send(Destination $destination, Message $message): void
8080

8181
$dbalMessage = [
8282
'id' => $this->uuidCodec->encodeBinary($uuid),
83-
'human_id' => $uuid->toString(),
8483
'published_at' => $publishedAt,
8584
'body' => $body,
8685
'headers' => JSON::encode($message->getHeaders()),
8786
'properties' => JSON::encode($message->getProperties()),
88-
'priority' => $message->getPriority(),
87+
'priority' => -1 * $message->getPriority(),
8988
'queue' => $destination->getQueueName(),
9089
'redelivered' => false,
9190
'delivery_id' => null,

0 commit comments

Comments
 (0)