Skip to content

[Symfony] sendCommand / sendEvent for delayed message have different behaviour #523

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
amenophis opened this issue Sep 6, 2018 · 10 comments
Milestone

Comments

@amenophis
Copy link

Hi,
we are trying to use RabbitMqDlxDelayStrategy with the rabbitmq_amqp transport in a symfony project. Our config looks like that:

enqueue:
    transport:
        default:
            alias: rabbitmq_amqp
        rabbitmq_amqp:
            driver: bunny
            host: '%env(RABBITMQ_HOST)%'
            user: '%env(RABBITMQ_USER)%'
            pass: '%env(RABBITMQ_PASSWORD)%'
            delay_strategy: dlx
    client:
        prefix: company
        app_name: webhook
        router_queue: router # Used id queue name (${prefix}.{app_name}.{router_queue})
        router_topic: router # Used id exchange name (${prefix}.{router_topic})

With sendEvent, the delayed message is pushed in enqueue.company.webhook.job_execute.120000.delayed for 120s, and then delivered to the company.webhook.job_execute queue.

With sendCommand, the delayed message is pushed in enqueue.company.webhook.router.120000.delayed for 120s. After the 120s it is sent to enqueue.company.webhook.job_execute.120000.delayed for 120s, and then delivered to the company.webhook.job_execute queue.

In both cases, we use the autowired ProducerInterface to sendEvent/sendCommand.

We are waiting for the same behaviour with Event or Command (The actual Event behaviour)

Is it a normal behaviour ? Is it a bug ?
Thanks

@makasim
Copy link
Member

makasim commented Sep 7, 2018

@amenophis the message is being delayed twice right? I think that a router should reset any priority, delaying, and expiration while sending the message to its final destination.

@makasim makasim added the bug label Sep 7, 2018
@amenophis
Copy link
Author

@makasim Yes, the message is delayed first for 120s in the router.delayed queue, and then a second time for 120s in the job_execute.delayed queue.

This behaviour appear only while sending command, not event.

@amenophis
Copy link
Author

I don't understand why the router.delayed queue is created, what's it's purpose ?

@makasim
Copy link
Member

makasim commented Sep 7, 2018

rabbitmq does not support message delaying out of the box. We have to two options: a workaround based on dead letter queue and time to live features and the plugin.

You are using a kind of "work around" which needs a queue to be created

@amenophis
Copy link
Author

I understand how the delayed feature work, but i don't understand why 2 queues are needed with Command and just 1 queue with Event

@makasim
Copy link
Member

makasim commented Sep 7, 2018

Could you post a code where you register an event and command processors?

I guess it should the other way around.

@amenophis
Copy link
Author

@makasim You can see code below.

This is the Processors

<?php

namespace AppBundle\Async\Processor;

use AppBundle\Entity\WebhookJob;
use AppBundle\WebhookJobRunner;
use Enqueue\Client\CommandSubscriberInterface;
use Enqueue\Client\TopicSubscriberInterface;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;

class WebhookExecuteProcessor implements PsrProcessor, TopicSubscriberInterface
{
    public static function getSubscribedTopics()
    {
        return [
            'webhook.job_execute' => ['queueName' => 'job_execute']
        ];
    }

    protected $runner;

    public function __construct(WebhookJobRunner $runner)
    {
        $this->runner = $runner;
    }

    public function process(PsrMessage $message, PsrContext $context)
    {
        $this->runner->run($message->getBody());

        return self::ACK;
    }
}

class WebhookExecuteProcessor2 implements PsrProcessor, CommandSubscriberInterface
{
    public static function getSubscribedCommand()
    {
        return [
            'processorName' => 'webhook.job_execute',
            'queueName' => 'job_execute',
        ];
    }

    protected $runner;

    public function __construct(WebhookJobRunner $runner)
    {
        $this->runner = $runner;
    }

    public function process(PsrMessage $message, PsrContext $context)
    {
        $this->runner->run($message->getBody());

        return self::ACK;
    }
}

And this is the runner used in the Processors

<?php

namespace AppBundle;

use Enqueue\Client\Message;
use Enqueue\Client\ProducerInterface;

class WebhookJobRunner
{
    protected $producer;

    public function __construct(ProducerInterface $producer)
    {
        $this->producer = $producer;
    }

    public function run(string $jobId): void
    {
        // ...
        $message = new Message($jobId);
        $message->setDelay(120);
        
        $this->producer->sendEvent('webhook_job.execute', $message);
        $this->producer->sendCommand('webhook_job.execute', $message);
    }
}

@amenophis
Copy link
Author

@makasim any updates on this issue ?

makasim added a commit to php-enqueue/enqueue that referenced this issue Sep 27, 2018
@amenophis
Copy link
Author

Thanks @makasim. When do you plan the next release ?

@makasim
Copy link
Member

makasim commented Oct 11, 2018

I was almost ready to release a stable version (3-5 days left). Now I am involved in a project development. Not sure when it happens.

ASKozienko pushed a commit that referenced this issue Nov 2, 2018
@makasim makasim added the 0.9 label Nov 27, 2018
@makasim makasim added this to the 0.9 milestone Nov 27, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants