From f0df0341fdbdde2c1e38d2003f0aff6dc612cb5f Mon Sep 17 00:00:00 2001 From: Arthur Skobara Date: Wed, 13 Jun 2018 15:55:01 +0700 Subject: [PATCH 1/6] fix #173 --- src/drivers/redis/Queue.php | 100 ++++++++++++++++++++++++++++-------- 1 file changed, 78 insertions(+), 22 deletions(-) diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 994d9e865f..272b76c70c 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -12,6 +12,7 @@ use yii\di\Instance; use yii\queue\cli\Queue as CliQueue; use yii\redis\Connection; +use yii\redis\Mutex; /** * Redis Queue. @@ -24,10 +25,25 @@ class Queue extends CliQueue * @var Connection|array|string */ public $redis = 'redis'; + + /** + * @var Mutex|array|string + */ + public $mutex = [ + 'class' => Mutex::class, + 'redis' => 'redis', + ]; + + /** + * @var integer + */ + public $mutexTimeout = 3; + /** * @var string */ public $channel = 'queue'; + /** * @var string command class name */ @@ -41,6 +57,7 @@ public function init() { parent::init(); $this->redis = Instance::ensure($this->redis, Connection::class); + $this->mutex = Instance::ensure($this->mutex, Mutex::class); } /** @@ -56,15 +73,22 @@ public function run($repeat, $timeout = 0) { return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { while ($canContinue()) { - if (($payload = $this->reserve($timeout)) !== null) { - list($id, $message, $ttr, $attempt) = $payload; - if ($this->handleMessage($id, $message, $ttr, $attempt)) { - $this->delete($id); + if ($this->acquire()) { + if (($payload = $this->reserve($timeout)) !== null) { + list($id, $message, $ttr, $attempt) = $payload; + if ($this->handleMessage($id, $message, $ttr, $attempt)) { + $this->delete($id); + } + + } elseif (!$repeat) { + break; } - } elseif (!$repeat) { - break; + + $this->release(); } } + + $this->release(); }); } @@ -95,10 +119,15 @@ public function status($id) */ public function clear() { - while (!$this->redis->set("$this->channel.moving_lock", true, 'NX')) { + while (!$this->acquire(0)) { usleep(10000); } - $this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*")); + + try { + $this->redis->executeCommand('DEL', $this->redis->keys("$this->channel.*")); + } finally { + $this->release(); + } } /** @@ -110,19 +139,25 @@ public function clear() */ public function remove($id) { - while (!$this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) { + while (!$this->acquire(0)) { usleep(10000); } - if ($this->redis->hdel("$this->channel.messages", $id)) { - $this->redis->zrem("$this->channel.delayed", $id); - $this->redis->zrem("$this->channel.reserved", $id); - $this->redis->lrem("$this->channel.waiting", 0, $id); - $this->redis->hdel("$this->channel.attempts", $id); - return true; - } + try { + if ($this->redis->hdel("$this->channel.messages", $id)) { + $this->redis->zrem("$this->channel.delayed", $id); + $this->redis->zrem("$this->channel.reserved", $id); + $this->redis->lrem("$this->channel.waiting", 0, $id); + $this->redis->hdel("$this->channel.attempts", $id); + + return true; + } + + return false; - return false; + } finally { + $this->release(); + } } /** @@ -131,11 +166,9 @@ public function remove($id) */ protected function reserve($timeout) { - // Moves delayed and reserved jobs into waiting list with lock for one second - if ($this->redis->set("$this->channel.moving_lock", true, 'NX', 'EX', 1)) { - $this->moveExpired("$this->channel.delayed"); - $this->moveExpired("$this->channel.reserved"); - } + // Moves delayed and reserved jobs into waiting list + $this->moveExpired("$this->channel.delayed"); + $this->moveExpired("$this->channel.reserved"); // Find a new waiting message $id = null; @@ -201,4 +234,27 @@ protected function pushMessage($message, $ttr, $delay, $priority) return $id; } + + /** + * Acquire the lock. + * + * @return boolean + */ + protected function acquire($timeout = null) + { + $timeout = $timeout !== null ? $timeout : $this->mutexTimeout; + + return $this->mutex->acquire(__CLASS__ . $this->channel, $timeout); + } + + /** + * Release the lock. + * + * @return boolean + */ + protected function release() + { + return $this->mutex->release(__CLASS__ . $this->channel); + } + } From 85030857be338baf6c9e4bc13e40078ed176c929 Mon Sep 17 00:00:00 2001 From: Arthur Skobara Date: Thu, 14 Jun 2018 01:28:53 +0700 Subject: [PATCH 2/6] fix unfair locking the queue by first worker --- src/drivers/redis/Queue.php | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 272b76c70c..3a17044b88 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -74,21 +74,22 @@ public function run($repeat, $timeout = 0) return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { while ($canContinue()) { if ($this->acquire()) { - if (($payload = $this->reserve($timeout)) !== null) { + try { + $payload = $this->reserve($timeout); + } finally { + $this->release(); + } + + if ($payload !== null) { list($id, $message, $ttr, $attempt) = $payload; if ($this->handleMessage($id, $message, $ttr, $attempt)) { $this->delete($id); } - } elseif (!$repeat) { break; } - - $this->release(); } } - - $this->release(); }); } From bddf37044cd06a309aeff35620279afdc4196e38 Mon Sep 17 00:00:00 2001 From: Arthur Skobara Date: Thu, 14 Jun 2018 19:10:31 +0700 Subject: [PATCH 3/6] do not place a job into reserved list if ttr is empty --- src/drivers/redis/Queue.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 3a17044b88..10fec539bc 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -184,7 +184,9 @@ protected function reserve($timeout) $payload = $this->redis->hget("$this->channel.messages", $id); list($ttr, $message) = explode(';', $payload, 2); - $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id); + if (!empty($ttr)) { + $this->redis->zadd("$this->channel.reserved", time() + $ttr, $id); + } $attempt = $this->redis->hincrby("$this->channel.attempts", $id, 1); return [$id, $message, $ttr, $attempt]; From f156be42e9e4c88a968f5fd38ac2f726bbfc86ed Mon Sep 17 00:00:00 2001 From: Arthur Skobara Date: Mon, 18 Jun 2018 19:48:57 +0700 Subject: [PATCH 4/6] get back failed job into the queue --- src/drivers/redis/Queue.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 10fec539bc..7e110212bd 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -84,7 +84,12 @@ public function run($repeat, $timeout = 0) list($id, $message, $ttr, $attempt) = $payload; if ($this->handleMessage($id, $message, $ttr, $attempt)) { $this->delete($id); + } else { + // job is failed but we want to return it back into + // the queue + $this->redis->zadd("$this->channel.reserved", time(), $id); } + } elseif (!$repeat) { break; } From 5d4aa70b360a6bdfe17a3c41e28e35d1fe4943cd Mon Sep 17 00:00:00 2001 From: Arthur Skobara Date: Mon, 30 Jul 2018 22:51:35 +0700 Subject: [PATCH 5/6] allow to use any type of mutex, not only yii\redis\Mutex --- src/drivers/redis/Queue.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 7e110212bd..1766e086aa 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -13,6 +13,7 @@ use yii\queue\cli\Queue as CliQueue; use yii\redis\Connection; use yii\redis\Mutex; +use yii\mutex\Mutex as BaseMutex; /** * Redis Queue. @@ -57,7 +58,7 @@ public function init() { parent::init(); $this->redis = Instance::ensure($this->redis, Connection::class); - $this->mutex = Instance::ensure($this->mutex, Mutex::class); + $this->mutex = Instance::ensure($this->mutex, BaseMutex::class); } /** From 66c75879dda1dcb63d709450ef91796bc3d356f3 Mon Sep 17 00:00:00 2001 From: Arthur Skobara Date: Tue, 31 Jul 2018 00:00:04 +0700 Subject: [PATCH 6/6] use the same timeout of all locks --- src/drivers/redis/Queue.php | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 1766e086aa..bde32cb981 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -22,6 +22,11 @@ */ class Queue extends CliQueue { + /** + * @var int + */ + const MUTEX_TIMEOUT = 0; + /** * @var Connection|array|string */ @@ -36,9 +41,9 @@ class Queue extends CliQueue ]; /** - * @var integer + * @var integer Number of microseconds between attempts to acquire a lock. */ - public $mutexTimeout = 3; + public $acquireTimeout = 10000; /** * @var string @@ -94,6 +99,8 @@ public function run($repeat, $timeout = 0) } elseif (!$repeat) { break; } + } else { + usleep($this->acquireTimeout); } } }); @@ -126,8 +133,8 @@ public function status($id) */ public function clear() { - while (!$this->acquire(0)) { - usleep(10000); + while (!$this->acquire()) { + usleep($this->acquireTimeout); } try { @@ -146,8 +153,8 @@ public function clear() */ public function remove($id) { - while (!$this->acquire(0)) { - usleep(10000); + while (!$this->acquire()) { + usleep($this->acquireTimeout); } try { @@ -249,11 +256,9 @@ protected function pushMessage($message, $ttr, $delay, $priority) * * @return boolean */ - protected function acquire($timeout = null) + protected function acquire() { - $timeout = $timeout !== null ? $timeout : $this->mutexTimeout; - - return $this->mutex->acquire(__CLASS__ . $this->channel, $timeout); + return $this->mutex->acquire(__CLASS__ . $this->channel, self::MUTEX_TIMEOUT); } /**