MultiKafka is a Spring Boot starter that simplifies configuring and managing multiple Kafka consumers and producers within a single application. Its hierarchical configuration lets you set global defaults while keeping fine-grained control over individual consumers and producers.
- Multiple Consumers & Producers: Easily define and register multiple Kafka consumers and producers via properties.
- Hierarchical Configuration: Three-level property merging (Root, then Type Default, then Specific Object), where the more specific level overrides the more general one.
- Automated Bean Registration: Programmatically creates and registers `KafkaTemplate`, `ProducerFactory`, `ConsumerFactory`, and `ConcurrentKafkaListenerContainerFactory` beans.
- Structured Properties: Full IDE code completion for standard Kafka properties in `application.yml`.
- Dead Letter Topic (DLT) Support: Simplified configuration for DLT producers and consumers.
- Retry & Listener Configuration: Easy setup for retry policies and listener container properties (concurrency, ack mode, etc.).
- Custom Bean Naming: Explicitly name your registered Kafka beans to avoid naming collisions and improve clarity.
Add the dependency to your `pom.xml`:

```xml
<dependency>
    <groupId>nl.cjdev.multikafka</groupId>
    <artifactId>MultiKafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>
```

Define your consumers and producers in `application.yml`:
```yaml
multi:
  kafka:
    # Root level: applied to all objects
    kafka:
      bootstrap-servers: localhost:9092
    consumers:
      order-consumer:
        topics: ["orders"]
        groupId: "order-service-group"
        kafka:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producers:
      notification-producer:
        kafka:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
```

MultiKafka uses a hierarchical approach to merge properties, allowing you to avoid duplication:
- Root Level (`multi.kafka.*`): Global settings for all consumers and producers.
- Type Defaults (`multi.kafka.consumerDefaults.*` or `multi.kafka.producerDefaults.*`): Default settings for all consumers OR all producers.
- Specific Level (`multi.kafka.consumers.<name>.*` or `multi.kafka.producers.<name>.*`): Settings for a specific named instance.
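
The three levels above can be pictured as successive map overlays. The following is a minimal sketch in plain Java, not MultiKafka's actual implementation: `PropertyMergeSketch` and `merge` are hypothetical names, and it assumes that a more specific level simply replaces any key it shares with a more general one.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of MultiKafka's API.
final class PropertyMergeSketch {

    // Levels are applied in order, so a later (more specific) level
    // overwrites keys defined by an earlier (more general) one.
    @SafeVarargs
    static Map<String, String> merge(Map<String, String>... levels) {
        Map<String, String> merged = new LinkedHashMap<>();
        for (Map<String, String> level : levels) {
            merged.putAll(level);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> root = Map.of("bootstrap-servers", "localhost:9091");
        Map<String, String> consumerDefaults = Map.of("auto-offset-reset", "earliest");
        Map<String, String> specific = Map.of("bootstrap-servers", "localhost:9092");

        // Root first, then type defaults, then the specific consumer.
        Map<String, String> effective = merge(root, consumerDefaults, specific);
        System.out.println(effective);
        // prints {bootstrap-servers=localhost:9092, auto-offset-reset=earliest}
    }
}
```

The specific consumer ends up with its own `bootstrap-servers` while still inheriting `auto-offset-reset` from the consumer defaults.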
Example of hierarchy:
```yaml
multi:
  kafka:
    # Root level
    consumer:
      bootstrap-servers: localhost:9091
    consumerDefaults:
      kafka:
        auto-offset-reset: earliest
    consumers:
      special-consumer:
        # Overrides root level bootstrap-servers
        kafka:
          bootstrap-servers: localhost:9092
        groupId: "special-group"
```

MultiKafka provides structured properties for both consumers and producers. These include:
- `bootstrap-servers`
- `client-id`
- `key-serializer` / `key-deserializer`
- `value-serializer` / `value-deserializer`
- `acks`
- `retries`
- `max-poll-records`
- `enable-auto-commit`
- `security-protocol`
- `sasl-mechanism`
- ... and many more.
Configure the `ConcurrentKafkaListenerContainerFactory`:
- `concurrency`: Number of listener threads (default: null, which uses the Spring Kafka default).
- `ack-mode`: Spring Kafka `AckMode`.
- `type`: `single` or `batch`.
- `missing-topics-fatal`: Boolean.

Configure the retry policy:
- `max-attempts`: Maximum number of delivery attempts.
- `initial-interval`: Initial interval between retries.
- `multiplier`: Backoff multiplier.
- `max-interval`: Maximum backoff interval.
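
To get a feel for how the four retry settings interact, here is a small plain-Java sketch of a conventional exponential backoff schedule. It is an illustration under assumptions, not MultiKafka's code: `BackoffSketch` and `delays` are hypothetical names, and it assumes that `max-attempts` counts the initial delivery, so `max-attempts` of 5 means at most 4 retries.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of MultiKafka's API.
final class BackoffSketch {

    // Returns the wait before each retry. The first delivery is not delayed,
    // so maxAttempts attempts produce maxAttempts - 1 delays (an assumption).
    static List<Duration> delays(int maxAttempts, Duration initialInterval,
                                 double multiplier, Duration maxInterval) {
        List<Duration> result = new ArrayList<>();
        double nextMillis = initialInterval.toMillis();
        for (int retry = 1; retry < maxAttempts; retry++) {
            // Cap every delay at max-interval.
            long capped = Math.min(Math.round(nextMillis), maxInterval.toMillis());
            result.add(Duration.ofMillis(capped));
            nextMillis *= multiplier;
        }
        return result;
    }

    public static void main(String[] args) {
        // max-attempts: 5, initial-interval: 1s, multiplier: 2.0, max-interval: 5s
        System.out.println(delays(5, Duration.ofSeconds(1), 2.0, Duration.ofSeconds(5)));
        // prints [PT1S, PT2S, PT4S, PT5S]
    }
}
```

Note how the last delay would have been 8 seconds but is capped by `max-interval`.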
Enable DLT support for a consumer or producer:
```yaml
multi:
  kafka:
    dlt:
      suffix: ".dead-letter" # Custom suffix for DLT topics
    consumers:
      my-consumer:
        dlt-enabled: true
```

For each entry in `consumers` or `producers`, MultiKafka registers beans with the following naming pattern:
- Producers:
  - `<name>ProducerFactory`
  - `<name>KafkaTemplate`
- Consumers:
  - `<name>ConsumerFactory`
  - `<name>KafkaListenerContainerFactory`
If you want to use a different base name for the beans, use the beanName property:
```yaml
multi:
  kafka:
    producers:
      my-raw-producer-key:
        beanName: "externalService"
```

This will result in beans named `externalServiceProducerFactory` and `externalServiceKafkaTemplate`.
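
The naming rule can be summarised in a few lines of plain Java. This is only a sketch of the pattern described above: `BeanNameSketch` and its methods are hypothetical, and treating a blank `beanName` the same as an unset one is an assumption.

```java
// Hypothetical illustration only; not part of MultiKafka's API.
final class BeanNameSketch {

    // Use the explicit beanName when present, otherwise fall back to the
    // key under producers/consumers. Blank is treated as unset (assumption).
    static String baseName(String key, String beanName) {
        return (beanName == null || beanName.isBlank()) ? key : beanName;
    }

    static String producerFactoryName(String key, String beanName) {
        return baseName(key, beanName) + "ProducerFactory";
    }

    static String kafkaTemplateName(String key, String beanName) {
        return baseName(key, beanName) + "KafkaTemplate";
    }

    public static void main(String[] args) {
        // Without beanName, the property key is the base name.
        System.out.println(kafkaTemplateName("notification-producer", null));
        // prints notification-producerKafkaTemplate

        // With beanName, the key no longer appears in the bean names.
        System.out.println(producerFactoryName("my-raw-producer-key", "externalService"));
        // prints externalServiceProducerFactory
    }
}
```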
To use a registered KafkaTemplate, simply inject it using @Qualifier:
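```java
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    // The qualifier is the producer name followed by "KafkaTemplate".
    public MyService(@Qualifier("notification-producerKafkaTemplate") KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
}
```

To use a registered `KafkaListenerContainerFactory` in a `@KafkaListener`: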
```java
@KafkaListener(topics = "orders", containerFactory = "order-consumerKafkaListenerContainerFactory")
public void listen(String message) {
    // ...
}
```

MultiKafka validates required properties at startup:
- `bootstrap-servers` is required for both consumers and producers.
- `group-id` is required for consumers.
If a required property is missing, MultiKafka will log a helpful error message indicating exactly where to add it and will prevent the application from starting.
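
As a rough picture of this kind of fail-fast check, the sketch below validates a consumer's merged properties and aborts with a message that points at the property to set. It is plain Java written for illustration: `ValidationSketch`, its methods, and the message wording are all hypothetical, and MultiKafka's real messages and exception type may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not part of MultiKafka's API.
final class ValidationSketch {

    private static boolean isMissing(String value) {
        return value == null || value.isBlank();
    }

    // Collects one message per missing required consumer property.
    static List<String> validateConsumer(String name, Map<String, String> effective) {
        List<String> errors = new ArrayList<>();
        if (isMissing(effective.get("bootstrap-servers"))) {
            errors.add("Consumer '" + name + "' has no bootstrap-servers; set "
                    + "multi.kafka.consumers." + name + ".kafka.bootstrap-servers or a root-level default");
        }
        if (isMissing(effective.get("group-id"))) {
            errors.add("Consumer '" + name + "' has no group-id; set "
                    + "multi.kafka.consumers." + name + ".groupId");
        }
        return errors;
    }

    // Throwing here is what would stop application startup in this sketch.
    static void failIfInvalid(List<String> errors) {
        if (!errors.isEmpty()) {
            throw new IllegalStateException(String.join("; ", errors));
        }
    }

    public static void main(String[] args) {
        List<String> errors = validateConsumer("order-consumer",
                Map.of("bootstrap-servers", "localhost:9092"));
        errors.forEach(System.err::println); // reports the missing group-id
        failIfInvalid(errors);               // throws IllegalStateException
    }
}
```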