diff --git a/src/EnqueueServiceProvider.php b/src/EnqueueServiceProvider.php index 36ac9d8..67d547c 100644 --- a/src/EnqueueServiceProvider.php +++ b/src/EnqueueServiceProvider.php @@ -13,6 +13,11 @@ class EnqueueServiceProvider extends ServiceProvider { + /** + * @var array + */ + protected $extensions = []; + public function boot() { $this->bootInteropQueueDriver(); @@ -36,7 +41,7 @@ private function registerClient() $this->app->singleton(SimpleClient::class, function() { /** @var \Illuminate\Config\Repository $config */ $config = $this->app['config']; - + return new SimpleClient($config->get('enqueue.client')); }); @@ -64,14 +69,14 @@ private function bootInteropQueueDriver() }); $this->app->extend('queue.worker', function ($worker, $app) { - return new Worker( + return (new Worker( $app['queue'], $app['events'], $app[ExceptionHandler::class], function () use ($app) { return $app->isDownForMaintenance(); } - ); + ))->setExtensions($this->extensions); }); } } diff --git a/src/Worker.php b/src/Worker.php index 46d6bce..2a0c1f8 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -40,6 +40,8 @@ class Worker extends \Illuminate\Queue\Worker implements protected $job; + protected $extensions = []; + public function daemon($connectionName, $queueNames, WorkerOptions $options) { $this->connectionName = $connectionName; @@ -56,7 +58,9 @@ public function daemon($connectionName, $queueNames, WorkerOptions $options) } $context = $this->queue->getQueueInteropContext(); - $queueConsumer = new QueueConsumer($context, new ChainExtension([$this])); + $queueConsumer = new QueueConsumer($context, new ChainExtension( + $this->getAllExtensions([$this]) + )); foreach (explode(',', $queueNames) as $queueName) { $queueConsumer->bindCallback($queueName, function() { $this->runJob($this->job, $this->connectionName, $this->options); @@ -85,10 +89,10 @@ public function runNextJob($connectionName, $queueNames, WorkerOptions $options) $context = $this->queue->getQueueInteropContext(); - $queueConsumer = new QueueConsumer($context, new ChainExtension([ + $queueConsumer = new QueueConsumer($context, new ChainExtension($this->getAllExtensions([ $this, new LimitConsumedMessagesExtension(1), - ])); + ]))); foreach (explode(',', $queueNames) as $queueName) { $queueConsumer->bindCallback($queueName, function() { @@ -163,5 +167,20 @@ public function stop($status = 0) parent::stop($status); } -} + public function setExtensions(array $extensions): self + { + $this->extensions = $extensions; + + return $this; + } + + protected function getAllExtensions(array $array): array + { + foreach ($this->extensions as $extension) { + $array[] = $extension; + } + + return $array; + } +}