Skip to content

Commit c57b4b3

Browse files
authored
Merge pull request #534 from php-enqueue/client-impr-topics-commands-configuration
[client] Introduce routes. Foundation for multi transport support.
2 parents b9c49c6 + 67779da commit c57b4b3

File tree

191 files changed

+7435
-10625
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

191 files changed

+7435
-10625
lines changed

Diff for: docs/client/quick_tour.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ use Interop\Queue\PsrProcessor;
8989

9090
/** @var \Enqueue\SimpleClient\SimpleClient $client */
9191

92-
$client->bind('a_bar_topic', 'a_processor_name', function(PsrMessage $psrMessage) {
92+
$client->bindTopic('a_bar_topic', function(PsrMessage $psrMessage) {
9393
// processing logic here
9494

9595
return PsrProcessor::ACK;

Diff for: docs/client/rpc_call.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,14 @@ use Interop\Queue\PsrContext;
1919
use Enqueue\Consumption\Result;
2020
use Enqueue\Consumption\ChainExtension;
2121
use Enqueue\Consumption\Extension\ReplyExtension;
22-
use Enqueue\Client\Config;
2322
use Enqueue\SimpleClient\SimpleClient;
2423

2524
/** @var \Interop\Queue\PsrContext $context */
2625

2726
// composer require enqueue/amqp-ext # or enqueue/amqp-bunny, enqueue/amqp-lib
2827
$client = new SimpleClient('amqp:');
2928

30-
$client->bind(Config::COMMAND_TOPIC, 'square', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) {
29+
$client->bindCommand('square', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) {
3130
$number = (int) $message->getBody();
3231

3332
return Result::reply($context->createMessage($number ^ 2));

Diff for: docs/laravel/quick_tour.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ use Interop\Queue\PsrMessage;
7272
use Interop\Queue\PsrProcessor;
7373

7474
$app->resolving(SimpleClient::class, function (SimpleClient $client, $app) {
75-
$client->bind('enqueue_test', 'a_processor', function(PsrMessage $message) {
75+
$client->bindTopic('enqueue_test', function(PsrMessage $message) {
7676
// do stuff here
7777

7878
return PsrProcessor::ACK;

Diff for: docs/quick_tour.md

+9-10
Original file line numberDiff line numberDiff line change
@@ -175,17 +175,16 @@ $client = new SimpleClient('amqp:');
175175

176176
// composer require enqueue/fs
177177
$client = new SimpleClient('file://foo/bar');
178-
179-
$client->setupBroker();
180-
181-
$client->sendEvent('a_foo_topic', 'message');
182-
183-
$client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) {
178+
$client->bindTopic('a_foo_topic', function(PsrMessage $message) {
184179
echo $message->getBody().PHP_EOL;
185180

186181
// your event processor logic here
187182
});
188183

184+
$client->setupBroker();
185+
186+
$client->sendEvent('a_foo_topic', 'message');
187+
189188
// this is a blocking call, it'll consume message until it is interrupted
190189
$client->consume();
191190
```
@@ -207,18 +206,18 @@ $client = new SimpleClient('amqp:');
207206
// composer require enqueue/fs
208207
//$client = new SimpleClient('file://foo/bar');
209208

210-
$client->setupBroker();
211-
212-
$client->bind(Config::COMMAND_TOPIC, 'bar_command', function(PsrMessage $message) {
209+
$client->bindCommand('bar_command', function(PsrMessage $message) {
213210
// your bar command processor logic here
214211
});
215212

216-
$client->bind(Config::COMMAND_TOPIC, 'baz_reply_command', function(PsrMessage $message, PsrContext $context) {
213+
$client->bindCommand('baz_reply_command', function(PsrMessage $message, PsrContext $context) {
217214
// your baz reply command processor logic here
218215

219216
return Result::reply($context->createMessage('theReplyBody'));
220217
});
221218

219+
$client->setupBroker();
220+
222221
// It is sent to one consumer.
223222
$client->sendCommand('bar_command', 'aMessageData');
224223

Diff for: pkg/async-command/Resources/config/services.yml

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
services:
22
enqueue.async_command.run_command_processor:
33
class: 'Enqueue\AsyncCommand\RunCommandProcessor'
4-
public: public
54
arguments:
65
- '%kernel.project_dir%'
76
tags:
8-
- { name: 'enqueue.client.processor' }
7+
- { name: 'enqueue.command_subscriber', client: 'default' }

Diff for: pkg/async-command/RunCommandProcessor.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ public function process(PsrMessage $message, PsrContext $context): Result
4545
public static function getSubscribedCommand(): array
4646
{
4747
return [
48-
'processorName' => Commands::RUN_COMMAND,
49-
'queueName' => Commands::RUN_COMMAND,
50-
'queueNameHardcoded' => true,
48+
'command' => Commands::RUN_COMMAND,
49+
'queue' => Commands::RUN_COMMAND,
50+
'prefix_queue' => false,
5151
'exclusive' => true,
5252
];
5353
}

Diff for: pkg/async-command/Tests/RunCommandProcessorTest.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ public function testShouldSubscribeOnRunCommand()
4343
$subscription = RunCommandProcessor::getSubscribedCommand();
4444

4545
$this->assertSame([
46-
'processorName' => Commands::RUN_COMMAND,
47-
'queueName' => Commands::RUN_COMMAND,
48-
'queueNameHardcoded' => true,
46+
'command' => Commands::RUN_COMMAND,
47+
'queue' => Commands::RUN_COMMAND,
48+
'prefix_queue' => false,
4949
'exclusive' => true,
5050
], $subscription);
5151
}

Diff for: pkg/async-event-dispatcher/AsyncProcessor.php

+7-8
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22

33
namespace Enqueue\AsyncEventDispatcher;
44

5+
use Enqueue\Client\CommandSubscriberInterface;
56
use Enqueue\Consumption\Result;
67
use Interop\Queue\PsrContext;
78
use Interop\Queue\PsrMessage;
89
use Interop\Queue\PsrProcessor;
910
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
1011

11-
class AsyncProcessor implements PsrProcessor
12+
class AsyncProcessor implements PsrProcessor, CommandSubscriberInterface
1213
{
1314
/**
1415
* @var Registry
@@ -20,10 +21,6 @@ class AsyncProcessor implements PsrProcessor
2021
*/
2122
private $dispatcher;
2223

23-
/**
24-
* @param Registry $registry
25-
* @param EventDispatcherInterface $dispatcher
26-
*/
2724
public function __construct(Registry $registry, EventDispatcherInterface $dispatcher)
2825
{
2926
$this->registry = $registry;
@@ -39,9 +36,6 @@ public function __construct(Registry $registry, EventDispatcherInterface $dispat
3936
$this->dispatcher = $dispatcher;
4037
}
4138

42-
/**
43-
* {@inheritdoc}
44-
*/
4539
public function process(PsrMessage $message, PsrContext $context)
4640
{
4741
if (false == $eventName = $message->getProperty('event_name')) {
@@ -57,4 +51,9 @@ public function process(PsrMessage $message, PsrContext $context)
5751

5852
return self::ACK;
5953
}
54+
55+
public static function getSubscribedCommand()
56+
{
57+
return Commands::DISPATCH_ASYNC_EVENTS;
58+
}
6059
}

Diff for: pkg/async-event-dispatcher/Commands.php

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Enqueue\AsyncEventDispatcher;
4+
5+
final class Commands
6+
{
7+
const DISPATCH_ASYNC_EVENTS = 'symfony.dispatch_async_events';
8+
}

Diff for: pkg/async-event-dispatcher/DependencyInjection/AsyncEventsPass.php

+6-7
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@
99

1010
class AsyncEventsPass implements CompilerPassInterface
1111
{
12-
/**
13-
* {@inheritdoc}
14-
*/
15-
public function process(ContainerBuilder $container)
12+
public function process(ContainerBuilder $container): void
1613
{
1714
if (false == $container->hasDefinition('enqueue.events.async_listener')) {
1815
return;
@@ -45,8 +42,9 @@ public function process(ContainerBuilder $container)
4542
;
4643

4744
$container->getDefinition('enqueue.events.async_processor')
48-
->addTag('enqueue.client.processor', [
49-
'topicName' => 'event.'.$event,
45+
->addTag('enqueue.processor', [
46+
'topic' => 'event.'.$event,
47+
'client' => 'default',
5048
])
5149
;
5250

@@ -78,8 +76,9 @@ public function process(ContainerBuilder $container)
7876
;
7977

8078
$container->getDefinition('enqueue.events.async_processor')
81-
->addTag('enqueue.client.processor', [
79+
->addTag('enqueue.processor', [
8280
'topicName' => 'event.'.$event,
81+
'client' => 'default',
8382
])
8483
;
8584

Diff for: pkg/async-event-dispatcher/Resources/config/services.yml

+4-5
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,10 @@ services:
3333
- '@enqueue.events.event_dispatcher'
3434
tags:
3535
-
36-
name: 'enqueue.client.processor'
37-
topicName: '__command__'
38-
processorName: '%enqueue_events_queue%'
39-
queueName: '%enqueue_events_queue%'
40-
queueNameHardcoded: true
36+
name: 'enqueue.processor'
37+
command: 'symfony.dispatch_async_events'
38+
queue: '%enqueue_events_queue%'
39+
queue_prefixed: false
4140
exclusive: true
4241

4342
enqueue.events.php_serializer_event_transofrmer:

Diff for: pkg/enqueue-bundle/DependencyInjection/Compiler/AddTopicMetaPass.php

-66
This file was deleted.

Diff for: pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientExtensionsPass.php

-40
This file was deleted.

Diff for: pkg/enqueue-bundle/DependencyInjection/Compiler/BuildClientRoutingPass.php

-47
This file was deleted.

0 commit comments

Comments
 (0)