Skip to content

Commit 1c6d2a9

Browse files
authored
Merge pull request #81 from php-enqueue/dsn-transport
Add ability to set transport DSN directly to default transport factory.
2 parents ba766d4 + 553974a commit 1c6d2a9

File tree

14 files changed

+502
-187
lines changed

14 files changed

+502
-187
lines changed

Diff for: docker-compose.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ services:
1010
volumes:
1111
- './:/mqdev'
1212
environment:
13-
- AMQP_DSN=amqp://rabbitmq
13+
- AMQP_DSN=amqp://guest:guest@rabbitmq:5672/mqdev
1414
- SYMFONY__RABBITMQ__HOST=rabbitmq
1515
- SYMFONY__RABBITMQ__USER=guest
1616
- SYMFONY__RABBITMQ__PASSWORD=guest

Diff for: docs/bundle/quick_tour.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ First, you have to configure a transport layer and set one to be default.
4545

4646
enqueue:
4747
transport:
48-
default: 'amqp'
49-
amqp: "amqp://"
48+
default: "amqp://"
5049
client: ~
5150
```
5251

Diff for: docs/client/quick_tour.md

+1-9
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,7 @@ use Enqueue\SimpleClient\SimpleClient;
2222

2323
include __DIR__.'/vendor/autoload.php';
2424

25-
$client = new SimpleClient([
26-
'transport' => [
27-
'default' => 'amqp',
28-
'amqp' => 'amqp://'
29-
],
30-
'client' => [
31-
'app_name' => 'plain_php',
32-
],
33-
]);
25+
$client = new SimpleClient('amqp://');
3426
```
3527

3628
## Produce message

Diff for: docs/client/rpc_call.md

+12-5
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
11
# Client. RPC call
22

33
The client's [quick tour](quick_tour.md) describes how to get the client object.
4-
We use you followed instructions there and have instance of `Enqueue\SimpleClient\SimpleClient` in `$client` var.
4+
Here we'll use `Enqueue\SimpleClient\SimpleClient` though it is not required.
5+
You can get all that stuff from manually built client or get objects from a container (Symfony).
6+
7+
The simple client could be created like this:
58

69
## The client side
710

811
There is a handy class RpcClient shipped with the client component.
9-
It allows you to easily send a message and wait for a reply.
12+
It allows you to easily perform [RPC calls](https://en.wikipedia.org/wiki/Remote_procedure_call).
13+
It send a message and wait for a reply.
1014

1115
```php
1216
<?php
1317
use Enqueue\Client\RpcClient;
18+
use Enqueue\SimpleClient\SimpleClient;
1419

15-
/** @var \Enqueue\SimpleClient\SimpleClient $client */
20+
$client = new SimpleClient('amqp://');
1621

1722
$rpcClient = new RpcClient($client->getProducer(), $context);
1823

@@ -24,8 +29,9 @@ You can perform several requests asynchronously with `callAsync` and request rep
2429
```php
2530
<?php
2631
use Enqueue\Client\RpcClient;
32+
use Enqueue\SimpleClient\SimpleClient;
2733

28-
/** @var \Enqueue\SimpleClient\SimpleClient $client */
34+
$client = new SimpleClient('amqp://');
2935

3036
$rpcClient = new RpcClient($client->getProducer(), $context);
3137

@@ -54,10 +60,11 @@ use Enqueue\Psr\PsrContext;
5460
use Enqueue\Consumption\Result;
5561
use Enqueue\Consumption\ChainExtension;
5662
use Enqueue\Consumption\Extension\ReplyExtension;
63+
use Enqueue\SimpleClient\SimpleClient;
5764

5865
/** @var \Enqueue\Psr\PsrContext $context */
5966

60-
/** @var \Enqueue\SimpleClient\SimpleClient $client */
67+
$client = new SimpleClient('amqp://');
6168

6269
$client->bind('greeting_topic', 'greeting_processor', function (PsrMessage $message, PsrContext $context) use (&$requestMessage) {
6370
echo $message->getBody();

Diff for: docs/quick_tour.md

+5-13
Original file line numberDiff line numberDiff line change
@@ -170,19 +170,11 @@ Here's an example of how you can send and consume messages.
170170
use Enqueue\SimpleClient\SimpleClient;
171171
use Enqueue\Psr\PsrMessage;
172172

173-
$client = new SimpleClient([
174-
'transport' => [
175-
'default' => 'amqp',
176-
'amqp' => [
177-
'host' => 'localhost',
178-
'port' => 5672,
179-
'vhost' => '/',
180-
'user' => 'guest',
181-
'pass' => 'guest',
182-
],
183-
],
184-
'client' => true,
185-
]);
173+
// composer require enqueue/amqp-ext
174+
$client = new SimpleClient('amqp://');
175+
176+
// composer require enqueue/fs
177+
$client = new SimpleClient('file://foo/bar');
186178

187179
$client->setupBroker();
188180

Diff for: pkg/amqp-ext/AmqpConnectionFactory.php

+2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,9 @@ private function establishConnection()
7777
$config = $this->config;
7878
$config['login'] = $this->config['user'];
7979
$config['password'] = $this->config['pass'];
80+
8081
$this->connection = new \AMQPConnection($config);
82+
8183
$this->config['persisted'] ? $this->connection->pconnect() : $this->connection->connect();
8284
}
8385
if (false == $this->connection->isConnected()) {

Diff for: pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php

+12
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ public function provideEnqueueConfigs()
4343
],
4444
]];
4545

46+
yield 'default_amqp_as_dsn' => [[
47+
'transport' => [
48+
'default' => getenv('AMQP_DSN'),
49+
],
50+
]];
51+
4652
yield 'stomp' => [[
4753
'transport' => [
4854
'default' => 'stomp',
@@ -97,6 +103,12 @@ public function provideEnqueueConfigs()
97103
],
98104
]];
99105

106+
yield 'default_fs_as_dsn' => [[
107+
'transport' => [
108+
'default' => 'file:/'.sys_get_temp_dir(),
109+
],
110+
]];
111+
100112
yield 'dbal' => [[
101113
'transport' => [
102114
'default' => 'dbal',

Diff for: pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php

+28-4
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
use Enqueue\Bundle\Tests\Unit\Mocks\FooTransportFactory;
88
use Enqueue\Client\Producer;
99
use Enqueue\Client\TraceableProducer;
10-
use Enqueue\Symfony\DefaultTransportFactory;
10+
use Enqueue\Null\NullContext;
1111
use Enqueue\Null\Symfony\NullTransportFactory;
12+
use Enqueue\Symfony\DefaultTransportFactory;
1213
use Enqueue\Test\ClassExtensionTrait;
13-
use Enqueue\Null\NullContext;
14+
use PHPUnit\Framework\TestCase;
1415
use Symfony\Component\DependencyInjection\ContainerBuilder;
1516
use Symfony\Component\DependencyInjection\Reference;
1617
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
17-
use PHPUnit\Framework\TestCase;
1818

1919
class EnqueueExtensionTest extends TestCase
2020
{
@@ -95,6 +95,30 @@ public function testShouldUseNullTransportAsDefault()
9595
);
9696
}
9797

98+
public function testShouldUseNullTransportAsDefaultConfiguredViaDSN()
99+
{
100+
$container = new ContainerBuilder();
101+
102+
$extension = new EnqueueExtension();
103+
$extension->addTransportFactory(new NullTransportFactory());
104+
$extension->addTransportFactory(new DefaultTransportFactory());
105+
106+
$extension->load([[
107+
'transport' => [
108+
'default' => 'null://',
109+
],
110+
]], $container);
111+
112+
self::assertEquals(
113+
'enqueue.transport.default.context',
114+
(string) $container->getAlias('enqueue.transport.context')
115+
);
116+
self::assertEquals(
117+
'enqueue.transport.default_null.context',
118+
(string) $container->getAlias('enqueue.transport.default.context')
119+
);
120+
}
121+
98122
public function testShouldConfigureFooTransport()
99123
{
100124
$container = new ContainerBuilder();
@@ -445,7 +469,7 @@ public function testShouldAddJobQueueEntityMapping()
445469

446470
$extension->prepend($container);
447471

448-
$config = $container->getExtensionConfig('doctrine');
472+
$config = $container->getExtensionConfig('doctrine');
449473

450474
$this->assertSame(['dbal' => true], $config[1]);
451475
$this->assertNotEmpty($config[0]['orm']['mappings']['enqueue_job_queue']);

Diff for: pkg/enqueue/Symfony/DefaultTransportFactory.php

+68-6
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,15 @@
22

33
namespace Enqueue\Symfony;
44

5+
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
7+
use Enqueue\Fs\FsConnectionFactory;
8+
use Enqueue\Fs\Symfony\FsTransportFactory;
9+
use Enqueue\Null\NullConnectionFactory;
10+
use Enqueue\Null\Symfony\NullTransportFactory;
511
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
612
use Symfony\Component\DependencyInjection\ContainerBuilder;
13+
use function Enqueue\dsn_to_connection_factory;
714

815
class DefaultTransportFactory implements TransportFactoryInterface
916
{
@@ -29,18 +36,30 @@ public function addConfiguration(ArrayNodeDefinition $builder)
2936
->beforeNormalization()
3037
->ifString()
3138
->then(function ($v) {
32-
return ['alias' => $v];
39+
if (false === strpos($v, '://')) {
40+
return ['alias' => $v];
41+
}
42+
43+
return ['dsn' => $v];
3344
})
3445
->end()
3546
->children()
36-
->scalarNode('alias')->isRequired()->cannotBeEmpty()->end()
37-
;
47+
->scalarNode('alias')->cannotBeEmpty()->end()
48+
->scalarNode('dsn')->cannotBeEmpty()->end()
49+
;
3850
}
3951

4052
public function createConnectionFactory(ContainerBuilder $container, array $config)
4153
{
54+
if (isset($config['alias'])) {
55+
$aliasId = sprintf('enqueue.transport.%s.connection_factory', $config['alias']);
56+
} elseif (isset($config['dsn'])) {
57+
$aliasId = $this->findFactory($config['dsn'])->createConnectionFactory($container, $config);
58+
} else {
59+
throw new \LogicException('Either dsn or alias option must be set.');
60+
}
61+
4262
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
43-
$aliasId = sprintf('enqueue.transport.%s.connection_factory', $config['alias']);
4463

4564
$container->setAlias($factoryId, $aliasId);
4665
$container->setAlias('enqueue.transport.connection_factory', $factoryId);
@@ -53,8 +72,15 @@ public function createConnectionFactory(ContainerBuilder $container, array $conf
5372
*/
5473
public function createContext(ContainerBuilder $container, array $config)
5574
{
75+
if (isset($config['alias'])) {
76+
$aliasId = sprintf('enqueue.transport.%s.context', $config['alias']);
77+
} elseif (isset($config['dsn'])) {
78+
$aliasId = $this->findFactory($config['dsn'])->createContext($container, $config);
79+
} else {
80+
throw new \LogicException('Either dsn or alias option must be set.');
81+
}
82+
5683
$contextId = sprintf('enqueue.transport.%s.context', $this->getName());
57-
$aliasId = sprintf('enqueue.transport.%s.context', $config['alias']);
5884

5985
$container->setAlias($contextId, $aliasId);
6086
$container->setAlias('enqueue.transport.context', $contextId);
@@ -67,8 +93,15 @@ public function createContext(ContainerBuilder $container, array $config)
6793
*/
6894
public function createDriver(ContainerBuilder $container, array $config)
6995
{
96+
if (isset($config['alias'])) {
97+
$aliasId = sprintf('enqueue.client.%s.driver', $config['alias']);
98+
} elseif (isset($config['dsn'])) {
99+
$aliasId = $this->findFactory($config['dsn'])->createDriver($container, $config);
100+
} else {
101+
throw new \LogicException('Either dsn or alias option must be set.');
102+
}
103+
70104
$driverId = sprintf('enqueue.client.%s.driver', $this->getName());
71-
$aliasId = sprintf('enqueue.client.%s.driver', $config['alias']);
72105

73106
$container->setAlias($driverId, $aliasId);
74107
$container->setAlias('enqueue.client.driver', $driverId);
@@ -83,4 +116,33 @@ public function getName()
83116
{
84117
return $this->name;
85118
}
119+
120+
/**
121+
* @param string
122+
* @param mixed $dsn
123+
*
124+
* @return TransportFactoryInterface
125+
*/
126+
private function findFactory($dsn)
127+
{
128+
$connectionFactory = dsn_to_connection_factory($dsn);
129+
130+
if ($connectionFactory instanceof AmqpConnectionFactory) {
131+
return new AmqpTransportFactory('default_amqp');
132+
}
133+
134+
if ($connectionFactory instanceof FsConnectionFactory) {
135+
return new FsTransportFactory('default_fs');
136+
}
137+
138+
if ($connectionFactory instanceof NullConnectionFactory) {
139+
return new NullTransportFactory('default_null');
140+
}
141+
142+
throw new \LogicException(sprintf(
143+
'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
144+
get_class($connectionFactory),
145+
$dsn
146+
));
147+
}
86148
}

0 commit comments

Comments
 (0)