Skip to content

Commit 61dd291

Browse files
authored
Merge pull request #488 from php-enqueue/subscription-consumer
Improve multi queue consumption.
2 parents 2ff4f18 + cb865a9 commit 61dd291

File tree

13 files changed

+684
-13
lines changed

13 files changed

+684
-13
lines changed

Diff for: pkg/amqp-bunny/AmqpContext.php

+17-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Enqueue\AmqpTools\DelayStrategyAware;
1010
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
1111
use Enqueue\AmqpTools\SignalSocketHelper;
12+
use Enqueue\AmqpTools\SubscriptionConsumer;
1213
use Interop\Amqp\AmqpBind as InteropAmqpBind;
1314
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
1415
use Interop\Amqp\AmqpContext as InteropAmqpContext;
@@ -22,9 +23,10 @@
2223
use Interop\Queue\Exception;
2324
use Interop\Queue\InvalidDestinationException;
2425
use Interop\Queue\PsrDestination;
26+
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
2527
use Interop\Queue\PsrTopic;
2628

27-
class AmqpContext implements InteropAmqpContext, DelayStrategyAware
29+
class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext
2830
{
2931
use DelayStrategyAwareTrait;
3032

@@ -136,6 +138,14 @@ public function createConsumer(PsrDestination $destination)
136138
return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']);
137139
}
138140

141+
/**
142+
* {@inheritdoc}
143+
*/
144+
public function createSubscriptionConsumer()
145+
{
146+
return new SubscriptionConsumer($this);
147+
}
148+
139149
/**
140150
* @return AmqpProducer
141151
*/
@@ -323,6 +333,8 @@ public function setQos($prefetchSize, $prefetchCount, $global)
323333
}
324334

325335
/**
336+
* @deprecated since 0.8.34 will be removed in 0.9
337+
*
326338
* {@inheritdoc}
327339
*/
328340
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
@@ -366,6 +378,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
366378
}
367379

368380
/**
381+
* @deprecated since 0.8.34 will be removed in 0.9
382+
*
369383
* {@inheritdoc}
370384
*/
371385
public function unsubscribe(InteropAmqpConsumer $consumer)
@@ -382,6 +396,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
382396
}
383397

384398
/**
399+
* @deprecated since 0.8.34 will be removed in 0.9
400+
*
385401
* {@inheritdoc}
386402
*/
387403
public function consume($timeout = 0)

Diff for: pkg/amqp-bunny/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
"queue-interop/amqp-interop": "^0.7@dev",
1212
"bunny/bunny": "^0.2.4|^0.3|^0.4",
13-
"enqueue/amqp-tools": "^0.8.4@dev"
13+
"enqueue/amqp-tools": "^0.8@dev"
1414
},
1515
"require-dev": {
1616
"phpunit/phpunit": "~5.4.0",

Diff for: pkg/amqp-ext/AmqpContext.php

+17-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\AmqpTools\DelayStrategyAware;
66
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
7+
use Enqueue\AmqpTools\SubscriptionConsumer;
78
use Interop\Amqp\AmqpBind as InteropAmqpBind;
89
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
910
use Interop\Amqp\AmqpContext as InteropAmqpContext;
@@ -16,9 +17,10 @@
1617
use Interop\Queue\Exception;
1718
use Interop\Queue\InvalidDestinationException;
1819
use Interop\Queue\PsrDestination;
20+
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
1921
use Interop\Queue\PsrTopic;
2022

21-
class AmqpContext implements InteropAmqpContext, DelayStrategyAware
23+
class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext
2224
{
2325
use DelayStrategyAwareTrait;
2426

@@ -260,6 +262,14 @@ public function createConsumer(PsrDestination $destination)
260262
return new AmqpConsumer($this, $destination, $this->buffer, $this->receiveMethod);
261263
}
262264

265+
/**
266+
* {@inheritdoc}
267+
*/
268+
public function createSubscriptionConsumer()
269+
{
270+
return new SubscriptionConsumer($this);
271+
}
272+
263273
/**
264274
* {@inheritdoc}
265275
*/
@@ -300,6 +310,8 @@ public function getExtChannel()
300310
}
301311

302312
/**
313+
* @deprecated since 0.8.34 will be removed in 0.9
314+
*
303315
* {@inheritdoc}
304316
*/
305317
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
@@ -319,6 +331,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
319331
}
320332

321333
/**
334+
* @deprecated since 0.8.34 will be removed in 0.9
335+
*
322336
* {@inheritdoc}
323337
*/
324338
public function unsubscribe(InteropAmqpConsumer $consumer)
@@ -337,6 +351,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
337351
}
338352

339353
/**
354+
* @deprecated since 0.8.34 will be removed in 0.9
355+
*
340356
* {@inheritdoc}
341357
*/
342358
public function consume($timeout = 0)

Diff for: pkg/amqp-ext/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"ext-amqp": "^1.9.3",
1111

1212
"queue-interop/amqp-interop": "^0.7@dev",
13-
"enqueue/amqp-tools": "^0.8.4@dev"
13+
"enqueue/amqp-tools": "^0.8@dev"
1414
},
1515
"require-dev": {
1616
"phpunit/phpunit": "~5.4.0",

Diff for: pkg/amqp-lib/AmqpContext.php

+17-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\AmqpTools\DelayStrategyAware;
66
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
77
use Enqueue\AmqpTools\SignalSocketHelper;
8+
use Enqueue\AmqpTools\SubscriptionConsumer;
89
use Interop\Amqp\AmqpBind as InteropAmqpBind;
910
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
1011
use Interop\Amqp\AmqpContext as InteropAmqpContext;
@@ -18,6 +19,7 @@
1819
use Interop\Queue\Exception;
1920
use Interop\Queue\InvalidDestinationException;
2021
use Interop\Queue\PsrDestination;
22+
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
2123
use Interop\Queue\PsrTopic;
2224
use PhpAmqpLib\Channel\AMQPChannel;
2325
use PhpAmqpLib\Connection\AbstractConnection;
@@ -26,7 +28,7 @@
2628
use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage;
2729
use PhpAmqpLib\Wire\AMQPTable;
2830

29-
class AmqpContext implements InteropAmqpContext, DelayStrategyAware
31+
class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscriptionConsumerAwareContext
3032
{
3133
use DelayStrategyAwareTrait;
3234

@@ -129,6 +131,14 @@ public function createConsumer(PsrDestination $destination)
129131
return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']);
130132
}
131133

134+
/**
135+
* {@inheritdoc}
136+
*/
137+
public function createSubscriptionConsumer()
138+
{
139+
return new SubscriptionConsumer($this);
140+
}
141+
132142
/**
133143
* @return AmqpProducer
134144
*/
@@ -316,6 +326,8 @@ public function setQos($prefetchSize, $prefetchCount, $global)
316326
}
317327

318328
/**
329+
* @deprecated since 0.8.34 will be removed in 0.9
330+
*
319331
* {@inheritdoc}
320332
*/
321333
public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
@@ -359,6 +371,8 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
359371
}
360372

361373
/**
374+
* @deprecated since 0.8.34 will be removed in 0.9
375+
*
362376
* {@inheritdoc}
363377
*/
364378
public function unsubscribe(InteropAmqpConsumer $consumer)
@@ -376,6 +390,8 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
376390
}
377391

378392
/**
393+
* @deprecated since 0.8.34 will be removed in 0.9
394+
*
379395
* {@inheritdoc}
380396
*/
381397
public function consume($timeout = 0)

Diff for: pkg/amqp-lib/composer.json

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@
88
"require": {
99
"php": ">=5.6",
1010
"php-amqplib/php-amqplib": "^2.7@dev",
11-
"queue-interop/queue-interop": "^0.6@dev|^1.0.0-alpha1",
1211
"queue-interop/amqp-interop": "^0.7@dev",
13-
"enqueue/amqp-tools": "^0.8.5@dev"
12+
"enqueue/amqp-tools": "^0.8@dev"
1413
},
1514
"require-dev": {
1615
"phpunit/phpunit": "~5.4.0",

Diff for: pkg/amqp-tools/SubscriptionConsumer.php

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpTools;
4+
5+
use Interop\Amqp\AmqpContext;
6+
use Interop\Queue\PsrConsumer;
7+
use Interop\Queue\PsrSubscriptionConsumer;
8+
9+
/**
10+
* @deprecated this is BC layer, will be removed in 0.9
11+
*/
12+
final class SubscriptionConsumer implements PsrSubscriptionConsumer
13+
{
14+
/**
15+
* @var AmqpContext
16+
*/
17+
private $context;
18+
19+
public function __construct(AmqpContext $context)
20+
{
21+
$this->context = $context;
22+
}
23+
24+
/**
25+
* {@inheritdoc}
26+
*/
27+
public function consume($timeout = 0)
28+
{
29+
$this->context->consume($timeout);
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function subscribe(PsrConsumer $consumer, callable $callback)
36+
{
37+
$this->context->subscribe($consumer, $callback);
38+
}
39+
40+
/**
41+
* {@inheritdoc}
42+
*/
43+
public function unsubscribe(PsrConsumer $consumer)
44+
{
45+
$this->context->unsubscribe($consumer);
46+
}
47+
48+
/**
49+
* TODO.
50+
*
51+
* {@inheritdoc}
52+
*/
53+
public function unsubscribeAll()
54+
{
55+
throw new \LogicException('Not implemented');
56+
}
57+
}

Diff for: pkg/amqp-tools/Tests/SubscriptionConsumerTest.php

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpTools\Tests;
4+
5+
use Enqueue\AmqpTools\SubscriptionConsumer;
6+
use Interop\Amqp\AmqpConsumer;
7+
use Interop\Amqp\AmqpContext;
8+
use Interop\Queue\PsrSubscriptionConsumer;
9+
10+
class SubscriptionConsumerTest extends \PHPUnit_Framework_TestCase
11+
{
12+
public function testShouldImplementPsrSubscriptionConsumerInterface()
13+
{
14+
$rc = new \ReflectionClass(SubscriptionConsumer::class);
15+
16+
$this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class));
17+
}
18+
19+
public function testCouldBeConstructedWithAmqpContextAsFirstArgument()
20+
{
21+
new SubscriptionConsumer($this->createContext());
22+
}
23+
24+
public function testShouldProxySubscribeCallToContextMethod()
25+
{
26+
$consumer = $this->createConsumer();
27+
$callback = function () {};
28+
29+
$context = $this->createContext();
30+
$context
31+
->expects($this->once())
32+
->method('subscribe')
33+
->with($this->identicalTo($consumer), $this->identicalTo($callback))
34+
;
35+
36+
$subscriptionConsumer = new SubscriptionConsumer($context);
37+
$subscriptionConsumer->subscribe($consumer, $callback);
38+
}
39+
40+
public function testShouldProxyUnsubscribeCallToContextMethod()
41+
{
42+
$consumer = $this->createConsumer();
43+
44+
$context = $this->createContext();
45+
$context
46+
->expects($this->once())
47+
->method('unsubscribe')
48+
->with($this->identicalTo($consumer))
49+
;
50+
51+
$subscriptionConsumer = new SubscriptionConsumer($context);
52+
$subscriptionConsumer->unsubscribe($consumer);
53+
}
54+
55+
public function testShouldProxyConsumeCallToContextMethod()
56+
{
57+
$timeout = 123.456;
58+
59+
$context = $this->createContext();
60+
$context
61+
->expects($this->once())
62+
->method('consume')
63+
->with($this->identicalTo($timeout))
64+
;
65+
66+
$subscriptionConsumer = new SubscriptionConsumer($context);
67+
$subscriptionConsumer->consume($timeout);
68+
}
69+
70+
public function testThrowsNotImplementedOnUnsubscribeAllCall()
71+
{
72+
$context = $this->createContext();
73+
74+
$subscriptionConsumer = new SubscriptionConsumer($context);
75+
76+
$this->expectException(\LogicException::class);
77+
$this->expectExceptionMessage('Not implemented');
78+
$subscriptionConsumer->unsubscribeAll();
79+
}
80+
81+
/**
82+
* @return AmqpConsumer|\PHPUnit_Framework_MockObject_MockObject
83+
*/
84+
private function createConsumer()
85+
{
86+
return $this->createMock(AmqpConsumer::class);
87+
}
88+
89+
/**
90+
* @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject
91+
*/
92+
private function createContext()
93+
{
94+
return $this->createMock(AmqpContext::class);
95+
}
96+
}

Diff for: pkg/enqueue-bundle/Resources/config/services.yml

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
parameters:
2+
enqueue.queue_consumer.enable_subscription_consumer: false
3+
enqueue.queue_consumer.default_idle_time: 0
4+
enqueue.queue_consumer.default_receive_timeout: 10
5+
16
services:
27
enqueue.consumption.extensions:
38
class: 'Enqueue\Consumption\ChainExtension'
@@ -11,8 +16,10 @@ services:
1116
arguments:
1217
- '@enqueue.transport.context'
1318
- '@enqueue.consumption.extensions'
14-
- ~
15-
- ~
19+
- '%enqueue.queue_consumer.default_idle_time%'
20+
- '%enqueue.queue_consumer.default_receive_timeout%'
21+
calls:
22+
- ['enableSubscriptionConsumer', ['%enqueue.queue_consumer.enable_subscription_consumer%']]
1623

1724
# Deprecated. To be removed in 0.10.
1825
enqueue.consumption.queue_consumer:

0 commit comments

Comments
 (0)