Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,12 @@
use Illuminate\Queue\Console\ListenCommand as QueueListenCommand;
use Illuminate\Queue\Console\ListFailedCommand as ListFailedQueueCommand;
use Illuminate\Queue\Console\MonitorCommand as QueueMonitorCommand;
use Illuminate\Queue\Console\PauseCommand as QueuePauseCommand;
use Illuminate\Queue\Console\PauseListCommand as QueuePauseListCommand;
use Illuminate\Queue\Console\PruneBatchesCommand as QueuePruneBatchesCommand;
use Illuminate\Queue\Console\PruneFailedJobsCommand as QueuePruneFailedJobsCommand;
use Illuminate\Queue\Console\RestartCommand as QueueRestartCommand;
use Illuminate\Queue\Console\ResumeCommand as QueueResumeCommand;
use Illuminate\Queue\Console\RetryBatchCommand as QueueRetryBatchCommand;
use Illuminate\Queue\Console\RetryCommand as QueueRetryCommand;
use Illuminate\Queue\Console\TableCommand;
Expand Down Expand Up @@ -150,9 +153,12 @@ class ArtisanServiceProvider extends ServiceProvider implements DeferrableProvid
'QueueForget' => ForgetFailedQueueCommand::class,
'QueueListen' => QueueListenCommand::class,
'QueueMonitor' => QueueMonitorCommand::class,
'QueuePause' => QueuePauseCommand::class,
'QueuePauseList' => QueuePauseListCommand::class,
'QueuePruneBatches' => QueuePruneBatchesCommand::class,
'QueuePruneFailedJobs' => QueuePruneFailedJobsCommand::class,
'QueueRestart' => QueueRestartCommand::class,
'QueueResume' => QueueResumeCommand::class,
'QueueRetry' => QueueRetryCommand::class,
'QueueRetryBatch' => QueueRetryBatchCommand::class,
'QueueWork' => QueueWorkCommand::class,
Expand Down
81 changes: 81 additions & 0 deletions src/Illuminate/Queue/Console/PauseCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Symfony\Component\Console\Attribute\AsCommand;

#[AsCommand(name: 'queue:pause')]
class PauseCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:pause {queue : The name of the queue to pause (connection:queue format, e.g., redis:default)}
{--ttl= : The TTL for the pause in seconds (omit for indefinite pause)}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Pause processing for a specific queue';

/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;

/**
* Create a new queue pause command.
*
* @param \Illuminate\Contracts\Queue\Factory $manager
*/
public function __construct(QueueManager $manager)
{
parent::__construct();

$this->manager = $manager;
}

/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
[$connection, $queue] = $this->parseQueue($this->argument('queue'));

$ttl = $this->option('ttl') !== null ? (int) $this->option('ttl') : null;

$this->manager->pause($connection, $queue, $ttl);

$this->components->info("Queue [{$connection}:{$queue}] has been paused".($ttl ? " for {$ttl} seconds." : ' indefinitely.'));

return 0;
}

/**
* Parse the queue argument into connection and queue name.
*
* @param string $queue
* @return array
*/
protected function parseQueue($queue)
{
[$connection, $queue] = array_pad(explode(':', $queue, 2), 2, null);

if (! isset($queue)) {
$queue = $connection;
$connection = $this->laravel['config']['queue.default'];
}

return [$connection, $queue];
}
}
79 changes: 79 additions & 0 deletions src/Illuminate/Queue/Console/PauseListCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Symfony\Component\Console\Attribute\AsCommand;

#[AsCommand(name: 'queue:pause:list')]
class PauseListCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $name = 'queue:pause:list';

/**
* The console command description.
*
* @var string
*/
protected $description = 'List all currently paused queues';

/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;

/**
* Create a new queue pause list command.
*
* @param \Illuminate\Contracts\Queue\Factory $manager
*/
public function __construct(QueueManager $manager)
{
parent::__construct();

$this->manager = $manager;
}

/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
$pausedQueues = $this->manager->getPausedQueues();

if (empty($pausedQueues)) {
$this->components->info('No queues are currently paused.');

return 0;
}

$this->components->info('Paused Queues:');

$tableData = collect($pausedQueues)->map(function ($queue) {
// Parse connection:queue format
$parts = explode(':', $queue, 2);

return [
'connection' => $parts[0] ?? '',
'queue' => $parts[1] ?? $parts[0],
];
})->toArray();

$this->table(
['Connection', 'Queue'],
$tableData
);

return 0;
}
}
84 changes: 84 additions & 0 deletions src/Illuminate/Queue/Console/ResumeCommand.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php

namespace Illuminate\Queue\Console;

use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\Factory as QueueManager;
use Symfony\Component\Console\Attribute\AsCommand;

#[AsCommand(name: 'queue:resume')]
class ResumeCommand extends Command
{
/**
* The console command name.
*
* @var string
*/
protected $signature = 'queue:resume {queue : The name of the queue to resume (connection:queue format, e.g., redis:default)}';

/**
* The console command description.
*
* @var string
*/
protected $description = 'Resume processing for a paused queue';

/**
* The queue manager instance.
*
* @var \Illuminate\Contracts\Queue\Factory
*/
protected $manager;

/**
* Create a new queue resume command.
*
* @param \Illuminate\Contracts\Queue\Factory $manager
*/
public function __construct(QueueManager $manager)
{
parent::__construct();

$this->manager = $manager;
}

/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
[$connection, $queue] = $this->parseQueue($this->argument('queue'));

if (! $this->manager->isPaused($connection, $queue)) {
$this->components->warn("Queue [{$connection}:{$queue}] is not paused.");

return 1;
}

$this->manager->resume($connection, $queue);

$this->components->info("Queue [{$connection}:{$queue}] has been resumed.");

return 0;
}

/**
* Parse the queue argument into connection and queue name.
*
* @param string $queue
* @return array
*/
protected function parseQueue($queue)
{
[$connection, $queue] = array_pad(explode(':', $queue, 2), 2, null);

if (! isset($queue)) {
$queue = $connection;
$connection = $this->laravel['config']['queue.default'];
}

return [$connection, $queue];
}
}
106 changes: 106 additions & 0 deletions src/Illuminate/Queue/QueueManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,112 @@ public function setApplication($app)
return $this;
}

/**
* Pause a queue by name.
*
* @param string $connection
* @param string $queue
* @param int|null $ttl
* @return void
*/
public function pause($connection, $queue, $ttl = null)
{
$cache = $this->app['cache']->store();

if ($ttl === null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you do not need conditions here, put method already implements forever if ttl=null

$cache->forever("queue_paused:{$connection}:{$queue}", true);
} else {
$cache->put("queue_paused:{$connection}:{$queue}", true, $ttl);
}

// Add to the list of paused queues using cache lock for atomicity
$lock = $cache->lock('queue_paused_list_lock', 10);

try {
$lock->block(5);

$pausedQueues = $this->getPausedQueues();
$queueIdentifier = "{$connection}:{$queue}";

if (! in_array($queueIdentifier, $pausedQueues)) {
$pausedQueues[] = $queueIdentifier;
$this->storePausedQueuesList($pausedQueues);
}
} finally {
$lock->release();
}
}

/**
* Resume a paused queue by name.
*
* @param string $connection
* @param string $queue
* @return void
*/
public function resume($connection, $queue)
{
$cache = $this->app['cache']->store();

$cache->forget("queue_paused:{$connection}:{$queue}");

// Remove from the list of paused queues using cache lock for atomicity
$lock = $cache->lock('queue_paused_list_lock', 10);

try {
$lock->block(5);

$pausedQueues = $this->getPausedQueues();
$queueIdentifier = "{$connection}:{$queue}";

if (($key = array_search($queueIdentifier, $pausedQueues)) !== false) {
unset($pausedQueues[$key]);
$this->storePausedQueuesList(array_values($pausedQueues));
}
} finally {
$lock->release();
}
}

/**
* Determine if a queue is paused.
*
* @param string $connection
* @param string $queue
* @return bool
*/
public function isPaused($connection, $queue)
{
$cache = $this->app['cache']->store();

return (bool) $cache->get("queue_paused:{$connection}:{$queue}", false);
}

/**
* Get all paused queues.
*
* @return array
*/
public function getPausedQueues()
{
$cache = $this->app['cache']->store();

return $cache->get('queue_paused_list', []);
}

/**
* Store the list of paused queues.
*
* @param array $queues
* @return void
*/
protected function storePausedQueuesList($queues)
{
$cache = $this->app['cache']->store();

$cache->forever('queue_paused_list', $queues);
}

/**
* Dynamically pass calls to the default connection.
*
Expand Down
Loading