Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,9 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter;

[
'channel1' => new SynchronousAdapter(),
'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'),
'channel3' => [
'channel2' => new SynchronousAdapter(), // a second instance for a different queue processing pipeline
'channel3' => [ // use a yiisoft/factory syntax for adapter creation
'class' => SynchronousAdapter::class,
'__constructor' => ['channel' => 'channel3'],
],
]
```
Expand Down
6 changes: 3 additions & 3 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
return [
AdapterFactoryQueueProvider::class => [
'__construct()' => [
'definitions' => $params['yiisoft/queue']['channels'],
'definitions' => $params['yiisoft/queue']['queues'],
],
],
QueueProviderInterface::class => AdapterFactoryQueueProvider::class,
Expand Down Expand Up @@ -61,12 +61,12 @@
MessageSerializerInterface::class => JsonMessageSerializer::class,
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channels']),
'queues' => array_keys($params['yiisoft/queue']['queues']),
],
],
ListenAllCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channels']),
'queues' => array_keys($params['yiisoft/queue']['queues']),
],
],
];
4 changes: 2 additions & 2 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
],
'yiisoft/queue' => [
'handlers' => [],
'channels' => [
QueueProviderInterface::DEFAULT_CHANNEL => AdapterInterface::class,
'queues' => [
QueueProviderInterface::DEFAULT_QUEUE => AdapterInterface::class,
],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand Down
5 changes: 0 additions & 5 deletions src/Adapter/AdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Yiisoft\Queue\Adapter;

use BackedEnum;
use InvalidArgumentException;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
Expand Down Expand Up @@ -38,8 +37,4 @@ public function push(MessageInterface $message): MessageInterface;
* @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages.
*/
public function subscribe(callable $handlerCallback): void;

public function withChannel(string|BackedEnum $channel): self;

public function getChannel(): string;
}
29 changes: 1 addition & 28 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@

namespace Yiisoft\Queue\Adapter;

use BackedEnum;
use InvalidArgumentException;
use Yiisoft\Queue\ChannelNormalizer;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;
Expand All @@ -20,15 +17,11 @@ final class SynchronousAdapter implements AdapterInterface
{
private array $messages = [];
private int $current = 0;
private string $channel;

public function __construct(
private readonly WorkerInterface $worker,
private readonly QueueInterface $queue,
string|BackedEnum $channel = QueueProviderInterface::DEFAULT_CHANNEL,
) {
$this->channel = ChannelNormalizer::normalize($channel);
}
) {}

public function __destruct()
{
Expand Down Expand Up @@ -80,24 +73,4 @@ public function subscribe(callable $handlerCallback): void
{
$this->runExisting($handlerCallback);
}

public function withChannel(string|BackedEnum $channel): self
{
$channel = ChannelNormalizer::normalize($channel);

if ($channel === $this->channel) {
return $this;
}

$new = clone $this;
$new->channel = $channel;
$new->messages = [];

return $new;
}

public function getChannel(): string
{
return $this->channel;
}
}
18 changes: 0 additions & 18 deletions src/ChannelNormalizer.php

This file was deleted.

20 changes: 10 additions & 10 deletions src/Command/ListenAllCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly LoopInterface $loop,
private readonly array $channels,
private readonly array $queues,
) {
parent::__construct();
}
Expand All @@ -36,47 +36,47 @@
public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to',
$this->channels,
'Queue name list to connect to',
$this->queues,
)
->addOption(
'pause',
'p',
InputOption::VALUE_REQUIRED,
'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
'Pause between queue iterations in seconds. May save some CPU. Default: 1',
1,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. '
'Maximum number of messages to process in each queue before switching to another queue. '
. 'Default is 0 (no limits).',
0,
);

$this->addUsage('[channel1 [channel2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
$this->addUsage('[queue1 [queue2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueProvider->get($channel);
/** @var string $queue */
foreach ($input->getArgument('queue') as $queue) {
$queues[] = $this->queueProvider->get($queue);
}

$pauseSeconds = (int) $input->getOption('pause');
if ($pauseSeconds < 0) {

Check warning on line 72 in src/Command/ListenAllCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "LessThanNegotiation": @@ @@ } $pauseSeconds = (int) $input->getOption('pause'); - if ($pauseSeconds < 0) { + if ($pauseSeconds >= 0) { $pauseSeconds = 1; }

Check warning on line 72 in src/Command/ListenAllCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "LessThan": @@ @@ } $pauseSeconds = (int) $input->getOption('pause'); - if ($pauseSeconds < 0) { + if ($pauseSeconds <= 0) { $pauseSeconds = 1; }
$pauseSeconds = 1;
}

while ($this->loop->canContinue()) {
$hasMessages = false;

Check warning on line 77 in src/Command/ListenAllCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "FalseValue": @@ @@ } while ($this->loop->canContinue()) { - $hasMessages = false; + $hasMessages = true; foreach ($queues as $queue) { $hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages; }
foreach ($queues as $queue) {
$hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages;

Check warning on line 79 in src/Command/ListenAllCommand.php

View workflow job for this annotation

GitHub Actions / mutation / PHP 8.4-ubuntu-latest

Escaped Mutant for Mutator "GreaterThan": @@ @@ while ($this->loop->canContinue()) { $hasMessages = false; foreach ($queues as $queue) { - $hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages; + $hasMessages = $queue->run((int) $input->getOption('maximum')) >= 0 || $hasMessages; } if (!$hasMessages) {
}

if (!$hasMessages) {
Expand Down
8 changes: 4 additions & 4 deletions src/Command/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ public function __construct(
public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL,
'Queue channel name to connect to',
QueueProviderInterface::DEFAULT_CHANNEL,
'Queue name to connect to',
QueueProviderInterface::DEFAULT_QUEUE,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->queueProvider
->get($input->getArgument('channel'))
->get($input->getArgument('queue'))
->listen();

return 0;
Expand Down
20 changes: 10 additions & 10 deletions src/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,36 @@ final class RunCommand extends Command
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly array $channels,
private readonly array $queues,
) {
parent::__construct();
}

public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to.',
$this->channels,
'Queue name list to connect to.',
$this->queues,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel. Default is 0 (no limits).',
'Maximum number of messages to process in each queue. Default is 0 (no limits).',
0,
)
->addUsage('[channel1 [channel2 [...]]] --maximum 100');
->addUsage('[queue1 [queue2 [...]]] --maximum 100');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$output->write("Processing channel $channel... ");
/** @var string $queue */
foreach ($input->getArgument('queue') as $queue) {
$output->write("Processing queue $queue... ");
$count = $this->queueProvider
->get($channel)
->get($queue)
->run((int) $input->getOption('maximum'));

$output->writeln("Messages processed: $count.");
Expand Down
10 changes: 5 additions & 5 deletions src/Debug/QueueCollector.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ public function collectStatus(string $id, JobStatus $status): void
}

public function collectPush(
?string $channel,
?string $queueName,
MessageInterface $message,
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions,
): void {
if (!$this->isActive()) {
return;
}
if ($channel === null) {
$channel = 'null';
if ($queueName === null) {
$queueName = 'null';
}

$this->pushes[$channel][] = [
$this->pushes[$queueName][] = [
'message' => $message,
'middlewares' => $middlewareDefinitions,
];
Expand All @@ -69,7 +69,7 @@ public function collectWorkerProcessing(MessageInterface $message, QueueInterfac
if (!$this->isActive()) {
return;
}
$this->processingMessages[$queue->getChannel()][] = $message;
$this->processingMessages[$queue->getName()][] = $message;
}

public function getSummary(): array
Expand Down
11 changes: 6 additions & 5 deletions src/Debug/QueueDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Yiisoft\Queue\Debug;

use BackedEnum;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
Expand All @@ -30,7 +31,7 @@ public function push(
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions,
): MessageInterface {
$message = $this->queue->push($message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getChannel(), $message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getName(), $message, ...$middlewareDefinitions);
return $message;
}

Expand All @@ -44,13 +45,13 @@ public function listen(): void
$this->queue->listen();
}

public function withAdapter(AdapterInterface $adapter): static
public function withAdapter(AdapterInterface $adapter, string|BackedEnum|null $queueName = null): static
{
return new self($this->queue->withAdapter($adapter), $this->collector);
return new self($this->queue->withAdapter($adapter, $queueName), $this->collector);
}

public function getChannel(): string
public function getName(): string
{
return $this->queue->getChannel();
return $this->queue->getName();
}
}
9 changes: 5 additions & 4 deletions src/Debug/QueueProviderInterfaceProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ public function __construct(
private readonly QueueCollector $collector,
) {}

public function get(string|BackedEnum $channel): QueueInterface
public function get(string|BackedEnum $queueName): QueueInterface
{
$queue = $this->queueProvider->get($channel);
$queue = $this->queueProvider->get($queueName);

return new QueueDecorator($queue, $this->collector);
}

public function has(string|BackedEnum $channel): bool
public function has(string|BackedEnum $queueName): bool
{
return $this->queueProvider->has($channel);
return $this->queueProvider->has($queueName);
}
}
15 changes: 7 additions & 8 deletions src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,17 @@ public function dispatch(
FailureHandlingRequest $request,
MessageFailureHandlerInterface $finishHandler,
): FailureHandlingRequest {
/** @var string $channel It is always string in this context */
$channel = $request->getQueue()->getChannel();
if (!isset($this->middlewareDefinitions[$channel]) || $this->middlewareDefinitions[$channel] === []) {
$channel = self::DEFAULT_PIPELINE;
$queueName = $request->getQueue()->getName();
if (!isset($this->middlewareDefinitions[$queueName]) || $this->middlewareDefinitions[$queueName] === []) {
$queueName = self::DEFAULT_PIPELINE;
}
$definitions = array_reverse($this->middlewareDefinitions[$channel]);
$definitions = array_reverse($this->middlewareDefinitions[$queueName]);

if (!isset($this->stack[$channel])) {
$this->stack[$channel] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler);
if (!isset($this->stack[$queueName])) {
$this->stack[$queueName] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler);
}

return $this->stack[$channel]->handleFailure($request);
return $this->stack[$queueName]->handleFailure($request);
}

/**
Expand Down
Loading
Loading