Skip to content

Commit e01527a

Browse files
committed
[client] fix commands routing.
1 parent 6416c9b commit e01527a

File tree

5 files changed

+100
-25
lines changed

5 files changed

+100
-25
lines changed

DependencyInjection/Compiler/BuildClientRoutingPass.php

+12-16
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,25 @@ public function process(ContainerBuilder $container)
2222
return;
2323
}
2424

25-
$configs = [];
25+
$events = [];
26+
$commands = [];
2627
foreach ($container->findTaggedServiceIds($processorTagName) as $serviceId => $tagAttributes) {
2728
$subscriptions = $this->extractSubscriptions($container, $serviceId, $tagAttributes);
2829

2930
foreach ($subscriptions as $subscription) {
30-
$configs[$subscription['topicName']][] = [
31-
$subscription['processorName'],
32-
$subscription['queueName'],
33-
];
31+
if (Config::COMMAND_TOPIC === $subscription['topicName']) {
32+
$commands[$subscription['processorName']] = $subscription['queueName'];
33+
} else {
34+
$events[$subscription['topicName']][] = [
35+
$subscription['processorName'],
36+
$subscription['queueName'],
37+
];
38+
}
3439
}
3540
}
3641

3742
$router = $container->getDefinition($routerId);
38-
$router->replaceArgument(1, $configs);
39-
40-
if (isset($configs[Config::COMMAND_TOPIC])) {
41-
$commandRoutes = [];
42-
43-
foreach ($configs[Config::COMMAND_TOPIC] as $command) {
44-
$commandRoutes[$command[0]] = $command[1];
45-
}
46-
47-
$router->replaceArgument(2, $commandRoutes);
48-
}
43+
$router->replaceArgument(1, $events);
44+
$router->replaceArgument(2, $commands);
4945
}
5046
}

Tests/Functional/App/config/custom-config.yml

+5
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,8 @@ services:
2626
class: 'Enqueue\Bundle\Tests\Functional\TestProcessor'
2727
tags:
2828
- { name: 'enqueue.client.processor' }
29+
30+
test.message.command_processor:
31+
class: 'Enqueue\Bundle\Tests\Functional\TestCommandProcessor'
32+
tags:
33+
- { name: 'enqueue.client.processor' }
+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
namespace Enqueue\Bundle\Tests\Functional;
4+
5+
use Enqueue\Client\CommandSubscriberInterface;
6+
use Interop\Queue\PsrContext;
7+
use Interop\Queue\PsrMessage;
8+
use Interop\Queue\PsrProcessor;
9+
10+
class TestCommandProcessor implements PsrProcessor, CommandSubscriberInterface
11+
{
12+
const COMMAND = 'test-command';
13+
14+
/**
15+
* @var PsrMessage
16+
*/
17+
public $message;
18+
19+
public function process(PsrMessage $message, PsrContext $context)
20+
{
21+
$this->message = $message;
22+
23+
return self::ACK;
24+
}
25+
26+
public static function getSubscribedCommand()
27+
{
28+
return self::COMMAND;
29+
}
30+
}

Tests/Functional/UseCasesTest.php

+47
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,53 @@ public function testProducerSendsMessage(array $enqueueConfig)
179179
$this->assertSame('test message body', $message->getBody());
180180
}
181181

182+
/**
183+
* @dataProvider provideEnqueueConfigs
184+
*/
185+
public function testProducerSendsCommandMessage(array $enqueueConfig)
186+
{
187+
$this->customSetUp($enqueueConfig);
188+
189+
$expectedBody = __METHOD__.time();
190+
191+
$this->getMessageProducer()->sendCommand(TestCommandProcessor::COMMAND, $expectedBody);
192+
193+
$queue = $this->getPsrContext()->createQueue('enqueue.test');
194+
195+
$consumer = $this->getPsrContext()->createConsumer($queue);
196+
197+
$message = $consumer->receive(100);
198+
$consumer->acknowledge($message);
199+
200+
$this->assertInstanceOf(PsrMessage::class, $message);
201+
$this->assertSame($expectedBody, $message->getBody());
202+
}
203+
204+
/**
205+
* @dataProvider provideEnqueueConfigs
206+
*/
207+
public function testClientConsumeCommandMessagesFromExplicitlySetQueue(array $enqueueConfig)
208+
{
209+
$this->customSetUp($enqueueConfig);
210+
211+
$command = $this->container->get('enqueue.client.consume_messages_command');
212+
$processor = $this->container->get('test.message.command_processor');
213+
214+
$expectedBody = __METHOD__.time();
215+
216+
$this->getMessageProducer()->sendCommand(TestCommandProcessor::COMMAND, $expectedBody);
217+
218+
$tester = new CommandTester($command);
219+
$tester->execute([
220+
'--message-limit' => 2,
221+
'--time-limit' => 'now +10 seconds',
222+
'client-queue-names' => ['test'],
223+
]);
224+
225+
$this->assertInstanceOf(PsrMessage::class, $processor->message);
226+
$this->assertEquals($expectedBody, $processor->message->getBody());
227+
}
228+
182229
/**
183230
* @dataProvider provideEnqueueConfigs
184231
*/

Tests/Unit/DependencyInjection/Compiler/BuildClientRoutingPassTest.php

+6-9
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameCommandSubscriber;
1010
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\ProcessorNameTopicSubscriber;
1111
use Enqueue\Bundle\Tests\Unit\DependencyInjection\Compiler\Mock\QueueNameTopicSubscriber;
12-
use Enqueue\Client\Config;
1312
use PHPUnit\Framework\TestCase;
1413
use Symfony\Component\DependencyInjection\ContainerBuilder;
1514
use Symfony\Component\DependencyInjection\Definition;
@@ -250,12 +249,11 @@ public function testShouldBuildRouteFromCommandSubscriberIfOnlyCommandNameSpecif
250249
$pass->process($container);
251250

252251
$expectedRoutes = [
253-
Config::COMMAND_TOPIC => [
254-
['the-command-name', 'aDefaultQueueName'],
255-
],
252+
'the-command-name' => 'aDefaultQueueName',
256253
];
257254

258-
$this->assertEquals($expectedRoutes, $router->getArgument(1));
255+
$this->assertEquals([], $router->getArgument(1));
256+
$this->assertEquals($expectedRoutes, $router->getArgument(2));
259257
}
260258

261259
public function testShouldBuildRouteFromCommandSubscriberIfProcessorNameSpecified()
@@ -274,12 +272,11 @@ public function testShouldBuildRouteFromCommandSubscriberIfProcessorNameSpecifie
274272
$pass->process($container);
275273

276274
$expectedRoutes = [
277-
Config::COMMAND_TOPIC => [
278-
['the-command-name', 'the-command-queue-name'],
279-
],
275+
'the-command-name' => 'the-command-queue-name',
280276
];
281277

282-
$this->assertEquals($expectedRoutes, $router->getArgument(1));
278+
$this->assertEquals([], $router->getArgument(1));
279+
$this->assertEquals($expectedRoutes, $router->getArgument(2));
283280
}
284281

285282
/**

0 commit comments

Comments
 (0)