Skip to content

Commit 19fef9a

Browse files
authored
Merge pull request #199 from php-enqueue/dbal-improvements
[dbal][bc break] Performance improvements and new features.
2 parents 24a73df + b928422 commit 19fef9a

22 files changed

+415
-451
lines changed

Diff for: pkg/dbal/Client/DbalDriver.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public function createTransportMessage(Message $message)
7171
$transportMessage->setProperties($properties);
7272
$transportMessage->setMessageId($message->getMessageId());
7373
$transportMessage->setTimestamp($message->getTimestamp());
74-
$transportMessage->setDelay($message->getDelay());
74+
$transportMessage->setDeliveryDelay($message->getDelay());
7575
$transportMessage->setReplyTo($message->getReplyTo());
7676
$transportMessage->setCorrelationId($message->getCorrelationId());
7777
if (array_key_exists($message->getPriority(), self::$priorityMap)) {
@@ -97,7 +97,7 @@ public function createClientMessage(PsrMessage $message)
9797
$clientMessage->setContentType($message->getHeader('content_type'));
9898
$clientMessage->setMessageId($message->getMessageId());
9999
$clientMessage->setTimestamp($message->getTimestamp());
100-
$clientMessage->setDelay($message->getDelay());
100+
$clientMessage->setDelay($message->getDeliveryDelay());
101101
$clientMessage->setReplyTo($message->getReplyTo());
102102
$clientMessage->setCorrelationId($message->getCorrelationId());
103103

Diff for: pkg/dbal/DbalConsumer.php

+71-26
Original file line numberDiff line numberDiff line change
@@ -169,31 +169,7 @@ protected function receiveMessage()
169169
try {
170170
$now = time();
171171

172-
$query = $this->dbal->createQueryBuilder();
173-
$query
174-
->select('*')
175-
->from($this->context->getTableName())
176-
->where('queue = :queue')
177-
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
178-
->orderBy('priority', 'desc')
179-
->addOrderBy('id', 'asc')
180-
->setMaxResults(1)
181-
;
182-
183-
$sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL();
184-
185-
$dbalMessage = $this->dbal->executeQuery(
186-
$sql,
187-
[
188-
'queue' => $this->queue->getQueueName(),
189-
'delayedUntil' => $now,
190-
],
191-
[
192-
'queue' => Type::STRING,
193-
'delayedUntil' => Type::INTEGER,
194-
]
195-
)->fetch();
196-
172+
$dbalMessage = $this->fetchPrioritizedMessage($now) ?: $dbalMessage = $this->fetchMessage($now);
197173
if (false == $dbalMessage) {
198174
$this->dbal->commit();
199175

@@ -211,9 +187,12 @@ protected function receiveMessage()
211187

212188
$this->dbal->commit();
213189

214-
return $this->convertMessage($dbalMessage);
190+
if (empty($dbalMessage['time_to_live']) || $dbalMessage['time_to_live'] > time()) {
191+
return $this->convertMessage($dbalMessage);
192+
}
215193
} catch (\Exception $e) {
216194
$this->dbal->rollBack();
195+
217196
throw $e;
218197
}
219198
}
@@ -241,4 +220,70 @@ protected function convertMessage(array $dbalMessage)
241220

242221
return $message;
243222
}
223+
224+
/**
225+
* @param int $now
226+
*
227+
* @return array|null
228+
*/
229+
private function fetchPrioritizedMessage($now)
230+
{
231+
$query = $this->dbal->createQueryBuilder();
232+
$query
233+
->select('*')
234+
->from($this->context->getTableName())
235+
->andWhere('queue = :queue')
236+
->andWhere('priority IS NOT NULL')
237+
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
238+
->addOrderBy('priority', 'desc')
239+
->setMaxResults(1)
240+
;
241+
242+
$sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL();
243+
244+
return $this->dbal->executeQuery(
245+
$sql,
246+
[
247+
'queue' => $this->queue->getQueueName(),
248+
'delayedUntil' => $now,
249+
],
250+
[
251+
'queue' => Type::STRING,
252+
'delayedUntil' => Type::INTEGER,
253+
]
254+
)->fetch();
255+
}
256+
257+
/**
258+
* @param int $now
259+
*
260+
* @return array|null
261+
*/
262+
private function fetchMessage($now)
263+
{
264+
$query = $this->dbal->createQueryBuilder();
265+
$query
266+
->select('*')
267+
->from($this->context->getTableName())
268+
->andWhere('queue = :queue')
269+
->andWhere('priority IS NULL')
270+
->andWhere('(delayed_until IS NULL OR delayed_until <= :delayedUntil)')
271+
->addOrderBy('published_at', 'asc')
272+
->setMaxResults(1)
273+
;
274+
275+
$sql = $query->getSQL().' '.$this->dbal->getDatabasePlatform()->getWriteLockSQL();
276+
277+
return $this->dbal->executeQuery(
278+
$sql,
279+
[
280+
'queue' => $this->queue->getQueueName(),
281+
'delayedUntil' => $now,
282+
],
283+
[
284+
'queue' => Type::STRING,
285+
'delayedUntil' => Type::INTEGER,
286+
]
287+
)->fetch();
288+
}
244289
}

Diff for: pkg/dbal/DbalContext.php

+9-2
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public function __construct($connection, array $config = [])
5656
*
5757
* @return DbalMessage
5858
*/
59-
public function createMessage($body = null, array $properties = [], array $headers = [])
59+
public function createMessage($body = '', array $properties = [], array $headers = [])
6060
{
6161
$message = new DbalMessage();
6262
$message->setBody($body);
@@ -170,17 +170,24 @@ public function createDataBaseTable()
170170
return;
171171
}
172172

173+
if ($this->getDbalConnection()->getDatabasePlatform()->hasNativeGuidType()) {
174+
throw new \LogicException('The platform does not support UUIDs natively');
175+
}
176+
173177
$table = new Table($this->getTableName());
174-
$table->addColumn('id', 'integer', ['unsigned' => true, 'autoincrement' => true]);
178+
$table->addColumn('id', 'guid');
179+
$table->addColumn('published_at', 'bigint');
175180
$table->addColumn('body', 'text', ['notnull' => false]);
176181
$table->addColumn('headers', 'text', ['notnull' => false]);
177182
$table->addColumn('properties', 'text', ['notnull' => false]);
178183
$table->addColumn('redelivered', 'boolean', ['notnull' => false]);
179184
$table->addColumn('queue', 'string');
180185
$table->addColumn('priority', 'smallint');
181186
$table->addColumn('delayed_until', 'integer', ['notnull' => false]);
187+
$table->addColumn('time_to_live', 'integer', ['notnull' => false]);
182188

183189
$table->setPrimaryKey(['id']);
190+
$table->addIndex(['published_at']);
184191
$table->addIndex(['queue']);
185192
$table->addIndex(['priority']);
186193
$table->addIndex(['delayed_until']);

Diff for: pkg/dbal/DbalMessage.php

+32-9
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,14 @@ class DbalMessage implements PsrMessage
3232
private $priority;
3333

3434
/**
35-
* @var int
35+
* @var int milliseconds
36+
*/
37+
private $deliveryDelay;
38+
39+
/**
40+
* @var int milliseconds
3641
*/
37-
private $delay;
42+
private $timeToLive;
3843

3944
/**
4045
* @param string $body
@@ -48,7 +53,7 @@ public function __construct($body = '', array $properties = [], array $headers =
4853
$this->headers = $headers;
4954
$this->redelivered = false;
5055
$this->priority = 0;
51-
$this->delay = null;
56+
$this->deliveryDelay = null;
5257
}
5358

5459
/**
@@ -182,19 +187,37 @@ public function setPriority($priority)
182187
/**
183188
* @return int
184189
*/
185-
public function getDelay()
190+
public function getDeliveryDelay()
191+
{
192+
return $this->deliveryDelay;
193+
}
194+
195+
/**
196+
* Set delay in milliseconds.
197+
*
198+
* @param int $deliveryDelay
199+
*/
200+
public function setDeliveryDelay($deliveryDelay)
201+
{
202+
$this->deliveryDelay = $deliveryDelay;
203+
}
204+
205+
/**
206+
* @return int|float|null
207+
*/
208+
public function getTimeToLive()
186209
{
187-
return $this->delay;
210+
return $this->timeToLive;
188211
}
189212

190213
/**
191-
* Set delay in seconds.
214+
* Set time to live in milliseconds.
192215
*
193-
* @param int $delay
216+
* @param int|float|null $timeToLive
194217
*/
195-
public function setDelay($delay)
218+
public function setTimeToLive($timeToLive)
196219
{
197-
$this->delay = $delay;
220+
$this->timeToLive = $timeToLive;
198221
}
199222

200223
/**

Diff for: pkg/dbal/DbalProducer.php

+51-13
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@ class DbalProducer implements PsrProducer
1818
*/
1919
private $priority;
2020

21+
/**
22+
* @var int|float|null
23+
*/
24+
private $deliveryDelay;
25+
26+
/**
27+
* @var int|float|null
28+
*/
29+
private $timeToLive;
30+
2131
/**
2232
* @var DbalContext
2333
*/
@@ -47,6 +57,12 @@ public function send(PsrDestination $destination, PsrMessage $message)
4757
if (null !== $this->priority && null === $message->getPriority()) {
4858
$message->setPriority($this->priority);
4959
}
60+
if (null !== $this->deliveryDelay && null === $message->getDeliveryDelay()) {
61+
$message->setDeliveryDelay($this->deliveryDelay);
62+
}
63+
if (null !== $this->timeToLive && null === $message->getTimeToLive()) {
64+
$message->setTimeToLive($this->timeToLive);
65+
}
5066

5167
$body = $message->getBody();
5268
if (is_scalar($body) || null === $body) {
@@ -58,15 +74,24 @@ public function send(PsrDestination $destination, PsrMessage $message)
5874
));
5975
}
6076

77+
$sql = 'SELECT '.$this->context->getDbalConnection()->getDatabasePlatform()->getGuidExpression();
78+
$uuid = $this->context->getDbalConnection()->query($sql)->fetchColumn(0);
79+
80+
if (empty($uuid)) {
81+
throw new \LogicException('The generated uuid is empty');
82+
}
83+
6184
$dbalMessage = [
85+
'id' => $uuid,
86+
'published_at' => (int) microtime(true) * 10000,
6287
'body' => $body,
6388
'headers' => JSON::encode($message->getHeaders()),
6489
'properties' => JSON::encode($message->getProperties()),
6590
'priority' => $message->getPriority(),
6691
'queue' => $destination->getQueueName(),
6792
];
6893

69-
$delay = $message->getDelay();
94+
$delay = $message->getDeliveryDelay();
7095
if ($delay) {
7196
if (!is_int($delay)) {
7297
throw new \LogicException(sprintf(
@@ -79,16 +104,35 @@ public function send(PsrDestination $destination, PsrMessage $message)
79104
throw new \LogicException(sprintf('Delay must be positive integer but got: "%s"', $delay));
80105
}
81106

82-
$dbalMessage['delayed_until'] = time() + $delay;
107+
$dbalMessage['delayed_until'] = time() + (int) $delay / 1000;
108+
}
109+
110+
$timeToLive = $message->getTimeToLive();
111+
if ($timeToLive) {
112+
if (!is_int($timeToLive)) {
113+
throw new \LogicException(sprintf(
114+
'TimeToLive must be integer but got: "%s"',
115+
is_object($timeToLive) ? get_class($timeToLive) : gettype($timeToLive)
116+
));
117+
}
118+
119+
if ($timeToLive <= 0) {
120+
throw new \LogicException(sprintf('TimeToLive must be positive integer but got: "%s"', $timeToLive));
121+
}
122+
123+
$dbalMessage['time_to_live'] = time() + (int) $timeToLive / 1000;
83124
}
84125

85126
try {
86127
$this->context->getDbalConnection()->insert($this->context->getTableName(), $dbalMessage, [
128+
'id' => Type::GUID,
129+
'published_at' => Type::INTEGER,
87130
'body' => Type::TEXT,
88131
'headers' => Type::TEXT,
89132
'properties' => Type::TEXT,
90133
'priority' => Type::SMALLINT,
91134
'queue' => Type::STRING,
135+
'time_to_live' => Type::INTEGER,
92136
'delayed_until' => Type::INTEGER,
93137
]);
94138
} catch (\Exception $e) {
@@ -101,19 +145,17 @@ public function send(PsrDestination $destination, PsrMessage $message)
101145
*/
102146
public function setDeliveryDelay($deliveryDelay)
103147
{
104-
if (null === $deliveryDelay) {
105-
return;
106-
}
148+
$this->deliveryDelay = $deliveryDelay;
107149

108-
throw new \LogicException('Not implemented');
150+
return $this;
109151
}
110152

111153
/**
112154
* {@inheritdoc}
113155
*/
114156
public function getDeliveryDelay()
115157
{
116-
return null;
158+
return $this->deliveryDelay;
117159
}
118160

119161
/**
@@ -139,18 +181,14 @@ public function getPriority()
139181
*/
140182
public function setTimeToLive($timeToLive)
141183
{
142-
if (null === $timeToLive) {
143-
return;
144-
}
145-
146-
throw new \LogicException('Not implemented');
184+
$this->timeToLive = $timeToLive;
147185
}
148186

149187
/**
150188
* {@inheritdoc}
151189
*/
152190
public function getTimeToLive()
153191
{
154-
return null;
192+
return $this->timeToLive;
155193
}
156194
}

Diff for: pkg/dbal/Tests/Client/DbalDriverTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public function testShouldConvertTransportMessageToClientMessage()
9393
$transportMessage->setMessageId('MessageId');
9494
$transportMessage->setTimestamp(1000);
9595
$transportMessage->setPriority(2);
96-
$transportMessage->setDelay(12345);
96+
$transportMessage->setDeliveryDelay(12345);
9797

9898
$driver = new DbalDriver(
9999
$this->createPsrContextMock(),

0 commit comments

Comments
 (0)