Skip to content

Commit 0c36cb1

Browse files
authored
Merge pull request #144 from php-enqueue/interop-amqp-client
amqp interop client
2 parents 0240672 + 528203b commit 0c36cb1

16 files changed

+711
-67
lines changed

Diff for: pkg/amqp-ext/AmqpConsumer.php

+5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
77
use Interop\Amqp\AmqpQueue;
88
use Interop\Amqp\Impl\AmqpMessage;
9+
use Interop\Queue\Exception;
910
use Interop\Queue\InvalidMessageException;
1011
use Interop\Queue\PsrMessage;
1112

@@ -73,6 +74,10 @@ public function __construct(AmqpContext $context, AmqpQueue $queue, Buffer $buff
7374
*/
7475
public function setConsumerTag($consumerTag)
7576
{
77+
if ($this->isInit) {
78+
throw new Exception('Consumer tag is not mutable after it has been subscribed to broker');
79+
}
80+
7681
$this->consumerTag = $consumerTag;
7782
}
7883

Diff for: pkg/amqp-ext/Symfony/AmqpTransportFactory.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use Enqueue\AmqpExt\AmqpConnectionFactory;
66
use Enqueue\AmqpExt\AmqpContext;
7-
use Enqueue\AmqpExt\Client\AmqpDriver;
7+
use Enqueue\Client\Amqp\AmqpDriver;
88
use Enqueue\Symfony\DriverFactoryInterface;
99
use Enqueue\Symfony\TransportFactoryInterface;
1010
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;

Diff for: pkg/amqp-ext/Symfony/RabbitMqAmqpTransportFactory.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\AmqpExt\Symfony;
44

5-
use Enqueue\AmqpExt\Client\RabbitMqDriver;
5+
use Enqueue\Client\Amqp\RabbitMqDriver;
66
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
77
use Symfony\Component\DependencyInjection\ContainerBuilder;
88
use Symfony\Component\DependencyInjection\Definition;

Diff for: pkg/amqp-ext/Tests/Symfony/AmqpTransportFactoryTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
namespace Enqueue\AmqpExt\Tests\Symfony;
44

55
use Enqueue\AmqpExt\AmqpConnectionFactory;
6-
use Enqueue\AmqpExt\Client\AmqpDriver;
76
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
7+
use Enqueue\Client\Amqp\AmqpDriver;
88
use Enqueue\Symfony\TransportFactoryInterface;
99
use Enqueue\Test\ClassExtensionTrait;
1010
use PHPUnit\Framework\TestCase;

Diff for: pkg/amqp-ext/Tests/Symfony/RabbitMqAmqpTransportFactoryTest.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
namespace Enqueue\AmqpExt\Tests\Symfony;
44

55
use Enqueue\AmqpExt\AmqpConnectionFactory;
6-
use Enqueue\AmqpExt\Client\RabbitMqDriver;
76
use Enqueue\AmqpExt\Symfony\AmqpTransportFactory;
87
use Enqueue\AmqpExt\Symfony\RabbitMqAmqpTransportFactory;
8+
use Enqueue\Client\Amqp\RabbitMqDriver;
99
use Enqueue\Symfony\TransportFactoryInterface;
1010
use Enqueue\Test\ClassExtensionTrait;
1111
use PHPUnit\Framework\TestCase;

Diff for: pkg/amqp-lib/AmqpConsumer.php

+4
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ public function __construct(AMQPChannel $channel, InteropAmqpQueue $queue, Buffe
7373
*/
7474
public function setConsumerTag($consumerTag)
7575
{
76+
if ($this->isInit) {
77+
throw new Exception('Consumer tag is not mutable after it has been subscribed to broker');
78+
}
79+
7680
$this->consumerTag = $consumerTag;
7781
}
7882

Diff for: pkg/amqp-lib/Symfony/AmqpLibTransportFactory.php

+168
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Symfony;
4+
5+
use Enqueue\AmqpLib\AmqpConnectionFactory;
6+
use Enqueue\AmqpLib\AmqpContext;
7+
use Enqueue\Client\Amqp\AmqpDriver;
8+
use Enqueue\Symfony\DriverFactoryInterface;
9+
use Enqueue\Symfony\TransportFactoryInterface;
10+
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
11+
use Symfony\Component\DependencyInjection\ContainerBuilder;
12+
use Symfony\Component\DependencyInjection\Definition;
13+
use Symfony\Component\DependencyInjection\Reference;
14+
15+
class AmqpLibTransportFactory implements TransportFactoryInterface, DriverFactoryInterface
16+
{
17+
/**
18+
* @var string
19+
*/
20+
private $name;
21+
22+
/**
23+
* @param string $name
24+
*/
25+
public function __construct($name = 'amqp_lib')
26+
{
27+
$this->name = $name;
28+
}
29+
30+
/**
31+
* {@inheritdoc}
32+
*/
33+
public function addConfiguration(ArrayNodeDefinition $builder)
34+
{
35+
$builder
36+
->beforeNormalization()
37+
->ifString()
38+
->then(function ($v) {
39+
return ['dsn' => $v];
40+
})
41+
->end()
42+
->children()
43+
->scalarNode('dsn')
44+
->info('The connection to AMQP broker set as a string. Other parameters are ignored if set')
45+
->end()
46+
->scalarNode('host')
47+
->defaultValue('localhost')
48+
->cannotBeEmpty()
49+
->info('The host to connect too. Note: Max 1024 characters')
50+
->end()
51+
->scalarNode('port')
52+
->defaultValue(5672)
53+
->cannotBeEmpty()
54+
->info('Port on the host.')
55+
->end()
56+
->scalarNode('user')
57+
->defaultValue('guest')
58+
->cannotBeEmpty()
59+
->info('The user name to use. Note: Max 128 characters.')
60+
->end()
61+
->scalarNode('pass')
62+
->defaultValue('guest')
63+
->cannotBeEmpty()
64+
->info('Password. Note: Max 128 characters.')
65+
->end()
66+
->scalarNode('vhost')
67+
->defaultValue('/')
68+
->cannotBeEmpty()
69+
->info('The virtual host on the host. Note: Max 128 characters.')
70+
->end()
71+
->integerNode('connection_timeout')
72+
->defaultValue(3.0)
73+
->min(0)
74+
->info('Connection timeout. Note: 0 or greater seconds. May be fractional.')
75+
->end()
76+
->integerNode('read_write_timeout')
77+
->defaultValue(3.0)
78+
->min(0)
79+
->end()
80+
->integerNode('read_timeout')
81+
->defaultValue(3)
82+
->min(0)
83+
->info('Timeout in for income activity. Note: 0 or greater seconds. May be fractional.')
84+
->end()
85+
->integerNode('write_timeout')
86+
->defaultValue(3)
87+
->min(0)
88+
->info('Timeout in for outcome activity. Note: 0 or greater seconds. May be fractional.')
89+
->end()
90+
->booleanNode('lazy')
91+
->defaultTrue()
92+
->end()
93+
->booleanNode('stream')
94+
->defaultTrue()
95+
->end()
96+
->booleanNode('insist')
97+
->defaultFalse()
98+
->end()
99+
->booleanNode('keepalive')
100+
->defaultFalse()
101+
->end()
102+
->enumNode('receive_method')
103+
->values(['basic_get', 'basic_consume'])
104+
->defaultValue('basic_get')
105+
->info('The receive strategy to be used. We suggest to use basic_consume as it is more performant. Though you need AMQP extension 1.9.1 or higher')
106+
->end()
107+
->integerNode('heartbeat')
108+
->defaultValue(0)
109+
->end()
110+
;
111+
}
112+
113+
/**
114+
* {@inheritdoc}
115+
*/
116+
public function createConnectionFactory(ContainerBuilder $container, array $config)
117+
{
118+
$factory = new Definition(AmqpConnectionFactory::class);
119+
$factory->setArguments(isset($config['dsn']) ? [$config['dsn']] : [$config]);
120+
121+
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
122+
$container->setDefinition($factoryId, $factory);
123+
124+
return $factoryId;
125+
}
126+
127+
/**
128+
* {@inheritdoc}
129+
*/
130+
public function createContext(ContainerBuilder $container, array $config)
131+
{
132+
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
133+
134+
$context = new Definition(AmqpContext::class);
135+
$context->setFactory([new Reference($factoryId), 'createContext']);
136+
137+
$contextId = sprintf('enqueue.transport.%s.context', $this->getName());
138+
$container->setDefinition($contextId, $context);
139+
140+
return $contextId;
141+
}
142+
143+
/**
144+
* {@inheritdoc}
145+
*/
146+
public function createDriver(ContainerBuilder $container, array $config)
147+
{
148+
$driver = new Definition(AmqpDriver::class);
149+
$driver->setArguments([
150+
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
151+
new Reference('enqueue.client.config'),
152+
new Reference('enqueue.client.meta.queue_meta_registry'),
153+
]);
154+
155+
$driverId = sprintf('enqueue.client.%s.driver', $this->getName());
156+
$container->setDefinition($driverId, $driver);
157+
158+
return $driverId;
159+
}
160+
161+
/**
162+
* {@inheritdoc}
163+
*/
164+
public function getName()
165+
{
166+
return $this->name;
167+
}
168+
}
+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
namespace Enqueue\AmqpLib\Symfony;
4+
5+
use Enqueue\Client\Amqp\RabbitMqDriver;
6+
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
7+
use Symfony\Component\DependencyInjection\ContainerBuilder;
8+
use Symfony\Component\DependencyInjection\Definition;
9+
use Symfony\Component\DependencyInjection\Reference;
10+
11+
class RabbitMqAmqpLibTransportFactory extends AmqpLibTransportFactory
12+
{
13+
/**
14+
* @param string $name
15+
*/
16+
public function __construct($name = 'rabbitmq_amqp_lib')
17+
{
18+
parent::__construct($name);
19+
}
20+
21+
/**
22+
* {@inheritdoc}
23+
*/
24+
public function addConfiguration(ArrayNodeDefinition $builder)
25+
{
26+
parent::addConfiguration($builder);
27+
28+
$builder
29+
->children()
30+
->booleanNode('delay_plugin_installed')
31+
->defaultFalse()
32+
->info('The option tells whether RabbitMQ broker has delay plugin installed or not')
33+
->end()
34+
;
35+
}
36+
37+
/**
38+
* {@inheritdoc}
39+
*/
40+
public function createDriver(ContainerBuilder $container, array $config)
41+
{
42+
$driver = new Definition(RabbitMqDriver::class);
43+
$driver->setArguments([
44+
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
45+
new Reference('enqueue.client.config'),
46+
new Reference('enqueue.client.meta.queue_meta_registry'),
47+
]);
48+
$driverId = sprintf('enqueue.client.%s.driver', $this->getName());
49+
$container->setDefinition($driverId, $driver);
50+
51+
return $driverId;
52+
}
53+
}

0 commit comments

Comments
 (0)