55 */
66namespace Magento \MessageQueue \Model \Cron ;
77
8+ use Magento \Framework \App \ObjectManager ;
9+ use Magento \Framework \MessageQueue \ConnectionTypeResolver ;
10+ use Magento \Framework \MessageQueue \Consumer \Config \ConsumerConfigItemInterface ;
811use Magento \Framework \ShellInterface ;
912use Magento \Framework \MessageQueue \Consumer \ConfigInterface as ConsumerConfigInterface ;
1013use Magento \Framework \App \DeploymentConfig ;
14+ use Psr \Log \LoggerInterface ;
1115use Symfony \Component \Process \PhpExecutableFinder ;
1216use Magento \MessageQueue \Model \Cron \ConsumersRunner \PidConsumerManager ;
1317
@@ -56,26 +60,44 @@ class ConsumersRunner
5660 */
5761 private $ pidConsumerManager ;
5862
63+ /**
64+ * @var ConnectionTypeResolver
65+ */
66+ private $ mqConnectionTypeResolver ;
67+
68+ /**
69+ * @var LoggerInterface
70+ */
71+ private $ logger ;
72+
5973 /**
6074 * @param PhpExecutableFinder $phpExecutableFinder The executable finder specifically designed
6175 * for the PHP executable
6276 * @param ConsumerConfigInterface $consumerConfig The consumer config provider
6377 * @param DeploymentConfig $deploymentConfig The application deployment configuration
6478 * @param ShellInterface $shellBackground The shell command line wrapper for executing command in background
6579 * @param PidConsumerManager $pidConsumerManager The class for checking status of process by PID
80+ * @param ConnectionTypeResolver $mqConnectionTypeResolver Consumer connection resolver
81+ * @param LoggerInterface $logger Logger
6682 */
6783 public function __construct (
6884 PhpExecutableFinder $ phpExecutableFinder ,
6985 ConsumerConfigInterface $ consumerConfig ,
7086 DeploymentConfig $ deploymentConfig ,
7187 ShellInterface $ shellBackground ,
72- PidConsumerManager $ pidConsumerManager
88+ PidConsumerManager $ pidConsumerManager ,
89+ ConnectionTypeResolver $ mqConnectionTypeResolver = null ,
90+ LoggerInterface $ logger = null
7391 ) {
7492 $ this ->phpExecutableFinder = $ phpExecutableFinder ;
7593 $ this ->consumerConfig = $ consumerConfig ;
7694 $ this ->deploymentConfig = $ deploymentConfig ;
7795 $ this ->shellBackground = $ shellBackground ;
7896 $ this ->pidConsumerManager = $ pidConsumerManager ;
97+ $ this ->mqConnectionTypeResolver = $ mqConnectionTypeResolver
98+ ?: ObjectManager::getInstance ()->get (ConnectionTypeResolver::class);
99+ $ this ->logger = $ logger
100+ ?: ObjectManager::getInstance ()->get (LoggerInterface::class);
79101 }
80102
81103 /**
@@ -94,12 +116,12 @@ public function run()
94116 $ php = $ this ->phpExecutableFinder ->find () ?: 'php ' ;
95117
96118 foreach ($ this ->consumerConfig ->getConsumers () as $ consumer ) {
97- $ consumerName = $ consumer ->getName ();
98-
99- if (!$ this ->canBeRun ($ consumerName , $ allowedConsumers )) {
119+ if (!$ this ->canBeRun ($ consumer , $ allowedConsumers )) {
100120 continue ;
101121 }
102122
123+ $ consumerName = $ consumer ->getName ();
124+
103125 $ arguments = [
104126 $ consumerName ,
105127 '--pid-file-path= ' . $ this ->getPidFilePath ($ consumerName ),
@@ -119,16 +141,37 @@ public function run()
119141 /**
120142 * Checks that the consumer can be run
121143 *
122- * @param string $consumerName The consumer name
144+ * @param ConsumerConfigItemInterface $consumerConfig The consumer config
123145 * @param array $allowedConsumers The list of allowed consumers
124146 * If $allowedConsumers is empty it means that all consumers are allowed
125147 * @return bool Returns true if the consumer can be run
148+ * @throws \Magento\Framework\Exception\FileSystemException
126149 */
127- private function canBeRun ($ consumerName , array $ allowedConsumers = [])
150+ private function canBeRun (ConsumerConfigItemInterface $ consumerConfig , array $ allowedConsumers = []): bool
128151 {
129- $ allowed = empty ($ allowedConsumers ) ?: in_array ($ consumerName , $ allowedConsumers );
152+ $ consumerName = $ consumerConfig ->getName ();
153+ if (!empty ($ allowedConsumers ) && !in_array ($ consumerName , $ allowedConsumers )) {
154+ return false ;
155+ }
156+
157+ if ($ this ->pidConsumerManager ->isRun ($ this ->getPidFilePath ($ consumerName ))) {
158+ return false ;
159+ }
160+
161+ $ connectionName = $ consumerConfig ->getConnection ();
162+ try {
163+ $ this ->mqConnectionTypeResolver ->getConnectionType ($ connectionName );
164+ } catch (\LogicException $ e ) {
165+ $ this ->logger ->info (sprintf (
166+ 'Consumer "%s" skipped as required connection "%s" is not configured. %s ' ,
167+ $ consumerName ,
168+ $ connectionName ,
169+ $ e ->getMessage ()
170+ ));
171+ return false ;
172+ }
130173
131- return $ allowed && ! $ this -> pidConsumerManager -> isRun ( $ this -> getPidFilePath ( $ consumerName )) ;
174+ return true ;
132175 }
133176
134177 /**
0 commit comments