Skip to content

Commit b4d22f9

Browse files
viktorproggervjiksamdark
authored
Divide queue and channel names (#259)
Co-authored-by: Sergei Predvoditelev <sergei@predvoditelev.ru> Co-authored-by: Alexander Makarov <sam@rmcreative.ru>
1 parent 6a69c56 commit b4d22f9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+430
-468
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.DEFAULT_GOAL := help
22

3-
PHP_VERSION ?= 8.5
3+
PHP_VERSION ?= 8.4
44
-include .env.local
55

66
DOCKER_RUN := docker run --rm -it \

README.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,9 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter;
185185

186186
[
187187
'channel1' => new SynchronousAdapter(),
188-
'channel2' => static fn(SynchronousAdapter $adapter) => $adapter->withChannel('channel2'),
189-
'channel3' => [
188+
'channel2' => new SynchronousAdapter(), // a second instance for a different queue processing pipeline
189+
'channel3' => [ // use a yiisoft/factory syntax for adapter creation
190190
'class' => SynchronousAdapter::class,
191-
'__constructor' => ['channel' => 'channel3'],
192191
],
193192
]
194193
```

config/di.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
return [
3232
AdapterFactoryQueueProvider::class => [
3333
'__construct()' => [
34-
'definitions' => $params['yiisoft/queue']['channels'],
34+
'definitions' => $params['yiisoft/queue']['queues'],
3535
],
3636
],
3737
QueueProviderInterface::class => AdapterFactoryQueueProvider::class,
@@ -61,12 +61,12 @@
6161
MessageSerializerInterface::class => JsonMessageSerializer::class,
6262
RunCommand::class => [
6363
'__construct()' => [
64-
'channels' => array_keys($params['yiisoft/queue']['channels']),
64+
'queues' => array_keys($params['yiisoft/queue']['queues']),
6565
],
6666
],
6767
ListenAllCommand::class => [
6868
'__construct()' => [
69-
'channels' => array_keys($params['yiisoft/queue']['channels']),
69+
'queues' => array_keys($params['yiisoft/queue']['queues']),
7070
],
7171
],
7272
];

config/params.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
],
2323
'yiisoft/queue' => [
2424
'handlers' => [],
25-
'channels' => [
26-
QueueProviderInterface::DEFAULT_CHANNEL => AdapterInterface::class,
25+
'queues' => [
26+
QueueProviderInterface::DEFAULT_QUEUE => AdapterInterface::class,
2727
],
2828
'middlewares-push' => [],
2929
'middlewares-consume' => [],

src/Adapter/AdapterInterface.php

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
namespace Yiisoft\Queue\Adapter;
66

7-
use BackedEnum;
87
use InvalidArgumentException;
98
use Yiisoft\Queue\JobStatus;
109
use Yiisoft\Queue\Message\MessageInterface;
@@ -38,8 +37,4 @@ public function push(MessageInterface $message): MessageInterface;
3837
* @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages.
3938
*/
4039
public function subscribe(callable $handlerCallback): void;
41-
42-
public function withChannel(string|BackedEnum $channel): self;
43-
44-
public function getChannel(): string;
4540
}

src/Adapter/SynchronousAdapter.php

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,9 @@
44

55
namespace Yiisoft\Queue\Adapter;
66

7-
use BackedEnum;
87
use InvalidArgumentException;
9-
use Yiisoft\Queue\ChannelNormalizer;
108
use Yiisoft\Queue\JobStatus;
119
use Yiisoft\Queue\Message\MessageInterface;
12-
use Yiisoft\Queue\Provider\QueueProviderInterface;
1310
use Yiisoft\Queue\QueueInterface;
1411
use Yiisoft\Queue\Worker\WorkerInterface;
1512
use Yiisoft\Queue\Message\IdEnvelope;
@@ -20,15 +17,11 @@ final class SynchronousAdapter implements AdapterInterface
2017
{
2118
private array $messages = [];
2219
private int $current = 0;
23-
private string $channel;
2420

2521
public function __construct(
2622
private readonly WorkerInterface $worker,
2723
private readonly QueueInterface $queue,
28-
string|BackedEnum $channel = QueueProviderInterface::DEFAULT_CHANNEL,
29-
) {
30-
$this->channel = ChannelNormalizer::normalize($channel);
31-
}
24+
) {}
3225

3326
public function __destruct()
3427
{
@@ -80,24 +73,4 @@ public function subscribe(callable $handlerCallback): void
8073
{
8174
$this->runExisting($handlerCallback);
8275
}
83-
84-
public function withChannel(string|BackedEnum $channel): self
85-
{
86-
$channel = ChannelNormalizer::normalize($channel);
87-
88-
if ($channel === $this->channel) {
89-
return $this;
90-
}
91-
92-
$new = clone $this;
93-
$new->channel = $channel;
94-
$new->messages = [];
95-
96-
return $new;
97-
}
98-
99-
public function getChannel(): string
100-
{
101-
return $this->channel;
102-
}
10376
}

src/ChannelNormalizer.php

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/Command/ListenAllCommand.php

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ final class ListenAllCommand extends Command
2525
public function __construct(
2626
private readonly QueueProviderInterface $queueProvider,
2727
private readonly LoopInterface $loop,
28-
private readonly array $channels,
28+
private readonly array $queues,
2929
) {
3030
parent::__construct();
3131
}
@@ -36,36 +36,36 @@ public function __construct(
3636
public function configure(): void
3737
{
3838
$this->addArgument(
39-
'channel',
39+
'queue',
4040
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
41-
'Queue channel name list to connect to',
42-
$this->channels,
41+
'Queue name list to connect to',
42+
$this->queues,
4343
)
4444
->addOption(
4545
'pause',
4646
'p',
4747
InputOption::VALUE_REQUIRED,
48-
'Pause between queue channel iterations in seconds. May save some CPU. Default: 1',
48+
'Pause between queue iterations in seconds. May save some CPU. Default: 1',
4949
1,
5050
)
5151
->addOption(
5252
'maximum',
5353
'm',
5454
InputOption::VALUE_REQUIRED,
55-
'Maximum number of messages to process in each channel before switching to another channel. '
55+
'Maximum number of messages to process in each queue before switching to another queue. '
5656
. 'Default is 0 (no limits).',
5757
0,
5858
);
5959

60-
$this->addUsage('[channel1 [channel2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
60+
$this->addUsage('[queue1 [queue2 [...]]] [--timeout=<timeout>] [--maximum=<maximum>]');
6161
}
6262

6363
protected function execute(InputInterface $input, OutputInterface $output): int
6464
{
6565
$queues = [];
66-
/** @var string $channel */
67-
foreach ($input->getArgument('channel') as $channel) {
68-
$queues[] = $this->queueProvider->get($channel);
66+
/** @var string $queue */
67+
foreach ($input->getArgument('queue') as $queue) {
68+
$queues[] = $this->queueProvider->get($queue);
6969
}
7070

7171
$pauseSeconds = (int) $input->getOption('pause');

src/Command/ListenCommand.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,17 @@ public function __construct(
2626
public function configure(): void
2727
{
2828
$this->addArgument(
29-
'channel',
29+
'queue',
3030
InputArgument::OPTIONAL,
31-
'Queue channel name to connect to',
32-
QueueProviderInterface::DEFAULT_CHANNEL,
31+
'Queue name to connect to',
32+
QueueProviderInterface::DEFAULT_QUEUE,
3333
);
3434
}
3535

3636
protected function execute(InputInterface $input, OutputInterface $output): int
3737
{
3838
$this->queueProvider
39-
->get($input->getArgument('channel'))
39+
->get($input->getArgument('queue'))
4040
->listen();
4141

4242
return 0;

src/Command/RunCommand.php

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,36 @@ final class RunCommand extends Command
2020
{
2121
public function __construct(
2222
private readonly QueueProviderInterface $queueProvider,
23-
private readonly array $channels,
23+
private readonly array $queues,
2424
) {
2525
parent::__construct();
2626
}
2727

2828
public function configure(): void
2929
{
3030
$this->addArgument(
31-
'channel',
31+
'queue',
3232
InputArgument::OPTIONAL | InputArgument::IS_ARRAY,
33-
'Queue channel name list to connect to.',
34-
$this->channels,
33+
'Queue name list to connect to.',
34+
$this->queues,
3535
)
3636
->addOption(
3737
'maximum',
3838
'm',
3939
InputOption::VALUE_REQUIRED,
40-
'Maximum number of messages to process in each channel. Default is 0 (no limits).',
40+
'Maximum number of messages to process in each queue. Default is 0 (no limits).',
4141
0,
4242
)
43-
->addUsage('[channel1 [channel2 [...]]] --maximum 100');
43+
->addUsage('[queue1 [queue2 [...]]] --maximum 100');
4444
}
4545

4646
protected function execute(InputInterface $input, OutputInterface $output): int
4747
{
48-
/** @var string $channel */
49-
foreach ($input->getArgument('channel') as $channel) {
50-
$output->write("Processing channel $channel... ");
48+
/** @var string $queue */
49+
foreach ($input->getArgument('queue') as $queue) {
50+
$output->write("Processing queue $queue... ");
5151
$count = $this->queueProvider
52-
->get($channel)
52+
->get($queue)
5353
->run((int) $input->getOption('maximum'));
5454

5555
$output->writeln("Messages processed: $count.");

0 commit comments

Comments
 (0)