|
19 | 19 | use Enqueue\Dbal\Symfony\DbalTransportFactory;
|
20 | 20 | use Enqueue\Fs\Symfony\FsTransportFactory;
|
21 | 21 | use Enqueue\Psr\PsrContext;
|
| 22 | +use Enqueue\Psr\PsrProcessor; |
22 | 23 | use Enqueue\Redis\Symfony\RedisTransportFactory;
|
23 | 24 | use Enqueue\Sqs\Symfony\SqsTransportFactory;
|
24 | 25 | use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory;
|
@@ -80,17 +81,25 @@ public function __construct($config)
|
80 | 81 | }
|
81 | 82 |
|
82 | 83 | /**
|
83 |
| - * @param string $topic |
84 |
| - * @param string $processorName |
85 |
| - * @param callback $processor |
| 84 | + * @param string $topic |
| 85 | + * @param string $processorName |
| 86 | + * @param callable|PsrProcessor $processor |
86 | 87 | */
|
87 |
| - public function bind($topic, $processorName, callable $processor) |
| 88 | + public function bind($topic, $processorName, $processor) |
88 | 89 | {
|
| 90 | + if (is_callable($processor)) { |
| 91 | + $processor = new CallbackProcessor($processor); |
| 92 | + } |
| 93 | + |
| 94 | + if (false == $processor instanceof PsrProcessor) { |
| 95 | + throw new \LogicException('The processor must be either callable or instance of PsrProcessor'); |
| 96 | + } |
| 97 | + |
89 | 98 | $queueName = $this->getConfig()->getDefaultProcessorQueueName();
|
90 | 99 |
|
91 | 100 | $this->getTopicMetaRegistry()->addProcessor($topic, $processorName);
|
92 | 101 | $this->getQueueMetaRegistry()->addProcessor($queueName, $processorName);
|
93 |
| - $this->getProcessorRegistry()->add($processorName, new CallbackProcessor($processor)); |
| 102 | + $this->getProcessorRegistry()->add($processorName, $processor); |
94 | 103 | $this->getRouterProcessor()->add($topic, $queueName, $processorName);
|
95 | 104 | }
|
96 | 105 |
|
|
0 commit comments