diff --git a/composer.json b/composer.json index dc8968a652..9df2218f37 100644 --- a/composer.json +++ b/composer.json @@ -56,6 +56,9 @@ "orchestra/database": "^4.0|^5.0|^6.0", "phpunit/phpunit": "^8.0|^9.0" }, + "suggest": { + "ext-pcntl": "Running the server needs pcntl to listen to command signals and soft-shutdown." + }, "autoload": { "psr-4": { "BeyondCode\\LaravelWebSockets\\": "src/" diff --git a/src/ChannelManagers/LocalChannelManager.php b/src/ChannelManagers/LocalChannelManager.php index 2b8150cf1f..7ff689bf95 100644 --- a/src/ChannelManagers/LocalChannelManager.php +++ b/src/ChannelManagers/LocalChannelManager.php @@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager */ protected $users = []; + /** + * Wether the current instance accepts new connections. + * + * @var bool + */ + protected $acceptsNewConnections = true; + /** * Create a new channel manager instance. * @@ -71,6 +78,28 @@ public function findOrCreate($appId, string $channel) return $this->channels[$appId][$channel]; } + /** + * Get the local connections, regardless of the channel + * they are connected to. + * + * @return \React\Promise\PromiseInterface + */ + public function getLocalConnections(): PromiseInterface + { + $connections = collect($this->channels) + ->map(function ($channelsWithConnections, $appId) { + return collect($channelsWithConnections)->values(); + }) + ->values()->collapse() + ->map(function ($channel) { + return collect($channel->getConnections()); + }) + ->values()->collapse() + ->toArray(); + + return new FulfilledPromise($connections); + } + /** * Get all channels for a specific app * for the current instance. @@ -313,6 +342,50 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt return new FulfilledPromise($results); } + /** + * Keep tracking the connections availability when they pong. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function connectionPonged(ConnectionInterface $connection): bool + { + return true; + } + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return bool + */ + public function removeObsoleteConnections(): bool + { + return true; + } + + /** + * Mark the current instance as unable to accept new connections. + * + * @return $this + */ + public function declineNewConnections() + { + $this->acceptsNewConnections = false; + + return $this; + } + + /** + * Check if the current server instance + * accepts new connections. + * + * @return bool + */ + public function acceptsNewConnections(): bool + { + return $this->acceptsNewConnections; + } + /** * Get the channel class by the channel name. * diff --git a/src/ChannelManagers/RedisChannelManager.php b/src/ChannelManagers/RedisChannelManager.php index 0d884b3acd..ee8ce768ee 100644 --- a/src/ChannelManagers/RedisChannelManager.php +++ b/src/ChannelManagers/RedisChannelManager.php @@ -3,8 +3,13 @@ namespace BeyondCode\LaravelWebSockets\ChannelManagers; use BeyondCode\LaravelWebSockets\Channels\Channel; +use BeyondCode\LaravelWebSockets\Helpers; +use BeyondCode\LaravelWebSockets\Server\MockableConnection; +use Carbon\Carbon; use Clue\React\Redis\Client; use Clue\React\Redis\Factory; +use Illuminate\Cache\RedisLock; +use Illuminate\Support\Facades\Redis; use Illuminate\Support\Str; use Ratchet\ConnectionInterface; use React\EventLoop\LoopInterface; @@ -41,6 +46,21 @@ class RedisChannelManager extends LocalChannelManager */ protected $subscribeClient; + /** + * The Redis manager instance. + * + * @var \Illuminate\Redis\RedisManager + */ + protected $redis; + + /** + * The lock name to use on Redis to avoid multiple + * actions that might lead to multiple processings. + * + * @var string + */ + protected static $redisLockName = 'laravel-websockets:channel-manager:lock'; + /** * Create a new channel manager instance. * @@ -52,6 +72,10 @@ public function __construct(LoopInterface $loop, $factoryClass = null) { $this->loop = $loop; + $this->redis = Redis::connection( + config('websockets.replication.modes.redis.connection', 'default') + ); + $connectionUri = $this->getConnectionUri(); $factoryClass = $factoryClass ?: Factory::class; @@ -67,6 +91,17 @@ public function __construct(LoopInterface $loop, $factoryClass = null) $this->serverId = Str::uuid()->toString(); } + /** + * Get the local connections, regardless of the channel + * they are connected to. + * + * @return \React\Promise\PromiseInterface + */ + public function getLocalConnections(): PromiseInterface + { + return parent::getLocalConnections(); + } + /** * Get all channels for a specific app * for the current instance. @@ -108,9 +143,9 @@ public function unsubscribeFromAllChannels(ConnectionInterface $connection) $connection, $channel, new stdClass ); } + })->then(function () use ($connection) { + parent::unsubscribeFromAllChannels($connection); }); - - parent::unsubscribeFromAllChannels($connection); } /** @@ -130,6 +165,8 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan } }); + $this->addConnectionToSet($connection); + $this->addChannelToSet( $connection->app->id, $channelName ); @@ -156,8 +193,14 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $ if ($count === 0) { $this->unsubscribeFromTopic($connection->app->id, $channelName); + $this->removeUserData( + $connection->app->id, $channelName, $connection->socketId + ); + $this->removeChannelFromSet($connection->app->id, $channelName); + $this->removeConnectionFromSet($connection); + return; } @@ -168,7 +211,13 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $ if ($count < 1) { $this->unsubscribeFromTopic($connection->app->id, $channelName); + $this->removeUserData( + $connection->app->id, $channelName, $connection->socketId + ); + $this->removeChannelFromSet($connection->app->id, $channelName); + + $this->removeConnectionFromSet($connection); } }); }); @@ -293,12 +342,8 @@ public function getChannelMembers($appId, string $channel): PromiseInterface { return $this->publishClient ->hgetall($this->getRedisKey($appId, $channel, ['users'])) - ->then(function ($members) { - [$keys, $values] = collect($members)->partition(function ($value, $key) { - return $key % 2 === 0; - }); - - return collect(array_combine($keys->all(), $values->all())) + ->then(function ($list) { + return collect(Helpers::redisListToArray($list)) ->map(function ($user) { return json_decode($user); }) @@ -344,6 +389,43 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt }); } + /** + * Keep tracking the connections availability when they pong. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function connectionPonged(ConnectionInterface $connection): bool + { + // This will update the score with the current timestamp. + $this->addConnectionToSet($connection); + + return parent::connectionPonged($connection); + } + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return bool + */ + public function removeObsoleteConnections(): bool + { + $this->lock()->get(function () { + $this->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($connections) { + foreach ($connections as $connection => $score) { + [$appId, $socketId] = explode(':', $connection); + + $this->unsubscribeFromAllChannels( + $this->fakeConnectionForApp($appId, $socketId) + ); + } + }); + }); + + return parent::removeObsoleteConnections(); + } + /** * Handle a message received from Redis on a specific channel. * @@ -462,6 +544,57 @@ public function decrementSubscriptionsCount($appId, string $channel = null, int return $this->incrementSubscriptionsCount($appId, $channel, $increment * -1); } + /** + * Add the connection to the sorted list. + * + * @param \Ratchet\ConnectionInterface $connection + * @param \DateTime|string|null $moment + * @return void + */ + public function addConnectionToSet(ConnectionInterface $connection, $moment = null) + { + $this->getPublishClient() + ->zadd( + $this->getRedisKey(null, null, ['sockets']), + Carbon::parse($moment)->format('U'), "{$connection->app->id}:{$connection->socketId}" + ); + } + + /** + * Remove the connection from the sorted list. + * + * @param \Ratchet\ConnectionInterface $connection + * @return void + */ + public function removeConnectionFromSet(ConnectionInterface $connection) + { + $this->getPublishClient() + ->zrem( + $this->getRedisKey(null, null, ['sockets']), + "{$connection->app->id}:{$connection->socketId}" + ); + } + + /** + * Get the connections from the sorted list, with last + * connection between certain timestamps. + * + * @param int $start + * @param int $stop + * @return PromiseInterface + */ + public function getConnectionsFromSet(int $start = 0, int $stop = 0) + { + return $this->getPublishClient() + ->zrange( + $this->getRedisKey(null, null, ['sockets']), + $start, $stop, 'withscores' + ) + ->then(function ($list) { + return Helpers::redisListToArray($list); + }); + } + /** * Add a channel to the set list. * @@ -555,11 +688,11 @@ public function unsubscribeFromTopic($appId, string $channel = null) * Get the Redis Keyspace name to handle subscriptions * and other key-value sets. * - * @param mixed $appId + * @param string|int|null $appId * @param string|null $channel * @return string */ - public function getRedisKey($appId, string $channel = null, array $suffixes = []): string + public function getRedisKey($appId = null, string $channel = null, array $suffixes = []): string { $prefix = config('database.redis.options.prefix', null); @@ -577,4 +710,28 @@ public function getRedisKey($appId, string $channel = null, array $suffixes = [] return $hash; } + + /** + * Get a new RedisLock instance to avoid race conditions. + * + * @return \Illuminate\Cache\CacheLock + */ + protected function lock() + { + return new RedisLock($this->redis, static::$redisLockName, 0); + } + + /** + * Create a fake connection for app that will mimick a connection + * by app ID and Socket ID to be able to be passed to the methods + * that accepts a connection class. + * + * @param string|int $appId + * @param string $socketId + * @return ConnectionInterface + */ + public function fakeConnectionForApp($appId, string $socketId) + { + return new MockableConnection($appId, $socketId); + } } diff --git a/src/Console/Commands/StartServer.php b/src/Console/Commands/StartServer.php index 664d6a92b0..890a4f1ff5 100644 --- a/src/Console/Commands/StartServer.php +++ b/src/Console/Commands/StartServer.php @@ -26,7 +26,7 @@ class StartServer extends Command {--disable-statistics : Disable the statistics tracking.} {--statistics-interval= : The amount of seconds to tick between statistics saving.} {--debug : Forces the loggers to be enabled and thereby overriding the APP_DEBUG setting.} - {--test : Prepare the server, but do not start it.} + {--loop : Programatically inject the loop.} '; /** @@ -79,6 +79,10 @@ public function handle() $this->configureRoutes(); + $this->configurePcntlSignal(); + + $this->configurePongTracker(); + $this->startServer(); } @@ -141,7 +145,7 @@ public function configureRestartTimer() $this->loop->addPeriodicTimer(10, function () { if ($this->getLastRestart() !== $this->lastRestart) { - $this->loop->stop(); + $this->triggerSoftShutdown(); } }); } @@ -156,6 +160,46 @@ protected function configureRoutes() WebSocketRouter::routes(); } + /** + * Configure the PCNTL signals for soft shutdown. + * + * @return void + */ + protected function configurePcntlSignal() + { + // When the process receives a SIGTERM or a SIGINT + // signal, it should mark the server as unavailable + // to receive new connections, close the current connections, + // then stopping the loop. + + $this->loop->addSignal(SIGTERM, function () { + $this->line('Closing existing connections...'); + + $this->triggerSoftShutdown(); + }); + + $this->loop->addSignal(SIGINT, function () { + $this->line('Closing existing connections...'); + + $this->triggerSoftShutdown(); + }); + } + + /** + * Configure the tracker that will delete + * from the store the connections that. + * + * @return void + */ + protected function configurePongTracker() + { + $this->loop->addPeriodicTimer(10, function () { + $this->laravel + ->make(ChannelManager::class) + ->removeObsoleteConnections(); + }); + } + /** * Configure the HTTP logger class. * @@ -209,14 +253,6 @@ protected function startServer() $this->buildServer(); - // For testing, just boot up the server, run it - // but exit after the next tick. - if ($this->option('test')) { - $this->loop->futureTick(function () { - $this->loop->stop(); - }); - } - $this->server->run(); } @@ -231,6 +267,10 @@ protected function buildServer() $this->option('host'), $this->option('port') ); + if ($loop = $this->option('loop')) { + $this->loop = $loop; + } + $this->server = $this->server ->setLoop($this->loop) ->withRoutes(WebSocketRouter::getRoutes()) @@ -249,4 +289,29 @@ protected function getLastRestart() 'beyondcode:websockets:restart', 0 ); } + + /** + * Trigger a soft shutdown for the process. + * + * @return void + */ + protected function triggerSoftShutdown() + { + $channelManager = $this->laravel->make(ChannelManager::class); + + // Close the new connections allowance on this server. + $channelManager->declineNewConnections(); + + // Get all local connections and close them. They will + // be automatically be unsubscribed from all channels. + $channelManager->getLocalConnections() + ->then(function ($connections) { + foreach ($connections as $connection) { + $connection->close(); + } + }) + ->then(function () { + $this->loop->stop(); + }); + } } diff --git a/src/Contracts/ChannelManager.php b/src/Contracts/ChannelManager.php index e056e11474..35d5baf6c6 100644 --- a/src/Contracts/ChannelManager.php +++ b/src/Contracts/ChannelManager.php @@ -36,6 +36,14 @@ public function find($appId, string $channel); */ public function findOrCreate($appId, string $channel); + /** + * Get the local connections, regardless of the channel + * they are connected to. + * + * @return \React\Promise\PromiseInterface + */ + public function getLocalConnections(): PromiseInterface; + /** * Get all channels for a specific app * for the current instance. @@ -177,4 +185,19 @@ public function getChannelMember(ConnectionInterface $connection, string $channe * @return \React\Promise\PromiseInterface */ public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface; + + /** + * Keep tracking the connections availability when they pong. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + public function connectionPonged(ConnectionInterface $connection): bool; + + /** + * Remove the obsolete connections that didn't ponged in a while. + * + * @return bool + */ + public function removeObsoleteConnections(): bool; } diff --git a/src/Helpers.php b/src/Helpers.php new file mode 100644 index 0000000000..73545458ff --- /dev/null +++ b/src/Helpers.php @@ -0,0 +1,26 @@ + value array. + [$keys, $values] = collect($list)->partition(function ($value, $key) { + return $key % 2 === 0; + }); + + return array_combine($keys->all(), $values->all()); + } +} diff --git a/src/Server/Messages/PusherChannelProtocolMessage.php b/src/Server/Messages/PusherChannelProtocolMessage.php index 14dea23010..d70934b6dd 100644 --- a/src/Server/Messages/PusherChannelProtocolMessage.php +++ b/src/Server/Messages/PusherChannelProtocolMessage.php @@ -34,6 +34,8 @@ protected function ping(ConnectionInterface $connection) $connection->send(json_encode([ 'event' => 'pusher:pong', ])); + + $this->channelManager->connectionPonged($connection); } /** diff --git a/src/Server/MockableConnection.php b/src/Server/MockableConnection.php new file mode 100644 index 0000000000..4d6d8f7e40 --- /dev/null +++ b/src/Server/MockableConnection.php @@ -0,0 +1,45 @@ +app = new stdClass; + + $this->app->id = $appId; + $this->socketId = $socketId; + } + + /** + * Send data to the connection. + * + * @param string $data + * @return \Ratchet\ConnectionInterface + */ + public function send($data) + { + // + } + + /** + * Close the connection. + * + * @return void + */ + public function close() + { + // + } +} diff --git a/src/Server/WebSocketHandler.php b/src/Server/WebSocketHandler.php index 1016a1aa0e..0dbe8bea58 100644 --- a/src/Server/WebSocketHandler.php +++ b/src/Server/WebSocketHandler.php @@ -39,6 +39,10 @@ public function __construct(ChannelManager $channelManager) */ public function onOpen(ConnectionInterface $connection) { + if (! $this->connectionCanBeMade($connection)) { + return $connection->close(); + } + $this->verifyAppKey($connection) ->verifyOrigin($connection) ->limitConcurrentConnections($connection) @@ -69,6 +73,10 @@ public function onOpen(ConnectionInterface $connection) */ public function onMessage(ConnectionInterface $connection, MessageInterface $message) { + if (! isset($connection->app)) { + return; + } + Messages\PusherMessageFactory::createForMessage( $message, $connection, $this->channelManager )->respond(); @@ -113,6 +121,18 @@ public function onError(ConnectionInterface $connection, Exception $exception) } } + /** + * Check if the connection can be made for the + * current server instance. + * + * @param \Ratchet\ConnectionInterface $connection + * @return bool + */ + protected function connectionCanBeMade(ConnectionInterface $connection): bool + { + return $this->channelManager->acceptsNewConnections(); + } + /** * Verify the app key validity. * diff --git a/src/Statistics/Collectors/RedisCollector.php b/src/Statistics/Collectors/RedisCollector.php index 7b845b59e9..f7b5074c34 100644 --- a/src/Statistics/Collectors/RedisCollector.php +++ b/src/Statistics/Collectors/RedisCollector.php @@ -2,6 +2,7 @@ namespace BeyondCode\LaravelWebSockets\Statistics\Collectors; +use BeyondCode\LaravelWebSockets\Helpers; use BeyondCode\LaravelWebSockets\Statistics\Statistic; use Illuminate\Cache\RedisLock; use Illuminate\Support\Facades\Redis; @@ -30,7 +31,7 @@ class RedisCollector extends MemoryCollector * * @var string */ - protected static $redisLockName = 'laravel-websockets:lock'; + protected static $redisLockName = 'laravel-websockets:collector:lock'; /** * Initialize the logger. @@ -178,7 +179,7 @@ public function save() } $statistic = $this->arrayToStatisticInstance( - $appId, $this->redisListToArray($list) + $appId, Helpers::redisListToArray($list) ); $this->createRecord($statistic, $appId); @@ -229,7 +230,7 @@ public function getStatistics(): PromiseInterface ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->then(function ($list) use ($appId, &$appsWithStatistics) { $appsWithStatistics[$appId] = $this->arrayToStatisticInstance( - $appId, $this->redisListToArray($list) + $appId, Helpers::redisListToArray($list) ); }); } @@ -251,7 +252,7 @@ public function getAppStatistics($appId): PromiseInterface ->hgetall($this->channelManager->getRedisKey($appId, null, ['stats'])) ->then(function ($list) use ($appId) { return $this->arrayToStatisticInstance( - $appId, $this->redisListToArray($list) + $appId, Helpers::redisListToArray($list) ); }); } @@ -361,26 +362,6 @@ protected function lock() return new RedisLock($this->redis, static::$redisLockName, 0); } - /** - * Transform the Redis' list of key after value - * to key-value pairs. - * - * @param array $list - * @return array - */ - protected function redisListToArray(array $list) - { - // Redis lists come into a format where the keys are on even indexes - // and the values are on odd indexes. This way, we know which - // ones are keys and which ones are values and their get combined - // later to form the key => value array. - [$keys, $values] = collect($list)->partition(function ($value, $key) { - return $key % 2 === 0; - }); - - return array_combine($keys->all(), $values->all()); - } - /** * Transform a key-value pair to a Statistic instance. * diff --git a/tests/Commands/StartServerTest.php b/tests/Commands/StartServerTest.php index 223331c22d..08f71a3083 100644 --- a/tests/Commands/StartServerTest.php +++ b/tests/Commands/StartServerTest.php @@ -8,7 +8,43 @@ class StartServerTest extends TestCase { public function test_does_not_fail_if_building_up() { - $this->artisan('websockets:serve', ['--test' => true, '--debug' => true]); + $this->loop->futureTick(function () { + $this->loop->stop(); + }); + + $this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6001]); + + $this->assertTrue(true); + } + + public function test_pcntl_sigint_signal() + { + $this->loop->futureTick(function () { + $this->newActiveConnection(['public-channel']); + $this->newActiveConnection(['public-channel']); + + posix_kill(posix_getpid(), SIGINT); + + $this->loop->stop(); + }); + + $this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6002]); + + $this->assertTrue(true); + } + + public function test_pcntl_sigterm_signal() + { + $this->loop->futureTick(function () { + $this->newActiveConnection(['public-channel']); + $this->newActiveConnection(['public-channel']); + + posix_kill(posix_getpid(), SIGTERM); + + $this->loop->stop(); + }); + + $this->artisan('websockets:serve', ['--loop' => $this->loop, '--debug' => true, '--port' => 6003]); $this->assertTrue(true); } diff --git a/tests/ConnectionTest.php b/tests/ConnectionTest.php index e4e37016d5..2e4f2ed0d2 100644 --- a/tests/ConnectionTest.php +++ b/tests/ConnectionTest.php @@ -108,4 +108,21 @@ public function test_capacity_limit() ->assertSentEvent('pusher:error', ['data' => ['message' => 'Over capacity', 'code' => 4100]]) ->assertClosed(); } + + public function test_close_all_new_connections_after_stating_the_server_does_not_accept_new_connections() + { + $this->newActiveConnection(['test-channel']) + ->assertSentEvent('pusher:connection_established') + ->assertSentEvent('pusher_internal:subscription_succeeded'); + + $this->channelManager->declineNewConnections(); + + $this->assertFalse( + $this->channelManager->acceptsNewConnections() + ); + + $this->newActiveConnection(['test-channel']) + ->assertNothingSent() + ->assertClosed(); + } } diff --git a/tests/Mocks/Connection.php b/tests/Mocks/Connection.php index 8de4a7b981..42d02c0732 100644 --- a/tests/Mocks/Connection.php +++ b/tests/Mocks/Connection.php @@ -97,6 +97,18 @@ public function assertNotSentEvent(string $name) return $this; } + /** + * Assert that no events occured within the connection. + * + * @return $this + */ + public function assertNothingSent() + { + PHPUnit::assertEquals([], $this->sentData); + + return $this; + } + /** * Assert the connection is closed. * diff --git a/tests/PresenceChannelTest.php b/tests/PresenceChannelTest.php index b7d0b8aa86..9d4bbcbc90 100644 --- a/tests/PresenceChannelTest.php +++ b/tests/PresenceChannelTest.php @@ -3,6 +3,7 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; +use Ratchet\ConnectionInterface; class PresenceChannelTest extends TestCase { @@ -185,4 +186,22 @@ public function test_statistics_get_collected_for_presenece_channels() ], $statistic->toArray()); }); } + + public function test_local_connections_for_private_channels() + { + $this->newPresenceConnection('presence-channel', ['user_id' => 1]); + $this->newPresenceConnection('presence-channel-2', ['user_id' => 2]); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); + + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); + } } diff --git a/tests/PrivateChannelTest.php b/tests/PrivateChannelTest.php index bfc4807f61..f28ce6d8d5 100644 --- a/tests/PrivateChannelTest.php +++ b/tests/PrivateChannelTest.php @@ -3,6 +3,7 @@ namespace BeyondCode\LaravelWebSockets\Test; use BeyondCode\LaravelWebSockets\Server\Exceptions\InvalidSignature; +use Ratchet\ConnectionInterface; class PrivateChannelTest extends TestCase { @@ -138,4 +139,22 @@ public function test_statistics_get_collected_for_private_channels() ], $statistic->toArray()); }); } + + public function test_local_connections_for_private_channels() + { + $this->newPrivateConnection('private-channel'); + $this->newPrivateConnection('private-channel-2'); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); + + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); + } } diff --git a/tests/PublicChannelTest.php b/tests/PublicChannelTest.php index 373f2f31a3..95d2f5030b 100644 --- a/tests/PublicChannelTest.php +++ b/tests/PublicChannelTest.php @@ -2,6 +2,8 @@ namespace BeyondCode\LaravelWebSockets\Test; +use Ratchet\ConnectionInterface; + class PublicChannelTest extends TestCase { public function test_connect_to_public_channel() @@ -114,4 +116,22 @@ public function test_statistics_get_collected_for_public_channels() ], $statistic->toArray()); }); } + + public function test_local_connections_for_public_channels() + { + $this->newActiveConnection(['public-channel']); + $this->newActiveConnection(['public-channel-2']); + + $this->channelManager + ->getLocalConnections() + ->then(function ($connections) { + $this->assertCount(2, $connections); + + foreach ($connections as $connection) { + $this->assertInstanceOf( + ConnectionInterface::class, $connection + ); + } + }); + } } diff --git a/tests/ReplicationTest.php b/tests/ReplicationTest.php index 00ee615489..f08c6b0ab8 100644 --- a/tests/ReplicationTest.php +++ b/tests/ReplicationTest.php @@ -32,4 +32,106 @@ public function test_events_get_replicated_across_connections() 'data' => ['channel' => 'public-channel', 'test' => 'yes'], ]); } + + public function test_not_ponged_connections_do_get_removed_for_public_channels() + { + $this->runOnlyOnRedisReplication(); + + $connection = $this->newActiveConnection(['public-channel']); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($connection, now()->subDays(1)); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'public-channel') + ->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + } + + public function test_not_ponged_connections_do_get_removed_for_private_channels() + { + $this->runOnlyOnRedisReplication(); + + $connection = $this->newPrivateConnection('private-channel'); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($connection, now()->subDays(1)); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + } + + public function test_not_ponged_connections_do_get_removed_for_presence_channels() + { + $this->runOnlyOnRedisReplication(); + + $connection = $this->newPresenceConnection('presence-channel'); + + // Make the connection look like it was lost 1 day ago. + $this->channelManager->addConnectionToSet($connection, now()->subDays(1)); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(1, $expiredConnections); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(1, $members); + }); + + $this->channelManager->removeObsoleteConnections(); + + $this->channelManager + ->getGlobalConnectionsCount('1234', 'private-channel') + ->then(function ($count) { + $this->assertEquals(0, $count); + }); + + $this->channelManager + ->getConnectionsFromSet(0, now()->subMinutes(2)->format('U')) + ->then(function ($expiredConnections) { + $this->assertCount(0, $expiredConnections); + }); + + $this->channelManager + ->getChannelMembers('1234', 'presence-channel') + ->then(function ($members) { + $this->assertCount(0, $members); + }); + } }