Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Presence Channels emit added or removed events more than once for same users #536

Merged
merged 4 commits into from
Sep 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions src/ChannelManagers/LocalChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ class LocalChannelManager implements ChannelManager
*/
protected $users = [];

/**
* The list of users by socket and their attached id.
*
* @var array
*/
protected $userSockets = [];

/**
* Wether the current instance accepts new connections.
*
Expand Down Expand Up @@ -273,6 +280,7 @@ public function broadcastAcrossServers($appId, string $channel, stdClass $payloa
public function userJoinedPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel, stdClass $payload)
{
$this->users["{$connection->app->id}:{$channel}"][$connection->socketId] = json_encode($user);
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][] = $connection->socketId;
}

/**
Expand All @@ -287,6 +295,19 @@ public function userJoinedPresenceChannel(ConnectionInterface $connection, stdCl
public function userLeftPresenceChannel(ConnectionInterface $connection, stdClass $user, string $channel)
{
unset($this->users["{$connection->app->id}:{$channel}"][$connection->socketId]);

$deletableSocketKey = array_search(
$connection->socketId,
$this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]
);

if ($deletableSocketKey !== false) {
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"][$deletableSocketKey]);

if (count($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]) === 0) {
unset($this->userSockets["{$connection->app->id}:{$channel}:{$user->user_id}"]);
}
}
}

/**
Expand Down Expand Up @@ -342,6 +363,21 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
return new FulfilledPromise($results);
}

/**
* Get the socket IDs for a presence channel member.
*
* @param string|int $userId
* @param string|int $appId
* @param string $channelName
* @return \React\Promise\PromiseInterface
*/
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{
return new FulfilledPromise(
$this->userSockets["{$appId}:{$channelName}:{$userId}"] ?? []
);
}

/**
* Keep tracking the connections availability when they pong.
*
Expand Down
59 changes: 58 additions & 1 deletion src/ChannelManagers/RedisChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,10 @@ public function userJoinedPresenceChannel(ConnectionInterface $connection, stdCl
$this->storeUserData(
$connection->app->id, $channel, $connection->socketId, json_encode($user)
);

$this->addUserSocket(
$connection->app->id, $channel, $user, $connection->socketId
);
}

/**
Expand All @@ -329,6 +333,10 @@ public function userLeftPresenceChannel(ConnectionInterface $connection, stdClas
$this->removeUserData(
$connection->app->id, $channel, $connection->socketId
);

$this->removeUserSocket(
$connection->app->id, $channel, $user, $connection->socketId
);
}

/**
Expand Down Expand Up @@ -389,6 +397,21 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
});
}

/**
* Get the socket IDs for a presence channel member.
*
* @param string|int $userId
* @param string|int $appId
* @param string $channelName
* @return \React\Promise\PromiseInterface
*/
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface
{
return $this->publishClient->smembers(
$this->getRedisKey($appId, $channelName, [$userId, 'userSockets'])
);
}

/**
* Keep tracking the connections availability when they pong.
*
Expand Down Expand Up @@ -628,7 +651,7 @@ public function removeChannelFromSet($appId, string $channel)
* @param string|int $appId
* @param string|null $channel
* @param string $key
* @param mixed $data
* @param string $data
* @return PromiseInterface
*/
public function storeUserData($appId, string $channel = null, string $key, $data)
Expand Down Expand Up @@ -681,6 +704,40 @@ public function unsubscribeFromTopic($appId, string $channel = null)
);
}

/**
* Add the Presence Channel's User's Socket ID to a list.
*
* @param string|int $appId
* @param string $channel
* @param stdClass $user
* @param string $socketId
* @return void
*/
protected function addUserSocket($appId, string $channel, stdClass $user, string $socketId)
{
$this->publishClient->sadd(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']),
$socketId
);
}

/**
* Remove the Presence Channel's User's Socket ID from the list.
*
* @param string|int $appId
* @param string $channel
* @param stdClass $user
* @param string $socketId
* @return void
*/
protected function removeUserSocket($appId, string $channel, stdClass $user, string $socketId)
{
$this->publishClient->srem(
$this->getRedisKey($appId, $channel, [$user->user_id, 'userSockets']),
$socketId
);
}

/**
* Get the Redis Keyspace name to handle subscriptions
* and other key-value sets.
Expand Down
73 changes: 47 additions & 26 deletions src/Channels/PresenceChannel.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,32 @@ public function subscribe(ConnectionInterface $connection, stdClass $payload)
]));
});

$memberAddedPayload = [
'event' => 'pusher_internal:member_added',
'channel' => $this->getName(),
'data' => $payload->channel_data,
];

$this->broadcastToEveryoneExcept(
(object) $memberAddedPayload, $connection->socketId,
$connection->app->id
);
// The `pusher_internal:member_added` event is triggered when a user joins a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the first tab is opened.
$this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($payload, $connection) {
if (count($sockets) === 1) {
$memberAddedPayload = [
'event' => 'pusher_internal:member_added',
'channel' => $this->getName(),
'data' => $payload->channel_data,
];

$this->broadcastToEveryoneExcept(
(object) $memberAddedPayload, $connection->socketId,
$connection->app->id
);
}

DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
'socketId' => $connection->socketId,
'channel' => $this->getName(),
]);
DashboardLogger::log($connection->app->id, DashboardLogger::TYPE_SUBSCRIBED, [
'socketId' => $connection->socketId,
'channel' => $this->getName(),
'duplicate-connection' => count($sockets) > 1,
]);
});
}

/**
Expand All @@ -95,18 +106,28 @@ public function unsubscribe(ConnectionInterface $connection)
$connection, $user, $this->getName()
);

$memberRemovedPayload = [
'event' => 'pusher_internal:member_removed',
'channel' => $this->getName(),
'data' => json_encode([
'user_id' => $user->user_id,
]),
];

$this->broadcastToEveryoneExcept(
(object) $memberRemovedPayload, $connection->socketId,
$connection->app->id
);
// The `pusher_internal:member_removed` is triggered when a user leaves a channel.
// It's quite possible that a user can have multiple connections to the same channel
// (for example by having multiple browser tabs open)
// and in this case the events will only be triggered when the last one is closed.
$this->channelManager
->getMemberSockets($user->user_id, $connection->app->id, $this->getName())
->then(function ($sockets) use ($connection, $user) {
if (count($sockets) === 0) {
$memberRemovedPayload = [
'event' => 'pusher_internal:member_removed',
'channel' => $this->getName(),
'data' => json_encode([
'user_id' => $user->user_id,
]),
];

$this->broadcastToEveryoneExcept(
(object) $memberRemovedPayload, $connection->socketId,
$connection->app->id
);
}
});
});
}
}
10 changes: 10 additions & 0 deletions src/Contracts/ChannelManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,16 @@ public function getChannelMember(ConnectionInterface $connection, string $channe
*/
public function getChannelsMembersCount($appId, array $channelNames): PromiseInterface;

/**
* Get the socket IDs for a presence channel member.
*
* @param string|int $userId
* @param string|int $appId
* @param string $channelName
* @return \React\Promise\PromiseInterface
*/
public function getMemberSockets($userId, $appId, $channelName): PromiseInterface;

/**
* Keep tracking the connections availability when they pong.
*
Expand Down
26 changes: 26 additions & 0 deletions tests/Mocks/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,32 @@ public function close()
$this->closed = true;
}

/**
* Reset the events for assertions.
*
* @return $this
*/
public function resetEvents()
{
$this->sentData = [];
$this->sentRawData = [];

return $this;
}

/**
* Dump & stop execution.
*
* @return void
*/
public function dd()
{
dd([
'sentData' => $this->sentData,
'sentRawData' => $this->sentRawData,
]);
}

/**
* Assert that an event got sent.
*
Expand Down
77 changes: 77 additions & 0 deletions tests/PresenceChannelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,31 @@ public function test_connect_to_presence_channel_with_valid_signature()
});
}

public function test_connect_to_presence_channel_when_user_with_same_ids_is_already_joined()
{
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
$morty = $this->newPresenceConnection('presence-channel', ['user_id' => 2]);
$pickleRick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);

foreach ([$rick, $morty, $pickleRick] as $connection) {
$connection->assertSentEvent('pusher_internal:subscription_succeeded', [
'channel' => 'presence-channel',
]);
}

$this->channelManager
->getGlobalConnectionsCount('1234', 'presence-channel')
->then(function ($total) {
$this->assertEquals(3, $total);
});

$this->channelManager
->getChannelMembers('1234', 'presence-channel')
->then(function ($members) {
$this->assertCount(2, $members);
});
}

public function test_presence_channel_broadcast_member_events()
{
$rick = $this->newPresenceConnection('presence-channel', ['user_id' => 1]);
Expand Down Expand Up @@ -200,4 +225,56 @@ public function test_local_connections_for_presence_channels()
}
});
}

public function test_multiple_clients_with_same_user_id_trigger_member_added_and_removed_event_only_on_first_and_last_socket_connection()
{
// Connect the `observer` user to the server
$observerConnection = $this->newPresenceConnection('presence-channel', ['user_id' => 'observer']);

// Connect the first socket for user `1` to the server
$firstConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']);

// Make sure the observer sees a `member_added` event for `user:1`
$observerConnection->assertSentEvent('pusher_internal:member_added', [
'event' => 'pusher_internal:member_added',
'channel' => 'presence-channel',
'data' => json_encode(['user_id' => '1']),
])->resetEvents();

// Connect the second socket for user `1` to the server
$secondConnection = $this->newPresenceConnection('presence-channel', ['user_id' => '1']);

// Make sure the observer was not notified of a `member_added` event (user was already connected)
$observerConnection->assertNotSentEvent('pusher_internal:member_added');

// Disconnect the first socket for user `1` on the server
$this->pusherServer->onClose($firstConnection);

// Make sure the observer was not notified of a `member_removed` event (user still connected on another socket)
$observerConnection->assertNotSentEvent('pusher_internal:member_removed');

// Disconnect the second (and last) socket for user `1` on the server
$this->pusherServer->onClose($secondConnection);

// Make sure the observer was notified of a `member_removed` event (last socket for user was disconnected)
$observerConnection->assertSentEvent('pusher_internal:member_removed');

$this->channelManager
->getMemberSockets('1', '1234', 'presence-channel')
->then(function ($sockets) {
$this->assertCount(0, $sockets);
});

$this->channelManager
->getMemberSockets('2', '1234', 'presence-channel')
->then(function ($sockets) {
$this->assertCount(0, $sockets);
});

$this->channelManager
->getMemberSockets('observer', '1234', 'presence-channel')
->then(function ($sockets) {
$this->assertCount(1, $sockets);
});
}
}