Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit c078e5a

Browse files
authored
Merge pull request #497 from beyondcode/fix/app-connections-count
[fix] Redis connection counter didn't work properly
2 parents 06c898a + 5ba24cb commit c078e5a

25 files changed

+879
-157
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@
3636
"illuminate/routing": "^6.0|^7.0",
3737
"illuminate/support": "^6.0|^7.0",
3838
"pusher/pusher-php-server": "^3.0|^4.0",
39-
"react/dns": "^1.1",
39+
"react/promise": "^2.0",
4040
"symfony/http-kernel": "^4.0|^5.0",
4141
"symfony/psr-http-message-bridge": "^1.1|^2.0"
4242
},
4343
"require-dev": {
44+
"clue/block-react": "^1.4",
4445
"mockery/mockery": "^1.3",
4546
"orchestra/testbench-browser-kit": "^4.0|^5.0",
4647
"phpunit/phpunit": "^8.0|^9.0"

config/websockets.php

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,6 @@
4242

4343
'app' => \BeyondCode\LaravelWebSockets\Apps\ConfigAppManager::class,
4444

45-
/*
46-
|--------------------------------------------------------------------------
47-
| Channel Manager
48-
|--------------------------------------------------------------------------
49-
|
50-
| When users subscribe or unsubscribe from specific channels,
51-
| the connections are stored to keep track of any interaction with the
52-
| WebSocket server.
53-
| You can however add your own implementation that will help the store
54-
| of the channels alongside their connections.
55-
|
56-
*/
57-
58-
'channel' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,
59-
6045
],
6146

6247
/*
@@ -191,6 +176,8 @@
191176

192177
'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\MemoryStatisticsLogger::class,
193178

179+
'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\ArrayChannelManager::class,
180+
194181
],
195182

196183
/*
@@ -214,6 +201,8 @@
214201

215202
'statistics_logger' => \BeyondCode\LaravelWebSockets\Statistics\Logger\RedisStatisticsLogger::class,
216203

204+
'channel_manager' => \BeyondCode\LaravelWebSockets\WebSockets\Channels\ChannelManagers\RedisChannelManager::class,
205+
217206
],
218207

219208
],

src/PubSub/Drivers/LocalClient.php

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,28 @@ public function unsubscribe($appId, string $channel): bool
6666
return true;
6767
}
6868

69+
/**
70+
* Subscribe to the app's pubsub keyspace.
71+
*
72+
* @param mixed $appId
73+
* @return bool
74+
*/
75+
public function subscribeToApp($appId): bool
76+
{
77+
return true;
78+
}
79+
80+
/**
81+
* Unsubscribe from the app's pubsub keyspace.
82+
*
83+
* @param mixed $appId
84+
* @return bool
85+
*/
86+
public function unsubscribeFromApp($appId): bool
87+
{
88+
return true;
89+
}
90+
6991
/**
7092
* Add a member to a channel. To be called when they have
7193
* subscribed to the channel.
@@ -137,4 +159,26 @@ public function channelMemberCounts($appId, array $channelNames): PromiseInterfa
137159

138160
return new FulfilledPromise($results);
139161
}
162+
163+
/**
164+
* Get the amount of unique connections.
165+
*
166+
* @param mixed $appId
167+
* @return null|int
168+
*/
169+
public function getLocalConnectionsCount($appId)
170+
{
171+
return null;
172+
}
173+
174+
/**
175+
* Get the amount of connections aggregated on multiple instances.
176+
*
177+
* @param mixed $appId
178+
* @return null|int|\React\Promise\PromiseInterface
179+
*/
180+
public function getGlobalConnectionsCount($appId)
181+
{
182+
return null;
183+
}
140184
}

src/PubSub/Drivers/RedisClient.php

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,36 @@ public function unsubscribe($appId, string $channel): bool
175175
return true;
176176
}
177177

178+
/**
179+
* Subscribe to the app's pubsub keyspace.
180+
*
181+
* @param mixed $appId
182+
* @return bool
183+
*/
184+
public function subscribeToApp($appId): bool
185+
{
186+
$this->subscribeClient->__call('subscribe', [$this->getTopicName($appId)]);
187+
188+
$this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', 1]);
189+
190+
return true;
191+
}
192+
193+
/**
194+
* Unsubscribe from the app's pubsub keyspace.
195+
*
196+
* @param mixed $appId
197+
* @return bool
198+
*/
199+
public function unsubscribeFromApp($appId): bool
200+
{
201+
$this->subscribeClient->__call('unsubscribe', [$this->getTopicName($appId)]);
202+
203+
$this->publishClient->__call('hincrby', [$this->getTopicName($appId), 'connections', -1]);
204+
205+
return true;
206+
}
207+
178208
/**
179209
* Add a member to a channel. To be called when they have
180210
* subscribed to the channel.
@@ -258,6 +288,17 @@ public function channelMemberCounts($appId, array $channelNames): PromiseInterfa
258288
});
259289
}
260290

291+
/**
292+
* Get the amount of connections aggregated on multiple instances.
293+
*
294+
* @param mixed $appId
295+
* @return null|int|\React\Promise\PromiseInterface
296+
*/
297+
public function getGlobalConnectionsCount($appId)
298+
{
299+
return $this->publishClient->hget($this->getTopicName($appId), 'connections');
300+
}
301+
261302
/**
262303
* Handle a message received from Redis on a specific channel.
263304
*
@@ -321,8 +362,8 @@ public function onMessage(string $redisChannel, string $payload)
321362
*/
322363
protected function getConnectionUri()
323364
{
324-
$name = config('websockets.replication.redis.connection') ?: 'default';
325-
$config = config('database.redis')[$name];
365+
$name = config('websockets.replication.redis.connection', 'default');
366+
$config = config("database.redis.{$name}");
326367

327368
$host = $config['host'];
328369
$port = $config['port'] ?: 6379;
@@ -377,13 +418,19 @@ public function getServerId()
377418
* app ID and channel name.
378419
*
379420
* @param mixed $appId
380-
* @param string $channel
421+
* @param string|null $channel
381422
* @return string
382423
*/
383-
protected function getTopicName($appId, string $channel): string
424+
protected function getTopicName($appId, string $channel = null): string
384425
{
385426
$prefix = config('database.redis.options.prefix', null);
386427

387-
return "{$prefix}{$appId}:{$channel}";
428+
$hash = "{$prefix}{$appId}";
429+
430+
if ($channel) {
431+
$hash .= ":{$channel}";
432+
}
433+
434+
return $hash;
388435
}
389436
}

src/PubSub/ReplicationInterface.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,22 @@ public function subscribe($appId, string $channel): bool;
4545
*/
4646
public function unsubscribe($appId, string $channel): bool;
4747

48+
/**
49+
* Subscribe to the app's pubsub keyspace.
50+
*
51+
* @param mixed $appId
52+
* @return bool
53+
*/
54+
public function subscribeToApp($appId): bool;
55+
56+
/**
57+
* Unsubscribe from the app's pubsub keyspace.
58+
*
59+
* @param mixed $appId
60+
* @return bool
61+
*/
62+
public function unsubscribeFromApp($appId): bool;
63+
4864
/**
4965
* Add a member to a channel. To be called when they have
5066
* subscribed to the channel.
@@ -85,4 +101,20 @@ public function channelMembers($appId, string $channel): PromiseInterface;
85101
* @return PromiseInterface
86102
*/
87103
public function channelMemberCounts($appId, array $channelNames): PromiseInterface;
104+
105+
/**
106+
* Get the amount of unique connections.
107+
*
108+
* @param mixed $appId
109+
* @return null|int
110+
*/
111+
public function getLocalConnectionsCount($appId);
112+
113+
/**
114+
* Get the amount of connections aggregated on multiple instances.
115+
*
116+
* @param mixed $appId
117+
* @return null|int|\React\Promise\PromiseInterface
118+
*/
119+
public function getGlobalConnectionsCount($appId);
88120
}

src/Statistics/Logger/MemoryStatisticsLogger.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public function save()
105105

106106
$this->createRecord($statistic, $appId);
107107

108-
$currentConnectionCount = $this->channelManager->getConnectionCount($appId);
108+
$currentConnectionCount = $this->channelManager->getGlobalConnectionsCount($appId);
109109

110110
$statistic->reset($currentConnectionCount);
111111
}

0 commit comments

Comments
 (0)