Skip to content

Delay, ttl, priority, in producer #149

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Aug 3, 2017
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
"enqueue/simple-client": "*@dev",
"enqueue/test": "*@dev",
"enqueue/async-event-dispatcher": "*@dev",
"queue-interop/queue-interop": "^0.5@dev",
"queue-interop/queue-interop": "^0.6@dev",
"queue-interop/amqp-interop": "^0.6@dev",
"queue-interop/queue-spec": "^0.5@dev",
"queue-interop/amqp-interop": "^0.5@dev",

"phpunit/phpunit": "^5",
"doctrine/doctrine-bundle": "~1.2",
Expand Down
67 changes: 67 additions & 0 deletions pkg/amqp-ext/AmqpProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Interop\Amqp\AmqpProducer as InteropAmqpProducer;
use Interop\Amqp\AmqpQueue;
use Interop\Amqp\AmqpTopic;
use Interop\Queue\DeliveryDelayNotSupportedException;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\InvalidMessageException;
use Interop\Queue\PsrDestination;
Expand All @@ -14,6 +15,16 @@

class AmqpProducer implements InteropAmqpProducer
{
/**
* @var int|null
*/
private $priority;

/**
* @var int|float|null
*/
private $timeToLive;

/**
* @var \AMQPChannel
*/
Expand Down Expand Up @@ -42,6 +53,14 @@ public function send(PsrDestination $destination, PsrMessage $message)

InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class);

if (null !== $this->priority && null === $message->getPriority()) {
$message->setPriority($this->priority);
}

if (null !== $this->timeToLive && null === $message->getExpiration()) {
$message->setExpiration($this->timeToLive);
}

$amqpAttributes = $message->getHeaders();

if ($message->getProperties()) {
Expand Down Expand Up @@ -74,4 +93,52 @@ public function send(PsrDestination $destination, PsrMessage $message)
);
}
}

/**
* {@inheritdoc}
*/
public function setDeliveryDelay($deliveryDelay)
{
throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
}

/**
* {@inheritdoc}
*/
public function getDeliveryDelay()
{
return null;
}

/**
* {@inheritdoc}
*/
public function setPriority($priority)
{
$this->priority = $priority;
}

/**
* {@inheritdoc}
*/
public function getPriority()
{
return $this->priority;
}

/**
* {@inheritdoc}
*/
public function setTimeToLive($timeToLive)
{
$this->timeToLive = $timeToLive;
}

/**
* {@inheritdoc}
*/
public function getTimeToLive()
{
return $this->timeToLive;
}
}
22 changes: 22 additions & 0 deletions pkg/amqp-ext/Tests/Spec/AmqpProducerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

namespace Enqueue\AmqpExt\Tests\Spec;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Interop\Queue\Spec\PsrProducerSpec;

/**
* @group functional
*/
class AmqpProducerTest extends PsrProducerSpec
{
/**
* {@inheritdoc}
*/
protected function createProducer()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext()->createProducer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

namespace Enqueue\AmqpExt\Tests\Spec;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpExt\AmqpContext;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendAndReceivePriorityMessagesFromQueueSpec;

/**
* @group functional
*/
class AmqpSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceivePriorityMessagesFromQueueSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext();
}

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = $context->createQueue($queueName);
$queue->setArguments(['x-max-priority' => 10]);

$context->declareQueue($queue);
$context->purgeQueue($queue);

return $queue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

namespace Enqueue\AmqpExt\Tests\Spec;

use Enqueue\AmqpExt\AmqpConnectionFactory;
use Enqueue\AmqpExt\AmqpContext;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendAndReceiveTimeToLiveMessagesFromQueueSpec;

/**
* @group functional
*/
class AmqpSendAndReceiveTimeToLiveMessagesFromQueueTest extends SendAndReceiveTimeToLiveMessagesFromQueueSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext();
}

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = $context->createQueue($queueName);
$context->declareQueue($queue);
$context->purgeQueue($queue);

return $queue;
}
}
4 changes: 2 additions & 2 deletions pkg/amqp-ext/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
"require": {
"php": ">=5.6",
"ext-amqp": "^1.6",
"queue-interop/queue-interop": "^0.5@dev",
"queue-interop/amqp-interop": "^0.5@dev",

"queue-interop/amqp-interop": "^0.6@dev",
"psr/log": "^1"
},
"require-dev": {
Expand Down
3 changes: 2 additions & 1 deletion pkg/amqp-lib/AmqpContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Interop\Queue\PsrTopic;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Wire\AMQPTable;

class AmqpContext implements InteropAmqpContext
{
Expand Down Expand Up @@ -173,7 +174,7 @@ public function declareQueue(InteropAmqpQueue $queue)
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_EXCLUSIVE),
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_AUTODELETE),
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_NOWAIT),
$queue->getArguments()
$queue->getArguments() ? new AMQPTable($queue->getArguments()) : null
);
}

Expand Down
67 changes: 67 additions & 0 deletions pkg/amqp-lib/AmqpProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Interop\Amqp\AmqpProducer as InteropAmqpProducer;
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
use Interop\Amqp\AmqpTopic as InteropAmqpTopic;
use Interop\Queue\DeliveryDelayNotSupportedException;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\InvalidMessageException;
use Interop\Queue\PsrDestination;
Expand All @@ -17,6 +18,16 @@

class AmqpProducer implements InteropAmqpProducer
{
/**
* @var int|null
*/
private $priority;

/**
* @var int|float|null
*/
private $timeToLive;

/**
* @var AMQPChannel
*/
Expand All @@ -43,6 +54,14 @@ public function send(PsrDestination $destination, PsrMessage $message)

InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);

if (null !== $this->priority && null === $message->getPriority()) {
$message->setPriority($this->priority);
}

if (null !== $this->timeToLive && null === $message->getExpiration()) {
$message->setExpiration($this->timeToLive);
}

$amqpProperties = $message->getHeaders();

if ($appProperties = $message->getProperties()) {
Expand All @@ -69,4 +88,52 @@ public function send(PsrDestination $destination, PsrMessage $message)
);
}
}

/**
* {@inheritdoc}
*/
public function setDeliveryDelay($deliveryDelay)
{
throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt();
}

/**
* {@inheritdoc}
*/
public function getDeliveryDelay()
{
return null;
}

/**
* {@inheritdoc}
*/
public function setPriority($priority)
{
$this->priority = $priority;
}

/**
* {@inheritdoc}
*/
public function getPriority()
{
return $this->priority;
}

/**
* {@inheritdoc}
*/
public function setTimeToLive($timeToLive)
{
$this->timeToLive = $timeToLive;
}

/**
* {@inheritdoc}
*/
public function getTimeToLive()
{
return $this->timeToLive;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

namespace Enqueue\AmqpLib\Tests\Spec;

use Enqueue\AmqpLib\AmqpConnectionFactory;
use Enqueue\AmqpLib\AmqpContext;
use Interop\Queue\PsrContext;
use Interop\Queue\Spec\SendAndReceivePriorityMessagesFromQueueSpec;

/**
* @group functional
*/
class AmqpSendAndReceivePriorityMessagesFromQueueTest extends SendAndReceivePriorityMessagesFromQueueSpec
{
/**
* {@inheritdoc}
*/
protected function createContext()
{
$factory = new AmqpConnectionFactory(getenv('AMQP_DSN'));

return $factory->createContext();
}

/**
* {@inheritdoc}
*
* @param AmqpContext $context
*/
protected function createQueue(PsrContext $context, $queueName)
{
$queue = $context->createQueue($queueName);
$queue->setArguments(['x-max-priority' => 10]);

$context->declareQueue($queue);
$context->purgeQueue($queue);

return $queue;
}
}
Loading