File tree 1 file changed +5
-5
lines changed
1 file changed +5
-5
lines changed Original file line number Diff line number Diff line change @@ -25,13 +25,13 @@ public function __construct()
25
25
$ this ->subscribers = [];
26
26
}
27
27
28
- public function consume (int $ timeout = 0 ): void
28
+ public function consume (int $ timeoutMs = 0 ): void
29
29
{
30
- if (empty ($ this ->subscribers )) {
30
+ if (! $ subscriberCount = \count ($ this ->subscribers )) {
31
31
throw new \LogicException ('No subscribers ' );
32
32
}
33
33
34
- $ timeout /= 1000 ;
34
+ $ timeout = $ timeoutMs / 1000 ;
35
35
$ endAt = microtime (true ) + $ timeout ;
36
36
37
37
while (true ) {
@@ -41,13 +41,13 @@ public function consume(int $timeout = 0): void
41
41
* @var callable $processor
42
42
*/
43
43
foreach ($ this ->subscribers as $ queueName => list ($ consumer , $ callback )) {
44
- $ message = $ consumer ->receiveNoWait ();
44
+ $ message = 1 === $ subscriberCount ? $ consumer -> receive ( $ timeoutMs ) : $ consumer ->receiveNoWait ();
45
45
46
46
if ($ message ) {
47
47
if (false === call_user_func ($ callback , $ message , $ consumer )) {
48
48
return ;
49
49
}
50
- } else {
50
+ } elseif ( 1 !== $ subscriberCount ) {
51
51
if ($ timeout && microtime (true ) >= $ endAt ) {
52
52
return ;
53
53
}
You can’t perform that action at this time.
0 commit comments