From 16797d6b6edb5c510d67d59631a067bc5c464884 Mon Sep 17 00:00:00 2001 From: Jonathan Cox Date: Thu, 19 Mar 2020 17:06:38 -0400 Subject: [PATCH 1/2] Implement DeliveryDelay, Priority and TimeToLive in PheanstalkProducer --- pkg/pheanstalk/PheanstalkProducer.php | 88 +++-- .../Tests/PheanstalkProducerTest.php | 320 ++++++++++++++++++ 2 files changed, 384 insertions(+), 24 deletions(-) diff --git a/pkg/pheanstalk/PheanstalkProducer.php b/pkg/pheanstalk/PheanstalkProducer.php index 05722dad9..28120f0fb 100644 --- a/pkg/pheanstalk/PheanstalkProducer.php +++ b/pkg/pheanstalk/PheanstalkProducer.php @@ -7,7 +7,6 @@ use Interop\Queue\Destination; use Interop\Queue\Exception\InvalidDestinationException; use Interop\Queue\Exception\InvalidMessageException; -use Interop\Queue\Exception\PriorityNotSupportedException; use Interop\Queue\Message; use Interop\Queue\Producer; use Pheanstalk\Pheanstalk; @@ -19,6 +18,21 @@ class PheanstalkProducer implements Producer */ private $pheanstalk; + /** + * @var int + */ + private $deliveryDelay; + + /** + * @var int + */ + private $priority; + + /** + * @var int + */ + private $timeToLive; + public function __construct(Pheanstalk $pheanstalk) { $this->pheanstalk = $pheanstalk; @@ -35,18 +49,14 @@ public function send(Destination $destination, Message $message): void $rawMessage = json_encode($message); if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'Could not encode value into json. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); + throw new \InvalidArgumentException(sprintf('Could not encode value into json. Error %s and message %s', json_last_error(), json_last_error_msg())); } $this->pheanstalk->useTube($destination->getName())->put( $rawMessage, - $message->getPriority(), - $message->getDelay(), - $message->getTimeToRun() + $this->resolvePriority($message), + $this->resolveDelay($message), + $this->resolveTimeToLive($message) ); } @@ -55,16 +65,14 @@ public function send(Destination $destination, Message $message): void */ public function setDeliveryDelay(int $deliveryDelay = null): Producer { - if (null === $deliveryDelay) { - return $this; - } + $this->deliveryDelay = $deliveryDelay; - throw new \LogicException('Not implemented'); + return $this; } public function getDeliveryDelay(): ?int { - return null; + return $this->deliveryDelay; } /** @@ -72,16 +80,14 @@ public function getDeliveryDelay(): ?int */ public function setPriority(int $priority = null): Producer { - if (null === $priority) { - return $this; - } + $this->priority = $priority; - throw PriorityNotSupportedException::providerDoestNotSupportIt(); + return $this; } public function getPriority(): ?int { - return null; + return $this->priority; } /** @@ -89,15 +95,49 @@ public function getPriority(): ?int */ public function setTimeToLive(int $timeToLive = null): Producer { - if (null === $timeToLive) { - return $this; - } + $this->timeToLive = $timeToLive; - throw new \LogicException('Not implemented'); + return $this; } public function getTimeToLive(): ?int { - return null; + return $this->timeToLive; + } + + private function resolvePriority(PheanstalkMessage $message): ?int + { + if (null === $this->priority) { + return $message->getPriority(); + } + + $priority = $this->priority; + $this->priority = null; + + return $priority; + } + + private function resolveDelay(PheanstalkMessage $message): ?int + { + if (null === $this->deliveryDelay) { + return $message->getDelay(); + } + + $delay = $this->deliveryDelay; + $this->deliveryDelay = null; + + return $delay / 1000; + } + + private function resolveTimeToLive(PheanstalkMessage $message): ?int + { + if (null === $this->timeToLive) { + return $message->getTimeToRun(); + } + + $ttl = $this->timeToLive; + $this->timeToLive = null; + + return $ttl / 1000; } } diff --git a/pkg/pheanstalk/Tests/PheanstalkProducerTest.php b/pkg/pheanstalk/Tests/PheanstalkProducerTest.php index 60a19f627..1b9522ac4 100644 --- a/pkg/pheanstalk/Tests/PheanstalkProducerTest.php +++ b/pkg/pheanstalk/Tests/PheanstalkProducerTest.php @@ -66,6 +66,326 @@ public function testShouldJsonEncodeMessageAndPutToExpectedTube() ); } + public function testPriorityPrecedesMessagePriority() + { + $message = new PheanstalkMessage('theBody'); + $message->setPriority(100); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":{"priority":100}}', 50, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setPriority(50); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testNullPriorityFallsBackToMessagePriority() + { + $message = new PheanstalkMessage('theBody'); + $message->setPriority(100); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":{"priority":100}}', 100, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setPriority(null); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testPriorityDoesNotPersist() + { + $message = new PheanstalkMessage('theBody'); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":[]}', 100, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setPriority(100); + + $this->assertEquals(100, $producer->getPriority()); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + + $this->assertNull($producer->getPriority()); + } + + public function testAccessDeliveryDelayAsMilliseconds() + { + $producer = new PheanstalkProducer($this->createPheanstalkMock()); + $producer->setDeliveryDelay(5000); + + $this->assertEquals(5000, $producer->getDeliveryDelay()); + } + + public function testDeliveryDelayResolvesToSeconds() + { + $message = new PheanstalkMessage('theBody'); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, 5, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setDeliveryDelay(5000); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testDeliveryDelayPrecedesMessageDelay() + { + $message = new PheanstalkMessage('theBody'); + $message->setDelay(25); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":{"delay":25}}', Pheanstalk::DEFAULT_PRIORITY, 1, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setDeliveryDelay(1000); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testNullDeliveryDelayFallsBackToMessageDelay() + { + $message = new PheanstalkMessage('theBody'); + $message->setDelay(25); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":{"delay":25}}', Pheanstalk::DEFAULT_PRIORITY, 25, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setDeliveryDelay(null); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testDeliveryDelayDoesNotPersist() + { + $message = new PheanstalkMessage('theBody'); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, 1, Pheanstalk::DEFAULT_TTR) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setDeliveryDelay(1000); + + $this->assertEquals(1000, $producer->getDeliveryDelay()); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + + $this->assertNull($producer->getDeliveryDelay()); + } + + public function testAccessTimeToLiveAsMilliseconds() + { + $producer = new PheanstalkProducer($this->createPheanstalkMock()); + $producer->setTimeToLive(5000); + + $this->assertEquals(5000, $producer->getTimeToLive()); + } + + public function testTimeToLiveResolvesToSeconds() + { + $message = new PheanstalkMessage('theBody'); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 5) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setTimeToLive(5000); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testTimeToLivePrecedesMessageTimeToRun() + { + $message = new PheanstalkMessage('theBody'); + $message->setTimeToRun(25); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":{"ttr":25}}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 1) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setTimeToLive(1000); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testNullTimeToLiveFallsBackToMessageTimeToRun() + { + $message = new PheanstalkMessage('theBody'); + $message->setTimeToRun(25); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":{"ttr":25}}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 25) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setTimeToLive(null); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + } + + public function testTimeToLiveDoesNotPersist() + { + $message = new PheanstalkMessage('theBody'); + + $pheanstalk = $this->createPheanstalkMock(); + $pheanstalk + ->expects($this->once()) + ->method('useTube') + ->with('theQueueName') + ->willReturnSelf() + ; + $pheanstalk + ->expects($this->once()) + ->method('put') + ->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 1) + ; + + $producer = new PheanstalkProducer($pheanstalk); + $producer->setTimeToLive(1000); + + $this->assertEquals(1000, $producer->getTimeToLive()); + + $producer->send( + new PheanstalkDestination('theQueueName'), + $message + ); + + $this->assertNull($producer->getTimeToLive()); + } + /** * @return MockObject|Pheanstalk */ From 3b205e0667ce675cc2c791616fae2075678069f9 Mon Sep 17 00:00:00 2001 From: Jonathan Cox Date: Fri, 20 Mar 2020 05:11:45 -0400 Subject: [PATCH 2/2] =?UTF-8?q?Swap=20priority=20of=20producer=20and=20mes?= =?UTF-8?q?sage=20values=20Don=E2=80=99t=20reset=20producer=20values=20aft?= =?UTF-8?q?er=20send?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/pheanstalk/PheanstalkProducer.php | 52 ++--- .../Tests/PheanstalkProducerTest.php | 179 +----------------- 2 files changed, 17 insertions(+), 214 deletions(-) diff --git a/pkg/pheanstalk/PheanstalkProducer.php b/pkg/pheanstalk/PheanstalkProducer.php index 28120f0fb..030beedf8 100644 --- a/pkg/pheanstalk/PheanstalkProducer.php +++ b/pkg/pheanstalk/PheanstalkProducer.php @@ -52,11 +52,21 @@ public function send(Destination $destination, Message $message): void throw new \InvalidArgumentException(sprintf('Could not encode value into json. Error %s and message %s', json_last_error(), json_last_error_msg())); } + if (null !== $this->priority && null === $message->getHeader('priority')) { + $message->setPriority($this->priority); + } + if (null !== $this->deliveryDelay && null === $message->getHeader('delay')) { + $message->setDelay($this->deliveryDelay / 1000); + } + if (null !== $this->timeToLive && null === $message->getHeader('ttr')) { + $message->setTimeToRun($this->timeToLive / 1000); + } + $this->pheanstalk->useTube($destination->getName())->put( $rawMessage, - $this->resolvePriority($message), - $this->resolveDelay($message), - $this->resolveTimeToLive($message) + $message->getPriority(), + $message->getDelay(), + $message->getTimeToRun() ); } @@ -104,40 +114,4 @@ public function getTimeToLive(): ?int { return $this->timeToLive; } - - private function resolvePriority(PheanstalkMessage $message): ?int - { - if (null === $this->priority) { - return $message->getPriority(); - } - - $priority = $this->priority; - $this->priority = null; - - return $priority; - } - - private function resolveDelay(PheanstalkMessage $message): ?int - { - if (null === $this->deliveryDelay) { - return $message->getDelay(); - } - - $delay = $this->deliveryDelay; - $this->deliveryDelay = null; - - return $delay / 1000; - } - - private function resolveTimeToLive(PheanstalkMessage $message): ?int - { - if (null === $this->timeToLive) { - return $message->getTimeToRun(); - } - - $ttl = $this->timeToLive; - $this->timeToLive = null; - - return $ttl / 1000; - } } diff --git a/pkg/pheanstalk/Tests/PheanstalkProducerTest.php b/pkg/pheanstalk/Tests/PheanstalkProducerTest.php index 1b9522ac4..7a736c09e 100644 --- a/pkg/pheanstalk/Tests/PheanstalkProducerTest.php +++ b/pkg/pheanstalk/Tests/PheanstalkProducerTest.php @@ -66,34 +66,7 @@ public function testShouldJsonEncodeMessageAndPutToExpectedTube() ); } - public function testPriorityPrecedesMessagePriority() - { - $message = new PheanstalkMessage('theBody'); - $message->setPriority(100); - - $pheanstalk = $this->createPheanstalkMock(); - $pheanstalk - ->expects($this->once()) - ->method('useTube') - ->with('theQueueName') - ->willReturnSelf() - ; - $pheanstalk - ->expects($this->once()) - ->method('put') - ->with('{"body":"theBody","properties":[],"headers":{"priority":100}}', 50, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR) - ; - - $producer = new PheanstalkProducer($pheanstalk); - $producer->setPriority(50); - - $producer->send( - new PheanstalkDestination('theQueueName'), - $message - ); - } - - public function testNullPriorityFallsBackToMessagePriority() + public function testMessagePriorityPrecedesPriority() { $message = new PheanstalkMessage('theBody'); $message->setPriority(100); @@ -112,42 +85,12 @@ public function testNullPriorityFallsBackToMessagePriority() ; $producer = new PheanstalkProducer($pheanstalk); - $producer->setPriority(null); - - $producer->send( - new PheanstalkDestination('theQueueName'), - $message - ); - } - - public function testPriorityDoesNotPersist() - { - $message = new PheanstalkMessage('theBody'); - - $pheanstalk = $this->createPheanstalkMock(); - $pheanstalk - ->expects($this->once()) - ->method('useTube') - ->with('theQueueName') - ->willReturnSelf() - ; - $pheanstalk - ->expects($this->once()) - ->method('put') - ->with('{"body":"theBody","properties":[],"headers":[]}', 100, Pheanstalk::DEFAULT_DELAY, Pheanstalk::DEFAULT_TTR) - ; - - $producer = new PheanstalkProducer($pheanstalk); - $producer->setPriority(100); - - $this->assertEquals(100, $producer->getPriority()); + $producer->setPriority(50); $producer->send( new PheanstalkDestination('theQueueName'), $message ); - - $this->assertNull($producer->getPriority()); } public function testAccessDeliveryDelayAsMilliseconds() @@ -184,34 +127,7 @@ public function testDeliveryDelayResolvesToSeconds() ); } - public function testDeliveryDelayPrecedesMessageDelay() - { - $message = new PheanstalkMessage('theBody'); - $message->setDelay(25); - - $pheanstalk = $this->createPheanstalkMock(); - $pheanstalk - ->expects($this->once()) - ->method('useTube') - ->with('theQueueName') - ->willReturnSelf() - ; - $pheanstalk - ->expects($this->once()) - ->method('put') - ->with('{"body":"theBody","properties":[],"headers":{"delay":25}}', Pheanstalk::DEFAULT_PRIORITY, 1, Pheanstalk::DEFAULT_TTR) - ; - - $producer = new PheanstalkProducer($pheanstalk); - $producer->setDeliveryDelay(1000); - - $producer->send( - new PheanstalkDestination('theQueueName'), - $message - ); - } - - public function testNullDeliveryDelayFallsBackToMessageDelay() + public function testMessageDelayPrecedesDeliveryDelay() { $message = new PheanstalkMessage('theBody'); $message->setDelay(25); @@ -229,43 +145,13 @@ public function testNullDeliveryDelayFallsBackToMessageDelay() ->with('{"body":"theBody","properties":[],"headers":{"delay":25}}', Pheanstalk::DEFAULT_PRIORITY, 25, Pheanstalk::DEFAULT_TTR) ; - $producer = new PheanstalkProducer($pheanstalk); - $producer->setDeliveryDelay(null); - - $producer->send( - new PheanstalkDestination('theQueueName'), - $message - ); - } - - public function testDeliveryDelayDoesNotPersist() - { - $message = new PheanstalkMessage('theBody'); - - $pheanstalk = $this->createPheanstalkMock(); - $pheanstalk - ->expects($this->once()) - ->method('useTube') - ->with('theQueueName') - ->willReturnSelf() - ; - $pheanstalk - ->expects($this->once()) - ->method('put') - ->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, 1, Pheanstalk::DEFAULT_TTR) - ; - $producer = new PheanstalkProducer($pheanstalk); $producer->setDeliveryDelay(1000); - $this->assertEquals(1000, $producer->getDeliveryDelay()); - $producer->send( new PheanstalkDestination('theQueueName'), $message ); - - $this->assertNull($producer->getDeliveryDelay()); } public function testAccessTimeToLiveAsMilliseconds() @@ -302,34 +188,7 @@ public function testTimeToLiveResolvesToSeconds() ); } - public function testTimeToLivePrecedesMessageTimeToRun() - { - $message = new PheanstalkMessage('theBody'); - $message->setTimeToRun(25); - - $pheanstalk = $this->createPheanstalkMock(); - $pheanstalk - ->expects($this->once()) - ->method('useTube') - ->with('theQueueName') - ->willReturnSelf() - ; - $pheanstalk - ->expects($this->once()) - ->method('put') - ->with('{"body":"theBody","properties":[],"headers":{"ttr":25}}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 1) - ; - - $producer = new PheanstalkProducer($pheanstalk); - $producer->setTimeToLive(1000); - - $producer->send( - new PheanstalkDestination('theQueueName'), - $message - ); - } - - public function testNullTimeToLiveFallsBackToMessageTimeToRun() + public function testMessageTimeToRunPrecedesTimeToLive() { $message = new PheanstalkMessage('theBody'); $message->setTimeToRun(25); @@ -347,43 +206,13 @@ public function testNullTimeToLiveFallsBackToMessageTimeToRun() ->with('{"body":"theBody","properties":[],"headers":{"ttr":25}}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 25) ; - $producer = new PheanstalkProducer($pheanstalk); - $producer->setTimeToLive(null); - - $producer->send( - new PheanstalkDestination('theQueueName'), - $message - ); - } - - public function testTimeToLiveDoesNotPersist() - { - $message = new PheanstalkMessage('theBody'); - - $pheanstalk = $this->createPheanstalkMock(); - $pheanstalk - ->expects($this->once()) - ->method('useTube') - ->with('theQueueName') - ->willReturnSelf() - ; - $pheanstalk - ->expects($this->once()) - ->method('put') - ->with('{"body":"theBody","properties":[],"headers":[]}', Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, 1) - ; - $producer = new PheanstalkProducer($pheanstalk); $producer->setTimeToLive(1000); - $this->assertEquals(1000, $producer->getTimeToLive()); - $producer->send( new PheanstalkDestination('theQueueName'), $message ); - - $this->assertNull($producer->getTimeToLive()); } /**