Skip to content

Commit 76f99ff

Browse files
committed
[doc] Add docs about message processors.
1 parent 4fdd4d1 commit 76f99ff

File tree

3 files changed

+215
-0
lines changed

3 files changed

+215
-0
lines changed

Diff for: docs/bundle/message_processor.md

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# Message processor
2+
3+
Message processors and usage examples described in [consumption/message_processor](../consumption/message_processor.md)
4+
Here we just show how to register a message processor service to enqueue. Let's say we have app bundle and a message processor there
5+
6+
* [Container tag](#container-tag)
7+
* [Topic subscriber](#topic-subscriber)
8+
9+
# Container tag
10+
11+
```yaml
12+
# src/AppBundle/Resources/services.yml
13+
14+
services:
15+
app.async.say_hello_processor:
16+
class: 'AppBundle\Async\SayHelloProcessor'
17+
tags:
18+
- { name: 'enqueue.client.message_processor', topicName: 'aTopic' }
19+
20+
```
21+
22+
The tag has some additional options:
23+
24+
* topicName [Req]: Tells what topic to consume messages from.
25+
* queueName: By default message processor does not require an extra queue on broker side. It reuse a default one. Setting the option you can define a custom queue to be used.
26+
* processorName: By default the service id is used as message processor name. Using the option you can define a custom name.
27+
28+
# Topic subscriber
29+
30+
There is a `TopicSubscriber` interface (like [EventSubscriberInterface](https://github.com/symfony/symfony/blob/master/src/Symfony/Component/EventDispatcher/EventSubscriberInterface.php)).
31+
It allows to keep subscription login and process logic closer to each other.
32+
33+
```php
34+
<?php
35+
namespace AppBundle\Async;
36+
37+
use Enqueue\Client\TopicSubscriberInterface;
38+
use Enqueue\Psr\Processor;
39+
40+
class SayHelloProcessor implements Processor, TopicSubscriberInterface
41+
{
42+
public static function getSubscribedTopics()
43+
{
44+
return ['aTopic', 'anotherTopic'];
45+
}
46+
}
47+
```
48+
49+
On the topic subscriber you can also define queue and processor name:
50+
51+
```php
52+
<?php
53+
use Enqueue\Client\TopicSubscriberInterface;
54+
use Enqueue\Psr\Processor;
55+
56+
class SayHelloProcessor implements Processor, TopicSubscriberInterface
57+
{
58+
public static function getSubscribedTopics()
59+
{
60+
return [
61+
'aTopic' => ['queueName' => 'fooQueue', 'processorName' => 'foo'],
62+
'anotherTopic' => ['queueName' => 'barQueue', 'processorName' => 'bar'],
63+
];
64+
}
65+
}
66+
```
67+
68+
In the container you can just add the tag `enqueue.client.message_processor` and omit any other options:
69+
70+
```yaml
71+
# src/AppBundle/Resources/services.yml
72+
73+
services:
74+
app.async.say_hello_processor:
75+
class: 'AppBundle\Async\SayHelloProcessor'
76+
tags:
77+
- { name: 'enqueue.client.message_processor'}
78+
79+
```
80+
81+
[back to index](../index.md)

Diff for: docs/consumption/message_processor.md

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# Message processor
2+
3+
The message processor is an object that actually process the message and must return a result status.
4+
Here's example:
5+
6+
```php
7+
<?php
8+
use Enqueue\Psr\Processor;
9+
use Enqueue\Psr\Message;
10+
use Enqueue\Psr\Context;
11+
12+
class SendMailProcessor implements Processor
13+
{
14+
public function process(Message $message, Context $context)
15+
{
16+
$this->mailer->send('[email protected]', $message->getBody());
17+
18+
return self::ACK;
19+
}
20+
}
21+
```
22+
23+
Usually there is no need to catch exceptions.
24+
The message broker can detect consumer has failed and redeliver the message.
25+
Sometimes you have to reject messages explicitly.
26+
27+
```php
28+
<?php
29+
use Enqueue\Psr\Processor;
30+
use Enqueue\Psr\Message;
31+
use Enqueue\Psr\Context;
32+
use Enqueue\Util\JSON;
33+
34+
class SendMailProcessor implements Processor
35+
{
36+
public function process(Message $message, Context $context)
37+
{
38+
$data = JSON::decode($message->getBody());
39+
if ($user = $this->userRepository->find($data['userId'])) {
40+
return self::REJECT;
41+
}
42+
43+
$this->mailer->send($user->getEmail(), $data['text']);
44+
45+
return self::ACK;
46+
}
47+
}
48+
```
49+
50+
It is possible to find out whether the message failed previously or not.
51+
There is `isRedelivered` method for that.
52+
If it returns true than there was attempt to process message.
53+
54+
```php
55+
<?php
56+
use Enqueue\Psr\Processor;
57+
use Enqueue\Psr\Message;
58+
use Enqueue\Psr\Context;
59+
60+
class SendMailProcessor implements Processor
61+
{
62+
public function process(Message $message, Context $context)
63+
{
64+
if ($message->isRedelivered()) {
65+
return self::REQUEUE;
66+
}
67+
68+
$this->mailer->send('[email protected]', $message->getBody());
69+
70+
return self::ACK;
71+
}
72+
}
73+
```
74+
75+
The second argument is your context. You can use it to send messages to other queues\topics.
76+
77+
```php
78+
<?php
79+
use Enqueue\Psr\Processor;
80+
use Enqueue\Psr\Message;
81+
use Enqueue\Psr\Context;
82+
83+
class SendMailProcessor implements Processor
84+
{
85+
public function process(Message $message, Context $context)
86+
{
87+
$this->mailer->send('[email protected]', $message->getBody());
88+
89+
$queue = $context->createQueue('anotherQueue');
90+
$message = $context->createMessage('Message has been sent');
91+
$context->createProducer()->send($queue, $message);
92+
93+
return self::ACK;
94+
}
95+
}
96+
```
97+
98+
The consumption component provide some useful extensions, for example there is an extension that makes RPC processing simplier.
99+
100+
```php
101+
<?php
102+
use Enqueue\Psr\Processor;
103+
use Enqueue\Psr\Message;
104+
use Enqueue\Psr\Context;
105+
use Enqueue\Consumption\ChainExtension;
106+
use Enqueue\Consumption\QueueConsumer;
107+
use Enqueue\Consumption\Extension\ReplyExtension;
108+
use Enqueue\Consumption\Result;
109+
110+
class SendMailProcessor implements Processor
111+
{
112+
public function process(Message $message, Context $context)
113+
{
114+
$this->mailer->send('[email protected]', $message->getBody());
115+
116+
$replyMessage = $context->createMessage('Message has been sent');
117+
118+
return Result::reply($replyMessage);
119+
}
120+
}
121+
122+
/** @var \Enqueue\Psr\Context $psrContext */
123+
124+
$queueConsumer = new QueueConsumer($psrContext, new ChainExtension([
125+
new ReplyExtension()
126+
]));
127+
128+
$queueConsumer->bind('foo', new SendMailProcessor());
129+
130+
$queueConsumer->consume();
131+
```
132+
133+
[back to index](../index.md)

Diff for: docs/index.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
- [Null](null_transport.md)
99
* Consumption
1010
- [Extensions](consumption/extensions.md)
11+
- [Message processor](consumption/message_processor.md)
1112
* Client
1213
- [Message examples](client/message_examples.md)
1314
- [Supported brokers](client/supported_brokers.md)

0 commit comments

Comments
 (0)