Skip to content

Commit adb4c9f

Browse files
committed
[12.x] RedisBroadcaster for RedisCuster
1 parent 6ab00c9 commit adb4c9f

File tree

3 files changed

+93
-4
lines changed

3 files changed

+93
-4
lines changed

.github/workflows/queues.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ jobs:
127127
REDIS_CLIENT: ${{ matrix.client }}
128128
QUEUE_CONNECTION: redis
129129

130+
- name: Execute broadcasting tests
131+
run: vendor/bin/phpunit tests/Integration/Broadcasting
132+
env:
133+
REDIS_CLIENT: ${{ matrix.client }}
134+
QUEUE_CONNECTION: redis
135+
130136
redis-cluster:
131137
runs-on: ubuntu-24.04
132138

@@ -175,6 +181,12 @@ jobs:
175181
REDIS_CLUSTER_HOSTS_AND_PORTS: 127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002
176182
REDIS_QUEUE: '{default}'
177183

184+
- name: Execute broadcasting tests
185+
run: vendor/bin/phpunit tests/Integration/Broadcasting
186+
env:
187+
REDIS_CLIENT: ${{ matrix.client }}
188+
REDIS_CLUSTER_HOSTS_AND_PORTS: 127.0.0.1:7000,127.0.0.1:7001,127.0.0.1:7002
189+
178190
beanstalkd:
179191
runs-on: ubuntu-24.04
180192

src/Illuminate/Broadcasting/Broadcasters/RedisBroadcaster.php

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44

55
use Illuminate\Broadcasting\BroadcastException;
66
use Illuminate\Contracts\Redis\Factory as Redis;
7+
use Illuminate\Redis\Connections\PhpRedisClusterConnection;
8+
use Illuminate\Redis\Connections\PredisClusterConnection;
9+
use Illuminate\Redis\Connections\PredisConnection;
710
use Illuminate\Support\Arr;
11+
use Predis\Connection\Cluster\RedisCluster;
812
use Predis\Connection\ConnectionException;
913
use RedisException;
1014
use Symfony\Component\HttpKernel\Exception\AccessDeniedHttpException;
@@ -125,10 +129,29 @@ public function broadcast(array $channels, $event, array $payload = [])
125129
]);
126130

127131
try {
128-
$connection->eval(
129-
$this->broadcastMultipleChannelsScript(),
130-
0, $payload, ...$this->formatChannels($channels)
131-
);
132+
if ($connection instanceof PhpRedisClusterConnection) {
133+
foreach ($channels as $channel) {
134+
$connection->publish($channel, $payload);
135+
}
136+
} elseif ($connection instanceof PredisClusterConnection
137+
&& $connection->client()->getConnection() instanceof RedisCluster
138+
) {
139+
$randomClusterNodeConnection = new PredisConnection(
140+
$connection->client()->getClientBy('slot', mt_rand(0, 16383))
141+
);
142+
if ($events = $connection->getEventDispatcher()) {
143+
$randomClusterNodeConnection->setEventDispatcher($events);
144+
}
145+
$randomClusterNodeConnection->eval(
146+
$this->broadcastMultipleChannelsScript(),
147+
0, $payload, ...$this->formatChannels($channels)
148+
);
149+
} else {
150+
$connection->eval(
151+
$this->broadcastMultipleChannelsScript(),
152+
0, $payload, ...$this->formatChannels($channels)
153+
);
154+
}
132155
} catch (ConnectionException|RedisException $e) {
133156
throw new BroadcastException(
134157
sprintf('Redis error: %s.', $e->getMessage())
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<?php
2+
3+
namespace Illuminate\Tests\Integration\Broadcasting;
4+
5+
use Illuminate\Broadcasting\Broadcasters\RedisBroadcaster;
6+
use Illuminate\Foundation\Testing\Concerns\InteractsWithRedis;
7+
use Orchestra\Testbench\TestCase;
8+
use PHPUnit\Framework\Attributes\DataProvider;
9+
use PHPUnit\Framework\Attributes\RequiresPhpExtension;
10+
11+
#[RequiresPhpExtension('redis')]
12+
class RedisBroadcasterTest extends TestCase
13+
{
14+
use InteractsWithRedis;
15+
16+
/**
17+
* @param string $driver
18+
*/
19+
#[DataProvider('redisDriverProvider')]
20+
#[RequiresPhpExtension('pcntl')]
21+
public function testBroadcast($driver)
22+
{
23+
$this->beforeApplicationDestroyed(function () {
24+
$this->tearDownRedis();
25+
});
26+
27+
if ($pid = pcntl_fork() > 0) {
28+
$this->setUpRedis();
29+
/** @var \Redis|\RedisCluster $redisClient */
30+
$redisClient = $this->redis['phpredis']->client();
31+
$redisClient->subscribe(['channel-1'], function ($redis, $channel, $message) {
32+
$redis->unsubscribe(['channel-1']);
33+
$redis->close();
34+
$receivedPayload = json_decode($message, true);
35+
$this->assertEquals('test_channel-1', $channel);
36+
$this->assertEquals([
37+
'event' => 'test.event',
38+
'data' => ['foo' => 'bar'],
39+
'socket' => null,
40+
], $receivedPayload);
41+
});
42+
} elseif ($pid == 0) {
43+
$this->setUpRedis();
44+
$redis = $this->redis[$driver];
45+
$broadcaster = new RedisBroadcaster($redis, null, 'test_');
46+
$channels = ['channel-1', 'channel-2'];
47+
usleep(1000);
48+
$broadcaster->broadcast($channels, 'test.event', ['foo' => 'bar']);
49+
exit;
50+
} else {
51+
$this->fail('Cannot fork');
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)