@@ -160,7 +160,7 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
160
160
{
161
161
$ this ->subscribeToTopic ($ connection ->app ->id , $ channelName );
162
162
163
- $ this ->addConnectionToSet ($ connection );
163
+ $ this ->addConnectionToSet ($ connection, Carbon:: now () );
164
164
165
165
$ this ->addChannelToSet (
166
166
$ connection ->app ->id , $ channelName
@@ -416,7 +416,7 @@ public function getMemberSockets($userId, $appId, $channelName): PromiseInterfac
416
416
public function connectionPonged (ConnectionInterface $ connection ): bool
417
417
{
418
418
// This will update the score with the current timestamp.
419
- $ this ->addConnectionToSet ($ connection );
419
+ $ this ->addConnectionToSet ($ connection, Carbon:: now () );
420
420
421
421
return parent ::connectionPonged ($ connection );
422
422
}
@@ -431,9 +431,7 @@ public function removeObsoleteConnections(): bool
431
431
$ this ->lock ()->get (function () {
432
432
$ this ->getConnectionsFromSet (0 , now ()->subMinutes (2 )->format ('U ' ))
433
433
->then (function ($ connections ) {
434
- foreach ($ connections as $ connection => $ score ) {
435
- [$ appId , $ socketId ] = explode (': ' , $ connection );
436
-
434
+ foreach ($ connections as $ appId => $ socketId ) {
437
435
$ this ->unsubscribeFromAllChannels (
438
436
$ this ->fakeConnectionForApp ($ appId , $ socketId )
439
437
);
@@ -571,9 +569,11 @@ public function decrementSubscriptionsCount($appId, string $channel = null, int
571
569
*/
572
570
public function addConnectionToSet (ConnectionInterface $ connection , $ moment = null )
573
571
{
572
+ $ moment = $ moment ? Carbon::parse ($ moment ) : Carbon::now ();
573
+
574
574
$ this ->publishClient ->zadd (
575
575
$ this ->getRedisKey (null , null , ['sockets ' ]),
576
- Carbon:: parse ( $ moment) ->format ('U ' ), "{$ connection ->app ->id }: {$ connection ->socketId }"
576
+ $ moment ->format ('U ' ), "{$ connection ->app ->id }: {$ connection ->socketId }"
577
577
);
578
578
}
579
579
@@ -597,16 +597,26 @@ public function removeConnectionFromSet(ConnectionInterface $connection)
597
597
*
598
598
* @param int $start
599
599
* @param int $stop
600
+ * @param bool $strict
600
601
* @return PromiseInterface
601
602
*/
602
- public function getConnectionsFromSet (int $ start = 0 , int $ stop = 0 )
603
+ public function getConnectionsFromSet (int $ start = 0 , int $ stop = 0 , bool $ strict = true )
603
604
{
604
- return $ this ->publishClient ->zrange (
605
+ if ($ strict ) {
606
+ $ start = "( {$ start }" ;
607
+ $ stop = "( {$ stop }" ;
608
+ }
609
+
610
+ return $ this ->publishClient ->zrangebyscore (
605
611
$ this ->getRedisKey (null , null , ['sockets ' ]),
606
- $ start , $ stop, ' withscores '
612
+ $ start , $ stop
607
613
)
608
614
->then (function ($ list ) {
609
- return Helpers::redisListToArray ($ list );
615
+ return collect ($ list )->mapWithKeys (function ($ appWithSocket ) {
616
+ [$ appId , $ socketId ] = explode (': ' , $ appWithSocket );
617
+
618
+ return [$ appId => $ socketId ];
619
+ })->toArray ();
610
620
});
611
621
}
612
622
0 commit comments