@@ -10,6 +10,7 @@ Allows to use [MongoDB](https://www.mongodb.com/) as a message queue broker.
10
10
* [ Send expiration message] ( #send-expiration-message )
11
11
* [ Send delayed message] ( #send-delayed-message )
12
12
* [ Consume message] ( #consume-message )
13
+ * [ Subscription consumer] ( #subscription-consumer )
13
14
14
15
## Installation
15
16
@@ -139,4 +140,37 @@ $consumer->acknowledge($message);
139
140
// $consumer->reject($message);
140
141
```
141
142
143
+ ## Subscription consumer
144
+
145
+ ``` php
146
+ <?php
147
+ use Interop\Queue\PsrMessage;
148
+ use Interop\Queue\PsrConsumer;
149
+
150
+ /** @var \Enqueue\Mongodb\MongodbContext $psrContext */
151
+ /** @var \Enqueue\Mongodb\MongodbDestination $fooQueue */
152
+ /** @var \Enqueue\Mongodb\MongodbDestination $barQueue */
153
+
154
+ $fooConsumer = $psrContext->createConsumer($fooQueue);
155
+ $barConsumer = $psrContext->createConsumer($barQueue);
156
+
157
+ $subscriptionConsumer = $psrContext->createSubscriptionConsumer();
158
+ $subscriptionConsumer->subscribe($fooConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
159
+ // process message
160
+
161
+ $consumer->acknowledge($message);
162
+
163
+ return true;
164
+ });
165
+ $subscriptionConsumer->subscribe($barConsumer, function(PsrMessage $message, PsrConsumer $consumer) {
166
+ // process message
167
+
168
+ $consumer->acknowledge($message);
169
+
170
+ return true;
171
+ });
172
+
173
+ $subscriptionConsumer->consume(2000); // 2 sec
174
+ ```
175
+
142
176
[ back to index] ( ../index.md )
0 commit comments