Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 805fd5e

Browse files
authored
Merge pull request #530 from stayallive/feature/fix-presence-channels
Fix presence channels emitting too much events
2 parents 331a127 + 3ccf931 commit 805fd5e

File tree

7 files changed

+165
-69
lines changed

7 files changed

+165
-69
lines changed

src/HttpApi/Controllers/FetchUsersController.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ public function __invoke(Request $request)
2222
}
2323

2424
return [
25-
'users' => Collection::make($channel->getUsers())->map(function ($user) {
26-
return ['id' => $user->user_id];
25+
'users' => Collection::make($channel->getUsers())->keys()->map(function ($userId) {
26+
return ['id' => $userId];
2727
})->values(),
2828
];
2929
}

src/WebSockets/Channels/PresenceChannel.php

Lines changed: 73 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,44 @@
55
use Ratchet\ConnectionInterface;
66
use stdClass;
77

8+
/**
9+
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
10+
*/
811
class PresenceChannel extends Channel
912
{
13+
/**
14+
* List of users in the channel keyed by their user ID with their info as value.
15+
*
16+
* @var array<string, array>
17+
*/
1018
protected $users = [];
1119

12-
public function getUsers(): array
13-
{
14-
return $this->users;
15-
}
16-
17-
/*
18-
* @link https://pusher.com/docs/pusher_protocol#presence-channel-events
20+
/**
21+
* List of sockets keyed by their ID with the value pointing to a user ID.
22+
*
23+
* @var array<string, string>
1924
*/
25+
protected $sockets = [];
26+
2027
public function subscribe(ConnectionInterface $connection, stdClass $payload)
2128
{
2229
$this->verifySignature($connection, $payload);
2330

2431
$this->saveConnection($connection);
2532

26-
$channelData = json_decode($payload->channel_data);
27-
$this->users[$connection->socketId] = $channelData;
33+
$channelData = json_decode($payload->channel_data, true);
34+
35+
// The ID of the user connecting
36+
$userId = (string) $channelData['user_id'];
37+
38+
// Check if the user was already connected to the channel before storing the connection in the state
39+
$userFirstConnection = ! isset($this->users[$userId]);
40+
41+
// Add or replace the user info in the state
42+
$this->users[$userId] = $channelData['user_info'] ?? [];
43+
44+
// Add the socket ID to user ID map in the state
45+
$this->sockets[$connection->socketId] = $userId;
2846

2947
// Send the success event
3048
$connection->send(json_encode([
@@ -33,72 +51,74 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload)
3351
'data' => json_encode($this->getChannelData()),
3452
]));
3553

36-
$this->broadcastToOthers($connection, [
37-
'event' => 'pusher_internal:member_added',
38-
'channel' => $this->channelName,
39-
'data' => json_encode($channelData),
40-
]);
54+
// The `pusher_internal:member_added` event is triggered when a user joins a channel.
55+
// It's quite possible that a user can have multiple connections to the same channel
56+
// (for example by having multiple browser tabs open)
57+
// and in this case the events will only be triggered when the first tab is opened.
58+
if ($userFirstConnection) {
59+
$this->broadcastToOthers($connection, [
60+
'event' => 'pusher_internal:member_added',
61+
'channel' => $this->channelName,
62+
'data' => json_encode($channelData),
63+
]);
64+
}
4165
}
4266

4367
public function unsubscribe(ConnectionInterface $connection)
4468
{
4569
parent::unsubscribe($connection);
4670

47-
if (! isset($this->users[$connection->socketId])) {
71+
if (! isset($this->sockets[$connection->socketId])) {
4872
return;
4973
}
5074

51-
$this->broadcastToOthers($connection, [
52-
'event' => 'pusher_internal:member_removed',
53-
'channel' => $this->channelName,
54-
'data' => json_encode([
55-
'user_id' => $this->users[$connection->socketId]->user_id,
56-
]),
57-
]);
58-
59-
unset($this->users[$connection->socketId]);
75+
// Find the user ID belonging to this socket
76+
$userId = $this->sockets[$connection->socketId];
77+
78+
// Remove the socket from the state
79+
unset($this->sockets[$connection->socketId]);
80+
81+
// Test if the user still has open sockets to this channel
82+
$userHasOpenConnections = (array_flip($this->sockets)[$userId] ?? null) !== null;
83+
84+
// The `pusher_internal:member_removed` is triggered when a user leaves a channel.
85+
// It's quite possible that a user can have multiple connections to the same channel
86+
// (for example by having multiple browser tabs open)
87+
// and in this case the events will only be triggered when the last one is closed.
88+
if (! $userHasOpenConnections) {
89+
$this->broadcastToOthers($connection, [
90+
'event' => 'pusher_internal:member_removed',
91+
'channel' => $this->channelName,
92+
'data' => json_encode([
93+
'user_id' => $userId,
94+
]),
95+
]);
96+
97+
// Remove the user info from the state
98+
unset($this->users[$userId]);
99+
}
60100
}
61101

62102
protected function getChannelData(): array
63103
{
64104
return [
65105
'presence' => [
66-
'ids' => $userIds = $this->getUserIds(),
67-
'hash' => $this->getHash(),
68-
'count' => count($userIds),
106+
'ids' => array_keys($this->users),
107+
'hash' => $this->users,
108+
'count' => count($this->users),
69109
],
70110
];
71111
}
72112

73-
public function toArray(): array
74-
{
75-
return array_merge(parent::toArray(), [
76-
'user_count' => count($this->getUserIds()),
77-
]);
78-
}
79-
80-
protected function getUserIds(): array
113+
public function getUsers(): array
81114
{
82-
$userIds = array_map(function ($channelData) {
83-
return (string) $channelData->user_id;
84-
}, $this->users);
85-
86-
return array_values(array_unique($userIds));
115+
return $this->users;
87116
}
88117

89-
/**
90-
* Compute the hash for the presence channel integrity.
91-
*
92-
* @return array
93-
*/
94-
protected function getHash(): array
118+
public function toArray(): array
95119
{
96-
$hash = [];
97-
98-
foreach ($this->users as $socketId => $channelData) {
99-
$hash[$channelData->user_id] = $channelData->user_info ?? [];
100-
}
101-
102-
return $hash;
120+
return array_merge(parent::toArray(), [
121+
'user_count' => count($this->users),
122+
]);
103123
}
104124
}

tests/Channels/PresenceChannelTest.php

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public function clients_with_valid_auth_signatures_can_join_presence_channels()
4343
],
4444
];
4545

46-
$message = $this->getSignedMessage($connection, 'presence-channel', $channelData);
46+
$message = $this->getSignedSubscribeMessage($connection, 'presence-channel', $channelData);
4747

4848
$this->pusherServer->onMessage($connection, $message);
4949

@@ -63,7 +63,7 @@ public function clients_with_no_user_info_can_join_presence_channels()
6363
'user_id' => 1,
6464
];
6565

66-
$message = $this->getSignedMessage($connection, 'presence-channel', $channelData);
66+
$message = $this->getSignedSubscribeMessage($connection, 'presence-channel', $channelData);
6767

6868
$this->pusherServer->onMessage($connection, $message);
6969

@@ -80,27 +80,64 @@ public function multiple_clients_with_same_user_id_are_counted_once()
8080

8181
$channelName = 'presence-channel';
8282
$channelData = [
83-
'user_id' => $userId = 1,
83+
'user_id' => $userId = 'user:1',
8484
];
8585

86-
$this->pusherServer->onMessage($connection, $this->getSignedMessage($connection, $channelName, $channelData));
87-
$this->pusherServer->onMessage($connection2, $this->getSignedMessage($connection2, $channelName, $channelData));
86+
$this->pusherServer->onMessage($connection, $this->getSignedSubscribeMessage($connection, $channelName, $channelData));
87+
$this->pusherServer->onMessage($connection2, $this->getSignedSubscribeMessage($connection2, $channelName, $channelData));
8888

8989
$connection2->assertSentEvent('pusher_internal:subscription_succeeded', [
9090
'channel' => $channelName,
9191
'data' => json_encode([
9292
'presence' => [
93-
'ids' => [(string) $userId],
93+
'ids' => [$userId],
9494
'hash' => [
95-
(string) $userId => [],
95+
$userId => [],
9696
],
9797
'count' => 1,
9898
],
9999
]),
100100
]);
101101
}
102102

103-
private function getSignedMessage(Connection $connection, string $channelName, array $channelData): Message
103+
/** @test */
104+
public function multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection()
105+
{
106+
$channelName = 'presence-channel';
107+
108+
// Connect the `observer` user to the server
109+
$this->pusherServer->onOpen($observerConnection = $this->getWebSocketConnection());
110+
$this->pusherServer->onMessage($observerConnection, $this->getSignedSubscribeMessage($observerConnection, $channelName, ['user_id' => 'observer']));
111+
112+
// Connect the first socket for user `user:1` to the server
113+
$this->pusherServer->onOpen($firstConnection = $this->getWebSocketConnection());
114+
$this->pusherServer->onMessage($firstConnection, $this->getSignedSubscribeMessage($firstConnection, $channelName, ['user_id' => 'user:1']));
115+
116+
// Make sure the observer sees a `member_added` event for `user:1`
117+
$observerConnection->assertSentEvent('pusher_internal:member_added');
118+
$observerConnection->resetEvents();
119+
120+
// Connect the second socket for user `user:1` to the server
121+
$this->pusherServer->onOpen($secondConnection = $this->getWebSocketConnection());
122+
$this->pusherServer->onMessage($secondConnection, $this->getSignedSubscribeMessage($secondConnection, $channelName, ['user_id' => 'user:1']));
123+
124+
// Make sure the observer was not notified of a `member_added` event (user was already connected)
125+
$observerConnection->assertNotSentEvent('pusher_internal:member_added');
126+
127+
// Disconnect the first socket for user `user:1` on the server
128+
$this->pusherServer->onClose($firstConnection);
129+
130+
// Make sure the observer was not notified of a `member_removed` event (user still connected on another socket)
131+
$observerConnection->assertNotSentEvent('pusher_internal:member_removed');
132+
133+
// Disconnect the second (and last) socket for user `user:1` on the server
134+
$this->pusherServer->onClose($secondConnection);
135+
136+
// Make sure the observer was notified of a `member_removed` event (last socket for user was disconnected)
137+
$observerConnection->assertSentEvent('pusher_internal:member_removed');
138+
}
139+
140+
private function getSignedSubscribeMessage(Connection $connection, string $channelName, array $channelData): Message
104141
{
105142
$signature = "{$connection->socketId}:{$channelName}:".json_encode($channelData);
106143

tests/HttpApi/FetchChannelTest.php

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,39 @@ public function it_returns_the_channel_information()
6666
], json_decode($response->getContent(), true));
6767
}
6868

69+
/** @test */
70+
public function it_returns_the_channel_information_for_presence_channel()
71+
{
72+
$this->joinPresenceChannel('presence-global', 'user:1');
73+
$this->joinPresenceChannel('presence-global', 'user:2');
74+
$this->joinPresenceChannel('presence-global', 'user:2');
75+
76+
$connection = new Connection();
77+
78+
$requestPath = '/apps/1234/channel/presence-global';
79+
$routeParams = [
80+
'appId' => '1234',
81+
'channelName' => 'presence-global',
82+
];
83+
84+
$queryString = Pusher::build_auth_query_string('TestKey', 'TestSecret', 'GET', $requestPath);
85+
86+
$request = new Request('GET', "{$requestPath}?{$queryString}&".http_build_query($routeParams));
87+
88+
$controller = app(FetchChannelController::class);
89+
90+
$controller->onOpen($connection, $request);
91+
92+
/** @var JsonResponse $response */
93+
$response = array_pop($connection->sentRawData);
94+
95+
$this->assertSame([
96+
'occupied' => true,
97+
'subscription_count' => 3,
98+
'user_count' => 2,
99+
], json_decode($response->getContent(), true));
100+
}
101+
69102
/** @test */
70103
public function it_returns_404_for_invalid_channels()
71104
{

tests/HttpApi/FetchChannelsTest.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,10 +103,10 @@ public function it_returns_the_channel_information_for_prefix()
103103
/** @test */
104104
public function it_returns_the_channel_information_for_prefix_with_user_count()
105105
{
106-
$this->joinPresenceChannel('presence-global.1');
107-
$this->joinPresenceChannel('presence-global.1');
108-
$this->joinPresenceChannel('presence-global.2');
109-
$this->joinPresenceChannel('presence-notglobal.2');
106+
$this->joinPresenceChannel('presence-global.1', 'user:1');
107+
$this->joinPresenceChannel('presence-global.1', 'user:2');
108+
$this->joinPresenceChannel('presence-global.2', 'user:3');
109+
$this->joinPresenceChannel('presence-notglobal.2', 'user:4');
110110

111111
$connection = new Connection();
112112

tests/Mocks/Connection.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ public function close()
2828
$this->closed = true;
2929
}
3030

31+
public function resetEvents()
32+
{
33+
$this->sentData = [];
34+
$this->sentRawData = [];
35+
}
36+
3137
public function assertSentEvent(string $name, array $additionalParameters = [])
3238
{
3339
$event = collect($this->sentData)->firstWhere('event', '=', $name);

tests/TestCase.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,14 @@ protected function getConnectedWebSocketConnection(array $channelsToJoin = [], s
8989
return $connection;
9090
}
9191

92-
protected function joinPresenceChannel($channel): Connection
92+
protected function joinPresenceChannel($channel, $userId = null): Connection
9393
{
9494
$connection = $this->getWebSocketConnection();
9595

9696
$this->pusherServer->onOpen($connection);
9797

9898
$channelData = [
99-
'user_id' => 1,
99+
'user_id' => $userId ?? 1,
100100
'user_info' => [
101101
'name' => 'Marcel',
102102
],

0 commit comments

Comments
 (0)