Skip to content

Kafka symfony transport #432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 1, 2018
Merged

Kafka symfony transport #432

merged 16 commits into from
May 1, 2018

Conversation

dheineman
Copy link
Contributor

Adds the TransportFactory and Driver required to use kafka in the Symfony bundle

public function setupBroker(LoggerInterface $logger = null)
{
$logger = $logger ?: new NullLogger();
$logger->debug('[RdKafkaDriver] setup broker');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if this is going to be needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will, keep it.

->end()
->arrayNode('global')
->children()
->scalarNode('metadata.broker.list')->end()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Symfony does not allow dots in configuration nodes but the librdkafka configuration keys do contain them, need to figure out how to maybe normalize them?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can define a variable node (which allows everything to be put in it).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why just do you not use underscores?

Copy link
Contributor Author

@dheineman dheineman Apr 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as i can tell even a variable node requires a name and will throw an exception when it contains any dots.

I can just remove the metadata.broker.list scalarNode and just leave the global ArrayNode. Any suggestions?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default you can configure the broker over the config in this way:

enqueue:
    transport:
        default: 'kafka://localhost:9092'

So I think it is a good solution to provide a alternative config way like this:

enqueue:
    transport:
        kafka:
            metadata_broker_list: localhost:9092
            ...

The easier way is to use ->scalarNode('metadata_broker_list')->end()

Copy link
Member

@makasim makasim Apr 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest moving all the broker options to a dedicated configuration option. The option should be a variable node (no validation) like this one for example

Developers are free to put there whatever the broker supports.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another solution I found is to add a parser in the RdKafkaConnectionFactory class like the parseDsn method and translate the config properties where the dot is needed.

    public function __construct($config = 'kafka:')
    {
        if (empty($config) || 'kafka:' === $config) {
            $config = [];
        } elseif (is_string($config)) {
            $config = $this->parseDsn($config);
        } elseif (is_array($config)) {
            $config = $this->parseDots($config); // or some other naming ;)
        } else {
            throw new \LogicException('The config must be either an array of options, a DSN string or null');
        }

        $this->config = array_replace($this->defaultConfig(), $config);
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another solution I found is to add a parser in the RdKafkaConnectionFactory class like the parseDsn method and translate the config properties where the dot is needed.

I am not in favor of it. If you use factory in plain php you can pass doted options. Symfony does not allow it so it has to be fixed there.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But is not the configuration to validate the better option?
I think it is a valid choice for a library like this that supports many different libraries to use a self defined config format and map it to the specific library formats.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with the variable node for now as parsing or normalizing the configuration nodes would require maintaining a map of all rdkafka configuration properties (as it consists of keys with both dots and underscores). And i am not particularly in favour of this.

*/
public function addConfiguration(ArrayNodeDefinition $builder)
{
$builder
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire configuration needs more work/checking

@makasim
Copy link
Member

makasim commented Apr 27, 2018

It looks good to me, would be happy to merge it.

$clientMessage->setReplyTo($message->getReplyTo());
$clientMessage->setCorrelationId($message->getCorrelationId());

if ($contentType = $message->getHeader('content_type')) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not have to be checked.
The default of the field $contentType is null and the default return value of $message->getHeader('content_type') is also null if the header content_type not set.

->end()
->arrayNode('global')
->children()
->scalarNode('metadata.broker.list')->end()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why just do you not use underscores?

@dergriewatz
Copy link

Does not the transport factory have to be set in the EnqueueBundle class?

if (class_exists(RdKafkaConnectionFactory::class)) {
    $extension->setTransportFactory(new RdKafkaTransportFactory('kafka'));
} else {
    $extension->setTransportFactory(new MissingTransportFactory('kafka', ['enqueue/rdkafka']));
}

@dheineman dheineman changed the title [WIP] Kafka symfony transport Kafka symfony transport Apr 30, 2018
@dheineman
Copy link
Contributor Author

@makasim This should be done

@makasim makasim merged commit 6444ee7 into php-enqueue:master May 1, 2018
@makasim
Copy link
Member

makasim commented May 1, 2018

@dheineman Thank you very much

@makasim
Copy link
Member

makasim commented May 1, 2018

Tagged 0.8.27 with this feature included.

@dheineman dheineman deleted the kafka-symfony-transport branch May 1, 2018 18:47
ASKozienko pushed a commit that referenced this pull request Nov 2, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants