Skip to content

Commit d8be092

Browse files
authored
Merge pull request #371 from php-enqueue/pr-362
[dbal] Sort priority messages by published at date too.
2 parents 1562313 + 8914399 commit d8be092

9 files changed

+173
-52
lines changed

Diff for: pkg/dbal/DbalConsumer.php

+2
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ protected function convertMessage(array $dbalMessage)
186186
$message->setBody($dbalMessage['body']);
187187
$message->setPriority((int) $dbalMessage['priority']);
188188
$message->setRedelivered((bool) $dbalMessage['redelivered']);
189+
$message->setPublishedAt((int) $dbalMessage['published_at']);
189190

190191
if ($dbalMessage['headers']) {
191192
$message->setHeaders(JSON::decode($dbalMessage['headers']));
@@ -213,6 +214,7 @@ private function fetchPrioritizedMessage($now)
213214
->andWhere('priority IS NOT NULL')
214215
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
215216
->addOrderBy('priority', 'desc')
217+
->addOrderBy('published_at', 'asc')
216218
->setMaxResults(1)
217219
;
218220

Diff for: pkg/dbal/DbalMessage.php

+26-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ class DbalMessage implements PsrMessage
4141
*/
4242
private $timeToLive;
4343

44+
/**
45+
* Milliseconds, for example 15186054527288.
46+
*
47+
* Could be generated by the code: (int) (microtime(true) * 10000)
48+
*
49+
* @var int
50+
*/
51+
private $publishedAt;
52+
4453
/**
4554
* @param string $body
4655
* @param array $properties
@@ -259,7 +268,7 @@ public function getTimestamp()
259268
{
260269
$value = $this->getHeader('timestamp');
261270

262-
return $value === null ? null : (int) $value;
271+
return null === $value ? null : (int) $value;
263272
}
264273

265274
/**
@@ -269,4 +278,20 @@ public function setTimestamp($timestamp)
269278
{
270279
$this->setHeader('timestamp', $timestamp);
271280
}
281+
282+
/**
283+
* @return int
284+
*/
285+
public function getPublishedAt()
286+
{
287+
return $this->publishedAt;
288+
}
289+
290+
/**
291+
* @param int $publishedAt
292+
*/
293+
public function setPublishedAt($publishedAt)
294+
{
295+
$this->publishedAt = $publishedAt;
296+
}
272297
}

Diff for: pkg/dbal/DbalProducer.php

+7-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public function send(PsrDestination $destination, PsrMessage $message)
5454
InvalidDestinationException::assertDestinationInstanceOf($destination, DbalDestination::class);
5555
InvalidMessageException::assertMessageInstanceOf($message, DbalMessage::class);
5656

57-
if (null !== $this->priority && null === $message->getPriority()) {
57+
if (null !== $this->priority && 0 === $message->getPriority()) {
5858
$message->setPriority($this->priority);
5959
}
6060
if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) {
@@ -81,9 +81,14 @@ public function send(PsrDestination $destination, PsrMessage $message)
8181
throw new \LogicException('The generated uuid is empty');
8282
}
8383

84+
$publishedAt = null !== $message->getPublishedAt() ?
85+
$message->getPublishedAt() :
86+
(int) (microtime(true) * 10000)
87+
;
88+
8489
$dbalMessage = [
8590
'id' => $uuid,
86-
'published_at' => (int) (microtime(true) * 10000),
91+
'published_at' => $publishedAt,
8792
'body' => $body,
8893
'headers' => JSON::encode($message->getHeaders()),
8994
'properties' => JSON::encode($message->getProperties()),

Diff for: pkg/dbal/Tests/DbalConsumerTest.php

-36
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,6 @@
22

33
namespace Enqueue\Dbal\Tests;
44

5-
use Doctrine\DBAL\Connection;
6-
use Doctrine\DBAL\Platforms\AbstractPlatform;
7-
use Doctrine\DBAL\Query\QueryBuilder;
8-
use Doctrine\DBAL\Statement;
95
use Enqueue\Dbal\DbalConsumer;
106
use Enqueue\Dbal\DbalContext;
117
use Enqueue\Dbal\DbalDestination;
@@ -120,45 +116,13 @@ private function createProducerMock()
120116
return $this->createMock(DbalProducer::class);
121117
}
122118

123-
/**
124-
* @return \PHPUnit_Framework_MockObject_MockObject|Connection
125-
*/
126-
private function createConnectionMock()
127-
{
128-
return $this->createMock(Connection::class);
129-
}
130-
131-
/**
132-
* @return \PHPUnit_Framework_MockObject_MockObject|Statement
133-
*/
134-
private function createStatementMock()
135-
{
136-
return $this->createMock(Statement::class);
137-
}
138-
139119
/**
140120
* @return \PHPUnit_Framework_MockObject_MockObject|DbalContext
141121
*/
142122
private function createContextMock()
143123
{
144124
return $this->createMock(DbalContext::class);
145125
}
146-
147-
/**
148-
* @return \PHPUnit_Framework_MockObject_MockObject|QueryBuilder
149-
*/
150-
private function createQueryBuilderMock()
151-
{
152-
return $this->createMock(QueryBuilder::class);
153-
}
154-
155-
/**
156-
* @return \PHPUnit_Framework_MockObject_MockObject|AbstractPlatform
157-
*/
158-
private function createPlatformMock()
159-
{
160-
return $this->createMock(AbstractPlatform::class);
161-
}
162126
}
163127

164128
class InvalidMessage implements PsrMessage

Diff for: pkg/dbal/Tests/DbalMessageTest.php

+16
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@ public function testShouldSetCorrelationIdAsHeader()
4949
$this->assertSame(['correlation_id' => 'theCorrelationId'], $message->getHeaders());
5050
}
5151

52+
public function testShouldSetPublishedAtToNullInConstructor()
53+
{
54+
$message = new DbalMessage();
55+
56+
$this->assertNull($message->getPublishedAt());
57+
}
58+
5259
public function testShouldSetMessageIdAsHeader()
5360
{
5461
$message = new DbalMessage();
@@ -72,4 +79,13 @@ public function testShouldSetReplyToAsHeader()
7279

7380
$this->assertSame(['reply_to' => 'theReply'], $message->getHeaders());
7481
}
82+
83+
public function testShouldAllowGetPreviouslySetPublishedAtTime()
84+
{
85+
$message = new DbalMessage();
86+
87+
$message->setPublishedAt(123);
88+
89+
$this->assertSame(123, $message->getPublishedAt());
90+
}
7591
}

Diff for: pkg/dbal/Tests/DbalProducerTest.php

-9
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
namespace Enqueue\Dbal\Tests;
44

5-
use Doctrine\DBAL\Connection;
65
use Enqueue\Dbal\DbalContext;
76
use Enqueue\Dbal\DbalDestination;
87
use Enqueue\Dbal\DbalMessage;
@@ -60,14 +59,6 @@ private function createContextMock()
6059
{
6160
return $this->createMock(DbalContext::class);
6261
}
63-
64-
/**
65-
* @return \PHPUnit_Framework_MockObject_MockObject|Connection
66-
*/
67-
private function createConnectionMock()
68-
{
69-
return $this->createMock(Connection::class);
70-
}
7162
}
7263

7364
class NotSupportedDestination1 implements PsrDestination

Diff for: pkg/dbal/Tests/Functional/DbalConsumerTest.php

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
<?php
2+
3+
namespace Enqueue\Dbal\Tests\Functional;
4+
5+
use Enqueue\Dbal\DbalContext;
6+
use Enqueue\Dbal\DbalMessage;
7+
use Enqueue\Dbal\Tests\Spec\CreateDbalContextTrait;
8+
use PHPUnit\Framework\TestCase;
9+
10+
/**
11+
* @group functional
12+
*/
13+
class DbalConsumerTest extends TestCase
14+
{
15+
use CreateDbalContextTrait;
16+
17+
/**
18+
* @var DbalContext
19+
*/
20+
private $context;
21+
22+
public function setUp()
23+
{
24+
$this->context = $this->createDbalContext();
25+
}
26+
27+
protected function tearDown()
28+
{
29+
if ($this->context) {
30+
$this->context->close();
31+
}
32+
33+
parent::tearDown();
34+
}
35+
36+
public function testShouldSetPublishedAtDateToReceivedMessage()
37+
{
38+
$context = $this->context;
39+
$queue = $context->createQueue(__METHOD__);
40+
41+
$consumer = $context->createConsumer($queue);
42+
43+
// guard
44+
$this->assertNull($consumer->receiveNoWait());
45+
46+
$time = (int) (microtime(true) * 10000);
47+
48+
$expectedBody = __CLASS__.$time;
49+
50+
$producer = $context->createProducer();
51+
52+
$message = $context->createMessage($expectedBody);
53+
$message->setPublishedAt($time);
54+
$producer->send($queue, $message);
55+
56+
$message = $consumer->receive(8000); // 8 sec
57+
58+
$this->assertInstanceOf(DbalMessage::class, $message);
59+
$consumer->acknowledge($message);
60+
$this->assertSame($expectedBody, $message->getBody());
61+
$this->assertSame($time, $message->getPublishedAt());
62+
}
63+
64+
public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate()
65+
{
66+
$context = $this->context;
67+
$queue = $context->createQueue(__METHOD__);
68+
69+
$consumer = $context->createConsumer($queue);
70+
71+
// guard
72+
$this->assertNull($consumer->receiveNoWait());
73+
74+
$time = (int) (microtime(true) * 10000);
75+
$olderTime = $time - 10000;
76+
77+
$expectedPriority5Body = __CLASS__.'_priority5_'.$time;
78+
$expectedPriority5BodyOlderTime = __CLASS__.'_priority5_'.$olderTime;
79+
80+
$producer = $context->createProducer();
81+
82+
$message = $context->createMessage($expectedPriority5Body);
83+
$message->setPriority(5);
84+
$message->setPublishedAt($time);
85+
$producer->send($queue, $message);
86+
87+
$message = $context->createMessage($expectedPriority5BodyOlderTime);
88+
$message->setPriority(5);
89+
$message->setPublishedAt($olderTime);
90+
$producer->send($queue, $message);
91+
92+
$message = $consumer->receive(8000); // 8 sec
93+
94+
$this->assertInstanceOf(DbalMessage::class, $message);
95+
$consumer->acknowledge($message);
96+
$this->assertSame($expectedPriority5BodyOlderTime, $message->getBody());
97+
98+
$message = $consumer->receive(8000); // 8 sec
99+
100+
$this->assertInstanceOf(DbalMessage::class, $message);
101+
$consumer->acknowledge($message);
102+
$this->assertSame($expectedPriority5Body, $message->getBody());
103+
}
104+
}

Diff for: pkg/dbal/Tests/Spec/DbalSendAndReceivePriorityMessagesFromQueueTest.php

+17-3
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Dbal\Tests\Spec;
44

5+
use Enqueue\Dbal\DbalContext;
56
use Enqueue\Dbal\DbalMessage;
67
use Interop\Queue\PsrContext;
78
use Interop\Queue\Spec\SendAndReceivePriorityMessagesFromQueueSpec;
@@ -13,6 +14,15 @@ class DbalSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceivePrio
1314
{
1415
use CreateDbalContextTrait;
1516

17+
private $publishedAt;
18+
19+
public function setUp()
20+
{
21+
parent::setUp();
22+
23+
$this->publishedAt = (int) (microtime(true) * 10000);
24+
}
25+
1626
/**
1727
* @return PsrContext
1828
*/
@@ -24,13 +34,17 @@ protected function createContext()
2434
/**
2535
* {@inheritdoc}
2636
*
37+
* @param DbalContext $context
38+
*
2739
* @return DbalMessage
2840
*/
29-
protected function createMessage(PsrContext $context, $priority)
41+
protected function createMessage(PsrContext $context, $body)
3042
{
3143
/** @var DbalMessage $message */
32-
$message = $context->createMessage('priority'.$priority);
33-
$message->setPriority($priority);
44+
$message = parent::createMessage($context, $body);
45+
46+
// in order to test priorities correctly we have to make sure the messages were sent in the same time.
47+
$message->setPublishedAt($this->publishedAt);
3448

3549
return $message;
3650
}

Diff for: pkg/dbal/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"enqueue/test": "^0.8@dev",
1616
"enqueue/enqueue": "^0.8@dev",
1717
"enqueue/null": "^0.8@dev",
18-
"queue-interop/queue-spec": "^0.5.4@dev",
18+
"queue-interop/queue-spec": "^0.5.5@dev",
1919
"symfony/dependency-injection": "^2.8|^3|^4",
2020
"symfony/config": "^2.8|^3|^4"
2121
},

0 commit comments

Comments
 (0)