Update testing docs for 4.0.0 with the updates in EmbeddedKafka #3822

Merged 2 commits on Mar 25, 2025
@@ -58,8 +58,8 @@ However, the quickest way to get started is to use https://start.spring.io[start

This quick tour works with the following versions:

-* Apache Kafka Clients 3.7.x
-* Spring Framework 6.1.x
+* Apache Kafka Clients 4.0.x
+* Spring Framework 7.0.0
* Minimum Java version: 17

[[getting-started]]
spring-kafka-docs/src/main/antora/modules/ROOT/pages/testing.adoc (62 changes: 19 additions & 43 deletions)
@@ -6,10 +6,9 @@ The `spring-kafka-test` jar contains some useful utilities to assist with testing
[[ekb]]
== Embedded Kafka Broker

-Two implementations are provided:
+Since Kafka 4.0 has fully transitioned to KRaft mode, only the `EmbeddedKafkaKraftBroker` implementation is now available:

-* `EmbeddedKafkaZKBroker` - legacy implementation which starts an embedded `Zookeeper` instance (which is still the default when using `EmbeddedKafka`).
-* `EmbeddedKafkaKraftBroker` - uses `Kraft` instead of `Zookeeper` in combined controller and broker modes (since 3.1).
+* `EmbeddedKafkaKraftBroker` - uses `Kraft` in combined controller and broker modes.

There are several techniques to configure the broker as discussed in the following sections.
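Before looking at those techniques, a minimal hand-rolled sketch of standing up the KRaft broker directly may help (the topic name and counts are illustrative, and it assumes `spring-kafka-test` 4.0 and its Kafka dependencies are on the classpath):

```java
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;

public class BrokerSetupSketch {

    public static void main(String[] args) {
        // 1 broker, 2 partitions per topic, one pre-created topic
        EmbeddedKafkaBroker broker = new EmbeddedKafkaKraftBroker(1, 2, "someTopic");
        broker.afterPropertiesSet();   // starts the combined KRaft controller/broker
        try {
            System.out.println("Broker listening at " + broker.getBrokersAsString());
        }
        finally {
            broker.destroy();          // shuts the broker down
        }
    }
}
```

The annotation-driven techniques discussed in the following sections manage this start/stop lifecycle for you.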

@@ -22,7 +21,7 @@ Refer to its javadoc:org.springframework.kafka.test.utils.KafkaTestUtils[Javadocs]
[[junit]]
== JUnit

-`org.springframework.kafka.test.utils.KafkaTestUtils` also provides some static methods to set up producer and consumer properties.
+`org.springframework.kafka.test.utils.KafkaTestUtils` provides some static methods to set up producer and consumer properties.
The following listing shows those method signatures:

[source, java]
...
----

@@ -57,32 +56,8 @@
[NOTE]
====
If this is not possible for some reason, note that the `consumeFromEmbeddedTopics` method seeks to the beginning by default.
Since it does not have access to the consumer properties, you must use the overloaded method that takes a `seekToEnd` boolean parameter to seek to the end instead of the beginning.
====

-A JUnit 4 `@Rule` wrapper for the `EmbeddedKafkaZKBroker` is provided to create an embedded Kafka and an embedded Zookeeper server.
-(See xref:testing.adoc#embedded-kafka-annotation[@EmbeddedKafka Annotation] for information about using `@EmbeddedKafka` with JUnit 5).
-The following listing shows the signatures of those methods:
-
-[source, java]
-----
-/**
- * Create embedded Kafka brokers.
- * @param count the number of brokers.
- * @param controlledShutdown passed into TestUtils.createBrokerConfig.
- * @param topics the topics to create (2 partitions per).
- */
-public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }
-
-/**
- * Create embedded Kafka brokers.
- * @param count the number of brokers.
- * @param controlledShutdown passed into TestUtils.createBrokerConfig.
- * @param partitions partitions per topic.
- * @param topics the topics to create.
- */
-public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
-----
-
-NOTE: The `EmbeddedKafkaKraftBroker` is not supported with JUnit4.
+NOTE: The `EmbeddedKafkaRule` JUnit 4 rule has been removed in version 4.0.
+For JUnit 4, you should use the `EmbeddedKafkaKraftBroker` directly or migrate to JUnit 5 with the `@EmbeddedKafka` annotation.
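For the JUnit 4 path mentioned in the note, a minimal hand-managed lifecycle might look like this (the class and topic names are illustrative, assuming `spring-kafka-test` 4.0 on the classpath):

```java
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;

public class JUnit4BrokerLifecycleTests {

    private static EmbeddedKafkaKraftBroker broker;

    @BeforeClass
    public static void startBroker() {
        // 1 broker, 2 partitions per topic
        broker = new EmbeddedKafkaKraftBroker(1, 2, "junit4Topic");
        broker.afterPropertiesSet();
    }

    @AfterClass
    public static void stopBroker() {
        broker.destroy();
    }
}
```

Starting and stopping the broker once per class keeps test runs fast, mirroring what the removed rule did as a `@ClassRule`.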

The `EmbeddedKafkaBroker` class has a utility method that lets you consume from all the topics it created.
The following example shows how to use it:
@@ -127,8 +102,8 @@ ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consum
...
----

-When the embedded Kafka and embedded Zookeeper server are started by the `EmbeddedKafkaBroker`, a system property named `spring.embedded.kafka.brokers` is set to the address of the Kafka brokers and a system property named `spring.embedded.zookeeper.connect` is set to the address of Zookeeper.
-Convenient constants (`EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS` and `EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT`) are provided for this property.
+When the embedded Kafka broker is started by the `EmbeddedKafkaBroker`, a system property named `spring.embedded.kafka.brokers` is set to the address of the Kafka brokers.
+A convenient constant (`EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS`) is provided for this property.

Instead of the default `spring.embedded.kafka.brokers` system property, the address of the Kafka brokers can be exposed through any arbitrary and convenient property.
For this purpose, the `spring.embedded.kafka.brokers.property` (`EmbeddedKafkaBroker.BROKER_LIST_PROPERTY`) system property can be set before starting an embedded Kafka broker.
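The indirection can be pictured with plain system properties; in this self-contained sketch, `my.kafka.brokers` and the hard-coded address are illustrative stand-ins for what the real broker would publish:

```java
public class BrokerPropertyIndirection {

    public static void main(String[] args) {
        // The test sets the indirection property before the broker starts...
        System.setProperty("spring.embedded.kafka.brokers.property", "my.kafka.brokers");

        // ...and the broker (simulated here) publishes its address under the
        // property name found there, instead of the default property name.
        String target = System.getProperty("spring.embedded.kafka.brokers.property");
        System.setProperty(target, "127.0.0.1:36672");

        System.out.println(System.getProperty("my.kafka.brokers")); // prints 127.0.0.1:36672
    }
}
```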
@@ -227,8 +202,7 @@ In addition, these properties can be provided:
- `spring.kafka.embedded.ports` - ports (comma-separated value) for every Kafka broker to start, `0` if random port is preferred; the number of values must be equal to the `count` mentioned above;
- `spring.kafka.embedded.topics` - topics (comma-separated value) to create in the started Kafka cluster;
- `spring.kafka.embedded.partitions` - number of partitions to provision for the created topics;
-- `spring.kafka.embedded.broker.properties.location` - the location of the file for additional Kafka broker configuration properties; the value of this property must follow the Spring resource abstraction pattern;
-- `spring.kafka.embedded.kraft` - default false, when true, use an `EmbeddedKafkaKraftBroker` instead of an `EmbeddedKafkaZKBroker`.
+- `spring.kafka.embedded.broker.properties.location` - the location of the file for additional Kafka broker configuration properties; the value of this property must follow the Spring resource abstraction pattern.

Essentially these properties mimic some of the `@EmbeddedKafka` attributes.
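For illustration, a one-broker cluster with two pre-created topics could be provisioned with JUnit Platform configuration parameters such as these (the values are arbitrary, e.g. supplied via a `junit-platform.properties` file):

```properties
spring.kafka.embedded.count=1
spring.kafka.embedded.ports=0
spring.kafka.embedded.topics=cat,hat
spring.kafka.embedded.partitions=2
```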

@@ -244,7 +218,7 @@ If you wish to use the embedded broker and are NOT using JUnit, you may wish to

[[embedded-kafka-annotation]]
== `@EmbeddedKafka` Annotation
-We generally recommend that you use the rule as a `@ClassRule` to avoid starting and stopping the broker between tests (and use a different topic for each test).
+We generally recommend that you use a single broker instance to avoid starting and stopping the broker between tests (and use a different topic for each test).
Starting with version 2.0, if you use Spring's test application context caching, you can also declare an `EmbeddedKafkaBroker` bean, so a single broker can be used across multiple test classes.
For convenience, we provide a test class-level annotation called `@EmbeddedKafka` to register the `EmbeddedKafkaBroker` bean.
The following example shows how to use it:
@@ -295,7 +269,7 @@ public class KafkaStreamsTests {

Starting with version 2.2.4, you can also use the `@EmbeddedKafka` annotation to specify the Kafka ports property.
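For instance, a fixed port can be requested like this (the port value and class name are illustrative; with `count` greater than one, supply one port per broker):

```java
import org.springframework.kafka.test.context.EmbeddedKafka;

// Pin the single embedded broker to a fixed port instead of a random one.
@EmbeddedKafka(ports = 9092)
public class FixedPortTests {
}
```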

-Starting with version 3.2, set the `kraft` property to `true` to use an `EmbeddedKafkaKraftBroker` instead of an `EmbeddedKafkaZKBroker`.
+NOTE: As of version 4.0, all ZooKeeper-related properties have been removed from the `@EmbeddedKafka` annotation since Kafka 4.0 uses KRaft exclusively.

The `topics`, `brokerProperties`, and `brokerPropertiesLocation` attributes of `@EmbeddedKafka` support property placeholder resolutions, as the following example shows:

@@ -550,17 +524,20 @@ The following example brings together most of the topics covered in this chapter:

[source, java]
----
+@EmbeddedKafka(topics = KafkaTemplateTests.TEMPLATE_TOPIC)
public class KafkaTemplateTests {

-private static final String TEMPLATE_TOPIC = "templateTopic";
+public static final String TEMPLATE_TOPIC = "templateTopic";

-@ClassRule
-public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);
+private static EmbeddedKafkaBroker embeddedKafka;
+
+@BeforeAll
+public static void setUp() {
+embeddedKafka = EmbeddedKafkaCondition.getBroker();
+}

@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
-embeddedKafka.getEmbeddedKafka());
+embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
@@ -579,9 +556,9 @@ public class KafkaTemplateTests {
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
-embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
+embeddedKafka.getPartitionsPerTopic());
Map<String, Object> producerProps =
-KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
+KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
@@ -599,7 +576,6 @@ public class KafkaTemplateTests {
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}

}
----
