diff --git a/docs/transport/redis.md b/docs/transport/redis.md index 8a51c61cc..ff5a45990 100644 --- a/docs/transport/redis.md +++ b/docs/transport/redis.md @@ -5,12 +5,12 @@ It creates a collection (a queue or topic) there. Pushes messages to the tail of The transport works with [phpredis](https://github.com/phpredis/phpredis) php extension or [predis](https://github.com/nrk/predis) library. Make sure you installed either of them -**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost. - * [Installation](#installation) * [Create context](#create-context) * [Send message to topic](#send-message-to-topic) * [Send message to queue](#send-message-to-queue) +* [Send expiration message](#send-expiration-message) +* [Send delayed message](#send-delayed-message) * [Consume message](#consume-message) * [Delete queue (purge messages)](#delete-queue-purge-messages) * [Delete topic (purge messages)](#delete-topic-purge-messages) @@ -56,7 +56,7 @@ $factory = new RedisConnectionFactory([ ]); // same as above but given as DSN string -$factory = new RedisConnectionFactory('redis://example.com:1000?vendor=phpredis'); +$factory = new RedisConnectionFactory('redis+phpredis://example.com:1000'); $psrContext = $factory->createContext(); @@ -68,7 +68,7 @@ $redis = new \Enqueue\Redis\PhpRedis([ /** redis connection options */ ]); $redis->connect(); // Secure\TLS connection. Works only with predis library. Note second "S" in scheme. -$factory = new RedisConnectionFactory('rediss://user:pass@host/0?vendor=predis'); +$factory = new RedisConnectionFactory('rediss+predis://user:pass@host/0'); $factory = new RedisConnectionFactory($redis); ``` @@ -82,7 +82,7 @@ use Enqueue\Redis\RedisConnectionFactory; $connectionFactory = new RedisConnectionFactory([ 'host' => 'localhost', 'port' => 6379, - 'vendor' => 'predis', + 'scheme_extensions' => 'predis', ]); $psrContext = $connectionFactory->createContext(); @@ -102,7 +102,7 @@ $options = []; $redis = new PRedis(new \PRedis\Client($config, $options)); -$factory = new RedisConnectionFactory(['vendor' => 'custom', 'redis' => $redis]); +$factory = new RedisConnectionFactory($redis); ``` ## Send message to topic @@ -129,6 +129,38 @@ $message = $psrContext->createMessage('Hello world!'); $psrContext->createProducer()->send($fooQueue, $message); ``` +## Send expiration message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setTimeToLive(60000) // 60 sec + // + ->send($fooQueue, $message) +; +``` + +## Send delayed message + +```php +createMessage('Hello world!'); + +$psrContext->createProducer() + ->setDeliveryDelay(5000) // 5 sec + + ->send($fooQueue, $message) +; +```` + ## Consume message: ```php @@ -141,6 +173,9 @@ $consumer = $psrContext->createConsumer($fooQueue); $message = $consumer->receive(); // process a message + +$consumer->acknowledge($message); +//$consumer->reject($message); ``` ## Delete queue (purge messages): diff --git a/pkg/redis/JsonSerializer.php b/pkg/redis/JsonSerializer.php new file mode 100644 index 000000000..7e064a221 --- /dev/null +++ b/pkg/redis/JsonSerializer.php @@ -0,0 +1,41 @@ + $message->getBody(), + 'properties' => $message->getProperties(), + 'headers' => $message->getHeaders(), + ]); + + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return $json; + } + + public function toMessage(string $string): RedisMessage + { + $data = json_decode($string, true); + if (JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf( + 'The malformed json given. Error %s and message %s', + json_last_error(), + json_last_error_msg() + )); + } + + return new RedisMessage($data['body'], $data['properties'], $data['headers']); + } +} diff --git a/pkg/redis/LuaScripts.php b/pkg/redis/LuaScripts.php new file mode 100644 index 000000000..4a40d2447 --- /dev/null +++ b/pkg/redis/LuaScripts.php @@ -0,0 +1,38 @@ +redis, 'eval'], array_merge([$script, count($keys)], $keys, $args)); + } catch (PRedisServerException $e) { + throw new ServerException('eval command has failed', null, $e); + } + } + + public function zadd(string $key, string $value, float $score): int + { + try { + return $this->redis->zadd($key, [$value => $score]); + } catch (PRedisServerException $e) { + throw new ServerException('zadd command has failed', null, $e); + } + } + + public function zrem(string $key, string $value): int + { + try { + return $this->redis->zrem($key, [$value]); + } catch (PRedisServerException $e) { + throw new ServerException('zrem command has failed', null, $e); + } + } + public function lpush(string $key, string $value): int { try { diff --git a/pkg/redis/PhpRedis.php b/pkg/redis/PhpRedis.php index bf6da281a..bd8dd9498 100644 --- a/pkg/redis/PhpRedis.php +++ b/pkg/redis/PhpRedis.php @@ -26,6 +26,33 @@ public function __construct(array $config) $this->config = $config; } + public function eval(string $script, array $keys = [], array $args = []) + { + try { + return $this->redis->eval($script, array_merge($keys, $args), count($keys)); + } catch (\RedisException $e) { + throw new ServerException('eval command has failed', null, $e); + } + } + + public function zadd(string $key, string $value, float $score): int + { + try { + return $this->redis->zAdd($key, $score, $value); + } catch (\RedisException $e) { + throw new ServerException('zadd command has failed', null, $e); + } + } + + public function zrem(string $key, string $value): int + { + try { + return $this->redis->zRem($key, $value); + } catch (\RedisException $e) { + throw new ServerException('zrem command has failed', null, $e); + } + } + public function lpush(string $key, string $value): int { try { diff --git a/pkg/redis/Redis.php b/pkg/redis/Redis.php index 2ea2e054e..362f5e8d0 100644 --- a/pkg/redis/Redis.php +++ b/pkg/redis/Redis.php @@ -6,6 +6,38 @@ interface Redis { + /** + * @param string $script + * @param array $keys + * @param array $args + * + * @throws ServerException + * + * @return mixed + */ + public function eval(string $script, array $keys = [], array $args = []); + + /** + * @param string $key + * @param string $value + * @param float $score + * + * @throws ServerException + * + * @return int + */ + public function zadd(string $key, string $value, float $score): int; + + /** + * @param string $key + * @param string $value + * + * @throws ServerException + * + * @return int + */ + public function zrem(string $key, string $value): int; + /** * @param string $key * @param string $value diff --git a/pkg/redis/RedisConnectionFactory.php b/pkg/redis/RedisConnectionFactory.php index 5d5ec3b40..eee9d4851 100644 --- a/pkg/redis/RedisConnectionFactory.php +++ b/pkg/redis/RedisConnectionFactory.php @@ -38,6 +38,8 @@ class RedisConnectionFactory implements ConnectionFactory * 'read_write_timeout' => Timeout (expressed in seconds) used when performing read or write operations on the underlying network resource after which an exception is thrown. * 'predis_options' => An array of predis specific options. * 'ssl' => could be any of http://fi2.php.net/manual/en/context.ssl.php#refsect1-context.ssl-options + * 'redelivery_delay' => Default 300 sec. Returns back message into the queue if message was not acknowledged or rejected after this delay. + * It could happen if consumer has failed with fatal error or even if message processing is slow and takes more than this time. * ]. * * or @@ -85,10 +87,10 @@ public function createContext(): Context if ($this->config['lazy']) { return new RedisContext(function () { return $this->createRedis(); - }); + }, $this->config['redelivery_delay']); } - return new RedisContext($this->createRedis()); + return new RedisContext($this->createRedis(), $this->config['redelivery_delay']); } private function createRedis(): Redis @@ -158,6 +160,7 @@ private function defaultConfig(): array 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ]; } } diff --git a/pkg/redis/RedisConsumer.php b/pkg/redis/RedisConsumer.php index 0fb9db787..6ff23f41e 100644 --- a/pkg/redis/RedisConsumer.php +++ b/pkg/redis/RedisConsumer.php @@ -11,6 +11,8 @@ class RedisConsumer implements Consumer { + use RedisConsumerHelperTrait; + /** * @var RedisDestination */ @@ -21,12 +23,33 @@ class RedisConsumer implements Consumer */ private $context; + /** + * @var int + */ + private $redeliveryDelay = 300; + public function __construct(RedisContext $context, RedisDestination $queue) { $this->context = $context; $this->queue = $queue; } + /** + * @return int + */ + public function getRedeliveryDelay(): ?int + { + return $this->redeliveryDelay; + } + + /** + * @param int $delay + */ + public function setRedeliveryDelay(int $delay): void + { + $this->redeliveryDelay = $delay; + } + /** * @return RedisDestination */ @@ -40,8 +63,9 @@ public function getQueue(): Queue */ public function receive(int $timeout = 0): ?Message { - $timeout = (int) ($timeout / 1000); - if (empty($timeout)) { + $timeout = (int) ceil($timeout / 1000); + + if ($timeout <= 0) { while (true) { if ($message = $this->receive(5000)) { return $message; @@ -49,11 +73,7 @@ public function receive(int $timeout = 0): ?Message } } - if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) { - return RedisMessage::jsonUnserialize($result->getMessage()); - } - - return null; + return $this->receiveMessage([$this->queue], $timeout, $this->redeliveryDelay); } /** @@ -61,11 +81,7 @@ public function receive(int $timeout = 0): ?Message */ public function receiveNoWait(): ?Message { - if ($result = $this->getRedis()->rpop($this->queue->getName())) { - return RedisMessage::jsonUnserialize($result->getMessage()); - } - - return null; + return $this->receiveMessageNoWait($this->queue, $this->redeliveryDelay); } /** @@ -73,7 +89,7 @@ public function receiveNoWait(): ?Message */ public function acknowledge(Message $message): void { - // do nothing. redis transport always works in auto ack mode + $this->getRedis()->zrem($this->queue->getName().':reserved', $message->getReservedKey()); } /** @@ -83,13 +99,27 @@ public function reject(Message $message, bool $requeue = false): void { InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); - // do nothing on reject. redis transport always works in auto ack mode + $this->acknowledge($message); if ($requeue) { - $this->context->createProducer()->send($this->queue, $message); + $message = $this->getContext()->getSerializer()->toMessage($message->getReservedKey()); + $message->setHeader('attempts', 0); + + if ($message->getTimeToLive()) { + $message->setHeader('expires_at', time() + $message->getTimeToLive()); + } + + $payload = $this->getContext()->getSerializer()->toString($message); + + $this->getRedis()->lpush($this->queue->getName(), $payload); } } + private function getContext(): RedisContext + { + return $this->context; + } + private function getRedis(): Redis { return $this->context->getRedis(); diff --git a/pkg/redis/RedisConsumerHelperTrait.php b/pkg/redis/RedisConsumerHelperTrait.php new file mode 100644 index 000000000..9939986ed --- /dev/null +++ b/pkg/redis/RedisConsumerHelperTrait.php @@ -0,0 +1,118 @@ +queueNames) { + $this->queueNames = []; + foreach ($queues as $queue) { + $this->queueNames[] = $queue->getName(); + } + } + + while ($thisTimeout > 0) { + $this->migrateExpiredMessages($this->queueNames); + + if (false == $result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) { + return null; + } + + $this->pushQueueNameBack($result->getKey()); + + if ($message = $this->processResult($result, $redeliveryDelay)) { + return $message; + } + + $thisTimeout -= time() - $startAt; + } + + return null; + } + + protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage + { + $this->migrateExpiredMessages([$destination->getName()]); + + if ($result = $this->getContext()->getRedis()->rpop($destination->getName())) { + return $this->processResult($result, $redeliveryDelay); + } + + return null; + } + + protected function processResult(RedisResult $result, int $redeliveryDelay): ?RedisMessage + { + $message = $this->getContext()->getSerializer()->toMessage($result->getMessage()); + + $now = time(); + + if (0 === $message->getAttempts() && $expiresAt = $message->getHeader('expires_at')) { + if ($now > $expiresAt) { + return null; + } + } + + $message->setHeader('attempts', $message->getAttempts() + 1); + $message->setRedelivered($message->getAttempts() > 1); + $message->setKey($result->getKey()); + $message->setReservedKey($this->getContext()->getSerializer()->toString($message)); + + $reservedQueue = $result->getKey().':reserved'; + $redeliveryAt = $now + $redeliveryDelay; + + $this->getContext()->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt); + + return $message; + } + + protected function pushQueueNameBack(string $queueName): void + { + if (count($this->queueNames) <= 1) { + return; + } + + if (false === $from = array_search($queueName, $this->queueNames, true)) { + throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName)); + } + + $to = count($this->queueNames) - 1; + + $out = array_splice($this->queueNames, $from, 1); + array_splice($this->queueNames, $to, 0, $out); + } + + protected function migrateExpiredMessages(array $queueNames): void + { + $now = time(); + + foreach ($queueNames as $queueName) { + $this->getContext()->getRedis() + ->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]); + + $this->getContext()->getRedis() + ->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]); + } + } +} diff --git a/pkg/redis/RedisContext.php b/pkg/redis/RedisContext.php index e69385884..344bb20c5 100644 --- a/pkg/redis/RedisContext.php +++ b/pkg/redis/RedisContext.php @@ -17,6 +17,8 @@ class RedisContext implements Context { + use SerializerAwareTrait; + /** * @var Redis */ @@ -27,12 +29,18 @@ class RedisContext implements Context */ private $redisFactory; + /** + * @var int + */ + private $redeliveryDelay = 300; + /** * Callable must return instance of Redis once called. * * @param Redis|callable $redis + * @param int $redeliveryDelay */ - public function __construct($redis) + public function __construct($redis, int $redeliveryDelay) { if ($redis instanceof Redis) { $this->redis = $redis; @@ -45,6 +53,9 @@ public function __construct($redis) Redis::class )); } + + $this->redeliveryDelay = $redeliveryDelay; + $this->setSerializer(new JsonSerializer()); } /** @@ -78,7 +89,7 @@ public function deleteQueue(Queue $queue): void { InvalidDestinationException::assertDestinationInstanceOf($queue, RedisDestination::class); - $this->getRedis()->del($queue->getName()); + $this->deleteDestination($queue); } /** @@ -88,7 +99,7 @@ public function deleteTopic(Topic $topic): void { InvalidDestinationException::assertDestinationInstanceOf($topic, RedisDestination::class); - $this->getRedis()->del($topic->getName()); + $this->deleteDestination($topic); } public function createTemporaryQueue(): Queue @@ -101,7 +112,7 @@ public function createTemporaryQueue(): Queue */ public function createProducer(): Producer { - return new RedisProducer($this->getRedis()); + return new RedisProducer($this); } /** @@ -113,7 +124,10 @@ public function createConsumer(Destination $destination): Consumer { InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class); - return new RedisConsumer($this, $destination); + $consumer = new RedisConsumer($this, $destination); + $consumer->setRedeliveryDelay($this->redeliveryDelay); + + return $consumer; } /** @@ -121,7 +135,10 @@ public function createConsumer(Destination $destination): Consumer */ public function createSubscriptionConsumer(): SubscriptionConsumer { - return new RedisSubscriptionConsumer($this); + $consumer = new RedisSubscriptionConsumer($this); + $consumer->setRedeliveryDelay($this->redeliveryDelay); + + return $consumer; } /** @@ -129,7 +146,7 @@ public function createSubscriptionConsumer(): SubscriptionConsumer */ public function purgeQueue(Queue $queue): void { - $this->getRedis()->del($queue->getName()); + $this->deleteDestination($queue); } public function close(): void @@ -154,4 +171,11 @@ public function getRedis(): Redis return $this->redis; } + + private function deleteDestination(RedisDestination $destination): void + { + $this->getRedis()->del($destination->getName()); + $this->getRedis()->del($destination->getName().':delayed'); + $this->getRedis()->del($destination->getName().':reserved'); + } } diff --git a/pkg/redis/RedisMessage.php b/pkg/redis/RedisMessage.php index ab614f773..74c65475a 100644 --- a/pkg/redis/RedisMessage.php +++ b/pkg/redis/RedisMessage.php @@ -6,7 +6,7 @@ use Interop\Queue\Message; -class RedisMessage implements Message, \JsonSerializable +class RedisMessage implements Message { /** * @var string @@ -28,6 +28,16 @@ class RedisMessage implements Message, \JsonSerializable */ private $redelivered; + /** + * @var string + */ + private $reservedKey; + + /** + * @var string + */ + private $key; + public function __construct(string $body = '', array $properties = [], array $headers = []) { $this->body = $body; @@ -139,26 +149,72 @@ public function getReplyTo(): ?string return $this->getHeader('reply_to'); } - public function jsonSerialize(): array + /** + * @return int + */ + public function getAttempts(): int + { + return (int) $this->getHeader('attempts', 0); + } + + /** + * @return int + */ + public function getTimeToLive(): ?int + { + return $this->getHeader('time_to_live'); + } + + /** + * Set time to live in milliseconds. + */ + public function setTimeToLive(int $timeToLive = null): void + { + $this->setHeader('time_to_live', $timeToLive); + } + + public function getDeliveryDelay(): ?int + { + return $this->getHeader('delivery_delay'); + } + + /** + * Set delay in milliseconds. + */ + public function setDeliveryDelay(int $deliveryDelay = null): void + { + $this->setHeader('delivery_delay', $deliveryDelay); + } + + /** + * @return string + */ + public function getReservedKey(): ?string + { + return $this->reservedKey; + } + + /** + * @param string $reservedKey + */ + public function setReservedKey(string $reservedKey) { - return [ - 'body' => $this->getBody(), - 'properties' => $this->getProperties(), - 'headers' => $this->getHeaders(), - ]; + $this->reservedKey = $reservedKey; } - public static function jsonUnserialize(string $json): self + /** + * @return string + */ + public function getKey(): ?string { - $data = json_decode($json, true); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); - } + return $this->key; + } - return new self($data['body'], $data['properties'], $data['headers']); + /** + * @param string $key + */ + public function setKey(string $key): void + { + $this->key = $key; } } diff --git a/pkg/redis/RedisProducer.php b/pkg/redis/RedisProducer.php index 81ca13b7e..69b028cb1 100644 --- a/pkg/redis/RedisProducer.php +++ b/pkg/redis/RedisProducer.php @@ -9,20 +9,31 @@ use Interop\Queue\Exception\InvalidMessageException; use Interop\Queue\Message; use Interop\Queue\Producer; +use Ramsey\Uuid\Uuid; class RedisProducer implements Producer { /** - * @var Redis + * @var RedisContext */ - private $redis; + private $context; /** - * @param Redis $redis + * @var int|null */ - public function __construct(Redis $redis) + private $timeToLive; + + /** + * @var int + */ + private $deliveryDelay; + + /** + * @param RedisContext $context + */ + public function __construct(RedisContext $context) { - $this->redis = $redis; + $this->context = $context; } /** @@ -34,24 +45,44 @@ public function send(Destination $destination, Message $message): void InvalidDestinationException::assertDestinationInstanceOf($destination, RedisDestination::class); InvalidMessageException::assertMessageInstanceOf($message, RedisMessage::class); - $this->redis->lpush($destination->getName(), json_encode($message)); + $message->setMessageId(Uuid::uuid4()->toString()); + $message->setHeader('attempts', 0); + + if (null !== $this->timeToLive && null === $message->getTimeToLive()) { + $message->setTimeToLive($this->timeToLive); + } + + if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) { + $message->setDeliveryDelay($this->deliveryDelay); + } + + if ($message->getTimeToLive()) { + $message->setHeader('expires_at', time() + $message->getTimeToLive()); + } + + $payload = $this->context->getSerializer()->toString($message); + + if ($message->getDeliveryDelay()) { + $deliveryAt = time() + $message->getDeliveryDelay(); + $this->context->getRedis()->zadd($destination->getName().':delayed', $payload, $deliveryAt); + } else { + $this->context->getRedis()->lpush($destination->getName(), $payload); + } } /** - * @return RedisProducer + * @return self */ public function setDeliveryDelay(int $deliveryDelay = null): Producer { - if (null === $deliveryDelay) { - return $this; - } + $this->deliveryDelay = $deliveryDelay; - throw new \LogicException('Not implemented'); + return $this; } public function getDeliveryDelay(): ?int { - return null; + return $this->deliveryDelay; } /** @@ -72,19 +103,17 @@ public function getPriority(): ?int } /** - * @return RedisProducer + * @return self */ public function setTimeToLive(int $timeToLive = null): Producer { - if (null === $timeToLive) { - return $this; - } + $this->timeToLive = $timeToLive; - throw new \LogicException('Not implemented'); + return $this; } public function getTimeToLive(): ?int { - return null; + return $this->timeToLive; } } diff --git a/pkg/redis/RedisSubscriptionConsumer.php b/pkg/redis/RedisSubscriptionConsumer.php index 1b6cd1149..c59cab4da 100644 --- a/pkg/redis/RedisSubscriptionConsumer.php +++ b/pkg/redis/RedisSubscriptionConsumer.php @@ -9,6 +9,8 @@ class RedisSubscriptionConsumer implements SubscriptionConsumer { + use RedisConsumerHelperTrait; + /** * @var RedisContext */ @@ -21,6 +23,11 @@ class RedisSubscriptionConsumer implements SubscriptionConsumer */ private $subscribers; + /** + * @var int + */ + private $redeliveryDelay = 300; + /** * @param RedisContext $context */ @@ -30,6 +37,22 @@ public function __construct(RedisContext $context) $this->subscribers = []; } + /** + * @return int + */ + public function getRedeliveryDelay(): ?int + { + return $this->redeliveryDelay; + } + + /** + * @param int $delay + */ + public function setRedeliveryDelay(int $delay): void + { + $this->redeliveryDelay = $delay; + } + public function consume(int $timeout = 0): void { if (empty($this->subscribers)) { @@ -39,32 +62,19 @@ public function consume(int $timeout = 0): void $timeout = (int) ceil($timeout / 1000); $endAt = time() + $timeout; - $queueNames = []; - foreach (array_keys($this->subscribers) as $queueName) { - $queueNames[$queueName] = $queueName; + $queues = []; + /** @var Consumer $consumer */ + foreach ($this->subscribers as list($consumer)) { + $queues[] = $consumer->getQueue(); } - $currentQueueNames = []; while (true) { - if (empty($currentQueueNames)) { - $currentQueueNames = $queueNames; - } + if ($message = $this->receiveMessage($queues, $timeout ?: 5, $this->redeliveryDelay)) { + list($consumer, $callback) = $this->subscribers[$message->getKey()]; - $result = $this->context->getRedis()->brpop($currentQueueNames, $timeout ?: 5); - if ($result) { - $message = RedisMessage::jsonUnserialize($result->getMessage()); - list($consumer, $callback) = $this->subscribers[$result->getKey()]; if (false === call_user_func($callback, $message, $consumer)) { return; } - - unset($currentQueueNames[$result->getKey()]); - } else { - $currentQueueNames = []; - - if ($timeout && microtime(true) >= $endAt) { - return; - } } if ($timeout && microtime(true) >= $endAt) { @@ -92,6 +102,7 @@ public function subscribe(Consumer $consumer, callable $callback): void } $this->subscribers[$queueName] = [$consumer, $callback]; + $this->queueNames = null; } /** @@ -114,10 +125,17 @@ public function unsubscribe(Consumer $consumer): void } unset($this->subscribers[$queueName]); + $this->queueNames = null; } public function unsubscribeAll(): void { $this->subscribers = []; + $this->queueNames = null; + } + + private function getContext(): RedisContext + { + return $this->context; } } diff --git a/pkg/redis/Serializer.php b/pkg/redis/Serializer.php new file mode 100644 index 000000000..a936a9328 --- /dev/null +++ b/pkg/redis/Serializer.php @@ -0,0 +1,12 @@ +serializer = $serializer; + } + + /** + * @return Serializer + */ + public function getSerializer() + { + return $this->serializer; + } +} diff --git a/pkg/redis/Tests/Functional/CommonUseCasesTrait.php b/pkg/redis/Tests/Functional/CommonUseCasesTrait.php index 02f63d6f4..b80ea9763 100644 --- a/pkg/redis/Tests/Functional/CommonUseCasesTrait.php +++ b/pkg/redis/Tests/Functional/CommonUseCasesTrait.php @@ -61,7 +61,10 @@ public function testProduceAndReceiveOneMessageSentDirectlyToQueue() $this->assertEquals(__METHOD__, $message->getBody()); $this->assertEquals(['FooProperty' => 'FooVal'], $message->getProperties()); - $this->assertEquals(['BarHeader' => 'BarVal'], $message->getHeaders()); + $this->assertCount(3, $message->getHeaders()); + $this->assertSame(1, $message->getHeader('attempts')); + $this->assertSame('BarVal', $message->getHeader('BarHeader')); + $this->assertNotEmpty('BarVal', $message->getHeader('message_id')); } public function testProduceAndReceiveOneMessageSentDirectlyToTopic() @@ -99,7 +102,7 @@ public function testConsumerReceiveMessageWithZeroTimeout() $actualMessage = $consumer->receive(0); $this->assertInstanceOf(RedisMessage::class, $actualMessage); - $consumer->acknowledge($message); + $consumer->acknowledge($actualMessage); $this->assertEquals(__METHOD__, $message->getBody()); } diff --git a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php index 953006af9..897d5ee18 100644 --- a/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php +++ b/pkg/redis/Tests/RedisConnectionFactoryConfigTest.php @@ -90,6 +90,7 @@ public static function provideConfigs() 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ], ]; @@ -110,6 +111,7 @@ public static function provideConfigs() 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ], ]; @@ -130,6 +132,7 @@ public static function provideConfigs() 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ], ]; @@ -151,6 +154,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -172,6 +176,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -193,6 +198,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -215,6 +221,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -237,6 +244,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -259,6 +267,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -280,6 +289,7 @@ public static function provideConfigs() 'predis_options' => null, 'ssl' => null, 'foo' => 'bar', + 'redelivery_delay' => 300, ], ]; @@ -301,6 +311,7 @@ public static function provideConfigs() 'read_write_timeout' => null, 'predis_options' => null, 'ssl' => null, + 'redelivery_delay' => 300, ], ]; @@ -326,6 +337,7 @@ public static function provideConfigs() 'cafile' => 'private.pem', 'verify_peer' => '1', ], + 'redelivery_delay' => 300, ], ]; } diff --git a/pkg/redis/Tests/RedisConsumerTest.php b/pkg/redis/Tests/RedisConsumerTest.php index d8b0c4e7b..8c2b1afe0 100644 --- a/pkg/redis/Tests/RedisConsumerTest.php +++ b/pkg/redis/Tests/RedisConsumerTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Redis\Tests; +use Enqueue\Redis\JsonSerializer; use Enqueue\Redis\Redis; use Enqueue\Redis\RedisConsumer; use Enqueue\Redis\RedisContext; @@ -35,41 +36,85 @@ public function testShouldReturnDestinationSetInConstructorOnGetQueue() $this->assertSame($destination, $consumer->getQueue()); } - public function testShouldDoNothingOnAcknowledge() + public function testShouldAcknowledgeMessage() { - $consumer = new RedisConsumer($this->createContextMock(), new RedisDestination('aQueue')); + $redisMock = $this->createRedisMock(); + $redisMock + ->expects($this->once()) + ->method('zrem') + ->with('aQueue:reserved', 'reserved-key') + ->willReturn(1) + ; - $consumer->acknowledge(new RedisMessage()); - } + $contextMock = $this->createContextMock(); + $contextMock + ->expects($this->once()) + ->method('getRedis') + ->willReturn($redisMock) + ; - public function testShouldDoNothingOnReject() - { - $consumer = new RedisConsumer($this->createContextMock(), new RedisDestination('aQueue')); + $message = new RedisMessage(); + $message->setReservedKey('reserved-key'); + + $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue')); - $consumer->reject(new RedisMessage()); + $consumer->acknowledge($message); } - public function testShouldSendSameMessageToDestinationOnReQueue() + public function testShouldRejectMessage() { + $redisMock = $this->createRedisMock(); + $redisMock + ->expects($this->once()) + ->method('zrem') + ->with('aQueue:reserved', 'reserved-key') + ->willReturn(1) + ; + + $contextMock = $this->createContextMock(); + $contextMock + ->expects($this->once()) + ->method('getRedis') + ->willReturn($redisMock) + ; + $message = new RedisMessage(); + $message->setReservedKey('reserved-key'); - $destination = new RedisDestination('aQueue'); + $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue')); - $producerMock = $this->createProducerMock(); - $producerMock + $consumer->reject($message); + } + + public function testShouldSendSameMessageToDestinationOnReQueue() + { + $redisMock = $this->createRedisMock(); + $redisMock ->expects($this->once()) - ->method('send') - ->with($this->identicalTo($destination), $this->identicalTo($message)) + ->method('lpush') + ->with('aQueue', '{"body":"text","properties":[],"headers":{"attempts":0}}') + ->willReturn(1) ; + $serializer = new JsonSerializer(); + $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) - ->method('createProducer') - ->willReturn($producerMock) + ->expects($this->any()) + ->method('getRedis') + ->willReturn($redisMock) + ; + $contextMock + ->expects($this->any()) + ->method('getSerializer') + ->willReturn($serializer) ; - $consumer = new RedisConsumer($contextMock, $destination); + $message = new RedisMessage(); + $message->setBody('text'); + $message->setReservedKey($serializer->toString($message)); + + $consumer = new RedisConsumer($contextMock, new RedisDestination('aQueue')); $consumer->reject($message, true); } @@ -88,7 +133,7 @@ public function testShouldCallRedisBRPopAndReturnNullIfNothingInQueueOnReceive() $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) + ->expects($this->any()) ->method('getRedis') ->willReturn($redisMock) ; @@ -102,20 +147,27 @@ public function testShouldCallRedisBRPopAndReturnMessageIfOneInQueueOnReceive() { $destination = new RedisDestination('aQueue'); + $serializer = new JsonSerializer(); + $redisMock = $this->createRedisMock(); $redisMock ->expects($this->once()) ->method('brpop') ->with(['aQueue'], 2) - ->willReturn(new RedisResult('aQueue', json_encode(new RedisMessage('aBody')))) + ->willReturn(new RedisResult('aQueue', $serializer->toString(new RedisMessage('aBody')))) ; $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) + ->expects($this->any()) ->method('getRedis') ->willReturn($redisMock) ; + $contextMock + ->expects($this->any()) + ->method('getSerializer') + ->willReturn($serializer) + ; $consumer = new RedisConsumer($contextMock, $destination); @@ -131,24 +183,26 @@ public function testShouldCallRedisBRPopSeveralTimesWithFiveSecondTimeoutIfZeroT $expectedTimeout = 5; + $serializer = new JsonSerializer(); + $redisMock = $this->createRedisMock(); $redisMock - ->expects($this->at(0)) + ->expects($this->at(2)) ->method('brpop') ->with(['aQueue'], $expectedTimeout) ->willReturn(null) ; $redisMock - ->expects($this->at(1)) + ->expects($this->at(5)) ->method('brpop') ->with(['aQueue'], $expectedTimeout) ->willReturn(null) ; $redisMock - ->expects($this->at(2)) + ->expects($this->at(8)) ->method('brpop') ->with(['aQueue'], $expectedTimeout) - ->willReturn(new RedisResult('aQueue', json_encode(new RedisMessage('aBody')))) + ->willReturn(new RedisResult('aQueue', $serializer->toString(new RedisMessage('aBody')))) ; $contextMock = $this->createContextMock(); @@ -157,6 +211,11 @@ public function testShouldCallRedisBRPopSeveralTimesWithFiveSecondTimeoutIfZeroT ->method('getRedis') ->willReturn($redisMock) ; + $contextMock + ->expects($this->atLeastOnce()) + ->method('getSerializer') + ->willReturn($serializer) + ; $consumer = new RedisConsumer($contextMock, $destination); @@ -170,6 +229,8 @@ public function testShouldCallRedisRPopAndReturnNullIfNothingInQueueOnReceiveNoW { $destination = new RedisDestination('aQueue'); + $serializer = new JsonSerializer(); + $redisMock = $this->createRedisMock(); $redisMock ->expects($this->once()) @@ -180,10 +241,15 @@ public function testShouldCallRedisRPopAndReturnNullIfNothingInQueueOnReceiveNoW $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) + ->expects($this->any()) ->method('getRedis') ->willReturn($redisMock) ; + $contextMock + ->expects($this->any()) + ->method('getSerializer') + ->willReturn($serializer) + ; $consumer = new RedisConsumer($contextMock, $destination); @@ -194,20 +260,27 @@ public function testShouldCallRedisRPopAndReturnMessageIfOneInQueueOnReceiveNoWa { $destination = new RedisDestination('aQueue'); + $serializer = new JsonSerializer(); + $redisMock = $this->createRedisMock(); $redisMock ->expects($this->once()) ->method('rpop') ->with('aQueue') - ->willReturn(new RedisResult('aQueue', json_encode(new RedisMessage('aBody')))) + ->willReturn(new RedisResult('aQueue', $serializer->toString(new RedisMessage('aBody')))) ; $contextMock = $this->createContextMock(); $contextMock - ->expects($this->once()) + ->expects($this->atLeastOnce()) ->method('getRedis') ->willReturn($redisMock) ; + $contextMock + ->expects($this->any()) + ->method('getSerializer') + ->willReturn($serializer) + ; $consumer = new RedisConsumer($contextMock, $destination); diff --git a/pkg/redis/Tests/RedisContextTest.php b/pkg/redis/Tests/RedisContextTest.php index 94fb626f1..01ea67df5 100644 --- a/pkg/redis/Tests/RedisContextTest.php +++ b/pkg/redis/Tests/RedisContextTest.php @@ -27,26 +27,26 @@ public function testShouldImplementContextInterface() public function testCouldBeConstructedWithRedisAsFirstArgument() { - new RedisContext($this->createRedisMock()); + new RedisContext($this->createRedisMock(), 300); } public function testCouldBeConstructedWithRedisFactoryAsFirstArgument() { new RedisContext(function () { return $this->createRedisMock(); - }); + }, 300); } public function testThrowIfNeitherRedisNorFactoryGiven() { $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('The $redis argument must be either Enqueue\Redis\Redis or callable that returns Enqueue\Redis\Redis once called.'); - new RedisContext(new \stdClass()); + new RedisContext(new \stdClass(), 300); } public function testShouldAllowCreateEmptyMessage() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $message = $context->createMessage(); @@ -59,7 +59,7 @@ public function testShouldAllowCreateEmptyMessage() public function testShouldAllowCreateCustomMessage() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $message = $context->createMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); @@ -72,7 +72,7 @@ public function testShouldAllowCreateCustomMessage() public function testShouldCreateQueue() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $queue = $context->createQueue('aQueue'); @@ -82,7 +82,7 @@ public function testShouldCreateQueue() public function testShouldAllowCreateTopic() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $topic = $context->createTopic('aTopic'); @@ -92,7 +92,7 @@ public function testShouldAllowCreateTopic() public function testThrowNotImplementedOnCreateTmpQueueCall() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $this->expectException(TemporaryQueueNotSupportedException::class); @@ -101,7 +101,7 @@ public function testThrowNotImplementedOnCreateTmpQueueCall() public function testShouldCreateProducer() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $producer = $context->createProducer(); @@ -110,7 +110,7 @@ public function testShouldCreateProducer() public function testShouldThrowIfNotRedisDestinationGivenOnCreateConsumer() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Enqueue\Redis\RedisDestination but got Enqueue\Null\NullQueue.'); @@ -121,7 +121,7 @@ public function testShouldThrowIfNotRedisDestinationGivenOnCreateConsumer() public function testShouldCreateConsumer() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $queue = $context->createQueue('aQueue'); @@ -138,7 +138,7 @@ public function testShouldCallRedisDisconnectOnClose() ->method('disconnect') ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $context->close(); } @@ -151,7 +151,7 @@ public function testThrowIfNotRedisDestinationGivenOnDeleteQueue() ->method('del') ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $this->expectException(InvalidDestinationException::class); $context->deleteQueue(new NullQueue('aQueue')); @@ -161,12 +161,22 @@ public function testShouldAllowDeleteQueue() { $redisMock = $this->createRedisMock(); $redisMock - ->expects($this->once()) + ->expects($this->at(0)) ->method('del') ->with('aQueueName') ; + $redisMock + ->expects($this->at(1)) + ->method('del') + ->with('aQueueName:delayed') + ; + $redisMock + ->expects($this->at(2)) + ->method('del') + ->with('aQueueName:reserved') + ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $queue = $context->createQueue('aQueueName'); @@ -181,7 +191,7 @@ public function testThrowIfNotRedisDestinationGivenOnDeleteTopic() ->method('del') ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $this->expectException(InvalidDestinationException::class); $context->deleteTopic(new NullTopic('aTopic')); @@ -191,12 +201,22 @@ public function testShouldAllowDeleteTopic() { $redisMock = $this->createRedisMock(); $redisMock - ->expects($this->once()) + ->expects($this->at(0)) ->method('del') ->with('aTopicName') ; + $redisMock + ->expects($this->at(1)) + ->method('del') + ->with('aTopicName:delayed') + ; + $redisMock + ->expects($this->at(2)) + ->method('del') + ->with('aTopicName:reserved') + ; - $context = new RedisContext($redisMock); + $context = new RedisContext($redisMock, 300); $topic = $context->createTopic('aTopicName'); @@ -205,7 +225,7 @@ public function testShouldAllowDeleteTopic() public function testShouldReturnExpectedSubscriptionConsumerInstance() { - $context = new RedisContext($this->createRedisMock()); + $context = new RedisContext($this->createRedisMock(), 300); $this->assertInstanceOf(RedisSubscriptionConsumer::class, $context->createSubscriptionConsumer()); } diff --git a/pkg/redis/Tests/RedisMessageTest.php b/pkg/redis/Tests/RedisMessageTest.php index 2a81efa5a..5b1e42fe2 100644 --- a/pkg/redis/Tests/RedisMessageTest.php +++ b/pkg/redis/Tests/RedisMessageTest.php @@ -9,11 +9,6 @@ class RedisMessageTest extends \PHPUnit\Framework\TestCase { use ClassExtensionTrait; - public function testShouldImplementJsonSerializableInterface() - { - $this->assertClassImplements(\JsonSerializable::class, RedisMessage::class); - } - public function testCouldConstructMessageWithoutArguments() { $message = new RedisMessage(); @@ -63,34 +58,4 @@ public function testShouldSetReplyToAsHeader() $this->assertSame(['reply_to' => 'theQueueName'], $message->getHeaders()); } - - public function testColdBeSerializedToJson() - { - $message = new RedisMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']); - - $this->assertEquals('{"body":"theBody","properties":{"thePropFoo":"thePropFooVal"},"headers":{"theHeaderFoo":"theHeaderFooVal"}}', json_encode($message)); - } - - public function testCouldBeUnserializedFromJson() - { - $message = new RedisMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']); - - $json = json_encode($message); - - //guard - $this->assertNotEmpty($json); - - $unserializedMessage = RedisMessage::jsonUnserialize($json); - - $this->assertInstanceOf(RedisMessage::class, $unserializedMessage); - $this->assertEquals($message, $unserializedMessage); - } - - public function testThrowIfMalformedJsonGivenOnUnsterilizedFromJson() - { - $this->expectException(\InvalidArgumentException::class); - $this->expectExceptionMessage('The malformed json given.'); - - RedisMessage::jsonUnserialize('{]'); - } } diff --git a/pkg/redis/Tests/RedisProducerTest.php b/pkg/redis/Tests/RedisProducerTest.php index 98934826c..12082734c 100644 --- a/pkg/redis/Tests/RedisProducerTest.php +++ b/pkg/redis/Tests/RedisProducerTest.php @@ -4,7 +4,9 @@ use Enqueue\Null\NullMessage; use Enqueue\Null\NullQueue; +use Enqueue\Redis\JsonSerializer; use Enqueue\Redis\Redis; +use Enqueue\Redis\RedisContext; use Enqueue\Redis\RedisDestination; use Enqueue\Redis\RedisMessage; use Enqueue\Redis\RedisProducer; @@ -25,12 +27,12 @@ public function testShouldImplementProducerInterface() public function testCouldBeConstructedWithRedisAsFirstArgument() { - new RedisProducer($this->createRedisMock()); + new RedisProducer($this->createContextMock()); } public function testThrowIfDestinationNotRedisDestinationOnSend() { - $producer = new RedisProducer($this->createRedisMock()); + $producer = new RedisProducer($this->createContextMock()); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Enqueue\Redis\RedisDestination but got Enqueue\Null\NullQueue.'); @@ -39,7 +41,7 @@ public function testThrowIfDestinationNotRedisDestinationOnSend() public function testThrowIfMessageNotRedisMessageOnSend() { - $producer = new RedisProducer($this->createRedisMock()); + $producer = new RedisProducer($this->createContextMock()); $this->expectException(InvalidMessageException::class); $this->expectExceptionMessage('The message must be an instance of Enqueue\Redis\RedisMessage but it is Enqueue\Null\NullMessage.'); @@ -54,14 +56,46 @@ public function testShouldCallLPushOnSend() $redisMock ->expects($this->once()) ->method('lpush') - ->with('aDestination', '{"body":"","properties":[],"headers":[]}') + ->willReturnCallback(function (string $key, string $value) { + $this->assertSame('aDestination', $key); + + $message = json_decode($value, true); + + $this->assertArrayHasKey('body', $message); + $this->assertArrayHasKey('properties', $message); + $this->assertArrayHasKey('headers', $message); + $this->assertNotEmpty($message['headers']['message_id']); + $this->assertSame(0, $message['headers']['attempts']); + + return true; + }) ; - $producer = new RedisProducer($redisMock); + $context = $this->createContextMock(); + $context + ->expects($this->once()) + ->method('getRedis') + ->willReturn($redisMock) + ; + $context + ->expects($this->once()) + ->method('getSerializer') + ->willReturn(new JsonSerializer()) + ; + + $producer = new RedisProducer($context); $producer->send($destination, new RedisMessage()); } + /** + * @return \PHPUnit_Framework_MockObject_MockObject|RedisContext + */ + private function createContextMock() + { + return $this->createMock(RedisContext::class); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|Redis */ diff --git a/pkg/redis/Tests/Spec/JsonSerializerTest.php b/pkg/redis/Tests/Spec/JsonSerializerTest.php new file mode 100644 index 000000000..fd244b17c --- /dev/null +++ b/pkg/redis/Tests/Spec/JsonSerializerTest.php @@ -0,0 +1,76 @@ +assertClassImplements(Serializer::class, JsonSerializer::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new JsonSerializer(); + } + + public function testShouldConvertMessageToJsonString() + { + $serializer = new JsonSerializer(); + + $message = new RedisMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); + + $json = $serializer->toString($message); + + $this->assertSame('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}', $json); + } + + public function testThrowIfFailedToEncodeMessageToJson() + { + $serializer = new JsonSerializer(); + + $resource = fopen(__FILE__, 'rb'); + + //guard + $this->assertInternalType('resource', $resource); + + $message = new RedisMessage('theBody', ['aProp' => $resource]); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toString($message); + } + + public function testShouldConvertJsonStringToMessage() + { + $serializer = new JsonSerializer(); + + $message = $serializer->toMessage('{"body":"theBody","properties":{"aProp":"aPropVal"},"headers":{"aHeader":"aHeaderVal"}}'); + + $this->assertInstanceOf(RedisMessage::class, $message); + + $this->assertSame('theBody', $message->getBody()); + $this->assertSame(['aProp' => 'aPropVal'], $message->getProperties()); + $this->assertSame(['aHeader' => 'aHeaderVal'], $message->getHeaders()); + } + + public function testThrowIfFailedToDecodeJsonToMessage() + { + $serializer = new JsonSerializer(); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The malformed json given.'); + $serializer->toMessage('{]'); + } +} diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php index 21d514fa5..16dbd8f98 100644 --- a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeFromAllSubscribedQueuesTest.php @@ -35,7 +35,7 @@ protected function createQueue(Context $context, $queueName) { /** @var RedisDestination $queue */ $queue = parent::createQueue($context, $queueName); - $context->getRedis()->del($queueName); + $context->deleteQueue($queue); return $queue; } diff --git a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php index 9524e5b6f..b227e3405 100644 --- a/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php +++ b/pkg/redis/Tests/Spec/RedisSubscriptionConsumerConsumeUntilUnsubscribedTest.php @@ -35,7 +35,7 @@ protected function createQueue(Context $context, $queueName) { /** @var RedisDestination $queue */ $queue = parent::createQueue($context, $queueName); - $context->getRedis()->del($queueName); + $context->purgeQueue($queue); return $queue; } diff --git a/pkg/redis/composer.json b/pkg/redis/composer.json index dfbccfd04..73b2236a3 100644 --- a/pkg/redis/composer.json +++ b/pkg/redis/composer.json @@ -8,7 +8,8 @@ "require": { "php": "^7.1.3", "queue-interop/queue-interop": "0.7.x-dev", - "enqueue/dsn": "0.9.x-dev" + "enqueue/dsn": "0.9.x-dev", + "ramsey/uuid": "^3" }, "require-dev": { "phpunit/phpunit": "~5.4.0",