diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 6368e24a2..169837aa4 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -176,7 +176,7 @@ protected function receiveMessage() ->where('queue = :queue') ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') ->orderBy('priority', 'desc') - ->addOrderBy('id', 'asc') + ->addOrderBy('id', 'desc') ->setMaxResults(1) ; diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index 4d125bb10..85231d621 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -13,6 +13,11 @@ class DbalProducer implements PsrProducer { + /** + * @var int|null + */ + private $priority; + /** * @var DbalContext */ @@ -29,14 +34,19 @@ public function __construct(DbalContext $context) /** * {@inheritdoc} * - * @param PsrDestination $destination - * @param PsrMessage $message + * @param DbalDestination $destination + * @param DbalMessage $message * * @throws Exception */ public function send(PsrDestination $destination, PsrMessage $message) { InvalidDestinationException::assertDestinationInstanceOf($destination, DbalDestination::class); + InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); + + if (null !== $this->priority && null === $message->getPriority()) { + $message->setPriority($this->priority); + } $body = $message->getBody(); if (is_scalar($body) || null === $body) { @@ -111,11 +121,9 @@ public function getDeliveryDelay() */ public function setPriority($priority) { - if (null === $priority) { - return; - } + $this->priority = $priority; - throw new \LogicException('Not implemented'); + return $this; } /** @@ -123,7 +131,7 @@ public function setPriority($priority) */ public function getPriority() { - return null; + return $this->priority; } /** diff --git a/pkg/dbal/Tests/DbalSendPriorityMessagesTest.php b/pkg/dbal/Tests/DbalSendPriorityMessagesTest.php deleted file mode 100644 index e470565d8..000000000 --- a/pkg/dbal/Tests/DbalSendPriorityMessagesTest.php +++ /dev/null @@ -1,91 +0,0 @@ -createContext(); - $queue = $this->createQueue($context, 'default'); - $consumer = $context->createConsumer($queue); - - // guard - $this->assertNull($consumer->receiveNoWait()); - - $messagePriorities = [1, 0, 3]; - $producer = $context->createProducer(); - foreach ($messagePriorities as $priority) { - $producer->send($queue, $this->createMessage($context, $priority)); - } - - sort($messagePriorities); - foreach (array_reverse($messagePriorities) as $priority) { - $message = $consumer->receive(8000); // 8 sec - - $this->assertInstanceOf(PsrMessage::class, $message); - $consumer->acknowledge($message); - $this->assertSame('priority'.$priority, $message->getBody()); - } - } - - /** - * @return PsrContext - */ - protected function createContext() - { - $factory = new DbalConnectionFactory( - [ - 'lazy' => true, - 'connection' => [ - 'dbname' => getenv('SYMFONY__DB__NAME'), - 'user' => getenv('SYMFONY__DB__USER'), - 'password' => getenv('SYMFONY__DB__PASSWORD'), - 'host' => getenv('SYMFONY__DB__HOST'), - 'port' => getenv('SYMFONY__DB__PORT'), - 'driver' => getenv('SYMFONY__DB__DRIVER'), - ], - ] - ); - - return $factory->createContext(); - } - - /** - * {@inheritdoc} - * - * @param DbalContext $context - */ - protected function createQueue(PsrContext $context, $queueName) - { - $queue = $context->createQueue($queueName); - $context->createDataBaseTable(); - - return $queue; - } - - /** - * @param PsrContext $context - * @param int $priority - * - * @return DbalMessage - */ - protected function createMessage(PsrContext $context, $priority) - { - /** @var DbalMessage $message */ - $message = $context->createMessage('priority'.$priority); - $message->setPriority($priority); - - return $message; - } -} diff --git a/pkg/dbal/Tests/Spec/DbalSendAndReceivePriorityMessagesFromQueueTest.php b/pkg/dbal/Tests/Spec/DbalSendAndReceivePriorityMessagesFromQueueTest.php new file mode 100644 index 000000000..455d4f1ce --- /dev/null +++ b/pkg/dbal/Tests/Spec/DbalSendAndReceivePriorityMessagesFromQueueTest.php @@ -0,0 +1,62 @@ + true, + 'connection' => [ + 'dbname' => getenv('SYMFONY__DB__NAME'), + 'user' => getenv('SYMFONY__DB__USER'), + 'password' => getenv('SYMFONY__DB__PASSWORD'), + 'host' => getenv('SYMFONY__DB__HOST'), + 'port' => getenv('SYMFONY__DB__PORT'), + 'driver' => getenv('SYMFONY__DB__DRIVER'), + ], + ]); + + $context = $factory->createContext(); + $context->createDataBaseTable(); + + return $context; + } + + /** + * {@inheritdoc} + * + * @return DbalMessage + */ + protected function createMessage(PsrContext $context, $priority) + { + /** @var DbalMessage $message */ + $message = $context->createMessage('priority'.$priority); + $message->setPriority($priority); + + return $message; + } + + /** + * {@inheritdoc} + * + * @return DbalDestination + */ + protected function createQueue(PsrContext $context, $queueName) + { + return parent::createQueue($context, $queueName.time()); + } +}