Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

[feature] PCNTL signals into soft-close of connections for Redis horizontal replication #523

Merged
merged 8 commits into from
Sep 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@
"orchestra/database": "^4.0|^5.0|^6.0",
"phpunit/phpunit": "^8.0|^9.0"
},
"suggest": {
"ext-pcntl": "Running the server needs pcntl to listen to command signals and soft-shutdown."
},
"autoload": {
"psr-4": {
"BeyondCode\\LaravelWebSockets\\": "src/"
Expand Down
73 changes: 73 additions & 0 deletions src/ChannelManagers/LocalChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager
*/
protected $users = [];

/**
* Wether the current instance accepts new connections.
*
* @var bool
*/
protected $acceptsNewConnections = true;

/**
* Create a new channel manager instance.
*
Expand Down Expand Up @@ -71,6 +78,28 @@ public function findOrCreate($appId, string $channel)
return $this->channels[$appId][$channel];
}

/**
* Get the local connections, regardless of the channel
* they are connected to.
*
* @return \React\Promise\PromiseInterface
*/
public function getLocalConnections(): PromiseInterface
{
$connections = collect($this->channels)
->map(function ($channelsWithConnections, $appId) {
return collect($channelsWithConnections)->values();
})
->values()->collapse()
->map(function ($channel) {
return collect($channel->getConnections());
})
->values()->collapse()
->toArray();

return new FulfilledPromise($connections);
}

/**
* Get all channels for a specific app
* for the current instance.
Expand Down Expand Up @@ -313,6 +342,50 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
return new FulfilledPromise($results);
}

/**
* Keep tracking the connections availability when they pong.
*
* @param \Ratchet\ConnectionInterface $connection
* @return bool
*/
public function connectionPonged(ConnectionInterface $connection): bool
{
return true;
}

/**
* Remove the obsolete connections that didn't ponged in a while.
*
* @return bool
*/
public function removeObsoleteConnections(): bool
{
return true;
}

/**
* Mark the current instance as unable to accept new connections.
*
* @return $this
*/
public function declineNewConnections()
{
$this->acceptsNewConnections = false;

return $this;
}

/**
* Check if the current server instance
* accepts new connections.
*
* @return bool
*/
public function acceptsNewConnections(): bool
{
return $this->acceptsNewConnections;
}

/**
* Get the channel class by the channel name.
*
Expand Down
177 changes: 167 additions & 10 deletions src/ChannelManagers/RedisChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@
namespace BeyondCode\LaravelWebSockets\ChannelManagers;

use BeyondCode\LaravelWebSockets\Channels\Channel;
use BeyondCode\LaravelWebSockets\Helpers;
use BeyondCode\LaravelWebSockets\Server\MockableConnection;
use Carbon\Carbon;
use Clue\React\Redis\Client;
use Clue\React\Redis\Factory;
use Illuminate\Cache\RedisLock;
use Illuminate\Support\Facades\Redis;
use Illuminate\Support\Str;
use Ratchet\ConnectionInterface;
use React\EventLoop\LoopInterface;
Expand Down Expand Up @@ -41,6 +46,21 @@ class RedisChannelManager extends LocalChannelManager
*/
protected $subscribeClient;

/**
* The Redis manager instance.
*
* @var \Illuminate\Redis\RedisManager
*/
protected $redis;

/**
* The lock name to use on Redis to avoid multiple
* actions that might lead to multiple processings.
*
* @var string
*/
protected static $redisLockName = 'laravel-websockets:channel-manager:lock';

/**
* Create a new channel manager instance.
*
Expand All @@ -52,6 +72,10 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
{
$this->loop = $loop;

$this->redis = Redis::connection(
config('websockets.replication.modes.redis.connection', 'default')
);

$connectionUri = $this->getConnectionUri();

$factoryClass = $factoryClass ?: Factory::class;
Expand All @@ -67,6 +91,17 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
$this->serverId = Str::uuid()->toString();
}

/**
* Get the local connections, regardless of the channel
* they are connected to.
*
* @return \React\Promise\PromiseInterface
*/
public function getLocalConnections(): PromiseInterface
{
return parent::getLocalConnections();
}

/**
* Get all channels for a specific app
* for the current instance.
Expand Down Expand Up @@ -108,9 +143,9 @@ public function unsubscribeFromAllChannels(ConnectionInterface $connection)
$connection, $channel, new stdClass
);
}
})->then(function () use ($connection) {
parent::unsubscribeFromAllChannels($connection);
});

parent::unsubscribeFromAllChannels($connection);
}

/**
Expand All @@ -130,6 +165,8 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
}
});

$this->addConnectionToSet($connection);

$this->addChannelToSet(
$connection->app->id, $channelName
);
Expand All @@ -156,8 +193,14 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
if ($count === 0) {
$this->unsubscribeFromTopic($connection->app->id, $channelName);

$this->removeUserData(
$connection->app->id, $channelName, $connection->socketId
);

$this->removeChannelFromSet($connection->app->id, $channelName);

$this->removeConnectionFromSet($connection);

return;
}

Expand All @@ -168,7 +211,13 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
if ($count < 1) {
$this->unsubscribeFromTopic($connection->app->id, $channelName);

$this->removeUserData(
$connection->app->id, $channelName, $connection->socketId
);

$this->removeChannelFromSet($connection->app->id, $channelName);

$this->removeConnectionFromSet($connection);
}
});
});
Expand Down Expand Up @@ -293,12 +342,8 @@ public function getChannelMembers($appId, string $channel): PromiseInterface
{
return $this->publishClient
->hgetall($this->getRedisKey($appId, $channel, ['users']))
->then(function ($members) {
[$keys, $values] = collect($members)->partition(function ($value, $key) {
return $key % 2 === 0;
});

return collect(array_combine($keys->all(), $values->all()))
->then(function ($list) {
return collect(Helpers::redisListToArray($list))
->map(function ($user) {
return json_decode($user);
})
Expand Down Expand Up @@ -344,6 +389,43 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
});
}

/**
* Keep tracking the connections availability when they pong.
*
* @param \Ratchet\ConnectionInterface $connection
* @return bool
*/
public function connectionPonged(ConnectionInterface $connection): bool
{
// This will update the score with the current timestamp.
$this->addConnectionToSet($connection);

return parent::connectionPonged($connection);
}

/**
* Remove the obsolete connections that didn't ponged in a while.
*
* @return bool
*/
public function removeObsoleteConnections(): bool
{
$this->lock()->get(function () {
$this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U'))
->then(function ($connections) {
foreach ($connections as $connection => $score) {
[$appId, $socketId] = explode(':', $connection);

$this->unsubscribeFromAllChannels(
$this->fakeConnectionForApp($appId, $socketId)
);
}
});
});

return parent::removeObsoleteConnections();
}

/**
* Handle a message received from Redis on a specific channel.
*
Expand Down Expand Up @@ -462,6 +544,57 @@ public function decrementSubscriptionsCount($appId, string $channel = null, int
return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1);
}

/**
* Add the connection to the sorted list.
*
* @param \Ratchet\ConnectionInterface $connection
* @param \DateTime|string|null $moment
* @return void
*/
public function addConnectionToSet(ConnectionInterface $connection, $moment = null)
{
$this->getPublishClient()
->zadd(
$this->getRedisKey(null, null, ['sockets']),
Carbon::parse($moment)->format('U'), "{$connection->app->id}:{$connection->socketId}"
);
}

/**
* Remove the connection from the sorted list.
*
* @param \Ratchet\ConnectionInterface $connection
* @return void
*/
public function removeConnectionFromSet(ConnectionInterface $connection)
{
$this->getPublishClient()
->zrem(
$this->getRedisKey(null, null, ['sockets']),
"{$connection->app->id}:{$connection->socketId}"
);
}

/**
* Get the connections from the sorted list, with last
* connection between certain timestamps.
*
* @param int $start
* @param int $stop
* @return PromiseInterface
*/
public function getConnectionsFromSet(int $start = 0, int $stop = 0)
{
return $this->getPublishClient()
->zrange(
$this->getRedisKey(null, null, ['sockets']),
$start, $stop, 'withscores'
)
->then(function ($list) {
return Helpers::redisListToArray($list);
});
}

/**
* Add a channel to the set list.
*
Expand Down Expand Up @@ -555,11 +688,11 @@ public function unsubscribeFromTopic($appId, string $channel = null)
* Get the Redis Keyspace name to handle subscriptions
* and other key-value sets.
*
* @param mixed $appId
* @param string|int|null $appId
* @param string|null $channel
* @return string
*/
public function getRedisKey($appId, string $channel = null, array $suffixes = []): string
public function getRedisKey($appId = null, string $channel = null, array $suffixes = []): string
{
$prefix = config('database.redis.options.prefix', null);

Expand All @@ -577,4 +710,28 @@ public function getRedisKey($appId, string $channel = null, array $suffixes = []

return $hash;
}

/**
* Get a new RedisLock instance to avoid race conditions.
*
* @return \Illuminate\Cache\CacheLock
*/
protected function lock()
{
return new RedisLock($this->redis, static::$redisLockName, 0);
}

/**
* Create a fake connection for app that will mimick a connection
* by app ID and Socket ID to be able to be passed to the methods
* that accepts a connection class.
*
* @param string|int $appId
* @param string $socketId
* @return ConnectionInterface
*/
public function fakeConnectionForApp($appId, string $socketId)
{
return new MockableConnection($appId, $socketId);
}
}
Loading