diff --git a/pkg/dbal/Client/DbalDriver.php b/pkg/dbal/Client/DbalDriver.php index 86bc0e176..8196a9b84 100644 --- a/pkg/dbal/Client/DbalDriver.php +++ b/pkg/dbal/Client/DbalDriver.php @@ -30,6 +30,17 @@ class DbalDriver implements DriverInterface */ private $queueMetaRegistry; + /** + * @var array + */ + private static $priorityMap = [ + MessagePriority::VERY_LOW => 0, + MessagePriority::LOW => 1, + MessagePriority::NORMAL => 2, + MessagePriority::HIGH => 3, + MessagePriority::VERY_HIGH => 4, + ]; + /** * @param DbalContext $context * @param Config $config @@ -63,6 +74,9 @@ public function createTransportMessage(Message $message) $transportMessage->setDelay($message->getDelay()); $transportMessage->setReplyTo($message->getReplyTo()); $transportMessage->setCorrelationId($message->getCorrelationId()); + if (array_key_exists($message->getPriority(), self::$priorityMap)) { + $transportMessage->setPriority(self::$priorityMap[$message->getPriority()]); + } return $transportMessage; } @@ -83,11 +97,16 @@ public function createClientMessage(PsrMessage $message) $clientMessage->setContentType($message->getHeader('content_type')); $clientMessage->setMessageId($message->getMessageId()); $clientMessage->setTimestamp($message->getTimestamp()); - $clientMessage->setPriority(MessagePriority::NORMAL); $clientMessage->setDelay($message->getDelay()); $clientMessage->setReplyTo($message->getReplyTo()); $clientMessage->setCorrelationId($message->getCorrelationId()); + $priorityMap = array_flip(self::$priorityMap); + $priority = array_key_exists($message->getPriority(), $priorityMap) ? + $priorityMap[$message->getPriority()] : + MessagePriority::NORMAL; + $clientMessage->setPriority($priority); + return $clientMessage; } @@ -156,4 +175,12 @@ public function getConfig() { return $this->config; } + + /** + * @return array + */ + public static function getPriorityMap() + { + return self::$priorityMap; + } } diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 5546d9100..6368e24a2 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -176,7 +176,7 @@ protected function receiveMessage() ->where('queue = :queue') ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') ->orderBy('priority', 'desc') - ->orderBy('id', 'asc') + ->addOrderBy('id', 'asc') ->setMaxResults(1) ; diff --git a/pkg/dbal/Tests/Client/DbalDriverTest.php b/pkg/dbal/Tests/Client/DbalDriverTest.php index 4eb95ffb9..232e3ceaf 100644 --- a/pkg/dbal/Tests/Client/DbalDriverTest.php +++ b/pkg/dbal/Tests/Client/DbalDriverTest.php @@ -92,6 +92,7 @@ public function testShouldConvertTransportMessageToClientMessage() $transportMessage->setHeader('content_type', 'ContentType'); $transportMessage->setMessageId('MessageId'); $transportMessage->setTimestamp(1000); + $transportMessage->setPriority(2); $transportMessage->setDelay(12345); $driver = new DbalDriver( diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index 3e7695315..6fcf7fa9e 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -172,10 +172,15 @@ public function testShouldReceiveMessage() ->will($this->returnSelf()) ; $queryBuilder - ->expects($this->exactly(2)) + ->expects($this->exactly(1)) ->method('orderBy') ->will($this->returnSelf()) ; + $queryBuilder + ->expects($this->exactly(1)) + ->method('addOrderBy') + ->will($this->returnSelf()) + ; $platform = $this->createPlatformMock(); @@ -264,10 +269,15 @@ public function testShouldReturnNullIfThereIsNoNewMessage() ->will($this->returnSelf()) ; $queryBuilder - ->expects($this->exactly(2)) + ->expects($this->exactly(1)) ->method('orderBy') ->will($this->returnSelf()) ; + $queryBuilder + ->expects($this->exactly(1)) + ->method('addOrderBy') + ->will($this->returnSelf()) + ; $platform = $this->createPlatformMock(); @@ -352,10 +362,15 @@ public function testShouldThrowIfMessageWasNotRemoved() ->will($this->returnSelf()) ; $queryBuilder - ->expects($this->exactly(2)) + ->expects($this->exactly(1)) ->method('orderBy') ->will($this->returnSelf()) ; + $queryBuilder + ->expects($this->exactly(1)) + ->method('addOrderBy') + ->will($this->returnSelf()) + ; $platform = $this->createPlatformMock(); diff --git a/pkg/dbal/Tests/DbalSendPriorityMessagesTest.php b/pkg/dbal/Tests/DbalSendPriorityMessagesTest.php new file mode 100644 index 000000000..e470565d8 --- /dev/null +++ b/pkg/dbal/Tests/DbalSendPriorityMessagesTest.php @@ -0,0 +1,91 @@ +createContext(); + $queue = $this->createQueue($context, 'default'); + $consumer = $context->createConsumer($queue); + + // guard + $this->assertNull($consumer->receiveNoWait()); + + $messagePriorities = [1, 0, 3]; + $producer = $context->createProducer(); + foreach ($messagePriorities as $priority) { + $producer->send($queue, $this->createMessage($context, $priority)); + } + + sort($messagePriorities); + foreach (array_reverse($messagePriorities) as $priority) { + $message = $consumer->receive(8000); // 8 sec + + $this->assertInstanceOf(PsrMessage::class, $message); + $consumer->acknowledge($message); + $this->assertSame('priority'.$priority, $message->getBody()); + } + } + + /** + * @return PsrContext + */ + protected function createContext() + { + $factory = new DbalConnectionFactory( + [ + 'lazy' => true, + 'connection' => [ + 'dbname' => getenv('SYMFONY__DB__NAME'), + 'user' => getenv('SYMFONY__DB__USER'), + 'password' => getenv('SYMFONY__DB__PASSWORD'), + 'host' => getenv('SYMFONY__DB__HOST'), + 'port' => getenv('SYMFONY__DB__PORT'), + 'driver' => getenv('SYMFONY__DB__DRIVER'), + ], + ] + ); + + return $factory->createContext(); + } + + /** + * {@inheritdoc} + * + * @param DbalContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->createDataBaseTable(); + + return $queue; + } + + /** + * @param PsrContext $context + * @param int $priority + * + * @return DbalMessage + */ + protected function createMessage(PsrContext $context, $priority) + { + /** @var DbalMessage $message */ + $message = $context->createMessage('priority'.$priority); + $message->setPriority($priority); + + return $message; + } +}