diff --git a/pkg/amqp-ext/AmqpConsumer.php b/pkg/amqp-ext/AmqpConsumer.php index 83250173e..afc45eaae 100644 --- a/pkg/amqp-ext/AmqpConsumer.php +++ b/pkg/amqp-ext/AmqpConsumer.php @@ -19,14 +19,14 @@ class AmqpConsumer implements Consumer private $queue; /** - * @var \AMQPQueue + * @var Buffer */ - private $extQueue; + private $buffer; /** - * @var string + * @var \AMQPQueue */ - private $consumerId; + private $extQueue; /** * @var bool @@ -36,13 +36,14 @@ class AmqpConsumer implements Consumer /** * @param AmqpContext $context * @param AmqpQueue $queue + * @param Buffer $buffer */ - public function __construct(AmqpContext $context, AmqpQueue $queue) + public function __construct(AmqpContext $context, AmqpQueue $queue, Buffer $buffer) { $this->queue = $queue; $this->context = $context; + $this->buffer = $buffer; - $this->consumerId = uniqid('', true); $this->isInit = false; } @@ -63,6 +64,10 @@ public function getQueue() */ public function receive($timeout = 0) { + if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) { + return $message; + } + /** @var \AMQPQueue $extQueue */ $extConnection = $this->getExtQueue()->getChannel()->getConnection(); @@ -71,17 +76,28 @@ public function receive($timeout = 0) $extConnection->setReadTimeout($timeout / 1000); if (false == $this->isInit) { - $this->getExtQueue()->consume(null, AMQP_NOPARAM, $this->consumerId); + $this->getExtQueue()->consume(null, AMQP_NOPARAM); $this->isInit = true; } + /** @var AmqpMessage|null $message */ $message = null; $this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) { $message = $this->convertMessage($extEnvelope); + $message->setConsumerTag($q->getConsumerTag()); + + if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) { + return false; + } + + // not our message, put it to buffer and continue. + $this->buffer->push($q->getConsumerTag(), $message); + + $message = null; - return false; + return true; }, AMQP_JUST_CONSUME); return $message; diff --git a/pkg/amqp-ext/AmqpContext.php b/pkg/amqp-ext/AmqpContext.php index f7bd5acf8..68c8b2ca1 100644 --- a/pkg/amqp-ext/AmqpContext.php +++ b/pkg/amqp-ext/AmqpContext.php @@ -20,6 +20,11 @@ class AmqpContext implements Context */ private $extChannelFactory; + /** + * @var Buffer + */ + private $buffer; + /** * Callable must return instance of \AMQPChannel once called. * @@ -34,6 +39,8 @@ public function __construct($extChannel) } else { throw new \InvalidArgumentException('The extChannel argument must be either AMQPChannel or callable that return AMQPChannel.'); } + + $this->buffer = new Buffer(); } /** @@ -170,10 +177,10 @@ public function createConsumer(Destination $destination) $queue = $this->createTemporaryQueue(); $this->bind($destination, $queue); - return new AmqpConsumer($this, $queue); + return new AmqpConsumer($this, $queue, $this->buffer); } - return new AmqpConsumer($this, $destination); + return new AmqpConsumer($this, $destination, $this->buffer); } public function close() diff --git a/pkg/amqp-ext/AmqpMessage.php b/pkg/amqp-ext/AmqpMessage.php index d5992237b..c92bd2dac 100644 --- a/pkg/amqp-ext/AmqpMessage.php +++ b/pkg/amqp-ext/AmqpMessage.php @@ -26,6 +26,11 @@ class AmqpMessage implements Message */ private $deliveryTag; + /** + * @var string|null + */ + private $consumerTag; + /** * @var bool */ @@ -227,6 +232,22 @@ public function setDeliveryTag($deliveryTag) $this->deliveryTag = $deliveryTag; } + /** + * @return string|null + */ + public function getConsumerTag() + { + return $this->consumerTag; + } + + /** + * @param string|null $consumerTag + */ + public function setConsumerTag($consumerTag) + { + $this->consumerTag = $consumerTag; + } + public function clearFlags() { $this->flags = AMQP_NOPARAM; diff --git a/pkg/amqp-ext/Buffer.php b/pkg/amqp-ext/Buffer.php new file mode 100644 index 000000000..e3f500e29 --- /dev/null +++ b/pkg/amqp-ext/Buffer.php @@ -0,0 +1,41 @@ + [AmqpMessage, AmqpMessage ...]] + */ + private $messages; + + public function __construct() + { + $this->messages = []; + } + + /** + * @param string $consumerTag + * @param AmqpMessage $message + */ + public function push($consumerTag, AmqpMessage $message) + { + if (false == array_key_exists($consumerTag, $this->messages)) { + $this->messages[$consumerTag] = []; + } + + $this->messages[$consumerTag][] = $message; + } + + /** + * @param string $consumerTag + * + * @return AmqpMessage|null + */ + public function pop($consumerTag) + { + if (false == empty($this->messages[$consumerTag])) { + return array_shift($this->messages[$consumerTag]); + } + } +} diff --git a/pkg/amqp-ext/Tests/AmqpConsumerTest.php b/pkg/amqp-ext/Tests/AmqpConsumerTest.php new file mode 100644 index 000000000..c5cad4f30 --- /dev/null +++ b/pkg/amqp-ext/Tests/AmqpConsumerTest.php @@ -0,0 +1,37 @@ +assertClassImplements(Consumer::class, AmqpConsumer::class); + } + + public function testCouldBeConstructedWithContextAndQueueAndBufferAsArguments() + { + new AmqpConsumer( + $this->createContext(), + new AmqpQueue('aName'), + new Buffer() + ); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext + */ + private function createContext() + { + return $this->createMock(AmqpContext::class); + } +} diff --git a/pkg/amqp-ext/Tests/AmqpContextTest.php b/pkg/amqp-ext/Tests/AmqpContextTest.php index a0e0bd32a..1b94405f3 100644 --- a/pkg/amqp-ext/Tests/AmqpContextTest.php +++ b/pkg/amqp-ext/Tests/AmqpContextTest.php @@ -8,6 +8,7 @@ use Enqueue\AmqpExt\AmqpProducer; use Enqueue\AmqpExt\AmqpQueue; use Enqueue\AmqpExt\AmqpTopic; +use Enqueue\AmqpExt\Buffer; use Enqueue\Psr\Context; use Enqueue\Psr\InvalidDestinationException; use Enqueue\Test\ClassExtensionTrait; @@ -35,6 +36,15 @@ public function testCouldBeConstructedWithExtChannelCallbackFactoryAsFirstArgume }); } + public function testShouldCreateNewBufferOnConstruct() + { + $context = new AmqpContext(function () { + return $this->createExtChannelMock(); + }); + + $this->assertAttributeInstanceOf(Buffer::class, 'buffer', $context); + } + public function testThrowIfNeitherCallbackNorExtChannelAsFirstArgument() { $this->expectException(\InvalidArgumentException::class); @@ -143,6 +153,8 @@ public function testShouldReturnAmqpConsumerForGivenQueue() { $context = new AmqpContext($this->createExtChannelMock()); + $buffer = $this->readAttribute($context, 'buffer'); + $queue = new AmqpQueue('aName'); $consumer = $context->createConsumer($queue); @@ -150,6 +162,8 @@ public function testShouldReturnAmqpConsumerForGivenQueue() $this->assertInstanceOf(AmqpConsumer::class, $consumer); $this->assertAttributeSame($context, 'context', $consumer); $this->assertAttributeSame($queue, 'queue', $consumer); + $this->assertAttributeSame($queue, 'queue', $consumer); + $this->assertAttributeSame($buffer, 'buffer', $consumer); } public function testShouldThrowIfNotAmqpQueueGivenOnCreateConsumerCall() diff --git a/pkg/amqp-ext/Tests/AmqpMessageTest.php b/pkg/amqp-ext/Tests/AmqpMessageTest.php index 30fed0a4f..11d77f64f 100644 --- a/pkg/amqp-ext/Tests/AmqpMessageTest.php +++ b/pkg/amqp-ext/Tests/AmqpMessageTest.php @@ -170,6 +170,15 @@ public function testShouldReturnPreviouslySetDeliveryTag() $this->assertSame('theDeliveryTag', $message->getDeliveryTag()); } + public function testShouldReturnPreviouslySetConsumerTag() + { + $message = new AmqpMessage(); + + $message->setConsumerTag('theConsumerTag'); + + $this->assertSame('theConsumerTag', $message->getConsumerTag()); + } + public function testShouldAllowAddFlags() { $message = new AmqpMessage(); diff --git a/pkg/amqp-ext/Tests/AmqpProducerTest.php b/pkg/amqp-ext/Tests/AmqpProducerTest.php new file mode 100644 index 000000000..b467441c8 --- /dev/null +++ b/pkg/amqp-ext/Tests/AmqpProducerTest.php @@ -0,0 +1,17 @@ +assertClassImplements(Producer::class, AmqpProducer::class); + } +} diff --git a/pkg/amqp-ext/Tests/BufferTest.php b/pkg/amqp-ext/Tests/BufferTest.php new file mode 100644 index 000000000..a5be47829 --- /dev/null +++ b/pkg/amqp-ext/Tests/BufferTest.php @@ -0,0 +1,63 @@ +assertAttributeSame([], 'messages', $buffer); + } + + public function testShouldReturnNullIfNoMessagesInBuffer() + { + $buffer = new Buffer(); + + $this->assertNull($buffer->pop('aConsumerTag')); + $this->assertNull($buffer->pop('anotherConsumerTag')); + } + + public function testShouldPushMessageToBuffer() + { + $fooMessage = new AmqpMessage(); + $barMessage = new AmqpMessage(); + $bazMessage = new AmqpMessage(); + + $buffer = new Buffer(); + + $buffer->push('aConsumerTag', $fooMessage); + $buffer->push('aConsumerTag', $barMessage); + + $buffer->push('anotherConsumerTag', $bazMessage); + + $this->assertAttributeSame([ + 'aConsumerTag' => [$fooMessage, $barMessage], + 'anotherConsumerTag' => [$bazMessage], + ], 'messages', $buffer); + } + + public function testShouldPopMessageFromBuffer() + { + $fooMessage = new AmqpMessage(); + $barMessage = new AmqpMessage(); + + $buffer = new Buffer(); + + $buffer->push('aConsumerTag', $fooMessage); + $buffer->push('aConsumerTag', $barMessage); + + $this->assertSame($fooMessage, $buffer->pop('aConsumerTag')); + $this->assertSame($barMessage, $buffer->pop('aConsumerTag')); + $this->assertNull($buffer->pop('aConsumerTag')); + } +}