Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,12 @@
use Illuminate\Queue\Console\ListenCommand as QueueListenCommand;
use Illuminate\Queue\Console\ListFailedCommand as ListFailedQueueCommand;
use Illuminate\Queue\Console\MonitorCommand as QueueMonitorCommand;
use Illuminate\Queue\Console\PauseCommand as QueuePauseCommand;
use Illuminate\Queue\Console\PauseListCommand as QueuePauseListCommand;
use Illuminate\Queue\Console\PruneBatchesCommand as QueuePruneBatchesCommand;
use Illuminate\Queue\Console\PruneFailedJobsCommand as QueuePruneFailedJobsCommand;
use Illuminate\Queue\Console\RestartCommand as QueueRestartCommand;
use Illuminate\Queue\Console\ResumeCommand as QueueResumeCommand;
use Illuminate\Queue\Console\RetryBatchCommand as QueueRetryBatchCommand;
use Illuminate\Queue\Console\RetryCommand as QueueRetryCommand;
use Illuminate\Queue\Console\TableCommand;
Expand Down Expand Up @@ -150,9 +153,12 @@ class ArtisanServiceProvider extends ServiceProvider implements DeferrableProvid
'QueueForget' => ForgetFailedQueueCommand::class,
'QueueListen' => QueueListenCommand::class,
'QueueMonitor' => QueueMonitorCommand::class,
'QueuePause' => QueuePauseCommand::class,
'QueuePauseList' => QueuePauseListCommand::class,
'QueuePruneBatches' => QueuePruneBatchesCommand::class,
'QueuePruneFailedJobs' => QueuePruneFailedJobsCommand::class,
'QueueRestart' => QueueRestartCommand::class,
'QueueResume' => QueueResumeCommand::class,
'QueueRetry' => QueueRetryCommand::class,
'QueueRetryBatch' => QueueRetryBatchCommand::class,
'QueueWork' => QueueWorkCommand::class,
Expand Down
62 changes: 62 additions & 0 deletions src/Illuminate/Queue/Console/PauseCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Symfony\Component\Console\Attribute\AsCommand;

#[AsCommand(name: 'queue:pause')]
class PauseCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:pause {queue : The name of the queue to pause}
{--ttl=86400 : The TTL for the pause in seconds}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Pause processing for a specific queue';

/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;

/**
* Create a new queue pause command.
*
* @param \Illuminate\Contracts\Queue\Factory $manager
*/
public function __construct(QueueManager $manager)
{
parent::__construct();

$this->manager = $manager;
}

/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
$queue = $this->argument('queue');
$ttl = (int) $this->option('ttl');

$this->manager->pause($queue, $ttl);

$this->components->info("Queue [{$queue}] has been paused.");

return 0;
}
}
69 changes: 69 additions & 0 deletions src/Illuminate/Queue/Console/PauseListCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Symfony\Component\Console\Attribute\AsCommand;

#[AsCommand(name: 'queue:pause:list')]
class PauseListCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $name = 'queue:pause:list';

/**
* The console command description.
*
* @var string
*/
protected $description = 'List all currently paused queues';

/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;

/**
* Create a new queue pause list command.
*
* @param \Illuminate\Contracts\Queue\Factory $manager
*/
public function __construct(QueueManager $manager)
{
parent::__construct();

$this->manager = $manager;
}

/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
$pausedQueues = $this->manager->getPausedQueues();

if (empty($pausedQueues)) {
$this->components->info('No queues are currently paused.');

return 0;
}

$this->components->info('Paused Queues:');

$this->table(
['Queue Name'],
collect($pausedQueues)->map(fn ($queue) => [$queue])->toArray()
);

return 0;
}
}
66 changes: 66 additions & 0 deletions src/Illuminate/Queue/Console/ResumeCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Symfony\Component\Console\Attribute\AsCommand;

#[AsCommand(name: 'queue:resume')]
class ResumeCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:resume {queue : The name of the queue to resume}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Resume processing for a paused queue';

/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;

/**
* Create a new queue resume command.
*
* @param \Illuminate\Contracts\Queue\Factory $manager
*/
public function __construct(QueueManager $manager)
{
parent::__construct();

$this->manager = $manager;
}

/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
$queue = $this->argument('queue');

if (! $this->manager->isPaused($queue)) {
$this->components->warn("Queue [{$queue}] is not paused.");

return 1;
}

$this->manager->resume($queue);

$this->components->info("Queue [{$queue}] has been resumed.");

return 0;
}
}
82 changes: 82 additions & 0 deletions src/Illuminate/Queue/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,88 @@ public function setApplication($app)
return $this;
}

/**
* Pause a queue by name.
*
* @param string $queue
* @param int $ttl
* @return void
*/
public function pause($queue, $ttl = 86400)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very strange solution to pause just for one day. Why not week, year, or 30 seconds? It's better to pass null as default, meaning by default pausing queue until it's resumed manualy.

For other cases developers still will have a way to pass any ttl they want in certain case

{
$cache = $this->app['cache']->store();

$cache->put("queue_paused:{$queue}", true, $ttl);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Operation is not atomic here. In some concurrent situation (pausing two queues same time) threir may be such case:

  • first process getPausedQueues returns []
  • second proccess getPausedQueues returns []
  • first process storePausedQueuesList sets ['queue1']
  • second process storePausedQueuesList sets ['queue2']

finaly queue_paused_list has ['queue2'] and not ['queue1', 'queue2'] as expected.

One way is to use some sort of locks or isolation here.
But maybe it would be better for siplicicty not to provide PauseList functionality at all. Pausing and resuming certain queue (and single isolated cache key) would be enough. Currently framwork does not have any functions to get all existing queues list, so listing just paused queues is redundant.


// Add to the list of paused queues
$pausedQueues = $this->getPausedQueues();

if (! in_array($queue, $pausedQueues)) {
$pausedQueues[] = $queue;
$this->storePausedQueuesList($pausedQueues);
}
}

/**
* Resume a paused queue by name.
*
* @param string $queue
* @return void
*/
public function resume($queue)
{
$cache = $this->app['cache']->store();

$cache->forget("queue_paused:{$queue}");

// Remove from the list of paused queues
$pausedQueues = $this->getPausedQueues();

if (($key = array_search($queue, $pausedQueues)) !== false) {
unset($pausedQueues[$key]);
$this->storePausedQueuesList(array_values($pausedQueues));
}
}

/**
* Determine if a queue is paused.
*
* @param string $queue
* @return bool
*/
public function isPaused($queue)
{
$cache = $this->app['cache']->store();

return (bool) $cache->get("queue_paused:{$queue}", false);
}

/**
* Get all paused queues.
*
* @return array
*/
public function getPausedQueues()
{
$cache = $this->app['cache']->store();

// This implementation stores a list of paused queues
return $cache->get('queue_paused_list', []);
}

/**
* Store the list of paused queues.
*
* @param array $queues
* @return void
*/
protected function storePausedQueuesList($queues)
{
$cache = $this->app['cache']->store();

$cache->forever('queue_paused_list', $queues);
}

/**
* Dynamically pass calls to the default connection.
*
Expand Down
20 changes: 20 additions & 0 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,11 @@ protected function getNextJob($connection, $queue)
}

foreach (explode(',', $queue) as $index => $queue) {
// Skip paused queues
if ($this->isQueuePaused($queue)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider situations, where you have three default queue on three different connections. In current solution, trying to pause it, you will pause all three queues, leaving no way to pause just one.

You should take connection name into account too, to point exactly to specific queue.

continue;
}

if (! is_null($job = $popJobCallback($queue, $index))) {
$this->raiseAfterJobPopEvent($connection->getConnectionName(), $job);

Expand All @@ -387,6 +392,21 @@ protected function getNextJob($connection, $queue)
}
}

/**
* Determine if a queue is paused.
*
* @param string $queue
* @return bool
*/
protected function isQueuePaused($queue)
{
if (! $this->cache) {
return false;
}

return (bool) $this->cache->get("queue_paused:{$queue}", false);
}

/**
* Process the given job.
*
Expand Down
4 changes: 4 additions & 0 deletions src/Illuminate/Support/Facades/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@
* @method static array pushedJobs()
* @method static array rawPushes()
* @method static \Illuminate\Support\Testing\Fakes\QueueFake serializeAndRestore(bool $serializeAndRestore = true)
* @method static void pause(string $queue, int $ttl = 86400)
* @method static void resume(string $queue)
* @method static bool isPaused(string $queue)
* @method static array getPausedQueues()
*
* @see \Illuminate\Queue\QueueManager
* @see \Illuminate\Queue\Queue
Expand Down