Skip to content

Commit 0748873

Browse files
committed
[client] Generic driver should not support message bus
It should message to router queue, ignoring router topic options. If a driver (that extends generic one) supports message bus feature (routing between applications) it should overwrite createRouterTopic method.
1 parent 3aac6ca commit 0748873

13 files changed

+54
-81
lines changed

pkg/enqueue/Client/Driver/AmqpDriver.php

+16-15
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@
1111
use Interop\Amqp\AmqpQueue;
1212
use Interop\Amqp\AmqpTopic;
1313
use Interop\Amqp\Impl\AmqpBind;
14+
use Interop\Queue\PsrDestination;
1415
use Interop\Queue\PsrMessage;
1516
use Interop\Queue\PsrProducer;
1617
use Interop\Queue\PsrQueue;
17-
use Interop\Queue\PsrTopic;
1818
use Psr\Log\LoggerInterface;
1919
use Psr\Log\NullLogger;
2020

@@ -93,6 +93,20 @@ public function setupBroker(LoggerInterface $logger = null): void
9393
}
9494
}
9595

96+
/**
97+
* @return AmqpTopic
98+
*/
99+
protected function createRouterTopic(): PsrDestination
100+
{
101+
$topic = $this->doCreateTopic(
102+
$this->createTransportRouterTopicName($this->getConfig()->getRouterTopicName(), true)
103+
);
104+
$topic->setType(AmqpTopic::TYPE_FANOUT);
105+
$topic->addFlag(AmqpTopic::FLAG_DURABLE);
106+
107+
return $topic;
108+
}
109+
96110
/**
97111
* @return AmqpQueue
98112
*/
@@ -110,7 +124,7 @@ protected function doCreateQueue(string $transportQueueName): PsrQueue
110124
* @param AmqpTopic $topic
111125
* @param AmqpMessage $transportMessage
112126
*/
113-
protected function doSendToRouter(PsrProducer $producer, PsrTopic $topic, PsrMessage $transportMessage): void
127+
protected function doSendToRouter(PsrProducer $producer, PsrDestination $topic, PsrMessage $transportMessage): void
114128
{
115129
// We should not handle priority, expiration, and delay at this stage.
116130
// The router will take care of it while re-sending the message to the final destinations.
@@ -119,17 +133,4 @@ protected function doSendToRouter(PsrProducer $producer, PsrTopic $topic, PsrMes
119133

120134
$producer->send($topic, $transportMessage);
121135
}
122-
123-
/**
124-
* @return AmqpTopic
125-
*/
126-
protected function createRouterTopic(): PsrTopic
127-
{
128-
/** @var AmqpTopic $topic */
129-
$topic = parent::createRouterTopic();
130-
$topic->setType(AmqpTopic::TYPE_FANOUT);
131-
$topic->addFlag(AmqpTopic::FLAG_DURABLE);
132-
133-
return $topic;
134-
}
135136
}

pkg/enqueue/Client/Driver/DbalDriver.php

-10
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
namespace Enqueue\Client\Driver;
44

55
use Enqueue\Dbal\DbalContext;
6-
use Enqueue\Dbal\DbalDestination;
7-
use Interop\Queue\PsrTopic;
86
use Psr\Log\LoggerInterface;
97
use Psr\Log\NullLogger;
108

@@ -28,12 +26,4 @@ public function setupBroker(LoggerInterface $logger = null): void
2826
$log('Creating database table: "%s"', $this->getContext()->getTableName());
2927
$this->getContext()->createDataBaseTable();
3028
}
31-
32-
/**
33-
* @return DbalDestination
34-
*/
35-
protected function createRouterTopic(): PsrTopic
36-
{
37-
return $this->createQueue($this->getConfig()->getRouterQueueName());
38-
}
3929
}

pkg/enqueue/Client/Driver/FsDriver.php

-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use Enqueue\Fs\FsContext;
66
use Enqueue\Fs\FsDestination;
7-
use Interop\Queue\PsrTopic;
87
use Psr\Log\LoggerInterface;
98
use Psr\Log\NullLogger;
109

@@ -47,12 +46,4 @@ public function setupBroker(LoggerInterface $logger = null): void
4746
$declaredQueues[$queue->getQueueName()] = true;
4847
}
4948
}
50-
51-
/**
52-
* @return FsDestination
53-
*/
54-
protected function createRouterTopic(): PsrTopic
55-
{
56-
return $this->createQueue($this->getConfig()->getRouterQueueName());
57-
}
5849
}

pkg/enqueue/Client/Driver/GenericDriver.php

+4-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Enqueue\Client\Route;
1212
use Enqueue\Client\RouteCollection;
1313
use Interop\Queue\PsrContext;
14+
use Interop\Queue\PsrDestination;
1415
use Interop\Queue\PsrMessage;
1516
use Interop\Queue\PsrProducer;
1617
use Interop\Queue\PsrQueue;
@@ -212,7 +213,7 @@ public function getRouteCollection(): RouteCollection
212213
return $this->routeCollection;
213214
}
214215

215-
protected function doSendToRouter(PsrProducer $producer, PsrTopic $topic, PsrMessage $transportMessage): void
216+
protected function doSendToRouter(PsrProducer $producer, PsrDestination $topic, PsrMessage $transportMessage): void
216217
{
217218
$producer->send($topic, $transportMessage);
218219
}
@@ -222,11 +223,9 @@ protected function doSendToProcessor(PsrProducer $producer, PsrQueue $queue, Psr
222223
$producer->send($queue, $transportMessage);
223224
}
224225

225-
protected function createRouterTopic(): PsrTopic
226+
protected function createRouterTopic(): PsrDestination
226227
{
227-
return $this->doCreateTopic(
228-
$this->createTransportRouterTopicName($this->config->getRouterTopicName(), true)
229-
);
228+
return $this->createQueue($this->getConfig()->getRouterQueueName());
230229
}
231230

232231
protected function createTransportRouterTopicName(string $name, bool $prefix): string

pkg/enqueue/Client/Driver/GpsDriver.php

+11-1
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
use Enqueue\Gps\GpsContext;
66
use Enqueue\Gps\GpsQueue;
77
use Enqueue\Gps\GpsTopic;
8+
use Interop\Queue\PsrDestination;
89
use Psr\Log\LoggerInterface;
910
use Psr\Log\NullLogger;
1011

1112
/**
1213
* @method GpsContext getContext
1314
* @method GpsQueue createQueue(string $name)
14-
* @method GpsTopic createRouterTopic
1515
*/
1616
class GpsDriver extends GenericDriver
1717
{
@@ -51,4 +51,14 @@ public function setupBroker(LoggerInterface $logger = null): void
5151
$declaredQueues[$queue->getQueueName()] = true;
5252
}
5353
}
54+
55+
/**
56+
* @return GpsTopic
57+
*/
58+
protected function createRouterTopic(): PsrDestination
59+
{
60+
return $this->doCreateTopic(
61+
$this->createTransportRouterTopicName($this->getConfig()->getRouterTopicName(), true)
62+
);
63+
}
5464
}

pkg/enqueue/Client/Driver/MongodbDriver.php

-10
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
namespace Enqueue\Client\Driver;
44

55
use Enqueue\Mongodb\MongodbContext;
6-
use Enqueue\Mongodb\MongodbDestination;
7-
use Interop\Queue\PsrTopic;
86
use Psr\Log\LoggerInterface;
97
use Psr\Log\NullLogger;
108

@@ -29,12 +27,4 @@ public function setupBroker(LoggerInterface $logger = null): void
2927
$log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']);
3028
$this->getContext()->createCollection();
3129
}
32-
33-
/**
34-
* @return MongodbDestination
35-
*/
36-
protected function createRouterTopic(): PsrTopic
37-
{
38-
return $this->createQueue($this->getConfig()->getRouterQueueName());
39-
}
4030
}

pkg/enqueue/Client/Driver/RabbitMqStompDriver.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
use Enqueue\Stomp\StompDestination;
1010
use Enqueue\Stomp\StompMessage;
1111
use Enqueue\Stomp\StompProducer;
12+
use Interop\Queue\PsrDestination;
1213
use Interop\Queue\PsrMessage;
1314
use Interop\Queue\PsrProducer;
1415
use Interop\Queue\PsrQueue;
15-
use Interop\Queue\PsrTopic;
1616
use Psr\Log\LoggerInterface;
1717
use Psr\Log\NullLogger;
1818

@@ -151,7 +151,7 @@ protected function doCreateQueue(string $transportQueueName): PsrQueue
151151
* @param StompDestination $topic
152152
* @param StompMessage $transportMessage
153153
*/
154-
protected function doSendToRouter(PsrProducer $producer, PsrTopic $topic, PsrMessage $transportMessage): void
154+
protected function doSendToRouter(PsrProducer $producer, PsrDestination $topic, PsrMessage $transportMessage): void
155155
{
156156
// We should not handle priority, expiration, and delay at this stage.
157157
// The router will take care of it while re-sending the message to the final destinations.

pkg/enqueue/Client/Driver/RdKafkaDriver.php

+11
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\RdKafka\RdKafkaContext;
66
use Enqueue\RdKafka\RdKafkaTopic;
7+
use Interop\Queue\PsrDestination;
78
use Psr\Log\LoggerInterface;
89
use Psr\Log\NullLogger;
910

@@ -43,4 +44,14 @@ public function setupBroker(LoggerInterface $logger = null): void
4344
$this->getContext()->createConsumer($queue);
4445
}
4546
}
47+
48+
/**
49+
* @return RdKafkaTopic
50+
*/
51+
protected function createRouterTopic(): PsrDestination
52+
{
53+
return $this->doCreateTopic(
54+
$this->createTransportRouterTopicName($this->getConfig()->getRouterTopicName(), true)
55+
);
56+
}
4657
}

pkg/enqueue/Client/Driver/RedisDriver.php

-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use Enqueue\Redis\RedisContext;
66
use Enqueue\Redis\RedisDestination;
7-
use Interop\Queue\PsrTopic;
87

98
/**
109
* @method RedisContext getContext
@@ -16,12 +15,4 @@ public function __construct(RedisContext $context, ...$args)
1615
{
1716
parent::__construct($context, ...$args);
1817
}
19-
20-
/**
21-
* @return RedisDestination
22-
*/
23-
protected function createRouterTopic(): PsrTopic
24-
{
25-
return $this->createQueue($this->getConfig()->getRouterQueueName());
26-
}
2718
}

pkg/enqueue/Client/Driver/SqsDriver.php

-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
use Enqueue\Sqs\SqsContext;
66
use Enqueue\Sqs\SqsDestination;
7-
use Interop\Queue\PsrTopic;
87
use Psr\Log\LoggerInterface;
98
use Psr\Log\NullLogger;
109

@@ -60,12 +59,4 @@ protected function createTransportQueueName(string $name, bool $prefix): string
6059

6160
return str_replace('.', '_dot_', $name);
6261
}
63-
64-
/**
65-
* @return SqsDestination
66-
*/
67-
protected function createRouterTopic(): PsrTopic
68-
{
69-
return $this->createQueue($this->getConfig()->getRouterQueueName());
70-
}
7162
}

pkg/enqueue/Client/Driver/StompDriver.php

+5-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
use Enqueue\Stomp\StompContext;
77
use Enqueue\Stomp\StompDestination;
88
use Enqueue\Stomp\StompMessage;
9+
use Interop\Queue\PsrDestination;
910
use Interop\Queue\PsrMessage;
1011
use Interop\Queue\PsrQueue;
11-
use Interop\Queue\PsrTopic;
1212
use Psr\Log\LoggerInterface;
1313
use Psr\Log\NullLogger;
1414

@@ -57,10 +57,12 @@ protected function doCreateQueue(string $transportQueueName): PsrQueue
5757
/**
5858
* @return StompDestination
5959
*/
60-
protected function createRouterTopic(): PsrTopic
60+
protected function createRouterTopic(): PsrDestination
6161
{
6262
/** @var StompDestination $topic */
63-
$topic = parent::createRouterTopic();
63+
$topic = $this->doCreateTopic(
64+
$this->createTransportRouterTopicName($this->getConfig()->getRouterTopicName(), true)
65+
);
6466
$topic->setDurable(true);
6567
$topic->setAutoDelete(false);
6668

pkg/enqueue/Tests/Client/Driver/GenericDriverTest.php

-5
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,6 @@ protected function createMessage(): PsrMessage
5656
return new NullMessage();
5757
}
5858

59-
protected function getRouterTransportName(): string
60-
{
61-
return 'aprefix.router';
62-
}
63-
6459
protected function assertTransportMessage(PsrMessage $transportMessage): void
6560
{
6661
$this->assertSame('body', $transportMessage->getBody());

pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php

+5-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Enqueue\Client\Route;
1010
use Enqueue\Client\RouteCollection;
1111
use Interop\Queue\PsrContext;
12+
use Interop\Queue\PsrDestination;
1213
use Interop\Queue\PsrMessage;
1314
use Interop\Queue\PsrProducer;
1415
use Interop\Queue\PsrQueue;
@@ -251,15 +252,16 @@ public function testShouldCreateTransportMessageFromClientOne()
251252

252253
public function testShouldSendMessageToRouter()
253254
{
254-
$topic = $this->createTopic('');
255255
$transportMessage = $this->createMessage();
256256

257257
$producer = $this->createProducerMock();
258258
$producer
259259
->expects($this->once())
260260
->method('send')
261-
->willReturnCallback(function (PsrTopic $topic, PsrMessage $message) use ($transportMessage) {
262-
$this->assertSame($this->getRouterTransportName(), $topic->getTopicName());
261+
->willReturnCallback(function (PsrDestination $topic, PsrMessage $message) use ($transportMessage) {
262+
$this->assertSame(
263+
$this->getRouterTransportName(),
264+
$topic instanceof PsrTopic ? $topic->getTopicName() : $topic->getQueueName());
263265
$this->assertSame($transportMessage, $message);
264266
})
265267
;

0 commit comments

Comments
 (0)