diff --git a/pkg/amqp-ext/AmqpConsumer.php b/pkg/amqp-ext/AmqpConsumer.php index bf5db633f..809ebc049 100644 --- a/pkg/amqp-ext/AmqpConsumer.php +++ b/pkg/amqp-ext/AmqpConsumer.php @@ -64,52 +64,61 @@ public function getQueue() */ public function receive($timeout = 0) { - if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) { - return $message; - } - - /** @var \AMQPQueue $extQueue */ - $extConnection = $this->getExtQueue()->getChannel()->getConnection(); - - $originalTimeout = $extConnection->getReadTimeout(); - try { - $extConnection->setReadTimeout($timeout / 1000); - - if (false == $this->isInit) { - $this->getExtQueue()->consume(null, AMQP_NOPARAM); - - $this->isInit = true; - } - - /** @var AmqpMessage|null $message */ - $message = null; + // @see https://github.com/php-enqueue/enqueue-dev/issues/110 and https://github.com/pdezwart/php-amqp/issues/281 + $end = microtime(true) + ($timeout / 1000); - $this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) { - $message = $this->convertMessage($extEnvelope); - $message->setConsumerTag($q->getConsumerTag()); - - if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) { - return false; - } - - // not our message, put it to buffer and continue. - $this->buffer->push($q->getConsumerTag(), $message); - - $message = null; - - return true; - }, AMQP_JUST_CONSUME); - - return $message; - } catch (\AMQPQueueException $e) { - if ('Consumer timeout exceed' == $e->getMessage()) { - return null; + while (0 === $timeout || microtime(true) < $end) { + if ($message = $this->receiveNoWait()) { + return $message; } - - throw $e; - } finally { - $extConnection->setReadTimeout($originalTimeout); } + +// if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) { +// return $message; +// } +// +// /** @var \AMQPQueue $extQueue */ +// $extConnection = $this->getExtQueue()->getChannel()->getConnection(); +// +// $originalTimeout = $extConnection->getReadTimeout(); +// try { +// $extConnection->setReadTimeout($timeout / 1000); +// +// if (false == $this->isInit) { +// $this->getExtQueue()->consume(null, AMQP_NOPARAM); +// +// $this->isInit = true; +// } +// +// /** @var AmqpMessage|null $message */ +// $message = null; +// +// $this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) { +// $message = $this->convertMessage($extEnvelope); +// $message->setConsumerTag($q->getConsumerTag()); +// +// if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) { +// return false; +// } +// +// // not our message, put it to buffer and continue. +// $this->buffer->push($q->getConsumerTag(), $message); +// +// $message = null; +// +// return true; +// }, AMQP_JUST_CONSUME); +// +// return $message; +// } catch (\AMQPQueueException $e) { +// if ('Consumer timeout exceed' == $e->getMessage()) { +// return null; +// } +// +// throw $e; +// } finally { +// $extConnection->setReadTimeout($originalTimeout); +// } } /**