diff --git a/spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java b/spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java index 3dadfd5eec..7f0ad27070 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.time.Duration; +import org.jspecify.annotations.Nullable; + import org.springframework.kafka.core.KafkaResourceHolder; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.ProducerFactoryUtils; @@ -45,8 +47,8 @@ *

* Application code is required to retrieve the transactional Kafka resources via * {@link ProducerFactoryUtils#getTransactionalResourceHolder(ProducerFactory, String, java.time.Duration)}. - * Spring's {@link org.springframework.kafka.core.KafkaTemplate KafkaTemplate} will auto - * detect a thread-bound Producer and automatically participate in it. + * Spring's {@link org.springframework.kafka.core.KafkaTemplate KafkaTemplate} will auto-detect + * a thread-bound Producer and automatically participate in it. * *

* The use of {@link org.springframework.kafka.core.DefaultKafkaProducerFactory @@ -63,6 +65,7 @@ * @param the value type. * * @author Gary Russell + * @author Soby Chacko */ @SuppressWarnings("serial") public class KafkaTransactionManager extends AbstractPlatformTransactionManager @@ -72,7 +75,7 @@ public class KafkaTransactionManager extends AbstractPlatformTransactionMa private final ProducerFactory producerFactory; - private String transactionIdPrefix; + private @Nullable String transactionIdPrefix; private Duration closeTimeout = ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT; @@ -121,7 +124,7 @@ public void setCloseTimeout(Duration closeTimeout) { @SuppressWarnings(UNCHECKED) @Override protected Object doGetTransaction() { - KafkaTransactionObject txObject = new KafkaTransactionObject(); + KafkaTransactionObject txObject = new KafkaTransactionObject<>(); txObject.setResourceHolder((KafkaResourceHolder) TransactionSynchronizationManager .getResource(getProducerFactory())); return txObject; @@ -149,10 +152,10 @@ protected void doBegin(Object transaction, TransactionDefinition definition) { logger.debug("Created Kafka transaction on producer [" + resourceHolder.getProducer() + "]"); } txObject.setResourceHolder(resourceHolder); - txObject.getResourceHolder().setSynchronizedWithTransaction(true); + resourceHolder.setSynchronizedWithTransaction(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { - txObject.getResourceHolder().setTimeoutInSeconds(timeout); + resourceHolder.setTimeoutInSeconds(timeout); } } catch (Exception ex) { @@ -172,9 +175,13 @@ protected Object doSuspend(Object transaction) { } @Override - protected void doResume(Object transaction, Object suspendedResources) { - @SuppressWarnings(UNCHECKED) + @SuppressWarnings(UNCHECKED) + protected void doResume(@Nullable Object transaction, Object suspendedResources) { KafkaResourceHolder producerHolder = (KafkaResourceHolder) suspendedResources; + if (transaction != null) { + KafkaTransactionObject txObject = (KafkaTransactionObject) transaction; + txObject.setResourceHolder(producerHolder); + } TransactionSynchronizationManager.bindResource(getProducerFactory(), producerHolder); } @@ -183,7 +190,9 @@ protected void doCommit(DefaultTransactionStatus status) { @SuppressWarnings(UNCHECKED) KafkaTransactionObject txObject = (KafkaTransactionObject) status.getTransaction(); KafkaResourceHolder resourceHolder = txObject.getResourceHolder(); - resourceHolder.commit(); + if (resourceHolder != null) { + resourceHolder.commit(); + } } @Override @@ -191,14 +200,19 @@ protected void doRollback(DefaultTransactionStatus status) { @SuppressWarnings(UNCHECKED) KafkaTransactionObject txObject = (KafkaTransactionObject) status.getTransaction(); KafkaResourceHolder resourceHolder = txObject.getResourceHolder(); - resourceHolder.rollback(); + if (resourceHolder != null) { + resourceHolder.rollback(); + } } @Override protected void doSetRollbackOnly(DefaultTransactionStatus status) { @SuppressWarnings(UNCHECKED) KafkaTransactionObject txObject = (KafkaTransactionObject) status.getTransaction(); - txObject.getResourceHolder().setRollbackOnly(); + KafkaResourceHolder kafkaResourceHolder = txObject.getResourceHolder(); + if (kafkaResourceHolder != null) { + kafkaResourceHolder.setRollbackOnly(); + } } @Override @@ -206,8 +220,11 @@ protected void doCleanupAfterCompletion(Object transaction) { @SuppressWarnings(UNCHECKED) KafkaTransactionObject txObject = (KafkaTransactionObject) transaction; TransactionSynchronizationManager.unbindResource(getProducerFactory()); - txObject.getResourceHolder().close(); - txObject.getResourceHolder().clear(); + KafkaResourceHolder kafkaResourceHolder = txObject.getResourceHolder(); + if (kafkaResourceHolder != null) { + kafkaResourceHolder.close(); + kafkaResourceHolder.clear(); + } } /** @@ -217,22 +234,22 @@ protected void doCleanupAfterCompletion(Object transaction) { */ private static class KafkaTransactionObject implements SmartTransactionObject { - private KafkaResourceHolder resourceHolder; + private @Nullable KafkaResourceHolder resourceHolder; KafkaTransactionObject() { } - public void setResourceHolder(KafkaResourceHolder resourceHolder) { + public void setResourceHolder(@Nullable KafkaResourceHolder resourceHolder) { this.resourceHolder = resourceHolder; } - public KafkaResourceHolder getResourceHolder() { + public @Nullable KafkaResourceHolder getResourceHolder() { return this.resourceHolder; } @Override public boolean isRollbackOnly() { - return this.resourceHolder.isRollbackOnly(); + return this.resourceHolder != null && this.resourceHolder.isRollbackOnly(); } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/transaction/package-info.java b/spring-kafka/src/main/java/org/springframework/kafka/transaction/package-info.java index 46ac20de06..bce1c64374 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/transaction/package-info.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/transaction/package-info.java @@ -1,4 +1,5 @@ /** * Provides classes related to transactions. */ +@org.jspecify.annotations.NullMarked package org.springframework.kafka.transaction; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java index 641ea558eb..26fabf3c2a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2024 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +42,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.assertj.core.api.Assertions; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import org.mockito.InOrder; @@ -57,7 +58,6 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.transaction.KafkaTransactionManager; -import org.springframework.lang.Nullable; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionException; import org.springframework.transaction.annotation.EnableTransactionManagement;