Skip to content

Redis New Implementation #585

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 41 additions & 6 deletions docs/transport/redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ It creates a collection (a queue or topic) there. Pushes messages to the tail of
The transport works with [phpredis](https://github.com/phpredis/phpredis) php extension or [predis](https://github.com/nrk/predis) library.
Make sure you installed either of them

**Limitations** It works only in auto ack mode hence If consumer crashes the message is lost.

* [Installation](#installation)
* [Create context](#create-context)
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Send expiration message](#send-expiration-message)
* [Send delayed message](#send-delayed-message)
* [Consume message](#consume-message)
* [Delete queue (purge messages)](#delete-queue-purge-messages)
* [Delete topic (purge messages)](#delete-topic-purge-messages)
Expand Down Expand Up @@ -56,7 +56,7 @@ $factory = new RedisConnectionFactory([
]);

// same as above but given as DSN string
$factory = new RedisConnectionFactory('redis://example.com:1000?vendor=phpredis');
$factory = new RedisConnectionFactory('redis+phpredis://example.com:1000');

$psrContext = $factory->createContext();

Expand All @@ -68,7 +68,7 @@ $redis = new \Enqueue\Redis\PhpRedis([ /** redis connection options */ ]);
$redis->connect();

// Secure\TLS connection. Works only with predis library. Note second "S" in scheme.
$factory = new RedisConnectionFactory('rediss://user:pass@host/0?vendor=predis');
$factory = new RedisConnectionFactory('rediss+predis://user:pass@host/0');

$factory = new RedisConnectionFactory($redis);
```
Expand All @@ -82,7 +82,7 @@ use Enqueue\Redis\RedisConnectionFactory;
$connectionFactory = new RedisConnectionFactory([
'host' => 'localhost',
'port' => 6379,
'vendor' => 'predis',
'scheme_extensions' => 'predis',
]);

$psrContext = $connectionFactory->createContext();
Expand All @@ -102,7 +102,7 @@ $options = [];

$redis = new PRedis(new \PRedis\Client($config, $options));

$factory = new RedisConnectionFactory(['vendor' => 'custom', 'redis' => $redis]);
$factory = new RedisConnectionFactory($redis);
```

## Send message to topic
Expand All @@ -129,6 +129,38 @@ $message = $psrContext->createMessage('Hello world!');
$psrContext->createProducer()->send($fooQueue, $message);
```

## Send expiration message

```php
<?php
/** @var \Enqueue\Redis\RedisContext $psrContext */
/** @var \Enqueue\Redis\RedisDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setTimeToLive(60000) // 60 sec
//
->send($fooQueue, $message)
;
```

## Send delayed message

```php
<?php
/** @var \Enqueue\Redis\RedisContext $psrContext */
/** @var \Enqueue\Redis\RedisDestination $fooQueue */

$message = $psrContext->createMessage('Hello world!');

$psrContext->createProducer()
->setDeliveryDelay(5000) // 5 sec

->send($fooQueue, $message)
;
````

## Consume message:

```php
Expand All @@ -141,6 +173,9 @@ $consumer = $psrContext->createConsumer($fooQueue);
$message = $consumer->receive();

// process a message

$consumer->acknowledge($message);
//$consumer->reject($message);
```

## Delete queue (purge messages):
Expand Down
41 changes: 41 additions & 0 deletions pkg/redis/JsonSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Enqueue\Redis;

class JsonSerializer implements Serializer
{
public function toString(RedisMessage $message): string
{
$json = json_encode([
'body' => $message->getBody(),
'properties' => $message->getProperties(),
'headers' => $message->getHeaders(),
]);

if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return $json;
}

public function toMessage(string $string): RedisMessage
{
$data = json_decode($string, true);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
}

return new RedisMessage($data['body'], $data['properties'], $data['headers']);
}
}
38 changes: 38 additions & 0 deletions pkg/redis/LuaScripts.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

namespace Enqueue\Redis;

class LuaScripts
{
/**
* Get the Lua script to migrate expired messages back onto the queue.
*
* KEYS[1] - The queue we are removing messages from, for example: queues:foo:reserved
* KEYS[2] - The queue we are moving messages to, for example: queues:foo
* ARGV[1] - The current UNIX timestamp
*
* @return string
*/
public static function migrateExpired()
{
return <<<'LUA'
-- Get all of the messages with an expired "score"...
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])

-- If we have values in the array, we will remove them from the first queue
-- and add them onto the destination queue in chunks of 100, which moves
-- all of the appropriate messages onto the destination queue very safely.
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

for i = 1, #val, 100 do
redis.call('lpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end

return val
LUA;
}
}
28 changes: 28 additions & 0 deletions pkg/redis/PRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,34 @@ public function __construct(array $config)
}
}

public function eval(string $script, array $keys = [], array $args = [])
{
try {
// mixed eval($script, $numkeys, $keyOrArg1 = null, $keyOrArgN = null)
return call_user_func_array([$this->redis, 'eval'], array_merge([$script, count($keys)], $keys, $args));
} catch (PRedisServerException $e) {
throw new ServerException('eval command has failed', null, $e);
}
}

public function zadd(string $key, string $value, float $score): int
{
try {
return $this->redis->zadd($key, [$value => $score]);
} catch (PRedisServerException $e) {
throw new ServerException('zadd command has failed', null, $e);
}
}

public function zrem(string $key, string $value): int
{
try {
return $this->redis->zrem($key, [$value]);
} catch (PRedisServerException $e) {
throw new ServerException('zrem command has failed', null, $e);
}
}

public function lpush(string $key, string $value): int
{
try {
Expand Down
27 changes: 27 additions & 0 deletions pkg/redis/PhpRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,33 @@ public function __construct(array $config)
$this->config = $config;
}

public function eval(string $script, array $keys = [], array $args = [])
{
try {
return $this->redis->eval($script, array_merge($keys, $args), count($keys));
} catch (\RedisException $e) {
throw new ServerException('eval command has failed', null, $e);
}
}

public function zadd(string $key, string $value, float $score): int
{
try {
return $this->redis->zAdd($key, $score, $value);
} catch (\RedisException $e) {
throw new ServerException('zadd command has failed', null, $e);
}
}

public function zrem(string $key, string $value): int
{
try {
return $this->redis->zRem($key, $value);
} catch (\RedisException $e) {
throw new ServerException('zrem command has failed', null, $e);
}
}

public function lpush(string $key, string $value): int
{
try {
Expand Down
32 changes: 32 additions & 0 deletions pkg/redis/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,38 @@

interface Redis
{
/**
* @param string $script
* @param array $keys
* @param array $args
*
* @throws ServerException
*
* @return mixed
*/
public function eval(string $script, array $keys = [], array $args = []);

/**
* @param string $key
* @param string $value
* @param float $score
*
* @throws ServerException
*
* @return int
*/
public function zadd(string $key, string $value, float $score): int;

/**
* @param string $key
* @param string $value
*
* @throws ServerException
*
* @return int
*/
public function zrem(string $key, string $value): int;

/**
* @param string $key
* @param string $value
Expand Down
7 changes: 5 additions & 2 deletions pkg/redis/RedisConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class RedisConnectionFactory implements ConnectionFactory
* 'read_write_timeout' => Timeout (expressed in seconds) used when performing read or write operations on the underlying network resource after which an exception is thrown.
* 'predis_options' => An array of predis specific options.
* 'ssl' => could be any of http://fi2.php.net/manual/en/context.ssl.php#refsect1-context.ssl-options
* 'redelivery_delay' => Default 300 sec. Returns back message into the queue if message was not acknowledged or rejected after this delay.
* It could happen if consumer has failed with fatal error or even if message processing is slow and takes more than this time.
* ].
*
* or
Expand Down Expand Up @@ -85,10 +87,10 @@ public function createContext(): Context
if ($this->config['lazy']) {
return new RedisContext(function () {
return $this->createRedis();
});
}, $this->config['redelivery_delay']);
}

return new RedisContext($this->createRedis());
return new RedisContext($this->createRedis(), $this->config['redelivery_delay']);
}

private function createRedis(): Redis
Expand Down Expand Up @@ -158,6 +160,7 @@ private function defaultConfig(): array
'read_write_timeout' => null,
'predis_options' => null,
'ssl' => null,
'redelivery_delay' => 300,
];
}
}
Loading