Skip to content

Commit fdb6392

Browse files
authored
Merge pull request #195 from vtsykun/patch-1
Fixed losing message priority for dbal driver
2 parents 02b254f + 3ad3741 commit fdb6392

File tree

5 files changed

+139
-5
lines changed

5 files changed

+139
-5
lines changed

Diff for: pkg/dbal/Client/DbalDriver.php

+28-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ class DbalDriver implements DriverInterface
3030
*/
3131
private $queueMetaRegistry;
3232

33+
/**
34+
* @var array
35+
*/
36+
private static $priorityMap = [
37+
MessagePriority::VERY_LOW => 0,
38+
MessagePriority::LOW => 1,
39+
MessagePriority::NORMAL => 2,
40+
MessagePriority::HIGH => 3,
41+
MessagePriority::VERY_HIGH => 4,
42+
];
43+
3344
/**
3445
* @param DbalContext $context
3546
* @param Config $config
@@ -63,6 +74,9 @@ public function createTransportMessage(Message $message)
6374
$transportMessage->setDelay($message->getDelay());
6475
$transportMessage->setReplyTo($message->getReplyTo());
6576
$transportMessage->setCorrelationId($message->getCorrelationId());
77+
if (array_key_exists($message->getPriority(), self::$priorityMap)) {
78+
$transportMessage->setPriority(self::$priorityMap[$message->getPriority()]);
79+
}
6680

6781
return $transportMessage;
6882
}
@@ -83,11 +97,16 @@ public function createClientMessage(PsrMessage $message)
8397
$clientMessage->setContentType($message->getHeader('content_type'));
8498
$clientMessage->setMessageId($message->getMessageId());
8599
$clientMessage->setTimestamp($message->getTimestamp());
86-
$clientMessage->setPriority(MessagePriority::NORMAL);
87100
$clientMessage->setDelay($message->getDelay());
88101
$clientMessage->setReplyTo($message->getReplyTo());
89102
$clientMessage->setCorrelationId($message->getCorrelationId());
90103

104+
$priorityMap = array_flip(self::$priorityMap);
105+
$priority = array_key_exists($message->getPriority(), $priorityMap) ?
106+
$priorityMap[$message->getPriority()] :
107+
MessagePriority::NORMAL;
108+
$clientMessage->setPriority($priority);
109+
91110
return $clientMessage;
92111
}
93112

@@ -156,4 +175,12 @@ public function getConfig()
156175
{
157176
return $this->config;
158177
}
178+
179+
/**
180+
* @return array
181+
*/
182+
public static function getPriorityMap()
183+
{
184+
return self::$priorityMap;
185+
}
159186
}

Diff for: pkg/dbal/DbalConsumer.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ protected function receiveMessage()
176176
->where('queue = :queue')
177177
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
178178
->orderBy('priority', 'desc')
179-
->orderBy('id', 'asc')
179+
->addOrderBy('id', 'asc')
180180
->setMaxResults(1)
181181
;
182182

Diff for: pkg/dbal/Tests/Client/DbalDriverTest.php

+1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public function testShouldConvertTransportMessageToClientMessage()
9292
$transportMessage->setHeader('content_type', 'ContentType');
9393
$transportMessage->setMessageId('MessageId');
9494
$transportMessage->setTimestamp(1000);
95+
$transportMessage->setPriority(2);
9596
$transportMessage->setDelay(12345);
9697

9798
$driver = new DbalDriver(

Diff for: pkg/dbal/Tests/DbalConsumerTest.php

+18-3
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,15 @@ public function testShouldReceiveMessage()
172172
->will($this->returnSelf())
173173
;
174174
$queryBuilder
175-
->expects($this->exactly(2))
175+
->expects($this->exactly(1))
176176
->method('orderBy')
177177
->will($this->returnSelf())
178178
;
179+
$queryBuilder
180+
->expects($this->exactly(1))
181+
->method('addOrderBy')
182+
->will($this->returnSelf())
183+
;
179184

180185
$platform = $this->createPlatformMock();
181186

@@ -264,10 +269,15 @@ public function testShouldReturnNullIfThereIsNoNewMessage()
264269
->will($this->returnSelf())
265270
;
266271
$queryBuilder
267-
->expects($this->exactly(2))
272+
->expects($this->exactly(1))
268273
->method('orderBy')
269274
->will($this->returnSelf())
270275
;
276+
$queryBuilder
277+
->expects($this->exactly(1))
278+
->method('addOrderBy')
279+
->will($this->returnSelf())
280+
;
271281

272282
$platform = $this->createPlatformMock();
273283

@@ -352,10 +362,15 @@ public function testShouldThrowIfMessageWasNotRemoved()
352362
->will($this->returnSelf())
353363
;
354364
$queryBuilder
355-
->expects($this->exactly(2))
365+
->expects($this->exactly(1))
356366
->method('orderBy')
357367
->will($this->returnSelf())
358368
;
369+
$queryBuilder
370+
->expects($this->exactly(1))
371+
->method('addOrderBy')
372+
->will($this->returnSelf())
373+
;
359374

360375
$platform = $this->createPlatformMock();
361376

Diff for: pkg/dbal/Tests/DbalSendPriorityMessagesTest.php

+91
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?php
2+
3+
namespace Enqueue\Dbal\Tests;
4+
5+
use Enqueue\Dbal\DbalConnectionFactory;
6+
use Enqueue\Dbal\DbalContext;
7+
use Enqueue\Dbal\DbalMessage;
8+
use Interop\Queue\PsrContext;
9+
use Interop\Queue\PsrMessage;
10+
use PHPUnit\Framework\TestCase;
11+
12+
/**
13+
* @group functional
14+
*/
15+
class DbalSendPriorityMessagesTest extends TestCase
16+
{
17+
public function test()
18+
{
19+
$context = $this->createContext();
20+
$queue = $this->createQueue($context, 'default');
21+
$consumer = $context->createConsumer($queue);
22+
23+
// guard
24+
$this->assertNull($consumer->receiveNoWait());
25+
26+
$messagePriorities = [1, 0, 3];
27+
$producer = $context->createProducer();
28+
foreach ($messagePriorities as $priority) {
29+
$producer->send($queue, $this->createMessage($context, $priority));
30+
}
31+
32+
sort($messagePriorities);
33+
foreach (array_reverse($messagePriorities) as $priority) {
34+
$message = $consumer->receive(8000); // 8 sec
35+
36+
$this->assertInstanceOf(PsrMessage::class, $message);
37+
$consumer->acknowledge($message);
38+
$this->assertSame('priority'.$priority, $message->getBody());
39+
}
40+
}
41+
42+
/**
43+
* @return PsrContext
44+
*/
45+
protected function createContext()
46+
{
47+
$factory = new DbalConnectionFactory(
48+
[
49+
'lazy' => true,
50+
'connection' => [
51+
'dbname' => getenv('SYMFONY__DB__NAME'),
52+
'user' => getenv('SYMFONY__DB__USER'),
53+
'password' => getenv('SYMFONY__DB__PASSWORD'),
54+
'host' => getenv('SYMFONY__DB__HOST'),
55+
'port' => getenv('SYMFONY__DB__PORT'),
56+
'driver' => getenv('SYMFONY__DB__DRIVER'),
57+
],
58+
]
59+
);
60+
61+
return $factory->createContext();
62+
}
63+
64+
/**
65+
* {@inheritdoc}
66+
*
67+
* @param DbalContext $context
68+
*/
69+
protected function createQueue(PsrContext $context, $queueName)
70+
{
71+
$queue = $context->createQueue($queueName);
72+
$context->createDataBaseTable();
73+
74+
return $queue;
75+
}
76+
77+
/**
78+
* @param PsrContext $context
79+
* @param int $priority
80+
*
81+
* @return DbalMessage
82+
*/
83+
protected function createMessage(PsrContext $context, $priority)
84+
{
85+
/** @var DbalMessage $message */
86+
$message = $context->createMessage('priority'.$priority);
87+
$message->setPriority($priority);
88+
89+
return $message;
90+
}
91+
}

0 commit comments

Comments
 (0)