Skip to content
This repository was archived by the owner on May 20, 2019. It is now read-only.

Not invoke consumers that relay on not configure connections #32

Merged
merged 1 commit into from
Oct 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 51 additions & 8 deletions app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
*/
namespace Magento\MessageQueue\Model\Cron;

use Magento\Framework\App\ObjectManager;
use Magento\Framework\MessageQueue\ConnectionTypeResolver;
use Magento\Framework\MessageQueue\Consumer\Config\ConsumerConfigItemInterface;
use Magento\Framework\ShellInterface;
use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfigInterface;
use Magento\Framework\App\DeploymentConfig;
use Psr\Log\LoggerInterface;
use Symfony\Component\Process\PhpExecutableFinder;
use Magento\MessageQueue\Model\Cron\ConsumersRunner\PidConsumerManager;

Expand Down Expand Up @@ -56,26 +60,44 @@ class ConsumersRunner
*/
private $pidConsumerManager;

/**
* @var ConnectionTypeResolver
*/
private $mqConnectionTypeResolver;

/**
* @var LoggerInterface
*/
private $logger;

/**
* @param PhpExecutableFinder $phpExecutableFinder The executable finder specifically designed
* for the PHP executable
* @param ConsumerConfigInterface $consumerConfig The consumer config provider
* @param DeploymentConfig $deploymentConfig The application deployment configuration
* @param ShellInterface $shellBackground The shell command line wrapper for executing command in background
* @param PidConsumerManager $pidConsumerManager The class for checking status of process by PID
* @param ConnectionTypeResolver $mqConnectionTypeResolver Consumer connection resolver
* @param LoggerInterface $logger Logger
*/
public function __construct(
PhpExecutableFinder $phpExecutableFinder,
ConsumerConfigInterface $consumerConfig,
DeploymentConfig $deploymentConfig,
ShellInterface $shellBackground,
PidConsumerManager $pidConsumerManager
PidConsumerManager $pidConsumerManager,
ConnectionTypeResolver $mqConnectionTypeResolver = null,
LoggerInterface $logger = null
) {
$this->phpExecutableFinder = $phpExecutableFinder;
$this->consumerConfig = $consumerConfig;
$this->deploymentConfig = $deploymentConfig;
$this->shellBackground = $shellBackground;
$this->pidConsumerManager = $pidConsumerManager;
$this->mqConnectionTypeResolver = $mqConnectionTypeResolver
?: ObjectManager::getInstance()->get(ConnectionTypeResolver::class);
$this->logger = $logger
?: ObjectManager::getInstance()->get(LoggerInterface::class);
}

/**
Expand All @@ -94,12 +116,12 @@ public function run()
$php = $this->phpExecutableFinder->find() ?: 'php';

foreach ($this->consumerConfig->getConsumers() as $consumer) {
$consumerName = $consumer->getName();

if (!$this->canBeRun($consumerName, $allowedConsumers)) {
if (!$this->canBeRun($consumer, $allowedConsumers)) {
continue;
}

$consumerName = $consumer->getName();

$arguments = [
$consumerName,
'--pid-file-path=' . $this->getPidFilePath($consumerName),
Expand All @@ -119,16 +141,37 @@ public function run()
/**
* Checks that the consumer can be run
*
* @param string $consumerName The consumer name
* @param ConsumerConfigItemInterface $consumerConfig The consumer config
* @param array $allowedConsumers The list of allowed consumers
* If $allowedConsumers is empty it means that all consumers are allowed
* @return bool Returns true if the consumer can be run
* @throws \Magento\Framework\Exception\FileSystemException
*/
private function canBeRun($consumerName, array $allowedConsumers = [])
private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $allowedConsumers = []): bool
{
$allowed = empty($allowedConsumers) ?: in_array($consumerName, $allowedConsumers);
$consumerName = $consumerConfig->getName();
if (!empty($allowedConsumers) && !in_array($consumerName, $allowedConsumers)) {
return false;
}

if ($this->pidConsumerManager->isRun($this->getPidFilePath($consumerName))) {
return false;
}

$connectionName = $consumerConfig->getConnection();
try {
$this->mqConnectionTypeResolver->getConnectionType($connectionName);
} catch (\LogicException $e) {
$this->logger->info(sprintf(
'Consumer "%s" skipped as required connection "%s" is not configured. %s',
$consumerName,
$connectionName,
$e->getMessage()
));
return false;
}

return $allowed && !$this->pidConsumerManager->isRun($this->getPidFilePath($consumerName));
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
namespace Magento\MessageQueue\Test\Unit\Model\Cron;

use Magento\Framework\MessageQueue\ConnectionTypeResolver;
use \PHPUnit_Framework_MockObject_MockObject as MockObject;
use Magento\Framework\ShellInterface;
use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfigInterface;
Expand Down Expand Up @@ -41,6 +42,11 @@ class ConsumersRunnerTest extends \PHPUnit\Framework\TestCase
*/
private $phpExecutableFinderMock;

/**
* @var ConnectionTypeResolver
*/
private $connectionTypeResover;

/**
* @var ConsumersRunner
*/
Expand All @@ -66,13 +72,18 @@ protected function setUp()
$this->deploymentConfigMock = $this->getMockBuilder(DeploymentConfig::class)
->disableOriginalConstructor()
->getMock();
$this->connectionTypeResover = $this->getMockBuilder(ConnectionTypeResolver::class)
->disableOriginalConstructor()
->getMock();
$this->connectionTypeResover->method('getConnectionType')->willReturn('something');

$this->consumersRunner = new ConsumersRunner(
$this->phpExecutableFinderMock,
$this->consumerConfigMock,
$this->deploymentConfigMock,
$this->shellBackgroundMock,
$this->pidConsumerManagerMock
$this->pidConsumerManagerMock,
$this->connectionTypeResover
);
}

Expand Down