diff --git a/app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php b/app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php index f301fb9289e..f5011248d77 100644 --- a/app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php +++ b/app/code/Magento/MessageQueue/Model/Cron/ConsumersRunner.php @@ -5,9 +5,13 @@ */ namespace Magento\MessageQueue\Model\Cron; +use Magento\Framework\App\ObjectManager; +use Magento\Framework\MessageQueue\ConnectionTypeResolver; +use Magento\Framework\MessageQueue\Consumer\Config\ConsumerConfigItemInterface; use Magento\Framework\ShellInterface; use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfigInterface; use Magento\Framework\App\DeploymentConfig; +use Psr\Log\LoggerInterface; use Symfony\Component\Process\PhpExecutableFinder; use Magento\MessageQueue\Model\Cron\ConsumersRunner\PidConsumerManager; @@ -56,6 +60,16 @@ class ConsumersRunner */ private $pidConsumerManager; + /** + * @var ConnectionTypeResolver + */ + private $mqConnectionTypeResolver; + + /** + * @var LoggerInterface + */ + private $logger; + /** * @param PhpExecutableFinder $phpExecutableFinder The executable finder specifically designed * for the PHP executable @@ -63,19 +77,27 @@ class ConsumersRunner * @param DeploymentConfig $deploymentConfig The application deployment configuration * @param ShellInterface $shellBackground The shell command line wrapper for executing command in background * @param PidConsumerManager $pidConsumerManager The class for checking status of process by PID + * @param ConnectionTypeResolver $mqConnectionTypeResolver Consumer connection resolver + * @param LoggerInterface $logger Logger */ public function __construct( PhpExecutableFinder $phpExecutableFinder, ConsumerConfigInterface $consumerConfig, DeploymentConfig $deploymentConfig, ShellInterface $shellBackground, - PidConsumerManager $pidConsumerManager + PidConsumerManager $pidConsumerManager, + ConnectionTypeResolver $mqConnectionTypeResolver = null, + LoggerInterface $logger = null ) { $this->phpExecutableFinder = $phpExecutableFinder; $this->consumerConfig = $consumerConfig; $this->deploymentConfig = $deploymentConfig; $this->shellBackground = $shellBackground; $this->pidConsumerManager = $pidConsumerManager; + $this->mqConnectionTypeResolver = $mqConnectionTypeResolver + ?: ObjectManager::getInstance()->get(ConnectionTypeResolver::class); + $this->logger = $logger + ?: ObjectManager::getInstance()->get(LoggerInterface::class); } /** @@ -94,12 +116,12 @@ public function run() $php = $this->phpExecutableFinder->find() ?: 'php'; foreach ($this->consumerConfig->getConsumers() as $consumer) { - $consumerName = $consumer->getName(); - - if (!$this->canBeRun($consumerName, $allowedConsumers)) { + if (!$this->canBeRun($consumer, $allowedConsumers)) { continue; } + $consumerName = $consumer->getName(); + $arguments = [ $consumerName, '--pid-file-path=' . $this->getPidFilePath($consumerName), @@ -119,16 +141,37 @@ public function run() /** * Checks that the consumer can be run * - * @param string $consumerName The consumer name + * @param ConsumerConfigItemInterface $consumerConfig The consumer config * @param array $allowedConsumers The list of allowed consumers * If $allowedConsumers is empty it means that all consumers are allowed * @return bool Returns true if the consumer can be run + * @throws \Magento\Framework\Exception\FileSystemException */ - private function canBeRun($consumerName, array $allowedConsumers = []) + private function canBeRun(ConsumerConfigItemInterface $consumerConfig, array $allowedConsumers = []): bool { - $allowed = empty($allowedConsumers) ?: in_array($consumerName, $allowedConsumers); + $consumerName = $consumerConfig->getName(); + if (!empty($allowedConsumers) && !in_array($consumerName, $allowedConsumers)) { + return false; + } + + if ($this->pidConsumerManager->isRun($this->getPidFilePath($consumerName))) { + return false; + } + + $connectionName = $consumerConfig->getConnection(); + try { + $this->mqConnectionTypeResolver->getConnectionType($connectionName); + } catch (\LogicException $e) { + $this->logger->info(sprintf( + 'Consumer "%s" skipped as required connection "%s" is not configured. %s', + $consumerName, + $connectionName, + $e->getMessage() + )); + return false; + } - return $allowed && !$this->pidConsumerManager->isRun($this->getPidFilePath($consumerName)); + return true; } /** diff --git a/app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php b/app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php index bf8796a03f5..006354b997d 100644 --- a/app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php +++ b/app/code/Magento/MessageQueue/Test/Unit/Model/Cron/ConsumersRunnerTest.php @@ -5,6 +5,7 @@ */ namespace Magento\MessageQueue\Test\Unit\Model\Cron; +use Magento\Framework\MessageQueue\ConnectionTypeResolver; use \PHPUnit_Framework_MockObject_MockObject as MockObject; use Magento\Framework\ShellInterface; use Magento\Framework\MessageQueue\Consumer\ConfigInterface as ConsumerConfigInterface; @@ -41,6 +42,11 @@ class ConsumersRunnerTest extends \PHPUnit\Framework\TestCase */ private $phpExecutableFinderMock; + /** + * @var ConnectionTypeResolver + */ + private $connectionTypeResover; + /** * @var ConsumersRunner */ @@ -66,13 +72,18 @@ protected function setUp() $this->deploymentConfigMock = $this->getMockBuilder(DeploymentConfig::class) ->disableOriginalConstructor() ->getMock(); + $this->connectionTypeResover = $this->getMockBuilder(ConnectionTypeResolver::class) + ->disableOriginalConstructor() + ->getMock(); + $this->connectionTypeResover->method('getConnectionType')->willReturn('something'); $this->consumersRunner = new ConsumersRunner( $this->phpExecutableFinderMock, $this->consumerConfigMock, $this->deploymentConfigMock, $this->shellBackgroundMock, - $this->pidConsumerManagerMock + $this->pidConsumerManagerMock, + $this->connectionTypeResover ); }