Skip to content

Commit 6233667

Browse files
authored
Merge pull request #565 from rosamarsky/mongodb-subscription-consumer
MongoDB Subscription Consumer feature
2 parents 0fd466c + 4a62957 commit 6233667

13 files changed

+531
-25
lines changed

docs/transport/mongodb.md

+34
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Allows to use [MongoDB](https://www.mongodb.com/) as a message queue broker.
1010
* [Send expiration message](#send-expiration-message)
1111
* [Send delayed message](#send-delayed-message)
1212
* [Consume message](#consume-message)
13+
* [Subscription consumer](#subscription-consumer)
1314

1415
## Installation
1516

@@ -139,4 +140,37 @@ $consumer->acknowledge($message);
139140
// $consumer->reject($message);
140141
```
141142

143+
## Subscription consumer
144+
145+
```php
146+
<?php
147+
use Interop\Queue\PsrMessage;
148+
use Interop\Queue\PsrConsumer;
149+
150+
/** @var \Enqueue\Mongodb\MongodbContext $psrContext */
151+
/** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
152+
/** @var \Enqueue\Mongodb\MongodbDestination $barQueue */
153+
154+
$fooConsumer = $psrContext->createConsumer($fooQueue);
155+
$barConsumer = $psrContext->createConsumer($barQueue);
156+
157+
$subscriptionConsumer = $psrContext->createSubscriptionConsumer();
158+
$subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
159+
// process message
160+
161+
$consumer->acknowledge($message);
162+
163+
return true;
164+
});
165+
$subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
166+
// process message
167+
168+
$consumer->acknowledge($message);
169+
170+
return true;
171+
});
172+
173+
$subscriptionConsumer->consume(2000); // 2 sec
174+
```
175+
142176
[back to index](../index.md)

pkg/mongodb/MongodbConsumer.php

+2-16
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public function reject(Message $message, bool $requeue = false): void
115115
}
116116
}
117117

118-
protected function receiveMessage(): ?MongodbMessage
118+
private function receiveMessage(): ?MongodbMessage
119119
{
120120
$now = time();
121121
$collection = $this->context->getCollection();
@@ -137,23 +137,9 @@ protected function receiveMessage(): ?MongodbMessage
137137
return null;
138138
}
139139
if (empty($message['time_to_live']) || $message['time_to_live'] > time()) {
140-
return $this->convertMessage($message);
140+
return $this->context->convertMessage($message);
141141
}
142142

143143
return null;
144144
}
145-
146-
protected function convertMessage(array $mongodbMessage): MongodbMessage
147-
{
148-
$properties = JSON::decode($mongodbMessage['properties']);
149-
$headers = JSON::decode($mongodbMessage['headers']);
150-
151-
$message = $this->context->createMessage($mongodbMessage['body'], $properties, $headers);
152-
$message->setId((string) $mongodbMessage['_id']);
153-
$message->setPriority((int) $mongodbMessage['priority']);
154-
$message->setRedelivered((bool) $mongodbMessage['redelivered']);
155-
$message->setPublishedAt((int) $mongodbMessage['published_at']);
156-
157-
return $message;
158-
}
159145
}

pkg/mongodb/MongodbContext.php

+20-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
use Interop\Queue\Context;
99
use Interop\Queue\Destination;
1010
use Interop\Queue\Exception\InvalidDestinationException;
11-
use Interop\Queue\Exception\SubscriptionConsumerNotSupportedException;
1211
use Interop\Queue\Exception\TemporaryQueueNotSupportedException;
1312
use Interop\Queue\Message;
1413
use Interop\Queue\Producer;
@@ -107,7 +106,26 @@ public function close(): void
107106

108107
public function createSubscriptionConsumer(): SubscriptionConsumer
109108
{
110-
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
109+
return new MongodbSubscriptionConsumer($this);
110+
}
111+
112+
/**
113+
* @internal It must be used here and in the consumer only
114+
*/
115+
public function convertMessage(array $mongodbMessage): MongodbMessage
116+
{
117+
$mongodbMessageObj = $this->createMessage(
118+
$mongodbMessage['body'],
119+
JSON::decode($mongodbMessage['properties']),
120+
JSON::decode($mongodbMessage['headers'])
121+
);
122+
123+
$mongodbMessageObj->setId((string) $mongodbMessage['_id']);
124+
$mongodbMessageObj->setPriority((int) $mongodbMessage['priority']);
125+
$mongodbMessageObj->setRedelivered((bool) $mongodbMessage['redelivered']);
126+
$mongodbMessageObj->setPublishedAt((int) $mongodbMessage['published_at']);
127+
128+
return $mongodbMessageObj;
111129
}
112130

113131
/**
+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Mongodb;
6+
7+
use Interop\Queue\Consumer;
8+
use Interop\Queue\SubscriptionConsumer;
9+
10+
class MongodbSubscriptionConsumer implements SubscriptionConsumer
11+
{
12+
/**
13+
* @var MongodbContext
14+
*/
15+
private $context;
16+
17+
/**
18+
* an item contains an array: [MongodbConsumer $consumer, callable $callback];.
19+
*
20+
* @var array
21+
*/
22+
private $subscribers;
23+
24+
/**
25+
* @param MongodbContext $context
26+
*/
27+
public function __construct(MongodbContext $context)
28+
{
29+
$this->context = $context;
30+
$this->subscribers = [];
31+
}
32+
33+
public function consume(int $timeout = 0): void
34+
{
35+
if (empty($this->subscribers)) {
36+
throw new \LogicException('No subscribers');
37+
}
38+
39+
$timeout = (int) ceil($timeout / 1000);
40+
$endAt = time() + $timeout;
41+
42+
$queueNames = [];
43+
foreach (array_keys($this->subscribers) as $queueName) {
44+
$queueNames[$queueName] = $queueName;
45+
}
46+
47+
$currentQueueNames = [];
48+
while (true) {
49+
if (empty($currentQueueNames)) {
50+
$currentQueueNames = $queueNames;
51+
}
52+
53+
$result = $this->context->getCollection()->findOneAndDelete(
54+
[
55+
'queue' => ['$in' => array_keys($currentQueueNames)],
56+
'$or' => [
57+
['delayed_until' => ['$exists' => false]],
58+
['delayed_until' => ['$lte' => time()]],
59+
],
60+
],
61+
[
62+
'sort' => ['priority' => -1, 'published_at' => 1],
63+
'typeMap' => ['root' => 'array', 'document' => 'array'],
64+
]
65+
);
66+
67+
if ($result) {
68+
list($consumer, $callback) = $this->subscribers[$result['queue']];
69+
70+
$message = $this->context->convertMessage($result);
71+
72+
if (false === call_user_func($callback, $message, $consumer)) {
73+
return;
74+
}
75+
76+
unset($currentQueueNames[$result['queue']]);
77+
} else {
78+
$currentQueueNames = [];
79+
80+
usleep(200000); // 200ms
81+
}
82+
83+
if ($timeout && microtime(true) >= $endAt) {
84+
return;
85+
}
86+
}
87+
}
88+
89+
/**
90+
* @param MongodbConsumer $consumer
91+
*/
92+
public function subscribe(Consumer $consumer, callable $callback): void
93+
{
94+
if (false == $consumer instanceof MongodbConsumer) {
95+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer)));
96+
}
97+
98+
$queueName = $consumer->getQueue()->getQueueName();
99+
if (array_key_exists($queueName, $this->subscribers)) {
100+
if ($this->subscribers[$queueName][0] === $consumer && $this->subscribers[$queueName][1] === $callback) {
101+
return;
102+
}
103+
104+
throw new \InvalidArgumentException(sprintf('There is a consumer subscribed to queue: "%s"', $queueName));
105+
}
106+
107+
$this->subscribers[$queueName] = [$consumer, $callback];
108+
}
109+
110+
/**
111+
* @param MongodbConsumer $consumer
112+
*/
113+
public function unsubscribe(Consumer $consumer): void
114+
{
115+
if (false == $consumer instanceof MongodbConsumer) {
116+
throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', MongodbConsumer::class, get_class($consumer)));
117+
}
118+
119+
$queueName = $consumer->getQueue()->getQueueName();
120+
121+
if (false == array_key_exists($queueName, $this->subscribers)) {
122+
return;
123+
}
124+
125+
if ($this->subscribers[$queueName][0] !== $consumer) {
126+
return;
127+
}
128+
129+
unset($this->subscribers[$queueName]);
130+
}
131+
132+
public function unsubscribeAll(): void
133+
{
134+
$this->subscribers = [];
135+
}
136+
}

pkg/mongodb/Tests/MongodbContextTest.php

+26
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,32 @@ public function testShouldCreateMessage()
7171
$this->assertFalse($message->isRedelivered());
7272
}
7373

74+
public function testShouldConvertFromArrayToMongodbMessage()
75+
{
76+
$arrayData = [
77+
'_id' => 'stringId',
78+
'body' => 'theBody',
79+
'properties' => json_encode(['barProp' => 'barPropVal']),
80+
'headers' => json_encode(['fooHeader' => 'fooHeaderVal']),
81+
'priority' => '12',
82+
'published_at' => 1525935820,
83+
'redelivered' => false,
84+
];
85+
86+
$context = new MongodbContext($this->createClientMock());
87+
$message = $context->convertMessage($arrayData);
88+
89+
$this->assertInstanceOf(MongodbMessage::class, $message);
90+
91+
$this->assertEquals('stringId', $message->getId());
92+
$this->assertEquals('theBody', $message->getBody());
93+
$this->assertEquals(['barProp' => 'barPropVal'], $message->getProperties());
94+
$this->assertEquals(['fooHeader' => 'fooHeaderVal'], $message->getHeaders());
95+
$this->assertEquals(12, $message->getPriority());
96+
$this->assertEquals(1525935820, $message->getPublishedAt());
97+
$this->assertFalse($message->isRedelivered());
98+
}
99+
74100
public function testShouldCreateTopic()
75101
{
76102
$context = new MongodbContext($this->createClientMock());

0 commit comments

Comments
 (0)