Skip to content

Commit 65c5b9c

Browse files
authored
Merge pull request #111 from php-enqueue/amqp-consume-from-multiple-queues-hotfix
[amqp] Switch to AMQP' basic.get till the issue with basic.consume is solved.
2 parents 444c533 + 08ac0f0 commit 65c5b9c

File tree

1 file changed

+52
-43
lines changed

1 file changed

+52
-43
lines changed

Diff for: pkg/amqp-ext/AmqpConsumer.php

+52-43
Original file line numberDiff line numberDiff line change
@@ -64,52 +64,61 @@ public function getQueue()
6464
*/
6565
public function receive($timeout = 0)
6666
{
67-
if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) {
68-
return $message;
69-
}
70-
71-
/** @var \AMQPQueue $extQueue */
72-
$extConnection = $this->getExtQueue()->getChannel()->getConnection();
73-
74-
$originalTimeout = $extConnection->getReadTimeout();
75-
try {
76-
$extConnection->setReadTimeout($timeout / 1000);
77-
78-
if (false == $this->isInit) {
79-
$this->getExtQueue()->consume(null, AMQP_NOPARAM);
80-
81-
$this->isInit = true;
82-
}
83-
84-
/** @var AmqpMessage|null $message */
85-
$message = null;
67+
// @see https://github.com/php-enqueue/enqueue-dev/issues/110 and https://github.com/pdezwart/php-amqp/issues/281
68+
$end = microtime(true) + ($timeout / 1000);
8669

87-
$this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) {
88-
$message = $this->convertMessage($extEnvelope);
89-
$message->setConsumerTag($q->getConsumerTag());
90-
91-
if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) {
92-
return false;
93-
}
94-
95-
// not our message, put it to buffer and continue.
96-
$this->buffer->push($q->getConsumerTag(), $message);
97-
98-
$message = null;
99-
100-
return true;
101-
}, AMQP_JUST_CONSUME);
102-
103-
return $message;
104-
} catch (\AMQPQueueException $e) {
105-
if ('Consumer timeout exceed' == $e->getMessage()) {
106-
return null;
70+
while (0 === $timeout || microtime(true) < $end) {
71+
if ($message = $this->receiveNoWait()) {
72+
return $message;
10773
}
108-
109-
throw $e;
110-
} finally {
111-
$extConnection->setReadTimeout($originalTimeout);
11274
}
75+
76+
// if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) {
77+
// return $message;
78+
// }
79+
//
80+
// /** @var \AMQPQueue $extQueue */
81+
// $extConnection = $this->getExtQueue()->getChannel()->getConnection();
82+
//
83+
// $originalTimeout = $extConnection->getReadTimeout();
84+
// try {
85+
// $extConnection->setReadTimeout($timeout / 1000);
86+
//
87+
// if (false == $this->isInit) {
88+
// $this->getExtQueue()->consume(null, AMQP_NOPARAM);
89+
//
90+
// $this->isInit = true;
91+
// }
92+
//
93+
// /** @var AmqpMessage|null $message */
94+
// $message = null;
95+
//
96+
// $this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) {
97+
// $message = $this->convertMessage($extEnvelope);
98+
// $message->setConsumerTag($q->getConsumerTag());
99+
//
100+
// if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) {
101+
// return false;
102+
// }
103+
//
104+
// // not our message, put it to buffer and continue.
105+
// $this->buffer->push($q->getConsumerTag(), $message);
106+
//
107+
// $message = null;
108+
//
109+
// return true;
110+
// }, AMQP_JUST_CONSUME);
111+
//
112+
// return $message;
113+
// } catch (\AMQPQueueException $e) {
114+
// if ('Consumer timeout exceed' == $e->getMessage()) {
115+
// return null;
116+
// }
117+
//
118+
// throw $e;
119+
// } finally {
120+
// $extConnection->setReadTimeout($originalTimeout);
121+
// }
113122
}
114123

115124
/**

0 commit comments

Comments
 (0)