Skip to content

Commit 35758c8

Browse files
authored
Merge pull request #115 from php-enqueue/producerv2-for-simpleclient
ProducerV2 For SimpleClient
2 parents b3fa200 + c0ee4c5 commit 35758c8

File tree

9 files changed

+76
-17
lines changed

9 files changed

+76
-17
lines changed

Diff for: pkg/enqueue/Client/ProducerV2.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public function sendCommand($command, $message, $needReply = false)
4949
$message->setScope(Message::SCOPE_APP);
5050

5151
if ($needReply) {
52-
return $this->rpcClient->callAsync(Config::COMMAND_TOPIC, $message, 60);
52+
return $this->rpcClient->callAsync(Config::COMMAND_TOPIC, $message, 60000);
5353
}
5454

5555
$this->realProducer->send(Config::COMMAND_TOPIC, $message);

Diff for: pkg/enqueue/Client/RpcClient.php

+5-4
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,13 @@ public function callAsync($topic, $message, $timeout)
7878

7979
$correlationId = $message->getCorrelationId();
8080

81-
$receive = function () use ($replyQueue, $timeout, $correlationId) {
82-
$endTime = time() + ((int) ($timeout / 1000));
81+
$receive = function (Promise $promise, $promiseTimeout) use ($replyQueue, $timeout, $correlationId) {
82+
$runTimeout = $promiseTimeout ?: $timeout;
83+
$endTime = time() + ((int) ($runTimeout / 1000));
8384
$consumer = $this->context->createConsumer($replyQueue);
8485

8586
do {
86-
if ($message = $consumer->receive($timeout)) {
87+
if ($message = $consumer->receive($runTimeout)) {
8788
if ($message->getCorrelationId() === $correlationId) {
8889
$consumer->acknowledge($message);
8990

@@ -94,7 +95,7 @@ public function callAsync($topic, $message, $timeout)
9495
}
9596
} while (time() < $endTime);
9697

97-
throw TimeoutException::create($timeout, $correlationId);
98+
throw TimeoutException::create($runTimeout, $correlationId);
9899
};
99100

100101
$receiveNoWait = function () use ($replyQueue, $correlationId) {

Diff for: pkg/enqueue/Rpc/Promise.php

+8-5
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,17 @@ public function getMessage()
6262
/**
6363
* Blocks until message received or timeout expired.
6464
*
65+
* @param int $timeout
66+
*
6567
* @throws TimeoutException if the wait timeout is reached
6668
*
6769
* @return PsrMessage
6870
*/
69-
public function receive()
71+
public function receive($timeout = null)
7072
{
7173
if (null == $this->message) {
7274
try {
73-
if ($message = $this->doReceive($this->receiveCallback)) {
75+
if ($message = $this->doReceive($this->receiveCallback, $this, $timeout)) {
7476
$this->message = $message;
7577
}
7678
} finally {
@@ -89,7 +91,7 @@ public function receive()
8991
public function receiveNoWait()
9092
{
9193
if (null == $this->message) {
92-
if ($message = $this->doReceive($this->receiveNoWaitCallback)) {
94+
if ($message = $this->doReceive($this->receiveNoWaitCallback, $this)) {
9395
$this->message = $message;
9496

9597
call_user_func($this->finallyCallback, $this);
@@ -119,12 +121,13 @@ public function isDeleteReplyQueue()
119121

120122
/**
121123
* @param \Closure $cb
124+
* @param array $args
122125
*
123126
* @return PsrMessage
124127
*/
125-
private function doReceive(\Closure $cb)
128+
private function doReceive(\Closure $cb, ...$args)
126129
{
127-
$message = call_user_func($cb, $this);
130+
$message = call_user_func_array($cb, $args);
128131

129132
if (null !== $message && false == $message instanceof PsrMessage) {
130133
throw new \RuntimeException(sprintf(

Diff for: pkg/enqueue/Rpc/RpcClient.php

+5-4
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,13 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim
6666

6767
$correlationId = $message->getCorrelationId();
6868

69-
$receive = function () use ($replyQueue, $timeout, $correlationId) {
70-
$endTime = time() + ((int) ($timeout / 1000));
69+
$receive = function (Promise $promise, $promiseTimeout) use ($replyQueue, $timeout, $correlationId) {
70+
$runTimeout = $promiseTimeout ?: $timeout;
71+
$endTime = time() + ((int) ($runTimeout / 1000));
7172
$consumer = $this->context->createConsumer($replyQueue);
7273

7374
do {
74-
if ($message = $consumer->receive($timeout)) {
75+
if ($message = $consumer->receive($runTimeout)) {
7576
if ($message->getCorrelationId() === $correlationId) {
7677
$consumer->acknowledge($message);
7778

@@ -82,7 +83,7 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim
8283
}
8384
} while (time() < $endTime);
8485

85-
throw TimeoutException::create($timeout, $correlationId);
86+
throw TimeoutException::create($runTimeout, $correlationId);
8687
};
8788

8889
$receiveNoWait = function () use ($replyQueue, $correlationId) {

Diff for: pkg/enqueue/Tests/Client/RpcClientTest.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals()
174174
$consumer
175175
->expects($this->once())
176176
->method('receive')
177+
->with(12345)
177178
->willReturn($receivedMessage)
178179
;
179180
$consumer
@@ -202,7 +203,7 @@ public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals()
202203

203204
$rpc = new RpcClient($this->createProducerMock(), $context);
204205

205-
$rpc->callAsync('topic', $message, 2)->receive();
206+
$rpc->callAsync('topic', $message, 2)->receive(12345);
206207
}
207208

208209
public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals()

Diff for: pkg/enqueue/Tests/Rpc/PromiseTest.php

+26-1
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,39 @@ public function testCouldSetGetDeleteReplyQueue()
2929
public function testOnReceiveShouldCallReceiveCallBack()
3030
{
3131
$receiveInvoked = false;
32-
$receivecb = function () use (&$receiveInvoked) {
32+
$receivePromise = null;
33+
$receiveTimeout = null;
34+
$receivecb = function ($promise, $timout) use (&$receiveInvoked, &$receivePromise, &$receiveTimeout) {
3335
$receiveInvoked = true;
36+
$receivePromise = $promise;
37+
$receiveTimeout = $timout;
3438
};
3539

3640
$promise = new Promise($receivecb, function () {}, function () {});
3741
$promise->receive();
3842

3943
$this->assertTrue($receiveInvoked);
44+
$this->assertInstanceOf(Promise::class, $receivePromise);
45+
$this->assertNull($receiveTimeout);
46+
}
47+
48+
public function testOnReceiveShouldCallReceiveCallBackWithTimeout()
49+
{
50+
$receiveInvoked = false;
51+
$receivePromise = null;
52+
$receiveTimeout = null;
53+
$receivecb = function ($promise, $timout) use (&$receiveInvoked, &$receivePromise, &$receiveTimeout) {
54+
$receiveInvoked = true;
55+
$receivePromise = $promise;
56+
$receiveTimeout = $timout;
57+
};
58+
59+
$promise = new Promise($receivecb, function () {}, function () {});
60+
$promise->receive(12345);
61+
62+
$this->assertTrue($receiveInvoked);
63+
$this->assertInstanceOf(Promise::class, $receivePromise);
64+
$this->assertSame(12345, $receiveTimeout);
4065
}
4166

4267
public function testOnReceiveNoWaitShouldCallReceiveNoWaitCallBack()

Diff for: pkg/enqueue/Tests/Rpc/RpcClientTest.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals()
114114
$consumer
115115
->expects($this->once())
116116
->method('receive')
117+
->with(12345)
117118
->willReturn($receivedMessage)
118119
;
119120
$consumer
@@ -147,7 +148,7 @@ public function testShouldReceiveMessageAndAckMessageIfCorrelationEquals()
147148

148149
$rpc = new RpcClient($context);
149150

150-
$rpc->callAsync($queue, $message, 2)->receive();
151+
$rpc->callAsync($queue, $message, 2)->receive(12345);
151152
}
152153

153154
public function testShouldReceiveNoWaitMessageAndAckMessageIfCorrelationEquals()

Diff for: pkg/simple-client/SimpleClient.php

+13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Enqueue\Client\Meta\QueueMetaRegistry;
1212
use Enqueue\Client\Meta\TopicMetaRegistry;
1313
use Enqueue\Client\ProducerInterface;
14+
use Enqueue\Client\ProducerV2Interface;
1415
use Enqueue\Client\RouterProcessor;
1516
use Enqueue\Consumption\CallbackProcessor;
1617
use Enqueue\Consumption\ExtensionInterface;
@@ -185,6 +186,18 @@ public function getProducer($setupBroker = false)
185186
return $this->container->get('enqueue.client.producer');
186187
}
187188

189+
/**
190+
* @param bool $setupBroker
191+
*
192+
* @return ProducerV2Interface
193+
*/
194+
public function getProducerV2($setupBroker = false)
195+
{
196+
$setupBroker && $this->setupBroker();
197+
198+
return $this->container->get('enqueue.client.producer.v2');
199+
}
200+
188201
public function setupBroker()
189202
{
190203
$this->getDriver()->setupBroker();

Diff for: pkg/simple-client/SimpleClientContainerExtension.php

+14
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
use Enqueue\Client\Meta\QueueMetaRegistry;
1111
use Enqueue\Client\Meta\TopicMetaRegistry;
1212
use Enqueue\Client\Producer;
13+
use Enqueue\Client\ProducerV2;
1314
use Enqueue\Client\RouterProcessor;
15+
use Enqueue\Client\RpcClient;
1416
use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension;
1517
use Enqueue\Consumption\QueueConsumer;
1618
use Enqueue\Symfony\TransportFactoryInterface;
@@ -93,6 +95,18 @@ public function load(array $configs, ContainerBuilder $container)
9395
new Reference('enqueue.client.driver'),
9496
]);
9597

98+
$container->register('enqueue.client.rpc', RpcClient::class)
99+
->setArguments([
100+
new Reference('enqueue.client.producer'),
101+
new Reference('enqueue.transport.context'),
102+
]);
103+
104+
$container->register('enqueue.client.producer.v2', ProducerV2::class)
105+
->setArguments([
106+
new Reference('enqueue.client.producer'),
107+
new Reference('enqueue.client.rpc'),
108+
]);
109+
96110
$container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class)
97111
->setArguments([[]]);
98112

0 commit comments

Comments
 (0)