File tree 1 file changed +4
-9
lines changed
1 file changed +4
-9
lines changed Original file line number Diff line number Diff line change @@ -13,21 +13,16 @@ final class FallbackSubscriptionConsumer implements SubscriptionConsumer
13
13
*
14
14
* @var array
15
15
*/
16
- private $ subscribers ;
16
+ private $ subscribers = [] ;
17
17
18
18
/**
19
19
* @var int
20
20
*/
21
21
private $ idleTime = 0 ;
22
22
23
- public function __construct ()
24
- {
25
- $ this ->subscribers = [];
26
- }
27
-
28
23
public function consume (int $ timeout = 0 ): void
29
24
{
30
- if (empty ($ this ->subscribers )) {
25
+ if (! $ subscriberCount = \count ($ this ->subscribers )) {
31
26
throw new \LogicException ('No subscribers ' );
32
27
}
33
28
@@ -41,13 +36,13 @@ public function consume(int $timeout = 0): void
41
36
* @var callable $processor
42
37
*/
43
38
foreach ($ this ->subscribers as $ queueName => list ($ consumer , $ callback )) {
44
- $ message = $ consumer ->receiveNoWait ();
39
+ $ message = 1 === $ subscriberCount ? $ consumer -> receive (( int ) ( $ timeout * 1000 )) : $ consumer ->receiveNoWait ();
45
40
46
41
if ($ message ) {
47
42
if (false === call_user_func ($ callback , $ message , $ consumer )) {
48
43
return ;
49
44
}
50
- } else {
45
+ } elseif ( 1 !== $ subscriberCount ) {
51
46
if ($ timeout && microtime (true ) >= $ endAt ) {
52
47
return ;
53
48
}
You can’t perform that action at this time.
0 commit comments