diff --git a/pkg/pheanstalk/PheanstalkProducer.php b/pkg/pheanstalk/PheanstalkProducer.php index 05722dad9..030beedf8 100644 --- a/pkg/pheanstalk/PheanstalkProducer.php +++ b/pkg/pheanstalk/PheanstalkProducer.php @@ -7,7 +7,6 @@ use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; use Interop\Queue\Exception\InvalidMessageException; -use Interop\Queue\Exception\PriorityNotSupportedException; use Interop\Queue\Message; use Interop\Queue\Producer; use Pheanstalk\Pheanstalk; @@ -19,6 +18,21 @@ class PheanstalkProducer implements Producer */ private $pheanstalk; + /** + * @var int + */ + private $deliveryDelay; + + /** + * @var int + */ + private $priority; + + /** + * @var int + */ + private $timeToLive; + public function __construct(Pheanstalk $pheanstalk) { $this->pheanstalk = $pheanstalk; @@ -35,11 +49,17 @@ public function send(Destination $destination, Message $message): void $rawMessage = json_encode($message); if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'Could not encode value into json. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); + throw new \InvalidArgumentException(sprintf('Could not encode value into json. Error %s and message %s', json_last_error(), json_last_error_msg())); + } + + if (null !== $this->priority && null === $message->getHeader('priority')) { + $message->setPriority($this->priority); + } + if (null !== $this->deliveryDelay && null === $message->getHeader('delay')) { + $message->setDelay($this->deliveryDelay / 1000); + } + if (null !== $this->timeToLive && null === $message->getHeader('ttr')) { + $message->setTimeToRun($this->timeToLive / 1000); } $this->pheanstalk->useTube($destination->getName())->put( @@ -55,16 +75,14 @@ public function send(Destination $destination, Message $message): void */ public function setDeliveryDelay(int $deliveryDelay = null): Producer { - if (null === $deliveryDelay) { - return $this; - } + $this->deliveryDelay = $deliveryDelay; - throw new \LogicException('Not implemented'); + return $this; } public function getDeliveryDelay(): ?int { - return null; + return $this->deliveryDelay; } /** @@ -72,16 +90,14 @@ public function getDeliveryDelay(): ?int */ public function setPriority(int $priority = null): Producer { - if (null === $priority) { - return $this; - } + $this->priority = $priority; - throw PriorityNotSupportedException::providerDoestNotSupportIt(); + return $this; } public function getPriority(): ?int { - return null; + return $this->priority; } /** @@ -89,15 +105,13 @@ public function getPriority(): ?int */ public function setTimeToLive(int $timeToLive = null): Producer { - if (null === $timeToLive) { - return $this; - } + $this->timeToLive = $timeToLive; - throw new \LogicException('Not implemented'); + return $this; } public function getTimeToLive(): ?int { - return null; + return $this->timeToLive; } } diff --git a/pkg/pheanstalk/Tests/PheanstalkProducerTest.php b/pkg/pheanstalk/Tests/PheanstalkProducerTest.php index 60a19f627..7a736c09e 100644 --- a/pkg/pheanstalk/Tests/PheanstalkProducerTest.php +++ b/pkg/pheanstalk/Tests/PheanstalkProducerTest.php @@ -66,6 +66,155 @@ public function testShouldJsonEncodeMessageAndPutToExpectedTube() ); } + public function testMessagePriorityPrecedesPriority() + { + $message = new PheanstalkMessage('theBody'); + $message->setPriority(100); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":{"priority":100}}', 100, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setPriority(50); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testAccessDeliveryDelayAsMilliseconds() + { + $producer = new PheanstalkProducer($this->createPheanstalkMock()); + $producer->setDeliveryDelay(5000); + + $this->assertEquals(5000, $producer->getDeliveryDelay()); + } + + public function testDeliveryDelayResolvesToSeconds() + { + $message = new PheanstalkMessage('theBody'); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, 5, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setDeliveryDelay(5000); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testMessageDelayPrecedesDeliveryDelay() + { + $message = new PheanstalkMessage('theBody'); + $message->setDelay(25); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":{"delay":25}}', Pheanstalk::DEFAULT_PRIORITY, 25, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setDeliveryDelay(1000); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testAccessTimeToLiveAsMilliseconds() + { + $producer = new PheanstalkProducer($this->createPheanstalkMock()); + $producer->setTimeToLive(5000); + + $this->assertEquals(5000, $producer->getTimeToLive()); + } + + public function testTimeToLiveResolvesToSeconds() + { + $message = new PheanstalkMessage('theBody'); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 5) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setTimeToLive(5000); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testMessageTimeToRunPrecedesTimeToLive() + { + $message = new PheanstalkMessage('theBody'); + $message->setTimeToRun(25); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":{"ttr":25}}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 25) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setTimeToLive(1000); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + /** * @return MockObject|Pheanstalk */