Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
return [
AdapterFactoryQueueProvider::class => [
'__construct()' => [
'definitions' => $params['yiisoft/queue']['channels'],
'definitions' => $params['yiisoft/queue']['queues'],
],
],
QueueProviderInterface::class => AdapterFactoryQueueProvider::class,
Expand Down Expand Up @@ -61,12 +61,12 @@
MessageSerializerInterface::class => JsonMessageSerializer::class,
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channels']),
'queues' => array_keys($params['yiisoft/queue']['queues']),
],
],
ListenAllCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channels']),
'queues' => array_keys($params['yiisoft/queue']['queues']),
],
],
];
4 changes: 2 additions & 2 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
],
'yiisoft/queue' => [
'handlers' => [],
'channels' => [
QueueProviderInterface::DEFAULT_CHANNEL => AdapterInterface::class,
'queues' => [
QueueProviderInterface::DEFAULT_QUEUE => AdapterInterface::class,
],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand Down
8 changes: 4 additions & 4 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use BackedEnum;
use InvalidArgumentException;
use Yiisoft\Queue\ChannelNormalizer;
use Yiisoft\Queue\QueueNameNormalizer;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
Expand All @@ -25,9 +25,9 @@ final class SynchronousAdapter implements AdapterInterface
public function __construct(
private readonly WorkerInterface $worker,
private readonly QueueInterface $queue,
string|BackedEnum $channel = QueueProviderInterface::DEFAULT_CHANNEL,
string|BackedEnum $channel = QueueProviderInterface::DEFAULT_QUEUE,
) {
$this->channel = ChannelNormalizer::normalize($channel);
$this->channel = QueueNameNormalizer::normalize($channel);
}

public function __destruct()
Expand Down Expand Up @@ -83,7 +83,7 @@ public function subscribe(callable $handlerCallback): void

public function withChannel(string|BackedEnum $channel): self
{
$channel = ChannelNormalizer::normalize($channel);
$channel = QueueNameNormalizer::normalize($channel);

if ($channel === $this->channel) {
return $this;
Expand Down
18 changes: 0 additions & 18 deletions src/ChannelNormalizer.php

This file was deleted.

20 changes: 10 additions & 10 deletions src/Command/ListenAllCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ final class ListenAllCommand extends Command
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly LoopInterface $loop,
private readonly array $channels,
private readonly array $queues,
) {
parent::__construct();
}
Expand All @@ -36,36 +36,36 @@ public function __construct(
public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to',
$this->channels,
'Queue name list to connect to',
$this->queues,
)
->addOption(
'pause',
'p',
InputOption::VALUE_REQUIRED,
'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
'Pause between queue iterations in seconds. May save some CPU. Default: 1',
1,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. '
'Maximum number of messages to process in each queue before switching to another queue. '
. 'Default is 0 (no limits).',
0,
);

$this->addUsage('[channel1 [channel2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
$this->addUsage('[queue1 [queue2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueProvider->get($channel);
/** @var string $queue */
foreach ($input->getArgument('queue') as $queue) {
$queues[] = $this->queueProvider->get($queue);
}

$pauseSeconds = (int) $input->getOption('pause');
Expand Down
8 changes: 4 additions & 4 deletions src/Command/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ public function __construct(
public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL,
'Queue channel name to connect to',
QueueProviderInterface::DEFAULT_CHANNEL,
'Queue name to connect to',
QueueProviderInterface::DEFAULT_QUEUE,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->queueProvider
->get($input->getArgument('channel'))
->get($input->getArgument('queue'))
->listen();

return 0;
Expand Down
20 changes: 10 additions & 10 deletions src/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,36 @@ final class RunCommand extends Command
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly array $channels,
private readonly array $queues,
) {
parent::__construct();
}

public function configure(): void
{
$this->addArgument(
'channel',
'queue',
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
'Queue channel name list to connect to.',
$this->channels,
'Queue name list to connect to.',
$this->queues,
)
->addOption(
'maximum',
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel. Default is 0 (no limits).',
'Maximum number of messages to process in each queue. Default is 0 (no limits).',
0,
)
->addUsage('[channel1 [channel2 [...]]] --maximum 100');
->addUsage('[queue1 [queue2 [...]]] --maximum 100');
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$output->write("Processing channel $channel... ");
/** @var string $queue */
foreach ($input->getArgument('queue') as $queue) {
$output->write("Processing queue $queue... ");
$count = $this->queueProvider
->get($channel)
->get($queue)
->run((int) $input->getOption('maximum'));

$output->writeln("Messages processed: $count.");
Expand Down
10 changes: 5 additions & 5 deletions src/Debug/QueueCollector.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,18 @@ public function collectStatus(string $id, JobStatus $status): void
}

public function collectPush(
?string $channel,
?string $queueName,
MessageInterface $message,
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions,
): void {
if (!$this->isActive()) {
return;
}
if ($channel === null) {
$channel = 'null';
if ($queueName === null) {
$queueName = 'null';
}

$this->pushes[$channel][] = [
$this->pushes[$queueName][] = [
'message' => $message,
'middlewares' => $middlewareDefinitions,
];
Expand All @@ -69,7 +69,7 @@ public function collectWorkerProcessing(MessageInterface $message, QueueInterfac
if (!$this->isActive()) {
return;
}
$this->processingMessages[$queue->getChannel()][] = $message;
$this->processingMessages[$queue->getName()][] = $message;
}

public function getSummary(): array
Expand Down
11 changes: 6 additions & 5 deletions src/Debug/QueueDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Yiisoft\Queue\Debug;

use BackedEnum;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
Expand All @@ -30,7 +31,7 @@ public function push(
string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions,
): MessageInterface {
$message = $this->queue->push($message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getChannel(), $message, ...$middlewareDefinitions);
$this->collector->collectPush($this->queue->getName(), $message, ...$middlewareDefinitions);
return $message;
}

Expand All @@ -44,13 +45,13 @@ public function listen(): void
$this->queue->listen();
}

public function withAdapter(AdapterInterface $adapter): static
public function withAdapter(AdapterInterface $adapter, string|BackedEnum|null $queueName = null): static
{
return new self($this->queue->withAdapter($adapter), $this->collector);
return new self($this->queue->withAdapter($adapter, $queueName), $this->collector);
}

public function getChannel(): string
public function getName(): string
{
return $this->queue->getChannel();
return $this->queue->getName();
}
}
9 changes: 5 additions & 4 deletions src/Debug/QueueProviderInterfaceProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
private readonly QueueCollector $collector,
) {}

public function get(string|BackedEnum $channel): QueueInterface
public function get(string|BackedEnum $queue): QueueInterface

Check failure on line 18 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:18:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::get has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::get (see https://psalm.dev/230)

Check failure on line 18 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:18:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::get has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::get (see https://psalm.dev/230)

Check failure on line 18 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.4-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:18:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::get has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::get (see https://psalm.dev/230)

Check failure on line 18 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:18:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::get has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::get (see https://psalm.dev/230)
{
$queue = $this->queueProvider->get($channel);
$queue = $this->queueProvider->get($queue);

return new QueueDecorator($queue, $this->collector);
}

public function has(string|BackedEnum $channel): bool
public function has(string|BackedEnum $queue): bool

Check failure on line 25 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:25:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::has has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::has (see https://psalm.dev/230)

Check failure on line 25 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:25:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::has has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::has (see https://psalm.dev/230)

Check failure on line 25 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.4-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:25:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::has has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::has (see https://psalm.dev/230)

Check failure on line 25 in src/Debug/QueueProviderInterfaceProxy.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

ParamNameMismatch

src/Debug/QueueProviderInterfaceProxy.php:25:43: ParamNameMismatch: Argument 1 of Yiisoft\Queue\Debug\QueueProviderInterfaceProxy::has has wrong name $queue, expecting $queueName as defined by Yiisoft\Queue\Provider\QueueProviderInterface::has (see https://psalm.dev/230)
{
return $this->queueProvider->has($channel);
return $this->queueProvider->has($queue);
}
}
15 changes: 7 additions & 8 deletions src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,17 @@ public function dispatch(
FailureHandlingRequest $request,
MessageFailureHandlerInterface $finishHandler,
): FailureHandlingRequest {
/** @var string $channel It is always string in this context */
$channel = $request->getQueue()->getChannel();
if (!isset($this->middlewareDefinitions[$channel]) || $this->middlewareDefinitions[$channel] === []) {
$channel = self::DEFAULT_PIPELINE;
$queueName = $request->getQueue()->getName();
if (!isset($this->middlewareDefinitions[$queueName]) || $this->middlewareDefinitions[$queueName] === []) {
$queueName = self::DEFAULT_PIPELINE;
}
$definitions = array_reverse($this->middlewareDefinitions[$channel]);
$definitions = array_reverse($this->middlewareDefinitions[$queueName]);

if (!isset($this->stack[$channel])) {
$this->stack[$channel] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler);
if (!isset($this->stack[$queueName])) {
$this->stack[$queueName] = new MiddlewareFailureStack($this->buildMiddlewares(...$definitions), $finishHandler);
}

return $this->stack[$channel]->handleFailure($request);
return $this->stack[$queueName]->handleFailure($request);
}

/**
Expand Down
Loading
Loading