From d21c8ac4f30ede2b189cf6aa27859df1841b7e18 Mon Sep 17 00:00:00 2001 From: Gugiman Date: Thu, 8 Feb 2018 10:32:47 +0100 Subject: [PATCH 1/3] Order by publish_date within priorities --- pkg/dbal/DbalConsumer.php | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 69b11dbb0..796b27a55 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -213,6 +213,7 @@ private function fetchPrioritizedMessage($now) ->andWhere('priority IS NOT NULL') ->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)') ->addOrderBy('priority', 'desc') + ->addOrderBy('published_at', 'asc') ->setMaxResults(1) ; From 60574c8e70d417fa0816085df7e4e63fb91af45b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 14 Feb 2018 13:52:22 +0200 Subject: [PATCH 2/3] [dbal] Order prioritized messages by publised at date. --- pkg/dbal/DbalConsumer.php | 1 + pkg/dbal/DbalMessage.php | 27 ++++- pkg/dbal/DbalProducer.php | 9 +- pkg/dbal/Tests/DbalConsumerTest.php | 36 ------ pkg/dbal/Tests/DbalMessageTest.php | 16 +++ pkg/dbal/Tests/DbalProducerTest.php | 9 -- .../Tests/Functional/DbalConsumerTest.php | 103 ++++++++++++++++++ ...ndReceivePriorityMessagesFromQueueTest.php | 20 +++- pkg/dbal/composer.json | 2 +- 9 files changed, 171 insertions(+), 52 deletions(-) create mode 100644 pkg/dbal/Tests/Functional/DbalConsumerTest.php diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index 1260275fb..202897d63 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -186,6 +186,7 @@ protected function convertMessage(array $dbalMessage) $message->setBody($dbalMessage['body']); $message->setPriority((int) $dbalMessage['priority']); $message->setRedelivered((bool) $dbalMessage['redelivered']); + $message->setPublishedAt((int) $dbalMessage['published_at']); if ($dbalMessage['headers']) { $message->setHeaders(JSON::decode($dbalMessage['headers'])); diff --git a/pkg/dbal/DbalMessage.php b/pkg/dbal/DbalMessage.php index 979a53657..28be69033 100644 --- a/pkg/dbal/DbalMessage.php +++ b/pkg/dbal/DbalMessage.php @@ -41,6 +41,15 @@ class DbalMessage implements PsrMessage */ private $timeToLive; + /** + * Milliseconds, for example 15186054527288. + * + * Could be generated by the code: (int) (microtime(true) * 10000) + * + * @var int + */ + private $publishedAt; + /** * @param string $body * @param array $properties @@ -259,7 +268,7 @@ public function getTimestamp() { $value = $this->getHeader('timestamp'); - return $value === null ? null : (int) $value; + return null === $value ? null : (int) $value; } /** @@ -269,4 +278,20 @@ public function setTimestamp($timestamp) { $this->setHeader('timestamp', $timestamp); } + + /** + * @return int + */ + public function getPublishedAt() + { + return $this->publishedAt; + } + + /** + * @param int $publishedAt + */ + public function setPublishedAt($publishedAt) + { + $this->publishedAt = $publishedAt; + } } diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index b06c5f1a8..971dad852 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -54,7 +54,7 @@ public function send(PsrDestination $destination, PsrMessage $message) InvalidDestinationException::assertDestinationInstanceOf($destination, DbalDestination::class); InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class); - if (null !== $this->priority && null === $message->getPriority()) { + if (null !== $this->priority && 0 === $message->getPriority()) { $message->setPriority($this->priority); } if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) { @@ -81,9 +81,14 @@ public function send(PsrDestination $destination, PsrMessage $message) throw new \LogicException('The generated uuid is empty'); } + $publishedAt = null !== $message->getPublishedAt() ? + $message->getPublishedAt() : + (int) (microtime(true) * 10000) + ; + $dbalMessage = [ 'id' => $uuid, - 'published_at' => (int) (microtime(true) * 10000), + 'published_at' => $publishedAt, 'body' => $body, 'headers' => JSON::encode($message->getHeaders()), 'properties' => JSON::encode($message->getProperties()), diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index f6e4e3358..410014cca 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -2,10 +2,6 @@ namespace Enqueue\Dbal\Tests; -use Doctrine\DBAL\Connection; -use Doctrine\DBAL\Platforms\AbstractPlatform; -use Doctrine\DBAL\Query\QueryBuilder; -use Doctrine\DBAL\Statement; use Enqueue\Dbal\DbalConsumer; use Enqueue\Dbal\DbalContext; use Enqueue\Dbal\DbalDestination; @@ -120,22 +116,6 @@ private function createProducerMock() return $this->createMock(DbalProducer::class); } - /** - * @return \PHPUnit_Framework_MockObject_MockObject|Connection - */ - private function createConnectionMock() - { - return $this->createMock(Connection::class); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|Statement - */ - private function createStatementMock() - { - return $this->createMock(Statement::class); - } - /** * @return \PHPUnit_Framework_MockObject_MockObject|DbalContext */ @@ -143,22 +123,6 @@ private function createContextMock() { return $this->createMock(DbalContext::class); } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|QueryBuilder - */ - private function createQueryBuilderMock() - { - return $this->createMock(QueryBuilder::class); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|AbstractPlatform - */ - private function createPlatformMock() - { - return $this->createMock(AbstractPlatform::class); - } } class InvalidMessage implements PsrMessage diff --git a/pkg/dbal/Tests/DbalMessageTest.php b/pkg/dbal/Tests/DbalMessageTest.php index c0af060d5..c1c2115e9 100644 --- a/pkg/dbal/Tests/DbalMessageTest.php +++ b/pkg/dbal/Tests/DbalMessageTest.php @@ -49,6 +49,13 @@ public function testShouldSetCorrelationIdAsHeader() $this->assertSame(['correlation_id' => 'theCorrelationId'], $message->getHeaders()); } + public function testShouldSetPublishedAtToNullInConstructor() + { + $message = new DbalMessage(); + + $this->assertNull($message->getPublishedAt()); + } + public function testShouldSetMessageIdAsHeader() { $message = new DbalMessage(); @@ -72,4 +79,13 @@ public function testShouldSetReplyToAsHeader() $this->assertSame(['reply_to' => 'theReply'], $message->getHeaders()); } + + public function testShouldAllowGetPreviouslySetPublishedAtTime() + { + $message = new DbalMessage(); + + $message->setPublishedAt(123); + + $this->assertSame(123, $message->getPublishedAt()); + } } diff --git a/pkg/dbal/Tests/DbalProducerTest.php b/pkg/dbal/Tests/DbalProducerTest.php index 370c94a22..547873f05 100644 --- a/pkg/dbal/Tests/DbalProducerTest.php +++ b/pkg/dbal/Tests/DbalProducerTest.php @@ -2,7 +2,6 @@ namespace Enqueue\Dbal\Tests; -use Doctrine\DBAL\Connection; use Enqueue\Dbal\DbalContext; use Enqueue\Dbal\DbalDestination; use Enqueue\Dbal\DbalMessage; @@ -60,14 +59,6 @@ private function createContextMock() { return $this->createMock(DbalContext::class); } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|Connection - */ - private function createConnectionMock() - { - return $this->createMock(Connection::class); - } } class NotSupportedDestination1 implements PsrDestination diff --git a/pkg/dbal/Tests/Functional/DbalConsumerTest.php b/pkg/dbal/Tests/Functional/DbalConsumerTest.php new file mode 100644 index 000000000..1f27993bd --- /dev/null +++ b/pkg/dbal/Tests/Functional/DbalConsumerTest.php @@ -0,0 +1,103 @@ +context = $this->createDbalContext(); + } + + protected function tearDown() + { + if ($this->context) { + $this->context->close(); + } + + parent::tearDown(); + } + + public function testShouldSetPublishedAtDateToReceivedMessage() + { + $context = $this->context; + $queue = $context->createQueue(__METHOD__); + + $consumer = $context->createConsumer($queue); + + // guard + $this->assertNull($consumer->receiveNoWait()); + + $time = (int) (microtime(true) * 10000); + + $expectedBody = __CLASS__.$time; + + $producer = $context->createProducer(); + + $message = $context->createMessage($expectedBody); + $message->setPublishedAt($time); + $producer->send($queue, $message); + + $message = $consumer->receive(8000); // 8 sec + + $this->assertInstanceOf(DbalMessage::class, $message); + $consumer->acknowledge($message); + $this->assertSame($expectedBody, $message->getBody()); + $this->assertSame($time, $message->getPublishedAt()); + } + + public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate() + { + $context = $this->context; + $queue = $context->createQueue(__METHOD__); + + $consumer = $context->createConsumer($queue); + + // guard + $this->assertNull($consumer->receiveNoWait()); + + $time = (int) (microtime(true) * 10000); + $olderTime = $time - 10000; + + $expectedPriority5Body = __CLASS__.'_priority5_'.$time; + $expectedPriority5BodyOlderTime = __CLASS__.'_priority5_'.$olderTime; + + $producer = $context->createProducer(); + + $message = $context->createMessage($expectedPriority5Body); + $message->setPriority(5); + $message->setPublishedAt($time); + $producer->send($queue, $message); + + $message = $context->createMessage($expectedPriority5BodyOlderTime); + $message->setPriority(5); + $message->setPublishedAt($olderTime); + $producer->send($queue, $message); + + $message = $consumer->receive(8000); // 8 sec + + $this->assertInstanceOf(DbalMessage::class, $message); + $consumer->acknowledge($message); + $this->assertSame($expectedPriority5BodyOlderTime, $message->getBody()); + + $message = $consumer->receive(8000); // 8 sec + + $this->assertInstanceOf(DbalMessage::class, $message); + $consumer->acknowledge($message); + $this->assertSame($expectedPriority5Body, $message->getBody()); + } +} diff --git a/pkg/dbal/Tests/Spec/DbalSendAndReceivePriorityMessagesFromQueueTest.php b/pkg/dbal/Tests/Spec/DbalSendAndReceivePriorityMessagesFromQueueTest.php index 5ddc64509..7f9441784 100644 --- a/pkg/dbal/Tests/Spec/DbalSendAndReceivePriorityMessagesFromQueueTest.php +++ b/pkg/dbal/Tests/Spec/DbalSendAndReceivePriorityMessagesFromQueueTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Dbal\Tests\Spec; +use Enqueue\Dbal\DbalContext; use Enqueue\Dbal\DbalMessage; use Interop\Queue\PsrContext; use Interop\Queue\Spec\SendAndReceivePriorityMessagesFromQueueSpec; @@ -13,6 +14,15 @@ class DbalSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceivePrio { use CreateDbalContextTrait; + private $publishedAt; + + public function setUp() + { + parent::setUp(); + + $this->publishedAt = (int) (microtime(true) * 10000); + } + /** * @return PsrContext */ @@ -24,13 +34,17 @@ protected function createContext() /** * {@inheritdoc} * + * @param DbalContext $context + * * @return DbalMessage */ - protected function createMessage(PsrContext $context, $priority) + protected function createMessage(PsrContext $context, $body) { /** @var DbalMessage $message */ - $message = $context->createMessage('priority'.$priority); - $message->setPriority($priority); + $message = parent::createMessage($context, $body); + + // in order to test priorities correctly we have to make sure the messages were sent in the same time. + $message->setPublishedAt($this->publishedAt); return $message; } diff --git a/pkg/dbal/composer.json b/pkg/dbal/composer.json index acffe3e1b..c30d0910e 100644 --- a/pkg/dbal/composer.json +++ b/pkg/dbal/composer.json @@ -15,7 +15,7 @@ "enqueue/test": "^0.8@dev", "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", - "queue-interop/queue-spec": "^0.5.4@dev", + "queue-interop/queue-spec": "^0.5.5@dev", "symfony/dependency-injection": "^2.8|^3|^4", "symfony/config": "^2.8|^3|^4" }, From ab5ee9972e77066840df29da19276ff87baab4ca Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 14 Feb 2018 14:14:51 +0200 Subject: [PATCH 3/3] fix phpstan --- pkg/dbal/Tests/Functional/DbalConsumerTest.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/dbal/Tests/Functional/DbalConsumerTest.php b/pkg/dbal/Tests/Functional/DbalConsumerTest.php index 1f27993bd..92060500f 100644 --- a/pkg/dbal/Tests/Functional/DbalConsumerTest.php +++ b/pkg/dbal/Tests/Functional/DbalConsumerTest.php @@ -1,9 +1,10 @@