-
Notifications
You must be signed in to change notification settings - Fork 440
/
Copy pathRedisConsumerHelperTrait.php
114 lines (85 loc) · 3.34 KB
/
RedisConsumerHelperTrait.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
<?php
declare(strict_types=1);
namespace Enqueue\Redis;
trait RedisConsumerHelperTrait
{
/**
* @var string[]
*/
protected $queueNames;
abstract protected function getContext(): RedisContext;
/**
* @param RedisDestination[] $queues
*/
protected function receiveMessage(array $queues, int $timeout, int $redeliveryDelay): ?RedisMessage
{
$startAt = time();
$thisTimeout = $timeout;
if (null === $this->queueNames) {
$this->queueNames = [];
foreach ($queues as $queue) {
$this->queueNames[] = $queue->getName();
}
}
while ($thisTimeout > 0) {
$this->migrateExpiredMessages($this->queueNames);
if (false == $result = $this->getContext()->getRedis()->brpop($this->queueNames, $thisTimeout)) {
return null;
}
$this->pushQueueNameBack($result->getKey());
if ($message = $this->processResult($result, $redeliveryDelay)) {
return $message;
}
$thisTimeout -= time() - $startAt;
}
return null;
}
protected function receiveMessageNoWait(RedisDestination $destination, int $redeliveryDelay): ?RedisMessage
{
$this->migrateExpiredMessages([$destination->getName()]);
if ($result = $this->getContext()->getRedis()->rpop($destination->getName())) {
return $this->processResult($result, $redeliveryDelay);
}
return null;
}
protected function processResult(RedisResult $result, int $redeliveryDelay): ?RedisMessage
{
$message = $this->getContext()->getSerializer()->toMessage($result->getMessage());
$now = time();
if (0 === $message->getAttempts() && $expiresAt = $message->getHeader('expires_at')) {
if ($now > $expiresAt) {
return null;
}
}
$message->setHeader('attempts', $message->getAttempts() + 1);
$message->setRedelivered($message->getAttempts() > 1);
$message->setKey($result->getKey());
$message->setReservedKey($this->getContext()->getSerializer()->toString($message));
$reservedQueue = $result->getKey().':reserved';
$redeliveryAt = $now + $redeliveryDelay;
$this->getContext()->getRedis()->zadd($reservedQueue, $message->getReservedKey(), $redeliveryAt);
return $message;
}
protected function pushQueueNameBack(string $queueName): void
{
if (count($this->queueNames) <= 1) {
return;
}
if (false === $from = array_search($queueName, $this->queueNames, true)) {
throw new \LogicException(sprintf('Queue name was not found: "%s"', $queueName));
}
$to = count($this->queueNames) - 1;
$out = array_splice($this->queueNames, $from, 1);
array_splice($this->queueNames, $to, 0, $out);
}
protected function migrateExpiredMessages(array $queueNames): void
{
$now = time();
foreach ($queueNames as $queueName) {
$this->getContext()->getRedis()
->eval(LuaScripts::migrateExpired(), [$queueName.':delayed', $queueName], [$now]);
$this->getContext()->getRedis()
->eval(LuaScripts::migrateExpired(), [$queueName.':reserved', $queueName], [$now]);
}
}
}