Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 40f024d

Browse files
authored
Merge pull request #536 from beyondcode/fix/multiple-channel-members
Presence Channels emit added or removed events more than once for same users
2 parents 88cf779 + af3c8e0 commit 40f024d

File tree

6 files changed

+254
-27
lines changed

6 files changed

+254
-27
lines changed

src/ChannelManagers/LocalChannelManager.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager
2929
*/
3030
protected $users = [];
3131

32+
/**
33+
* The list of users by socket and their attached id.
34+
*
35+
* @var array
36+
*/
37+
protected $userSockets = [];
38+
3239
/**
3340
* Wether the current instance accepts new connections.
3441
*
@@ -273,6 +280,7 @@ public function broadcastAcrossServers($appId, string $channel, stdClass $payloa
273280
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload)
274281
{
275282
$this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user);
283+
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId;
276284
}
277285

278286
/**
@@ -287,6 +295,19 @@ public function userJoinedPresenceChannel(ConnectionInterface $connection, stdCl
287295
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel)
288296
{
289297
unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]);
298+
299+
$deletableSocketKey = array_search(
300+
$connection->socketId,
301+
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]
302+
);
303+
304+
if ($deletableSocketKey !== false) {
305+
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][$deletableSocketKey]);
306+
307+
if (count($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]) === 0) {
308+
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]);
309+
}
310+
}
290311
}
291312

292313
/**
@@ -342,6 +363,21 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
342363
return new FulfilledPromise($results);
343364
}
344365

366+
/**
367+
* Get the socket IDs for a presence channel member.
368+
*
369+
* @param string|int $userId
370+
* @param string|int $appId
371+
* @param string $channelName
372+
* @return \React\Promise\PromiseInterface
373+
*/
374+
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
375+
{
376+
return new FulfilledPromise(
377+
$this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? []
378+
);
379+
}
380+
345381
/**
346382
* Keep tracking the connections availability when they pong.
347383
*

src/ChannelManagers/RedisChannelManager.php

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,10 @@ public function userJoinedPresenceChannel(ConnectionInterface $connection, stdCl
313313
$this->storeUserData(
314314
$connection->app->id, $channel, $connection->socketId, json_encode($user)
315315
);
316+
317+
$this->addUserSocket(
318+
$connection->app->id, $channel, $user, $connection->socketId
319+
);
316320
}
317321

318322
/**
@@ -329,6 +333,10 @@ public function userLeftPresenceChannel(ConnectionInterface $connection, stdClas
329333
$this->removeUserData(
330334
$connection->app->id, $channel, $connection->socketId
331335
);
336+
337+
$this->removeUserSocket(
338+
$connection->app->id, $channel, $user, $connection->socketId
339+
);
332340
}
333341

334342
/**
@@ -389,6 +397,21 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
389397
});
390398
}
391399

400+
/**
401+
* Get the socket IDs for a presence channel member.
402+
*
403+
* @param string|int $userId
404+
* @param string|int $appId
405+
* @param string $channelName
406+
* @return \React\Promise\PromiseInterface
407+
*/
408+
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
409+
{
410+
return $this->publishClient->smembers(
411+
$this->getRedisKey($appId, $channelName, [$userId, 'userSockets'])
412+
);
413+
}
414+
392415
/**
393416
* Keep tracking the connections availability when they pong.
394417
*
@@ -628,7 +651,7 @@ public function removeChannelFromSet($appId, string $channel)
628651
* @param string|int $appId
629652
* @param string|null $channel
630653
* @param string $key
631-
* @param mixed $data
654+
* @param string $data
632655
* @return PromiseInterface
633656
*/
634657
public function storeUserData($appId, string $channel = null, string $key, $data)
@@ -681,6 +704,40 @@ public function unsubscribeFromTopic($appId, string $channel = null)
681704
);
682705
}
683706

707+
/**
708+
* Add the Presence Channel's User's Socket ID to a list.
709+
*
710+
* @param string|int $appId
711+
* @param string $channel
712+
* @param stdClass $user
713+
* @param string $socketId
714+
* @return void
715+
*/
716+
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId)
717+
{
718+
$this->publishClient->sadd(
719+
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']),
720+
$socketId
721+
);
722+
}
723+
724+
/**
725+
* Remove the Presence Channel's User's Socket ID from the list.
726+
*
727+
* @param string|int $appId
728+
* @param string $channel
729+
* @param stdClass $user
730+
* @param string $socketId
731+
* @return void
732+
*/
733+
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId)
734+
{
735+
$this->publishClient->srem(
736+
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']),
737+
$socketId
738+
);
739+
}
740+
684741
/**
685742
* Get the Redis Keyspace name to handle subscriptions
686743
* and other key-value sets.

src/Channels/PresenceChannel.php

Lines changed: 47 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -55,21 +55,32 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload)
5555
]));
5656
});
5757

58-
$memberAddedPayload = [
59-
'event' => 'pusher_internal:member_added',
60-
'channel' => $this->getName(),
61-
'data' => $payload->channel_data,
62-
];
63-
64-
$this->broadcastToEveryoneExcept(
65-
(object) $memberAddedPayload, $connection->socketId,
66-
$connection->app->id
67-
);
58+
// The `pusher_internal:member_added` event is triggered when a user joins a channel.
59+
// It's quite possible that a user can have multiple connections to the same channel
60+
// (for example by having multiple browser tabs open)
61+
// and in this case the events will only be triggered when the first tab is opened.
62+
$this->channelManager
63+
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
64+
->then(function ($sockets) use ($payload, $connection) {
65+
if (count($sockets) === 1) {
66+
$memberAddedPayload = [
67+
'event' => 'pusher_internal:member_added',
68+
'channel' => $this->getName(),
69+
'data' => $payload->channel_data,
70+
];
71+
72+
$this->broadcastToEveryoneExcept(
73+
(object) $memberAddedPayload, $connection->socketId,
74+
$connection->app->id
75+
);
76+
}
6877

69-
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
70-
'socketId' => $connection->socketId,
71-
'channel' => $this->getName(),
72-
]);
78+
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
79+
'socketId' => $connection->socketId,
80+
'channel' => $this->getName(),
81+
'duplicate-connection' => count($sockets) > 1,
82+
]);
83+
});
7384
}
7485

7586
/**
@@ -95,18 +106,28 @@ public function unsubscribe(ConnectionInterface $connection)
95106
$connection, $user, $this->getName()
96107
);
97108

98-
$memberRemovedPayload = [
99-
'event' => 'pusher_internal:member_removed',
100-
'channel' => $this->getName(),
101-
'data' => json_encode([
102-
'user_id' => $user->user_id,
103-
]),
104-
];
105-
106-
$this->broadcastToEveryoneExcept(
107-
(object) $memberRemovedPayload, $connection->socketId,
108-
$connection->app->id
109-
);
109+
// The `pusher_internal:member_removed` is triggered when a user leaves a channel.
110+
// It's quite possible that a user can have multiple connections to the same channel
111+
// (for example by having multiple browser tabs open)
112+
// and in this case the events will only be triggered when the last one is closed.
113+
$this->channelManager
114+
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
115+
->then(function ($sockets) use ($connection, $user) {
116+
if (count($sockets) === 0) {
117+
$memberRemovedPayload = [
118+
'event' => 'pusher_internal:member_removed',
119+
'channel' => $this->getName(),
120+
'data' => json_encode([
121+
'user_id' => $user->user_id,
122+
]),
123+
];
124+
125+
$this->broadcastToEveryoneExcept(
126+
(object) $memberRemovedPayload, $connection->socketId,
127+
$connection->app->id
128+
);
129+
}
130+
});
110131
});
111132
}
112133
}

src/Contracts/ChannelManager.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,16 @@ public function getChannelMember(ConnectionInterface $connection, string $channe
186186
*/
187187
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface;
188188

189+
/**
190+
* Get the socket IDs for a presence channel member.
191+
*
192+
* @param string|int $userId
193+
* @param string|int $appId
194+
* @param string $channelName
195+
* @return \React\Promise\PromiseInterface
196+
*/
197+
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface;
198+
189199
/**
190200
* Keep tracking the connections availability when they pong.
191201
*

tests/Mocks/Connection.php

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,32 @@ public function close()
5858
$this->closed = true;
5959
}
6060

61+
/**
62+
* Reset the events for assertions.
63+
*
64+
* @return $this
65+
*/
66+
public function resetEvents()
67+
{
68+
$this->sentData = [];
69+
$this->sentRawData = [];
70+
71+
return $this;
72+
}
73+
74+
/**
75+
* Dump & stop execution.
76+
*
77+
* @return void
78+
*/
79+
public function dd()
80+
{
81+
dd([
82+
'sentData' => $this->sentData,
83+
'sentRawData' => $this->sentRawData,
84+
]);
85+
}
86+
6187
/**
6288
* Assert that an event got sent.
6389
*

tests/PresenceChannelTest.php

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,31 @@ public function test_connect_to_presence_channel_with_valid_signature()
6161
});
6262
}
6363

64+
public function test_connect_to_presence_channel_when_user_with_same_ids_is_already_joined()
65+
{
66+
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
67+
$morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
68+
$pickleRick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
69+
70+
foreach ([$rick, $morty, $pickleRick] as $connection) {
71+
$connection->assertSentEvent('pusher_internal:subscription_succeeded', [
72+
'channel' => 'presence-channel',
73+
]);
74+
}
75+
76+
$this->channelManager
77+
->getGlobalConnectionsCount('1234', 'presence-channel')
78+
->then(function ($total) {
79+
$this->assertEquals(3, $total);
80+
});
81+
82+
$this->channelManager
83+
->getChannelMembers('1234', 'presence-channel')
84+
->then(function ($members) {
85+
$this->assertCount(2, $members);
86+
});
87+
}
88+
6489
public function test_presence_channel_broadcast_member_events()
6590
{
6691
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
@@ -200,4 +225,56 @@ public function test_local_connections_for_presence_channels()
200225
}
201226
});
202227
}
228+
229+
public function test_multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection()
230+
{
231+
// Connect the `observer` user to the server
232+
$observerConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 'observer']);
233+
234+
// Connect the first socket for user `1` to the server
235+
$firstConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']);
236+
237+
// Make sure the observer sees a `member_added` event for `user:1`
238+
$observerConnection->assertSentEvent('pusher_internal:member_added', [
239+
'event' => 'pusher_internal:member_added',
240+
'channel' => 'presence-channel',
241+
'data' => json_encode(['user_id' => '1']),
242+
])->resetEvents();
243+
244+
// Connect the second socket for user `1` to the server
245+
$secondConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']);
246+
247+
// Make sure the observer was not notified of a `member_added` event (user was already connected)
248+
$observerConnection->assertNotSentEvent('pusher_internal:member_added');
249+
250+
// Disconnect the first socket for user `1` on the server
251+
$this->pusherServer->onClose($firstConnection);
252+
253+
// Make sure the observer was not notified of a `member_removed` event (user still connected on another socket)
254+
$observerConnection->assertNotSentEvent('pusher_internal:member_removed');
255+
256+
// Disconnect the second (and last) socket for user `1` on the server
257+
$this->pusherServer->onClose($secondConnection);
258+
259+
// Make sure the observer was notified of a `member_removed` event (last socket for user was disconnected)
260+
$observerConnection->assertSentEvent('pusher_internal:member_removed');
261+
262+
$this->channelManager
263+
->getMemberSockets('1', '1234', 'presence-channel')
264+
->then(function ($sockets) {
265+
$this->assertCount(0, $sockets);
266+
});
267+
268+
$this->channelManager
269+
->getMemberSockets('2', '1234', 'presence-channel')
270+
->then(function ($sockets) {
271+
$this->assertCount(0, $sockets);
272+
});
273+
274+
$this->channelManager
275+
->getMemberSockets('observer', '1234', 'presence-channel')
276+
->then(function ($sockets) {
277+
$this->assertCount(1, $sockets);
278+
});
279+
}
203280
}

0 commit comments

Comments
 (0)