diff --git a/docs/bundle/debuging.md b/docs/bundle/debuging.md index ac83f9882..72aea01e4 100644 --- a/docs/bundle/debuging.md +++ b/docs/bundle/debuging.md @@ -35,13 +35,13 @@ class DefaultController extends Controller /** @var ProducerInterface $producer */ $producer = $this->get('enqueue.producer'); - $producer->send('foo_topic', 'Hello world'); + $producer->sendEvent('foo_topic', 'Hello world'); - $producer->send('bar_topic', ['bar' => 'val']); + $producer->sendEvent('bar_topic', ['bar' => 'val']); $message = new Message(); $message->setBody('baz'); - $producer->send('baz_topic', $message); + $producer->sendEvent('baz_topic', $message); // ... } diff --git a/docs/bundle/job_queue.md b/docs/bundle/job_queue.md index 77dd6ee6a..09a9ea175 100644 --- a/docs/bundle/job_queue.md +++ b/docs/bundle/job_queue.md @@ -89,7 +89,7 @@ class Step1Processor implements PsrProcessor $runner->createDelayed( $jobName, function (JobRunner $runner, Job $childJob) use ($entity) { - $this->producer->send('search:index:index-single-entity', [ + $this->producer->sendEvent('search:index:index-single-entity', [ 'entityId' => $entity->getId(), 'jobId' => $childJob->getId(), ]); diff --git a/docs/bundle/message_producer.md b/docs/bundle/message_producer.md index 6efad8296..6175804c2 100644 --- a/docs/bundle/message_producer.md +++ b/docs/bundle/message_producer.md @@ -24,8 +24,13 @@ $context->createProducer()->send( The client is shipped with two types of producers. The first one sends messages immediately where another one (it is called spool producer) collects them in memory and sends them `onTerminate` event (the response is already sent). +The producer has two types on send methods: +* `sendEvent` - Message is sent to topic and many consumers can subscriber to it. It is "fire and forget" strategy. The even could be sent to "message bus" to other applications. +* `sendCommand` - Message is to ONE exact consumer. It could be used as "fire and forget" or as RPC. The command message is always sent in scope of current application. +### Send event + ```php get('enqueue.producer'); // message is being sent right now -$producer->send('a_topic', 'Hello there!'); +$producer->sendEvent('a_topic', 'Hello there!'); + + +/** @var \Enqueue\Client\SpoolProducer $spoolProducer */ +$spoolProducer = $container->get('enqueue.spool_producer'); + +// message is being sent on console.terminate or kernel.terminate event +$spoolProducer->sendEvent('a_topic', 'Hello there!'); + +// you could send queued messages manually by calling flush method +$spoolProducer->flush(); +``` + +### Send command + +```php +get('enqueue.producer'); + +// message is being sent right now, we use it as RPC +$promise = $producer->sendCommand('a_processor_name', 'Hello there!', $needReply = true); + +$replyMessage = $promise->receive(); /** @var \Enqueue\Client\SpoolProducer $spoolProducer */ $spoolProducer = $container->get('enqueue.spool_producer'); // message is being sent on console.terminate or kernel.terminate event -$spoolProducer->send('a_topic', 'Hello there!'); +$spoolProducer->sendCommand('a_processor_name', 'Hello there!'); // you could send queued messages manually by calling flush method $spoolProducer->flush(); diff --git a/docs/bundle/quick_tour.md b/docs/bundle/quick_tour.md index d67cd306d..b6d8c9036 100644 --- a/docs/bundle/quick_tour.md +++ b/docs/bundle/quick_tour.md @@ -60,7 +60,12 @@ use Enqueue\Client\Producer; /** @var Producer $producer **/ $producer = $container->get('enqueue.producer'); -$producer->send('aFooTopic', 'Something has happened'); + +// send event to many consumers +$producer->sendEvent('aFooTopic', 'Something has happened'); + +// send command to ONE consumer +$producer->sendCommand('aProcessorName', 'Something has happened'); ``` To consume messages you have to first create a message processor: diff --git a/docs/client/message_examples.md b/docs/client/message_examples.md index ff7201fad..11e13a100 100644 --- a/docs/client/message_examples.md +++ b/docs/client/message_examples.md @@ -1,4 +1,28 @@ # Client. Message examples + +* [Scope](#scope) +* [Delay](#delay) +* [Expiration (TTL)](#expiration-ttl) +* [Priority](#priority) +* [Timestamp, Content type, Message id](#timestamp-content-type-message-id) + +## Scope + +There are two two types possible scopes: `Message:SCOPE_MESSAGE_BUS` and `Message::SCOPE_APP`. +The first one instructs the client send messages (if driver supports) to the message bus so other apps can consume those messages. +The second in turns limits the message to the application that sent it. No other apps could receive it. + +```php +setScope(Message::SCOPE_MESSAGE_BUS); + +/** @var \Enqueue\Client\ProducerInterface $producer */ +$producer->sendEvent('aTopic', $message); +``` ## Delay @@ -15,7 +39,7 @@ $message = new Message(); $message->setDelay(60); // seconds /** @var \Enqueue\Client\ProducerInterface $producer */ -$producer->send('aTopic', $message); +$producer->sendEvent('aTopic', $message); ``` ## Expiration (TTL) @@ -33,7 +57,7 @@ $message = new Message(); $message->setExpire(60); // seconds /** @var \Enqueue\Client\ProducerInterface $producer */ -$producer->send('aTopic', $message); +$producer->sendEvent('aTopic', $message); ``` ## Priority @@ -52,7 +76,7 @@ $message = new Message(); $message->setPriority(MessagePriority::HIGH); /** @var \Enqueue\Client\ProducerInterface $producer */ -$producer->send('aTopic', $message); +$producer->sendEvent('aTopic', $message); ``` ## Timestamp, Content type, Message id @@ -72,7 +96,7 @@ $message->setTimestamp(time()); $message->setContentType('text/plain'); /** @var \Enqueue\Client\ProducerInterface $producer */ -$producer->send('aTopic', $message); +$producer->sendEvent('aTopic', $message); ``` [back to index](../index.md) diff --git a/docs/job_queue/run_sub_job.md b/docs/job_queue/run_sub_job.md index 3e946ce8b..0c819e889 100644 --- a/docs/job_queue/run_sub_job.md +++ b/docs/job_queue/run_sub_job.md @@ -26,7 +26,7 @@ class RootJobProcessor implements PsrProcessor { $result = $this->jobRunner->runUnique($message->getMessageId(), 'aJobName', function (JobRunner $runner) { $runner->createDelayed('aSubJobName1', function (JobRunner $runner, Job $childJob) { - $this->producer->send('aJobTopic', [ + $this->producer->sendEvent('aJobTopic', [ 'jobId' => $childJob->getId(), // other data required by sub job ]); diff --git a/docs/quick_tour.md b/docs/quick_tour.md index 7159af26a..a49bdc5bc 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -163,7 +163,7 @@ It provides easy to use services for producing and processing messages. It supports unified format for setting message expiration, delay, timestamp, correlation id. It supports message bus so different applications can talk to each other. -Here's an example of how you can send and consume messages. +Here's an example of how you can send and consume event messages. ```php setupBroker(); $client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) { - // your processing logic here + // your event processor logic here }); -$client->send('a_bar_topic', 'aMessageData'); - -// in another process you can consume messages. +// this is a blocking call, it'll consume message until it is interrupted $client->consume(); ``` +and command messages: + +```php + + +$client->setupBroker(); + +$client->bind(Config::COMMAND_TOPIC, 'bar_command', function(PsrMessage $message) { + // your bar command processor logic here +}); + +$client->bind(Config::COMMAND_TOPIC, 'baz_reply_command', function(PsrMessage $message, PsrContext $context) { + // your baz reply command processor logic here + + return Result::reply($context->createMessage('theReplyBody')); +}); + +// It is sent to one consumer. +$client->sendCommand('bar_command', 'aMessageData'); + +// It is possible to get reply +$promise = $client->sendCommand('bar_command', 'aMessageData', true); + +// you can send several commands and only after start getting replies. + +$replyMessage = $promise->receive(2000); // 2 sec + +// this is a blocking call, it'll consume message until it is interrupted +$client->consume([new ReplyExtension()]); +``` + ## Cli commands The library provides handy commands out of the box. diff --git a/pkg/enqueue-bundle/Events/AsyncListener.php b/pkg/enqueue-bundle/Events/AsyncListener.php index 78f964dce..ae4f5b242 100644 --- a/pkg/enqueue-bundle/Events/AsyncListener.php +++ b/pkg/enqueue-bundle/Events/AsyncListener.php @@ -60,7 +60,7 @@ public function onEvent(Event $event = null, $eventName) $message->setProperty('event_name', $eventName); $message->setProperty('transformer_name', $transformerName); - $this->producer->send('event.'.$eventName, $message); + $this->producer->sendEvent('event.'.$eventName, $message); } } } diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index 637b9d98a..cc2b30f73 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -7,6 +7,7 @@ services: class: 'Enqueue\Client\Producer' arguments: - '@enqueue.client.driver' + - '@enqueue.client.rpc_factory' - '@enqueue.client.extensions' enqueue.client.spool_producer: @@ -24,18 +25,15 @@ services: alias: 'enqueue.client.producer' enqueue.client.producer_v2: - class: 'Enqueue\Client\ProducerV2' - arguments: - - '@enqueue.client.producer' - - '@enqueue.client.rpc_client' + alias: 'enqueue.client.producer' enqueue.spool_producer: alias: 'enqueue.client.spool_producer' - enqueue.client.rpc_client: - class: 'Enqueue\Client\RpcClient' + enqueue.client.rpc_factory: + class: 'Enqueue\Rpc\RpcFactory' + public: false arguments: - - '@enqueue.client.producer' - '@enqueue.transport.context' enqueue.client.router_processor: diff --git a/pkg/enqueue-bundle/Resources/config/services.yml b/pkg/enqueue-bundle/Resources/config/services.yml index 6a6d7799a..66dc0d1a8 100644 --- a/pkg/enqueue-bundle/Resources/config/services.yml +++ b/pkg/enqueue-bundle/Resources/config/services.yml @@ -18,7 +18,14 @@ services: tags: - { name: 'console.command' } + enqueue.transport.rpc_factory: + class: 'Enqueue\Rpc\RpcFactory' + public: false + arguments: + - '@enqueue.transport.context' + enqueue.transport.rpc_client: class: 'Enqueue\Rpc\RpcClient' arguments: - '@enqueue.transport.context' + - '@enqueue.transport.rpc_factory' diff --git a/pkg/enqueue-bundle/Tests/Functional/Client/ProducerTest.php b/pkg/enqueue-bundle/Tests/Functional/Client/ProducerTest.php index 94e56ff14..c77f6a677 100644 --- a/pkg/enqueue-bundle/Tests/Functional/Client/ProducerTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/Client/ProducerTest.php @@ -3,13 +3,31 @@ namespace Enqueue\Bundle\Tests\Functional\Client; use Enqueue\Bundle\Tests\Functional\WebTestCase; +use Enqueue\Client\Config; +use Enqueue\Client\Message; use Enqueue\Client\ProducerInterface; +use Enqueue\Client\TraceableProducer; +use Enqueue\Rpc\Promise; /** * @group functional */ class ProducerTest extends WebTestCase { + public function setUp() + { + parent::setUp(); + + $this->container->get('enqueue.client.producer')->clearTraces(); + } + + public function tearDown() + { + parent::tearDown(); + + $this->container->get('enqueue.client.producer')->clearTraces(); + } + public function testCouldBeGetFromContainerAsService() { $messageProducer = $this->container->get('enqueue.client.producer'); @@ -24,4 +42,99 @@ public function testCouldBeGetFromContainerAsShortenAlias() $this->assertSame($messageProducer, $aliasMessageProducer); } + + public function testShouldSendEvent() + { + /** @var ProducerInterface $producer */ + $producer = $this->container->get('enqueue.client.producer'); + + $producer->sendEvent('theTopic', 'theMessage'); + + $traces = $this->getTraceableProducer()->getTopicTraces('theTopic'); + + $this->assertCount(1, $traces); + $this->assertEquals('theMessage', $traces[0]['body']); + } + + public function testShouldSendCommandWithoutNeedForReply() + { + /** @var ProducerInterface $producer */ + $producer = $this->container->get('enqueue.client.producer'); + + $result = $producer->sendCommand('theCommand', 'theMessage', false); + + $this->assertNull($result); + + $traces = $this->getTraceableProducer()->getTopicTraces(Config::COMMAND_TOPIC); + + $this->assertCount(1, $traces); + $this->assertEquals('theMessage', $traces[0]['body']); + } + + public function testShouldSendMessageInstanceAsCommandWithoutNeedForReply() + { + /** @var ProducerInterface $producer */ + $producer = $this->container->get('enqueue.client.producer'); + + $message = new Message('theMessage'); + + $result = $producer->sendCommand('theCommand', $message, false); + + $this->assertNull($result); + + $traces = $this->getTraceableProducer()->getTopicTraces(Config::COMMAND_TOPIC); + + $this->assertCount(1, $traces); + $this->assertEquals('theMessage', $traces[0]['body']); + $this->assertEquals([ + 'enqueue.topic_name' => Config::COMMAND_TOPIC, + 'enqueue.processor_name' => 'theCommand', + 'enqueue.processor_queue_name' => 'default', + ], $traces[0]['properties']); + } + + public function testShouldSendCommandWithNeedForReply() + { + /** @var ProducerInterface $producer */ + $producer = $this->container->get('enqueue.client.producer'); + + $result = $producer->sendCommand('theCommand', 'theMessage', true); + + $this->assertInstanceOf(Promise::class, $result); + + $traces = $this->getTraceableProducer()->getCommandTraces('theCommand'); + + $this->assertCount(1, $traces); + $this->assertEquals('theMessage', $traces[0]['body']); + } + + public function testShouldSendMessageInstanceCommandWithNeedForReply() + { + /** @var ProducerInterface $producer */ + $producer = $this->container->get('enqueue.client.producer'); + + $message = new Message('theMessage'); + + $result = $producer->sendCommand('theCommand', $message, true); + + $this->assertInstanceOf(Promise::class, $result); + + $traces = $this->getTraceableProducer()->getCommandTraces('theCommand'); + + $this->assertCount(1, $traces); + $this->assertEquals('theMessage', $traces[0]['body']); + $this->assertEquals([ + 'enqueue.topic_name' => Config::COMMAND_TOPIC, + 'enqueue.processor_name' => 'theCommand', + 'enqueue.processor_queue_name' => 'default', + ], $traces[0]['properties']); + } + + /** + * @return TraceableProducer|object + */ + private function getTraceableProducer() + { + return $this->container->get('enqueue.client.producer'); + } } diff --git a/pkg/enqueue-bundle/Tests/Functional/Client/ProducerV2Test.php b/pkg/enqueue-bundle/Tests/Functional/Client/ProducerV2Test.php deleted file mode 100644 index 58bf1aeeb..000000000 --- a/pkg/enqueue-bundle/Tests/Functional/Client/ProducerV2Test.php +++ /dev/null @@ -1,97 +0,0 @@ -container->get('enqueue.client.producer')->clearTraces(); - } - - public function tearDown() - { - parent::tearDown(); - - $this->container->get('enqueue.client.producer')->clearTraces(); - } - - public function testCouldBeGetFromContainerAsService() - { - $producer = $this->container->get('enqueue.client.producer_v2'); - - $this->assertInstanceOf(ProducerV2Interface::class, $producer); - } - - public function testShouldSendEvent() - { - /** @var ProducerV2Interface $producer */ - $producer = $this->container->get('enqueue.client.producer_v2'); - - $producer->sendEvent('theTopic', 'theMessage'); - - $traces = $this->getTraceableProducer()->getTopicTraces('theTopic'); - - $this->assertCount(1, $traces); - $this->assertEquals('theMessage', $traces[0]['body']); - } - - public function testShouldSendCommandWithoutNeedForReply() - { - /** @var ProducerV2Interface $producer */ - $producer = $this->container->get('enqueue.client.producer_v2'); - - $result = $producer->sendCommand('theCommand', 'theMessage', false); - - $this->assertNull($result); - - $traces = $this->getTraceableProducer()->getTopicTraces(Config::COMMAND_TOPIC); - - $this->assertCount(1, $traces); - $this->assertEquals('theMessage', $traces[0]['body']); - $this->assertEquals([ - 'enqueue.topic_name' => Config::COMMAND_TOPIC, - 'enqueue.processor_name' => 'theCommand', - 'enqueue.processor_queue_name' => 'default', - ], $traces[0]['properties']); - } - - public function testShouldSendCommandWithNeedForReply() - { - /** @var ProducerV2Interface $producer */ - $producer = $this->container->get('enqueue.client.producer_v2'); - - $result = $producer->sendCommand('theCommand', 'theMessage', true); - - $this->assertInstanceOf(Promise::class, $result); - - $traces = $this->getTraceableProducer()->getTopicTraces(Config::COMMAND_TOPIC); - - $this->assertCount(1, $traces); - $this->assertEquals('theMessage', $traces[0]['body']); - $this->assertEquals([ - 'enqueue.topic_name' => Config::COMMAND_TOPIC, - 'enqueue.processor_name' => 'theCommand', - 'enqueue.processor_queue_name' => 'default', - ], $traces[0]['properties']); - } - - /** - * @return TraceableProducer|object - */ - private function getTraceableProducer() - { - return $this->container->get('enqueue.client.producer'); - } -} diff --git a/pkg/enqueue-bundle/Tests/Functional/RpcClientTest.php b/pkg/enqueue-bundle/Tests/Functional/RpcClientTest.php index 1a9b9b95c..f9c9fcb41 100644 --- a/pkg/enqueue-bundle/Tests/Functional/RpcClientTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/RpcClientTest.php @@ -2,6 +2,8 @@ namespace Enqueue\Bundle\Tests\Functional; +use Enqueue\Rpc\RpcClient; + /** * @group functional */ @@ -11,13 +13,6 @@ public function testTransportRpcClientCouldBeGetFromContainerAsService() { $connection = $this->container->get('enqueue.transport.rpc_client'); - $this->assertInstanceOf(\Enqueue\Rpc\RpcClient::class, $connection); - } - - public function testClientRpcClientCouldBeGetFromContainerAsService() - { - $connection = $this->container->get('enqueue.client.rpc_client'); - - $this->assertInstanceOf(\Enqueue\Client\RpcClient::class, $connection); + $this->assertInstanceOf(RpcClient::class, $connection); } } diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index 38fca16f7..c4a298af9 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -167,7 +167,7 @@ public function testProducerSendsMessage(array $enqueueConfig) { $this->customSetUp($enqueueConfig); - $this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body'); + $this->getMessageProducer()->sendEvent(TestProcessor::TOPIC, 'test message body'); $queue = $this->getPsrContext()->createQueue('enqueue.test'); @@ -189,7 +189,7 @@ public function testClientConsumeMessagesFromExplicitlySetQueue(array $enqueueCo $command = $this->container->get('enqueue.client.consume_messages_command'); $processor = $this->container->get('test.message.processor'); - $this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body'); + $this->getMessageProducer()->sendEvent(TestProcessor::TOPIC, 'test message body'); $tester = new CommandTester($command); $tester->execute([ @@ -213,7 +213,7 @@ public function testTransportConsumeMessagesCommandShouldConsumeMessage(array $e $command->setContainer($this->container); $processor = $this->container->get('test.message.processor'); - $this->getMessageProducer()->send(TestProcessor::TOPIC, 'test message body'); + $this->getMessageProducer()->sendEvent(TestProcessor::TOPIC, 'test message body'); $tester = new CommandTester($command); $tester->execute([ diff --git a/pkg/enqueue-bundle/Tests/Unit/Events/AsyncListenerTest.php b/pkg/enqueue-bundle/Tests/Unit/Events/AsyncListenerTest.php index 19879031e..0b0176156 100644 --- a/pkg/enqueue-bundle/Tests/Unit/Events/AsyncListenerTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/Events/AsyncListenerTest.php @@ -25,7 +25,7 @@ public function testShouldDoNothingIfSyncModeOn() $producer = $this->createProducerMock(); $producer ->expects($this->never()) - ->method('send') + ->method('sendEvent') ; $registry = $this->createRegistryMock(); @@ -74,7 +74,7 @@ public function testShouldSendMessageIfSyncModeOff() $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ->with('event.fooEvent', $this->identicalTo($message)) ; diff --git a/pkg/enqueue/Client/Producer.php b/pkg/enqueue/Client/Producer.php index ade90049f..a7460aee5 100644 --- a/pkg/enqueue/Client/Producer.php +++ b/pkg/enqueue/Client/Producer.php @@ -2,10 +2,11 @@ namespace Enqueue\Client; +use Enqueue\Rpc\RpcFactory; use Enqueue\Util\JSON; use Enqueue\Util\UUID; -class Producer implements ProducerInterface +class Producer implements ProducerInterface, ProducerV2Interface { /** * @var DriverInterface @@ -18,18 +19,31 @@ class Producer implements ProducerInterface private $extension; /** - * @param DriverInterface $driver + * @var RpcFactory */ - public function __construct(DriverInterface $driver, ExtensionInterface $extension = null) - { + private $rpcFactory; + + /** + * @param DriverInterface $driver + * @param ExtensionInterface|null $extension + * @param RpcFactory $rpcFactory + * + * @internal param RpcClient $rpcClient + */ + public function __construct( + DriverInterface $driver, + RpcFactory $rpcFactory, + ExtensionInterface $extension = null + ) { $this->driver = $driver; + $this->rpcFactory = $rpcFactory; $this->extension = $extension ?: new ChainExtension([]); } /** * {@inheritdoc} */ - public function send($topic, $message) + public function sendEvent($topic, $message) { if (false == $message instanceof Message) { $body = $message; @@ -80,6 +94,51 @@ public function send($topic, $message) } } + /** + * {@inheritdoc} + */ + public function sendCommand($command, $message, $needReply = false) + { + if (false == $message instanceof Message) { + $message = new Message($message); + } + + $deleteReplyQueue = false; + $replyTo = $message->getReplyTo(); + + if ($needReply) { + if (false == $replyTo) { + $message->setReplyTo($replyTo = $this->rpcFactory->createReplyTo()); + $deleteReplyQueue = true; + } + + if (false == $message->getCorrelationId()) { + $message->setCorrelationId(UUID::generate()); + } + } + + $message->setProperty(Config::PARAMETER_TOPIC_NAME, Config::COMMAND_TOPIC); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command); + $message->setScope(Message::SCOPE_APP); + + $this->sendEvent(Config::COMMAND_TOPIC, $message); + + if ($needReply) { + $promise = $this->rpcFactory->createPromise($replyTo, $message->getCorrelationId(), 60000); + $promise->setDeleteReplyQueue($deleteReplyQueue); + + return $promise; + } + } + + /** + * {@inheritdoc} + */ + public function send($topic, $message) + { + $this->sendEvent($topic, $message); + } + /** * @param Message $message */ diff --git a/pkg/enqueue/Client/ProducerInterface.php b/pkg/enqueue/Client/ProducerInterface.php index 67bfaf033..bc8426704 100644 --- a/pkg/enqueue/Client/ProducerInterface.php +++ b/pkg/enqueue/Client/ProducerInterface.php @@ -2,9 +2,28 @@ namespace Enqueue\Client; +use Enqueue\Rpc\Promise; + interface ProducerInterface { /** + * @param string $topic + * @param string|array|Message $message + */ + public function sendEvent($topic, $message); + + /** + * @param string $command + * @param string|array|Message $message + * @param bool $needReply + * + * @return Promise|null the promise is returned if needReply argument is true + */ + public function sendCommand($command, $message, $needReply = false); + + /** + * @deprecated use sendEvent method. + * * Sends a message to a topic. There are some message processor may be subscribed to a topic. * * @param string $topic diff --git a/pkg/enqueue/Client/ProducerV2.php b/pkg/enqueue/Client/ProducerV2.php deleted file mode 100644 index 8a69dfd27..000000000 --- a/pkg/enqueue/Client/ProducerV2.php +++ /dev/null @@ -1,57 +0,0 @@ -realProducer = $realProducer; - $this->rpcClient = $rpcClient; - } - - /** - * {@inheritdoc} - */ - public function sendEvent($topic, $message) - { - $this->realProducer->send($topic, $message); - } - - /** - * {@inheritdoc} - */ - public function sendCommand($command, $message, $needReply = false) - { - if (false == $message instanceof Message) { - $message = new Message($message); - } - - $message->setProperty(Config::PARAMETER_TOPIC_NAME, Config::COMMAND_TOPIC); - $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $command); - $message->setScope(Message::SCOPE_APP); - - if ($needReply) { - return $this->rpcClient->callAsync(Config::COMMAND_TOPIC, $message, 60000); - } - - $this->realProducer->send(Config::COMMAND_TOPIC, $message); - } -} diff --git a/pkg/enqueue/Client/ProducerV2Interface.php b/pkg/enqueue/Client/ProducerV2Interface.php index 4013bdbe3..c661f0c47 100644 --- a/pkg/enqueue/Client/ProducerV2Interface.php +++ b/pkg/enqueue/Client/ProducerV2Interface.php @@ -2,25 +2,11 @@ namespace Enqueue\Client; -use Enqueue\Rpc\Promise; - /** * @experimental + * + * @deprecated */ -interface ProducerV2Interface +interface ProducerV2Interface extends ProducerInterface { - /** - * @param string $topic - * @param string|array|Message $message - */ - public function sendEvent($topic, $message); - - /** - * @param string $command - * @param string|array|Message $message - * @param bool $needReply - * - * @return Promise|null the promise is returned if needReply argument is true - */ - public function sendCommand($command, $message, $needReply = false); } diff --git a/pkg/enqueue/Client/RpcClient.php b/pkg/enqueue/Client/RpcClient.php deleted file mode 100644 index 6ca123a6a..000000000 --- a/pkg/enqueue/Client/RpcClient.php +++ /dev/null @@ -1,130 +0,0 @@ -context = $context; - $this->producer = $producer; - } - - /** - * @param string $topic - * @param string|array|Message $message - * @param int $timeout - * - * @return PsrMessage - */ - public function call($topic, $message, $timeout) - { - return $this->callAsync($topic, $message, $timeout)->receive(); - } - - /** - * @param string $topic - * @param string|array|Message $message $message - * @param int $timeout - * - * @return Promise - */ - public function callAsync($topic, $message, $timeout) - { - if ($timeout < 1) { - throw new \InvalidArgumentException(sprintf('Timeout must be positive not zero integer. Got %s', $timeout)); - } - - if (false == $message instanceof Message) { - $body = $message; - $message = new Message(); - $message->setBody($body); - } - - if ($message->getReplyTo()) { - $replyQueue = $this->context->createQueue($message->getReplyTo()); - $deleteReplyQueue = false; - } else { - $replyQueue = $this->context->createTemporaryQueue(); - $message->setReplyTo($replyQueue->getQueueName()); - $deleteReplyQueue = true; - } - - if (false == $message->getCorrelationId()) { - $message->setCorrelationId(UUID::generate()); - } - - $this->producer->send($topic, $message); - - $correlationId = $message->getCorrelationId(); - - $receive = function (Promise $promise, $promiseTimeout) use ($replyQueue, $timeout, $correlationId) { - $runTimeout = $promiseTimeout ?: $timeout; - $endTime = time() + ((int) ($runTimeout / 1000)); - $consumer = $this->context->createConsumer($replyQueue); - - do { - if ($message = $consumer->receive($runTimeout)) { - if ($message->getCorrelationId() === $correlationId) { - $consumer->acknowledge($message); - - return $message; - } - - $consumer->reject($message, true); - } - } while (time() < $endTime); - - throw TimeoutException::create($runTimeout, $correlationId); - }; - - $receiveNoWait = function () use ($replyQueue, $correlationId) { - static $consumer; - - if (null === $consumer) { - $consumer = $this->context->createConsumer($replyQueue); - } - - if ($message = $consumer->receiveNoWait()) { - if ($message->getCorrelationId() === $correlationId) { - $consumer->acknowledge($message); - - return $message; - } - - $consumer->reject($message, true); - } - }; - - $finally = function (Promise $promise) use ($replyQueue) { - if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) { - $this->context->deleteQueue($replyQueue); - } - }; - - $promise = new Promise($receive, $receiveNoWait, $finally); - $promise->setDeleteReplyQueue($deleteReplyQueue); - - return $promise; - } -} diff --git a/pkg/enqueue/Client/SpoolProducer.php b/pkg/enqueue/Client/SpoolProducer.php index 43e9a4e4d..c5388770f 100644 --- a/pkg/enqueue/Client/SpoolProducer.php +++ b/pkg/enqueue/Client/SpoolProducer.php @@ -12,7 +12,12 @@ class SpoolProducer implements ProducerInterface /** * @var array */ - private $queue; + private $events; + + /** + * @var array + */ + private $commands; /** * @param ProducerInterface $realProducer @@ -21,7 +26,28 @@ public function __construct(ProducerInterface $realProducer) { $this->realProducer = $realProducer; - $this->queue = new \SplQueue(); + $this->events = new \SplQueue(); + $this->commands = new \SplQueue(); + } + + /** + * {@inheritdoc} + */ + public function sendCommand($command, $message, $needReply = false) + { + if ($needReply) { + return $this->realProducer->sendCommand($command, $message, $needReply); + } + + $this->commands->enqueue([$command, $message]); + } + + /** + * {@inheritdoc} + */ + public function sendEvent($topic, $message) + { + $this->events->enqueue([$topic, $message]); } /** @@ -29,7 +55,7 @@ public function __construct(ProducerInterface $realProducer) */ public function send($topic, $message) { - $this->queue->enqueue([$topic, $message]); + $this->sendEvent($topic, $message); } /** @@ -37,10 +63,16 @@ public function send($topic, $message) */ public function flush() { - while (false == $this->queue->isEmpty()) { - list($topic, $message) = $this->queue->dequeue(); + while (false == $this->events->isEmpty()) { + list($topic, $message) = $this->events->dequeue(); + + $this->realProducer->sendEvent($topic, $message); + } + + while (false == $this->commands->isEmpty()) { + list($command, $message) = $this->commands->dequeue(); - $this->realProducer->send($topic, $message); + $this->realProducer->sendCommand($command, $message); } } } diff --git a/pkg/enqueue/Client/TraceableProducer.php b/pkg/enqueue/Client/TraceableProducer.php index 20108248c..cd55a91d6 100644 --- a/pkg/enqueue/Client/TraceableProducer.php +++ b/pkg/enqueue/Client/TraceableProducer.php @@ -24,35 +24,31 @@ public function __construct(ProducerInterface $producer) /** * {@inheritdoc} */ - public function send($topic, $message) + public function sendEvent($topic, $message) { - $this->producer->send($topic, $message); + $this->producer->sendEvent($topic, $message); - $trace = [ - 'topic' => $topic, - 'body' => $message, - 'headers' => [], - 'properties' => [], - 'priority' => null, - 'expire' => null, - 'delay' => null, - 'timestamp' => null, - 'contentType' => null, - 'messageId' => null, - ]; - if ($message instanceof Message) { - $trace['body'] = $message->getBody(); - $trace['headers'] = $message->getHeaders(); - $trace['properties'] = $message->getProperties(); - $trace['priority'] = $message->getPriority(); - $trace['expire'] = $message->getExpire(); - $trace['delay'] = $message->getDelay(); - $trace['timestamp'] = $message->getTimestamp(); - $trace['contentType'] = $message->getContentType(); - $trace['messageId'] = $message->getMessageId(); - } + $this->collectTrace($topic, null, $message); + } - $this->traces[] = $trace; + /** + * {@inheritdoc} + */ + public function sendCommand($command, $message, $needReply = false) + { + $result = $this->producer->sendCommand($command, $message, $needReply); + + $this->collectTrace(Config::COMMAND_TOPIC, $command, $message); + + return $result; + } + + /** + * {@inheritdoc} + */ + public function send($topic, $message) + { + $this->sendEvent($topic, $message); } /** @@ -72,6 +68,23 @@ public function getTopicTraces($topic) return $topicTraces; } + /** + * @param string $command + * + * @return array + */ + public function getCommandTraces($command) + { + $commandTraces = []; + foreach ($this->traces as $trace) { + if ($command == $trace['command']) { + $commandTraces[] = $trace; + } + } + + return $commandTraces; + } + /** * @return array */ @@ -84,4 +97,39 @@ public function clearTraces() { $this->traces = []; } + + /** + * @param string|null $topic + * @param string|null $command + * @param mixed $message + */ + private function collectTrace($topic, $command, $message) + { + $trace = [ + 'topic' => $topic, + 'command' => $command, + 'body' => $message, + 'headers' => [], + 'properties' => [], + 'priority' => null, + 'expire' => null, + 'delay' => null, + 'timestamp' => null, + 'contentType' => null, + 'messageId' => null, + ]; + if ($message instanceof Message) { + $trace['body'] = $message->getBody(); + $trace['headers'] = $message->getHeaders(); + $trace['properties'] = $message->getProperties(); + $trace['priority'] = $message->getPriority(); + $trace['expire'] = $message->getExpire(); + $trace['delay'] = $message->getDelay(); + $trace['timestamp'] = $message->getTimestamp(); + $trace['contentType'] = $message->getContentType(); + $trace['messageId'] = $message->getMessageId(); + } + + $this->traces[] = $trace; + } } diff --git a/pkg/enqueue/Rpc/RpcClient.php b/pkg/enqueue/Rpc/RpcClient.php index 578ca289c..66d9e5cb2 100644 --- a/pkg/enqueue/Rpc/RpcClient.php +++ b/pkg/enqueue/Rpc/RpcClient.php @@ -14,12 +14,19 @@ class RpcClient */ private $context; + /** + * @var RpcFactory + */ + private $rpcFactory; + /** * @param PsrContext $context + * @param RpcFactory $promiseFactory */ - public function __construct(PsrContext $context) + public function __construct(PsrContext $context, RpcFactory $promiseFactory = null) { $this->context = $context; + $this->rpcFactory = $promiseFactory ?: new RpcFactory($context); } /** @@ -49,12 +56,11 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim throw new \InvalidArgumentException(sprintf('Timeout must be positive not zero integer. Got %s', $timeout)); } - if ($message->getReplyTo()) { - $replyQueue = $this->context->createQueue($message->getReplyTo()); - $deleteReplyQueue = false; - } else { - $replyQueue = $this->context->createTemporaryQueue(); - $message->setReplyTo($replyQueue->getQueueName()); + $deleteReplyQueue = false; + $replyTo = $message->getReplyTo(); + + if (false == $replyTo) { + $message->setReplyTo($replyTo = $this->rpcFactory->createReplyTo()); $deleteReplyQueue = true; } @@ -64,53 +70,7 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim $this->context->createProducer()->send($destination, $message); - $correlationId = $message->getCorrelationId(); - - $receive = function (Promise $promise, $promiseTimeout) use ($replyQueue, $timeout, $correlationId) { - $runTimeout = $promiseTimeout ?: $timeout; - $endTime = time() + ((int) ($runTimeout / 1000)); - $consumer = $this->context->createConsumer($replyQueue); - - do { - if ($message = $consumer->receive($runTimeout)) { - if ($message->getCorrelationId() === $correlationId) { - $consumer->acknowledge($message); - - return $message; - } - - $consumer->reject($message, true); - } - } while (time() < $endTime); - - throw TimeoutException::create($runTimeout, $correlationId); - }; - - $receiveNoWait = function () use ($replyQueue, $correlationId) { - static $consumer; - - if (null === $consumer) { - $consumer = $this->context->createConsumer($replyQueue); - } - - if ($message = $consumer->receiveNoWait()) { - if ($message->getCorrelationId() === $correlationId) { - $consumer->acknowledge($message); - - return $message; - } - - $consumer->reject($message, true); - } - }; - - $finally = function (Promise $promise) use ($replyQueue) { - if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) { - $this->context->deleteQueue($replyQueue); - } - }; - - $promise = new Promise($receive, $receiveNoWait, $finally); + $promise = $this->rpcFactory->createPromise($replyTo, $message->getCorrelationId(), $timeout); $promise->setDeleteReplyQueue($deleteReplyQueue); return $promise; diff --git a/pkg/enqueue/Rpc/RpcFactory.php b/pkg/enqueue/Rpc/RpcFactory.php new file mode 100644 index 000000000..9c1e0efb5 --- /dev/null +++ b/pkg/enqueue/Rpc/RpcFactory.php @@ -0,0 +1,87 @@ +context = $context; + } + + /** + * @param string $replyTo + * @param string $correlationId + * @param int $timeout + * + * @return Promise + */ + public function createPromise($replyTo, $correlationId, $timeout) + { + $replyQueue = $this->context->createQueue($replyTo); + + $receive = function (Promise $promise, $promiseTimeout) use ($replyQueue, $timeout, $correlationId) { + $runTimeout = $promiseTimeout ?: $timeout; + $endTime = time() + ((int) ($runTimeout / 1000)); + $consumer = $this->context->createConsumer($replyQueue); + + do { + if ($message = $consumer->receive($runTimeout)) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + } while (time() < $endTime); + + throw TimeoutException::create($runTimeout, $correlationId); + }; + + $receiveNoWait = function () use ($replyQueue, $correlationId) { + static $consumer; + + if (null === $consumer) { + $consumer = $this->context->createConsumer($replyQueue); + } + + if ($message = $consumer->receiveNoWait()) { + if ($message->getCorrelationId() === $correlationId) { + $consumer->acknowledge($message); + + return $message; + } + + $consumer->reject($message, true); + } + }; + + $finally = function (Promise $promise) use ($replyQueue) { + if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) { + $this->context->deleteQueue($replyQueue); + } + }; + + return new Promise($receive, $receiveNoWait, $finally); + } + + /** + * @return string + */ + public function createReplyTo() + { + return $this->context->createTemporaryQueue()->getQueueName(); + } +} diff --git a/pkg/enqueue/Symfony/Client/ProduceMessageCommand.php b/pkg/enqueue/Symfony/Client/ProduceMessageCommand.php index 1d84a9cb1..a9f9ef90a 100644 --- a/pkg/enqueue/Symfony/Client/ProduceMessageCommand.php +++ b/pkg/enqueue/Symfony/Client/ProduceMessageCommand.php @@ -44,7 +44,7 @@ protected function configure() */ protected function execute(InputInterface $input, OutputInterface $output) { - $this->producer->send( + $this->producer->sendEvent( $input->getArgument('topic'), $input->getArgument('message') ); diff --git a/pkg/enqueue/Tests/Client/ProducerTest.php b/pkg/enqueue/Tests/Client/ProducerTest.php index f0edb98bf..c9a30fcde 100644 --- a/pkg/enqueue/Tests/Client/ProducerTest.php +++ b/pkg/enqueue/Tests/Client/ProducerTest.php @@ -10,6 +10,7 @@ use Enqueue\Client\Producer; use Enqueue\Client\ProducerInterface; use Enqueue\Null\NullQueue; +use Enqueue\Rpc\RpcFactory; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; @@ -24,7 +25,7 @@ public function testShouldImplementProducerInterface() public function testCouldBeConstructedWithDriverAsFirstArgument() { - new Producer($this->createDriverStub()); + new Producer($this->createDriverStub(), $this->createRpcFactory()); } public function testShouldSendMessageToRouter() @@ -38,8 +39,8 @@ public function testShouldSendMessageToRouter() ->with(self::identicalTo($message)) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); $expectedProperties = [ 'enqueue.topic_name' => 'topic', @@ -59,8 +60,8 @@ public function testShouldSendMessageWithNormalPriorityByDefault() ->with(self::identicalTo($message)) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); self::assertSame(MessagePriority::NORMAL, $message->getPriority()); } @@ -77,8 +78,8 @@ public function testShouldSendMessageWithCustomPriority() ->with(self::identicalTo($message)) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); self::assertSame(MessagePriority::HIGH, $message->getPriority()); } @@ -94,8 +95,8 @@ public function testShouldSendMessageWithGeneratedMessageId() ->with(self::identicalTo($message)) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); self::assertNotEmpty($message->getMessageId()); } @@ -112,8 +113,8 @@ public function testShouldSendMessageWithCustomMessageId() ->with(self::identicalTo($message)) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); self::assertSame('theCustomMessageId', $message->getMessageId()); } @@ -129,8 +130,8 @@ public function testShouldSendMessageWithGeneratedTimestamp() ->with(self::identicalTo($message)) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); self::assertNotEmpty($message->getTimestamp()); } @@ -147,8 +148,8 @@ public function testShouldSendMessageWithCustomTimestamp() ->with(self::identicalTo($message)) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); self::assertSame('theCustomTimestamp', $message->getTimestamp()); } @@ -165,8 +166,8 @@ public function testShouldSendStringAsPlainText() }) ; - $producer = new Producer($driver); - $producer->send('topic', 'theStringMessage'); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', 'theStringMessage'); } public function testShouldSendArrayAsJsonString() @@ -181,8 +182,8 @@ public function testShouldSendArrayAsJsonString() }) ; - $producer = new Producer($driver); - $producer->send('topic', ['foo' => 'fooVal']); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', ['foo' => 'fooVal']); } public function testShouldConvertMessageArrayBodyJsonString() @@ -200,8 +201,8 @@ public function testShouldConvertMessageArrayBodyJsonString() }) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); } public function testSendShouldForceScalarsToStringAndSetTextContentType() @@ -220,8 +221,8 @@ public function testSendShouldForceScalarsToStringAndSetTextContentType() }) ; - $producer = new Producer($driver); - $producer->send($queue, 12345); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent($queue, 12345); } public function testSendShouldForceMessageScalarsBodyToStringAndSetTextContentType() @@ -243,8 +244,8 @@ public function testSendShouldForceMessageScalarsBodyToStringAndSetTextContentTy }) ; - $producer = new Producer($driver); - $producer->send($queue, $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent($queue, $message); } public function testSendShouldForceNullToEmptyStringAndSetTextContentType() @@ -263,8 +264,8 @@ public function testSendShouldForceNullToEmptyStringAndSetTextContentType() }) ; - $producer = new Producer($driver); - $producer->send($queue, null); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent($queue, null); } public function testSendShouldForceNullBodyToEmptyStringAndSetTextContentType() @@ -286,8 +287,8 @@ public function testSendShouldForceNullBodyToEmptyStringAndSetTextContentType() }) ; - $producer = new Producer($driver); - $producer->send($queue, $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent($queue, $message); } public function testShouldThrowExceptionIfBodyIsObjectOnSend() @@ -302,12 +303,12 @@ public function testShouldThrowExceptionIfBodyIsObjectOnSend() ->method('sendToProcessor') ; - $producer = new Producer($driver); + $producer = new Producer($driver, $this->createRpcFactory()); $this->expectException(\InvalidArgumentException::class); $this->expectExceptionMessage('The message\'s body must be either null, scalar, array or object (implements \JsonSerializable). Got: stdClass'); - $producer->send('topic', new \stdClass()); + $producer->sendEvent('topic', new \stdClass()); } public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInsideOnSend() @@ -324,12 +325,12 @@ public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInsideOnSend() ->method('sendToProcessor') ; - $producer = new Producer($driver); + $producer = new Producer($driver, $this->createRpcFactory()); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The message\'s body must be an array of scalars. Found not scalar in the array: stdClass'); - $producer->send($queue, ['foo' => new \stdClass()]); + $producer->sendEvent($queue, ['foo' => new \stdClass()]); } public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInSubArraysInsideOnSend() @@ -346,12 +347,12 @@ public function testShouldThrowExceptionIfBodyIsArrayWithObjectsInSubArraysInsid ->method('sendToProcessor') ; - $producer = new Producer($driver); + $producer = new Producer($driver, $this->createRpcFactory()); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The message\'s body must be an array of scalars. Found not scalar in the array: stdClass'); - $producer->send($queue, ['foo' => ['bar' => new \stdClass()]]); + $producer->sendEvent($queue, ['foo' => ['bar' => new \stdClass()]]); } public function testShouldSendJsonSerializableObjectAsJsonStringToMessageBus() @@ -368,8 +369,8 @@ public function testShouldSendJsonSerializableObjectAsJsonStringToMessageBus() }) ; - $producer = new Producer($driver); - $producer->send('topic', $object); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $object); } public function testShouldSendMessageJsonSerializableBodyAsJsonStringToMessageBus() @@ -389,8 +390,8 @@ public function testShouldSendMessageJsonSerializableBodyAsJsonStringToMessageBu }) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); } public function testThrowIfTryToSendMessageToMessageBusWithProcessorNamePropertySet() @@ -411,11 +412,11 @@ public function testThrowIfTryToSendMessageToMessageBusWithProcessorNameProperty ->method('sendToProcessor') ; - $producer = new Producer($driver); + $producer = new Producer($driver, $this->createRpcFactory()); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The enqueue.processor_name property must not be set for messages that are sent to message bus.'); - $producer->send('topic', $message); + $producer->sendEvent('topic', $message); } public function testThrowIfTryToSendMessageToMessageBusWithProcessorQueueNamePropertySet() @@ -436,11 +437,11 @@ public function testThrowIfTryToSendMessageToMessageBusWithProcessorQueueNamePro ->method('sendToProcessor') ; - $producer = new Producer($driver); + $producer = new Producer($driver, $this->createRpcFactory()); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The enqueue.processor_queue_name property must not be set for messages that are sent to message bus.'); - $producer->send('topic', $message); + $producer->sendEvent('topic', $message); } public function testThrowIfNotApplicationJsonContentTypeSetWithJsonSerializableBody() @@ -464,8 +465,8 @@ public function testThrowIfNotApplicationJsonContentTypeSetWithJsonSerializableB $this->expectException(\LogicException::class); $this->expectExceptionMessage('Content type "application/json" only allowed when body is array'); - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); } public function testShouldSendMessageToApplicationRouter() @@ -489,8 +490,8 @@ public function testShouldSendMessageToApplicationRouter() }) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); } public function testShouldSendToCustomMessageToApplicationRouter() @@ -516,8 +517,8 @@ public function testShouldSendToCustomMessageToApplicationRouter() }) ; - $producer = new Producer($driver); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory()); + $producer->sendEvent('topic', $message); } public function testThrowIfUnSupportedScopeGivenOnSend() @@ -535,11 +536,11 @@ public function testThrowIfUnSupportedScopeGivenOnSend() ->method('sendToProcessor') ; - $producer = new Producer($driver); + $producer = new Producer($driver, $this->createRpcFactory()); $this->expectException(\LogicException::class); $this->expectExceptionMessage('The message scope "iDontKnowScope" is not supported.'); - $producer->send('topic', $message); + $producer->sendEvent('topic', $message); } public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToRouter() @@ -566,8 +567,8 @@ public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToRouter() ->method('sendToRouter') ; - $producer = new Producer($driver, $extension); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory(), $extension); + $producer->sendEvent('topic', $message); } public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToProcessor() @@ -594,14 +595,22 @@ public function testShouldCallPreSendPostSendExtensionMethodsWhenSendToProcessor ->method('sendToProcessor') ; - $producer = new Producer($driver, $extension); - $producer->send('topic', $message); + $producer = new Producer($driver, $this->createRpcFactory(), $extension); + $producer->sendEvent('topic', $message); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|RpcFactory + */ + private function createRpcFactory() + { + return $this->createMock(RpcFactory::class); } /** * @return \PHPUnit_Framework_MockObject_MockObject|DriverInterface */ - protected function createDriverStub() + private function createDriverStub() { $config = new Config( 'a_prefix', diff --git a/pkg/enqueue/Tests/Client/RpcClientTest.php b/pkg/enqueue/Tests/Client/RpcClientTest.php deleted file mode 100644 index 3b75bc56f..000000000 --- a/pkg/enqueue/Tests/Client/RpcClientTest.php +++ /dev/null @@ -1,363 +0,0 @@ -createProducerMock(), - $this->createPsrContextMock() - ); - } - - public function testShouldSetReplyToIfNotSet() - { - $context = new NullContext(); - - $producerMock = $this->createProducerMock(); - $producerMock - ->expects($this->once()) - ->method('send') - ->willReturnCallback(function ($topic, Message $message) { - $this->assertNotEmpty($message->getReplyTo()); - }) - ; - - $rpc = new RpcClient( - $producerMock, - $context - ); - - $rpc->callAsync('aTopic', new Message(), 2); - } - - public function testShouldNotSetReplyToIfSet() - { - $context = new NullContext(); - - $message = new Message(); - $message->setReplyTo('theReplyTo'); - - $producerMock = $this->createProducerMock(); - $producerMock - ->expects($this->once()) - ->method('send') - ->willReturnCallback(function ($topic, Message $message) { - $this->assertEquals('theReplyTo', $message->getReplyTo()); - }) - ; - - $rpc = new RpcClient( - $producerMock, - $context - ); - - $rpc->callAsync('aTopic', $message, 2); - } - - public function testShouldUseSameTopicOnProducerSendCall() - { - $context = new NullContext(); - - $producerMock = $this->createProducerMock(); - $producerMock - ->expects($this->once()) - ->method('send') - ->willReturnCallback(function ($topic) { - $this->assertEquals('theTopic', $topic); - }) - ; - - $rpc = new RpcClient( - $producerMock, - $context - ); - - $rpc->callAsync('theTopic', new Message(), 2); - } - - public function testShouldSetCorrelationIdIfNotSet() - { - $context = new NullContext(); - - $producerMock = $this->createProducerMock(); - $producerMock - ->expects($this->once()) - ->method('send') - ->willReturnCallback(function ($topic, Message $message) { - $this->assertNotEmpty($message->getCorrelationId()); - }) - ; - - $rpc = new RpcClient( - $producerMock, - $context - ); - - $rpc->callAsync('aTopic', new Message(), 2); - } - - public function testShouldNotSetCorrelationIdIfSet() - { - $context = new NullContext(); - - $message = new Message(); - $message->setCorrelationId('theCorrelationId'); - - $producerMock = $this->createProducerMock(); - $producerMock - ->expects($this->once()) - ->method('send') - ->willReturnCallback(function ($topic, Message $message) { - $this->assertEquals('theCorrelationId', $message->getCorrelationId()); - }) - ; - - $rpc = new RpcClient( - $producerMock, - $context - ); - - $rpc->callAsync('aTopic', $message, 2); - } - - public function testShouldDoSyncCall() - { - $timeout = 123; - $replyMessage = new NullMessage(); - - $promiseMock = $this->createMock(Promise::class); - $promiseMock - ->expects($this->once()) - ->method('receive') - ->willReturn($replyMessage) - ; - - /** @var \PHPUnit_Framework_MockObject_MockObject|RpcClient $rpc */ - $rpc = $this->getMockBuilder(RpcClient::class)->disableOriginalConstructor()->setMethods(['callAsync'])->getMock(); - $rpc - ->expects($this->once()) - ->method('callAsync') - ->with('theTopic', 'theMessage', $timeout) - ->willReturn($promiseMock) - ; - - $actualReplyMessage = $rpc->call('theTopic', 'theMessage', $timeout); - - $this->assertSame($replyMessage, $actualReplyMessage); - } - - public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() - { - $replyQueue = new NullQueue('theReplyTo'); - $message = new Message(); - $message->setCorrelationId('theCorrelationId'); - $message->setReplyTo('theReplyTo'); - - $receivedMessage = new NullMessage(); - $receivedMessage->setCorrelationId('theCorrelationId'); - - $consumer = $this->createPsrConsumerMock(); - $consumer - ->expects($this->once()) - ->method('receive') - ->with(12345) - ->willReturn($receivedMessage) - ; - $consumer - ->expects($this->once()) - ->method('acknowledge') - ->with($this->identicalTo($receivedMessage)) - ; - $consumer - ->expects($this->never()) - ->method('reject') - ; - - $context = $this->createPsrContextMock(); - $context - ->expects($this->once()) - ->method('createQueue') - ->with('theReplyTo') - ->willReturn($replyQueue) - ; - $context - ->expects($this->once()) - ->method('createConsumer') - ->with($this->identicalTo($replyQueue)) - ->willReturn($consumer) - ; - - $rpc = new RpcClient($this->createProducerMock(), $context); - - $rpc->callAsync('topic', $message, 2)->receive(12345); - } - - public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals() - { - $replyQueue = new NullQueue('theReplyTo'); - $message = new Message(); - $message->setCorrelationId('theCorrelationId'); - $message->setReplyTo('theReplyTo'); - - $receivedMessage = new NullMessage(); - $receivedMessage->setCorrelationId('theCorrelationId'); - - $consumer = $this->createPsrConsumerMock(); - $consumer - ->expects($this->once()) - ->method('receiveNoWait') - ->willReturn($receivedMessage) - ; - $consumer - ->expects($this->once()) - ->method('acknowledge') - ->with($this->identicalTo($receivedMessage)) - ; - $consumer - ->expects($this->never()) - ->method('reject') - ; - - $context = $this->createPsrContextMock(); - $context - ->expects($this->once()) - ->method('createQueue') - ->with('theReplyTo') - ->willReturn($replyQueue) - ; - $context - ->expects($this->once()) - ->method('createConsumer') - ->with($this->identicalTo($replyQueue)) - ->willReturn($consumer) - ; - - $rpc = new RpcClient($this->createProducerMock(), $context); - - $rpc->callAsync('topic', $message, 2)->receiveNoWait(); - } - - public function testShouldDeleteQueueAfterReceiveIfDeleteReplyQueueIsTrue() - { - $replyQueue = new NullQueue('theReplyTo'); - $message = new Message(); - $message->setCorrelationId('theCorrelationId'); - $message->setReplyTo('theReplyTo'); - - $receivedMessage = new NullMessage(); - $receivedMessage->setCorrelationId('theCorrelationId'); - - $consumer = $this->createPsrConsumerMock(); - $consumer - ->expects($this->once()) - ->method('receive') - ->willReturn($receivedMessage) - ; - - $context = $this->getMockBuilder(PsrContext::class) - ->disableOriginalConstructor() - ->setMethods(['deleteQueue']) - ->getMockForAbstractClass() - ; - $context - ->expects($this->once()) - ->method('createQueue') - ->with('theReplyTo') - ->willReturn($replyQueue) - ; - $context - ->expects($this->once()) - ->method('createConsumer') - ->with($this->identicalTo($replyQueue)) - ->willReturn($consumer) - ; - $context - ->expects($this->once()) - ->method('deleteQueue') - ->with($this->identicalTo($replyQueue)) - ; - - $rpc = new RpcClient($this->createProducerMock(), $context); - - $promise = $rpc->callAsync('topic', $message, 2); - $promise->setDeleteReplyQueue(true); - $promise->receive(); - } - - public function testShouldNotCallDeleteQueueIfDeleteReplyQueueIsTrueButContextHasNoDeleteQueueMethod() - { - $replyQueue = new NullQueue('theReplyTo'); - $message = new Message(); - $message->setCorrelationId('theCorrelationId'); - $message->setReplyTo('theReplyTo'); - - $receivedMessage = new NullMessage(); - $receivedMessage->setCorrelationId('theCorrelationId'); - - $consumer = $this->createPsrConsumerMock(); - $consumer - ->expects($this->once()) - ->method('receive') - ->willReturn($receivedMessage) - ; - - $context = $this->createPsrContextMock(); - $context - ->expects($this->once()) - ->method('createQueue') - ->with('theReplyTo') - ->willReturn($replyQueue) - ; - $context - ->expects($this->once()) - ->method('createConsumer') - ->with($this->identicalTo($replyQueue)) - ->willReturn($consumer) - ; - - $rpc = new RpcClient($this->createProducerMock(), $context); - - $promise = $rpc->callAsync('topic', $message, 2); - $promise->setDeleteReplyQueue(true); - - $promise->receive(); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|PsrContext - */ - private function createPsrContextMock() - { - return $this->createMock(PsrContext::class); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|ProducerInterface - */ - private function createProducerMock() - { - return $this->createMock(ProducerInterface::class); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|PsrConsumer - */ - private function createPsrConsumerMock() - { - return $this->createMock(PsrConsumer::class); - } -} diff --git a/pkg/enqueue/Tests/Client/SpoolProducerTest.php b/pkg/enqueue/Tests/Client/SpoolProducerTest.php index c9d76efd6..8c00dedcd 100644 --- a/pkg/enqueue/Tests/Client/SpoolProducerTest.php +++ b/pkg/enqueue/Tests/Client/SpoolProducerTest.php @@ -5,6 +5,7 @@ use Enqueue\Client\Message; use Enqueue\Client\ProducerInterface; use Enqueue\Client\SpoolProducer; +use Enqueue\Rpc\Promise; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; @@ -22,22 +23,45 @@ public function testCouldBeConstructedWithRealProducer() new SpoolProducer($this->createProducerMock()); } - public function testShouldQueueMessageOnSend() + public function testShouldQueueEventMessageOnSend() { $message = new Message(); $realProducer = $this->createProducerMock(); $realProducer ->expects($this->never()) - ->method('send') + ->method('sendEvent') + ; + $realProducer + ->expects($this->never()) + ->method('sendCommand') ; $producer = new SpoolProducer($realProducer); - $producer->send('foo_topic', $message); - $producer->send('bar_topic', $message); + $producer->sendEvent('foo_topic', $message); + $producer->sendEvent('bar_topic', $message); } - public function testShouldSendQueuedMessagesOnFlush() + public function testShouldQueueCommandMessageOnSend() + { + $message = new Message(); + + $realProducer = $this->createProducerMock(); + $realProducer + ->expects($this->never()) + ->method('sendEvent') + ; + $realProducer + ->expects($this->never()) + ->method('sendCommand') + ; + + $producer = new SpoolProducer($realProducer); + $producer->sendCommand('foo_command', $message); + $producer->sendCommand('bar_command', $message); + } + + public function testShouldSendQueuedEventMessagesOnFlush() { $message = new Message(); $message->setScope('third'); @@ -45,29 +69,90 @@ public function testShouldSendQueuedMessagesOnFlush() $realProducer = $this->createProducerMock(); $realProducer ->expects($this->at(0)) - ->method('send') + ->method('sendEvent') ->with('foo_topic', 'first') ; $realProducer ->expects($this->at(1)) - ->method('send') + ->method('sendEvent') ->with('bar_topic', ['second']) ; $realProducer ->expects($this->at(2)) - ->method('send') + ->method('sendEvent') ->with('baz_topic', $this->identicalTo($message)) ; + $realProducer + ->expects($this->never()) + ->method('sendCommand') + ; + + $producer = new SpoolProducer($realProducer); + + $producer->sendEvent('foo_topic', 'first'); + $producer->sendEvent('bar_topic', ['second']); + $producer->sendEvent('baz_topic', $message); + + $producer->flush(); + } + + public function testShouldSendQueuedCommandMessagesOnFlush() + { + $message = new Message(); + $message->setScope('third'); + + $realProducer = $this->createProducerMock(); + $realProducer + ->expects($this->at(0)) + ->method('sendCommand') + ->with('foo_command', 'first') + ; + $realProducer + ->expects($this->at(1)) + ->method('sendCommand') + ->with('bar_command', ['second']) + ; + $realProducer + ->expects($this->at(2)) + ->method('sendCommand') + ->with('baz_command', $this->identicalTo($message)) + ; $producer = new SpoolProducer($realProducer); - $producer->send('foo_topic', 'first'); - $producer->send('bar_topic', ['second']); - $producer->send('baz_topic', $message); + $producer->sendCommand('foo_command', 'first'); + $producer->sendCommand('bar_command', ['second']); + $producer->sendCommand('baz_command', $message); $producer->flush(); } + public function testShouldSendImmediatelyCommandMessageWithNeedReplyTrue() + { + $message = new Message(); + $message->setScope('third'); + + $promise = $this->createMock(Promise::class); + + $realProducer = $this->createProducerMock(); + $realProducer + ->expects($this->never()) + ->method('sendEvent') + ; + $realProducer + ->expects($this->once()) + ->method('sendCommand') + ->with('foo_command', 'first') + ->willReturn($promise) + ; + + $producer = new SpoolProducer($realProducer); + + $actualPromise = $producer->sendCommand('foo_command', 'first', true); + + $this->assertSame($promise, $actualPromise); + } + /** * @return \PHPUnit_Framework_MockObject_MockObject|ProducerInterface */ diff --git a/pkg/enqueue/Tests/Client/TraceableProducerTest.php b/pkg/enqueue/Tests/Client/TraceableProducerTest.php index 0912a549b..42c9c8b8f 100644 --- a/pkg/enqueue/Tests/Client/TraceableProducerTest.php +++ b/pkg/enqueue/Tests/Client/TraceableProducerTest.php @@ -2,6 +2,7 @@ namespace Enqueue\Tests\Client; +use Enqueue\Client\Config; use Enqueue\Client\Message; use Enqueue\Client\ProducerInterface; use Enqueue\Client\TraceableProducer; @@ -22,7 +23,7 @@ public function testCouldBeConstructedWithInternalMessageProducer() new TraceableProducer($this->createProducerMock()); } - public function testShouldPassAllArgumentsToInternalMessageProducerSendMethod() + public function testShouldPassAllArgumentsToInternalEventMessageProducerSendMethod() { $topic = 'theTopic'; $body = 'theBody'; @@ -30,24 +31,25 @@ public function testShouldPassAllArgumentsToInternalMessageProducerSendMethod() $internalMessageProducer = $this->createProducerMock(); $internalMessageProducer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ->with($topic, $body) ; - $messageProducer = new TraceableProducer($internalMessageProducer); + $producer = new TraceableProducer($internalMessageProducer); - $messageProducer->send($topic, $body); + $producer->sendEvent($topic, $body); } - public function testShouldCollectInfoIfStringGivenAsMessage() + public function testShouldCollectInfoIfStringGivenAsEventMessage() { - $messageProducer = new TraceableProducer($this->createProducerMock()); + $producer = new TraceableProducer($this->createProducerMock()); - $messageProducer->send('aFooTopic', 'aFooBody'); + $producer->sendEvent('aFooTopic', 'aFooBody'); $this->assertSame([ [ 'topic' => 'aFooTopic', + 'command' => null, 'body' => 'aFooBody', 'headers' => [], 'properties' => [], @@ -58,18 +60,19 @@ public function testShouldCollectInfoIfStringGivenAsMessage() 'contentType' => null, 'messageId' => null, ], - ], $messageProducer->getTraces()); + ], $producer->getTraces()); } - public function testShouldCollectInfoIfArrayGivenAsMessage() + public function testShouldCollectInfoIfArrayGivenAsEventMessage() { - $messageProducer = new TraceableProducer($this->createProducerMock()); + $producer = new TraceableProducer($this->createProducerMock()); - $messageProducer->send('aFooTopic', ['foo' => 'fooVal', 'bar' => 'barVal']); + $producer->sendEvent('aFooTopic', ['foo' => 'fooVal', 'bar' => 'barVal']); $this->assertSame([ [ 'topic' => 'aFooTopic', + 'command' => null, 'body' => ['foo' => 'fooVal', 'bar' => 'barVal'], 'headers' => [], 'properties' => [], @@ -80,12 +83,12 @@ public function testShouldCollectInfoIfArrayGivenAsMessage() 'contentType' => null, 'messageId' => null, ], - ], $messageProducer->getTraces()); + ], $producer->getTraces()); } - public function testShouldCollectInfoIfMessageObjectGivenAsMessage() + public function testShouldCollectInfoIfEventMessageObjectGivenAsMessage() { - $messageProducer = new TraceableProducer($this->createProducerMock()); + $producer = new TraceableProducer($this->createProducerMock()); $message = new Message(); $message->setBody(['foo' => 'fooVal', 'bar' => 'barVal']); @@ -98,11 +101,12 @@ public function testShouldCollectInfoIfMessageObjectGivenAsMessage() $message->setPriority('theMessagePriority'); $message->setTimestamp('theTimestamp'); - $messageProducer->send('aFooTopic', $message); + $producer->sendEvent('aFooTopic', $message); $this->assertSame([ [ 'topic' => 'aFooTopic', + 'command' => null, 'body' => ['foo' => 'fooVal', 'bar' => 'barVal'], 'headers' => ['fooHeader' => 'fooVal'], 'properties' => ['fooProp' => 'fooVal'], @@ -113,82 +117,241 @@ public function testShouldCollectInfoIfMessageObjectGivenAsMessage() 'contentType' => 'theContentType', 'messageId' => 'theMessageId', ], - ], $messageProducer->getTraces()); + ], $producer->getTraces()); + } + + public function testShouldNotStoreAnythingIfInternalEventMessageProducerThrowsException() + { + $internalMessageProducer = $this->createProducerMock(); + $internalMessageProducer + ->expects($this->once()) + ->method('sendEvent') + ->willThrowException(new \Exception()) + ; + + $producer = new TraceableProducer($internalMessageProducer); + + $this->expectException(\Exception::class); + + try { + $producer->sendEvent('aFooTopic', 'aFooBody'); + } finally { + $this->assertEmpty($producer->getTraces()); + } + } + + public function testShouldPassAllArgumentsToInternalCommandMessageProducerSendMethod() + { + $command = 'theCommand'; + $body = 'theBody'; + + $internalMessageProducer = $this->createProducerMock(); + $internalMessageProducer + ->expects($this->once()) + ->method('sendCommand') + ->with($command, $body) + ; + + $producer = new TraceableProducer($internalMessageProducer); + + $producer->sendCommand($command, $body); + } + + public function testShouldCollectInfoIfStringGivenAsCommandMessage() + { + $producer = new TraceableProducer($this->createProducerMock()); + + $producer->sendCommand('aFooCommand', 'aFooBody'); + + $this->assertSame([ + [ + 'topic' => Config::COMMAND_TOPIC, + 'command' => 'aFooCommand', + 'body' => 'aFooBody', + 'headers' => [], + 'properties' => [], + 'priority' => null, + 'expire' => null, + 'delay' => null, + 'timestamp' => null, + 'contentType' => null, + 'messageId' => null, + ], + ], $producer->getTraces()); + } + + public function testShouldCollectInfoIfArrayGivenAsCommandMessage() + { + $producer = new TraceableProducer($this->createProducerMock()); + + $producer->sendCommand('aFooCommand', ['foo' => 'fooVal', 'bar' => 'barVal']); + + $this->assertSame([ + [ + 'topic' => Config::COMMAND_TOPIC, + 'command' => 'aFooCommand', + 'body' => ['foo' => 'fooVal', 'bar' => 'barVal'], + 'headers' => [], + 'properties' => [], + 'priority' => null, + 'expire' => null, + 'delay' => null, + 'timestamp' => null, + 'contentType' => null, + 'messageId' => null, + ], + ], $producer->getTraces()); + } + + public function testShouldCollectInfoIfCommandMessageObjectGivenAsMessage() + { + $producer = new TraceableProducer($this->createProducerMock()); + + $message = new Message(); + $message->setBody(['foo' => 'fooVal', 'bar' => 'barVal']); + $message->setProperty('fooProp', 'fooVal'); + $message->setHeader('fooHeader', 'fooVal'); + $message->setContentType('theContentType'); + $message->setDelay('theDelay'); + $message->setExpire('theExpire'); + $message->setMessageId('theMessageId'); + $message->setPriority('theMessagePriority'); + $message->setTimestamp('theTimestamp'); + + $producer->sendCommand('aFooCommand', $message); + + $this->assertSame([ + [ + 'topic' => Config::COMMAND_TOPIC, + 'command' => 'aFooCommand', + 'body' => ['foo' => 'fooVal', 'bar' => 'barVal'], + 'headers' => ['fooHeader' => 'fooVal'], + 'properties' => ['fooProp' => 'fooVal'], + 'priority' => 'theMessagePriority', + 'expire' => 'theExpire', + 'delay' => 'theDelay', + 'timestamp' => 'theTimestamp', + 'contentType' => 'theContentType', + 'messageId' => 'theMessageId', + ], + ], $producer->getTraces()); + } + + public function testShouldNotStoreAnythingIfInternalCommandMessageProducerThrowsException() + { + $internalMessageProducer = $this->createProducerMock(); + $internalMessageProducer + ->expects($this->once()) + ->method('sendCommand') + ->willThrowException(new \Exception()) + ; + + $producer = new TraceableProducer($internalMessageProducer); + + $this->expectException(\Exception::class); + + try { + $producer->sendCommand('aFooCommand', 'aFooBody'); + } finally { + $this->assertEmpty($producer->getTraces()); + } } public function testShouldAllowGetInfoSentToSameTopic() { - $messageProducer = new TraceableProducer($this->createProducerMock()); + $producer = new TraceableProducer($this->createProducerMock()); - $messageProducer->send('aFooTopic', 'aFooBody'); - $messageProducer->send('aFooTopic', 'aFooBody'); + $producer->sendEvent('aFooTopic', 'aFooBody'); + $producer->sendEvent('aFooTopic', 'aFooBody'); $this->assertArraySubset([ ['topic' => 'aFooTopic', 'body' => 'aFooBody'], ['topic' => 'aFooTopic', 'body' => 'aFooBody'], - ], $messageProducer->getTraces()); + ], $producer->getTraces()); } public function testShouldAllowGetInfoSentToDifferentTopics() { - $messageProducer = new TraceableProducer($this->createProducerMock()); + $producer = new TraceableProducer($this->createProducerMock()); - $messageProducer->send('aFooTopic', 'aFooBody'); - $messageProducer->send('aBarTopic', 'aBarBody'); + $producer->sendEvent('aFooTopic', 'aFooBody'); + $producer->sendEvent('aBarTopic', 'aBarBody'); $this->assertArraySubset([ ['topic' => 'aFooTopic', 'body' => 'aFooBody'], ['topic' => 'aBarTopic', 'body' => 'aBarBody'], - ], $messageProducer->getTraces()); + ], $producer->getTraces()); } - public function testShouldAllowGetInfoSentToSpecialTopicTopics() + public function testShouldAllowGetInfoSentToSpecialTopic() { - $messageProducer = new TraceableProducer($this->createProducerMock()); + $producer = new TraceableProducer($this->createProducerMock()); - $messageProducer->send('aFooTopic', 'aFooBody'); - $messageProducer->send('aBarTopic', 'aBarBody'); + $producer->sendEvent('aFooTopic', 'aFooBody'); + $producer->sendEvent('aBarTopic', 'aBarBody'); $this->assertArraySubset([ ['topic' => 'aFooTopic', 'body' => 'aFooBody'], - ], $messageProducer->getTopicTraces('aFooTopic')); + ], $producer->getTopicTraces('aFooTopic')); $this->assertArraySubset([ ['topic' => 'aBarTopic', 'body' => 'aBarBody'], - ], $messageProducer->getTopicTraces('aBarTopic')); + ], $producer->getTopicTraces('aBarTopic')); } - public function testShouldNotStoreAnythingIfInternalMessageProducerThrowsException() + public function testShouldAllowGetInfoSentToSameCommand() { - $internalMessageProducer = $this->createProducerMock(); - $internalMessageProducer - ->expects($this->once()) - ->method('send') - ->willThrowException(new \Exception()) - ; + $producer = new TraceableProducer($this->createProducerMock()); - $messageProducer = new TraceableProducer($internalMessageProducer); + $producer->sendCommand('aFooCommand', 'aFooBody'); + $producer->sendCommand('aFooCommand', 'aFooBody'); - $this->expectException(\Exception::class); + $this->assertArraySubset([ + ['command' => 'aFooCommand', 'body' => 'aFooBody'], + ['command' => 'aFooCommand', 'body' => 'aFooBody'], + ], $producer->getTraces()); + } - try { - $messageProducer->send('aFooTopic', 'aFooBody'); - } finally { - $this->assertEmpty($messageProducer->getTraces()); - } + public function testShouldAllowGetInfoSentToDifferentCommands() + { + $producer = new TraceableProducer($this->createProducerMock()); + + $producer->sendCommand('aFooCommand', 'aFooBody'); + $producer->sendCommand('aBarCommand', 'aBarBody'); + + $this->assertArraySubset([ + ['command' => 'aFooCommand', 'body' => 'aFooBody'], + ['command' => 'aBarCommand', 'body' => 'aBarBody'], + ], $producer->getTraces()); + } + + public function testShouldAllowGetInfoSentToSpecialCommand() + { + $producer = new TraceableProducer($this->createProducerMock()); + + $producer->sendCommand('aFooCommand', 'aFooBody'); + $producer->sendCommand('aBarCommand', 'aBarBody'); + + $this->assertArraySubset([ + ['command' => 'aFooCommand', 'body' => 'aFooBody'], + ], $producer->getCommandTraces('aFooCommand')); + + $this->assertArraySubset([ + ['command' => 'aBarCommand', 'body' => 'aBarBody'], + ], $producer->getCommandTraces('aBarCommand')); } public function testShouldAllowClearStoredTraces() { - $messageProducer = new TraceableProducer($this->createProducerMock()); + $producer = new TraceableProducer($this->createProducerMock()); - $messageProducer->send('aFooTopic', 'aFooBody'); + $producer->sendEvent('aFooTopic', 'aFooBody'); //guard - $this->assertNotEmpty($messageProducer->getTraces()); + $this->assertNotEmpty($producer->getTraces()); - $messageProducer->clearTraces(); - $this->assertSame([], $messageProducer->getTraces()); + $producer->clearTraces(); + $this->assertSame([], $producer->getTraces()); } /** diff --git a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php b/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php deleted file mode 100644 index 9f18af46a..000000000 --- a/pkg/enqueue/Tests/Functional/Client/RpcClientTest.php +++ /dev/null @@ -1,85 +0,0 @@ -context = $this->buildAmqpContext(); - $this->replyContext = $this->buildAmqpContext(); - - $this->removeQueue('enqueue.app.default'); - } - - public function testProduceAndConsumeOneMessage() - { - $config = [ - 'transport' => [ - 'rabbitmq_amqp' => [ - 'host' => getenv('SYMFONY__RABBITMQ__HOST'), - 'port' => getenv('SYMFONY__RABBITMQ__AMQP__PORT'), - 'user' => getenv('SYMFONY__RABBITMQ__USER'), - 'pass' => getenv('SYMFONY__RABBITMQ__PASSWORD'), - 'vhost' => getenv('SYMFONY__RABBITMQ__VHOST'), - ], - ], - ]; - - $requestMessage = null; - - $client = new SimpleClient($config); - $client->setupBroker(); - $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) { - $requestMessage = $message; - - return Result::reply($context->createMessage('Hi John!')); - }); - - $rpcClient = new RpcClient($client->getProducer(), $this->replyContext); - $promise = $rpcClient->callAsync('foo_topic', 'Hi Thomas!', 5); - - $client->consume(new ChainExtension([ - new ReplyExtension(), - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), - new LimitConsumedMessagesExtension(2), - ])); - - //guard - $this->assertInstanceOf(PsrMessage::class, $requestMessage); - $this->assertEquals('Hi Thomas!', $requestMessage->getBody()); - - $replyMessage = $promise->receive(); - $this->assertEquals('Hi John!', $replyMessage->getBody()); - } -} diff --git a/pkg/enqueue/Tests/Rpc/RpcClientTest.php b/pkg/enqueue/Tests/Rpc/RpcClientTest.php index 13f6779ae..846e62569 100644 --- a/pkg/enqueue/Tests/Rpc/RpcClientTest.php +++ b/pkg/enqueue/Tests/Rpc/RpcClientTest.php @@ -134,7 +134,7 @@ public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals() ->willReturn($this->createPsrProducerMock()) ; $context - ->expects($this->once()) + ->expects($this->atLeastOnce()) ->method('createQueue') ->with('theReplyTo') ->willReturn($replyQueue) @@ -185,7 +185,7 @@ public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals() ->willReturn($this->createPsrProducerMock()) ; $context - ->expects($this->once()) + ->expects($this->atLeastOnce()) ->method('createQueue') ->with('theReplyTo') ->willReturn($replyQueue) @@ -232,7 +232,7 @@ public function testShouldDeleteQueueAfterReceiveIfDeleteReplyQueueIsTrue() ->willReturn($this->createPsrProducerMock()) ; $context - ->expects($this->once()) + ->expects($this->atLeastOnce()) ->method('createQueue') ->with('theReplyTo') ->willReturn($replyQueue) diff --git a/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php index 35211389c..2cd80952e 100644 --- a/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php @@ -52,7 +52,7 @@ public function testShouldExecuteConsumptionAndUseDefaultQueueName() $producerMock = $this->createProducerMock(); $producerMock ->expects($this->once()) - ->method('send') + ->method('sendEvent') ->with('theTopic', 'theMessage') ; diff --git a/pkg/job-queue/CalculateRootJobStatusProcessor.php b/pkg/job-queue/CalculateRootJobStatusProcessor.php index 593b97e7d..dbf0a1c26 100644 --- a/pkg/job-queue/CalculateRootJobStatusProcessor.php +++ b/pkg/job-queue/CalculateRootJobStatusProcessor.php @@ -75,7 +75,7 @@ public function process(PsrMessage $message, PsrContext $context) $isRootJobStopped = $this->calculateRootJobStatusService->calculate($job); if ($isRootJobStopped) { - $this->producer->send(Topics::ROOT_JOB_STOPPED, [ + $this->producer->sendEvent(Topics::ROOT_JOB_STOPPED, [ 'jobId' => $job->getRootJob()->getId(), ]); } diff --git a/pkg/job-queue/DependentJobProcessor.php b/pkg/job-queue/DependentJobProcessor.php index e8a69be33..595aa388b 100644 --- a/pkg/job-queue/DependentJobProcessor.php +++ b/pkg/job-queue/DependentJobProcessor.php @@ -105,7 +105,7 @@ public function process(PsrMessage $message, PsrContext $context) $message->setPriority($dependentJob['priority']); } - $this->producer->send($dependentJob['topic'], $message); + $this->producer->sendEvent($dependentJob['topic'], $message); } return Result::ACK; diff --git a/pkg/job-queue/JobProcessor.php b/pkg/job-queue/JobProcessor.php index 147105369..2ab77a72d 100644 --- a/pkg/job-queue/JobProcessor.php +++ b/pkg/job-queue/JobProcessor.php @@ -99,7 +99,7 @@ public function findOrCreateChildJob($jobName, Job $rootJob) $this->jobStorage->saveJob($job); - $this->producer->send(Topics::CALCULATE_ROOT_JOB_STATUS, [ + $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ 'jobId' => $job->getId(), ]); @@ -130,7 +130,7 @@ public function startChildJob(Job $job) $this->jobStorage->saveJob($job); - $this->producer->send(Topics::CALCULATE_ROOT_JOB_STATUS, [ + $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ 'jobId' => $job->getId(), ]); } @@ -159,7 +159,7 @@ public function successChildJob(Job $job) $this->jobStorage->saveJob($job); - $this->producer->send(Topics::CALCULATE_ROOT_JOB_STATUS, [ + $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ 'jobId' => $job->getId(), ]); } @@ -188,7 +188,7 @@ public function failChildJob(Job $job) $this->jobStorage->saveJob($job); - $this->producer->send(Topics::CALCULATE_ROOT_JOB_STATUS, [ + $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ 'jobId' => $job->getId(), ]); } @@ -221,7 +221,7 @@ public function cancelChildJob(Job $job) $this->jobStorage->saveJob($job); - $this->producer->send(Topics::CALCULATE_ROOT_JOB_STATUS, [ + $this->producer->sendEvent(Topics::CALCULATE_ROOT_JOB_STATUS, [ 'jobId' => $job->getId(), ]); } diff --git a/pkg/job-queue/Tests/CalculateRootJobStatusProcessorTest.php b/pkg/job-queue/Tests/CalculateRootJobStatusProcessorTest.php index 3959c1939..a2c64741e 100644 --- a/pkg/job-queue/Tests/CalculateRootJobStatusProcessorTest.php +++ b/pkg/job-queue/Tests/CalculateRootJobStatusProcessorTest.php @@ -81,7 +81,7 @@ public function testShouldRejectMessageAndLogErrorIfJobWasNotFound() $producer = $this->createProducerMock(); $producer ->expects($this->never()) - ->method('send') + ->method('sendEvent') ; $message = new NullMessage(); @@ -121,7 +121,7 @@ public function testShouldCallCalculateJobRootStatusAndACKMessage() $producer = $this->createProducerMock(); $producer ->expects($this->never()) - ->method('send') + ->method('sendEvent') ; $message = new NullMessage(); @@ -161,7 +161,7 @@ public function testShouldSendRootJobStoppedMessageIfJobHasStopped() $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ->with(Topics::ROOT_JOB_STOPPED, ['jobId' => 12345]) ; diff --git a/pkg/job-queue/Tests/DependentJobProcessorTest.php b/pkg/job-queue/Tests/DependentJobProcessorTest.php index 43818a6bd..bf2140f8a 100644 --- a/pkg/job-queue/Tests/DependentJobProcessorTest.php +++ b/pkg/job-queue/Tests/DependentJobProcessorTest.php @@ -121,7 +121,7 @@ public function testShouldDoNothingIfDependentJobsAreMissing() $producer = $this->createProducerMock(); $producer ->expects($this->never()) - ->method('send') + ->method('sendEvent') ; $logger = $this->createLoggerMock(); @@ -157,7 +157,7 @@ public function testShouldLogCriticalAndRejectMessageIfDependentJobTopicIsMissin $producer = $this->createProducerMock(); $producer ->expects($this->never()) - ->method('send') + ->method('sendEvent') ; $logger = $this->createLoggerMock(); @@ -200,7 +200,7 @@ public function testShouldLogCriticalAndRejectMessageIfDependentJobMessageIsMiss $producer = $this->createProducerMock(); $producer ->expects($this->never()) - ->method('send') + ->method('sendEvent') ; $logger = $this->createLoggerMock(); @@ -246,7 +246,7 @@ public function testShouldPublishDependentMessage() $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ->with('topic-name', $this->isInstanceOf(Message::class)) ->will($this->returnCallback(function ($topic, Message $message) use (&$expectedMessage) { $expectedMessage = $message; @@ -294,7 +294,7 @@ public function testShouldPublishDependentMessageWithPriority() $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ->with('topic-name', $this->isInstanceOf(Message::class)) ->will($this->returnCallback(function ($topic, Message $message) use (&$expectedMessage) { $expectedMessage = $message; diff --git a/pkg/job-queue/Tests/JobProcessorTest.php b/pkg/job-queue/Tests/JobProcessorTest.php index 3ba7ea589..bb2386895 100644 --- a/pkg/job-queue/Tests/JobProcessorTest.php +++ b/pkg/job-queue/Tests/JobProcessorTest.php @@ -8,8 +8,9 @@ use Enqueue\JobQueue\Job; use Enqueue\JobQueue\JobProcessor; use Enqueue\JobQueue\Topics; +use PHPUnit\Framework\TestCase; -class JobProcessorTest extends \PHPUnit\Framework\TestCase +class JobProcessorTest extends TestCase { public function testCouldBeCreatedWithRequiredArguments() { @@ -168,7 +169,7 @@ public function testCreateChildJobShouldCreateAndSaveJobAndPublishRecalculateRoo $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ->with(Topics::CALCULATE_ROOT_JOB_STATUS, ['jobId' => 12345]) ; @@ -245,7 +246,7 @@ public function testStartJobShouldUpdateJobWithRunningStatusAndStartAtTime() $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ; $processor = new JobProcessor($storage, $producer); @@ -315,7 +316,7 @@ public function testSuccessJobShouldUpdateJobWithSuccessStatusAndStopAtTime() $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ; $processor = new JobProcessor($storage, $producer); @@ -385,7 +386,7 @@ public function testFailJobShouldUpdateJobWithFailStatusAndStopAtTime() $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ; $processor = new JobProcessor($storage, $producer); @@ -455,7 +456,7 @@ public function testCancelJobShouldUpdateJobWithCancelStatusAndStoppedAtTimeAndS $producer = $this->createProducerMock(); $producer ->expects($this->once()) - ->method('send') + ->method('sendEvent') ; $processor = new JobProcessor($storage, $producer); diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 82f093c53..d3cc23047 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -11,7 +11,6 @@ use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Client\Meta\TopicMetaRegistry; use Enqueue\Client\ProducerInterface; -use Enqueue\Client\ProducerV2Interface; use Enqueue\Client\RouterProcessor; use Enqueue\Consumption\CallbackProcessor; use Enqueue\Consumption\ExtensionInterface; @@ -21,6 +20,7 @@ use Enqueue\Psr\PsrContext; use Enqueue\Psr\PsrProcessor; use Enqueue\Redis\Symfony\RedisTransportFactory; +use Enqueue\Rpc\Promise; use Enqueue\Sqs\Symfony\SqsTransportFactory; use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory; use Enqueue\Stomp\Symfony\StompTransportFactory; @@ -103,6 +103,27 @@ public function bind($topic, $processorName, $processor) $this->getRouterProcessor()->add($topic, $queueName, $processorName); } + /** + * @param string $command + * @param mixed $message + * @param bool $needReply + * + * @return Promise|null + */ + public function sendCommand($command, $message, $needReply = false) + { + return $this->getProducer()->sendCommand($command, $message, $needReply); + } + + /** + * @param string $topic + * @param string|array $message + */ + public function sendEvent($topic, $message) + { + $this->getProducer()->sendEvent($topic, $message); + } + /** * @param string $topic * @param string|array $message @@ -110,7 +131,11 @@ public function bind($topic, $processorName, $processor) */ public function send($topic, $message, $setupBroker = false) { - $this->getProducer($setupBroker)->send($topic, $message); + if ($setupBroker) { + $this->setupBroker(); + } + + $this->sendEvent($topic, $message); } /** @@ -195,18 +220,6 @@ public function getProducer($setupBroker = false) return $this->container->get('enqueue.client.producer'); } - /** - * @param bool $setupBroker - * - * @return ProducerV2Interface - */ - public function getProducerV2($setupBroker = false) - { - $setupBroker && $this->setupBroker(); - - return $this->container->get('enqueue.client.producer.v2'); - } - public function setupBroker() { $this->getDriver()->setupBroker(); diff --git a/pkg/simple-client/SimpleClientContainerExtension.php b/pkg/simple-client/SimpleClientContainerExtension.php index bfe31ddc4..7fea6f03b 100644 --- a/pkg/simple-client/SimpleClientContainerExtension.php +++ b/pkg/simple-client/SimpleClientContainerExtension.php @@ -10,11 +10,10 @@ use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Client\Meta\TopicMetaRegistry; use Enqueue\Client\Producer; -use Enqueue\Client\ProducerV2; use Enqueue\Client\RouterProcessor; -use Enqueue\Client\RpcClient; use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension; use Enqueue\Consumption\QueueConsumer; +use Enqueue\Rpc\RpcFactory; use Enqueue\Symfony\TransportFactoryInterface; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\NodeInterface; @@ -90,23 +89,19 @@ public function load(array $configs, ContainerBuilder $container) $transportConfig, ]); - $container->register('enqueue.client.producer', Producer::class) - ->setArguments([ - new Reference('enqueue.client.driver'), - ]); - - $container->register('enqueue.client.rpc', RpcClient::class) + $container->register('enqueue.client.rpc_factory', RpcFactory::class) ->setArguments([ - new Reference('enqueue.client.producer'), new Reference('enqueue.transport.context'), - ]); + ]); - $container->register('enqueue.client.producer.v2', ProducerV2::class) + $container->register('enqueue.client.producer', Producer::class) ->setArguments([ - new Reference('enqueue.client.producer'), - new Reference('enqueue.client.rpc'), + new Reference('enqueue.client.driver'), + new Reference('enqueue.client.rpc_factory'), ]); + $container->setAlias('enqueue.client.producer_v2', 'enqueue.client.producer'); + $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) ->setArguments([[]]);