diff --git a/pkg/amqp-bunny/AmqpContext.php b/pkg/amqp-bunny/AmqpContext.php index d0730d6b5..629454516 100644 --- a/pkg/amqp-bunny/AmqpContext.php +++ b/pkg/amqp-bunny/AmqpContext.php @@ -3,13 +3,9 @@ namespace Enqueue\AmqpBunny; use Bunny\Channel; -use Bunny\Client; -use Bunny\Exception\ClientException; use Bunny\Message; use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; -use Enqueue\AmqpTools\SignalSocketHelper; -use Enqueue\AmqpTools\SubscriptionConsumer; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -51,11 +47,9 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscrip private $buffer; /** - * an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];. - * - * @var array + * @var AmqpSubscriptionConsumer */ - private $subscribers; + private $bcSubscriptionConsumer; /** * Callable must return instance of \Bunny\Channel once called. @@ -81,7 +75,7 @@ public function __construct($bunnyChannel, $config = []) } $this->buffer = new Buffer(); - $this->subscribers = []; + $this->bcSubscriptionConsumer = $this->createSubscriptionConsumer(); } /** @@ -140,10 +134,12 @@ public function createConsumer(PsrDestination $destination) /** * {@inheritdoc} + * + * @return AmqpSubscriptionConsumer */ public function createSubscriptionConsumer() { - return new SubscriptionConsumer($this); + return new AmqpSubscriptionConsumer($this); } /** @@ -339,42 +335,7 @@ public function setQos($prefetchSize, $prefetchCount, $global) */ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) { - if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) { - return; - } - - $bunnyCallback = function (Message $message, Channel $channel, Client $bunny) { - $receivedMessage = $this->convertMessage($message); - $receivedMessage->setConsumerTag($message->consumerTag); - - /** - * @var AmqpConsumer - * @var callable $callback - */ - list($consumer, $callback) = $this->subscribers[$message->consumerTag]; - - if (false === call_user_func($callback, $receivedMessage, $consumer)) { - $bunny->stop(); - } - }; - - $frame = $this->getBunnyChannel()->consume( - $bunnyCallback, - $consumer->getQueue()->getQueueName(), - $consumer->getConsumerTag(), - (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL), - (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK), - (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE), - (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT) - ); - - if (empty($frame->consumerTag)) { - throw new Exception('Got empty consumer tag'); - } - - $consumer->setConsumerTag($frame->consumerTag); - - $this->subscribers[$frame->consumerTag] = [$consumer, $callback]; + $this->bcSubscriptionConsumer->subscribe($consumer, $callback); } /** @@ -384,15 +345,7 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) */ public function unsubscribe(InteropAmqpConsumer $consumer) { - if (false == $consumer->getConsumerTag()) { - return; - } - - $consumerTag = $consumer->getConsumerTag(); - - $this->getBunnyChannel()->cancel($consumerTag); - $consumer->setConsumerTag(null); - unset($this->subscribers[$consumerTag]); + $this->bcSubscriptionConsumer->unsubscribe($consumer); } /** @@ -402,24 +355,7 @@ public function unsubscribe(InteropAmqpConsumer $consumer) */ public function consume($timeout = 0) { - if (empty($this->subscribers)) { - throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); - } - - $signalHandler = new SignalSocketHelper(); - $signalHandler->beforeSocket(); - - try { - $this->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null); - } catch (ClientException $e) { - if ('stream_select() failed.' == $e->getMessage() && $signalHandler->wasThereSignal()) { - return; - } - - throw $e; - } finally { - $signalHandler->afterSocket(); - } + $this->bcSubscriptionConsumer->consume($timeout); } /** diff --git a/pkg/amqp-bunny/AmqpSubscriptionConsumer.php b/pkg/amqp-bunny/AmqpSubscriptionConsumer.php new file mode 100644 index 000000000..7a28b9ace --- /dev/null +++ b/pkg/amqp-bunny/AmqpSubscriptionConsumer.php @@ -0,0 +1,141 @@ +context = $context; + + $this->subscribers = []; + } + + /** + * {@inheritdoc} + */ + public function consume($timeout = 0) + { + if (empty($this->subscribers)) { + throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); + } + + $signalHandler = new SignalSocketHelper(); + $signalHandler->beforeSocket(); + + try { + $this->context->getBunnyChannel()->getClient()->run(0 !== $timeout ? $timeout / 1000 : null); + } catch (ClientException $e) { + if ('stream_select() failed.' == $e->getMessage() && $signalHandler->wasThereSignal()) { + return; + } + + throw $e; + } finally { + $signalHandler->afterSocket(); + } + } + + /** + * @param AmqpConsumer $consumer + * + * {@inheritdoc} + */ + public function subscribe(PsrConsumer $consumer, callable $callback) + { + if (false == $consumer instanceof AmqpConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer))); + } + + if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) { + return; + } + + $bunnyCallback = function (Message $message, Channel $channel, Client $bunny) { + $receivedMessage = $this->context->convertMessage($message); + $receivedMessage->setConsumerTag($message->consumerTag); + + /** + * @var AmqpConsumer + * @var callable $callback + */ + list($consumer, $callback) = $this->subscribers[$message->consumerTag]; + + if (false === call_user_func($callback, $receivedMessage, $consumer)) { + $bunny->stop(); + } + }; + + $frame = $this->context->getBunnyChannel()->consume( + $bunnyCallback, + $consumer->getQueue()->getQueueName(), + $consumer->getConsumerTag(), + (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL), + (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK), + (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE), + (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT) + ); + + if (empty($frame->consumerTag)) { + throw new Exception('Got empty consumer tag'); + } + + $consumer->setConsumerTag($frame->consumerTag); + + $this->subscribers[$frame->consumerTag] = [$consumer, $callback]; + } + + /** + * @param AmqpConsumer $consumer + * + * {@inheritdoc} + */ + public function unsubscribe(PsrConsumer $consumer) + { + if (false == $consumer instanceof AmqpConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer))); + } + + if (false == $consumer->getConsumerTag()) { + return; + } + + $consumerTag = $consumer->getConsumerTag(); + + $this->context->getBunnyChannel()->cancel($consumerTag); + $consumer->setConsumerTag(null); + unset($this->subscribers[$consumerTag]); + } + + /** + * {@inheritdoc} + */ + public function unsubscribeAll() + { + foreach ($this->subscribers as list($consumer)) { + $this->unsubscribe($consumer); + } + } +} diff --git a/pkg/amqp-bunny/Tests/AmqpContextTest.php b/pkg/amqp-bunny/Tests/AmqpContextTest.php index 74f435ea5..2bc593f1f 100644 --- a/pkg/amqp-bunny/Tests/AmqpContextTest.php +++ b/pkg/amqp-bunny/Tests/AmqpContextTest.php @@ -5,9 +5,11 @@ use Bunny\Channel; use Bunny\Protocol\MethodQueueDeclareOkFrame; use Enqueue\AmqpBunny\AmqpContext; +use Enqueue\AmqpBunny\AmqpSubscriptionConsumer; use Interop\Amqp\Impl\AmqpBind; use Interop\Amqp\Impl\AmqpQueue; use Interop\Amqp\Impl\AmqpTopic; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; use PHPUnit\Framework\TestCase; class AmqpContextTest extends TestCase @@ -235,6 +237,20 @@ public function testShouldSetQos() $context->setQos(123, 456, true); } + public function testShouldImplementPsrSubscriptionConsumerAwareInterface() + { + $rc = new \ReflectionClass(AmqpContext::class); + + $this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumerAwareContext::class)); + } + + public function testShouldReturnExpectedSubscriptionConsumerInstance() + { + $context = new AmqpContext($this->createChannelMock()); + + $this->assertInstanceOf(AmqpSubscriptionConsumer::class, $context->createSubscriptionConsumer()); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|Channel */ diff --git a/pkg/amqp-bunny/Tests/AmqpSubscriptionConsumerTest.php b/pkg/amqp-bunny/Tests/AmqpSubscriptionConsumerTest.php new file mode 100644 index 000000000..4ee444175 --- /dev/null +++ b/pkg/amqp-bunny/Tests/AmqpSubscriptionConsumerTest.php @@ -0,0 +1,31 @@ +assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithAmqpContextAsFirstArgument() + { + new AmqpSubscriptionConsumer($this->createAmqpContextMock()); + } + + /** + * @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createAmqpContextMock() + { + return $this->createMock(AmqpContext::class); + } +} diff --git a/pkg/amqp-ext/AmqpContext.php b/pkg/amqp-ext/AmqpContext.php index 256f0b19a..358ed5313 100644 --- a/pkg/amqp-ext/AmqpContext.php +++ b/pkg/amqp-ext/AmqpContext.php @@ -4,7 +4,6 @@ use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; -use Enqueue\AmqpTools\SubscriptionConsumer; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -45,11 +44,9 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscrip private $receiveMethod; /** - * an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];. - * - * @var array + * @var AmqpSubscriptionConsumer */ - private $subscribers; + private $bcSubscriptionConsumer; /** * Callable must return instance of \AMQPChannel once called. @@ -70,7 +67,7 @@ public function __construct($extChannel, $receiveMethod) } $this->buffer = new Buffer(); - $this->subscribers = []; + $this->bcSubscriptionConsumer = $this->createSubscriptionConsumer(); } /** @@ -267,7 +264,7 @@ public function createConsumer(PsrDestination $destination) */ public function createSubscriptionConsumer() { - return new SubscriptionConsumer($this); + return new AmqpSubscriptionConsumer($this); } /** @@ -316,18 +313,7 @@ public function getExtChannel() */ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) { - if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) { - return; - } - - $extQueue = new \AMQPQueue($this->getExtChannel()); - $extQueue->setName($consumer->getQueue()->getQueueName()); - - $extQueue->consume(null, Flags::convertConsumerFlags($consumer->getFlags()), $consumer->getConsumerTag()); - - $consumerTag = $extQueue->getConsumerTag(); - $consumer->setConsumerTag($consumerTag); - $this->subscribers[$consumerTag] = [$consumer, $callback, $extQueue]; + $this->bcSubscriptionConsumer->subscribe($consumer, $callback); } /** @@ -337,17 +323,7 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) */ public function unsubscribe(InteropAmqpConsumer $consumer) { - if (false == $consumer->getConsumerTag()) { - return; - } - - $consumerTag = $consumer->getConsumerTag(); - $consumer->setConsumerTag(null); - - list($consumer, $callback, $extQueue) = $this->subscribers[$consumerTag]; - - $extQueue->cancel($consumerTag); - unset($this->subscribers[$consumerTag]); + $this->bcSubscriptionConsumer->unsubscribe($consumer); } /** @@ -357,51 +333,7 @@ public function unsubscribe(InteropAmqpConsumer $consumer) */ public function consume($timeout = 0) { - if (empty($this->subscribers)) { - throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); - } - - /** @var \AMQPQueue $extQueue */ - $extConnection = $this->getExtChannel()->getConnection(); - - $originalTimeout = $extConnection->getReadTimeout(); - try { - $extConnection->setReadTimeout($timeout / 1000); - - reset($this->subscribers); - /** @var $consumer AmqpConsumer */ - list($consumer) = current($this->subscribers); - - $extQueue = new \AMQPQueue($this->getExtChannel()); - $extQueue->setName($consumer->getQueue()->getQueueName()); - $extQueue->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use ($originalTimeout, $extConnection) { - $consumeTimeout = $extConnection->getReadTimeout(); - try { - $extConnection->setReadTimeout($originalTimeout); - - $message = $this->convertMessage($extEnvelope); - $message->setConsumerTag($q->getConsumerTag()); - - /** - * @var AmqpConsumer - * @var callable $callback - */ - list($consumer, $callback) = $this->subscribers[$q->getConsumerTag()]; - - return call_user_func($callback, $message, $consumer); - } finally { - $extConnection->setReadTimeout($consumeTimeout); - } - }, AMQP_JUST_CONSUME); - } catch (\AMQPQueueException $e) { - if ('Consumer timeout exceed' == $e->getMessage()) { - return null; - } - - throw $e; - } finally { - $extConnection->setReadTimeout($originalTimeout); - } + $this->bcSubscriptionConsumer->consume($timeout); } /** diff --git a/pkg/amqp-ext/AmqpSubscriptionConsumer.php b/pkg/amqp-ext/AmqpSubscriptionConsumer.php new file mode 100644 index 000000000..a9f43391c --- /dev/null +++ b/pkg/amqp-ext/AmqpSubscriptionConsumer.php @@ -0,0 +1,139 @@ +context = $context; + + $this->subscribers = []; + } + + /** + * {@inheritdoc} + */ + public function consume($timeout = 0) + { + if (empty($this->subscribers)) { + throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); + } + + /** @var \AMQPQueue $extQueue */ + $extConnection = $this->context->getExtChannel()->getConnection(); + + $originalTimeout = $extConnection->getReadTimeout(); + try { + $extConnection->setReadTimeout($timeout / 1000); + + reset($this->subscribers); + /** @var $consumer AmqpConsumer */ + list($consumer) = current($this->subscribers); + + $extQueue = new \AMQPQueue($this->context->getExtChannel()); + $extQueue->setName($consumer->getQueue()->getQueueName()); + $extQueue->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use ($originalTimeout, $extConnection) { + $consumeTimeout = $extConnection->getReadTimeout(); + try { + $extConnection->setReadTimeout($originalTimeout); + + $message = $this->context->convertMessage($extEnvelope); + $message->setConsumerTag($q->getConsumerTag()); + + /** + * @var AmqpConsumer + * @var callable $callback + */ + list($consumer, $callback) = $this->subscribers[$q->getConsumerTag()]; + + return call_user_func($callback, $message, $consumer); + } finally { + $extConnection->setReadTimeout($consumeTimeout); + } + }, AMQP_JUST_CONSUME); + } catch (\AMQPQueueException $e) { + if ('Consumer timeout exceed' == $e->getMessage()) { + return null; + } + + throw $e; + } finally { + $extConnection->setReadTimeout($originalTimeout); + } + } + + /** + * @param AmqpConsumer $consumer + * + * {@inheritdoc} + */ + public function subscribe(PsrConsumer $consumer, callable $callback) + { + if (false == $consumer instanceof AmqpConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer))); + } + + if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) { + return; + } + + $extQueue = new \AMQPQueue($this->context->getExtChannel()); + $extQueue->setName($consumer->getQueue()->getQueueName()); + + $extQueue->consume(null, Flags::convertConsumerFlags($consumer->getFlags()), $consumer->getConsumerTag()); + + $consumerTag = $extQueue->getConsumerTag(); + $consumer->setConsumerTag($consumerTag); + $this->subscribers[$consumerTag] = [$consumer, $callback, $extQueue]; + } + + /** + * @param AmqpConsumer $consumer + * + * {@inheritdoc} + */ + public function unsubscribe(PsrConsumer $consumer) + { + if (false == $consumer instanceof AmqpConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer))); + } + + if (false == $consumer->getConsumerTag()) { + return; + } + + $consumerTag = $consumer->getConsumerTag(); + $consumer->setConsumerTag(null); + + list($consumer, $callback, $extQueue) = $this->subscribers[$consumerTag]; + + $extQueue->cancel($consumerTag); + unset($this->subscribers[$consumerTag]); + } + + /** + * {@inheritdoc} + */ + public function unsubscribeAll() + { + foreach ($this->subscribers as list($consumer)) { + $this->unsubscribe($consumer); + } + } +} diff --git a/pkg/amqp-ext/Tests/AmqpContextTest.php b/pkg/amqp-ext/Tests/AmqpContextTest.php index 42f23562c..2518cd178 100644 --- a/pkg/amqp-ext/Tests/AmqpContextTest.php +++ b/pkg/amqp-ext/Tests/AmqpContextTest.php @@ -5,6 +5,7 @@ use Enqueue\AmqpExt\AmqpConsumer; use Enqueue\AmqpExt\AmqpContext; use Enqueue\AmqpExt\AmqpProducer; +use Enqueue\AmqpExt\AmqpSubscriptionConsumer; use Enqueue\AmqpExt\Buffer; use Enqueue\Null\NullQueue; use Enqueue\Null\NullTopic; @@ -14,6 +15,7 @@ use Interop\Amqp\Impl\AmqpTopic; use Interop\Queue\InvalidDestinationException; use Interop\Queue\PsrContext; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; use PHPUnit\Framework\TestCase; class AmqpContextTest extends TestCase @@ -248,6 +250,20 @@ public function testShouldClosePersistedConnection() $context->close(); } + public function testShouldImplementPsrSubscriptionConsumerAwareInterface() + { + $rc = new \ReflectionClass(AmqpContext::class); + + $this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumerAwareContext::class)); + } + + public function testShouldReturnExpectedSubscriptionConsumerInstance() + { + $context = new AmqpContext($this->createExtChannelMock(), 'basic_get'); + + $this->assertInstanceOf(AmqpSubscriptionConsumer::class, $context->createSubscriptionConsumer()); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|\AMQPChannel */ diff --git a/pkg/amqp-ext/Tests/AmqpSubscriptionConsumerTest.php b/pkg/amqp-ext/Tests/AmqpSubscriptionConsumerTest.php new file mode 100644 index 000000000..8ef4964b7 --- /dev/null +++ b/pkg/amqp-ext/Tests/AmqpSubscriptionConsumerTest.php @@ -0,0 +1,31 @@ +assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithAmqpContextAsFirstArgument() + { + new AmqpSubscriptionConsumer($this->createAmqpContextMock()); + } + + /** + * @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createAmqpContextMock() + { + return $this->createMock(AmqpContext::class); + } +} diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php index 02c9bcb85..6b7933f98 100644 --- a/pkg/amqp-lib/AmqpContext.php +++ b/pkg/amqp-lib/AmqpContext.php @@ -4,8 +4,6 @@ use Enqueue\AmqpTools\DelayStrategyAware; use Enqueue\AmqpTools\DelayStrategyAwareTrait; -use Enqueue\AmqpTools\SignalSocketHelper; -use Enqueue\AmqpTools\SubscriptionConsumer; use Interop\Amqp\AmqpBind as InteropAmqpBind; use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer; use Interop\Amqp\AmqpContext as InteropAmqpContext; @@ -23,8 +21,6 @@ use Interop\Queue\PsrTopic; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; -use PhpAmqpLib\Exception\AMQPIOWaitException; -use PhpAmqpLib\Exception\AMQPTimeoutException; use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -53,11 +49,9 @@ class AmqpContext implements InteropAmqpContext, DelayStrategyAware, PsrSubscrip private $buffer; /** - * an item contains an array: [AmqpConsumerInterop $consumer, callable $callback];. - * - * @var array + * @var AmqpSubscriptionConsumer */ - private $subscribers; + private $bcSubscriptionConsumer; /** * @param AbstractConnection $connection @@ -74,7 +68,7 @@ public function __construct(AbstractConnection $connection, $config = []) $this->connection = $connection; $this->buffer = new Buffer(); - $this->subscribers = []; + $this->bcSubscriptionConsumer = $this->createSubscriptionConsumer(); } /** @@ -136,7 +130,7 @@ public function createConsumer(PsrDestination $destination) */ public function createSubscriptionConsumer() { - return new SubscriptionConsumer($this); + return new AmqpSubscriptionConsumer($this); } /** @@ -332,42 +326,7 @@ public function setQos($prefetchSize, $prefetchCount, $global) */ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) { - if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) { - return; - } - - $libCallback = function (LibAMQPMessage $message) { - $receivedMessage = $this->convertMessage($message); - $receivedMessage->setConsumerTag($message->delivery_info['consumer_tag']); - - /** - * @var AmqpConsumer - * @var callable $callback - */ - list($consumer, $callback) = $this->subscribers[$message->delivery_info['consumer_tag']]; - - if (false === call_user_func($callback, $receivedMessage, $consumer)) { - throw new StopBasicConsumptionException(); - } - }; - - $consumerTag = $this->getLibChannel()->basic_consume( - $consumer->getQueue()->getQueueName(), - $consumer->getConsumerTag(), - (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL), - (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK), - (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE), - (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT), - $libCallback - ); - - if (empty($consumerTag)) { - throw new Exception('Got empty consumer tag'); - } - - $consumer->setConsumerTag($consumerTag); - - $this->subscribers[$consumerTag] = [$consumer, $callback]; + $this->bcSubscriptionConsumer->subscribe($consumer, $callback); } /** @@ -377,16 +336,7 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback) */ public function unsubscribe(InteropAmqpConsumer $consumer) { - if (false == $consumer->getConsumerTag()) { - return; - } - - $consumerTag = $consumer->getConsumerTag(); - - $this->getLibChannel()->basic_cancel($consumerTag); - - $consumer->setConsumerTag(null); - unset($this->subscribers[$consumerTag], $this->getLibChannel()->callbacks[$consumerTag]); + $this->bcSubscriptionConsumer->unsubscribe($consumer); } /** @@ -396,42 +346,7 @@ public function unsubscribe(InteropAmqpConsumer $consumer) */ public function consume($timeout = 0) { - if (empty($this->subscribers)) { - throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); - } - - $signalHandler = new SignalSocketHelper(); - $signalHandler->beforeSocket(); - - try { - while (true) { - $start = microtime(true); - - $this->channel->wait(null, false, $timeout / 1000); - - if ($timeout <= 0) { - continue; - } - - // compute remaining timeout and continue until time is up - $stop = microtime(true); - $timeout -= ($stop - $start) * 1000; - - if ($timeout <= 0) { - break; - } - } - } catch (AMQPTimeoutException $e) { - } catch (StopBasicConsumptionException $e) { - } catch (AMQPIOWaitException $e) { - if ($signalHandler->wasThereSignal()) { - return; - } - - throw $e; - } finally { - $signalHandler->afterSocket(); - } + $this->bcSubscriptionConsumer->consume($timeout); } /** diff --git a/pkg/amqp-lib/AmqpSubscriptionConsumer.php b/pkg/amqp-lib/AmqpSubscriptionConsumer.php new file mode 100644 index 000000000..3f744350b --- /dev/null +++ b/pkg/amqp-lib/AmqpSubscriptionConsumer.php @@ -0,0 +1,157 @@ +context = $context; + } + + /** + * {@inheritdoc} + */ + public function consume($timeout = 0) + { + if (empty($this->subscribers)) { + throw new \LogicException('There is no subscribers. Consider calling basicConsumeSubscribe before consuming'); + } + + $signalHandler = new SignalSocketHelper(); + $signalHandler->beforeSocket(); + + try { + while (true) { + $start = microtime(true); + + $this->context->getLibChannel()->wait(null, false, $timeout / 1000); + + if ($timeout <= 0) { + continue; + } + + // compute remaining timeout and continue until time is up + $stop = microtime(true); + $timeout -= ($stop - $start) * 1000; + + if ($timeout <= 0) { + break; + } + } + } catch (AMQPTimeoutException $e) { + } catch (StopBasicConsumptionException $e) { + } catch (AMQPIOWaitException $e) { + if ($signalHandler->wasThereSignal()) { + return; + } + + throw $e; + } finally { + $signalHandler->afterSocket(); + } + } + + /** + * @param AmqpConsumer $consumer + * + * {@inheritdoc} + */ + public function subscribe(PsrConsumer $consumer, callable $callback) + { + if (false == $consumer instanceof AmqpConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer))); + } + + if ($consumer->getConsumerTag() && array_key_exists($consumer->getConsumerTag(), $this->subscribers)) { + return; + } + + $libCallback = function (LibAMQPMessage $message) { + $receivedMessage = $this->context->convertMessage($message); + $receivedMessage->setConsumerTag($message->delivery_info['consumer_tag']); + + /** + * @var AmqpConsumer + * @var callable $callback + */ + list($consumer, $callback) = $this->subscribers[$message->delivery_info['consumer_tag']]; + + if (false === call_user_func($callback, $receivedMessage, $consumer)) { + throw new StopBasicConsumptionException(); + } + }; + + $consumerTag = $this->context->getLibChannel()->basic_consume( + $consumer->getQueue()->getQueueName(), + $consumer->getConsumerTag(), + (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL), + (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOACK), + (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE), + (bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT), + $libCallback + ); + + if (empty($consumerTag)) { + throw new Exception('Got empty consumer tag'); + } + + $consumer->setConsumerTag($consumerTag); + + $this->subscribers[$consumerTag] = [$consumer, $callback]; + } + + /** + * @param AmqpConsumer $consumer + * + * {@inheritdoc} + */ + public function unsubscribe(PsrConsumer $consumer) + { + if (false == $consumer instanceof AmqpConsumer) { + throw new \InvalidArgumentException(sprintf('The consumer must be instance of "%s" got "%s"', AmqpConsumer::class, get_class($consumer))); + } + + if (false == $consumer->getConsumerTag()) { + return; + } + + $consumerTag = $consumer->getConsumerTag(); + + $this->context->getLibChannel()->basic_cancel($consumerTag); + + $consumer->setConsumerTag(null); + unset($this->subscribers[$consumerTag], $this->context->getLibChannel()->callbacks[$consumerTag]); + } + + /** + * {@inheritdoc} + */ + public function unsubscribeAll() + { + foreach ($this->subscribers as list($consumer)) { + $this->unsubscribe($consumer); + } + } +} diff --git a/pkg/amqp-lib/Tests/AmqpContextTest.php b/pkg/amqp-lib/Tests/AmqpContextTest.php index be52235e7..6923297c8 100644 --- a/pkg/amqp-lib/Tests/AmqpContextTest.php +++ b/pkg/amqp-lib/Tests/AmqpContextTest.php @@ -3,9 +3,11 @@ namespace Enqueue\AmqpLib\Tests; use Enqueue\AmqpLib\AmqpContext; +use Enqueue\AmqpLib\AmqpSubscriptionConsumer; use Interop\Amqp\Impl\AmqpBind; use Interop\Amqp\Impl\AmqpQueue; use Interop\Amqp\Impl\AmqpTopic; +use Interop\Queue\PsrSubscriptionConsumerAwareContext; use PhpAmqpLib\Channel\AMQPChannel; use PhpAmqpLib\Connection\AbstractConnection; use PhpAmqpLib\Wire\AMQPTable; @@ -314,6 +316,20 @@ public function testShouldSetQos() $context->setQos(123, 456, true); } + public function testShouldImplementPsrSubscriptionConsumerAwareInterface() + { + $rc = new \ReflectionClass(AmqpContext::class); + + $this->assertTrue($rc->implementsInterface(PsrSubscriptionConsumerAwareContext::class)); + } + + public function testShouldReturnExpectedSubscriptionConsumerInstance() + { + $context = new AmqpContext($this->createConnectionMock()); + + $this->assertInstanceOf(AmqpSubscriptionConsumer::class, $context->createSubscriptionConsumer()); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|AbstractConnection */ diff --git a/pkg/amqp-lib/Tests/AmqpSubscriptionConsumerTest.php b/pkg/amqp-lib/Tests/AmqpSubscriptionConsumerTest.php new file mode 100644 index 000000000..3fd352711 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpSubscriptionConsumerTest.php @@ -0,0 +1,31 @@ +assertTrue($rc->implementsInterface(PsrSubscriptionConsumer::class)); + } + + public function testCouldBeConstructedWithAmqpContextAsFirstArgument() + { + new AmqpSubscriptionConsumer($this->createAmqpContextMock()); + } + + /** + * @return AmqpContext|\PHPUnit_Framework_MockObject_MockObject + */ + private function createAmqpContextMock() + { + return $this->createMock(AmqpContext::class); + } +}