Skip to content

Commit 3520fab

Browse files
committed
set routing key in consumed message
1 parent 4192bcf commit 3520fab

File tree

3 files changed

+8
-4
lines changed

3 files changed

+8
-4
lines changed

pkg/amqp-ext/AmqpConsumer.php

+1
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ private function convertMessage(\AMQPEnvelope $extEnvelope)
287287
);
288288
$message->setRedelivered($extEnvelope->isRedelivery());
289289
$message->setDeliveryTag($extEnvelope->getDeliveryTag());
290+
$message->setRoutingKey($extEnvelope->getRoutingKey());
290291

291292
return $message;
292293
}

pkg/amqp-lib/AmqpConsumer.php

+1
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ private function convertMessage(LibAMQPMessage $amqpMessage)
196196
$message = new AmqpMessage($amqpMessage->getBody(), $properties, $headers);
197197
$message->setDeliveryTag($amqpMessage->delivery_info['delivery_tag']);
198198
$message->setRedelivered($amqpMessage->delivery_info['redelivered']);
199+
$message->setRoutingKey($amqpMessage->delivery_info['routing_key']);
199200

200201
return $message;
201202
}

pkg/amqp-lib/AmqpContext.php

+6-4
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ public function createProducer()
124124
*/
125125
public function createTemporaryQueue()
126126
{
127-
$queue = $this->createQueue(null);
128-
$queue->addFlag(InteropAmqpQueue::FLAG_EXCLUSIVE);
127+
list($name) = $this->getChannel()->queue_declare('', false, false, true, false);
129128

130-
$this->declareQueue($queue);
129+
$queue = $this->createQueue($name);
130+
$queue->addFlag(InteropAmqpQueue::FLAG_EXCLUSIVE);
131131

132132
return $queue;
133133
}
@@ -166,7 +166,7 @@ public function deleteTopic(InteropAmqpTopic $topic)
166166
*/
167167
public function declareQueue(InteropAmqpQueue $queue)
168168
{
169-
return $this->getChannel()->queue_declare(
169+
list(, $messageCount) = $this->getChannel()->queue_declare(
170170
$queue->getQueueName(),
171171
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_PASSIVE),
172172
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_DURABLE),
@@ -175,6 +175,8 @@ public function declareQueue(InteropAmqpQueue $queue)
175175
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_NOWAIT),
176176
$queue->getArguments()
177177
);
178+
179+
return $messageCount;
178180
}
179181

180182
/**

0 commit comments

Comments
 (0)