cosmetic doc improvements part 4 #2879

Merged
@@ -1,7 +1,7 @@
[[native-images]]
= Native Images

https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aot[Spring AOT] native hints are provided to assist in developing native images for Spring applications that use Spring for Apache Kafka, including hints for AVRO generated classes used in `@KafkaListener` s.
https://docs.spring.io/spring-framework/docs/current/reference/html/core.html#aot[Spring AOT] native hints are provided to assist in developing native images for Spring applications that use Spring for Apache Kafka, including hints for AVRO generated classes used in `@KafkaListener`+++s+++.

IMPORTANT: `spring-kafka-test` (and, specifically, its `EmbeddedKafkaBroker`) is not supported in native images.

@@ -10,7 +10,7 @@ See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Proc

Using transactions enables Exactly Once Semantics (EOS).

This means that, for a `read+++->+++process+++->+++write` sequence, it is guaranteed that the **sequence** is completed exactly once.
This means that, for a `read -> process -> write` sequence, it is guaranteed that the **sequence** is completed exactly once.
(The read and process have at least once semantics).
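
For illustration, here is a minimal sketch of such a sequence (the topic names and the `process` step are placeholders; it assumes a `KafkaTemplate` backed by a producer factory with a `transactionIdPrefix` and a transactional listener container, as described later in this chapter):

[source, java]
----
@KafkaListener(topics = "input-topic")
public void listen(String value) {
    String output = process(value);                   // process (at least once)
    this.kafkaTemplate.send("output-topic", output);  // write - runs in the same transaction
    // the consumed offset is sent to that transaction before it is committed
}
----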

Spring for Apache Kafka version 3.0 and later only supports `EOSMode.V2`:
@@ -145,7 +145,7 @@ DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnly

This will exclude all headers beginning with `abc` and include all others.
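
For illustration, a minimal sketch (the pattern values are taken from the description above; the converter wiring is an assumption) that builds such an inbound-only mapper and plugs it into a record converter:

[source, java]
----
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnly("!abc*", "*");

MessagingMessageConverter converter = new MessagingMessageConverter();
converter.setHeaderMapper(inboundMapper); // use this mapper instead of the converter's default
----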

By default, the `DefaultKafkaHeaderMapper` is used in the `MessagingMessageConverter` and `BatchMessagingMessageConverter`, as long as Jackson is on the class path.
By default, the `DefaultKafkaHeaderMapper` is used in the `MessagingMessageConverter` and `BatchMessagingMessageConverter`, as long as Jackson is on the classpath.

With the batch converter, the converted headers are available in the `KafkaHeaders.BATCH_CONVERTED_HEADERS` as a `List<Map<String, Object>>` where the map in a position of the list corresponds to the data position in the payload.

@@ -4,7 +4,7 @@
[[monitoring-listener-performance]]
== Monitoring Listener Performance

Starting with version 2.3, the listener container will automatically create and update Micrometer `Timer`+++s+++ for the listener, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context.
Starting with version 2.3, the listener container will automatically create and update Micrometer `Timer`+++s+++ for the listener, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the `ContainerProperty`+++'+++s `micrometerEnabled` to `false`.
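
For example, a minimal sketch of turning the listener timers off (the `factory` variable, assumed to be a `ConcurrentKafkaListenerContainerFactory`, is a placeholder):

[source, java]
----
factory.getContainerProperties().setMicrometerEnabled(false); // no listener timers for containers from this factory
----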

Two timers are maintained - one for successful calls to the listener and one for failures.
@@ -24,7 +24,7 @@ NOTE: With the concurrent container, timers are created for each thread and the
[[monitoring-kafkatemplate-performance]]
== Monitoring KafkaTemplate Performance

Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s for send operations, if `Micrometer` is detected on the class path, and a single `MeterRegistry` is present in the application context.
Starting with version 2.5, the template will automatically create and update Micrometer `Timer`+++s for send operations, if `Micrometer` is detected on the classpath, and a single `MeterRegistry` is present in the application context.
The timers can be disabled by setting the template's `micrometerEnabled` property to `false`.
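
Similarly, a sketch of turning the template timers off (the `template` variable is a placeholder):

[source, java]
----
template.setMicrometerEnabled(false); // no send timers will be recorded for this template
----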

Two timers are maintained - one for successful calls to the listener and one for failures.
@@ -185,7 +185,7 @@ public ProducerFactory<String, Thing> kafkaProducerFactory(JsonSerializer custom
Setters are also provided, as an alternative to using these constructors.
====

Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean `useHeadersIfPresent` parameter (which is `true` by default).
Starting with version 2.2, you can explicitly configure the deserializer to use the supplied target type and ignore type information in headers by using one of the overloaded constructors that have a boolean `useHeadersIfPresent` argument (which is `true` by default).
The following example shows how to do so:

[source, java]
@@ -509,7 +509,7 @@ Accessor methods will be used to lookup the property name as field in the receiv
The `@JsonPath` expression allows customization of the value lookup, and even to define multiple JSON Path expressions, to look up values from multiple places until an expression returns an actual value.

To enable this feature, use a `ProjectingMessageConverter` configured with an appropriate delegate converter (used for outbound conversion and converting non-projection interfaces).
You must also add `spring-data:spring-data-commons` and `com.jayway.jsonpath:json-path` to the class path.
You must also add `spring-data:spring-data-commons` and `com.jayway.jsonpath:json-path` to the classpath.

When used as the parameter to a `@KafkaListener` method, the interface type is automatically passed to the converter as normal.
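
For illustration, a hypothetical projection interface (the property names and JSON Path expressions are assumptions):

[source, java]
----
interface OrderSummary {

    String getOrderId(); // looked up as the "orderId" field in the received JSON

    @JsonPath({ "$.total", "$.amount.total" }) // the first expression that yields a value wins
    BigDecimal getTotal();

}
----

A `@KafkaListener` method can then declare `OrderSummary` as its parameter type, and only the projected properties are read from the incoming JSON.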

@@ -2,7 +2,7 @@
= Quick Tour

Prerequisites: You must install and run Apache Kafka.
Then you must put the Spring for Apache Kafka (`spring-kafka`) JAR and all of its dependencies on your class path.
Then you must put the Spring for Apache Kafka (`spring-kafka`) JAR and all of its dependencies on your classpath.
The easiest way to do that is to declare a dependency in your build tool.

If you are not using Spring Boot, declare the `spring-kafka` jar as a dependency in your project.
@@ -9,7 +9,7 @@ To access blocking and non-blocking delivery attempts, add these headers to your
@Header(name = RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, required = false) Integer nonBlockingAttempts
----

Blocking delivery attempts are only provided if you set `ContainerProperties` <<deliveryAttemptHeader>> to `true`.
Blocking delivery attempts are only provided if you set `ContainerProperties`+++'+++s xref:kafka/container-props.adoc#deliveryAttemptHeader[deliveryAttemptHeader] to `true`.

Note that the non blocking attempts will be `null` for the initial delivery.
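
Putting these together, a minimal sketch of a listener that receives both counters (the topic name and payload type are placeholders):

[source, java]
----
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message,
        @Header(KafkaHeaders.DELIVERY_ATTEMPT) int blockingAttempts,
        @Header(name = RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, required = false) Integer nonBlockingAttempts) {
    // ... message processing
}
----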

@@ -1,12 +1,12 @@
[[dlt-strategies]]
= Dlt Strategies
= DLT Strategies

The framework provides a few strategies for working with DLTs.
You can provide a method for DLT processing, use the default logging method, or have no DLT at all.
Also you can choose what happens if DLT processing fails.

[[dlt-processing-method]]
== Dlt Processing Method
== DLT Processing Method

You can specify the method used to process the DLT for the topic, as well as the behavior if that processing fails.

@@ -18,16 +18,16 @@ Note that the same method will be used for all the `@RetryableTopic` annotated m
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
// ... message processing
}

@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
// ... message processing, persistence, etc
}
----

The DLT handler method can also be provided through the RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) method, passing as arguments the bean name and method name that should process the DLT's messages.
The DLT handler method can also be provided through the `RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)` method, passing as arguments the bean name and method name that should process the DLT's messages.

[source, java]
----
@@ -49,12 +49,12 @@ public class MyCustomDltProcessor {
}

public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
// ... message processing, persistence, etc
}
}
----

NOTE: If no DLT handler is provided, the default RetryTopicConfigurer.LoggingDltListenerHandlerMethod is used.
NOTE: If no DLT handler is provided, the default `RetryTopicConfigurer.LoggingDltListenerHandlerMethod` is used.

Starting with version 2.8, if you don't want to consume from the DLT in this application at all, including by the default handler (or you wish to defer consumption), you can control whether or not the DLT container starts, independent of the container factory's `autoStartup` property.
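
One possible way to do this is sketched below (the `autoStartDltHandler` attribute name and its string value are assumptions, not confirmed by the surrounding text; verify them against the annotation and the `RetryTopicConfigurationBuilder`):

[source, java]
----
@RetryableTopic(autoStartDltHandler = "false") // start the DLT container manually later
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
----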

@@ -77,7 +77,7 @@ In the latter the consumer ends the execution without forwarding the message.
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
// ... message processing
}
----

@@ -96,7 +96,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> templ
NOTE: The default behavior is to `ALWAYS_RETRY_ON_ERROR`.

IMPORTANT: Starting with version 2.8.3, `ALWAYS_RETRY_ON_ERROR` will NOT route a record back to the DLT if the record causes a fatal exception to be thrown,
such as a `DeserializationException` because, generally, such exceptions will always be thrown.
such as a `DeserializationException`, because, generally, such exceptions will always be thrown.

Exceptions that are considered fatal are:

@@ -125,7 +125,7 @@ In this case after retrials are exhausted the processing simply ends.
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
// ... message processing
}
----

@@ -23,7 +23,7 @@ It includes:
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
// ... message processing
}
----

@@ -33,7 +33,7 @@
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.fixedBackoff(3_000)
.maxAttempts(4)
.create(template);
}
@@ -53,25 +53,25 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa
}
----

NOTE: The default backoff policy is `FixedBackOffPolicy` with a maximum of 3 attempts and 1000ms intervals.
NOTE: The default back off policy is `FixedBackOffPolicy` with a maximum of 3 attempts and 1000ms intervals.

NOTE: There is a 30-second default maximum delay for the `ExponentialBackOffPolicy`.
If your back off policy requires delays with values bigger than that, adjust the maxDelay property accordingly.
If your back off policy requires delays with values bigger than that, adjust the `maxDelay` property accordingly.

IMPORTANT: The first attempt counts against `maxAttempts`, so if you provide a `maxAttempts` value of 4 there'll be the original attempt plus 3 retries.
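
For example, a sketch (the values are illustrative) of four total attempts with an exponential back off whose cap is raised above the 30-second default:

[source, java]
----
@RetryableTopic(attempts = "4",
        backoff = @Backoff(delay = 1_000, multiplier = 3.0, maxDelay = 60_000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
----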

[[global-timeout]]
== Global timeout
== Global Timeout

You can set the global timeout for the retrying process.
If that time is reached, the next time the consumer throws an exception the message goes straight to the DLT, or just ends the processing if no DLT is available.

[source, java]
----
@RetryableTopic(backoff = @Backoff(2000), timeout = 5000)
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
// ... message processing
}
----

@@ -81,8 +81,8 @@ public void processMessage(MyPojo message) {
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(2000)
.timeoutAfter(5000)
.fixedBackoff(2_000)
.timeoutAfter(5_000)
.create(template);
}
----
@@ -100,7 +100,7 @@ You can also set it to traverse the causes to lookup nested exceptions.
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // Will retry
throw new RuntimeException(new MyRetryException()); // will retry
}
----

@@ -168,7 +168,7 @@ NOTE: The default behavior is to include all topics.

Unless otherwise specified the framework will auto create the required topics using `NewTopic` beans that are consumed by the `KafkaAdmin` bean.
You can specify the number of partitions and the replication factor with which the topics will be created, and you can turn this feature off.
Starting with version 3.0, the default replication factor is `-1`, meaning use the broker default.
Starting with version 3.0, the default replication factor is `-1`, meaning using the broker default.
If your broker version is earlier than 2.4, you will need to set an explicit value.

IMPORTANT: Note that if you're not using Spring Boot you'll have to provide a KafkaAdmin bean in order to use this feature.
@@ -178,13 +178,13 @@ IMPORTANT: Note that if you're not using Spring Boot you'll have to provide a Ka
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
// ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
// ... message processing
}
----
[source, java]
Expand All @@ -206,7 +206,7 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
}
----

NOTE: By default the topics are autocreated with one partition and a replication factor of -1 (meaning use the broker default).
NOTE: By default the topics are autocreated with one partition and a replication factor of -1 (meaning using the broker default).
If your broker version is earlier than 2.4, you will need to set an explicit value.

[[retry-headers]]
@@ -235,7 +235,7 @@ protected void configureCustomizers(CustomizersConfigurer customizersConfigurer)
}
----

Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory, you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) -> { ... })`
Starting with version 2.8.4, if you wish to add custom headers (in addition to the retry information headers added by the factory, you can add a `headersFunction` to the factory - `factory.setHeadersFunction((rec, ex) +++->+++ { +++...+++ })`.

By default, any headers added will be cumulative - Kafka headers can contain multiple values.
Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain.
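
For illustration, a minimal sketch (the header name and value are assumptions) of a headers function that adds a single-valued header, so a later delivery replaces the earlier value instead of appending another one:

[source, java]
----
factory.setHeadersFunction((record, ex) -> {
    Headers headers = new RecordHeaders();
    headers.add(new DeadLetterPublishingRecoverer.SingleRecordHeader(
            "last-failure", String.valueOf(ex.getMessage()).getBytes(StandardCharsets.UTF_8)));
    return headers;
});
----
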
@@ -15,7 +15,7 @@ See xref:retrytopic/retry-config.adoc#retry-topic-global-settings[Configuring Gl
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetryException.class, MyOtherBlockingRetryException.class)
.backOff(new FixedBackOff(3000, 5));
.backOff(new FixedBackOff(3_000, 5));
}

----
@@ -10,15 +10,14 @@ For the `@RetryableTopic` annotation you can provide the factory's bean name, an
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
// ... message processing
}
----
[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template,
ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {

return RetryTopicConfigurationBuilder
.newInstance()
.listenerFactory(factory)