Skip to content

[amqp] Put in buffer not our message. Continue consumption. #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions pkg/amqp-ext/AmqpConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ class AmqpConsumer implements Consumer
private $queue;

/**
* @var \AMQPQueue
* @var Buffer
*/
private $extQueue;
private $buffer;

/**
* @var string
* @var \AMQPQueue
*/
private $consumerId;
private $extQueue;

/**
* @var bool
Expand All @@ -36,13 +36,14 @@ class AmqpConsumer implements Consumer
/**
* @param AmqpContext $context
* @param AmqpQueue $queue
* @param Buffer $buffer
*/
public function __construct(AmqpContext $context, AmqpQueue $queue)
public function __construct(AmqpContext $context, AmqpQueue $queue, Buffer $buffer)
{
$this->queue = $queue;
$this->context = $context;
$this->buffer = $buffer;

$this->consumerId = uniqid('', true);
$this->isInit = false;
}

Expand All @@ -63,6 +64,10 @@ public function getQueue()
*/
public function receive($timeout = 0)
{
if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) {
return $message;
}

/** @var \AMQPQueue $extQueue */
$extConnection = $this->getExtQueue()->getChannel()->getConnection();

Expand All @@ -71,17 +76,28 @@ public function receive($timeout = 0)
$extConnection->setReadTimeout($timeout / 1000);

if (false == $this->isInit) {
$this->getExtQueue()->consume(null, AMQP_NOPARAM, $this->consumerId);
$this->getExtQueue()->consume(null, AMQP_NOPARAM);

$this->isInit = true;
}

/** @var AmqpMessage|null $message */
$message = null;

$this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) {
$message = $this->convertMessage($extEnvelope);
$message->setConsumerTag($q->getConsumerTag());

if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) {
return false;
}

// not our message, put it to buffer and continue.
$this->buffer->push($q->getConsumerTag(), $message);

$message = null;

return false;
return true;
}, AMQP_JUST_CONSUME);

return $message;
Expand Down
11 changes: 9 additions & 2 deletions pkg/amqp-ext/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ class AmqpContext implements Context
*/
private $extChannelFactory;

/**
* @var Buffer
*/
private $buffer;

/**
* Callable must return instance of \AMQPChannel once called.
*
Expand All @@ -34,6 +39,8 @@ public function __construct($extChannel)
} else {
throw new \InvalidArgumentException('The extChannel argument must be either AMQPChannel or callable that return AMQPChannel.');
}

$this->buffer = new Buffer();
}

/**
Expand Down Expand Up @@ -170,10 +177,10 @@ public function createConsumer(Destination $destination)
$queue = $this->createTemporaryQueue();
$this->bind($destination, $queue);

return new AmqpConsumer($this, $queue);
return new AmqpConsumer($this, $queue, $this->buffer);
}

return new AmqpConsumer($this, $destination);
return new AmqpConsumer($this, $destination, $this->buffer);
}

public function close()
Expand Down
21 changes: 21 additions & 0 deletions pkg/amqp-ext/AmqpMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class AmqpMessage implements Message
*/
private $deliveryTag;

/**
* @var string|null
*/
private $consumerTag;

/**
* @var bool
*/
Expand Down Expand Up @@ -227,6 +232,22 @@ public function setDeliveryTag($deliveryTag)
$this->deliveryTag = $deliveryTag;
}

/**
* @return string|null
*/
public function getConsumerTag()
{
return $this->consumerTag;
}

/**
* @param string|null $consumerTag
*/
public function setConsumerTag($consumerTag)
{
$this->consumerTag = $consumerTag;
}

public function clearFlags()
{
$this->flags = AMQP_NOPARAM;
Expand Down
41 changes: 41 additions & 0 deletions pkg/amqp-ext/Buffer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace Enqueue\AmqpExt;

class Buffer
{
/**
* @var array ['aTag' => [AmqpMessage, AmqpMessage ...]]
*/
private $messages;

public function __construct()
{
$this->messages = [];
}

/**
* @param string $consumerTag
* @param AmqpMessage $message
*/
public function push($consumerTag, AmqpMessage $message)
{
if (false == array_key_exists($consumerTag, $this->messages)) {
$this->messages[$consumerTag] = [];
}

$this->messages[$consumerTag][] = $message;
}

/**
* @param string $consumerTag
*
* @return AmqpMessage|null
*/
public function pop($consumerTag)
{
if (false == empty($this->messages[$consumerTag])) {
return array_shift($this->messages[$consumerTag]);
}
}
}
37 changes: 37 additions & 0 deletions pkg/amqp-ext/Tests/AmqpConsumerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

namespace Enqueue\AmqpExt\Tests;

use Enqueue\AmqpExt\AmqpConsumer;
use Enqueue\AmqpExt\AmqpContext;
use Enqueue\AmqpExt\AmqpQueue;
use Enqueue\AmqpExt\Buffer;
use Enqueue\Psr\Consumer;
use Enqueue\Test\ClassExtensionTrait;

class AmqpConsumerTest extends \PHPUnit_Framework_TestCase
{
use ClassExtensionTrait;

public function testShouldImplementConsumerInterface()
{
$this->assertClassImplements(Consumer::class, AmqpConsumer::class);
}

public function testCouldBeConstructedWithContextAndQueueAndBufferAsArguments()
{
new AmqpConsumer(
$this->createContext(),
new AmqpQueue('aName'),
new Buffer()
);
}

/**
* @return \PHPUnit_Framework_MockObject_MockObject|AmqpContext
*/
private function createContext()
{
return $this->createMock(AmqpContext::class);
}
}
14 changes: 14 additions & 0 deletions pkg/amqp-ext/Tests/AmqpContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Enqueue\AmqpExt\AmqpProducer;
use Enqueue\AmqpExt\AmqpQueue;
use Enqueue\AmqpExt\AmqpTopic;
use Enqueue\AmqpExt\Buffer;
use Enqueue\Psr\Context;
use Enqueue\Psr\InvalidDestinationException;
use Enqueue\Test\ClassExtensionTrait;
Expand Down Expand Up @@ -35,6 +36,15 @@ public function testCouldBeConstructedWithExtChannelCallbackFactoryAsFirstArgume
});
}

public function testShouldCreateNewBufferOnConstruct()
{
$context = new AmqpContext(function () {
return $this->createExtChannelMock();
});

$this->assertAttributeInstanceOf(Buffer::class, 'buffer', $context);
}

public function testThrowIfNeitherCallbackNorExtChannelAsFirstArgument()
{
$this->expectException(\InvalidArgumentException::class);
Expand Down Expand Up @@ -143,13 +153,17 @@ public function testShouldReturnAmqpConsumerForGivenQueue()
{
$context = new AmqpContext($this->createExtChannelMock());

$buffer = $this->readAttribute($context, 'buffer');

$queue = new AmqpQueue('aName');

$consumer = $context->createConsumer($queue);

$this->assertInstanceOf(AmqpConsumer::class, $consumer);
$this->assertAttributeSame($context, 'context', $consumer);
$this->assertAttributeSame($queue, 'queue', $consumer);
$this->assertAttributeSame($queue, 'queue', $consumer);
$this->assertAttributeSame($buffer, 'buffer', $consumer);
}

public function testShouldThrowIfNotAmqpQueueGivenOnCreateConsumerCall()
Expand Down
9 changes: 9 additions & 0 deletions pkg/amqp-ext/Tests/AmqpMessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,15 @@ public function testShouldReturnPreviouslySetDeliveryTag()
$this->assertSame('theDeliveryTag', $message->getDeliveryTag());
}

public function testShouldReturnPreviouslySetConsumerTag()
{
$message = new AmqpMessage();

$message->setConsumerTag('theConsumerTag');

$this->assertSame('theConsumerTag', $message->getConsumerTag());
}

public function testShouldAllowAddFlags()
{
$message = new AmqpMessage();
Expand Down
17 changes: 17 additions & 0 deletions pkg/amqp-ext/Tests/AmqpProducerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Enqueue\AmqpExt\Tests;

use Enqueue\AmqpExt\AmqpProducer;
use Enqueue\Psr\Producer;
use Enqueue\Test\ClassExtensionTrait;

class AmqpProducerTest extends \PHPUnit_Framework_TestCase
{
use ClassExtensionTrait;

public function testShouldImplementProducerInterface()
{
$this->assertClassImplements(Producer::class, AmqpProducer::class);
}
}
63 changes: 63 additions & 0 deletions pkg/amqp-ext/Tests/BufferTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

namespace Enqueue\AmqpExt\Tests;

use Enqueue\AmqpExt\AmqpMessage;
use Enqueue\AmqpExt\Buffer;

class BufferTest extends \PHPUnit_Framework_TestCase
{
public function testCouldBeConstructedWithoutAnyArguments()
{
new Buffer();
}

public function testShouldSetEmptyArrayToMessagesPropertyOnConstruct()
{
$buffer = new Buffer();

$this->assertAttributeSame([], 'messages', $buffer);
}

public function testShouldReturnNullIfNoMessagesInBuffer()
{
$buffer = new Buffer();

$this->assertNull($buffer->pop('aConsumerTag'));
$this->assertNull($buffer->pop('anotherConsumerTag'));
}

public function testShouldPushMessageToBuffer()
{
$fooMessage = new AmqpMessage();
$barMessage = new AmqpMessage();
$bazMessage = new AmqpMessage();

$buffer = new Buffer();

$buffer->push('aConsumerTag', $fooMessage);
$buffer->push('aConsumerTag', $barMessage);

$buffer->push('anotherConsumerTag', $bazMessage);

$this->assertAttributeSame([
'aConsumerTag' => [$fooMessage, $barMessage],
'anotherConsumerTag' => [$bazMessage],
], 'messages', $buffer);
}

public function testShouldPopMessageFromBuffer()
{
$fooMessage = new AmqpMessage();
$barMessage = new AmqpMessage();

$buffer = new Buffer();

$buffer->push('aConsumerTag', $fooMessage);
$buffer->push('aConsumerTag', $barMessage);

$this->assertSame($fooMessage, $buffer->pop('aConsumerTag'));
$this->assertSame($barMessage, $buffer->pop('aConsumerTag'));
$this->assertNull($buffer->pop('aConsumerTag'));
}
}