Skip to content

Commit 2fd317b

Browse files
authored
Merge pull request #114 from php-enqueue/rpc-cleanup
RPC Deletes Reply Queue After Receive Message
2 parents 3e345fa + 5d2204c commit 2fd317b

File tree

10 files changed

+754
-185
lines changed

10 files changed

+754
-185
lines changed

docs/client/rpc_call.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ $promises[] = $rpcClient->callAsync('greeting_topic', 'Hi Thomas!', 5);
4343

4444
$replyMessages = [];
4545
foreach ($promises as $promise) {
46-
$replyMessages[] = $promise->getMessage();
46+
$replyMessages[] = $promise->receive();
4747
}
4848
```
4949

docs/quick_tour.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ $message = $psrContext->createMessage('Hi there!');
124124
$rpcClient = new RpcClient($psrContext);
125125

126126
$promise = $rpcClient->callAsync($queue, $message, 1);
127-
$replyMessage = $promise->getMessage();
127+
$replyMessage = $promise->receive();
128128
```
129129

130130
There is also extensions for the consumption component.

pkg/enqueue/Client/RpcClient.php

+53-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\Psr\PsrContext;
66
use Enqueue\Psr\PsrMessage;
77
use Enqueue\Rpc\Promise;
8+
use Enqueue\Rpc\TimeoutException;
89
use Enqueue\Util\UUID;
910

1011
class RpcClient
@@ -38,7 +39,7 @@ public function __construct(ProducerInterface $producer, PsrContext $context)
3839
*/
3940
public function call($topic, $message, $timeout)
4041
{
41-
return $this->callAsync($topic, $message, $timeout)->getMessage();
42+
return $this->callAsync($topic, $message, $timeout)->receive();
4243
}
4344

4445
/**
@@ -62,9 +63,11 @@ public function callAsync($topic, $message, $timeout)
6263

6364
if ($message->getReplyTo()) {
6465
$replyQueue = $this->context->createQueue($message->getReplyTo());
66+
$deleteReplyQueue = false;
6567
} else {
6668
$replyQueue = $this->context->createTemporaryQueue();
6769
$message->setReplyTo($replyQueue->getQueueName());
70+
$deleteReplyQueue = true;
6871
}
6972

7073
if (false == $message->getCorrelationId()) {
@@ -73,10 +76,54 @@ public function callAsync($topic, $message, $timeout)
7376

7477
$this->producer->send($topic, $message);
7578

76-
return new Promise(
77-
$this->context->createConsumer($replyQueue),
78-
$message->getCorrelationId(),
79-
$timeout
80-
);
79+
$correlationId = $message->getCorrelationId();
80+
81+
$receive = function () use ($replyQueue, $timeout, $correlationId) {
82+
$endTime = time() + ((int) ($timeout / 1000));
83+
$consumer = $this->context->createConsumer($replyQueue);
84+
85+
do {
86+
if ($message = $consumer->receive($timeout)) {
87+
if ($message->getCorrelationId() === $correlationId) {
88+
$consumer->acknowledge($message);
89+
90+
return $message;
91+
}
92+
93+
$consumer->reject($message, true);
94+
}
95+
} while (time() < $endTime);
96+
97+
throw TimeoutException::create($timeout, $correlationId);
98+
};
99+
100+
$receiveNoWait = function () use ($replyQueue, $correlationId) {
101+
static $consumer;
102+
103+
if (null === $consumer) {
104+
$consumer = $this->context->createConsumer($replyQueue);
105+
}
106+
107+
if ($message = $consumer->receiveNoWait()) {
108+
if ($message->getCorrelationId() === $correlationId) {
109+
$consumer->acknowledge($message);
110+
111+
return $message;
112+
}
113+
114+
$consumer->reject($message, true);
115+
}
116+
};
117+
118+
$finally = function (Promise $promise) use ($replyQueue) {
119+
if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) {
120+
$this->context->deleteQueue($replyQueue);
121+
}
122+
};
123+
124+
$promise = new Promise($receive, $receiveNoWait, $finally);
125+
$promise->setDeleteReplyQueue($deleteReplyQueue);
126+
127+
return $promise;
81128
}
82129
}

pkg/enqueue/Rpc/Promise.php

+89-29
Original file line numberDiff line numberDiff line change
@@ -2,75 +2,135 @@
22

33
namespace Enqueue\Rpc;
44

5-
use Enqueue\Psr\PsrConsumer;
65
use Enqueue\Psr\PsrMessage;
76

87
class Promise
98
{
109
/**
11-
* @var PsrConsumer
10+
* @var \Closure
1211
*/
13-
private $consumer;
12+
private $receiveCallback;
1413

1514
/**
16-
* @var int
15+
* @var \Closure
1716
*/
18-
private $timeout;
17+
private $receiveNoWaitCallback;
1918

2019
/**
21-
* @var string
20+
* @var \Closure
2221
*/
23-
private $correlationId;
22+
private $finallyCallback;
2423

2524
/**
26-
* @param PsrConsumer $consumer
27-
* @param string $correlationId
28-
* @param int $timeout
25+
* @var bool
2926
*/
30-
public function __construct(PsrConsumer $consumer, $correlationId, $timeout)
27+
private $deleteReplyQueue;
28+
29+
/**
30+
* @var PsrMessage
31+
*/
32+
private $message;
33+
34+
/**
35+
* @param \Closure $receiveCallback
36+
* @param \Closure $receiveNoWaitCallback
37+
* @param \Closure $finallyCallback
38+
*/
39+
public function __construct(\Closure $receiveCallback, \Closure $receiveNoWaitCallback, \Closure $finallyCallback)
3140
{
32-
$this->consumer = $consumer;
33-
$this->timeout = $timeout;
34-
$this->correlationId = $correlationId;
41+
$this->receiveCallback = $receiveCallback;
42+
$this->receiveNoWaitCallback = $receiveNoWaitCallback;
43+
$this->finallyCallback = $finallyCallback;
44+
45+
$this->deleteReplyQueue = true;
3546
}
3647

3748
/**
49+
* Blocks until message received or timeout expired.
50+
*
51+
* @deprecated use "receive" instead
52+
*
3853
* @throws TimeoutException if the wait timeout is reached
3954
*
4055
* @return PsrMessage
4156
*/
4257
public function getMessage()
4358
{
44-
$endTime = time() + $this->timeout;
45-
46-
while (time() < $endTime) {
47-
if ($message = $this->consumer->receive($this->timeout)) {
48-
if ($message->getCorrelationId() === $this->correlationId) {
49-
$this->consumer->acknowledge($message);
59+
return $this->receive();
60+
}
5061

51-
return $message;
62+
/**
63+
* Blocks until message received or timeout expired.
64+
*
65+
* @throws TimeoutException if the wait timeout is reached
66+
*
67+
* @return PsrMessage
68+
*/
69+
public function receive()
70+
{
71+
if (null == $this->message) {
72+
try {
73+
if ($message = $this->doReceive($this->receiveCallback)) {
74+
$this->message = $message;
5275
}
76+
} finally {
77+
call_user_func($this->finallyCallback, $this);
78+
}
79+
}
80+
81+
return $this->message;
82+
}
83+
84+
/**
85+
* Non blocking function. Returns message or null.
86+
*
87+
* @return PsrMessage|null
88+
*/
89+
public function receiveNoWait()
90+
{
91+
if (null == $this->message) {
92+
if ($message = $this->doReceive($this->receiveNoWaitCallback)) {
93+
$this->message = $message;
5394

54-
$this->consumer->reject($message, true);
95+
call_user_func($this->finallyCallback, $this);
5596
}
5697
}
5798

58-
throw TimeoutException::create($this->timeout, $this->correlationId);
99+
return $this->message;
59100
}
60101

61102
/**
62-
* @param int $timeout
103+
* On TRUE deletes reply queue after getMessage call.
104+
*
105+
* @param bool $delete
63106
*/
64-
public function setTimeout($timeout)
107+
public function setDeleteReplyQueue($delete)
65108
{
66-
$this->timeout = $timeout;
109+
$this->deleteReplyQueue = (bool) $delete;
67110
}
68111

69112
/**
70-
* @return int
113+
* @return bool
71114
*/
72-
public function getTimeout()
115+
public function isDeleteReplyQueue()
73116
{
74-
return $this->timeout;
117+
return $this->deleteReplyQueue;
118+
}
119+
120+
/**
121+
* @param \Closure $cb
122+
*
123+
* @return PsrMessage
124+
*/
125+
private function doReceive(\Closure $cb)
126+
{
127+
$message = call_user_func($cb, $this);
128+
129+
if (null !== $message && false == $message instanceof PsrMessage) {
130+
throw new \RuntimeException(sprintf(
131+
'Expected "%s" but got: "%s"', PsrMessage::class, is_object($message) ? get_class($message) : gettype($message)));
132+
}
133+
134+
return $message;
75135
}
76136
}

pkg/enqueue/Rpc/RpcClient.php

+52-6
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public function __construct(PsrContext $context)
3333
*/
3434
public function call(PsrDestination $destination, PsrMessage $message, $timeout)
3535
{
36-
return $this->callAsync($destination, $message, $timeout)->getMessage();
36+
return $this->callAsync($destination, $message, $timeout)->receive();
3737
}
3838

3939
/**
@@ -51,9 +51,11 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim
5151

5252
if ($message->getReplyTo()) {
5353
$replyQueue = $this->context->createQueue($message->getReplyTo());
54+
$deleteReplyQueue = false;
5455
} else {
5556
$replyQueue = $this->context->createTemporaryQueue();
5657
$message->setReplyTo($replyQueue->getQueueName());
58+
$deleteReplyQueue = true;
5759
}
5860

5961
if (false == $message->getCorrelationId()) {
@@ -62,10 +64,54 @@ public function callAsync(PsrDestination $destination, PsrMessage $message, $tim
6264

6365
$this->context->createProducer()->send($destination, $message);
6466

65-
return new Promise(
66-
$this->context->createConsumer($replyQueue),
67-
$message->getCorrelationId(),
68-
$timeout
69-
);
67+
$correlationId = $message->getCorrelationId();
68+
69+
$receive = function () use ($replyQueue, $timeout, $correlationId) {
70+
$endTime = time() + ((int) ($timeout / 1000));
71+
$consumer = $this->context->createConsumer($replyQueue);
72+
73+
do {
74+
if ($message = $consumer->receive($timeout)) {
75+
if ($message->getCorrelationId() === $correlationId) {
76+
$consumer->acknowledge($message);
77+
78+
return $message;
79+
}
80+
81+
$consumer->reject($message, true);
82+
}
83+
} while (time() < $endTime);
84+
85+
throw TimeoutException::create($timeout, $correlationId);
86+
};
87+
88+
$receiveNoWait = function () use ($replyQueue, $correlationId) {
89+
static $consumer;
90+
91+
if (null === $consumer) {
92+
$consumer = $this->context->createConsumer($replyQueue);
93+
}
94+
95+
if ($message = $consumer->receiveNoWait()) {
96+
if ($message->getCorrelationId() === $correlationId) {
97+
$consumer->acknowledge($message);
98+
99+
return $message;
100+
}
101+
102+
$consumer->reject($message, true);
103+
}
104+
};
105+
106+
$finally = function (Promise $promise) use ($replyQueue) {
107+
if ($promise->isDeleteReplyQueue() && method_exists($this->context, 'deleteQueue')) {
108+
$this->context->deleteQueue($replyQueue);
109+
}
110+
};
111+
112+
$promise = new Promise($receive, $receiveNoWait, $finally);
113+
$promise->setDeleteReplyQueue($deleteReplyQueue);
114+
115+
return $promise;
70116
}
71117
}

0 commit comments

Comments
 (0)