This bundle aims to provide a simple Kafka transport for Symfony Messenger.
Open a command console, enter your project directory and execute the following command to download the latest stable version of this bundle:

$ composer require koco/messenger-kafka

This command requires you to have Composer installed globally, as explained in the installation chapter of the Composer documentation.

Then, enable the bundle by adding it to the list of registered bundles
in the config/bundles.php file of your project:
// config/bundles.php
return [
    // ...
    Koco\Kafka\KocoKafkaBundle::class => ['all' => true],
];

Specify a DSN starting with either kafka:// or kafka+ssl://. Multiple brokers can be listed, separated by commas:
kafka://my-local-kafka:9092
kafka+ssl://my-staging-kafka:9093
kafka+ssl://prod-kafka-01:9093,kafka+ssl://prod-kafka-02:9093,kafka+ssl://prod-kafka-03:9093
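
The transport configuration in this document reads the DSN from the KAFKA_URL environment variable. As a minimal sketch, a fallback value for local development could be declared through Symfony's env() parameter defaults. The file location and the broker address below are assumptions for illustration, not something this bundle prescribes.

```yaml
# config/packages/messenger.yaml (assumed location)
parameters:
    # Fallback used only when KAFKA_URL is not defined in the real environment.
    # The host name is a placeholder for a local single-broker setup.
    env(KAFKA_URL): 'kafka://my-local-kafka:9092'
```

In staging or production the variable would normally be set in the environment itself, for example to a comma-separated kafka+ssl:// broker list, and the fallback is then ignored.
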
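The options accepted under kafka_conf and topic_conf are the librdkafka configuration properties, which are listed in the librdkafka configuration reference (CONFIGURATION.md).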
It is highly recommended to set enable.auto.offset.store to false for consumers. Otherwise every message is acknowledged, regardless of any error thrown by the message handlers.
framework:
    messenger:
        transports:
            producer:
                dsn: '%env(KAFKA_URL)%'
                options:
                    topic:
                        name: 'events'
                    kafka_conf:
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
            consumer:
                dsn: '%env(KAFKA_URL)%'
                options:
                    commitAsync: true
                    receiveTimeout: 10000
                    topic:
                        name: "events"
                    kafka_conf:
                        enable.auto.offset.store: 'false'
                        group.id: 'my-group-id' # should be unique per consumer
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
                        max.poll.interval.ms: '45000'
                    topic_conf:
                        auto.offset.reset: 'smallest'
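
Defining the transports does not by itself send any message to Kafka; Symfony Messenger also needs a routing entry that maps message classes to a transport. The fragment below is a sketch of such an entry, assuming the transport names producer and consumer from the configuration above. The message class App\Message\OrderCreated is hypothetical and only stands in for one of your own classes.

```yaml
framework:
    messenger:
        routing:
            # Hypothetical message class; replace it with a class from your application.
            # Dispatching it on the message bus sends it through the 'producer' transport.
            'App\Message\OrderCreated': producer
```

Messages arriving on the topic can then be handled by running Symfony's standard worker command against the consumer transport, i.e. php bin/console messenger:consume consumer.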