@@ -121,11 +121,16 @@ public function consume(ExtensionInterface $runtimeExtension = null)
121
121
$ logger = $ context ->getLogger () ?: new NullLogger ();
122
122
$ logger ->info ('Start consuming ' );
123
123
124
+ /** @var Queue|null $previousQueue */
125
+ $ previousQueue = null ;
126
+
124
127
while (true ) {
125
128
try {
126
129
/** @var Queue $queue */
127
130
foreach ($ this ->boundProcessors as list ($ queue , $ processor )) {
128
- $ logger ->debug (sprintf ('Switch to a queue %s ' , $ queue ->getQueueName ()));
131
+ if (false == $ previousQueue || $ previousQueue ->getQueueName () != $ queue ->getQueueName ()) {
132
+ $ logger ->debug (sprintf ('Switch to a queue %s ' , $ queue ->getQueueName ()));
133
+ }
129
134
130
135
$ messageConsumer = $ messageConsumers [$ queue ->getQueueName ()];
131
136
@@ -136,6 +141,8 @@ public function consume(ExtensionInterface $runtimeExtension = null)
136
141
$ context ->setPsrProcessor ($ processor );
137
142
138
143
$ this ->doConsume ($ extension , $ context );
144
+
145
+ $ previousQueue = $ queue ;
139
146
}
140
147
} catch (ConsumptionInterruptedException $ e ) {
141
148
$ logger ->info (sprintf ('Consuming interrupted ' ));
@@ -215,7 +222,7 @@ protected function doConsume(ExtensionInterface $extension, Context $context)
215
222
216
223
$ extension ->onPostReceived ($ context );
217
224
} else {
218
- $ logger ->info (sprintf ('Idle ' ));
225
+ $ logger ->debug (sprintf ('Idle ' ));
219
226
220
227
usleep ($ this ->idleMicroseconds );
221
228
$ extension ->onIdle ($ context );
0 commit comments