diff --git a/docs/quick_tour.md b/docs/quick_tour.md index b4cfb8e0e..bb0a87684 100644 --- a/docs/quick_tour.md +++ b/docs/quick_tour.md @@ -173,7 +173,7 @@ use Enqueue\Psr\Processor; /** @var \Enqueue\Psr\Context $psrContext */ $client = new SimpleClient($psrContext); -$client->bind('foo_topic', function (Message $message) { +$client->bind('foo_topic', 'processor_name', function (Message $message) { // process message return Processor::ACK; diff --git a/pkg/enqueue/Client/SimpleClient.php b/pkg/enqueue/Client/SimpleClient.php index 12ceac7f0..ad60d2d27 100644 --- a/pkg/enqueue/Client/SimpleClient.php +++ b/pkg/enqueue/Client/SimpleClient.php @@ -72,11 +72,11 @@ public function __construct(AmqpContext $context, Config $config = null) /** * @param string $topic - * @param callback + * @param string $processorName + * @param callback $processor */ - public function bind($topic, callable $processor) + public function bind($topic, $processorName, callable $processor) { - $processorName = uniqid('', true); $queueName = $this->config->getDefaultProcessorQueueName(); $this->topicsMetaRegistry->addProcessor($topic, $processorName); @@ -97,9 +97,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) $processor = $this->getProcessor(); - $queueConsumer = new QueueConsumer($this->context, new ChainExtension([ - new SetRouterPropertiesExtension($this->driver), - ])); + $queueConsumer = $this->getQueueConsumer(); $defaultQueueName = $this->config->getDefaultProcessorQueueName(); $defaultTransportQueueName = $this->config->createTransportQueueName($defaultQueueName); @@ -114,10 +112,44 @@ public function consume(ExtensionInterface $runtimeExtension = null) $queueConsumer->consume($runtimeExtension); } + /** + * @return QueueConsumer + */ + public function getQueueConsumer() + { + return new QueueConsumer($this->context, new ChainExtension([ + new SetRouterPropertiesExtension($this->driver), + ])); + } + + /** + * @return DriverInterface + */ + public function getDriver() + { + return $this->driver; + } + + /** + * @return TopicMetaRegistry + */ + public function getTopicMetaRegistry() + { + return $this->topicsMetaRegistry; + } + + /** + * @return QueueMetaRegistry + */ + public function getQueueMetaRegistry() + { + return $this->queueMetaRegistry; + } + /** * @return MessageProducerInterface */ - private function getProducer() + public function getProducer() { $this->driver->setupBroker(); @@ -127,7 +159,7 @@ private function getProducer() /** * @return DelegateProcessor */ - private function getProcessor() + public function getProcessor() { return new DelegateProcessor($this->processorsRegistry); } diff --git a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php index eb84cf2bc..346fb7b57 100644 --- a/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php +++ b/pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php @@ -34,7 +34,7 @@ public function testProduceAndConsumeOneMessage() $actualMessage = null; $client = new SimpleClient($this->context); - $client->bind('foo_topic', function (Message $message) use (&$actualMessage) { + $client->bind('foo_topic', 'foo_processor', function (Message $message) use (&$actualMessage) { $actualMessage = $message; return Result::ACK; @@ -56,12 +56,12 @@ public function testProduceAndRouteToTwoConsumes() $received = 0; $client = new SimpleClient($this->context); - $client->bind('foo_topic', function () use (&$received) { + $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { ++$received; return Result::ACK; }); - $client->bind('foo_topic', function () use (&$received) { + $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { ++$received; return Result::ACK;