Skip to content

Fix Simple Client #29

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ use Enqueue\Psr\Processor;
/** @var \Enqueue\Psr\Context $psrContext */

$client = new SimpleClient($psrContext);
$client->bind('foo_topic', function (Message $message) {
$client->bind('foo_topic', 'processor_name', function (Message $message) {
// process message

return Processor::ACK;
Expand Down
48 changes: 40 additions & 8 deletions pkg/enqueue/Client/SimpleClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ public function __construct(AmqpContext $context, Config $config = null)

/**
* @param string $topic
* @param callback
* @param string $processorName
* @param callback $processor
*/
public function bind($topic, callable $processor)
public function bind($topic, $processorName, callable $processor)
{
$processorName = uniqid('', true);
$queueName = $this->config->getDefaultProcessorQueueName();

$this->topicsMetaRegistry->addProcessor($topic, $processorName);
Expand All @@ -97,9 +97,7 @@ public function consume(ExtensionInterface $runtimeExtension = null)

$processor = $this->getProcessor();

$queueConsumer = new QueueConsumer($this->context, new ChainExtension([
new SetRouterPropertiesExtension($this->driver),
]));
$queueConsumer = $this->getQueueConsumer();

$defaultQueueName = $this->config->getDefaultProcessorQueueName();
$defaultTransportQueueName = $this->config->createTransportQueueName($defaultQueueName);
Expand All @@ -114,10 +112,44 @@ public function consume(ExtensionInterface $runtimeExtension = null)
$queueConsumer->consume($runtimeExtension);
}

/**
* @return QueueConsumer
*/
public function getQueueConsumer()
{
return new QueueConsumer($this->context, new ChainExtension([
new SetRouterPropertiesExtension($this->driver),
]));
}

/**
* @return DriverInterface
*/
public function getDriver()
{
return $this->driver;
}

/**
* @return TopicMetaRegistry
*/
public function getTopicMetaRegistry()
{
return $this->topicsMetaRegistry;
}

/**
* @return QueueMetaRegistry
*/
public function getQueueMetaRegistry()
{
return $this->queueMetaRegistry;
}

/**
* @return MessageProducerInterface
*/
private function getProducer()
public function getProducer()
{
$this->driver->setupBroker();

Expand All @@ -127,7 +159,7 @@ private function getProducer()
/**
* @return DelegateProcessor
*/
private function getProcessor()
public function getProcessor()
{
return new DelegateProcessor($this->processorsRegistry);
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/enqueue/Tests/Functional/Client/SimpleClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function testProduceAndConsumeOneMessage()
$actualMessage = null;

$client = new SimpleClient($this->context);
$client->bind('foo_topic', function (Message $message) use (&$actualMessage) {
$client->bind('foo_topic', 'foo_processor', function (Message $message) use (&$actualMessage) {
$actualMessage = $message;

return Result::ACK;
Expand All @@ -56,12 +56,12 @@ public function testProduceAndRouteToTwoConsumes()
$received = 0;

$client = new SimpleClient($this->context);
$client->bind('foo_topic', function () use (&$received) {
$client->bind('foo_topic', 'foo_processor1', function () use (&$received) {
++$received;

return Result::ACK;
});
$client->bind('foo_topic', function () use (&$received) {
$client->bind('foo_topic', 'foo_processor2', function () use (&$received) {
++$received;

return Result::ACK;
Expand Down