From c641c2a3d33803d3f22e489f9f54cd2c2e045d58 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Tue, 23 Oct 2018 20:16:28 +0300 Subject: [PATCH 1/9] Introduce redelivery support based on visibility approach --- pkg/dbal/DbalConsumer.php | 194 ++++++++---------- pkg/dbal/DbalContext.php | 16 ++ pkg/dbal/DbalMessage.php | 32 +++ pkg/dbal/DbalProducer.php | 4 + pkg/dbal/DbalSubscriptionConsumer.php | 107 +++++----- pkg/dbal/Tests/DbalConsumerTest.php | 12 ++ .../Tests/Functional/DbalConsumerTest.php | 4 +- 7 files changed, 209 insertions(+), 160 deletions(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index c8e1b02d4..d611ddf42 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -6,13 +6,18 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Types\Type; +use function GuzzleHttp\Psr7\str; use Interop\Queue\Consumer; use Interop\Queue\Exception\InvalidMessageException; +use Interop\Queue\Impl\ConsumerPollingTrait; use Interop\Queue\Message; use Interop\Queue\Queue; +use Ramsey\Uuid\Uuid; class DbalConsumer implements Consumer { + use ConsumerPollingTrait; + /** * @var DbalContext */ @@ -29,9 +34,11 @@ class DbalConsumer implements Consumer private $queue; /** + * Default 20 minutes in milliseconds. + * * @var int */ - private $pollingInterval; + private $redeliveryDelay; public function __construct(DbalContext $context, DbalDestination $queue) { @@ -39,23 +46,25 @@ public function __construct(DbalContext $context, DbalDestination $queue) $this->queue = $queue; $this->dbal = $this->context->getDbalConnection(); - $this->pollingInterval = 1000; + $this->redeliveryDelay = 1200000; } /** - * Polling interval is in milliseconds. + * Get interval between retry failed messages in milliseconds. */ - public function setPollingInterval(int $interval): void + public function getRedeliveryDelay(): int { - $this->pollingInterval = $interval; + return $this->redeliveryDelay; } /** - * Get polling interval in milliseconds. + * Interval between retry failed messages in seconds. */ - public function getPollingInterval(): int + public function setRedeliveryDelay(int $redeliveryDelay): self { - return $this->pollingInterval; + $this->redeliveryDelay = $redeliveryDelay; + + return $this; } /** @@ -66,33 +75,59 @@ public function getQueue(): Queue return $this->queue; } - public function receive(int $timeout = 0): ?Message + public function receiveNoWait(): ?Message { - $timeout /= 1000; - $startAt = microtime(true); + $this->redeliverMessages(); + + $this->dbal->beginTransaction(); + try { + $now = (int) time(); + $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds + $deliveryId = (string) Uuid::uuid1(); - while (true) { - $message = $this->receiveMessage(); + // get top message from the queue + $message = $this->fetchMessage($now); - if ($message) { - return $message; - } + if (null == $message) { + $this->dbal->commit(); - if ($timeout && (microtime(true) - $startAt) >= $timeout) { return null; } - usleep($this->pollingInterval * 1000); + // mark message as delivered to consumer + $this->dbal->createQueryBuilder() + ->update($this->context->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redeliver_after', ':redeliverAfter') + ->andWhere('id = :id') + ->setParameter('id', $message['id'], Type::GUID) + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) + ->execute() + ; + + $dbalMessage = $this->dbal->createQueryBuilder() + ->select('*') + ->from($this->context->getTableName()) + ->andWhere('delivery_id = :deliveryId') + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setMaxResults(1) + ->execute() + ->fetch() + ; - if ($timeout && (microtime(true) - $startAt) >= $timeout) { - return null; + $this->dbal->commit(); + + if ($message->isRedelivered() || empty($dbalMessage['time_to_live']) || $dbalMessage['time_to_live'] > time()) { + return $this->context->convertMessage($dbalMessage); } - } - } - public function receiveNoWait(): ?Message - { - return $this->receiveMessage(); + return null; + } catch (\Exception $e) { + $this->dbal->rollBack(); + + throw $e; + } } /** @@ -100,7 +135,7 @@ public function receiveNoWait(): ?Message */ public function acknowledge(Message $message): void { - // does nothing + $this->deleteMessage($message->getDeliveryId()); } /** @@ -115,102 +150,51 @@ public function reject(Message $message, bool $requeue = false): void return; } + + $this->deleteMessage($message->getDeliveryId()); } - protected function receiveMessage(): ?DbalMessage + private function deleteMessage(?string $deliveryId): void { - $this->dbal->beginTransaction(); - try { - $now = time(); - - $dbalMessage = $this->fetchPrioritizedMessage($now) ?: $dbalMessage = $this->fetchMessage($now); - if (false == $dbalMessage) { - $this->dbal->commit(); - - return null; - } - - // remove message - $affectedRows = $this->dbal->delete($this->context->getTableName(), ['id' => $dbalMessage['id']], [ - 'id' => Type::GUID, - ]); - - if (1 !== $affectedRows) { - throw new \LogicException(sprintf('Expected record was removed but it is not. id: "%s"', $dbalMessage['id'])); - } - - $this->dbal->commit(); - - if (empty($dbalMessage['time_to_live']) || ($dbalMessage['time_to_live'] / 1000) > microtime(true)) { - return $this->context->convertMessage($dbalMessage); - } - - return null; - } catch (\Exception $e) { - $this->dbal->rollBack(); - - throw $e; - } + $this->dbal->delete( + $this->context->getTableName(), + ['delivery_id' => $deliveryId], + ['delivery_id' => Type::STRING] + ); } - private function fetchPrioritizedMessage(int $now): ?array + private function fetchMessage(int $now): ?array { - $query = $this->dbal->createQueryBuilder(); - $query + $result = $this->dbal->createQueryBuilder() ->select('*') ->from($this->context->getTableName()) + ->andWhere('delivery_id IS NULL') ->andWhere('queue = :queue') - ->andWhere('priority IS NOT NULL') - ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') - ->addOrderBy('published_at', 'asc') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') ->addOrderBy('priority', 'desc') + ->addOrderBy('published_at', 'asc') + ->setParameter('queue', $this->queue->getQueueName(), Type::STRING) + ->setParameter('delayedUntil', $now, Type::BIGINT) ->setMaxResults(1) + ->execute() + ->fetch() ; - $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); - - $result = $this->dbal->executeQuery( - $sql, - [ - 'queue' => $this->queue->getQueueName(), - 'delayedUntil' => $now, - ], - [ - 'queue' => Type::STRING, - 'delayedUntil' => Type::INTEGER, - ] - )->fetch(); - return $result ?: null; } - private function fetchMessage(int $now): ?array + private function redeliverMessages(): void { - $query = $this->dbal->createQueryBuilder(); - $query - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('queue = :queue') - ->andWhere('priority IS NULL') - ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') - ->addOrderBy('published_at', 'asc') - ->setMaxResults(1) + $this->dbal->createQueryBuilder() + ->update($this->context->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redelivered', ':redelivered') + ->andWhere('delivery_id IS NOT NULL') + ->andWhere('redeliver_after < :now') + ->setParameter(':now', (int) time(), Type::BIGINT) + ->setParameter('deliveryId', null, Type::STRING) + ->setParameter('redelivered', true, Type::BOOLEAN) + ->execute() ; - - $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); - - $result = $this->dbal->executeQuery( - $sql, - [ - 'queue' => $this->queue->getQueueName(), - 'delayedUntil' => $now, - ], - [ - 'queue' => Type::STRING, - 'delayedUntil' => Type::INTEGER, - ] - )->fetch(); - - return $result ?: null; } } diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 5a715b288..bb2dfa9ed 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -116,6 +116,10 @@ public function createConsumer(Destination $destination): Consumer $consumer->setPollingInterval($this->config['polling_interval']); } + if (isset($this->config['redelivery_delay'])) { + $consumer->setRedeliveryDelay($this->config['redelivery_delay']); + } + return $consumer; } @@ -133,6 +137,7 @@ public function createSubscriptionConsumer(): SubscriptionConsumer */ public function convertMessage(array $dbalMessage): DbalMessage { + /** @var DbalMessage $dbalMessageObj */ $dbalMessageObj = $this->createMessage( $dbalMessage['body'], $dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [], @@ -148,6 +153,12 @@ public function convertMessage(array $dbalMessage): DbalMessage if (isset($dbalMessage['published_at'])) { $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); } + if (isset($dbalMessage['delivery_id'])) { + $dbalMessageObj->setDeliveryId((string) $dbalMessage['delivery_id']); + } + if (isset($dbalMessage['redeliver_after'])) { + $dbalMessageObj->setRedeliverAfter((int) $dbalMessage['redeliver_after']); + } return $dbalMessageObj; } @@ -212,6 +223,8 @@ public function createDataBaseTable(): void $table->addColumn('priority', Type::SMALLINT, ['notnull' => false]); $table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]); $table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]); + $table->addColumn('delivery_id', Type::STRING, ['notnull' => false]); + $table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]); $table->setPrimaryKey(['id']); $table->addIndex(['published_at']); @@ -219,6 +232,9 @@ public function createDataBaseTable(): void $table->addIndex(['priority']); $table->addIndex(['delayed_until']); $table->addIndex(['priority', 'published_at']); + $table->addIndex(['redeliver_after']); + $table->addUniqueIndex(['delivery_id']); + $table->addIndex(['delivery_id']); $sm->createTable($table); } diff --git a/pkg/dbal/DbalMessage.php b/pkg/dbal/DbalMessage.php index 88b49c588..8464d56e3 100644 --- a/pkg/dbal/DbalMessage.php +++ b/pkg/dbal/DbalMessage.php @@ -38,11 +38,21 @@ class DbalMessage implements Message */ private $deliveryDelay; + /** + * @var int seconds + */ + private $redeliverAfter; + /** * @var int milliseconds */ private $timeToLive; + /** + * @var null|string + */ + private $deliveryId; + /** * Milliseconds, for example 15186054527288. * @@ -65,6 +75,8 @@ public function __construct(string $body = '', array $properties = [], array $he $this->redelivered = false; $this->priority = null; $this->deliveryDelay = null; + $this->deliveryId = null; + $this->redeliverAfter = null; } public function setBody(string $body): void @@ -208,6 +220,26 @@ public function setTimestamp(int $timestamp = null): void $this->setHeader('timestamp', $timestamp); } + public function getDeliveryId(): ?string + { + return $this->deliveryId; + } + + public function setDeliveryId(?string $deliveryId = null): void + { + $this->deliveryId = $deliveryId; + } + + public function getRedeliverAfter(): int + { + return $this->redeliverAfter; + } + + public function setRedeliverAfter(int $redeliverAfter = null): void + { + $this->redeliverAfter = $redeliverAfter; + } + public function getPublishedAt(): ?int { return $this->publishedAt; diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index ddf567a8a..b34719c83 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -87,6 +87,8 @@ public function send(Destination $destination, Message $message): void 'properties' => JSON::encode($message->getProperties()), 'priority' => $message->getPriority(), 'queue' => $destination->getQueueName(), + 'delivery_id' => null, + 'redeliver_after' => null, ]; $delay = $message->getDeliveryDelay(); @@ -132,6 +134,8 @@ public function send(Destination $destination, Message $message): void 'queue' => Type::STRING, 'time_to_live' => Type::INTEGER, 'delayed_until' => Type::INTEGER, + 'delivery_id' => Type::STRING, + 'redeliver_after' => Type::BIGINT, ]); } catch (\Exception $e) { throw new Exception('The transport fails to send the message due to some internal error.', null, $e); diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index a822cee04..8ca063fd7 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -7,6 +7,7 @@ use Doctrine\DBAL\Types\Type; use Interop\Queue\Consumer; use Interop\Queue\SubscriptionConsumer; +use Ramsey\Uuid\Uuid; class DbalSubscriptionConsumer implements SubscriptionConsumer { @@ -27,6 +28,13 @@ class DbalSubscriptionConsumer implements SubscriptionConsumer */ private $dbal; + /** + * Default 20 minutes in milliseconds. + * + * @var int + */ + private $redeliveryDelay; + /** * @param DbalContext $context */ @@ -35,6 +43,16 @@ public function __construct(DbalContext $context) $this->context = $context; $this->dbal = $this->context->getDbalConnection(); $this->subscribers = []; + + $this->redeliveryDelay = 1200000; + } + + /** + * Get interval between retry failed messages in milliseconds. + */ + public function getRedeliveryDelay(): int + { + return $this->redeliveryDelay; } public function consume(int $timeout = 0): void @@ -43,8 +61,10 @@ public function consume(int $timeout = 0): void throw new \LogicException('No subscribers'); } - $timeout = (int) ceil($timeout / 1000); - $endAt = time() + $timeout; + $now = time(); + $timeout /= 1000; + $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds + $deliveryId = (string) Uuid::uuid1(); $queueNames = []; foreach (array_keys($this->subscribers) as $queueName) { @@ -57,10 +77,30 @@ public function consume(int $timeout = 0): void $currentQueueNames = $queueNames; } - $message = $this->fetchPrioritizedMessage($currentQueueNames) ?: $this->fetchMessage($currentQueueNames); + $message = $this->fetchMessage($currentQueueNames); if ($message) { - $this->dbal->delete($this->context->getTableName(), ['id' => $message['id']], ['id' => Type::GUID]); + // mark message as delivered to consumer + $this->dbal->createQueryBuilder() + ->update($this->context->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redeliver_after', ':redeliverAfter') + ->andWhere('id = :id') + ->setParameter('id', $message['id'], Type::GUID) + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) + ->execute() + ; + + $message = $this->dbal->createQueryBuilder() + ->select('*') + ->from($this->context->getTableName()) + ->andWhere('delivery_id = :deliveryId') + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setMaxResults(1) + ->execute() + ->fetch() + ; $dbalMessage = $this->context->convertMessage($message); @@ -81,7 +121,7 @@ public function consume(int $timeout = 0): void usleep(200000); // 200ms } - if ($timeout && microtime(true) >= $endAt) { + if ($timeout && microtime(true) >= $now + $timeout) { return; } } @@ -137,62 +177,21 @@ public function unsubscribeAll(): void private function fetchMessage(array $queues): ?array { - $query = $this->dbal->createQueryBuilder(); - $query - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('queue IN (:queues)') - ->andWhere('priority IS NULL') - ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') - ->addOrderBy('published_at', 'asc') - ->setMaxResults(1) - ; - - $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); - - $result = $this->dbal->executeQuery( - $sql, - [ - 'queues' => array_keys($queues), - 'delayedUntil' => time(), - ], - [ - 'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY, - 'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER, - ] - )->fetch(); - - return $result ?: null; - } - - private function fetchPrioritizedMessage(array $queues): ?array - { - $query = $this->dbal->createQueryBuilder(); - $query + $result = $this->dbal->createQueryBuilder() ->select('*') ->from($this->context->getTableName()) + ->andWhere('delivery_id IS NULL') ->andWhere('queue IN (:queues)') - ->andWhere('priority IS NOT NULL') - ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') - ->addOrderBy('published_at', 'asc') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') ->addOrderBy('priority', 'desc') + ->addOrderBy('published_at', 'asc') + ->setParameter('queues', array_keys($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY) + ->setParameter('delayedUntil', time(), \Doctrine\DBAL\ParameterType::INTEGER) ->setMaxResults(1) + ->execute() + ->fetch() ; - $sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL(); - - $result = $this->dbal->executeQuery( - $sql, - [ - 'queues' => array_keys($queues), - 'delayedUntil' => time(), - ], - [ - 'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY, - 'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER, - ] - )->fetch(); - return $result ?: null; } } diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index a7000d9f6..77a88472f 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -1,5 +1,7 @@ assertEquals(123456, $consumer->getPollingInterval()); } + public function testCouldSetAndGetRedeliveryDelay() + { + $destination = new DbalDestination('queue'); + + $consumer = new DbalConsumer($this->createContextMock(), $destination); + $consumer->setRedeliveryDelay(123456); + + $this->assertEquals(123456, $consumer->getRedeliveryDelay()); + } + public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() { $this->expectException(InvalidMessageException::class); diff --git a/pkg/dbal/Tests/Functional/DbalConsumerTest.php b/pkg/dbal/Tests/Functional/DbalConsumerTest.php index 92060500f..61c1c3e51 100644 --- a/pkg/dbal/Tests/Functional/DbalConsumerTest.php +++ b/pkg/dbal/Tests/Functional/DbalConsumerTest.php @@ -49,6 +49,7 @@ public function testShouldSetPublishedAtDateToReceivedMessage() $producer = $context->createProducer(); + /** @var DbalMessage $message */ $message = $context->createMessage($expectedBody); $message->setPublishedAt($time); $producer->send($queue, $message); @@ -61,7 +62,7 @@ public function testShouldSetPublishedAtDateToReceivedMessage() $this->assertSame($time, $message->getPublishedAt()); } - public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate() + public function testShouldOrderMessagesWithSamePriorityByPublishedAtDateee() { $context = $this->context; $queue = $context->createQueue(__METHOD__); @@ -79,6 +80,7 @@ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate() $producer = $context->createProducer(); + /** @var DbalMessage $message */ $message = $context->createMessage($expectedPriority5Body); $message->setPriority(5); $message->setPublishedAt($time); From 5f4ef90b380e4a108542fb6fa25cc46c07e24409 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Mon, 29 Oct 2018 17:52:47 +0200 Subject: [PATCH 2/9] Consumer minor fixes --- pkg/dbal/DbalConsumer.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index d611ddf42..460b5b2b3 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -6,7 +6,6 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\Types\Type; -use function GuzzleHttp\Psr7\str; use Interop\Queue\Consumer; use Interop\Queue\Exception\InvalidMessageException; use Interop\Queue\Impl\ConsumerPollingTrait; @@ -118,7 +117,7 @@ public function receiveNoWait(): ?Message $this->dbal->commit(); - if ($message->isRedelivered() || empty($dbalMessage['time_to_live']) || $dbalMessage['time_to_live'] > time()) { + if ($dbalMessage['redelivered'] || empty($dbalMessage['time_to_live']) || $dbalMessage['time_to_live'] > time()) { return $this->context->convertMessage($dbalMessage); } From 6532d1ea1fe8c5c6205ad36e47436089c078a735 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Tue, 30 Oct 2018 14:25:31 +0200 Subject: [PATCH 3/9] Updated docs --- docs/transport/dbal.md | 39 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/docs/transport/dbal.md b/docs/transport/dbal.md index 6e72d7953..d14cad922 100644 --- a/docs/transport/dbal.md +++ b/docs/transport/dbal.md @@ -2,14 +2,14 @@ The transport uses [Doctrine DBAL](http://docs.doctrine-project.org/projects/doctrine-dbal/en/latest/) library and SQL like server as a broker. It creates a table there. Pushes and pops messages to\from that table. - -**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost. * [Installation](#installation) * [Init database](#init-database) * [Create context](#create-context) * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) +* [Send expiration message](#send-expiration-message) +* [Send delayed message](#send-delayed-message) * [Consume message](#consume-message) * [Subscription consumer](#subscription-consumer) @@ -90,6 +90,38 @@ $message = $psrContext->createMessage('Hello world!'); $psrContext->createProducer()->send($fooQueue, $message); ``` +## Send expiration message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setTimeToLive(60000) // 60 sec + // + ->send($fooQueue, $message) +; +``` + +## Send delayed message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setDeliveryDelay(5000) // 5 sec + // + ->send($fooQueue, $message) +; +```` + ## Consume message: ```php @@ -102,6 +134,9 @@ $consumer = $psrContext->createConsumer($fooQueue); $message = $consumer->receive(); // process a message + +$consumer->acknowledge($message); +//$consumer->reject($message); ``` ## Subscription consumer From 10f507d0e26c063a148c677bb7b731592bb2e1ed Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Tue, 30 Oct 2018 16:26:01 +0200 Subject: [PATCH 4/9] Moved common logic to trait --- pkg/dbal/DbalConsumer.php | 81 +++++++++------------------ pkg/dbal/DbalConsumerHelperTrait.php | 67 ++++++++++++++++++++++ pkg/dbal/DbalContext.php | 8 ++- pkg/dbal/DbalSubscriptionConsumer.php | 68 ++++++++-------------- 4 files changed, 123 insertions(+), 101 deletions(-) create mode 100644 pkg/dbal/DbalConsumerHelperTrait.php diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 460b5b2b3..66dc4566e 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -15,7 +15,8 @@ class DbalConsumer implements Consumer { - use ConsumerPollingTrait; + use ConsumerPollingTrait, + DbalConsumerHelperTrait; /** * @var DbalContext @@ -48,6 +49,16 @@ public function __construct(DbalContext $context, DbalDestination $queue) $this->redeliveryDelay = 1200000; } + public function getContext(): DbalContext + { + return $this->context; + } + + public function getConnection(): Connection + { + return $this->dbal; + } + /** * Get interval between retry failed messages in milliseconds. */ @@ -78,47 +89,27 @@ public function receiveNoWait(): ?Message { $this->redeliverMessages(); - $this->dbal->beginTransaction(); + $this->getConnection()->beginTransaction(); try { - $now = (int) time(); - $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds $deliveryId = (string) Uuid::uuid1(); // get top message from the queue - $message = $this->fetchMessage($now); + $message = $this->fetchMessage([$this->queue->getQueueName()]); if (null == $message) { - $this->dbal->commit(); + $this->getConnection()->commit(); return null; } - // mark message as delivered to consumer - $this->dbal->createQueryBuilder() - ->update($this->context->getTableName()) - ->set('delivery_id', ':deliveryId') - ->set('redeliver_after', ':redeliverAfter') - ->andWhere('id = :id') - ->setParameter('id', $message['id'], Type::GUID) - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) - ->execute() - ; - - $dbalMessage = $this->dbal->createQueryBuilder() - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('delivery_id = :deliveryId') - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setMaxResults(1) - ->execute() - ->fetch() - ; - - $this->dbal->commit(); + $this->markMessageAsDeliveredToConsumer($message, $deliveryId); + + $dbalMessage = $this->getMessageByDeliveryId($deliveryId); + + $this->getConnection()->commit(); if ($dbalMessage['redelivered'] || empty($dbalMessage['time_to_live']) || $dbalMessage['time_to_live'] > time()) { - return $this->context->convertMessage($dbalMessage); + return $this->getContext()->convertMessage($dbalMessage); } return null; @@ -145,7 +136,7 @@ public function reject(Message $message, bool $requeue = false): void InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); if ($requeue) { - $this->context->createProducer()->send($this->queue, $message); + $this->getContext()->createProducer()->send($this->queue, $message); return; } @@ -155,37 +146,17 @@ public function reject(Message $message, bool $requeue = false): void private function deleteMessage(?string $deliveryId): void { - $this->dbal->delete( - $this->context->getTableName(), + $this->getConnection()->delete( + $this->getContext()->getTableName(), ['delivery_id' => $deliveryId], ['delivery_id' => Type::STRING] ); } - private function fetchMessage(int $now): ?array - { - $result = $this->dbal->createQueryBuilder() - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('delivery_id IS NULL') - ->andWhere('queue = :queue') - ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') - ->addOrderBy('priority', 'desc') - ->addOrderBy('published_at', 'asc') - ->setParameter('queue', $this->queue->getQueueName(), Type::STRING) - ->setParameter('delayedUntil', $now, Type::BIGINT) - ->setMaxResults(1) - ->execute() - ->fetch() - ; - - return $result ?: null; - } - private function redeliverMessages(): void { - $this->dbal->createQueryBuilder() - ->update($this->context->getTableName()) + $this->getConnection()->createQueryBuilder() + ->update($this->getContext()->getTableName()) ->set('delivery_id', ':deliveryId') ->set('redelivered', ':redelivered') ->andWhere('delivery_id IS NOT NULL') diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php new file mode 100644 index 000000000..97e710f70 --- /dev/null +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -0,0 +1,67 @@ +getConnection()->createQueryBuilder() + ->select('*') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id IS NULL') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') + ->andWhere('queue IN (:queues)') + ->addOrderBy('priority', 'desc') + ->addOrderBy('published_at', 'asc') + ->setParameter('delayedUntil', $now, \Doctrine\DBAL\ParameterType::INTEGER) + ->setParameter('queues', array_values($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY) + ->setMaxResults(1) + ->execute() + ->fetch() + ; + + return $result ?: null; + } + + protected function markMessageAsDeliveredToConsumer(array $message, string $deliveryId): void + { + $now = time(); + $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds + + $this->getConnection()->createQueryBuilder() + ->andWhere('id = :id') + ->update($this->getContext()->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redeliver_after', ':redeliverAfter') + ->setParameter('id', $message['id'], Type::GUID) + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) + ->execute() + ; + } + + protected function getMessageByDeliveryId(string $deliveryId): array + { + return $this->getConnection()->createQueryBuilder() + ->select('*') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id = :deliveryId') + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setMaxResults(1) + ->execute() + ->fetch() + ; + } +} diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index bb2dfa9ed..30f4e3e91 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -129,7 +129,13 @@ public function close(): void public function createSubscriptionConsumer(): SubscriptionConsumer { - return new DbalSubscriptionConsumer($this); + $consumer = new DbalSubscriptionConsumer($this); + + if (isset($this->config['redelivery_delay'])) { + $consumer->setRedeliveryDelay($this->config['redelivery_delay']); + } + + return $consumer; } /** diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 8ca063fd7..afcf58c7a 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -4,13 +4,15 @@ namespace Enqueue\Dbal; -use Doctrine\DBAL\Types\Type; +use Doctrine\DBAL\Connection; use Interop\Queue\Consumer; use Interop\Queue\SubscriptionConsumer; use Ramsey\Uuid\Uuid; class DbalSubscriptionConsumer implements SubscriptionConsumer { + use DbalConsumerHelperTrait; + /** * @var DbalContext */ @@ -47,6 +49,16 @@ public function __construct(DbalContext $context) $this->redeliveryDelay = 1200000; } + public function getContext(): DbalContext + { + return $this->context; + } + + public function getConnection(): Connection + { + return $this->dbal; + } + /** * Get interval between retry failed messages in milliseconds. */ @@ -55,6 +67,11 @@ public function getRedeliveryDelay(): int return $this->redeliveryDelay; } + public function setRedeliveryDelay(int $redeliveryDelay): void + { + $this->redeliveryDelay = $redeliveryDelay; + } + public function consume(int $timeout = 0): void { if (empty($this->subscribers)) { @@ -63,7 +80,6 @@ public function consume(int $timeout = 0): void $now = time(); $timeout /= 1000; - $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds $deliveryId = (string) Uuid::uuid1(); $queueNames = []; @@ -80,29 +96,11 @@ public function consume(int $timeout = 0): void $message = $this->fetchMessage($currentQueueNames); if ($message) { - // mark message as delivered to consumer - $this->dbal->createQueryBuilder() - ->update($this->context->getTableName()) - ->set('delivery_id', ':deliveryId') - ->set('redeliver_after', ':redeliverAfter') - ->andWhere('id = :id') - ->setParameter('id', $message['id'], Type::GUID) - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) - ->execute() - ; - - $message = $this->dbal->createQueryBuilder() - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('delivery_id = :deliveryId') - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setMaxResults(1) - ->execute() - ->fetch() - ; - - $dbalMessage = $this->context->convertMessage($message); + $this->markMessageAsDeliveredToConsumer($message, $deliveryId); + + $message = $this->getMessageByDeliveryId($deliveryId); + + $dbalMessage = $this->getContext()->convertMessage($message); /** * @var DbalConsumer @@ -174,24 +172,4 @@ public function unsubscribeAll(): void { $this->subscribers = []; } - - private function fetchMessage(array $queues): ?array - { - $result = $this->dbal->createQueryBuilder() - ->select('*') - ->from($this->context->getTableName()) - ->andWhere('delivery_id IS NULL') - ->andWhere('queue IN (:queues)') - ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') - ->addOrderBy('priority', 'desc') - ->addOrderBy('published_at', 'asc') - ->setParameter('queues', array_keys($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY) - ->setParameter('delayedUntil', time(), \Doctrine\DBAL\ParameterType::INTEGER) - ->setMaxResults(1) - ->execute() - ->fetch() - ; - - return $result ?: null; - } } From 697d9513b6e21285a603fac02510ca3a8861a3a7 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Tue, 30 Oct 2018 16:34:01 +0200 Subject: [PATCH 5/9] Fixed message deleting in consumer --- pkg/dbal/DbalConsumer.php | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 66dc4566e..53e063a64 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -144,8 +144,12 @@ public function reject(Message $message, bool $requeue = false): void $this->deleteMessage($message->getDeliveryId()); } - private function deleteMessage(?string $deliveryId): void + private function deleteMessage(string $deliveryId): void { + if (empty($deliveryId)) { + throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId)); + } + $this->getConnection()->delete( $this->getContext()->getTableName(), ['delivery_id' => $deliveryId], From 41823bf13ea93ef99b1a739ad9daa6074066718b Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Tue, 30 Oct 2018 18:04:45 +0200 Subject: [PATCH 6/9] United methods in trait. Covered by transaction --- pkg/dbal/DbalConsumer.php | 36 +++------- pkg/dbal/DbalConsumerHelperTrait.php | 94 ++++++++++++++------------- pkg/dbal/DbalContext.php | 1 - pkg/dbal/DbalSubscriptionConsumer.php | 9 +-- pkg/dbal/Tests/DbalConsumerTest.php | 70 ++++++++++++++++++-- 5 files changed, 125 insertions(+), 85 deletions(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 53e063a64..37e41c0fe 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -87,37 +87,19 @@ public function getQueue(): Queue public function receiveNoWait(): ?Message { - $this->redeliverMessages(); - - $this->getConnection()->beginTransaction(); - try { - $deliveryId = (string) Uuid::uuid1(); - - // get top message from the queue - $message = $this->fetchMessage([$this->queue->getQueueName()]); - - if (null == $message) { - $this->getConnection()->commit(); - - return null; - } + $deliveryId = (string) Uuid::uuid1(); + $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds - $this->markMessageAsDeliveredToConsumer($message, $deliveryId); - - $dbalMessage = $this->getMessageByDeliveryId($deliveryId); - - $this->getConnection()->commit(); + $this->redeliverMessages(); - if ($dbalMessage['redelivered'] || empty($dbalMessage['time_to_live']) || $dbalMessage['time_to_live'] > time()) { - return $this->getContext()->convertMessage($dbalMessage); + // get top message from the queue + if ($message = $this->fetchMessage([$this->queue->getQueueName()], $deliveryId, $redeliveryDelay)) { + if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) { + return $this->getContext()->convertMessage($message); } - - return null; - } catch (\Exception $e) { - $this->dbal->rollBack(); - - throw $e; } + + return null; } /** diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 97e710f70..7e3b86a24 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -13,55 +13,61 @@ abstract public function getContext(): DbalContext; abstract public function getConnection(): Connection; - protected function fetchMessage(array $queues): ?array + protected function fetchMessage(array $queues, string $deliveryId, int $redeliveryDelay): ?array { - $now = time(); + try { + $now = time(); - $result = $this->getConnection()->createQueryBuilder() - ->select('*') - ->from($this->getContext()->getTableName()) - ->andWhere('delivery_id IS NULL') - ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') - ->andWhere('queue IN (:queues)') - ->addOrderBy('priority', 'desc') - ->addOrderBy('published_at', 'asc') - ->setParameter('delayedUntil', $now, \Doctrine\DBAL\ParameterType::INTEGER) - ->setParameter('queues', array_values($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY) - ->setMaxResults(1) - ->execute() - ->fetch() - ; + $this->getConnection()->beginTransaction(); - return $result ?: null; - } + $message = $this->getConnection()->createQueryBuilder() + ->select('*') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id IS NULL') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') + ->andWhere('queue IN (:queues)') + ->addOrderBy('priority', 'desc') + ->addOrderBy('published_at', 'asc') + ->setParameter('delayedUntil', $now, \Doctrine\DBAL\ParameterType::INTEGER) + ->setParameter('queues', array_values($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY) + ->setMaxResults(1) + ->execute() + ->fetch() + ; - protected function markMessageAsDeliveredToConsumer(array $message, string $deliveryId): void - { - $now = time(); - $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds + if (!$message) { + $this->getConnection()->commit(); - $this->getConnection()->createQueryBuilder() - ->andWhere('id = :id') - ->update($this->getContext()->getTableName()) - ->set('delivery_id', ':deliveryId') - ->set('redeliver_after', ':redeliverAfter') - ->setParameter('id', $message['id'], Type::GUID) - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) - ->execute() - ; - } + return null; + } - protected function getMessageByDeliveryId(string $deliveryId): array - { - return $this->getConnection()->createQueryBuilder() - ->select('*') - ->from($this->getContext()->getTableName()) - ->andWhere('delivery_id = :deliveryId') - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setMaxResults(1) - ->execute() - ->fetch() - ; + // mark message as delivered to consumer + $this->getConnection()->createQueryBuilder() + ->andWhere('id = :id') + ->update($this->getContext()->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redeliver_after', ':redeliverAfter') + ->setParameter('id', $message['id'], Type::GUID) + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) + ->execute(); + + $this->getConnection()->commit(); + + $deliveredMessage = $this->getConnection()->createQueryBuilder() + ->select('*') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id = :deliveryId') + ->setParameter('deliveryId', $deliveryId, Type::STRING) + ->setMaxResults(1) + ->execute() + ->fetch(); + + return $deliveredMessage ?: null; + } catch (\Exception $e) { + $this->getConnection()->rollBack(); + + throw $e; + } } } diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 30f4e3e91..0cac7ac45 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -240,7 +240,6 @@ public function createDataBaseTable(): void $table->addIndex(['priority', 'published_at']); $table->addIndex(['redeliver_after']); $table->addUniqueIndex(['delivery_id']); - $table->addIndex(['delivery_id']); $sm->createTable($table); } diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index afcf58c7a..efe80e8af 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -81,6 +81,7 @@ public function consume(int $timeout = 0): void $now = time(); $timeout /= 1000; $deliveryId = (string) Uuid::uuid1(); + $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds $queueNames = []; foreach (array_keys($this->subscribers) as $queueName) { @@ -93,13 +94,7 @@ public function consume(int $timeout = 0): void $currentQueueNames = $queueNames; } - $message = $this->fetchMessage($currentQueueNames); - - if ($message) { - $this->markMessageAsDeliveredToConsumer($message, $deliveryId); - - $message = $this->getMessageByDeliveryId($deliveryId); - + if ($message = $this->fetchMessage($currentQueueNames, $deliveryId, $redeliveryDelay)) { $dbalMessage = $this->getContext()->convertMessage($message); /** diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index 77a88472f..d40e15beb 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -4,6 +4,8 @@ namespace Enqueue\Dbal\Tests; +use Doctrine\DBAL\Connection; +use Doctrine\DBAL\Types\Type; use Enqueue\Dbal\DbalConsumer; use Enqueue\Dbal\DbalContext; use Enqueue\Dbal\DbalDestination; @@ -38,10 +40,40 @@ public function testShouldReturnInstanceOfDestination() $this->assertSame($destination, $consumer->getQueue()); } - public function testCouldCallAcknowledgedMethod() + public function testShouldDeleteMessageOnAcknowledge() { - $consumer = new DbalConsumer($this->createContextMock(), new DbalDestination('queue')); - $consumer->acknowledge(new DbalMessage()); + $queue = new DbalDestination('queue'); + + $message = new DbalMessage(); + $message->setBody('theBody'); + $message->setDeliveryId('foo-delivery-id'); + + $dbal = $this->createConectionMock(); + $dbal + ->expects($this->once()) + ->method('delete') + ->with( + 'some-table-name', + ['delivery_id' => $message->getDeliveryId()], + ['delivery_id' => Type::STRING] + ) + ; + + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getDbalConnection') + ->will($this->returnValue($dbal)) + ; + $context + ->expects($this->once()) + ->method('getTableName') + ->will($this->returnValue('some-table-name')) + ; + + $consumer = new DbalConsumer($context, $queue); + + $consumer->acknowledge($message); } public function testCouldSetAndGetPollingInterval() @@ -77,17 +109,35 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() $consumer->reject(new InvalidMessage()); } - public function testShouldDoNothingOnReject() + public function testShouldDeleteMessageFromQueueOnReject() { $queue = new DbalDestination('queue'); $message = new DbalMessage(); $message->setBody('theBody'); + $message->setDeliveryId('foo-delivery-id'); + + $dbal = $this->createConectionMock(); + $dbal + ->expects($this->once()) + ->method('delete') + ->with( + 'some-table-name', + ['delivery_id' => $message->getDeliveryId()], + ['delivery_id' => Type::STRING] + ) + ; $context = $this->createContextMock(); $context - ->expects($this->never()) - ->method('createProducer') + ->expects($this->once()) + ->method('getDbalConnection') + ->will($this->returnValue($dbal)) + ; + $context + ->expects($this->once()) + ->method('getTableName') + ->will($this->returnValue('some-table-name')) ; $consumer = new DbalConsumer($context, $queue); @@ -136,6 +186,14 @@ private function createContextMock() { return $this->createMock(DbalContext::class); } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|DbalContext + */ + private function createConectionMock() + { + return $this->createMock(Connection::class); + } } class InvalidMessage implements Message From 03c40aba5514b6c93e0af276ef952b6424673b09 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Tue, 30 Oct 2018 18:16:03 +0200 Subject: [PATCH 7/9] Added write lock --- pkg/dbal/DbalConsumerHelperTrait.php | 23 +++++++++++-------- .../Tests/Functional/DbalConsumerTest.php | 4 +--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 7e3b86a24..e0f4fe3c5 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -5,6 +5,7 @@ namespace Enqueue\Dbal; use Doctrine\DBAL\Connection; +use Doctrine\DBAL\ParameterType; use Doctrine\DBAL\Types\Type; trait DbalConsumerHelperTrait @@ -20,7 +21,7 @@ protected function fetchMessage(array $queues, string $deliveryId, int $redelive $this->getConnection()->beginTransaction(); - $message = $this->getConnection()->createQueryBuilder() + $query = $this->getConnection()->createQueryBuilder() ->select('*') ->from($this->getContext()->getTableName()) ->andWhere('delivery_id IS NULL') @@ -28,12 +29,14 @@ protected function fetchMessage(array $queues, string $deliveryId, int $redelive ->andWhere('queue IN (:queues)') ->addOrderBy('priority', 'desc') ->addOrderBy('published_at', 'asc') - ->setParameter('delayedUntil', $now, \Doctrine\DBAL\ParameterType::INTEGER) - ->setParameter('queues', array_values($queues), \Doctrine\DBAL\Connection::PARAM_STR_ARRAY) - ->setMaxResults(1) - ->execute() - ->fetch() - ; + ->setMaxResults(1); + + // select for update + $message = $this->getConnection()->executeQuery( + $query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(), + ['delayedUntil' => $now, 'queues' => array_values($queues)], + ['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY] + )->fetch(); if (!$message) { $this->getConnection()->commit(); @@ -50,7 +53,8 @@ protected function fetchMessage(array $queues, string $deliveryId, int $redelive ->setParameter('id', $message['id'], Type::GUID) ->setParameter('deliveryId', $deliveryId, Type::STRING) ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) - ->execute(); + ->execute() + ; $this->getConnection()->commit(); @@ -61,7 +65,8 @@ protected function fetchMessage(array $queues, string $deliveryId, int $redelive ->setParameter('deliveryId', $deliveryId, Type::STRING) ->setMaxResults(1) ->execute() - ->fetch(); + ->fetch() + ; return $deliveredMessage ?: null; } catch (\Exception $e) { diff --git a/pkg/dbal/Tests/Functional/DbalConsumerTest.php b/pkg/dbal/Tests/Functional/DbalConsumerTest.php index 61c1c3e51..92060500f 100644 --- a/pkg/dbal/Tests/Functional/DbalConsumerTest.php +++ b/pkg/dbal/Tests/Functional/DbalConsumerTest.php @@ -49,7 +49,6 @@ public function testShouldSetPublishedAtDateToReceivedMessage() $producer = $context->createProducer(); - /** @var DbalMessage $message */ $message = $context->createMessage($expectedBody); $message->setPublishedAt($time); $producer->send($queue, $message); @@ -62,7 +61,7 @@ public function testShouldSetPublishedAtDateToReceivedMessage() $this->assertSame($time, $message->getPublishedAt()); } - public function testShouldOrderMessagesWithSamePriorityByPublishedAtDateee() + public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate() { $context = $this->context; $queue = $context->createQueue(__METHOD__); @@ -80,7 +79,6 @@ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDateee() $producer = $context->createProducer(); - /** @var DbalMessage $message */ $message = $context->createMessage($expectedPriority5Body); $message->setPriority(5); $message->setPublishedAt($time); From 50f3f072f5046fe7d893db87ec16bf7fbee4a6e2 Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Tue, 30 Oct 2018 19:24:45 +0200 Subject: [PATCH 8/9] Fixes. --- pkg/dbal/DbalConsumer.php | 43 +++++++++------------------ pkg/dbal/DbalConsumerHelperTrait.php | 29 ++++++++++++++---- pkg/dbal/DbalSubscriptionConsumer.php | 40 +++++++++++++------------ pkg/dbal/Tests/DbalConsumerTest.php | 13 ++++++++ 4 files changed, 71 insertions(+), 54 deletions(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 37e41c0fe..08175af7e 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -11,7 +11,6 @@ use Interop\Queue\Impl\ConsumerPollingTrait; use Interop\Queue\Message; use Interop\Queue\Queue; -use Ramsey\Uuid\Uuid; class DbalConsumer implements Consumer { @@ -49,16 +48,6 @@ public function __construct(DbalContext $context, DbalDestination $queue) $this->redeliveryDelay = 1200000; } - public function getContext(): DbalContext - { - return $this->context; - } - - public function getConnection(): Connection - { - return $this->dbal; - } - /** * Get interval between retry failed messages in milliseconds. */ @@ -68,7 +57,7 @@ public function getRedeliveryDelay(): int } /** - * Interval between retry failed messages in seconds. + * Get interval between retrying failed messages in milliseconds. */ public function setRedeliveryDelay(int $redeliveryDelay): self { @@ -87,13 +76,12 @@ public function getQueue(): Queue public function receiveNoWait(): ?Message { - $deliveryId = (string) Uuid::uuid1(); $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds $this->redeliverMessages(); // get top message from the queue - if ($message = $this->fetchMessage([$this->queue->getQueueName()], $deliveryId, $redeliveryDelay)) { + if ($message = $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay)) { if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) { return $this->getContext()->convertMessage($message); } @@ -107,6 +95,8 @@ public function receiveNoWait(): ?Message */ public function acknowledge(Message $message): void { + InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); + $this->deleteMessage($message->getDeliveryId()); } @@ -126,6 +116,16 @@ public function reject(Message $message, bool $requeue = false): void $this->deleteMessage($message->getDeliveryId()); } + protected function getContext(): DbalContext + { + return $this->context; + } + + protected function getConnection(): Connection + { + return $this->dbal; + } + private function deleteMessage(string $deliveryId): void { if (empty($deliveryId)) { @@ -138,19 +138,4 @@ private function deleteMessage(string $deliveryId): void ['delivery_id' => Type::STRING] ); } - - private function redeliverMessages(): void - { - $this->getConnection()->createQueryBuilder() - ->update($this->getContext()->getTableName()) - ->set('delivery_id', ':deliveryId') - ->set('redelivered', ':redelivered') - ->andWhere('delivery_id IS NOT NULL') - ->andWhere('redeliver_after < :now') - ->setParameter(':now', (int) time(), Type::BIGINT) - ->setParameter('deliveryId', null, Type::STRING) - ->setParameter('redelivered', true, Type::BOOLEAN) - ->execute() - ; - } } diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index e0f4fe3c5..533dc97e8 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -7,20 +7,22 @@ use Doctrine\DBAL\Connection; use Doctrine\DBAL\ParameterType; use Doctrine\DBAL\Types\Type; +use Ramsey\Uuid\Uuid; trait DbalConsumerHelperTrait { - abstract public function getContext(): DbalContext; + abstract protected function getContext(): DbalContext; - abstract public function getConnection(): Connection; + abstract protected function getConnection(): Connection; - protected function fetchMessage(array $queues, string $deliveryId, int $redeliveryDelay): ?array + protected function fetchMessage(array $queues, int $redeliveryDelay): ?array { - try { - $now = time(); + $now = time(); + $deliveryId = (string) Uuid::uuid1(); - $this->getConnection()->beginTransaction(); + $this->getConnection()->beginTransaction(); + try { $query = $this->getConnection()->createQueryBuilder() ->select('*') ->from($this->getContext()->getTableName()) @@ -75,4 +77,19 @@ protected function fetchMessage(array $queues, string $deliveryId, int $redelive throw $e; } } + + protected function redeliverMessages(): void + { + $this->getConnection()->createQueryBuilder() + ->update($this->getContext()->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redelivered', ':redelivered') + ->andWhere('delivery_id IS NOT NULL') + ->andWhere('redeliver_after < :now') + ->setParameter(':now', (int) time(), Type::BIGINT) + ->setParameter('deliveryId', null, Type::STRING) + ->setParameter('redelivered', true, Type::BOOLEAN) + ->execute() + ; + } } diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index efe80e8af..6ac5b7a4a 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -7,7 +7,6 @@ use Doctrine\DBAL\Connection; use Interop\Queue\Consumer; use Interop\Queue\SubscriptionConsumer; -use Ramsey\Uuid\Uuid; class DbalSubscriptionConsumer implements SubscriptionConsumer { @@ -49,27 +48,19 @@ public function __construct(DbalContext $context) $this->redeliveryDelay = 1200000; } - public function getContext(): DbalContext - { - return $this->context; - } - - public function getConnection(): Connection - { - return $this->dbal; - } - /** - * Get interval between retry failed messages in milliseconds. + * Get interval between retrying failed messages in milliseconds. */ public function getRedeliveryDelay(): int { return $this->redeliveryDelay; } - public function setRedeliveryDelay(int $redeliveryDelay): void + public function setRedeliveryDelay(int $redeliveryDelay): self { $this->redeliveryDelay = $redeliveryDelay; + + return $this; } public function consume(int $timeout = 0): void @@ -78,23 +69,24 @@ public function consume(int $timeout = 0): void throw new \LogicException('No subscribers'); } - $now = time(); - $timeout /= 1000; - $deliveryId = (string) Uuid::uuid1(); - $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds - $queueNames = []; foreach (array_keys($this->subscribers) as $queueName) { $queueNames[$queueName] = $queueName; } + $timeout /= 1000; + $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds + $currentQueueNames = []; while (true) { if (empty($currentQueueNames)) { $currentQueueNames = $queueNames; } - if ($message = $this->fetchMessage($currentQueueNames, $deliveryId, $redeliveryDelay)) { + $now = time(); + $this->redeliverMessages(); + + if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) { $dbalMessage = $this->getContext()->convertMessage($message); /** @@ -167,4 +159,14 @@ public function unsubscribeAll(): void { $this->subscribers = []; } + + protected function getContext(): DbalContext + { + return $this->context; + } + + protected function getConnection(): Connection + { + return $this->dbal; + } } diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index d40e15beb..3e9147a82 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -40,6 +40,19 @@ public function testShouldReturnInstanceOfDestination() $this->assertSame($destination, $consumer->getQueue()); } + public function testAcknowledgeShouldThrowIfInstanceOfMessageIsInvalid() + { + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage( + 'The message must be an instance of '. + 'Enqueue\Dbal\DbalMessage '. + 'but it is Enqueue\Dbal\Tests\InvalidMessage.' + ); + + $consumer = new DbalConsumer($this->createContextMock(), new DbalDestination('queue')); + $consumer->acknowledge(new InvalidMessage()); + } + public function testShouldDeleteMessageOnAcknowledge() { $queue = new DbalDestination('queue'); From 239161bf3843f1b0eebbd0730f6e09bd9640051b Mon Sep 17 00:00:00 2001 From: Roman Samarsky Date: Wed, 31 Oct 2018 17:36:36 +0200 Subject: [PATCH 9/9] Deleting of expired messages --- pkg/dbal/DbalConsumer.php | 4 ++ pkg/dbal/DbalConsumerHelperTrait.php | 11 ++++ pkg/dbal/DbalProducer.php | 2 + pkg/dbal/DbalSubscriptionConsumer.php | 1 + pkg/dbal/Tests/DbalConsumerTest.php | 2 +- .../Tests/Functional/DbalConsumerTest.php | 50 +++++++++++++++++++ 6 files changed, 69 insertions(+), 1 deletion(-) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 08175af7e..ede0cfa40 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -78,6 +78,7 @@ public function receiveNoWait(): ?Message { $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds + $this->removeExpiredMessages(); $this->redeliverMessages(); // get top message from the queue @@ -108,6 +109,9 @@ public function reject(Message $message, bool $requeue = false): void InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); if ($requeue) { + $message = clone $message; + $message->setRedelivered(false); + $this->getContext()->createProducer()->send($this->queue, $message); return; diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 533dc97e8..2dc29db26 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -92,4 +92,15 @@ protected function redeliverMessages(): void ->execute() ; } + + protected function removeExpiredMessages(): void + { + $this->getConnection()->createQueryBuilder() + ->delete($this->getContext()->getTableName()) + ->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)') + ->setParameter(':now', (int) time(), Type::BIGINT) + ->setParameter('redelivered', false, Type::BOOLEAN) + ->execute() + ; + } } diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index b34719c83..38ad33414 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -87,6 +87,7 @@ public function send(Destination $destination, Message $message): void 'properties' => JSON::encode($message->getProperties()), 'priority' => $message->getPriority(), 'queue' => $destination->getQueueName(), + 'redelivered' => false, 'delivery_id' => null, 'redeliver_after' => null, ]; @@ -134,6 +135,7 @@ public function send(Destination $destination, Message $message): void 'queue' => Type::STRING, 'time_to_live' => Type::INTEGER, 'delayed_until' => Type::INTEGER, + 'redelivered' => Type::BOOLEAN, 'delivery_id' => Type::STRING, 'redeliver_after' => Type::BIGINT, ]); diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 6ac5b7a4a..12c1cef37 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -84,6 +84,7 @@ public function consume(int $timeout = 0): void } $now = time(); + $this->removeExpiredMessages(); $this->redeliverMessages(); if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) { diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index 3e9147a82..4eeb70a75 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -169,7 +169,7 @@ public function testRejectShouldReSendMessageToSameQueueOnRequeue() $producerMock ->expects($this->once()) ->method('send') - ->with($this->identicalTo($queue), $this->identicalTo($message)) + ->with($this->identicalTo($queue), $this->isInstanceOf($message)) ; $context = $this->createContextMock(); diff --git a/pkg/dbal/Tests/Functional/DbalConsumerTest.php b/pkg/dbal/Tests/Functional/DbalConsumerTest.php index 92060500f..ecee1bc71 100644 --- a/pkg/dbal/Tests/Functional/DbalConsumerTest.php +++ b/pkg/dbal/Tests/Functional/DbalConsumerTest.php @@ -1,5 +1,7 @@ acknowledge($message); $this->assertSame($expectedPriority5Body, $message->getBody()); } + + public function testShouldDeleteExpiredMessage() + { + $context = $this->context; + $queue = $context->createQueue(__METHOD__); + + $consumer = $context->createConsumer($queue); + + // guard + $this->assertNull($consumer->receiveNoWait()); + + $producer = $context->createProducer(); + + $this->context->getDbalConnection()->insert( + $this->context->getTableName(), [ + 'id' => 'id', + 'human_id' => 'id', + 'published_at' => '123', + 'body' => 'expiredMessage', + 'headers' => json_encode([]), + 'properties' => json_encode([]), + 'queue' => __METHOD__, + 'redelivered' => 0, + 'time_to_live' => time() - 10000, + ]); + + $message = $context->createMessage('notExpiredMessage'); + $message->setRedelivered(false); + $producer->send($queue, $message); + + $this->assertSame('2', $this->getQuerySize()); + + $message = $consumer->receive(8000); + + $this->assertSame('1', $this->getQuerySize()); + + $consumer->acknowledge($message); + + $this->assertSame('0', $this->getQuerySize()); + } + + private function getQuerySize(): string + { + return $this->context->getDbalConnection() + ->executeQuery('SELECT count(*) FROM '.$this->context->getTableName()) + ->fetchColumn(0) + ; + } }