Skip to content

Commit 06771a8

Browse files
authored
Merge pull request #223 from php-enqueue/amqp-basic-consume-fixes
Amqp basic consume fixes
2 parents 1ba3d19 + d98145f commit 06771a8

28 files changed

+310
-455
lines changed

Diff for: pkg/amqp-bunny/AmqpConsumer.php

+20-93
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,20 @@
33
namespace Enqueue\AmqpBunny;
44

55
use Bunny\Channel;
6-
use Bunny\Client;
76
use Bunny\Message;
87
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
98
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
109
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
11-
use Interop\Amqp\Impl\AmqpMessage;
12-
use Interop\Queue\Exception;
1310
use Interop\Queue\InvalidMessageException;
1411
use Interop\Queue\PsrMessage;
1512

1613
class AmqpConsumer implements InteropAmqpConsumer
1714
{
15+
/**
16+
* @var AmqpContext
17+
*/
18+
private $context;
19+
1820
/**
1921
* @var Channel
2022
*/
@@ -30,11 +32,6 @@ class AmqpConsumer implements InteropAmqpConsumer
3032
*/
3133
private $buffer;
3234

33-
/**
34-
* @var bool
35-
*/
36-
private $isInit;
37-
3835
/**
3936
* @var string
4037
*/
@@ -51,36 +48,26 @@ class AmqpConsumer implements InteropAmqpConsumer
5148
private $consumerTag;
5249

5350
/**
54-
* @var Message
55-
*/
56-
private $bunnyMessages = [];
57-
58-
/**
59-
* @param Channel $channel
51+
* @param AmqpContext $context
6052
* @param InteropAmqpQueue $queue
6153
* @param Buffer $buffer
6254
* @param string $receiveMethod
6355
*/
64-
public function __construct(Channel $channel, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod)
56+
public function __construct(AmqpContext $context, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod)
6557
{
66-
$this->channel = $channel;
58+
$this->context = $context;
59+
$this->channel = $context->getBunnyChannel();
6760
$this->queue = $queue;
6861
$this->buffer = $buffer;
6962
$this->receiveMethod = $receiveMethod;
7063
$this->flags = self::FLAG_NOPARAM;
71-
72-
$this->isInit = false;
7364
}
7465

7566
/**
7667
* {@inheritdoc}
7768
*/
7869
public function setConsumerTag($consumerTag)
7970
{
80-
if ($this->isInit) {
81-
throw new Exception('Consumer tag is not mutable after it has been subscribed to broker');
82-
}
83-
8471
$this->consumerTag = $consumerTag;
8572
}
8673

@@ -154,9 +141,7 @@ public function receive($timeout = 0)
154141
public function receiveNoWait()
155142
{
156143
if ($message = $this->channel->get($this->queue->getQueueName(), (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK))) {
157-
$this->bunnyMessages[$message->deliveryTag] = $message;
158-
159-
return $this->convertMessage($message);
144+
return $this->context->convertMessage($message);
160145
}
161146
}
162147

@@ -167,11 +152,8 @@ public function acknowledge(PsrMessage $message)
167152
{
168153
InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);
169154

170-
if (isset($this->bunnyMessages[$message->getDeliveryTag()])) {
171-
$this->channel->ack($this->bunnyMessages[$message->getDeliveryTag()]);
172-
173-
unset($this->bunnyMessages[$message->getDeliveryTag()]);
174-
}
155+
$bunnyMessage = new Message('', $message->getDeliveryTag(), '', '', '', [], '');
156+
$this->channel->ack($bunnyMessage);
175157
}
176158

177159
/**
@@ -182,41 +164,8 @@ public function reject(PsrMessage $message, $requeue = false)
182164
{
183165
InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);
184166

185-
if (isset($this->bunnyMessages[$message->getDeliveryTag()])) {
186-
$this->channel->reject($this->bunnyMessages[$message->getDeliveryTag()], $requeue);
187-
188-
unset($this->bunnyMessages[$message->getDeliveryTag()]);
189-
}
190-
}
191-
192-
/**
193-
* @param Message $bunnyMessage
194-
*
195-
* @return InteropAmqpMessage
196-
*/
197-
private function convertMessage(Message $bunnyMessage)
198-
{
199-
$headers = $bunnyMessage->headers;
200-
201-
$properties = [];
202-
if (isset($headers['application_headers'])) {
203-
$properties = $headers['application_headers'];
204-
}
205-
unset($headers['application_headers']);
206-
207-
if (array_key_exists('timestamp', $headers)) {
208-
/** @var \DateTime $date */
209-
$date = $headers['timestamp'];
210-
211-
$headers['timestamp'] = (int) $date->format('U');
212-
}
213-
214-
$message = new AmqpMessage($bunnyMessage->content, $properties, $headers);
215-
$message->setDeliveryTag($bunnyMessage->deliveryTag);
216-
$message->setRedelivered($bunnyMessage->redelivered);
217-
$message->setRoutingKey($bunnyMessage->routingKey);
218-
219-
return $message;
167+
$bunnyMessage = new Message('', $message->getDeliveryTag(), '', '', '', [], '');
168+
$this->channel->reject($bunnyMessage, $requeue);
220169
}
221170

222171
/**
@@ -244,34 +193,12 @@ private function receiveBasicGet($timeout)
244193
*/
245194
private function receiveBasicConsume($timeout)
246195
{
247-
if (false === $this->isInit) {
248-
$callback = function (Message $message, Channel $channel, Client $bunny) {
249-
$receivedMessage = $this->convertMessage($message);
250-
$receivedMessage->setConsumerTag($message->consumerTag);
251-
252-
$this->bunnyMessages[$message->deliveryTag] = $message;
253-
$this->buffer->push($receivedMessage->getConsumerTag(), $receivedMessage);
254-
255-
$bunny->stop();
256-
};
257-
258-
$frame = $this->channel->consume(
259-
$callback,
260-
$this->queue->getQueueName(),
261-
$this->getConsumerTag() ?: $this->getQueue()->getConsumerTag(),
262-
(bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
263-
(bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
264-
(bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
265-
(bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT)
266-
);
267-
268-
$this->consumerTag = $frame->consumerTag;
269-
270-
if (empty($this->consumerTag)) {
271-
throw new Exception('Got empty consumer tag');
272-
}
196+
if (false == $this->consumerTag) {
197+
$this->context->subscribe($this, function (InteropAmqpMessage $message) {
198+
$this->buffer->push($message->getConsumerTag(), $message);
273199

274-
$this->isInit = true;
200+
return false;
201+
});
275202
}
276203

277204
if ($message = $this->buffer->pop($this->consumerTag)) {
@@ -281,7 +208,7 @@ private function receiveBasicConsume($timeout)
281208
while (true) {
282209
$start = microtime(true);
283210

284-
$this->channel->getClient()->run($timeout / 1000);
211+
$this->context->consume($timeout);
285212

286213
if ($message = $this->buffer->pop($this->consumerTag)) {
287214
return $message;

Diff for: pkg/amqp-bunny/AmqpContext.php

+6-4
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,10 @@ public function createConsumer(PsrDestination $destination)
127127
$queue = $this->createTemporaryQueue();
128128
$this->bind(new AmqpBind($destination, $queue, $queue->getQueueName()));
129129

130-
return new AmqpConsumer($this->getBunnyChannel(), $queue, $this->buffer, $this->config['receive_method']);
130+
return new AmqpConsumer($this, $queue, $this->buffer, $this->config['receive_method']);
131131
}
132132

133-
return new AmqpConsumer($this->getBunnyChannel(), $destination, $this->buffer, $this->config['receive_method']);
133+
return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']);
134134
}
135135

136136
/**
@@ -411,11 +411,13 @@ public function getBunnyChannel()
411411
}
412412

413413
/**
414+
* @internal It must be used here and in the consumer only
415+
*
414416
* @param Message $bunnyMessage
415417
*
416418
* @return InteropAmqpMessage
417419
*/
418-
private function convertMessage(Message $bunnyMessage)
420+
public function convertMessage(Message $bunnyMessage)
419421
{
420422
$headers = $bunnyMessage->headers;
421423

@@ -425,7 +427,7 @@ private function convertMessage(Message $bunnyMessage)
425427
}
426428
unset($headers['application_headers']);
427429

428-
if (array_key_exists('timestamp', $headers)) {
430+
if (array_key_exists('timestamp', $headers) && $headers['timestamp']) {
429431
/** @var \DateTime $date */
430432
$date = $headers['timestamp'];
431433

Diff for: pkg/amqp-bunny/AmqpProducer.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public function send(PsrDestination $destination, PsrMessage $message)
7777

7878
$amqpProperties = $message->getHeaders();
7979

80-
if (array_key_exists('timestamp', $amqpProperties)) {
80+
if (array_key_exists('timestamp', $amqpProperties) && null !== $amqpProperties['timestamp']) {
8181
$amqpProperties['timestamp'] = \DateTime::createFromFormat('U', $amqpProperties['timestamp']);
8282
}
8383

0 commit comments

Comments
 (0)