diff --git a/composer.json b/composer.json index c6c665357..49c274e6d 100644 --- a/composer.json +++ b/composer.json @@ -7,6 +7,8 @@ "enqueue/enqueue": "*@dev", "enqueue/stomp": "*@dev", "enqueue/amqp-ext": "*@dev", + "enqueue/amqp-lib": "*@dev", + "php-amqplib/php-amqplib": "^2.7@dev", "enqueue/redis": "*@dev", "enqueue/fs": "*@dev", "enqueue/null": "*@dev", @@ -68,6 +70,10 @@ "type": "path", "url": "pkg/amqp-ext" }, + { + "type": "path", + "url": "pkg/amqp-lib" + }, { "type": "path", "url": "pkg/redis" diff --git a/pkg/amqp-lib/.gitignore b/pkg/amqp-lib/.gitignore new file mode 100644 index 000000000..a770439e5 --- /dev/null +++ b/pkg/amqp-lib/.gitignore @@ -0,0 +1,6 @@ +*~ +/composer.lock +/composer.phar +/phpunit.xml +/vendor/ +/.idea/ diff --git a/pkg/amqp-lib/.travis.yml b/pkg/amqp-lib/.travis.yml new file mode 100644 index 000000000..aaa1849c3 --- /dev/null +++ b/pkg/amqp-lib/.travis.yml @@ -0,0 +1,21 @@ +sudo: false + +git: + depth: 1 + +language: php + +php: + - '5.6' + - '7.0' + +cache: + directories: + - $HOME/.composer/cache + +install: + - composer self-update + - composer install --prefer-source --ignore-platform-reqs + +script: + - vendor/bin/phpunit --exclude-group=functional diff --git a/pkg/amqp-lib/AmqpConnectionFactory.php b/pkg/amqp-lib/AmqpConnectionFactory.php new file mode 100644 index 000000000..ac2e31bb1 --- /dev/null +++ b/pkg/amqp-lib/AmqpConnectionFactory.php @@ -0,0 +1,229 @@ + 'amqp.host The host to connect too. Note: Max 1024 characters.', + * 'port' => 'amqp.port Port on the host.', + * 'vhost' => 'amqp.vhost The virtual host on the host. Note: Max 128 characters.', + * 'user' => 'amqp.user The user name to use. Note: Max 128 characters.', + * 'pass' => 'amqp.password Password. Note: Max 128 characters.', + * 'lazy' => 'the connection will be performed as later as possible, if the option set to true', + * 'stream' => 'stream or socket connection', + * 'receive_method' => 'Could be either basic_get or basic_consume', + * ] + * + * or + * + * amqp://user:pass@host:10000/vhost?lazy=true&socket=true + * + * @param array|string $config + */ + public function __construct($config = 'amqp://') + { + if (empty($config) || 'amqp://' === $config) { + $config = []; + } elseif (is_string($config)) { + $config = $this->parseDsn($config); + } elseif (is_array($config)) { + } else { + throw new \LogicException('The config must be either an array of options, a DSN string or null'); + } + + $this->config = array_replace($this->defaultConfig(), $config); + + $supportedMethods = ['basic_get', 'basic_consume']; + if (false == in_array($this->config['receive_method'], $supportedMethods, true)) { + throw new \LogicException(sprintf( + 'Invalid "receive_method" option value "%s". It could be only "%s"', + $this->config['receive_method'], + implode('", "', $supportedMethods) + )); + } + } + + /** + * @return AmqpContext + */ + public function createContext() + { + return new AmqpContext($this->establishConnection(), $this->config['receive_method']); + } + + /** + * @return AbstractConnection + */ + private function establishConnection() + { + if (false == $this->connection) { + if ($this->config['stream']) { + if ($this->config['lazy']) { + $con = new AMQPLazyConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['connection_timeout'], + $this->config['read_write_timeout'], + null, + $this->config['keepalive'], + $this->config['heartbeat'] + ); + } else { + $con = new AMQPStreamConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['connection_timeout'], + $this->config['read_write_timeout'], + null, + $this->config['keepalive'], + $this->config['heartbeat'] + ); + } + } else { + if ($this->config['lazy']) { + $con = new AMQPLazySocketConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['read_timeout'], + $this->config['keepalive'], + $this->config['write_timeout'], + $this->config['heartbeat'] + ); + } else { + $con = new AMQPSocketConnection( + $this->config['host'], + $this->config['port'], + $this->config['user'], + $this->config['pass'], + $this->config['vhost'], + $this->config['insist'], + $this->config['login_method'], + $this->config['login_response'], + $this->config['locale'], + $this->config['read_timeout'], + $this->config['keepalive'], + $this->config['write_timeout'], + $this->config['heartbeat'] + ); + } + } + + $this->connection = $con; + } + + return $this->connection; + } + + /** + * @param string $dsn + * + * @return array + */ + private function parseDsn($dsn) + { + $dsnConfig = parse_url($dsn); + if (false === $dsnConfig) { + throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn)); + } + + $dsnConfig = array_replace([ + 'scheme' => null, + 'host' => null, + 'port' => null, + 'user' => null, + 'pass' => null, + 'path' => null, + 'query' => null, + ], $dsnConfig); + + if ('amqp' !== $dsnConfig['scheme']) { + throw new \LogicException(sprintf('The given DSN scheme "%s" is not supported. Could be "amqp" only.', $dsnConfig['scheme'])); + } + + if ($dsnConfig['query']) { + $query = []; + parse_str($dsnConfig['query'], $query); + + $dsnConfig = array_replace($query, $dsnConfig); + } + + $dsnConfig['vhost'] = ltrim($dsnConfig['path'], '/'); + + unset($dsnConfig['scheme'], $dsnConfig['query'], $dsnConfig['fragment'], $dsnConfig['path']); + + $dsnConfig = array_map(function ($value) { + return urldecode($value); + }, $dsnConfig); + + return $dsnConfig; + } + + /** + * @return array + */ + private function defaultConfig() + { + return [ + 'stream' => true, + 'lazy' => true, + 'host' => 'localhost', + 'port' => 5672, + 'user' => 'guest', + 'pass' => 'guest', + 'vhost' => '/', + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'read_timeout' => 3, + 'keepalive' => false, + 'write_timeout' => 3, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + 'receive_method' => 'basic_get', + ]; + } +} diff --git a/pkg/amqp-lib/AmqpConsumer.php b/pkg/amqp-lib/AmqpConsumer.php new file mode 100644 index 000000000..d73f1e843 --- /dev/null +++ b/pkg/amqp-lib/AmqpConsumer.php @@ -0,0 +1,219 @@ +channel = $channel; + $this->queue = $queue; + $this->buffer = $buffer; + $this->receiveMethod = $receiveMethod; + + $this->isInit = false; + } + + /** + * @return AmqpQueue + */ + public function getQueue() + { + return $this->queue; + } + + /** + * {@inheritdoc} + * + * @return AmqpMessage|null + */ + public function receive($timeout = 0) + { + if ('basic_get' == $this->receiveMethod) { + return $this->receiveBasicGet($timeout); + } + + if ('basic_consume' == $this->receiveMethod) { + return $this->receiveBasicConsume($timeout); + } + + throw new \LogicException('The "receiveMethod" is not supported'); + } + + /** + * @return AmqpMessage|null + */ + public function receiveNoWait() + { + if ($message = $this->channel->basic_get($this->queue->getQueueName())) { + return $this->convertMessage($message); + } + } + + /** + * @param AmqpMessage $message + */ + public function acknowledge(PsrMessage $message) + { + InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class); + + $this->channel->basic_ack($message->getDeliveryTag()); + } + + /** + * @param AmqpMessage $message + * @param bool $requeue + */ + public function reject(PsrMessage $message, $requeue = false) + { + InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class); + + $this->channel->basic_reject($message->getDeliveryTag(), $requeue); + } + + /** + * @param LibAMQPMessage $amqpMessage + * + * @return AmqpMessage + */ + private function convertMessage(LibAMQPMessage $amqpMessage) + { + $headers = new AMQPTable($amqpMessage->get_properties()); + $headers = $headers->getNativeData(); + + $properties = []; + if (isset($headers['application_headers'])) { + $properties = $headers['application_headers']; + } + unset($headers['application_headers']); + + $message = new AmqpMessage($amqpMessage->getBody(), $properties, $headers); + $message->setDeliveryTag($amqpMessage->delivery_info['delivery_tag']); + $message->setRedelivered($amqpMessage->delivery_info['redelivered']); + + return $message; + } + + /** + * @param int $timeout + * + * @return AmqpMessage|null + */ + private function receiveBasicGet($timeout) + { + $end = microtime(true) + ($timeout / 1000); + + while (0 === $timeout || microtime(true) < $end) { + if ($message = $this->receiveNoWait()) { + return $message; + } + + usleep(100000); //100ms + } + } + + /** + * @param int $timeout + * + * @return AmqpMessage|null + */ + private function receiveBasicConsume($timeout) + { + if (false === $this->isInit) { + $callback = function (LibAMQPMessage $message) { + $receivedMessage = $this->convertMessage($message); + $consumerTag = $message->delivery_info['consumer_tag']; + + if ($this->consumerTag === $consumerTag) { + $this->receivedMessage = $receivedMessage; + } else { + // not our message, put it to buffer and continue. + $this->buffer->push($consumerTag, $receivedMessage); + } + }; + + $this->channel->basic_qos(0, 1, false); + + $consumerTag = $this->channel->basic_consume( + $this->queue->getQueueName(), + $this->queue->getConsumerTag(), + $this->queue->isNoLocal(), + $this->queue->isNoAck(), + $this->queue->isExclusive(), + $this->queue->isNoWait(), + $callback + ); + + $this->consumerTag = $consumerTag ?: $this->queue->getConsumerTag(); + + if (empty($this->consumerTag)) { + throw new Exception('Got empty consumer tag'); + } + + $this->isInit = true; + } + + if ($message = $this->buffer->pop($this->consumerTag)) { + return $message; + } + + $this->receivedMessage = null; + + try { + $this->channel->wait(null, false, $timeout); + } catch (AMQPTimeoutException $e) { + } + + return $this->receivedMessage; + } +} diff --git a/pkg/amqp-lib/AmqpContext.php b/pkg/amqp-lib/AmqpContext.php new file mode 100644 index 000000000..168766ad5 --- /dev/null +++ b/pkg/amqp-lib/AmqpContext.php @@ -0,0 +1,246 @@ +connection = $connection; + $this->receiveMethod = $receiveMethod; + $this->buffer = new Buffer(); + } + + /** + * @param string|null $body + * @param array $properties + * @param array $headers + * + * @return AmqpMessage + */ + public function createMessage($body = '', array $properties = [], array $headers = []) + { + return new AmqpMessage($body, $properties, $headers); + } + + /** + * @param string $name + * + * @return AmqpQueue + */ + public function createQueue($name) + { + return new AmqpQueue($name); + } + + /** + * @param string $name + * + * @return AmqpTopic + */ + public function createTopic($name) + { + return new AmqpTopic($name); + } + + /** + * @param PsrDestination $destination + * + * @return AmqpConsumer + */ + public function createConsumer(PsrDestination $destination) + { + $destination instanceof PsrTopic + ? InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class) + : InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class) + ; + + if ($destination instanceof AmqpTopic) { + $queue = $this->createTemporaryQueue(); + $this->bind($destination, $queue); + + return new AmqpConsumer($this->getChannel(), $queue, $this->buffer, $this->receiveMethod); + } + + return new AmqpConsumer($this->getChannel(), $destination, $this->buffer, $this->receiveMethod); + } + + /** + * @return AmqpProducer + */ + public function createProducer() + { + return new AmqpProducer($this->getChannel()); + } + + /** + * @return AmqpQueue + */ + public function createTemporaryQueue() + { + $queue = $this->createQueue(null); + $queue->setExclusive(true); + + $this->declareQueue($queue); + + return $queue; + } + + /** + * @param AmqpTopic $destination + */ + public function declareTopic(PsrDestination $destination) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class); + + $this->getChannel()->exchange_declare( + $destination->getTopicName(), + $destination->getType(), + $destination->isPassive(), + $destination->isDurable(), + $destination->isAutoDelete(), + $destination->isInternal(), + $destination->isNoWait(), + $destination->getArguments(), + $destination->getTicket() + ); + } + + /** + * @param AmqpQueue $destination + */ + public function declareQueue(PsrDestination $destination) + { + InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class); + + $this->getChannel()->queue_declare( + $destination->getQueueName(), + $destination->isPassive(), + $destination->isDurable(), + $destination->isExclusive(), + $destination->isAutoDelete(), + $destination->isNoWait(), + $destination->getArguments(), + $destination->getTicket() + ); + } + + /** + * @param AmqpTopic|AmqpQueue $source + * @param AmqpTopic|AmqpQueue $target + * + * @throws Exception + */ + public function bind(PsrDestination $source, PsrDestination $target) + { + $source instanceof PsrTopic + ? InvalidDestinationException::assertDestinationInstanceOf($source, AmqpTopic::class) + : InvalidDestinationException::assertDestinationInstanceOf($source, AmqpQueue::class) + ; + + $target instanceof PsrTopic + ? InvalidDestinationException::assertDestinationInstanceOf($target, AmqpTopic::class) + : InvalidDestinationException::assertDestinationInstanceOf($target, AmqpQueue::class) + ; + + if ($source instanceof AmqpQueue && $target instanceof AmqpQueue) { + throw new Exception('Is not possible to bind queue to queue. It is possible to bind topic to queue or topic to topic'); + } + + // bind exchange to exchange + if ($source instanceof AmqpTopic && $target instanceof AmqpTopic) { + $this->getChannel()->exchange_bind( + $target->getTopicName(), + $source->getTopicName(), + $source->getRoutingKey(), + $source->isNowait(), + $source->getArguments(), + $source->getTicket() + ); + // bind queue to exchange + } elseif ($source instanceof AmqpQueue) { + $this->getChannel()->queue_bind( + $source->getQueueName(), + $target->getTopicName(), + $target->getRoutingKey(), + $target->isNowait(), + $target->getArguments(), + $target->getTicket() + ); + // bind exchange to queue + } else { + $this->getChannel()->queue_bind( + $target->getQueueName(), + $source->getTopicName(), + $source->getRoutingKey(), + $source->isNowait(), + $source->getArguments(), + $source->getTicket() + ); + } + } + + /** + * Purge all messages from the given queue. + * + * @param PsrQueue $queue + */ + public function purge(PsrQueue $queue) + { + InvalidDestinationException::assertDestinationInstanceOf($queue, AmqpQueue::class); + + $this->getChannel()->queue_purge($queue->getQueueName()); + } + + public function close() + { + if ($this->channel) { + $this->channel->close(); + } + } + + /** + * @return AMQPChannel + */ + private function getChannel() + { + if (null === $this->channel) { + $this->channel = $this->connection->channel(); + } + + return $this->channel; + } +} diff --git a/pkg/amqp-lib/AmqpMessage.php b/pkg/amqp-lib/AmqpMessage.php new file mode 100644 index 000000000..f391d7364 --- /dev/null +++ b/pkg/amqp-lib/AmqpMessage.php @@ -0,0 +1,295 @@ +body = $body; + $this->properties = $properties; + $this->headers = $headers; + $this->redelivered = false; + } + + /** + * @return string + */ + public function getBody() + { + return $this->body; + } + + /** + * @param string $body + */ + public function setBody($body) + { + $this->body = $body; + } + + /** + * @param array $properties + */ + public function setProperties(array $properties) + { + $this->properties = $properties; + } + + /** + * @return array + */ + public function getProperties() + { + return $this->properties; + } + + /** + * @param string $name + * @param mixed $value + */ + public function setProperty($name, $value) + { + $this->properties[$name] = $value; + } + + /** + * @param string $name + * @param mixed $default + * + * @return mixed + */ + public function getProperty($name, $default = null) + { + return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default; + } + + /** + * @param array $headers + */ + public function setHeaders(array $headers) + { + $this->headers = $headers; + } + + /** + * @return array + */ + public function getHeaders() + { + return $this->headers; + } + + /** + * @param string $name + * @param mixed $value + */ + public function setHeader($name, $value) + { + $this->headers[$name] = $value; + } + + /** + * @param string $name + * @param mixed $default + * + * @return mixed + */ + public function getHeader($name, $default = null) + { + return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default; + } + + /** + * @param bool $redelivered + */ + public function setRedelivered($redelivered) + { + $this->redelivered = (bool) $redelivered; + } + + /** + * @return bool + */ + public function isRedelivered() + { + return $this->redelivered; + } + + /** + * @param string $correlationId + */ + public function setCorrelationId($correlationId) + { + $this->setHeader('correlation_id', $correlationId); + } + + /** + * @return string + */ + public function getCorrelationId() + { + return $this->getHeader('correlation_id'); + } + + /** + * @param string $messageId + */ + public function setMessageId($messageId) + { + $this->setHeader('message_id', $messageId); + } + + /** + * @return string + */ + public function getMessageId() + { + return $this->getHeader('message_id'); + } + + /** + * @return int + */ + public function getTimestamp() + { + $value = $this->getHeader('timestamp'); + + return $value === null ? null : (int) $value; + } + + /** + * @param int $timestamp + */ + public function setTimestamp($timestamp) + { + $this->setHeader('timestamp', $timestamp); + } + + /** + * @param string|null $replyTo + */ + public function setReplyTo($replyTo) + { + $this->setHeader('reply_to', $replyTo); + } + + /** + * @return string|null + */ + public function getReplyTo() + { + return $this->getHeader('reply_to'); + } + + /** + * @return string + */ + public function getDeliveryTag() + { + return $this->deliveryTag; + } + + /** + * @param string $deliveryTag + */ + public function setDeliveryTag($deliveryTag) + { + $this->deliveryTag = $deliveryTag; + } + + /** + * @return bool + */ + public function isMandatory() + { + return $this->mandatory; + } + + /** + * @param int $mandatory + */ + public function setMandatory($mandatory) + { + $this->mandatory = $mandatory; + } + + /** + * @return bool + */ + public function isImmediate() + { + return $this->immediate; + } + + /** + * @param bool $immediate + */ + public function setImmediate($immediate) + { + $this->immediate = $immediate; + } + + /** + * @return int + */ + public function getTicket() + { + return $this->ticket; + } + + /** + * @param int $ticket + */ + public function setTicket($ticket) + { + $this->ticket = $ticket; + } +} diff --git a/pkg/amqp-lib/AmqpProducer.php b/pkg/amqp-lib/AmqpProducer.php new file mode 100644 index 000000000..b5cf61805 --- /dev/null +++ b/pkg/amqp-lib/AmqpProducer.php @@ -0,0 +1,71 @@ +channel = $channel; + } + + /** + * @param AmqpTopic|AmqpQueue $destination + * @param AmqpMessage $message + */ + public function send(PsrDestination $destination, PsrMessage $message) + { + $destination instanceof PsrTopic + ? InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class) + : InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class) + ; + + InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class); + + $amqpProperties = $message->getHeaders(); + + if ($appProperties = $message->getProperties()) { + $amqpProperties['application_headers'] = new AMQPTable($appProperties); + } + + $amqpMessage = new LibAMQPMessage($message->getBody(), $amqpProperties); + + if ($destination instanceof AmqpTopic) { + $this->channel->basic_publish( + $amqpMessage, + $destination->getTopicName(), + $destination->getRoutingKey(), + $message->isMandatory(), + $message->isImmediate(), + $message->getTicket() + ); + } else { + $this->channel->basic_publish( + $amqpMessage, + '', + $destination->getQueueName(), + $message->isMandatory(), + $message->isImmediate(), + $message->getTicket() + ); + } + } +} diff --git a/pkg/amqp-lib/AmqpQueue.php b/pkg/amqp-lib/AmqpQueue.php new file mode 100644 index 000000000..5c6551c73 --- /dev/null +++ b/pkg/amqp-lib/AmqpQueue.php @@ -0,0 +1,243 @@ +name = $name; + $this->passive = false; + $this->durable = false; + $this->exclusive = false; + $this->autoDelete = true; + $this->noWait = false; + $this->noLocal = false; + $this->noAck = false; + } + + /** + * @return string + */ + public function getQueueName() + { + return $this->name; + } + + /** + * @return bool + */ + public function isPassive() + { + return $this->passive; + } + + /** + * @param bool $passive + */ + public function setPassive($passive) + { + $this->passive = (bool) $passive; + } + + /** + * @return bool + */ + public function isDurable() + { + return $this->durable; + } + + /** + * @param bool $durable + */ + public function setDurable($durable) + { + $this->durable = (bool) $durable; + } + + /** + * @return bool + */ + public function isExclusive() + { + return $this->exclusive; + } + + /** + * @param bool $exclusive + */ + public function setExclusive($exclusive) + { + $this->exclusive = (bool) $exclusive; + } + + /** + * @return bool + */ + public function isAutoDelete() + { + return $this->autoDelete; + } + + /** + * @param bool $autoDelete + */ + public function setAutoDelete($autoDelete) + { + $this->autoDelete = (bool) $autoDelete; + } + + /** + * @return bool + */ + public function isNoWait() + { + return $this->noWait; + } + + /** + * @param bool $noWait + */ + public function setNoWait($noWait) + { + $this->noWait = (bool) $noWait; + } + + /** + * @return array|null + */ + public function getArguments() + { + return $this->arguments; + } + + /** + * @param array|null $arguments + */ + public function setArguments(array $arguments = null) + { + $this->arguments = $arguments; + } + + /** + * @return int + */ + public function getTicket() + { + return $this->ticket; + } + + /** + * @param int $ticket + */ + public function setTicket($ticket) + { + $this->ticket = $ticket; + } + + /** + * @return string + */ + public function getConsumerTag() + { + return $this->consumerTag; + } + + /** + * @param string $consumerTag + */ + public function setConsumerTag($consumerTag) + { + $this->consumerTag = $consumerTag; + } + + /** + * @return bool + */ + public function isNoLocal() + { + return $this->noLocal; + } + + /** + * @param bool $noLocal + */ + public function setNoLocal($noLocal) + { + $this->noLocal = $noLocal; + } + + /** + * @return bool + */ + public function isNoAck() + { + return $this->noAck; + } + + /** + * @param bool $noAck + */ + public function setNoAck($noAck) + { + $this->noAck = $noAck; + } +} diff --git a/pkg/amqp-lib/AmqpTopic.php b/pkg/amqp-lib/AmqpTopic.php new file mode 100644 index 000000000..a1d029853 --- /dev/null +++ b/pkg/amqp-lib/AmqpTopic.php @@ -0,0 +1,224 @@ +name = $name; + $this->type = 'direct'; + $this->passive = false; + $this->durable = false; + $this->autoDelete = true; + $this->internal = false; + $this->noWait = false; + } + + /** + * @return string + */ + public function getTopicName() + { + return $this->name; + } + + /** + * @return string + */ + public function getType() + { + return $this->type; + } + + /** + * @param string $type + */ + public function setType($type) + { + $this->type = $type; + } + + /** + * @return bool + */ + public function isPassive() + { + return $this->passive; + } + + /** + * @param bool $passive + */ + public function setPassive($passive) + { + $this->passive = (bool) $passive; + } + + /** + * @return bool + */ + public function isDurable() + { + return $this->durable; + } + + /** + * @param bool $durable + */ + public function setDurable($durable) + { + $this->durable = (bool) $durable; + } + + /** + * @return bool + */ + public function isAutoDelete() + { + return $this->autoDelete; + } + + /** + * @param bool $autoDelete + */ + public function setAutoDelete($autoDelete) + { + $this->autoDelete = (bool) $autoDelete; + } + + /** + * @return bool + */ + public function isInternal() + { + return $this->internal; + } + + /** + * @param bool $internal + */ + public function setInternal($internal) + { + $this->internal = (bool) $internal; + } + + /** + * @return bool + */ + public function isNoWait() + { + return $this->noWait; + } + + /** + * @param bool $noWait + */ + public function setNoWait($noWait) + { + $this->noWait = (bool) $noWait; + } + + /** + * @return array|null + */ + public function getArguments() + { + return $this->arguments; + } + + /** + * @param array|null $arguments + */ + public function setArguments(array $arguments = null) + { + $this->arguments = $arguments; + } + + /** + * @return int + */ + public function getTicket() + { + return $this->ticket; + } + + /** + * @param int $ticket + */ + public function setTicket($ticket) + { + $this->ticket = $ticket; + } + + /** + * @return string + */ + public function getRoutingKey() + { + return $this->routingKey; + } + + /** + * @param string $routingKey + */ + public function setRoutingKey($routingKey) + { + $this->routingKey = $routingKey; + } +} diff --git a/pkg/amqp-lib/Buffer.php b/pkg/amqp-lib/Buffer.php new file mode 100644 index 000000000..55c06f619 --- /dev/null +++ b/pkg/amqp-lib/Buffer.php @@ -0,0 +1,41 @@ + [AmqpMessage, AmqpMessage ...]] + */ + private $messages; + + public function __construct() + { + $this->messages = []; + } + + /** + * @param string $consumerTag + * @param AmqpMessage $message + */ + public function push($consumerTag, AmqpMessage $message) + { + if (false == array_key_exists($consumerTag, $this->messages)) { + $this->messages[$consumerTag] = []; + } + + $this->messages[$consumerTag][] = $message; + } + + /** + * @param string $consumerTag + * + * @return AmqpMessage|null + */ + public function pop($consumerTag) + { + if (false == empty($this->messages[$consumerTag])) { + return array_shift($this->messages[$consumerTag]); + } + } +} diff --git a/pkg/amqp-lib/LICENSE b/pkg/amqp-lib/LICENSE new file mode 100644 index 000000000..681501120 --- /dev/null +++ b/pkg/amqp-lib/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) +Copyright (c) 2017 Paul McLaren + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php b/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php new file mode 100644 index 000000000..31a1ca0ef --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpConnectionFactoryConfigTest.php @@ -0,0 +1,281 @@ +expectException(\LogicException::class); + $this->expectExceptionMessage('The config must be either an array of options, a DSN string or null'); + + new AmqpConnectionFactory(new \stdClass()); + } + + public function testThrowIfSchemeIsNotAmqp() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "amqp" only.'); + + new AmqpConnectionFactory('http://example.com'); + } + + public function testThrowIfDsnCouldNotBeParsed() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Failed to parse DSN "amqp://:@/"'); + + new AmqpConnectionFactory('amqp://:@/'); + } + + public function testThrowIfReceiveMenthodIsInvalid() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Invalid "receive_method" option value "invalidMethod". It could be only "basic_get", "basic_consume"'); + + new AmqpConnectionFactory(['receive_method' => 'invalidMethod']); + } + + /** + * @dataProvider provideConfigs + * + * @param mixed $config + * @param mixed $expectedConfig + */ + public function testShouldParseConfigurationAsExpected($config, $expectedConfig) + { + $factory = new AmqpConnectionFactory($config); + + $this->assertAttributeEquals($expectedConfig, 'config', $factory); + } + + public static function provideConfigs() + { + yield [ + null, + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + // some examples from Appendix A: Examples (https://www.rabbitmq.com/uri-spec.html) + + yield [ + 'amqp://user:pass@host:10000/vhost', + [ + 'host' => 'host', + 'port' => 10000, + 'vhost' => 'vhost', + 'user' => 'user', + 'pass' => 'pass', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + 'amqp://user%61:%61pass@ho%61st:10000/v%2fhost', + [ + 'host' => 'hoast', + 'port' => 10000, + 'vhost' => 'v/host', + 'user' => 'usera', + 'pass' => 'apass', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + 'amqp://', + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + 'amqp://user:pass@host:10000/vhost?connection_timeout=2&lazy=', + [ + 'host' => 'host', + 'port' => 10000, + 'vhost' => 'vhost', + 'user' => 'user', + 'pass' => 'pass', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => '', + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => '2', + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + [], + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + ['lazy' => false, 'host' => 'host'], + [ + 'host' => 'host', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => false, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + ], + ]; + + yield [ + ['connection_timeout' => 123, 'read_write_timeout' => 321], + [ + 'host' => 'localhost', + 'port' => 5672, + 'vhost' => '/', + 'user' => 'guest', + 'pass' => 'guest', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => 123, + 'read_write_timeout' => 321, + ], + ]; + + yield [ + 'amqp://user:pass@host:10000/vhost?connection_timeout=123&read_write_timeout=321', + [ + 'host' => 'host', + 'port' => 10000, + 'vhost' => 'vhost', + 'user' => 'user', + 'pass' => 'pass', + 'read_timeout' => 3, + 'write_timeout' => 3, + 'lazy' => true, + 'receive_method' => 'basic_get', + 'stream' => true, + 'insist' => false, + 'login_method' => 'AMQPLAIN', + 'login_response' => null, + 'locale' => 'en_US', + 'keepalive' => false, + 'heartbeat' => 0, + 'connection_timeout' => '123', + 'read_write_timeout' => '321', + ], + ]; + } +} diff --git a/pkg/amqp-lib/Tests/AmqpConsumerTest.php b/pkg/amqp-lib/Tests/AmqpConsumerTest.php new file mode 100644 index 000000000..a77443223 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpConsumerTest.php @@ -0,0 +1,184 @@ +assertClassImplements(PsrConsumer::class, AmqpConsumer::class); + } + + public function testCouldBeConstructedWithContextAndQueueAndBufferAsArguments() + { + new AmqpConsumer( + $this->createChannelMock(), + new AmqpQueue('aName'), + new Buffer(), + 'basic_get' + ); + } + + public function testShouldReturnQueue() + { + $queue = new AmqpQueue('aName'); + + $consumer = new AmqpConsumer($this->createChannelMock(), $queue, new Buffer(), 'basic_get'); + + $this->assertSame($queue, $consumer->getQueue()); + } + + public function testOnAcknowledgeShouldThrowExceptionIfNotAmqpMessage() + { + $consumer = new AmqpConsumer($this->createChannelMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage('The message must be an instance of Enqueue\AmqpLib\AmqpMessage but'); + + $consumer->acknowledge(new NullMessage()); + } + + public function testOnRejectShouldThrowExceptionIfNotAmqpMessage() + { + $consumer = new AmqpConsumer($this->createChannelMock(), new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage('The message must be an instance of Enqueue\AmqpLib\AmqpMessage but'); + + $consumer->reject(new NullMessage()); + } + + public function testOnAcknowledgeShouldAcknowledgeMessage() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_ack') + ->with('delivery-tag') + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + + $consumer->acknowledge($message); + } + + public function testOnRejectShouldRejectMessage() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_reject') + ->with('delivery-tag', $this->isTrue()) + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + + $consumer->reject($message, true); + } + + public function testShouldReturnMessageOnReceiveNoWait() + { + $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); + $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; + $amqpMessage->delivery_info['redelivered'] = true; + + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_get') + ->willReturn($amqpMessage) + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + + $message = $consumer->receiveNoWait(); + + $this->assertInstanceOf(AmqpMessage::class, $message); + $this->assertSame('body', $message->getBody()); + $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $this->assertTrue($message->isRedelivered()); + } + + public function testShouldReturnMessageOnReceiveWithReceiveMethodBasicGet() + { + $amqpMessage = new \PhpAmqpLib\Message\AMQPMessage('body'); + $amqpMessage->delivery_info['delivery_tag'] = 'delivery-tag'; + $amqpMessage->delivery_info['redelivered'] = true; + + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_get') + ->willReturn($amqpMessage) + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_get'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + + $message = $consumer->receive(); + + $this->assertInstanceOf(AmqpMessage::class, $message); + $this->assertSame('body', $message->getBody()); + $this->assertSame('delivery-tag', $message->getDeliveryTag()); + $this->assertTrue($message->isRedelivered()); + } + + public function testShouldCallExpectedMethodsWhenReceiveWithBasicConsumeMethod() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_consume') + ->willReturn('consumer-tag') + ; + $channel + ->expects($this->once()) + ->method('basic_qos') + ->with($this->identicalTo(0), $this->identicalTo(1), $this->isFalse()) + ; + $channel + ->expects($this->once()) + ->method('wait') + ; + + $consumer = new AmqpConsumer($channel, new AmqpQueue('aName'), new Buffer(), 'basic_consume'); + + $message = new AmqpMessage(); + $message->setDeliveryTag('delivery-tag'); + $consumer->receive(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AMQPChannel + */ + public function createChannelMock() + { + return $this->createMock(AMQPChannel::class); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpContextTest.php b/pkg/amqp-lib/Tests/AmqpContextTest.php new file mode 100644 index 000000000..17070f072 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpContextTest.php @@ -0,0 +1,244 @@ +createChannelMock(); + $channel + ->expects($this->once()) + ->method('exchange_declare') + ->with( + $this->identicalTo('name'), + $this->identicalTo('type'), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->identicalTo(['key' => 'value']), + $this->identicalTo(12345) + ) + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $topic = new AmqpTopic('name'); + $topic->setType('type'); + $topic->setArguments(['key' => 'value']); + $topic->setAutoDelete(true); + $topic->setDurable(true); + $topic->setInternal(true); + $topic->setNoWait(true); + $topic->setPassive(true); + $topic->setRoutingKey('routing-key'); + $topic->setTicket(12345); + + $session = new AmqpContext($connection, ''); + $session->declareTopic($topic); + } + + public function testShouldDeclareQueue() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('queue_declare') + ->with( + $this->identicalTo('name'), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->isTrue(), + $this->identicalTo(['key' => 'value']), + $this->identicalTo(12345) + ) + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $queue = new AmqpQueue('name'); + $queue->setArguments(['key' => 'value']); + $queue->setAutoDelete(true); + $queue->setDurable(true); + $queue->setNoWait(true); + $queue->setPassive(true); + $queue->setTicket(12345); + $queue->setConsumerTag('consumer-tag'); + $queue->setExclusive(true); + $queue->setNoLocal(true); + + $session = new AmqpContext($connection, ''); + $session->declareQueue($queue); + } + + public function testDeclareBindShouldThrowExceptionIfSourceDestinationIsInvalid() + { + $context = new AmqpContext($this->createConnectionMock(), ''); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\AmqpLib\AmqpTopic but got'); + + $context->bind(new NullTopic(''), new AmqpTopic('name')); + } + + public function testDeclareBindShouldThrowExceptionIfTargetDestinationIsInvalid() + { + $context = new AmqpContext($this->createConnectionMock(), ''); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\AmqpLib\AmqpTopic but got'); + + $context->bind(new AmqpQueue('name'), new NullTopic('')); + } + + public function testDeclareBindShouldThrowExceptionWhenSourceAndTargetAreQueues() + { + $context = new AmqpContext($this->createConnectionMock(), ''); + + $this->expectException(Exception::class); + $this->expectExceptionMessage('Is not possible to bind queue to queue. It is possible to bind topic to queue or topic to topic'); + + $context->bind(new AmqpQueue('name'), new AmqpQueue('name')); + } + + public function testDeclareBindShouldBindTopicToTopic() + { + $source = new AmqpTopic('source'); + $target = new AmqpTopic('target'); + + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('exchange_bind') + ->with('target', 'source') + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $context = new AmqpContext($connection, ''); + $context->bind($source, $target); + } + + public function testDeclareBindShouldBindTopicToQueue() + { + $source = new AmqpTopic('source'); + $target = new AmqpQueue('target'); + + $channel = $this->createChannelMock(); + $channel + ->expects($this->exactly(2)) + ->method('queue_bind') + ->with('target', 'source') + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $context = new AmqpContext($connection, ''); + $context->bind($source, $target); + $context->bind($target, $source); + } + + public function testShouldCloseChannelConnection() + { + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('close') + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $context = new AmqpContext($connection, ''); + $context->createProducer(); + + $context->close(); + } + + public function testPurgeShouldThrowExceptionIfDestinationIsNotAmqpQueue() + { + $context = new AmqpContext($this->createConnectionMock(), ''); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\AmqpLib\AmqpQueue but got'); + + $context->purge(new NullQueue('')); + } + + public function testShouldPurgeQueue() + { + $queue = new AmqpQueue('queue'); + + $channel = $this->createChannelMock(); + $channel + ->expects($this->once()) + ->method('queue_purge') + ->with('queue') + ; + + $connection = $this->createConnectionMock(); + $connection + ->expects($this->once()) + ->method('channel') + ->willReturn($channel) + ; + + $context = new AmqpContext($connection, ''); + $context->purge($queue); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AbstractConnection + */ + public function createConnectionMock() + { + return $this->createMock(AbstractConnection::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AMQPChannel + */ + public function createChannelMock() + { + return $this->createMock(AMQPChannel::class); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpMessageTest.php b/pkg/amqp-lib/Tests/AmqpMessageTest.php new file mode 100644 index 000000000..7c927a2cf --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpMessageTest.php @@ -0,0 +1,55 @@ +setDeliveryTag('theDeliveryTag'); + + $this->assertSame('theDeliveryTag', $message->getDeliveryTag()); + } + + public function testShouldAllowGetPreviouslySetMandatory() + { + $topic = new AmqpMessage('aName'); + + $topic->setMandatory(false); + $this->assertFalse($topic->isMandatory()); + + $topic->setMandatory(true); + $this->assertTrue($topic->isMandatory()); + } + + public function testShouldAllowGetPreviouslySetImmediate() + { + $topic = new AmqpMessage('aName'); + + $topic->setImmediate(false); + $this->assertFalse($topic->isImmediate()); + + $topic->setImmediate(true); + $this->assertTrue($topic->isImmediate()); + } + + public function testShouldAllowGetPreviouslySetTicket() + { + $topic = new AmqpMessage('aName'); + + //guard + $this->assertSame(null, $topic->getTicket()); + + $topic->setTicket('ticket'); + + $this->assertSame('ticket', $topic->getTicket()); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpProducerTest.php b/pkg/amqp-lib/Tests/AmqpProducerTest.php new file mode 100644 index 000000000..cfad057f4 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpProducerTest.php @@ -0,0 +1,164 @@ +createAmqpChannelMock()); + } + + public function testShouldImplementPsrProducerInterface() + { + $this->assertClassImplements(PsrProducer::class, AmqpProducer::class); + } + + public function testShouldThrowExceptionWhenDestinationTypeIsInvalid() + { + $producer = new AmqpProducer($this->createAmqpChannelMock()); + + $this->expectException(InvalidDestinationException::class); + $this->expectExceptionMessage('The destination must be an instance of Enqueue\AmqpLib\AmqpQueue but got'); + + $producer->send($this->createDestinationMock(), new AmqpMessage()); + } + + public function testShouldThrowExceptionWhenMessageTypeIsInvalid() + { + $producer = new AmqpProducer($this->createAmqpChannelMock()); + + $this->expectException(InvalidMessageException::class); + $this->expectExceptionMessage('The message must be an instance of Enqueue\AmqpLib\AmqpMessage but it is'); + + $producer->send(new AmqpTopic('name'), $this->createMessageMock()); + } + + public function testShouldPublishMessageToTopic() + { + $amqpMessage = null; + + $channel = $this->createAmqpChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_publish') + ->with($this->isInstanceOf(LibAMQPMessage::class), 'topic', 'routing-key') + ->will($this->returnCallback(function (LibAMQPMessage $message) use (&$amqpMessage) { + $amqpMessage = $message; + })) + ; + + $topic = new AmqpTopic('topic'); + $topic->setRoutingKey('routing-key'); + + $producer = new AmqpProducer($channel); + $producer->send($topic, new AmqpMessage('body')); + + $this->assertEquals('body', $amqpMessage->getBody()); + } + + public function testShouldPublishMessageToQueue() + { + $amqpMessage = null; + + $channel = $this->createAmqpChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_publish') + ->with($this->isInstanceOf(LibAMQPMessage::class), $this->isEmpty(), 'queue') + ->will($this->returnCallback(function (LibAMQPMessage $message) use (&$amqpMessage) { + $amqpMessage = $message; + })) + ; + + $queue = new AmqpQueue('queue'); + + $producer = new AmqpProducer($channel); + $producer->send($queue, new AmqpMessage('body')); + + $this->assertEquals('body', $amqpMessage->getBody()); + } + + public function testShouldSetMessageHeaders() + { + $amqpMessage = null; + + $channel = $this->createAmqpChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_publish') + ->will($this->returnCallback(function (LibAMQPMessage $message) use (&$amqpMessage) { + $amqpMessage = $message; + })) + ; + + $producer = new AmqpProducer($channel); + $producer->send(new AmqpTopic('name'), new AmqpMessage('body', [], ['content_type' => 'text/plain'])); + + $this->assertEquals(['content_type' => 'text/plain'], $amqpMessage->get_properties()); + } + + public function testShouldSetMessageProperties() + { + $amqpMessage = null; + + $channel = $this->createAmqpChannelMock(); + $channel + ->expects($this->once()) + ->method('basic_publish') + ->will($this->returnCallback(function (LibAMQPMessage $message) use (&$amqpMessage) { + $amqpMessage = $message; + })) + ; + + $producer = new AmqpProducer($channel); + $producer->send(new AmqpTopic('name'), new AmqpMessage('body', ['key' => 'value'])); + + $properties = $amqpMessage->get_properties(); + + $this->assertArrayHasKey('application_headers', $properties); + $this->assertInstanceOf(AMQPTable::class, $properties['application_headers']); + $this->assertEquals(['key' => 'value'], $properties['application_headers']->getNativeData()); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrMessage + */ + private function createMessageMock() + { + return $this->createMock(PsrMessage::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrDestination + */ + private function createDestinationMock() + { + return $this->createMock(PsrDestination::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|AMQPChannel + */ + private function createAmqpChannelMock() + { + return $this->createMock(AMQPChannel::class); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpQueueTest.php b/pkg/amqp-lib/Tests/AmqpQueueTest.php new file mode 100644 index 000000000..34aceeba7 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpQueueTest.php @@ -0,0 +1,122 @@ +setPassive(false); + $this->assertFalse($topic->isPassive()); + + $topic->setPassive(true); + $this->assertTrue($topic->isPassive()); + } + + public function testShouldAllowGetPreviouslySetDurable() + { + $topic = new AmqpQueue('aName'); + + $topic->setDurable(false); + $this->assertFalse($topic->isDurable()); + + $topic->setDurable(true); + $this->assertTrue($topic->isDurable()); + } + + public function testShouldAllowGetPreviouslySetExclusive() + { + $topic = new AmqpQueue('aName'); + + $topic->setExclusive(false); + $this->assertFalse($topic->isExclusive()); + + $topic->setExclusive(true); + $this->assertTrue($topic->isExclusive()); + } + + public function testShouldAllowGetPreviouslySetAutoDelete() + { + $topic = new AmqpQueue('aName'); + + $topic->setAutoDelete(false); + $this->assertFalse($topic->isAutoDelete()); + + $topic->setAutoDelete(true); + $this->assertTrue($topic->isAutoDelete()); + } + + public function testShouldAllowGetPreviouslySetNoWait() + { + $topic = new AmqpQueue('aName'); + + $topic->setNoWait(false); + $this->assertFalse($topic->isNoWait()); + + $topic->setNoWait(true); + $this->assertTrue($topic->isNoWait()); + } + + public function testShouldAllowGetPreviouslySetArguments() + { + $queue = new AmqpQueue('aName'); + + $queue->setArguments(['foo' => 'fooVal', 'bar' => 'barVal']); + + $this->assertSame(['foo' => 'fooVal', 'bar' => 'barVal'], $queue->getArguments()); + } + + public function testShouldAllowGetPreviouslySetTicket() + { + $topic = new AmqpQueue('aName'); + + //guard + $this->assertSame(null, $topic->getTicket()); + + $topic->setTicket('ticket'); + + $this->assertSame('ticket', $topic->getTicket()); + } + + public function testShouldAllowGetPreviouslySetConsumerTag() + { + $topic = new AmqpQueue('aName'); + + //guard + $this->assertSame(null, $topic->getConsumerTag()); + + $topic->setConsumerTag('consumer-tag'); + + $this->assertSame('consumer-tag', $topic->getConsumerTag()); + } + + public function testShouldAllowGetPreviouslySetNoLocal() + { + $topic = new AmqpQueue('aName'); + + $topic->setNoLocal(false); + $this->assertFalse($topic->isNoLocal()); + + $topic->setNoLocal(true); + $this->assertTrue($topic->isNoLocal()); + } + + public function testShouldAllowGetPreviouslySetNoAck() + { + $topic = new AmqpQueue('aName'); + + $topic->setNoAck(false); + $this->assertFalse($topic->isNoAck()); + + $topic->setNoAck(true); + $this->assertTrue($topic->isNoAck()); + } +} diff --git a/pkg/amqp-lib/Tests/AmqpTopicTest.php b/pkg/amqp-lib/Tests/AmqpTopicTest.php new file mode 100644 index 000000000..2e4649639 --- /dev/null +++ b/pkg/amqp-lib/Tests/AmqpTopicTest.php @@ -0,0 +1,116 @@ +assertSame('direct', $topic->getType()); + } + + public function testShouldAllowGetPreviouslySetType() + { + $topic = new AmqpTopic('aName'); + + $topic->setType('fanout'); + + $this->assertSame('fanout', $topic->getType()); + } + + public function testShouldAllowGetPreviouslySetPassive() + { + $topic = new AmqpTopic('aName'); + + $topic->setPassive(false); + $this->assertFalse($topic->isPassive()); + + $topic->setPassive(true); + $this->assertTrue($topic->isPassive()); + } + + public function testShouldAllowGetPreviouslySetDurable() + { + $topic = new AmqpTopic('aName'); + + $topic->setDurable(false); + $this->assertFalse($topic->isDurable()); + + $topic->setDurable(true); + $this->assertTrue($topic->isDurable()); + } + + public function testShouldAllowGetPreviouslySetAutoDelete() + { + $topic = new AmqpTopic('aName'); + + $topic->setAutoDelete(false); + $this->assertFalse($topic->isAutoDelete()); + + $topic->setAutoDelete(true); + $this->assertTrue($topic->isAutoDelete()); + } + + public function testShouldAllowGetPreviouslySetInternal() + { + $topic = new AmqpTopic('aName'); + + $topic->setInternal(false); + $this->assertFalse($topic->isInternal()); + + $topic->setInternal(true); + $this->assertTrue($topic->isInternal()); + } + + public function testShouldAllowGetPreviouslySetNoWait() + { + $topic = new AmqpTopic('aName'); + + $topic->setNoWait(false); + $this->assertFalse($topic->isNoWait()); + + $topic->setNoWait(true); + $this->assertTrue($topic->isNoWait()); + } + + public function testShouldAllowGetPreviouslySetArguments() + { + $topic = new AmqpTopic('aName'); + + $topic->setArguments(['foo' => 'fooVal', 'bar' => 'barVal']); + + $this->assertSame(['foo' => 'fooVal', 'bar' => 'barVal'], $topic->getArguments()); + } + + public function testShouldAllowGetPreviouslySetTicket() + { + $topic = new AmqpTopic('aName'); + + //guard + $this->assertSame(null, $topic->getTicket()); + + $topic->setTicket('ticket'); + + $this->assertSame('ticket', $topic->getTicket()); + } + + public function testShouldAllowGetPreviouslySetRoutingKey() + { + $topic = new AmqpTopic('aName'); + + //guard + $this->assertSame(null, $topic->getRoutingKey()); + + $topic->setRoutingKey('theRoutingKey'); + + $this->assertSame('theRoutingKey', $topic->getRoutingKey()); + } +} diff --git a/pkg/amqp-lib/Tests/BufferTest.php b/pkg/amqp-lib/Tests/BufferTest.php new file mode 100644 index 000000000..981ff2b16 --- /dev/null +++ b/pkg/amqp-lib/Tests/BufferTest.php @@ -0,0 +1,64 @@ +assertAttributeSame([], 'messages', $buffer); + } + + public function testShouldReturnNullIfNoMessagesInBuffer() + { + $buffer = new Buffer(); + + $this->assertNull($buffer->pop('aConsumerTag')); + $this->assertNull($buffer->pop('anotherConsumerTag')); + } + + public function testShouldPushMessageToBuffer() + { + $fooMessage = new AmqpMessage(); + $barMessage = new AmqpMessage(); + $bazMessage = new AmqpMessage(); + + $buffer = new Buffer(); + + $buffer->push('aConsumerTag', $fooMessage); + $buffer->push('aConsumerTag', $barMessage); + + $buffer->push('anotherConsumerTag', $bazMessage); + + $this->assertAttributeSame([ + 'aConsumerTag' => [$fooMessage, $barMessage], + 'anotherConsumerTag' => [$bazMessage], + ], 'messages', $buffer); + } + + public function testShouldPopMessageFromBuffer() + { + $fooMessage = new AmqpMessage(); + $barMessage = new AmqpMessage(); + + $buffer = new Buffer(); + + $buffer->push('aConsumerTag', $fooMessage); + $buffer->push('aConsumerTag', $barMessage); + + $this->assertSame($fooMessage, $buffer->pop('aConsumerTag')); + $this->assertSame($barMessage, $buffer->pop('aConsumerTag')); + $this->assertNull($buffer->pop('aConsumerTag')); + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpConnectionFactoryTest.php b/pkg/amqp-lib/Tests/Spec/AmqpConnectionFactoryTest.php new file mode 100644 index 000000000..ebc3b8a7f --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpConnectionFactoryTest.php @@ -0,0 +1,14 @@ +createMock(AMQPChannel::class); + + $con = $this->createMock(AbstractConnection::class); + $con + ->expects($this->any()) + ->method('channel') + ->willReturn($channel) + ; + + return new AmqpContext($con, ''); + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpMessageTest.php b/pkg/amqp-lib/Tests/Spec/AmqpMessageTest.php new file mode 100644 index 000000000..57b93cbd0 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpMessageTest.php @@ -0,0 +1,17 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + return $queue; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php new file mode 100644 index 000000000..dfce6ccdf --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveFromTopicTest.php @@ -0,0 +1,39 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..b4db35c10 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,38 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + return $queue; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php new file mode 100644 index 000000000..a50fc4c67 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToAndReceiveNoWaitFromTopicTest.php @@ -0,0 +1,39 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php new file mode 100644 index 000000000..8af921998 --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicConsumeMethodTest.php @@ -0,0 +1,59 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queueName .= '_basic_consume'; + + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + $context->bind($context->createTopic($queueName), $queue); + + return $queue; + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topicName .= '_basic_consume'; + + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicGetMethodTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicGetMethodTest.php new file mode 100644 index 000000000..28192181f --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveFromQueueWithBasicGetMethodTest.php @@ -0,0 +1,55 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + $context->bind($context->createTopic($queueName), $queue); + + return $queue; + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php new file mode 100644 index 000000000..6b4b1906b --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpSendToTopicAndReceiveNoWaitFromQueueTest.php @@ -0,0 +1,55 @@ +createContext(); + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createQueue(PsrContext $context, $queueName) + { + $queue = $context->createQueue($queueName); + $context->declareQueue($queue); + $context->purge($queue); + + $context->bind($context->createTopic($queueName), $queue); + + return $queue; + } + + /** + * {@inheritdoc} + * + * @param AmqpContext $context + */ + protected function createTopic(PsrContext $context, $topicName) + { + $topic = $context->createTopic($topicName); + $topic->setType('fanout'); + $topic->setDurable(true); + $context->declareTopic($topic); + + return $topic; + } +} diff --git a/pkg/amqp-lib/Tests/Spec/AmqpTopicTest.php b/pkg/amqp-lib/Tests/Spec/AmqpTopicTest.php new file mode 100644 index 000000000..89717f01f --- /dev/null +++ b/pkg/amqp-lib/Tests/Spec/AmqpTopicTest.php @@ -0,0 +1,14 @@ +=5.6", + "php-amqplib/php-amqplib": "^2.7@dev", + "queue-interop/queue-interop": "^0.5@dev", + "psr/log": "^1" + }, + "require-dev": { + "phpunit/phpunit": "~5.4.0", + "enqueue/test": "^0.6@dev", + "enqueue/enqueue": "^0.6@dev", + "enqueue/null": "^0.6@dev", + "queue-interop/queue-spec": "^0.5@dev", + "symfony/dependency-injection": "^2.8|^3", + "symfony/config": "^2.8|^3" + }, + "autoload": { + "psr-4": { "Enqueue\\AmqpLib\\": "" }, + "exclude-from-classmap": [ + "/Tests/" + ] + }, + "suggest": { + "enqueue/enqueue": "If you'd like to use advanced features like Client abstract layer or Symfony integration features" + }, + "minimum-stability": "dev", + "extra": { + "branch-alias": { + "dev-master": "0.6.x-dev" + } + } +} diff --git a/pkg/amqp-lib/phpunit.xml.dist b/pkg/amqp-lib/phpunit.xml.dist new file mode 100644 index 000000000..f6b8b173a --- /dev/null +++ b/pkg/amqp-lib/phpunit.xml.dist @@ -0,0 +1,30 @@ + + + + + + + ./Tests + + + + + + . + + ./vendor + ./Tests + + + +