3
3
namespace Enqueue \SimpleClient ;
4
4
5
5
use Enqueue \Client \ArrayProcessorRegistry ;
6
+ use Enqueue \Client \ChainExtension as ClientChainExtensions ;
6
7
use Enqueue \Client \Config ;
8
+ use Enqueue \Client \ConsumptionExtension \DelayRedeliveredMessageExtension ;
9
+ use Enqueue \Client \ConsumptionExtension \SetRouterPropertiesExtension ;
7
10
use Enqueue \Client \DelegateProcessor ;
11
+ use Enqueue \Client \DriverFactory ;
8
12
use Enqueue \Client \DriverInterface ;
9
13
use Enqueue \Client \Message ;
10
- use Enqueue \Client \ProcessorRegistryInterface ;
14
+ use Enqueue \Client \Producer ;
11
15
use Enqueue \Client \ProducerInterface ;
12
16
use Enqueue \Client \Route ;
13
17
use Enqueue \Client \RouteCollection ;
14
18
use Enqueue \Client \RouterProcessor ;
19
+ use Enqueue \ConnectionFactoryFactory ;
15
20
use Enqueue \Consumption \CallbackProcessor ;
21
+ use Enqueue \Consumption \ChainExtension as ConsumptionChainExtension ;
16
22
use Enqueue \Consumption \ExtensionInterface ;
23
+ use Enqueue \Consumption \QueueConsumer ;
17
24
use Enqueue \Consumption \QueueConsumerInterface ;
18
25
use Enqueue \Rpc \Promise ;
19
- use Interop \Queue \PsrContext ;
26
+ use Enqueue \Rpc \RpcFactory ;
27
+ use Enqueue \Symfony \DependencyInjection \TransportFactory ;
20
28
use Interop \Queue \PsrProcessor ;
21
- use Symfony \Component \DependencyInjection \ContainerBuilder ;
22
- use Symfony \Component \DependencyInjection \ContainerInterface ;
29
+ use Symfony \Component \Config \Definition \Builder \TreeBuilder ;
30
+ use Symfony \Component \Config \Definition \NodeInterface ;
31
+ use Symfony \Component \Config \Definition \Processor ;
23
32
24
33
final class SimpleClient
25
34
{
26
35
/**
27
- * @var ContainerInterface
36
+ * @var DriverInterface
28
37
*/
29
- private $ container ;
38
+ private $ driver ;
30
39
31
40
/**
32
- * @var array|string
41
+ * @var Producer
33
42
*/
34
- private $ config ;
43
+ private $ producer ;
44
+
45
+ /**
46
+ * @var QueueConsumer
47
+ */
48
+ private $ queueConsumer ;
49
+
50
+ /**
51
+ * @var ArrayProcessorRegistry
52
+ */
53
+ private $ processorRegistry ;
54
+
55
+ /**
56
+ * @var DelegateProcessor
57
+ */
58
+ private $ delegateProcessor ;
35
59
36
60
/**
37
61
* The config could be a transport DSN (string) or an array, here's an example of a few DSNs:.
@@ -78,13 +102,11 @@ final class SimpleClient
78
102
* ]
79
103
*
80
104
*
81
- * @param string|array $config
82
- * @param ContainerBuilder|null $container
105
+ * @param string|array $config
83
106
*/
84
- public function __construct ($ config, ContainerBuilder $ container = null )
107
+ public function __construct ($ config )
85
108
{
86
- $ this ->container = $ this ->buildContainer ($ config , $ container ?: new ContainerBuilder ());
87
- $ this ->config = $ config ;
109
+ $ this ->build (['enqueue ' => $ config ]);
88
110
}
89
111
90
112
/**
@@ -102,8 +124,8 @@ public function bindTopic(string $topic, $processor, string $processorName = nul
102
124
103
125
$ processorName = $ processorName ?: uniqid (get_class ($ processor ));
104
126
105
- $ this ->getRouteCollection ()->add (new Route ($ topic , Route::TOPIC , $ processorName ));
106
- $ this ->getProcessorRegistry () ->add ($ processorName , $ processor );
127
+ $ this ->driver -> getRouteCollection ()->add (new Route ($ topic , Route::TOPIC , $ processorName ));
128
+ $ this ->processorRegistry ->add ($ processorName , $ processor );
107
129
}
108
130
109
131
/**
@@ -121,116 +143,171 @@ public function bindCommand(string $command, $processor, string $processorName =
121
143
122
144
$ processorName = $ processorName ?: uniqid (get_class ($ processor ));
123
145
124
- $ this ->getRouteCollection ()->add (new Route ($ command , Route::COMMAND , $ processorName ));
125
- $ this ->getProcessorRegistry () ->add ($ processorName , $ processor );
146
+ $ this ->driver -> getRouteCollection ()->add (new Route ($ command , Route::COMMAND , $ processorName ));
147
+ $ this ->processorRegistry ->add ($ processorName , $ processor );
126
148
}
127
149
128
150
/**
129
151
* @param string|array|\JsonSerializable|Message $message
130
152
*/
131
153
public function sendCommand (string $ command , $ message , bool $ needReply = false ): ?Promise
132
154
{
133
- return $ this ->getProducer () ->sendCommand ($ command , $ message , $ needReply );
155
+ return $ this ->producer ->sendCommand ($ command , $ message , $ needReply );
134
156
}
135
157
136
158
/**
137
159
* @param string|array|Message $message
138
160
*/
139
161
public function sendEvent (string $ topic , $ message ): void
140
162
{
141
- $ this ->getProducer () ->sendEvent ($ topic , $ message );
163
+ $ this ->producer ->sendEvent ($ topic , $ message );
142
164
}
143
165
144
166
public function consume (ExtensionInterface $ runtimeExtension = null ): void
145
167
{
146
168
$ this ->setupBroker ();
147
169
148
- $ processor = $ this ->getDelegateProcessor ();
149
- $ consumer = $ this ->getQueueConsumer ();
150
-
151
170
$ boundQueues = [];
152
171
153
- $ routerQueue = $ this ->getDriver ()->createQueue ($ this ->getConfig ()->getRouterQueueName ());
154
- $ consumer -> bind ($ routerQueue , $ processor );
172
+ $ routerQueue = $ this ->getDriver ()->createQueue ($ this ->getDriver ()-> getConfig ()->getRouterQueueName ());
173
+ $ this -> queueConsumer -> bind ($ routerQueue , $ this -> delegateProcessor );
155
174
$ boundQueues [$ routerQueue ->getQueueName ()] = true ;
156
175
157
- foreach ($ this ->getRouteCollection ()->all () as $ route ) {
176
+ foreach ($ this ->driver -> getRouteCollection ()->all () as $ route ) {
158
177
$ queue = $ this ->getDriver ()->createRouteQueue ($ route );
159
178
if (array_key_exists ($ queue ->getQueueName (), $ boundQueues )) {
160
179
continue ;
161
180
}
162
181
163
- $ consumer -> bind ($ queue , $ processor );
182
+ $ this -> queueConsumer -> bind ($ queue , $ this -> delegateProcessor );
164
183
165
184
$ boundQueues [$ queue ->getQueueName ()] = true ;
166
185
}
167
186
168
- $ consumer ->consume ($ runtimeExtension );
169
- }
170
-
171
- public function getContext (): PsrContext
172
- {
173
- return $ this ->container ->get ('enqueue.transport.context ' );
187
+ $ this ->queueConsumer ->consume ($ runtimeExtension );
174
188
}
175
189
176
190
public function getQueueConsumer (): QueueConsumerInterface
177
191
{
178
- return $ this ->container ->get ('enqueue.client.queue_consumer ' );
179
- }
180
-
181
- public function getConfig (): Config
182
- {
183
- return $ this ->container ->get ('enqueue.client.config ' );
192
+ return $ this ->queueConsumer ;
184
193
}
185
194
186
195
public function getDriver (): DriverInterface
187
196
{
188
- return $ this ->container -> get ( ' enqueue.client.default. driver' ) ;
197
+ return $ this ->driver ;
189
198
}
190
199
191
200
public function getProducer (bool $ setupBroker = false ): ProducerInterface
192
201
{
193
202
$ setupBroker && $ this ->setupBroker ();
194
203
195
- return $ this ->container -> get ( ' enqueue.client. producer' ) ;
204
+ return $ this ->producer ;
196
205
}
197
206
198
207
public function setupBroker (): void
199
208
{
200
209
$ this ->getDriver ()->setupBroker ();
201
210
}
202
211
203
- /**
204
- * @return ArrayProcessorRegistry
205
- */
206
- public function getProcessorRegistry (): ProcessorRegistryInterface
212
+ public function build (array $ configs ): void
207
213
{
208
- return $ this -> container -> get ( ' enqueue.client.processor_registry ' );
209
- }
214
+ $ configProcessor = new Processor ( );
215
+ $ simpleClientConfig = $ configProcessor -> process ( $ this -> createConfiguration (), $ configs );
210
216
211
- public function getDelegateProcessor (): DelegateProcessor
212
- {
213
- return $ this ->container ->get ('enqueue.client.delegate_processor ' );
214
- }
217
+ if (isset ($ simpleClientConfig ['transport ' ]['factory_service ' ])) {
218
+ throw new \LogicException ('transport.factory_service option is not supported by simple client ' );
219
+ }
220
+ if (isset ($ simpleClientConfig ['transport ' ]['factory_class ' ])) {
221
+ throw new \LogicException ('transport.factory_class option is not supported by simple client ' );
222
+ }
223
+ if (isset ($ simpleClientConfig ['transport ' ]['connection_factory_class ' ])) {
224
+ throw new \LogicException ('transport.connection_factory_class option is not supported by simple client ' );
225
+ }
215
226
216
- public function getRouterProcessor (): RouterProcessor
217
- {
218
- return $ this ->container ->get ('enqueue.client.router_processor ' );
219
- }
227
+ $ connectionFactoryFactory = new ConnectionFactoryFactory ();
228
+ $ connection = $ connectionFactoryFactory ->create ($ simpleClientConfig ['transport ' ]);
220
229
221
- public function getRouteCollection (): RouteCollection
222
- {
223
- return $ this ->container ->get ('enqueue.client.route_collection ' );
224
- }
230
+ $ clientExtensions = new ClientChainExtensions ([]);
225
231
226
- private function buildContainer ($ config , ContainerBuilder $ container ): ContainerInterface
227
- {
228
- $ extension = new SimpleClientContainerExtension ();
229
- $ container ->registerExtension ($ extension );
230
- $ container ->loadFromExtension ($ extension ->getAlias (), $ config );
232
+ $ config = new Config (
233
+ $ simpleClientConfig ['client ' ]['prefix ' ],
234
+ $ simpleClientConfig ['client ' ]['app_name ' ],
235
+ $ simpleClientConfig ['client ' ]['router_topic ' ],
236
+ $ simpleClientConfig ['client ' ]['router_queue ' ],
237
+ $ simpleClientConfig ['client ' ]['default_processor_queue ' ],
238
+ 'enqueue.client.router_processor ' ,
239
+ $ simpleClientConfig ['transport ' ]
240
+ );
241
+ $ routeCollection = new RouteCollection ([]);
242
+ $ driverFactory = new DriverFactory ($ config , $ routeCollection );
243
+
244
+ $ driver = $ driverFactory ->create (
245
+ $ connection ,
246
+ $ simpleClientConfig ['transport ' ]['dsn ' ],
247
+ $ simpleClientConfig ['transport ' ]
248
+ );
249
+
250
+ $ rpcFactory = new RpcFactory ($ driver ->getContext ());
251
+
252
+ $ producer = new Producer ($ driver , $ rpcFactory , $ clientExtensions );
231
253
232
- $ container -> compile ( );
254
+ $ processorRegistry = new ArrayProcessorRegistry ([] );
233
255
234
- return $ container ;
256
+ $ delegateProcessor = new DelegateProcessor ($ processorRegistry );
257
+
258
+ // consumption extensions
259
+ $ consumptionExtensions = [];
260
+ if ($ simpleClientConfig ['client ' ]['redelivered_delay_time ' ]) {
261
+ $ consumptionExtensions [] = new DelayRedeliveredMessageExtension ($ driver , $ simpleClientConfig ['client ' ]['redelivered_delay_time ' ]);
262
+ }
263
+
264
+ $ consumptionExtensions [] = new SetRouterPropertiesExtension ($ driver );
265
+
266
+ $ consumptionChainExtension = new ConsumptionChainExtension ($ consumptionExtensions );
267
+ $ queueConsumer = new QueueConsumer ($ driver ->getContext (), $ consumptionChainExtension );
268
+
269
+ $ routerProcessor = new RouterProcessor ($ driver );
270
+
271
+ $ processorRegistry ->add ($ config ->getRouterProcessorName (), $ routerProcessor );
272
+
273
+ $ this ->driver = $ driver ;
274
+ $ this ->producer = $ producer ;
275
+ $ this ->queueConsumer = $ queueConsumer ;
276
+ $ this ->delegateProcessor = $ delegateProcessor ;
277
+ $ this ->processorRegistry = $ processorRegistry ;
278
+ }
279
+
280
+ private function createConfiguration (): NodeInterface
281
+ {
282
+ $ tb = new TreeBuilder ();
283
+ $ rootNode = $ tb ->root ('enqueue ' );
284
+
285
+ $ rootNode
286
+ ->beforeNormalization ()
287
+ ->ifEmpty ()->then (function () {
288
+ return ['transport ' => ['dsn ' => 'null: ' ]];
289
+ });
290
+
291
+ $ transportNode = $ rootNode ->children ()->arrayNode ('transport ' );
292
+ (new TransportFactory ('default ' ))->addConfiguration ($ transportNode );
293
+
294
+ $ rootNode ->children ()
295
+ ->arrayNode ('client ' )
296
+ ->addDefaultsIfNotSet ()
297
+ ->children ()
298
+ ->scalarNode ('prefix ' )->defaultValue ('enqueue ' )->end ()
299
+ ->scalarNode ('app_name ' )->defaultValue ('app ' )->end ()
300
+ ->scalarNode ('router_topic ' )->defaultValue (Config::DEFAULT_PROCESSOR_QUEUE_NAME )->cannotBeEmpty ()->end ()
301
+ ->scalarNode ('router_queue ' )->defaultValue (Config::DEFAULT_PROCESSOR_QUEUE_NAME )->cannotBeEmpty ()->end ()
302
+ ->scalarNode ('default_processor_queue ' )->defaultValue (Config::DEFAULT_PROCESSOR_QUEUE_NAME )->cannotBeEmpty ()->end ()
303
+ ->integerNode ('redelivered_delay_time ' )->min (0 )->defaultValue (0 )->end ()
304
+ ->end ()
305
+ ->end ()
306
+ ->arrayNode ('extensions ' )->addDefaultsIfNotSet ()->children ()
307
+ ->booleanNode ('signal_extension ' )->defaultValue (function_exists ('pcntl_signal_dispatch ' ))->end ()
308
+ ->end ()->end ()
309
+ ;
310
+
311
+ return $ tb ->buildTree ();
235
312
}
236
313
}
0 commit comments