@@ -72,11 +72,11 @@ public function __construct(AmqpContext $context, Config $config = null)
72
72
73
73
/**
74
74
* @param string $topic
75
- * @param callback
75
+ * @param string $processorName
76
+ * @param callback $processor
76
77
*/
77
- public function bind ($ topic , callable $ processor )
78
+ public function bind ($ topic , $ processorName , callable $ processor )
78
79
{
79
- $ processorName = uniqid ('' , true );
80
80
$ queueName = $ this ->config ->getDefaultProcessorQueueName ();
81
81
82
82
$ this ->topicsMetaRegistry ->addProcessor ($ topic , $ processorName );
@@ -97,9 +97,7 @@ public function consume(ExtensionInterface $runtimeExtension = null)
97
97
98
98
$ processor = $ this ->getProcessor ();
99
99
100
- $ queueConsumer = new QueueConsumer ($ this ->context , new ChainExtension ([
101
- new SetRouterPropertiesExtension ($ this ->driver ),
102
- ]));
100
+ $ queueConsumer = $ this ->getQueueConsumer ();
103
101
104
102
$ defaultQueueName = $ this ->config ->getDefaultProcessorQueueName ();
105
103
$ defaultTransportQueueName = $ this ->config ->createTransportQueueName ($ defaultQueueName );
@@ -114,10 +112,44 @@ public function consume(ExtensionInterface $runtimeExtension = null)
114
112
$ queueConsumer ->consume ($ runtimeExtension );
115
113
}
116
114
115
+ /**
116
+ * @return QueueConsumer
117
+ */
118
+ public function getQueueConsumer ()
119
+ {
120
+ return new QueueConsumer ($ this ->context , new ChainExtension ([
121
+ new SetRouterPropertiesExtension ($ this ->driver ),
122
+ ]));
123
+ }
124
+
125
+ /**
126
+ * @return DriverInterface
127
+ */
128
+ public function getDriver ()
129
+ {
130
+ return $ this ->driver ;
131
+ }
132
+
133
+ /**
134
+ * @return TopicMetaRegistry
135
+ */
136
+ public function getTopicMetaRegistry ()
137
+ {
138
+ return $ this ->topicsMetaRegistry ;
139
+ }
140
+
141
+ /**
142
+ * @return QueueMetaRegistry
143
+ */
144
+ public function getQueueMetaRegistry ()
145
+ {
146
+ return $ this ->queueMetaRegistry ;
147
+ }
148
+
117
149
/**
118
150
* @return MessageProducerInterface
119
151
*/
120
- private function getProducer ()
152
+ public function getProducer ()
121
153
{
122
154
$ this ->driver ->setupBroker ();
123
155
@@ -127,7 +159,7 @@ private function getProducer()
127
159
/**
128
160
* @return DelegateProcessor
129
161
*/
130
- private function getProcessor ()
162
+ public function getProcessor ()
131
163
{
132
164
return new DelegateProcessor ($ this ->processorsRegistry );
133
165
}
0 commit comments