Skip to content

Commit ce454a7

Browse files
authored
Merge pull request #1 from mordilion/hotfix/wait-between-subsciption-requests
add subscription_interval as config for dbal subscription consumer
2 parents 856839e + d69f276 commit ce454a7

File tree

2 files changed

+21
-2
lines changed

2 files changed

+21
-2
lines changed

Diff for: pkg/dbal/DbalContext.php

+5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public function __construct($connection, array $config = [])
4646
$this->config = array_replace([
4747
'table_name' => 'enqueue',
4848
'polling_interval' => null,
49+
'subscription_interval' => null,
4950
], $config);
5051

5152
if ($connection instanceof Connection) {
@@ -135,6 +136,10 @@ public function createSubscriptionConsumer(): SubscriptionConsumer
135136
$consumer->setRedeliveryDelay($this->config['redelivery_delay']);
136137
}
137138

139+
if (isset($this->config['subscription_interval'])) {
140+
$consumer->setSubscriptionInterval($this->config['subscription_interval']);
141+
}
142+
138143
return $consumer;
139144
}
140145

Diff for: pkg/dbal/DbalSubscriptionConsumer.php

+16-2
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ class DbalSubscriptionConsumer implements SubscriptionConsumer
3636
*/
3737
private $redeliveryDelay;
3838

39+
/**
40+
* Time to wait between subscription requests in milliseconds.
41+
*
42+
* @var int
43+
*/
44+
private $subscriptionInterval = 200;
45+
3946
/**
4047
* @param DbalContext $context
4148
*/
@@ -63,6 +70,13 @@ public function setRedeliveryDelay(int $redeliveryDelay): self
6370
return $this;
6471
}
6572

73+
public function setSubscriptionInterval(int $subscriptionInterval): self
74+
{
75+
$this->subscriptionInterval = $subscriptionInterval;
76+
77+
return $this;
78+
}
79+
6680
public function consume(int $timeout = 0): void
6781
{
6882
if (empty($this->subscribers)) {
@@ -92,7 +106,7 @@ public function consume(int $timeout = 0): void
92106
* @var DbalConsumer
93107
* @var callable $callback
94108
*/
95-
list($consumer, $callback) = $this->subscribers[$message->getQueue()];
109+
[$consumer, $callback] = $this->subscribers[$message->getQueue()];
96110

97111
if (false === call_user_func($callback, $message, $consumer)) {
98112
return;
@@ -102,7 +116,7 @@ public function consume(int $timeout = 0): void
102116
} else {
103117
$currentQueueNames = [];
104118

105-
usleep(200000); // 200ms
119+
usleep($this->subscriptionInterval * 1000); // 200ms
106120
}
107121

108122
if ($timeout && microtime(true) >= $now + $timeout) {

0 commit comments

Comments
 (0)