Skip to content

Commit 3aac6ca

Browse files
committed
[client] migrate simple client to new concept. adopt route collection.
1 parent 562c134 commit 3aac6ca

19 files changed

+277
-141
lines changed

pkg/enqueue/Client/Driver/DbalDriver.php

+10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
namespace Enqueue\Client\Driver;
44

55
use Enqueue\Dbal\DbalContext;
6+
use Enqueue\Dbal\DbalDestination;
7+
use Interop\Queue\PsrTopic;
68
use Psr\Log\LoggerInterface;
79
use Psr\Log\NullLogger;
810

@@ -26,4 +28,12 @@ public function setupBroker(LoggerInterface $logger = null): void
2628
$log('Creating database table: "%s"', $this->getContext()->getTableName());
2729
$this->getContext()->createDataBaseTable();
2830
}
31+
32+
/**
33+
* @return DbalDestination
34+
*/
35+
protected function createRouterTopic(): PsrTopic
36+
{
37+
return $this->createQueue($this->getConfig()->getRouterQueueName());
38+
}
2939
}

pkg/enqueue/Client/Driver/FsDriver.php

+10-6
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44

55
use Enqueue\Fs\FsContext;
66
use Enqueue\Fs\FsDestination;
7+
use Interop\Queue\PsrTopic;
78
use Psr\Log\LoggerInterface;
89
use Psr\Log\NullLogger;
910

1011
/**
1112
* @method FsContext getContext
1213
* @method FsDestination createQueue(string $name)
13-
* @method FsDestination createRouterTopic
1414
*/
1515
class FsDriver extends GenericDriver
1616
{
@@ -27,13 +27,9 @@ public function setupBroker(LoggerInterface $logger = null): void
2727
};
2828

2929
// setup router
30-
$routerTopic = $this->createRouterTopic();
3130
$routerQueue = $this->createQueue($this->getConfig()->getRouterQueueName());
3231

33-
$log('Declare router exchange "%s" file: %s', $routerTopic->getTopicName(), $routerTopic->getFileInfo());
34-
$this->getContext()->declareDestination($routerTopic);
35-
36-
$log('Declare router queue "%s" file: %s', $routerQueue->getQueueName(), $routerTopic->getFileInfo());
32+
$log('Declare router queue "%s" file: %s', $routerQueue->getQueueName(), $routerQueue->getFileInfo());
3733
$this->getContext()->declareDestination($routerQueue);
3834

3935
// setup queues
@@ -51,4 +47,12 @@ public function setupBroker(LoggerInterface $logger = null): void
5147
$declaredQueues[$queue->getQueueName()] = true;
5248
}
5349
}
50+
51+
/**
52+
* @return FsDestination
53+
*/
54+
protected function createRouterTopic(): PsrTopic
55+
{
56+
return $this->createQueue($this->getConfig()->getRouterQueueName());
57+
}
5458
}

pkg/enqueue/Client/Driver/MongodbDriver.php

+10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
namespace Enqueue\Client\Driver;
44

55
use Enqueue\Mongodb\MongodbContext;
6+
use Enqueue\Mongodb\MongodbDestination;
7+
use Interop\Queue\PsrTopic;
68
use Psr\Log\LoggerInterface;
79
use Psr\Log\NullLogger;
810

@@ -27,4 +29,12 @@ public function setupBroker(LoggerInterface $logger = null): void
2729
$log('Creating database and collection: "%s" "%s"', $contextConfig['dbname'], $contextConfig['collection_name']);
2830
$this->getContext()->createCollection();
2931
}
32+
33+
/**
34+
* @return MongodbDestination
35+
*/
36+
protected function createRouterTopic(): PsrTopic
37+
{
38+
return $this->createQueue($this->getConfig()->getRouterQueueName());
39+
}
3040
}
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Enqueue\Client\Driver;
4+
5+
use Enqueue\Redis\RedisContext;
6+
use Enqueue\Redis\RedisDestination;
7+
use Interop\Queue\PsrTopic;
8+
9+
/**
10+
* @method RedisContext getContext
11+
* @method RedisDestination createQueue(string $name)
12+
*/
13+
class RedisDriver extends GenericDriver
14+
{
15+
public function __construct(RedisContext $context, ...$args)
16+
{
17+
parent::__construct($context, ...$args);
18+
}
19+
20+
/**
21+
* @return RedisDestination
22+
*/
23+
protected function createRouterTopic(): PsrTopic
24+
{
25+
return $this->createQueue($this->getConfig()->getRouterQueueName());
26+
}
27+
}

pkg/enqueue/Client/Driver/SqsDriver.php

+9
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Sqs\SqsContext;
66
use Enqueue\Sqs\SqsDestination;
7+
use Interop\Queue\PsrTopic;
78
use Psr\Log\LoggerInterface;
89
use Psr\Log\NullLogger;
910

@@ -59,4 +60,12 @@ protected function createTransportQueueName(string $name, bool $prefix): string
5960

6061
return str_replace('.', '_dot_', $name);
6162
}
63+
64+
/**
65+
* @return SqsDestination
66+
*/
67+
protected function createRouterTopic(): PsrTopic
68+
{
69+
return $this->createQueue($this->getConfig()->getRouterQueueName());
70+
}
6271
}

pkg/enqueue/Client/Resources.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Enqueue\Client\Driver\RabbitMqDriver;
1212
use Enqueue\Client\Driver\RabbitMqStompDriver;
1313
use Enqueue\Client\Driver\RdKafkaDriver;
14+
use Enqueue\Client\Driver\RedisDriver;
1415
use Enqueue\Client\Driver\SqsDriver;
1516
use Enqueue\Client\Driver\StompDriver;
1617

@@ -81,7 +82,7 @@ public static function getKnownDrivers(): array
8182
];
8283
$map[] = [
8384
'schemes' => ['redis'],
84-
'factoryClass' => GenericDriver::class,
85+
'factoryClass' => RedisDriver::class,
8586
'requiredSchemeExtensions' => [],
8687
'packages' => ['enqueue/enqueue', 'enqueue/redis'],
8788
];

pkg/enqueue/Tests/Client/Driver/AmqpDriverTest.php

+5
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,11 @@ protected function createMessage(): PsrMessage
328328
return new AmqpMessage();
329329
}
330330

331+
protected function getRouterTransportName(): string
332+
{
333+
return 'aprefix.router';
334+
}
335+
331336
protected function assertTransportMessage(PsrMessage $transportMessage): void
332337
{
333338
$this->assertSame('body', $transportMessage->getBody());

pkg/enqueue/Tests/Client/Driver/FsDriverTest.php

+3-14
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ public function testShouldBeSubClassOfGenericDriver()
3737

3838
public function testShouldSetupBroker()
3939
{
40-
$routerTopic = new FsDestination(TempFile::generate());
4140
$routerQueue = new FsDestination(TempFile::generate());
4241

4342
$processorQueue = new FsDestination(TempFile::generate());
@@ -46,32 +45,22 @@ public function testShouldSetupBroker()
4645
// setup router
4746
$context
4847
->expects($this->at(0))
49-
->method('createTopic')
50-
->willReturn($routerTopic)
51-
;
52-
$context
53-
->expects($this->at(1))
5448
->method('createQueue')
5549
->willReturn($routerQueue)
5650
;
5751
$context
58-
->expects($this->at(2))
59-
->method('declareDestination')
60-
->with($this->identicalTo($routerTopic))
61-
;
62-
$context
63-
->expects($this->at(3))
52+
->expects($this->at(1))
6453
->method('declareDestination')
6554
->with($this->identicalTo($routerQueue))
6655
;
6756
// setup processor queue
6857
$context
69-
->expects($this->at(4))
58+
->expects($this->at(2))
7059
->method('createQueue')
7160
->willReturn($processorQueue)
7261
;
7362
$context
74-
->expects($this->at(5))
63+
->expects($this->at(3))
7564
->method('declareDestination')
7665
->with($this->identicalTo($processorQueue))
7766
;

pkg/enqueue/Tests/Client/Driver/GenericDriverTest.php

+5
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ protected function createMessage(): PsrMessage
5656
return new NullMessage();
5757
}
5858

59+
protected function getRouterTransportName(): string
60+
{
61+
return 'aprefix.router';
62+
}
63+
5964
protected function assertTransportMessage(PsrMessage $transportMessage): void
6065
{
6166
$this->assertSame('body', $transportMessage->getBody());

pkg/enqueue/Tests/Client/Driver/GenericDriverTestsTrait.php

+39-31
Original file line numberDiff line numberDiff line change
@@ -258,14 +258,12 @@ public function testShouldSendMessageToRouter()
258258
$producer
259259
->expects($this->once())
260260
->method('send')
261-
->with($this->identicalTo($topic), $this->identicalTo($transportMessage))
262-
;
263-
$context = $this->createContextMock();
264-
$context
265-
->expects($this->once())
266-
->method('createTopic')
267-
->willReturn($topic)
261+
->willReturnCallback(function (PsrTopic $topic, PsrMessage $message) use ($transportMessage) {
262+
$this->assertSame($this->getRouterTransportName(), $topic->getTopicName());
263+
$this->assertSame($transportMessage, $message);
264+
})
268265
;
266+
$context = $this->createContextStub();
269267
$context
270268
->expects($this->once())
271269
->method('createProducer')
@@ -291,26 +289,19 @@ public function testShouldSendMessageToRouter()
291289

292290
public function testShouldNotInitDeliveryDelayOnSendMessageToRouter()
293291
{
294-
$topic = $this->createTopic('');
295292
$transportMessage = $this->createMessage();
296293

297294
$producer = $this->createProducerMock();
298295
$producer
299296
->expects($this->once())
300297
->method('send')
301-
->with($this->identicalTo($topic), $this->identicalTo($transportMessage))
302298
;
303299
$producer
304300
->expects($this->never())
305301
->method('setDeliveryDelay')
306302
;
307303

308-
$context = $this->createContextMock();
309-
$context
310-
->expects($this->once())
311-
->method('createTopic')
312-
->willReturn($topic)
313-
;
304+
$context = $this->createContextStub();
314305
$context
315306
->expects($this->once())
316307
->method('createProducer')
@@ -337,26 +328,19 @@ public function testShouldNotInitDeliveryDelayOnSendMessageToRouter()
337328

338329
public function testShouldNotInitTimeToLiveOnSendMessageToRouter()
339330
{
340-
$topic = $this->createTopic('');
341331
$transportMessage = $this->createMessage();
342332

343333
$producer = $this->createProducerMock();
344334
$producer
345335
->expects($this->once())
346336
->method('send')
347-
->with($this->identicalTo($topic), $this->identicalTo($transportMessage))
348337
;
349338
$producer
350339
->expects($this->never())
351340
->method('setTimeToLive')
352341
;
353342

354-
$context = $this->createContextMock();
355-
$context
356-
->expects($this->once())
357-
->method('createTopic')
358-
->willReturn($topic)
359-
;
343+
$context = $this->createContextStub();
360344
$context
361345
->expects($this->once())
362346
->method('createProducer')
@@ -383,26 +367,19 @@ public function testShouldNotInitTimeToLiveOnSendMessageToRouter()
383367

384368
public function testShouldNotInitPriorityOnSendMessageToRouter()
385369
{
386-
$topic = $this->createTopic('');
387370
$transportMessage = $this->createMessage();
388371

389372
$producer = $this->createProducerMock();
390373
$producer
391374
->expects($this->once())
392375
->method('send')
393-
->with($this->identicalTo($topic), $this->identicalTo($transportMessage))
394376
;
395377
$producer
396378
->expects($this->never())
397379
->method('setPriority')
398380
;
399381

400-
$context = $this->createContextMock();
401-
$context
402-
->expects($this->once())
403-
->method('createTopic')
404-
->willReturn($topic)
405-
;
382+
$context = $this->createContextStub();
406383
$context
407384
->expects($this->once())
408385
->method('createProducer')
@@ -1104,6 +1081,32 @@ abstract protected function createTopic(string $name): PsrTopic;
11041081

11051082
abstract protected function createMessage(): PsrMessage;
11061083

1084+
/**
1085+
* @return \PHPUnit_Framework_MockObject_MockObject
1086+
*/
1087+
protected function createContextStub(): PsrContext
1088+
{
1089+
$context = $this->createContextMock();
1090+
1091+
$context
1092+
->expects($this->any())
1093+
->method('createQueue')
1094+
->willReturnCallback(function (string $name) {
1095+
return $this->createQueue($name);
1096+
})
1097+
;
1098+
1099+
$context
1100+
->expects($this->any())
1101+
->method('createTopic')
1102+
->willReturnCallback(function (string $name) {
1103+
return $this->createTopic($name);
1104+
})
1105+
;
1106+
1107+
return $context;
1108+
}
1109+
11071110
protected function assertTransportMessage(PsrMessage $transportMessage): void
11081111
{
11091112
$this->assertSame('body', $transportMessage->getBody());
@@ -1165,6 +1168,11 @@ protected function getCustomQueueTransportName(): string
11651168
return 'aprefix.custom';
11661169
}
11671170

1171+
protected function getRouterTransportName(): string
1172+
{
1173+
return 'aprefix.default';
1174+
}
1175+
11681176
protected function getPrefixAppFooQueueTransportName(): string
11691177
{
11701178
return 'aprefix.anappname.afooqueue';

pkg/enqueue/Tests/Client/Driver/GpsDriverTest.php

+5
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,9 @@ protected function createMessage(): PsrMessage
134134
{
135135
return new GpsMessage();
136136
}
137+
138+
protected function getRouterTransportName(): string
139+
{
140+
return 'aprefix.router';
141+
}
137142
}

pkg/enqueue/Tests/Client/Driver/RabbitMqDriverTest.php

+5
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ protected function createMessage(): PsrMessage
106106
return new AmqpMessage();
107107
}
108108

109+
protected function getRouterTransportName(): string
110+
{
111+
return 'aprefix.router';
112+
}
113+
109114
protected function assertTransportMessage(PsrMessage $transportMessage): void
110115
{
111116
$this->assertSame('body', $transportMessage->getBody());

0 commit comments

Comments
 (0)