diff --git a/composer.json b/composer.json index 6489f3737..ff9db427d 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,7 @@ "queue-interop/queue-interop": "^0.7", "bunny/bunny": "^0.2.4|^0.3|^0.4", "php-amqplib/php-amqplib": "^2.7", - "doctrine/dbal": "~2.5", + "doctrine/dbal": "^2.6", "ramsey/uuid": "^2|^3.5", "psr/log": "^1", "psr/container": "^1", diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 810eeb66f..1e6e9c579 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -5,7 +5,7 @@ namespace Enqueue\Dbal; use Doctrine\DBAL\Connection; -use Doctrine\DBAL\Exception\DeadlockException; +use Doctrine\DBAL\Exception\RetryableException; use Doctrine\DBAL\ParameterType; use Doctrine\DBAL\Types\Type; use Ramsey\Uuid\Uuid; @@ -54,34 +54,36 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa ; while (microtime(true) < $endAt) { - $result = $select->execute()->fetch(); - if (empty($result)) { - return null; - } - - $update - ->setParameter('messageId', $result['id'], Type::GUID) - ; - - if ($update->execute()) { - $deliveredMessage = $this->getConnection()->createQueryBuilder() - ->select('*') - ->from($this->getContext()->getTableName()) - ->andWhere('delivery_id = :deliveryId') - ->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID) - ->setMaxResults(1) - ->execute() - ->fetch() - ; - - // the message has been removed by a 3rd party, such as truncate operation. - if (false == $deliveredMessage) { - continue; + try { + $result = $select->execute()->fetch(); + if (empty($result)) { + return null; } - if ($deliveredMessage['redelivered'] || empty($deliveredMessage['time_to_live']) || $deliveredMessage['time_to_live'] > time()) { - return $this->getContext()->convertMessage($deliveredMessage); + $update + ->setParameter('messageId', $result['id'], Type::GUID); + + if ($update->execute()) { + $deliveredMessage = $this->getConnection()->createQueryBuilder() + ->select('*') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id = :deliveryId') + ->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID) + ->setMaxResults(1) + ->execute() + ->fetch(); + + // the message has been removed by a 3rd party, such as truncate operation. + if (false === $deliveredMessage) { + continue; + } + + if ($deliveredMessage['redelivered'] || empty($deliveredMessage['time_to_live']) || $deliveredMessage['time_to_live'] > time()) { + return $this->getContext()->convertMessage($deliveredMessage); + } } + } catch (RetryableException $e) { + // maybe next time we'll get more luck } } @@ -111,7 +113,7 @@ protected function redeliverMessages(): void $update->execute(); $this->redeliverMessagesLastExecutedAt = microtime(true); - } catch (DeadlockException $e) { + } catch (RetryableException $e) { // maybe next time we'll get more luck } } @@ -135,7 +137,7 @@ protected function removeExpiredMessages(): void try { $delete->execute(); - } catch (DeadlockException $e) { + } catch (RetryableException $e) { // maybe next time we'll get more luck } diff --git a/pkg/dbal/composer.json b/pkg/dbal/composer.json index a39b5c622..b48985714 100644 --- a/pkg/dbal/composer.json +++ b/pkg/dbal/composer.json @@ -8,7 +8,7 @@ "require": { "php": "^7.1.3", "queue-interop/queue-interop": "^0.7", - "doctrine/dbal": "~2.5", + "doctrine/dbal": "^2.6", "ramsey/uuid": "^3" }, "require-dev": {