Skip to content

Commit c878099

Browse files
authored
GH-3528: Improving Observability in Asynchronous Processing
Fixes: #3528 - Improve spring-kafka observability for failures in async consumer tasks when listener methods return CompletableFuture<?> or Mono<?> and throw errors during async execution - Refactoring code in KafkaMessageListenerContainer and MessagingMessageListenerAdapter around observability - Adding tests to verify - Add @Nullable annotations to relevant methods for better null safety * Addressing PR review
1 parent 3d507c7 commit c878099

File tree

4 files changed

+176
-59
lines changed

4 files changed

+176
-59
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

+46-33
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
107107
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
108108
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
109+
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
109110
import org.springframework.kafka.support.Acknowledgment;
110111
import org.springframework.kafka.support.KafkaHeaders;
111112
import org.springframework.kafka.support.KafkaUtils;
@@ -948,6 +949,10 @@ else if (listener instanceof MessageListener) {
948949
this.lastAlertPartition = new HashMap<>();
949950
this.wasIdlePartition = new HashMap<>();
950951
this.kafkaAdmin = obtainAdmin();
952+
953+
if (this.listener instanceof RecordMessagingMessageListenerAdapter<?, ?> rmmla) {
954+
rmmla.setObservationRegistry(observationRegistry);
955+
}
951956
}
952957

953958
private AckMode determineAckMode() {
@@ -2693,6 +2698,7 @@ private void pauseForNackSleep() {
26932698
* @throws Error an error.
26942699
*/
26952700
@Nullable
2701+
@SuppressWarnings("try")
26962702
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cRecord, // NOSONAR
26972703
Iterator<ConsumerRecord<K, V>> iterator) {
26982704

@@ -2703,42 +2709,49 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
27032709
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), this.consumerGroupId,
27042710
this::clusterId),
27052711
this.observationRegistry);
2706-
return observation.observe(() -> {
2712+
2713+
observation.start();
2714+
try (Observation.Scope ignored = observation.openScope()) {
2715+
invokeOnMessage(cRecord);
2716+
successTimer(sample, cRecord);
2717+
recordInterceptAfter(cRecord, null);
2718+
}
2719+
catch (RuntimeException e) {
2720+
failureTimer(sample, cRecord);
2721+
recordInterceptAfter(cRecord, e);
2722+
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
2723+
observation.error(e);
2724+
}
2725+
if (this.commonErrorHandler == null) {
2726+
throw e;
2727+
}
27072728
try {
2708-
invokeOnMessage(cRecord);
2709-
successTimer(sample, cRecord);
2710-
recordInterceptAfter(cRecord, null);
2729+
invokeErrorHandler(cRecord, iterator, e);
2730+
commitOffsetsIfNeededAfterHandlingError(cRecord);
27112731
}
2712-
catch (RuntimeException e) {
2713-
failureTimer(sample, cRecord);
2714-
recordInterceptAfter(cRecord, e);
2715-
if (this.commonErrorHandler == null) {
2716-
throw e;
2717-
}
2718-
observation.error(e);
2719-
try {
2720-
invokeErrorHandler(cRecord, iterator, e);
2721-
commitOffsetsIfNeededAfterHandlingError(cRecord);
2722-
}
2723-
catch (RecordInRetryException rire) {
2724-
this.logger.info("Record in retry and not yet recovered");
2725-
return rire;
2726-
}
2727-
catch (KafkaException ke) {
2728-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2729-
return ke;
2730-
}
2731-
catch (RuntimeException ee) {
2732-
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2733-
return ee;
2734-
}
2735-
catch (Error er) { // NOSONAR
2736-
this.logger.error(er, "Error handler threw an error");
2737-
throw er;
2738-
}
2732+
catch (RecordInRetryException rire) {
2733+
this.logger.info("Record in retry and not yet recovered");
2734+
return rire;
27392735
}
2740-
return null;
2741-
});
2736+
catch (KafkaException ke) {
2737+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2738+
return ke;
2739+
}
2740+
catch (RuntimeException ee) {
2741+
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
2742+
return ee;
2743+
}
2744+
catch (Error er) { // NOSONAR
2745+
this.logger.error(er, "Error handler threw an error");
2746+
throw er;
2747+
}
2748+
}
2749+
finally {
2750+
if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
2751+
observation.stop();
2752+
}
2753+
}
2754+
return null;
27422755
}
27432756

27442757
private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public boolean isAsyncReplies() {
6565
return this.asyncReplies;
6666
}
6767

68+
@Nullable
6869
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
6970
if (this.invokerHandlerMethod != null) {
7071
return this.invokerHandlerMethod.invoke(message, providedArgs); // NOSONAR

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

+62-21
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.Objects;
3030
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.CompletionException;
3132
import java.util.stream.Collectors;
3233

3334
import org.apache.commons.logging.LogFactory;
@@ -73,6 +74,8 @@
7374
import org.springframework.util.ObjectUtils;
7475
import org.springframework.util.StringUtils;
7576

77+
import io.micrometer.observation.Observation;
78+
import io.micrometer.observation.ObservationRegistry;
7679
import reactor.core.publisher.Mono;
7780

7881
/**
@@ -153,6 +156,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
153156

154157
private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;
155158

159+
private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
160+
156161
/**
157162
* Create an instance with the provided bean and method.
158163
* @param bean the bean.
@@ -247,6 +252,15 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) {
247252
this.handlerMethod = handlerMethod;
248253
}
249254

255+
/**
256+
* Set the {@link ObservationRegistry} to handle observability.
257+
* @param observationRegistry {@link ObservationRegistry} instance.
258+
* @since 3.3.0
259+
*/
260+
public void setObservationRegistry(ObservationRegistry observationRegistry) {
261+
this.observationRegistry = observationRegistry;
262+
}
263+
250264
public boolean isAsyncReplies() {
251265
return this.handlerMethod != null && this.handlerMethod.isAsyncReplies();
252266
}
@@ -382,15 +396,34 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable
382396
protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
383397
final Message<?> message) {
384398

399+
Throwable listenerError = null;
400+
Object result = null;
401+
Observation currentObservation = getCurrentObservation();
385402
try {
386-
Object result = invokeHandler(records, acknowledgment, message, consumer);
403+
result = invokeHandler(records, acknowledgment, message, consumer);
387404
if (result != null) {
388405
handleResult(result, records, acknowledgment, consumer, message);
389406
}
390407
}
391-
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
408+
catch (ListenerExecutionFailedException e) {
409+
listenerError = e;
410+
currentObservation.error(e);
392411
handleException(records, acknowledgment, consumer, message, e);
393412
}
413+
catch (Error e) {
414+
listenerError = e;
415+
currentObservation.error(e);
416+
}
417+
finally {
418+
if (listenerError != null || result == null) {
419+
currentObservation.stop();
420+
}
421+
}
422+
}
423+
424+
private Observation getCurrentObservation() {
425+
Observation currentObservation = this.observationRegistry.getCurrentObservation();
426+
return currentObservation == null ? Observation.NOOP : currentObservation;
394427
}
395428

396429
/**
@@ -402,6 +435,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C
402435
* @param consumer the consumer.
403436
* @return the result of invocation.
404437
*/
438+
@Nullable
405439
protected final Object invokeHandler(Object data, @Nullable Acknowledgment acknowledgment, Message<?> message,
406440
Consumer<?, ?> consumer) {
407441

@@ -460,7 +494,7 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me
460494
*/
461495
protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment,
462496
Consumer<?, ?> consumer, @Nullable Message<?> source) {
463-
497+
final Observation observation = getCurrentObservation();
464498
this.logger.debug(() -> "Listener method returned result [" + resultArg
465499
+ "] - generating response message for it");
466500
String replyTopic = evaluateReplyTopic(request, source, resultArg);
@@ -474,35 +508,42 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
474508
invocationResult.messageReturnType() :
475509
this.messageReturnType;
476510

477-
if (result instanceof CompletableFuture<?> completable) {
511+
CompletableFuture<?> completableFutureResult;
512+
513+
if (monoPresent && result instanceof Mono<?> mono) {
514+
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
515+
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
516+
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
517+
}
518+
completableFutureResult = mono.toFuture();
519+
}
520+
else if (!(result instanceof CompletableFuture<?>)) {
521+
completableFutureResult = CompletableFuture.completedFuture(result);
522+
}
523+
else {
524+
completableFutureResult = (CompletableFuture<?>) result;
478525
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
479526
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
480527
+ "otherwise the container will ack the message immediately");
481528
}
482-
completable.whenComplete((r, t) -> {
529+
}
530+
531+
completableFutureResult.whenComplete((r, t) -> {
532+
try {
483533
if (t == null) {
484534
asyncSuccess(r, replyTopic, source, messageReturnType);
485535
acknowledge(acknowledgment);
486536
}
487537
else {
488-
asyncFailure(request, acknowledgment, consumer, t, source);
538+
Throwable cause = t instanceof CompletionException ? t.getCause() : t;
539+
observation.error(cause);
540+
asyncFailure(request, acknowledgment, consumer, cause, source);
489541
}
490-
});
491-
}
492-
else if (monoPresent && result instanceof Mono<?> mono) {
493-
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
494-
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
495-
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
496542
}
497-
mono.subscribe(
498-
r -> asyncSuccess(r, replyTopic, source, messageReturnType),
499-
t -> asyncFailure(request, acknowledgment, consumer, t, source),
500-
() -> acknowledge(acknowledgment)
501-
);
502-
}
503-
else {
504-
sendResponse(result, replyTopic, source, messageReturnType);
505-
}
543+
finally {
544+
observation.stop();
545+
}
546+
});
506547
}
507548

508549
@Nullable

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

+67-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Deque;
2626
import java.util.List;
2727
import java.util.Map;
28+
import java.util.concurrent.CompletableFuture;
2829
import java.util.concurrent.CountDownLatch;
2930
import java.util.concurrent.ExecutionException;
3031
import java.util.concurrent.TimeUnit;
@@ -87,6 +88,7 @@
8788
import io.micrometer.tracing.propagation.Propagator;
8889
import io.micrometer.tracing.test.simple.SimpleSpan;
8990
import io.micrometer.tracing.test.simple.SimpleTracer;
91+
import reactor.core.publisher.Mono;
9092

9193
/**
9294
* @author Gary Russell
@@ -112,7 +114,11 @@ public class ObservationTests {
112114

113115
public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";
114116

115-
public final static String OBSERVATION_ERROR = "observation.error";
117+
public final static String OBSERVATION_ERROR = "observation.error.sync";
118+
119+
public final static String OBSERVATION_ERROR_COMPLETABLE_FUTURE = "observation.error.completableFuture";
120+
121+
public final static String OBSERVATION_ERROR_MONO = "observation.error.mono";
116122

117123
@Test
118124
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
@@ -387,6 +393,42 @@ void observationErrorException(@Autowired ExceptionListener listener, @Autowired
387393
.hasMessage("obs5 error");
388394
}
389395

396+
@Test
397+
void observationErrorExceptionWhenCompletableFutureReturned(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
398+
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
399+
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
400+
throws ExecutionException, InterruptedException, TimeoutException {
401+
402+
errorTemplate.send(OBSERVATION_ERROR_COMPLETABLE_FUTURE, "testError").get(10, TimeUnit.SECONDS);
403+
Deque<SimpleSpan> spans = tracer.getSpans();
404+
await().untilAsserted(() -> assertThat(spans).hasSize(2));
405+
SimpleSpan span = spans.poll();
406+
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
407+
span = spans.poll();
408+
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs6-0");
409+
assertThat(span.getError())
410+
.isInstanceOf(Error.class)
411+
.hasMessage("Should report metric.");
412+
}
413+
414+
@Test
415+
void observationErrorExceptionWhenMonoReturned(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
416+
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
417+
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
418+
throws ExecutionException, InterruptedException, TimeoutException {
419+
420+
errorTemplate.send(OBSERVATION_ERROR_MONO, "testError").get(10, TimeUnit.SECONDS);
421+
Deque<SimpleSpan> spans = tracer.getSpans();
422+
await().untilAsserted(() -> assertThat(spans).hasSize(2));
423+
SimpleSpan span = spans.poll();
424+
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
425+
span = spans.poll();
426+
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs7-0");
427+
assertThat(span.getError())
428+
.isInstanceOf(Error.class)
429+
.hasMessage("Should report metric.");
430+
}
431+
390432
@Test
391433
void kafkaAdminNotRecreatedIfBootstrapServersSameInProducerAndAdminConfig(
392434
@Autowired @Qualifier("reuseAdminBeanKafkaTemplate") KafkaTemplate<Integer, String> template,
@@ -590,14 +632,34 @@ public static class ExceptionListener {
590632

591633
@KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION)
592634
void listenRuntimeException(ConsumerRecord<Integer, String> in) {
593-
this.latch4.countDown();
594-
throw new IllegalStateException("obs4 run time exception");
635+
try {
636+
throw new IllegalStateException("obs4 run time exception");
637+
}
638+
finally {
639+
this.latch4.countDown();
640+
}
595641
}
596642

597643
@KafkaListener(id = "obs5", topics = OBSERVATION_ERROR)
598644
void listenError(ConsumerRecord<Integer, String> in) {
599-
this.latch5.countDown();
600-
throw new Error("obs5 error");
645+
try {
646+
throw new Error("obs5 error");
647+
}
648+
finally {
649+
this.latch5.countDown();
650+
}
651+
}
652+
653+
@KafkaListener(id = "obs6", topics = OBSERVATION_ERROR_COMPLETABLE_FUTURE)
654+
CompletableFuture<Void> receive(ConsumerRecord<Object, Object> record) {
655+
return CompletableFuture.supplyAsync(() -> {
656+
throw new Error("Should report metric.");
657+
});
658+
}
659+
660+
@KafkaListener(id = "obs7", topics = OBSERVATION_ERROR_MONO)
661+
Mono<Void> receive1(ConsumerRecord<Object, Object> record) {
662+
return Mono.error(new Error("Should report metric."));
601663
}
602664

603665
}

0 commit comments

Comments
 (0)