Skip to content

Add support for using the /topic prefix instead of /exchange. #826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/transport/stomp.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ $factory = new StompConnectionFactory('stomp:');
// same as above
$factory = new StompConnectionFactory([]);

// connect via stomp to RabbitMQ (default) - the topic names are prefixed with /exchange
$factory = new StompConnectionFactory('stomp+rabbitmq:');

// connect via stomp to ActiveMQ - the topic names are prefixed with /topic
$factory = new StompConnectionFactory('stomp+activemq:');

// connect to stomp broker at example.com port 1000 using
$factory = new StompConnectionFactory([
'host' => 'example.com',
Expand Down
19 changes: 17 additions & 2 deletions pkg/stomp/StompConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

class StompConnectionFactory implements ConnectionFactory
{
const SCHEME_EXT_ACTIVEMQ = 'activemq';
const SCHEME_EXT_RABBITMQ = 'rabbitmq';

/**
* @var array
*/
Expand Down Expand Up @@ -66,13 +69,15 @@ public function __construct($config = 'stomp:')
*/
public function createContext(): Context
{
$useExchangePrefix = self::SCHEME_EXT_RABBITMQ === $this->config['target'] ? true : false;

if ($this->config['lazy']) {
return new StompContext(function () {
return $this->establishConnection();
});
}, $useExchangePrefix);
}

return new StompContext($this->establishConnection());
return new StompContext($this->establishConnection(), $useExchangePrefix);
}

private function establishConnection(): BufferedStompClient
Expand Down Expand Up @@ -103,7 +108,16 @@ private function parseDsn(string $dsn): array
throw new \LogicException(sprintf('The given DSN is not supported. Must start with "stomp:".'));
}

$schemeExtension = current($dsn->getSchemeExtensions());
if (false === $schemeExtension) {
$schemeExtension = self::SCHEME_EXT_RABBITMQ;
}
if (self::SCHEME_EXT_ACTIVEMQ !== $schemeExtension && self::SCHEME_EXT_RABBITMQ !== $schemeExtension) {
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is invalid. It must be one of "%s" or "%s".', $schemeExtension, self::SCHEME_EXT_ACTIVEMQ, self::SCHEME_EXT_RABBITMQ));
}

return array_filter(array_replace($dsn->getQuery(), [
'target' => $schemeExtension,
'host' => $dsn->getHost(),
'port' => $dsn->getPort(),
'login' => $dsn->getUser(),
Expand All @@ -120,6 +134,7 @@ private function parseDsn(string $dsn): array
private function defaultConfig(): array
{
return [
'target' => self::SCHEME_EXT_RABBITMQ,
'host' => 'localhost',
'port' => 61613,
'login' => 'guest',
Expand Down
12 changes: 10 additions & 2 deletions pkg/stomp/StompContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@ class StompContext implements Context
*/
private $stomp;

/**
* @var bool
*/
private $useExchangePrefix;

/**
* @var callable
*/
private $stompFactory;

/**
* @param BufferedStompClient|callable $stomp
* @param bool $useExchangePrefix
*/
public function __construct($stomp)
public function __construct($stomp, $useExchangePrefix = true)
{
if ($stomp instanceof BufferedStompClient) {
$this->stomp = $stomp;
Expand All @@ -40,6 +46,8 @@ public function __construct($stomp)
} else {
throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.');
}

$this->useExchangePrefix = $useExchangePrefix;
}

/**
Expand Down Expand Up @@ -84,7 +92,7 @@ public function createTopic(string $name): Topic
{
if (0 !== strpos($name, '/')) {
$destination = new StompDestination();
$destination->setType(StompDestination::TYPE_EXCHANGE);
$destination->setType($this->useExchangePrefix ? StompDestination::TYPE_EXCHANGE : StompDestination::TYPE_TOPIC);
$destination->setStompName($name);

return $destination;
Expand Down
43 changes: 43 additions & 0 deletions pkg/stomp/Tests/StompConnectionFactoryConfigTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public static function provideConfigs()
yield [
null,
[
'target' => 'rabbitmq',
'host' => 'localhost',
'port' => 61613,
'login' => 'guest',
Expand All @@ -71,6 +72,7 @@ public static function provideConfigs()
yield [
'stomp:',
[
'target' => 'rabbitmq',
'host' => 'localhost',
'port' => 61613,
'login' => 'guest',
Expand All @@ -87,6 +89,7 @@ public static function provideConfigs()
yield [
[],
[
'target' => 'rabbitmq',
'host' => 'localhost',
'port' => 61613,
'login' => 'guest',
Expand All @@ -103,6 +106,43 @@ public static function provideConfigs()
yield [
'stomp://localhost:1234?foo=bar&lazy=0&sync=true',
[
'target' => 'rabbitmq',
'host' => 'localhost',
'port' => 1234,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/',
'buffer_size' => 1000,
'connection_timeout' => 1,
'sync' => true,
'lazy' => false,
'foo' => 'bar',
'ssl_on' => false,
],
];

yield [
'stomp+activemq://localhost:1234?foo=bar&lazy=0&sync=true',
[
'target' => 'activemq',
'host' => 'localhost',
'port' => 1234,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/',
'buffer_size' => 1000,
'connection_timeout' => 1,
'sync' => true,
'lazy' => false,
'foo' => 'bar',
'ssl_on' => false,
],
];

yield [
'stomp+rabbitmq://localhost:1234?foo=bar&lazy=0&sync=true',
[
'target' => 'rabbitmq',
'host' => 'localhost',
'port' => 1234,
'login' => 'guest',
Expand All @@ -120,6 +160,7 @@ public static function provideConfigs()
yield [
['dsn' => 'stomp://localhost:1234/theVhost?foo=bar&lazy=0&sync=true', 'baz' => 'bazVal', 'foo' => 'fooVal'],
[
'target' => 'rabbitmq',
'host' => 'localhost',
'port' => 1234,
'login' => 'guest',
Expand All @@ -138,6 +179,7 @@ public static function provideConfigs()
yield [
['dsn' => 'stomp:///%2f'],
[
'target' => 'rabbitmq',
'host' => 'localhost',
'port' => 61613,
'login' => 'guest',
Expand All @@ -154,6 +196,7 @@ public static function provideConfigs()
yield [
['host' => 'localhost', 'port' => 1234, 'foo' => 'bar'],
[
'target' => 'rabbitmq',
'host' => 'localhost',
'port' => 1234,
'login' => 'guest',
Expand Down
25 changes: 25 additions & 0 deletions pkg/stomp/Tests/StompConnectionFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,31 @@ public function testShouldCreateLazyContext()
$this->assertInstanceOf(StompContext::class, $context);

$this->assertAttributeEquals(null, 'stomp', $context);
$this->assertAttributeEquals(true, 'useExchangePrefix', $context);
$this->assertInternalType('callable', $this->readAttribute($context, 'stompFactory'));
}

public function testShouldCreateRabbitMQContext()
{
$factory = new StompConnectionFactory('stomp+rabbitmq://');

$context = $factory->createContext();

$this->assertInstanceOf(StompContext::class, $context);

$this->assertAttributeEquals(null, 'stomp', $context);
$this->assertAttributeEquals(true, 'useExchangePrefix', $context);
}

public function testShouldCreateActiveMQContext()
{
$factory = new StompConnectionFactory('stomp+activemq://');

$context = $factory->createContext();

$this->assertInstanceOf(StompContext::class, $context);

$this->assertAttributeEquals(null, 'stomp', $context);
$this->assertAttributeEquals(false, 'useExchangePrefix', $context);
}
}
14 changes: 13 additions & 1 deletion pkg/stomp/Tests/StompContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public function testCreateQueueShouldCreateDestinationIfNameIsFullDestinationStr
$this->assertEquals('/amq/queue/name/routing-key', $destination->getQueueName());
}

public function testShouldCreateTopicInstance()
public function testShouldCreateTopicInstanceWithExchangePrefix()
{
$context = new StompContext($this->createStompClientMock());

Expand All @@ -91,6 +91,18 @@ public function testShouldCreateTopicInstance()
$this->assertSame(StompDestination::TYPE_EXCHANGE, $topic->getType());
}

public function testShouldCreateTopicInstanceWithTopicPrefix()
{
$context = new StompContext($this->createStompClientMock(), false);

$topic = $context->createTopic('the name');

$this->assertInstanceOf(StompDestination::class, $topic);
$this->assertSame('/topic/the name', $topic->getQueueName());
$this->assertSame('/topic/the name', $topic->getTopicName());
$this->assertSame(StompDestination::TYPE_TOPIC, $topic->getType());
}

public function testCreateTopicShouldCreateDestinationIfNameIsFullDestinationString()
{
$context = new StompContext($this->createStompClientMock());
Expand Down