Skip to content

[client] Merge experimental ProducerV2 methods to Producer interface. #124

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/bundle/debuging.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ class DefaultController extends Controller
/** @var ProducerInterface $producer */
$producer = $this->get('enqueue.producer');

$producer->send('foo_topic', 'Hello world');
$producer->sendEvent('foo_topic', 'Hello world');

$producer->send('bar_topic', ['bar' => 'val']);
$producer->sendEvent('bar_topic', ['bar' => 'val']);

$message = new Message();
$message->setBody('baz');
$producer->send('baz_topic', $message);
$producer->sendEvent('baz_topic', $message);

// ...
}
Expand Down
2 changes: 1 addition & 1 deletion docs/bundle/job_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Step1Processor implements PsrProcessor
$runner->createDelayed(
$jobName,
function (JobRunner $runner, Job $childJob) use ($entity) {
$this->producer->send('search:index:index-single-entity', [
$this->producer->sendEvent('search:index:index-single-entity', [
'entityId' => $entity->getId(),
'jobId' => $childJob->getId(),
]);
Expand Down
35 changes: 33 additions & 2 deletions docs/bundle/message_producer.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ $context->createProducer()->send(
The client is shipped with two types of producers. The first one sends messages immediately
where another one (it is called spool producer) collects them in memory and sends them `onTerminate` event (the response is already sent).

The producer has two types on send methods:

* `sendEvent` - Message is sent to topic and many consumers can subscriber to it. It is "fire and forget" strategy. The even could be sent to "message bus" to other applications.
* `sendCommand` - Message is to ONE exact consumer. It could be used as "fire and forget" or as RPC. The command message is always sent in scope of current application.

### Send event

```php
<?php

Expand All @@ -35,14 +40,40 @@ where another one (it is called spool producer) collects them in memory and send
$producer = $container->get('enqueue.producer');

// message is being sent right now
$producer->send('a_topic', 'Hello there!');
$producer->sendEvent('a_topic', 'Hello there!');


/** @var \Enqueue\Client\SpoolProducer $spoolProducer */
$spoolProducer = $container->get('enqueue.spool_producer');

// message is being sent on console.terminate or kernel.terminate event
$spoolProducer->sendEvent('a_topic', 'Hello there!');

// you could send queued messages manually by calling flush method
$spoolProducer->flush();
```

### Send command

```php
<?php

/** @var Symfony\Component\DependencyInjection\ContainerInterface $container */

/** @var \Enqueue\Client\ProducerInterface $producer */
$producer = $container->get('enqueue.producer');

// message is being sent right now, we use it as RPC
$promise = $producer->sendCommand('a_processor_name', 'Hello there!', $needReply = true);

$replyMessage = $promise->receive();


/** @var \Enqueue\Client\SpoolProducer $spoolProducer */
$spoolProducer = $container->get('enqueue.spool_producer');

// message is being sent on console.terminate or kernel.terminate event
$spoolProducer->send('a_topic', 'Hello there!');
$spoolProducer->sendCommand('a_processor_name', 'Hello there!');

// you could send queued messages manually by calling flush method
$spoolProducer->flush();
Expand Down
7 changes: 6 additions & 1 deletion docs/bundle/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,12 @@ use Enqueue\Client\Producer;
/** @var Producer $producer **/
$producer = $container->get('enqueue.producer');

$producer->send('aFooTopic', 'Something has happened');

// send event to many consumers
$producer->sendEvent('aFooTopic', 'Something has happened');

// send command to ONE consumer
$producer->sendCommand('aProcessorName', 'Something has happened');
```

To consume messages you have to first create a message processor:
Expand Down
32 changes: 28 additions & 4 deletions docs/client/message_examples.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,28 @@
# Client. Message examples

* [Scope](#scope)
* [Delay](#delay)
* [Expiration (TTL)](#expiration-ttl)
* [Priority](#priority)
* [Timestamp, Content type, Message id](#timestamp-content-type-message-id)

## Scope

There are two two types possible scopes: `Message:SCOPE_MESSAGE_BUS` and `Message::SCOPE_APP`.
The first one instructs the client send messages (if driver supports) to the message bus so other apps can consume those messages.
The second in turns limits the message to the application that sent it. No other apps could receive it.

```php
<?php

use Enqueue\Client\Message;

$message = new Message();
$message->setScope(Message::SCOPE_MESSAGE_BUS);

/** @var \Enqueue\Client\ProducerInterface $producer */
$producer->sendEvent('aTopic', $message);
```

## Delay

Expand All @@ -15,7 +39,7 @@ $message = new Message();
$message->setDelay(60); // seconds

/** @var \Enqueue\Client\ProducerInterface $producer */
$producer->send('aTopic', $message);
$producer->sendEvent('aTopic', $message);
```

## Expiration (TTL)
Expand All @@ -33,7 +57,7 @@ $message = new Message();
$message->setExpire(60); // seconds

/** @var \Enqueue\Client\ProducerInterface $producer */
$producer->send('aTopic', $message);
$producer->sendEvent('aTopic', $message);
```

## Priority
Expand All @@ -52,7 +76,7 @@ $message = new Message();
$message->setPriority(MessagePriority::HIGH);

/** @var \Enqueue\Client\ProducerInterface $producer */
$producer->send('aTopic', $message);
$producer->sendEvent('aTopic', $message);
```

## Timestamp, Content type, Message id
Expand All @@ -72,7 +96,7 @@ $message->setTimestamp(time());
$message->setContentType('text/plain');

/** @var \Enqueue\Client\ProducerInterface $producer */
$producer->send('aTopic', $message);
$producer->sendEvent('aTopic', $message);
```

[back to index](../index.md)
2 changes: 1 addition & 1 deletion docs/job_queue/run_sub_job.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class RootJobProcessor implements PsrProcessor
{
$result = $this->jobRunner->runUnique($message->getMessageId(), 'aJobName', function (JobRunner $runner) {
$runner->createDelayed('aSubJobName1', function (JobRunner $runner, Job $childJob) {
$this->producer->send('aJobTopic', [
$this->producer->sendEvent('aJobTopic', [
'jobId' => $childJob->getId(),
// other data required by sub job
]);
Expand Down
52 changes: 47 additions & 5 deletions docs/quick_tour.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ It provides easy to use services for producing and processing messages.
It supports unified format for setting message expiration, delay, timestamp, correlation id.
It supports message bus so different applications can talk to each other.

Here's an example of how you can send and consume messages.
Here's an example of how you can send and consume event messages.

```php
<?php
Expand All @@ -179,15 +179,57 @@ $client = new SimpleClient('file://foo/bar');
$client->setupBroker();

$client->bind('a_foo_topic', 'fooProcessor', function(PsrMessage $message) {
// your processing logic here
// your event processor logic here
});

$client->send('a_bar_topic', 'aMessageData');

// in another process you can consume messages.
// this is a blocking call, it'll consume message until it is interrupted
$client->consume();
```

and command messages:

```php
<?php
use Enqueue\SimpleClient\SimpleClient;
use Enqueue\Psr\PsrMessage;
use Enqueue\Psr\PsrContext;
use Enqueue\Client\Config;
use Enqueue\Consumption\Extension\ReplyExtension;
use Enqueue\Consumption\Result;

// composer require enqueue/amqp-ext
$client = new SimpleClient('amqp://');

// composer require enqueue/fs
$client = new SimpleClient('file://foo/bar');
$client->

$client->setupBroker();

$client->bind(Config::COMMAND_TOPIC, 'bar_command', function(PsrMessage $message) {
// your bar command processor logic here
});

$client->bind(Config::COMMAND_TOPIC, 'baz_reply_command', function(PsrMessage $message, PsrContext $context) {
// your baz reply command processor logic here

return Result::reply($context->createMessage('theReplyBody'));
});

// It is sent to one consumer.
$client->sendCommand('bar_command', 'aMessageData');

// It is possible to get reply
$promise = $client->sendCommand('bar_command', 'aMessageData', true);

// you can send several commands and only after start getting replies.

$replyMessage = $promise->receive(2000); // 2 sec

// this is a blocking call, it'll consume message until it is interrupted
$client->consume([new ReplyExtension()]);
```

## Cli commands

The library provides handy commands out of the box.
Expand Down
2 changes: 1 addition & 1 deletion pkg/enqueue-bundle/Events/AsyncListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public function onEvent(Event $event = null, $eventName)
$message->setProperty('event_name', $eventName);
$message->setProperty('transformer_name', $transformerName);

$this->producer->send('event.'.$eventName, $message);
$this->producer->sendEvent('event.'.$eventName, $message);
}
}
}
12 changes: 5 additions & 7 deletions pkg/enqueue-bundle/Resources/config/client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
class: 'Enqueue\Client\Producer'
arguments:
- '@enqueue.client.driver'
- '@enqueue.client.rpc_factory'
- '@enqueue.client.extensions'

enqueue.client.spool_producer:
Expand All @@ -24,18 +25,15 @@ services:
alias: 'enqueue.client.producer'

enqueue.client.producer_v2:
class: 'Enqueue\Client\ProducerV2'
arguments:
- '@enqueue.client.producer'
- '@enqueue.client.rpc_client'
alias: 'enqueue.client.producer'

enqueue.spool_producer:
alias: 'enqueue.client.spool_producer'

enqueue.client.rpc_client:
class: 'Enqueue\Client\RpcClient'
enqueue.client.rpc_factory:
class: 'Enqueue\Rpc\RpcFactory'
public: false
arguments:
- '@enqueue.client.producer'
- '@enqueue.transport.context'

enqueue.client.router_processor:
Expand Down
7 changes: 7 additions & 0 deletions pkg/enqueue-bundle/Resources/config/services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ services:
tags:
- { name: 'console.command' }

enqueue.transport.rpc_factory:
class: 'Enqueue\Rpc\RpcFactory'
public: false
arguments:
- '@enqueue.transport.context'

enqueue.transport.rpc_client:
class: 'Enqueue\Rpc\RpcClient'
arguments:
- '@enqueue.transport.context'
- '@enqueue.transport.rpc_factory'
Loading