Skip to content

Commit a4392ca

Browse files
authored
Merge pull request #563 from rosamarsky/dbal-subscription-consumer
Dbal Subscription Consumer feature
2 parents 6233667 + 1c9f591 commit a4392ca

13 files changed

+577
-27
lines changed

Diff for: docs/transport/amqp.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ use Interop\Queue\PsrConsumer;
235235
$fooConsumer = $psrContext->createConsumer($fooQueue);
236236
$barConsumer = $psrContext->createConsumer($barQueue);
237237

238-
$subscriptionConsumer =$psrContext->createSubscriptionConsumer();
238+
$subscriptionConsumer = $psrContext->createSubscriptionConsumer();
239239
$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
240240
// process message
241241

Diff for: docs/transport/amqp_bunny.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ use Interop\Queue\PsrConsumer;
227227
$fooConsumer = $psrContext->createConsumer($fooQueue);
228228
$barConsumer = $psrContext->createConsumer($barQueue);
229229

230-
$subscriptionConsumer =$psrContext->createSubscriptionConsumer();
230+
$subscriptionConsumer = $psrContext->createSubscriptionConsumer();
231231
$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
232232
// process message
233233

Diff for: docs/transport/amqp_lib.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ use Interop\Queue\PsrConsumer;
235235
$fooConsumer = $psrContext->createConsumer($fooQueue);
236236
$barConsumer = $psrContext->createConsumer($barQueue);
237237

238-
$subscriptionConsumer =$psrContext->createSubscriptionConsumer();
238+
$subscriptionConsumer = $psrContext->createSubscriptionConsumer();
239239
$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
240240
// process message
241241

Diff for: docs/transport/dbal.md

+34
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ It creates a table there. Pushes and pops messages to\from that table.
1111
* [Send message to topic](#send-message-to-topic)
1212
* [Send message to queue](#send-message-to-queue)
1313
* [Consume message](#consume-message)
14+
* [Subscription consumer](#subscription-consumer)
1415

1516
## Installation
1617

@@ -103,4 +104,37 @@ $message = $consumer->receive();
103104
// process a message
104105
```
105106

107+
## Subscription consumer
108+
109+
```php
110+
<?php
111+
use Interop\Queue\PsrMessage;
112+
use Interop\Queue\PsrConsumer;
113+
114+
/** @var \Enqueue\Dbal\DbalContext $psrContext */
115+
/** @var \Enqueue\Dbal\DbalDestination $fooQueue */
116+
/** @var \Enqueue\Dbal\DbalDestination $barQueue */
117+
118+
$fooConsumer = $psrContext->createConsumer($fooQueue);
119+
$barConsumer = $psrContext->createConsumer($barQueue);
120+
121+
$subscriptionConsumer = $psrContext->createSubscriptionConsumer();
122+
$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
123+
// process message
124+
125+
$consumer->acknowledge($message);
126+
127+
return true;
128+
});
129+
$subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
130+
// process message
131+
132+
$consumer->acknowledge($message);
133+
134+
return true;
135+
});
136+
137+
$subscriptionConsumer->consume(2000); // 2 sec
138+
```
139+
106140
[back to index](../index.md)

Diff for: pkg/dbal/DbalConsumer.php

+1-22
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ protected function receiveMessage(): ?DbalMessage
142142
$this->dbal->commit();
143143

144144
if (empty($dbalMessage['time_to_live']) || ($dbalMessage['time_to_live'] / 1000) > microtime(true)) {
145-
return $this->convertMessage($dbalMessage);
145+
return $this->context->convertMessage($dbalMessage);
146146
}
147147

148148
return null;
@@ -153,27 +153,6 @@ protected function receiveMessage(): ?DbalMessage
153153
}
154154
}
155155

156-
protected function convertMessage(array $dbalMessage): DbalMessage
157-
{
158-
/** @var DbalMessage $message */
159-
$message = $this->context->createMessage();
160-
161-
$message->setBody($dbalMessage['body']);
162-
$message->setPriority((int) $dbalMessage['priority']);
163-
$message->setRedelivered((bool) $dbalMessage['redelivered']);
164-
$message->setPublishedAt((int) $dbalMessage['published_at']);
165-
166-
if ($dbalMessage['headers']) {
167-
$message->setHeaders(JSON::decode($dbalMessage['headers']));
168-
}
169-
170-
if ($dbalMessage['properties']) {
171-
$message->setProperties(JSON::decode($dbalMessage['properties']));
172-
}
173-
174-
return $message;
175-
}
176-
177156
private function fetchPrioritizedMessage(int $now): ?array
178157
{
179158
$query = $this->dbal->createQueryBuilder();

Diff for: pkg/dbal/DbalContext.php

+25-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
use Interop\Queue\Context;
1212
use Interop\Queue\Destination;
1313
use Interop\Queue\Exception\InvalidDestinationException;
14-
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
1514
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
1615
use Interop\Queue\Message;
1716
use Interop\Queue\Producer;
@@ -126,7 +125,31 @@ public function close(): void
126125

127126
public function createSubscriptionConsumer(): SubscriptionConsumer
128127
{
129-
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
128+
return new DbalSubscriptionConsumer($this);
129+
}
130+
131+
/**
132+
* @internal It must be used here and in the consumer only
133+
*/
134+
public function convertMessage(array $dbalMessage): DbalMessage
135+
{
136+
$dbalMessageObj = $this->createMessage(
137+
$dbalMessage['body'],
138+
$dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [],
139+
$dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : []
140+
);
141+
142+
if (isset($dbalMessage['redelivered'])) {
143+
$dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']);
144+
}
145+
if (isset($dbalMessage['priority'])) {
146+
$dbalMessageObj->setPriority((int) $dbalMessage['priority']);
147+
}
148+
if (isset($dbalMessage['published_at'])) {
149+
$dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']);
150+
}
151+
152+
return $dbalMessageObj;
130153
}
131154

132155
/**

Diff for: pkg/dbal/DbalSubscriptionConsumer.php

+198
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Dbal;
6+
7+
use Doctrine\DBAL\Types\Type;
8+
use Interop\Queue\Consumer;
9+
use Interop\Queue\SubscriptionConsumer;
10+
11+
class DbalSubscriptionConsumer implements SubscriptionConsumer
12+
{
13+
/**
14+
* @var DbalContext
15+
*/
16+
private $context;
17+
18+
/**
19+
* an item contains an array: [DbalConsumer $consumer, callable $callback];.
20+
*
21+
* @var array
22+
*/
23+
private $subscribers;
24+
25+
/**
26+
* @var \Doctrine\DBAL\Connection
27+
*/
28+
private $dbal;
29+
30+
/**
31+
* @param DbalContext $context
32+
*/
33+
public function __construct(DbalContext $context)
34+
{
35+
$this->context = $context;
36+
$this->dbal = $this->context->getDbalConnection();
37+
$this->subscribers = [];
38+
}
39+
40+
public function consume(int $timeout = 0): void
41+
{
42+
if (empty($this->subscribers)) {
43+
throw new \LogicException('No subscribers');
44+
}
45+
46+
$timeout = (int) ceil($timeout / 1000);
47+
$endAt = time() + $timeout;
48+
49+
$queueNames = [];
50+
foreach (array_keys($this->subscribers) as $queueName) {
51+
$queueNames[$queueName] = $queueName;
52+
}
53+
54+
$currentQueueNames = [];
55+
while (true) {
56+
if (empty($currentQueueNames)) {
57+
$currentQueueNames = $queueNames;
58+
}
59+
60+
$message = $this->fetchPrioritizedMessage($currentQueueNames) ?: $this->fetchMessage($currentQueueNames);
61+
62+
if ($message) {
63+
$this->dbal->delete($this->context->getTableName(), ['id' => $message['id']], ['id' => Type::GUID]);
64+
65+
$dbalMessage = $this->context->convertMessage($message);
66+
67+
/**
68+
* @var DbalConsumer
69+
* @var callable $callback
70+
*/
71+
list($consumer, $callback) = $this->subscribers[$message['queue']];
72+
73+
if (false === call_user_func($callback, $dbalMessage, $consumer)) {
74+
return;
75+
}
76+
77+
unset($currentQueueNames[$message['queue']]);
78+
} else {
79+
$currentQueueNames = [];
80+
81+
usleep(200000); // 200ms
82+
}
83+
84+
if ($timeout && microtime(true) >= $endAt) {
85+
return;
86+
}
87+
}
88+
}
89+
90+
/**
91+
* @param DbalConsumer $consumer
92+
*/
93+
public function subscribe(Consumer $consumer, callable $callback): void
94+
{
95+
if (false == $consumer instanceof DbalConsumer) {
96+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer)));
97+
}
98+
99+
$queueName = $consumer->getQueue()->getQueueName();
100+
if (array_key_exists($queueName, $this->subscribers)) {
101+
if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) {
102+
return;
103+
}
104+
105+
throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName));
106+
}
107+
108+
$this->subscribers[$queueName] = [$consumer, $callback];
109+
}
110+
111+
/**
112+
* @param DbalConsumer $consumer
113+
*/
114+
public function unsubscribe(Consumer $consumer): void
115+
{
116+
if (false == $consumer instanceof DbalConsumer) {
117+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', DbalConsumer::class, get_class($consumer)));
118+
}
119+
120+
$queueName = $consumer->getQueue()->getQueueName();
121+
122+
if (false == array_key_exists($queueName, $this->subscribers)) {
123+
return;
124+
}
125+
126+
if ($this->subscribers[$queueName][0] !== $consumer) {
127+
return;
128+
}
129+
130+
unset($this->subscribers[$queueName]);
131+
}
132+
133+
public function unsubscribeAll(): void
134+
{
135+
$this->subscribers = [];
136+
}
137+
138+
private function fetchMessage(array $queues): ?array
139+
{
140+
$query = $this->dbal->createQueryBuilder();
141+
$query
142+
->select('*')
143+
->from($this->context->getTableName())
144+
->andWhere('queue IN (:queues)')
145+
->andWhere('priority IS NULL')
146+
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
147+
->addOrderBy('published_at', 'asc')
148+
->setMaxResults(1)
149+
;
150+
151+
$sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL();
152+
153+
$result = $this->dbal->executeQuery(
154+
$sql,
155+
[
156+
'queues' => array_keys($queues),
157+
'delayedUntil' => time(),
158+
],
159+
[
160+
'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY,
161+
'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER,
162+
]
163+
)->fetch();
164+
165+
return $result ?: null;
166+
}
167+
168+
private function fetchPrioritizedMessage(array $queues): ?array
169+
{
170+
$query = $this->dbal->createQueryBuilder();
171+
$query
172+
->select('*')
173+
->from($this->context->getTableName())
174+
->andWhere('queue IN (:queues)')
175+
->andWhere('priority IS NOT NULL')
176+
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
177+
->addOrderBy('published_at', 'asc')
178+
->addOrderBy('priority', 'desc')
179+
->setMaxResults(1)
180+
;
181+
182+
$sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL();
183+
184+
$result = $this->dbal->executeQuery(
185+
$sql,
186+
[
187+
'queues' => array_keys($queues),
188+
'delayedUntil' => time(),
189+
],
190+
[
191+
'queues' => \Doctrine\DBAL\Connection::PARAM_STR_ARRAY,
192+
'delayedUntil' => \Doctrine\DBAL\ParameterType::INTEGER,
193+
]
194+
)->fetch();
195+
196+
return $result ?: null;
197+
}
198+
}

Diff for: pkg/dbal/Tests/DbalContextTest.php

+15
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,21 @@ public function testShouldCreateMessage()
6565
$this->assertFalse($message->isRedelivered());
6666
}
6767

68+
public function testShouldConvertArrayToDbalMessage()
69+
{
70+
$arrayData = [
71+
'body' => 'theBody',
72+
'properties' => json_encode(['barProp' => 'barPropVal']),
73+
'headers' => json_encode(['fooHeader' => 'fooHeaderVal']),
74+
];
75+
$context = new DbalContext($this->createConnectionMock());
76+
$message = $context->convertMessage($arrayData);
77+
78+
$this->assertSame('theBody', $message->getBody());
79+
$this->assertSame(['barProp' => 'barPropVal'], $message->getProperties());
80+
$this->assertSame(['fooHeader' => 'fooHeaderVal'], $message->getHeaders());
81+
}
82+
6883
public function testShouldCreateTopic()
6984
{
7085
$context = new DbalContext($this->createConnectionMock());

Diff for: pkg/dbal/Tests/DbalMessageTest.php

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
<?php
22

3+
declare(strict_types=1);
4+
35
namespace Enqueue\Dbal\Tests;
46

57
use Enqueue\Dbal\DbalMessage;

0 commit comments

Comments
 (0)