5
5
*/
6
6
namespace Magento \MessageQueue \Model \Cron ;
7
7
8
+ use Magento \Framework \App \ObjectManager ;
9
+ use Magento \Framework \MessageQueue \ConnectionTypeResolver ;
10
+ use Magento \Framework \MessageQueue \Consumer \Config \ConsumerConfigItemInterface ;
8
11
use Magento \Framework \ShellInterface ;
9
12
use Magento \Framework \MessageQueue \Consumer \ConfigInterface as ConsumerConfigInterface ;
10
13
use Magento \Framework \App \DeploymentConfig ;
14
+ use Psr \Log \LoggerInterface ;
11
15
use Symfony \Component \Process \PhpExecutableFinder ;
12
16
use Magento \MessageQueue \Model \Cron \ConsumersRunner \PidConsumerManager ;
13
17
@@ -56,26 +60,44 @@ class ConsumersRunner
56
60
*/
57
61
private $ pidConsumerManager ;
58
62
63
+ /**
64
+ * @var ConnectionTypeResolver
65
+ */
66
+ private $ mqConnectionTypeResolver ;
67
+
68
+ /**
69
+ * @var LoggerInterface
70
+ */
71
+ private $ logger ;
72
+
59
73
/**
60
74
* @param PhpExecutableFinder $phpExecutableFinder The executable finder specifically designed
61
75
* for the PHP executable
62
76
* @param ConsumerConfigInterface $consumerConfig The consumer config provider
63
77
* @param DeploymentConfig $deploymentConfig The application deployment configuration
64
78
* @param ShellInterface $shellBackground The shell command line wrapper for executing command in background
65
79
* @param PidConsumerManager $pidConsumerManager The class for checking status of process by PID
80
+ * @param ConnectionTypeResolver $mqConnectionTypeResolver Consumer connection resolver
81
+ * @param LoggerInterface $logger Logger
66
82
*/
67
83
public function __construct (
68
84
PhpExecutableFinder $ phpExecutableFinder ,
69
85
ConsumerConfigInterface $ consumerConfig ,
70
86
DeploymentConfig $ deploymentConfig ,
71
87
ShellInterface $ shellBackground ,
72
- PidConsumerManager $ pidConsumerManager
88
+ PidConsumerManager $ pidConsumerManager ,
89
+ ConnectionTypeResolver $ mqConnectionTypeResolver = null ,
90
+ LoggerInterface $ logger = null
73
91
) {
74
92
$ this ->phpExecutableFinder = $ phpExecutableFinder ;
75
93
$ this ->consumerConfig = $ consumerConfig ;
76
94
$ this ->deploymentConfig = $ deploymentConfig ;
77
95
$ this ->shellBackground = $ shellBackground ;
78
96
$ this ->pidConsumerManager = $ pidConsumerManager ;
97
+ $ this ->mqConnectionTypeResolver = $ mqConnectionTypeResolver
98
+ ?: ObjectManager::getInstance ()->get (ConnectionTypeResolver::class);
99
+ $ this ->logger = $ logger
100
+ ?: ObjectManager::getInstance ()->get (LoggerInterface::class);
79
101
}
80
102
81
103
/**
@@ -94,12 +116,12 @@ public function run()
94
116
$ php = $ this ->phpExecutableFinder ->find () ?: 'php ' ;
95
117
96
118
foreach ($ this ->consumerConfig ->getConsumers () as $ consumer ) {
97
- $ consumerName = $ consumer ->getName ();
98
-
99
- if (!$ this ->canBeRun ($ consumerName , $ allowedConsumers )) {
119
+ if (!$ this ->canBeRun ($ consumer , $ allowedConsumers )) {
100
120
continue ;
101
121
}
102
122
123
+ $ consumerName = $ consumer ->getName ();
124
+
103
125
$ arguments = [
104
126
$ consumerName ,
105
127
'--pid-file-path= ' . $ this ->getPidFilePath ($ consumerName ),
@@ -119,16 +141,37 @@ public function run()
119
141
/**
120
142
* Checks that the consumer can be run
121
143
*
122
- * @param string $consumerName The consumer name
144
+ * @param ConsumerConfigItemInterface $consumerConfig The consumer config
123
145
* @param array $allowedConsumers The list of allowed consumers
124
146
* If $allowedConsumers is empty it means that all consumers are allowed
125
147
* @return bool Returns true if the consumer can be run
148
+ * @throws \Magento\Framework\Exception\FileSystemException
126
149
*/
127
- private function canBeRun ($ consumerName , array $ allowedConsumers = [])
150
+ private function canBeRun (ConsumerConfigItemInterface $ consumerConfig , array $ allowedConsumers = []): bool
128
151
{
129
- $ allowed = empty ($ allowedConsumers ) ?: in_array ($ consumerName , $ allowedConsumers );
152
+ $ consumerName = $ consumerConfig ->getName ();
153
+ if (!empty ($ allowedConsumers ) && !in_array ($ consumerName , $ allowedConsumers )) {
154
+ return false ;
155
+ }
156
+
157
+ if ($ this ->pidConsumerManager ->isRun ($ this ->getPidFilePath ($ consumerName ))) {
158
+ return false ;
159
+ }
160
+
161
+ $ connectionName = $ consumerConfig ->getConnection ();
162
+ try {
163
+ $ this ->mqConnectionTypeResolver ->getConnectionType ($ connectionName );
164
+ } catch (\LogicException $ e ) {
165
+ $ this ->logger ->info (sprintf (
166
+ 'Consumer "%s" skipped as required connection "%s" is not configured. %s ' ,
167
+ $ consumerName ,
168
+ $ connectionName ,
169
+ $ e ->getMessage ()
170
+ ));
171
+ return false ;
172
+ }
130
173
131
- return $ allowed && ! $ this -> pidConsumerManager -> isRun ( $ this -> getPidFilePath ( $ consumerName )) ;
174
+ return true ;
132
175
}
133
176
134
177
/**
0 commit comments