|
4 | 4 |
|
5 | 5 | use Enqueue\Client\ChainExtension;
|
6 | 6 | use Enqueue\Client\Config;
|
| 7 | +use Enqueue\Client\ConsumptionExtension\DelayRedeliveredMessageExtension; |
| 8 | +use Enqueue\Client\ConsumptionExtension\ExclusiveCommandExtension; |
| 9 | +use Enqueue\Client\ConsumptionExtension\FlushSpoolProducerExtension; |
7 | 10 | use Enqueue\Client\ConsumptionExtension\SetRouterPropertiesExtension;
|
8 | 11 | use Enqueue\Client\DelegateProcessor;
|
9 | 12 | use Enqueue\Client\DriverFactory;
|
|
13 | 16 | use Enqueue\Client\RouteCollection;
|
14 | 17 | use Enqueue\Client\RouterProcessor;
|
15 | 18 | use Enqueue\Client\SpoolProducer;
|
| 19 | +use Enqueue\Client\TraceableProducer; |
16 | 20 | use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension;
|
17 | 21 | use Enqueue\Consumption\QueueConsumer;
|
18 | 22 | use Enqueue\Rpc\RpcFactory;
|
@@ -54,7 +58,21 @@ public function build(ContainerBuilder $container, array $config): void
|
54 | 58 | ->addArgument($this->reference('route_collection'))
|
55 | 59 | ;
|
56 | 60 |
|
57 |
| - $container->register($this->format('config'), Config::class); |
| 61 | + $container->register($this->format('config'), Config::class) |
| 62 | + ->setArguments([ |
| 63 | + $config['prefix'], |
| 64 | + $config['app_name'], |
| 65 | + $config['router_topic'], |
| 66 | + $config['router_queue'], |
| 67 | + $config['default_processor_queue'], |
| 68 | + $config['router_processor'], |
| 69 | + // @todo should be driver options. |
| 70 | + $config['transport'], |
| 71 | + ]); |
| 72 | + |
| 73 | + $container->setParameter($this->format('router_processor'), $config['router_processor']); |
| 74 | + $container->setParameter($this->format('router_queue_name'), $config['router_queue']); |
| 75 | + $container->setParameter($this->format('default_queue_name'), $config['default_processor_queue']); |
58 | 76 |
|
59 | 77 | $container->register($this->format('route_collection'), RouteCollection::class)
|
60 | 78 | ->addArgument([])
|
@@ -98,31 +116,59 @@ public function build(ContainerBuilder $container, array $config): void
|
98 | 116 | ->addArgument($this->reference('context'))
|
99 | 117 | ->addArgument($this->reference('consumption_extensions'))
|
100 | 118 | ->addArgument([])
|
101 |
| - ->addArgument(null) |
102 |
| - ->addArgument(null) |
103 |
| - ->addArgument(null) |
| 119 | + ->addArgument($this->reference('logger', ContainerInterface::NULL_ON_INVALID_REFERENCE)) |
| 120 | + ->addArgument($config['consumption']['receive_timeout']) |
104 | 121 | ;
|
105 | 122 |
|
106 |
| - $container->register($this->format('queue_consumer'), QueueConsumer::class) |
107 |
| - ->addArgument($this->reference('context')) |
108 |
| - ->addArgument($this->reference('consumption_extensions')) |
| 123 | + $container->register($this->format('consumption_extensions'), ConsumptionChainExtension::class) |
109 | 124 | ->addArgument([])
|
110 |
| - ->addArgument(new Reference('logger', ContainerInterface::NULL_ON_INVALID_REFERENCE)) |
111 | 125 | ;
|
112 | 126 |
|
113 |
| - $container->register($this->format('consumption_extensions'), ConsumptionChainExtension::class) |
114 |
| - ->addArgument([]) |
| 127 | + $container->register($this->format('flush_spool_producer_extension'), FlushSpoolProducerExtension::class) |
| 128 | + ->addArgument($this->reference('spool_producer')) |
| 129 | + ->addTag('enqueue.consumption.extension', ['priority' => -100, 'client' => $this->name]) |
115 | 130 | ;
|
116 | 131 |
|
117 |
| - if ('default' === $this->name) { |
118 |
| - $container->setAlias(ProducerInterface::class, $this->format('producer')) |
119 |
| - ->setPublic(true) |
| 132 | + $container->register($this->format('exclusive_command_extension'), ExclusiveCommandExtension::class) |
| 133 | + ->addArgument($this->reference('driver')) |
| 134 | + ->addTag('enqueue.consumption.extension', ['priority' => 100, 'client' => $this->name]) |
| 135 | + ; |
| 136 | + |
| 137 | + if ($config['traceable_producer']) { |
| 138 | + $container->register($this->format('traceable_producer'), TraceableProducer::class) |
| 139 | + ->setDecoratedService($this->format('producer')) |
| 140 | + ->addArgument($this->reference('traceable_producer.inner')) |
| 141 | + ; |
| 142 | + } |
| 143 | + |
| 144 | + if ($config['redelivered_delay_time']) { |
| 145 | + $container->register($this->format('delay_redelivered_message_extension'), DelayRedeliveredMessageExtension::class) |
| 146 | + ->addArgument($this->reference('driver')) |
| 147 | + ->addArgument($config['redelivered_delay_time']) |
| 148 | + ->addTag('enqueue.consumption_extension', ['priority' => 10, 'client' => $this->name]) |
120 | 149 | ;
|
121 | 150 |
|
122 |
| - $container->setAlias(SpoolProducer::class, $this->format('spool_producer')) |
123 |
| - ->setPublic(true) |
| 151 | + $container->getDefinition('enqueue.client.default.delay_redelivered_message_extension') |
| 152 | + ->replaceArgument(1, $config['redelivered_delay_time']) |
124 | 153 | ;
|
125 | 154 | }
|
| 155 | + |
| 156 | + $locatorId = 'enqueue.locator'; |
| 157 | + if ($container->hasDefinition($locatorId)) { |
| 158 | + $locator = $container->getDefinition($locatorId); |
| 159 | + $locator->replaceArgument(0, array_replace($locator->getArgument(0), [ |
| 160 | + $this->format('queue_consumer') => $this->reference('queue_consumer'), |
| 161 | + $this->format('driver') => $this->reference('driver'), |
| 162 | + $this->format('delegate_processor') => $this->reference('delegate_processor'), |
| 163 | + $this->format('producer') => $this->reference('producer'), |
| 164 | + ])); |
| 165 | + } |
| 166 | + |
| 167 | + if ('default' === $this->name) { |
| 168 | + $container->setAlias(ProducerInterface::class, $this->format('producer')); |
| 169 | + |
| 170 | + $container->setAlias(SpoolProducer::class, $this->format('spool_producer')); |
| 171 | + } |
126 | 172 | }
|
127 | 173 |
|
128 | 174 | public function createDriver(ContainerBuilder $container, array $config): string
|
|
0 commit comments