Skip to content

[dbal] Introduce redelivery support based on visibility approach. #581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions docs/transport/dbal.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

The transport uses [Doctrine DBAL](http://docs.doctrine-project.org/projects/doctrine-dbal/en/latest/) library and SQL like server as a broker.
It creates a table there. Pushes and pops messages to\from that table.

**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost.

* [Installation](#installation)
* [Init database](#init-database)
* [Create context](#create-context)
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Send expiration message](#send-expiration-message)
* [Send delayed message](#send-delayed-message)
* [Consume message](#consume-message)
* [Subscription consumer](#subscription-consumer)

Expand Down Expand Up @@ -90,6 +90,38 @@ $message = $psrContext->createMessage('Hello world!');
$psrContext->createProducer()->send($fooQueue, $message);
```

## Send expiration message

```php
<?php
/** @var \Enqueue\Dbal\DbalContext $psrContext */
/** @var \Enqueue\Dbal\DbalDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setTimeToLive(60000) // 60 sec
//
->send($fooQueue, $message)
;
```

## Send delayed message

```php
<?php
/** @var \Enqueue\Dbal\DbalContext $psrContext */
/** @var \Enqueue\Dbal\DbalDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setDeliveryDelay(5000) // 5 sec
//
->send($fooQueue, $message)
;
````

## Consume message:

```php
Expand All @@ -102,6 +134,9 @@ $consumer = $psrContext->createConsumer($fooQueue);
$message = $consumer->receive();

// process a message

$consumer->acknowledge($message);
//$consumer->reject($message);
```

## Subscription consumer
Expand Down
178 changes: 59 additions & 119 deletions pkg/dbal/DbalConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@
use Doctrine\DBAL\Types\Type;
use Interop\Queue\Consumer;
use Interop\Queue\Exception\InvalidMessageException;
use Interop\Queue\Impl\ConsumerPollingTrait;
use Interop\Queue\Message;
use Interop\Queue\Queue;
use Ramsey\Uuid\Uuid;

class DbalConsumer implements Consumer
{
use ConsumerPollingTrait,
DbalConsumerHelperTrait;

/**
* @var DbalContext
*/
Expand All @@ -29,33 +34,47 @@ class DbalConsumer implements Consumer
private $queue;

/**
* Default 20 minutes in milliseconds.
*
* @var int
*/
private $pollingInterval;
private $redeliveryDelay;

public function __construct(DbalContext $context, DbalDestination $queue)
{
$this->context = $context;
$this->queue = $queue;
$this->dbal = $this->context->getDbalConnection();

$this->pollingInterval = 1000;
$this->redeliveryDelay = 1200000;
}

public function getContext(): DbalContext
{
return $this->context;
}

public function getConnection(): Connection
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why public?

{
return $this->dbal;
}

/**
* Polling interval is in milliseconds.
* Get interval between retry failed messages in milliseconds.
*/
public function setPollingInterval(int $interval): void
public function getRedeliveryDelay(): int
{
$this->pollingInterval = $interval;
return $this->redeliveryDelay;
}

/**
* Get polling interval in milliseconds.
* Interval between retry failed messages in seconds.
*/
public function getPollingInterval(): int
public function setRedeliveryDelay(int $redeliveryDelay): self
{
return $this->pollingInterval;
$this->redeliveryDelay = $redeliveryDelay;

return $this;
}

/**
Expand All @@ -66,41 +85,29 @@ public function getQueue(): Queue
return $this->queue;
}

public function receive(int $timeout = 0): ?Message
public function receiveNoWait(): ?Message
{
$timeout /= 1000;
$startAt = microtime(true);
$deliveryId = (string) Uuid::uuid1();
$redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the line states that redeliveryDelay is in seconds.


while (true) {
$message = $this->receiveMessage();
$this->redeliverMessages();

if ($message) {
return $message;
}

if ($timeout && (microtime(true) - $startAt) >= $timeout) {
return null;
}

usleep($this->pollingInterval * 1000);

if ($timeout && (microtime(true) - $startAt) >= $timeout) {
return null;
// get top message from the queue
if ($message = $this->fetchMessage([$this->queue->getQueueName()], $deliveryId, $redeliveryDelay)) {
if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the expired message has to be deleted from message, I think that expiration logic could be moved to fetchMessage method

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave it another though, and I now I think we could cook a query that removes expired messages. just like redeliverMessage. the expiration in the code should still be preserved

return $this->getContext()->convertMessage($message);
}
}
}

public function receiveNoWait(): ?Message
{
return $this->receiveMessage();
return null;
}

/**
* @param DbalMessage $message
*/
public function acknowledge(Message $message): void
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add an exception

InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);

// does nothing
$this->deleteMessage($message->getDeliveryId());
}

/**
Expand All @@ -111,106 +118,39 @@ public function reject(Message $message, bool $requeue = false): void
InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);

if ($requeue) {
$this->context->createProducer()->send($this->queue, $message);
$this->getContext()->createProducer()->send($this->queue, $message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The redelivered flag should be forced false.


return;
}
}

protected function receiveMessage(): ?DbalMessage
{
$this->dbal->beginTransaction();
try {
$now = time();

$dbalMessage = $this->fetchPrioritizedMessage($now) ?: $dbalMessage = $this->fetchMessage($now);
if (false == $dbalMessage) {
$this->dbal->commit();

return null;
}

// remove message
$affectedRows = $this->dbal->delete($this->context->getTableName(), ['id' => $dbalMessage['id']], [
'id' => Type::GUID,
]);

if (1 !== $affectedRows) {
throw new \LogicException(sprintf('Expected record was removed but it is not. id: "%s"', $dbalMessage['id']));
}

$this->dbal->commit();

if (empty($dbalMessage['time_to_live']) || ($dbalMessage['time_to_live'] / 1000) > microtime(true)) {
return $this->context->convertMessage($dbalMessage);
}

return null;
} catch (\Exception $e) {
$this->dbal->rollBack();

throw $e;
}
$this->deleteMessage($message->getDeliveryId());
}

private function fetchPrioritizedMessage(int $now): ?array
private function deleteMessage(string $deliveryId): void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the method should be moved to trait.

{
$query = $this->dbal->createQueryBuilder();
$query
->select('*')
->from($this->context->getTableName())
->andWhere('queue = :queue')
->andWhere('priority IS NOT NULL')
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
->addOrderBy('published_at', 'asc')
->addOrderBy('priority', 'desc')
->setMaxResults(1)
;
if (empty($deliveryId)) {
throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId));
}

$sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL();

$result = $this->dbal->executeQuery(
$sql,
[
'queue' => $this->queue->getQueueName(),
'delayedUntil' => $now,
],
[
'queue' => Type::STRING,
'delayedUntil' => Type::INTEGER,
]
)->fetch();

return $result ?: null;
$this->getConnection()->delete(
$this->getContext()->getTableName(),
['delivery_id' => $deliveryId],
['delivery_id' => Type::STRING]
);
}

private function fetchMessage(int $now): ?array
private function redeliverMessages(): void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the method should be in the consumer trait. It must be called in subscription consumer too.

{
$query = $this->dbal->createQueryBuilder();
$query
->select('*')
->from($this->context->getTableName())
->andWhere('queue = :queue')
->andWhere('priority IS NULL')
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
->addOrderBy('published_at', 'asc')
->setMaxResults(1)
$this->getConnection()->createQueryBuilder()
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redelivered', ':redelivered')
->andWhere('delivery_id IS NOT NULL')
->andWhere('redeliver_after < :now')
->setParameter(':now', (int) time(), Type::BIGINT)
->setParameter('deliveryId', null, Type::STRING)
->setParameter('redelivered', true, Type::BOOLEAN)
->execute()
;

$sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a transaction in both consumer and subscription consumer
both should use getWriteLockSQL


$result = $this->dbal->executeQuery(
$sql,
[
'queue' => $this->queue->getQueueName(),
'delayedUntil' => $now,
],
[
'queue' => Type::STRING,
'delayedUntil' => Type::INTEGER,
]
)->fetch();

return $result ?: null;
}
}
78 changes: 78 additions & 0 deletions pkg/dbal/DbalConsumerHelperTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
<?php

declare(strict_types=1);

namespace Enqueue\Dbal;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Types\Type;

trait DbalConsumerHelperTrait
{
abstract public function getContext(): DbalContext;

abstract public function getConnection(): Connection;

protected function fetchMessage(array $queues, string $deliveryId, int $redeliveryDelay): ?array
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if this method returns an instance of DbalMessage. You have access to context so you can use a $context->convertMessage() method.

{
try {
$now = time();

$this->getConnection()->beginTransaction();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be out of try ... catch block


$query = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id IS NULL')
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
->andWhere('queue IN (:queues)')
->addOrderBy('priority', 'desc')
->addOrderBy('published_at', 'asc')
->setMaxResults(1);

// select for update
$message = $this->getConnection()->executeQuery(
$query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(),
['delayedUntil' => $now, 'queues' => array_values($queues)],
['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY]
)->fetch();

if (!$message) {
$this->getConnection()->commit();

return null;
}

// mark message as delivered to consumer
$this->getConnection()->createQueryBuilder()
->andWhere('id = :id')
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redeliver_after', ':redeliverAfter')
->setParameter('id', $message['id'], Type::GUID)
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
->execute()
;

$this->getConnection()->commit();

$deliveredMessage = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id = :deliveryId')
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setMaxResults(1)
->execute()
->fetch()
;

return $deliveredMessage ?: null;
} catch (\Exception $e) {
$this->getConnection()->rollBack();

throw $e;
}
}
}
Loading