The bundle integrates enqueue library. It adds easy to use configuration layer, register services, adds handy cli commands.
$ composer require enqueue/enqueue-bundle enqueue/amqp-ext
First, you have to configure a transport layer and set one to be default.
# app/config/config.yml
enqueue:
transport:
default: 'amqp'
amqp:
host: 'localhost'
port: 5672
login: 'guest'
password: 'guest'
vhost: '/'
client: ~
Once you configured everything you can start producing messages:
<?php
use Enqueue\Client\MessageProducer;
/** @var MessageProducer $messageProducer **/
$messageProducer = $container->get('enqueue.message_producer');
$messageProducer->send('aFooTopic', 'Something has happened');
To consume messages you have to first create a message processor:
<?php
use Enqueue\Psr\Message;
use Enqueue\Psr\Context;
use Enqueue\Psr\Processor;
use Enqueue\Client\TopicSubscriberInterface;
class FooProcessor implements Processor, TopicSubscriberInterface
{
public function process(Message $message, Context $session)
{
echo $message->getBody();
return self::ACK;
// return self::REJECT; // when the message is broken
// return self::REQUEUE; // the message is fine but you want to postpone processing
}
public static function getSubscribedTopics()
{
return ['aFooTopic'];
}
}
Register it as a container service and subscribe to the topic:
foo_message_processor:
class: 'FooProcessor'
tags:
- { name: 'enqueue.client.processor' }
Now you can start consuming messages:
$ ./app/console enqueue:consume --setup-broker
Note: Add -vvv to find out what is going while you are consuming messages. There is a lot of valuable debug info there.