Skip to content
This repository was archived by the owner on Feb 7, 2024. It is now read-only.

Commit 9cb83dc

Browse files
authored
Merge pull request #552 from beyondcode/feature/async-queue
[2.x] Async Redis queue driver
2 parents 40ee5fb + 391c5f7 commit 9cb83dc

File tree

10 files changed

+385
-24
lines changed

10 files changed

+385
-24
lines changed

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
"illuminate/broadcasting": "^6.3|^7.0|^8.0",
4242
"illuminate/console": "^6.3|^7.0|^8.0",
4343
"illuminate/http": "^6.3|^7.0|^8.0",
44+
"illuminate/queue": "^6.3|^7.0|^8.0",
4445
"illuminate/routing": "^6.3|^7.0|^8.0",
4546
"illuminate/support": "^6.3|^7.0|^8.0",
4647
"pusher/pusher-php-server": "^3.0|^4.0",

config/websockets.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@
137137

138138
'redis' => [
139139

140-
'connection' => 'default',
140+
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
141141

142142
/*
143143
|--------------------------------------------------------------------------

docs/horizontal-scaling/redis.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,29 @@ You can set the connection name to the Redis database under `redis`:
4040
```
4141

4242
The connections can be found in your `config/database.php` file, under the `redis` key.
43+
44+
## Async Redis Queue
45+
46+
The default Redis connection also interacts with the queues. Since you might want to dispatch jobs on Redis from the server, you can encounter an anti-pattern of using a blocking I/O connection (like PhpRedis or PRedis) within the WebSockets server.
47+
48+
To solve this issue, you can configure the built-in queue driver that uses the Async Redis connection when it's possible, like within the WebSockets server. It's highly recommended to switch your queue to it if you are going to use the queues within the server controllers, for example.
49+
50+
Add the `async-redis` queue driver to your list of connections. The configuration parameters are compatible with the default `redis` driver:
51+
52+
```php
53+
'connections' => [
54+
'async-redis' => [
55+
'driver' => 'async-redis',
56+
'connection' => env('WEBSOCKETS_REDIS_REPLICATION_CONNECTION', 'default'),
57+
'queue' => env('REDIS_QUEUE', 'default'),
58+
'retry_after' => 90,
59+
'block_for' => null,
60+
],
61+
]
62+
```
63+
64+
Also, make sure that the default queue driver is set to `async-redis`:
65+
66+
```
67+
QUEUE_CONNECTION=async-redis
68+
```

src/ChannelManagers/RedisChannelManager.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,16 @@ public function getPublishClient()
519519
return $this->publishClient;
520520
}
521521

522+
/**
523+
* Get the Redis client used by other classes.
524+
*
525+
* @return Client
526+
*/
527+
public function getRedisClient()
528+
{
529+
return $this->getPublishClient();
530+
}
531+
522532
/**
523533
* Get the unique identifier for the server.
524534
*

src/Queue/AsyncRedisConnector.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
namespace BeyondCode\LaravelWebSockets\Queue;
4+
5+
use Illuminate\Queue\Connectors\RedisConnector;
6+
7+
class AsyncRedisConnector extends RedisConnector
8+
{
9+
/**
10+
* Establish a queue connection.
11+
*
12+
* @param array $config
13+
* @return \Illuminate\Contracts\Queue\Queue
14+
*/
15+
public function connect(array $config)
16+
{
17+
return new AsyncRedisQueue(
18+
$this->redis, $config['queue'],
19+
$config['connection'] ?? $this->connection,
20+
$config['retry_after'] ?? 60,
21+
$config['block_for'] ?? null
22+
);
23+
}
24+
}

src/Queue/AsyncRedisQueue.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
<?php
2+
3+
namespace BeyondCode\LaravelWebSockets\Queue;
4+
5+
use BeyondCode\LaravelWebSockets\Contracts\ChannelManager;
6+
use Illuminate\Queue\RedisQueue;
7+
8+
class AsyncRedisQueue extends RedisQueue
9+
{
10+
/**
11+
* Get the connection for the queue.
12+
*
13+
* @return \BeyondCode\LaravelWebSockets\Contracts\ChannelManager|\Illuminate\Redis\Connections\Connection
14+
*/
15+
public function getConnection()
16+
{
17+
$channelManager = $this->container->bound(ChannelManager::class)
18+
? $this->container->make(ChannelManager::class)
19+
: null;
20+
21+
return $channelManager && method_exists($channelManager, 'getRedisClient')
22+
? $channelManager->getRedisClient()
23+
: parent::getConnection();
24+
}
25+
}

src/WebSocketsServiceProvider.php

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use BeyondCode\LaravelWebSockets\Dashboard\Http\Middleware\Authorize as AuthorizeDashboard;
1212
use BeyondCode\LaravelWebSockets\Server\Router;
1313
use Illuminate\Support\Facades\Gate;
14+
use Illuminate\Support\Facades\Queue;
1415
use Illuminate\Support\Facades\Route;
1516
use Illuminate\Support\ServiceProvider;
1617

@@ -36,6 +37,12 @@ public function boot()
3637
__DIR__.'/../database/migrations/0000_00_00_000000_rename_statistics_counters.php' => database_path('migrations/0000_00_00_000000_rename_statistics_counters.php'),
3738
], 'migrations');
3839

40+
$this->registerAsyncRedisQueueDriver();
41+
42+
$this->registerRouter();
43+
44+
$this->registerManagers();
45+
3946
$this->registerStatistics();
4047

4148
$this->registerDashboard();
@@ -50,8 +57,19 @@ public function boot()
5057
*/
5158
public function register()
5259
{
53-
$this->registerRouter();
54-
$this->registerManagers();
60+
//
61+
}
62+
63+
/**
64+
* Register the async, non-blocking Redis queue driver.
65+
*
66+
* @return void
67+
*/
68+
protected function registerAsyncRedisQueueDriver()
69+
{
70+
Queue::extend('async-redis', function () {
71+
return new Queue\AsyncRedisConnector($this->app['redis']);
72+
});
5573
}
5674

5775
/**

tests/AsyncRedisQueueTest.php

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
<?php
2+
3+
namespace BeyondCode\LaravelWebSockets\Test;
4+
5+
use BeyondCode\LaravelWebSockets\Queue\AsyncRedisConnector;
6+
use Illuminate\Queue\Queue;
7+
use Illuminate\Support\InteractsWithTime;
8+
use Mockery as m;
9+
10+
class AsyncRedisQueueTest extends TestCase
11+
{
12+
use InteractsWithTime;
13+
14+
/**
15+
* The testing queue for Redis.
16+
*
17+
* @var \Illuminate\Queue\RedisQueue
18+
*/
19+
private $queue;
20+
21+
/**
22+
* {@inheritdoc}
23+
*/
24+
public function setUp(): void
25+
{
26+
parent::setUp();
27+
28+
$this->runOnlyOnRedisReplication();
29+
30+
$connector = new AsyncRedisConnector($this->app['redis'], 'default');
31+
32+
$this->queue = $connector->connect([
33+
'queue' => 'default',
34+
'retry_after' => 60,
35+
'block_for' => null,
36+
]);
37+
38+
$this->queue->setContainer($this->app);
39+
}
40+
41+
/**
42+
* {@inheritdoc}
43+
*/
44+
protected function tearDown(): void
45+
{
46+
parent::tearDown();
47+
48+
m::close();
49+
}
50+
51+
public function test_expired_jobs_are_pushed_with_async_and_popped_with_sync()
52+
{
53+
$jobs = [
54+
new RedisQueueIntegrationTestJob(0),
55+
new RedisQueueIntegrationTestJob(1),
56+
new RedisQueueIntegrationTestJob(2),
57+
new RedisQueueIntegrationTestJob(3),
58+
];
59+
60+
$this->queue->later(1000, $jobs[0]);
61+
$this->queue->later(-200, $jobs[1]);
62+
$this->queue->later(-300, $jobs[2]);
63+
$this->queue->later(-100, $jobs[3]);
64+
65+
$this->getPublishClient()
66+
->zcard('queues:default:delayed')
67+
->then(function ($count) {
68+
$this->assertEquals(4, $count);
69+
});
70+
71+
$this->unregisterManagers();
72+
73+
$this->assertEquals($jobs[2], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
74+
$this->assertEquals($jobs[1], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
75+
$this->assertEquals($jobs[3], unserialize(json_decode($this->queue->pop()->getRawBody())->data->command));
76+
$this->assertNull($this->queue->pop());
77+
78+
$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));
79+
$this->assertEquals(3, $this->app['redis']->connection()->zcard('queues:default:reserved'));
80+
}
81+
82+
public function test_jobs_are_pushed_with_async_and_released_with_sync()
83+
{
84+
$this->queue->push(
85+
$job = new RedisQueueIntegrationTestJob(30)
86+
);
87+
88+
$this->unregisterManagers();
89+
90+
$this->getPublishClient()
91+
->assertCalledCount(1, 'eval');
92+
93+
$redisJob = $this->queue->pop();
94+
95+
$before = $this->currentTime();
96+
97+
$redisJob->release(1000);
98+
99+
$after = $this->currentTime();
100+
101+
// check the content of delayed queue
102+
$this->assertEquals(1, $this->app['redis']->connection()->zcard('queues:default:delayed'));
103+
104+
$results = $this->app['redis']->connection()->zrangebyscore('queues:default:delayed', -INF, INF, ['withscores' => true]);
105+
106+
$payload = array_keys($results)[0];
107+
108+
$score = $results[$payload];
109+
110+
$this->assertGreaterThanOrEqual($before + 1000, $score);
111+
$this->assertLessThanOrEqual($after + 1000, $score);
112+
113+
$decoded = json_decode($payload);
114+
115+
$this->assertEquals(1, $decoded->attempts);
116+
$this->assertEquals($job, unserialize($decoded->data->command));
117+
118+
$this->assertNull($this->queue->pop());
119+
}
120+
121+
public function test_jobs_are_pushed_with_async_and_deleted_with_sync()
122+
{
123+
$this->queue->push(
124+
$job = new RedisQueueIntegrationTestJob(30)
125+
);
126+
127+
$this->unregisterManagers();
128+
129+
$this->getPublishClient()
130+
->assertCalledCount(1, 'eval');
131+
132+
$redisJob = $this->queue->pop();
133+
134+
$redisJob->delete();
135+
136+
$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:delayed'));
137+
$this->assertEquals(0, $this->app['redis']->connection()->zcard('queues:default:reserved'));
138+
$this->assertEquals(0, $this->app['redis']->connection()->llen('queues:default'));
139+
140+
$this->assertNull($this->queue->pop());
141+
}
142+
143+
public function test_jobs_are_pushed_with_async_and_cleared_with_sync()
144+
{
145+
if (! method_exists($this->queue, 'clear')) {
146+
$this->markTestSkipped('The Queue has no clear() method to test.');
147+
}
148+
149+
$job1 = new RedisQueueIntegrationTestJob(30);
150+
$job2 = new RedisQueueIntegrationTestJob(40);
151+
152+
$this->queue->push($job1);
153+
$this->queue->push($job2);
154+
155+
$this->getPublishClient()
156+
->assertCalledCount(2, 'eval');
157+
158+
$this->unregisterManagers();
159+
160+
$this->assertEquals(2, $this->queue->clear(null));
161+
$this->assertEquals(0, $this->queue->size());
162+
}
163+
164+
public function test_jobs_are_pushed_with_async_and_size_reflects_in_async_size()
165+
{
166+
$this->queue->size()->then(function ($count) {
167+
$this->assertEquals(0, $count);
168+
});
169+
170+
$this->queue->push(new RedisQueueIntegrationTestJob(1));
171+
172+
$this->queue->size()->then(function ($count) {
173+
$this->assertEquals(1, $count);
174+
});
175+
176+
$this->queue->later(60, new RedisQueueIntegrationTestJob(2));
177+
178+
$this->queue->size()->then(function ($count) {
179+
$this->assertEquals(2, $count);
180+
});
181+
182+
$this->queue->push(new RedisQueueIntegrationTestJob(3));
183+
184+
$this->queue->size()->then(function ($count) {
185+
$this->assertEquals(3, $count);
186+
});
187+
188+
$this->unregisterManagers();
189+
190+
$job = $this->queue->pop();
191+
192+
$this->registerManagers();
193+
194+
$this->queue->size()->then(function ($count) {
195+
$this->assertEquals(3, $count);
196+
});
197+
}
198+
}
199+
200+
class RedisQueueIntegrationTestJob
201+
{
202+
public $i;
203+
204+
public function __construct($i)
205+
{
206+
$this->i = $i;
207+
}
208+
209+
public function handle()
210+
{
211+
//
212+
}
213+
}

0 commit comments

Comments
 (0)