Description
Affects Version(s):
2.9.0 and earlier (starting with 2.5.17, 2.6.12, 2.7.9 and 2.8.0)
Problem description
We use the transaction synchronization mechanism to run some business logic in the afterCommit method (Hibernate transactions).
After upgrading to Kafka 3.2.0 + spring-kafka 2.9.0 we started to receive errors:
java.lang.IllegalStateException: TransactionalId tx-tenants-RgIAuTvu9J-1: Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089)
at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1082)
at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:368)
at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:628)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.beginTransaction(DefaultKafkaProducerFactory.java:1126)
at org.springframework.kafka.core.ProducerFactoryUtils.getTransactionalResourceHolder(ProducerFactoryUtils.java:99)
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:751)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:645)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:430)
The problem is that when our afterCommit method throws an exception, the transaction started by the Kafka-synchronized producer is never committed (ProducerFactoryUtils.processResourceAfterCommit is never invoked). ProducerFactoryUtils.releaseResource is then called at the end of the main transaction, which returns a KafkaProducer in an invalid state (transactionState = IN_TRANSACTION) to the DefaultKafkaProducerFactory cache.
Subsequent calls fail with the error above because the KafkaProducer taken from the cache is still in that state (transactionState = IN_TRANSACTION).
Sample method:
@Transactional
public void sampleTransactionalMethod() {
    ...
    kafkaTemplate.send(..);
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
        @Override
        public void afterCommit() {
            // Any exception thrown here reproduces the problem
            if (true) {
                throw new RuntimeException("Any exception");
            }
        }

        @Override
        public int getOrder() {
            // Run this callback ahead of the other registered synchronizations
            return Ordered.HIGHEST_PRECEDENCE;
        }
    });
}
The problem was probably introduced in #2005: in ProducerFactoryUtils, the commit on the resourceHolder was moved from the afterCompletion method to the processResourceAfterCommit method.
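To make the suspected sequence easier to follow, here is a minimal plain-Java model of it. This is only a sketch and not the real Spring or Kafka code: TxSyncModel, ModelProducer, Sync, CACHE and the commitInAfterCompletion flag are all hypothetical names invented for illustration. It assumes that afterCommit callbacks run in order and stop at the first exception, while afterCompletion callbacks always run, and that before #2005 the commit happened in afterCompletion, as suggested above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Simplified model only: none of these types are the real Spring or Kafka classes.
public class TxSyncModel {

    enum State { READY, IN_TRANSACTION }

    // Hypothetical stand-in for a transactional producer with a two-state machine.
    static class ModelProducer {
        State state = State.READY;

        void beginTransaction() {
            if (state != State.READY) {
                throw new IllegalStateException(
                        "Invalid transition attempted from state " + state + " to state IN_TRANSACTION");
            }
            state = State.IN_TRANSACTION;
        }

        void commitTransaction() {
            state = State.READY;
        }
    }

    // Hypothetical stand-in for transaction synchronization callbacks.
    interface Sync {
        default void afterCommit() { }
        default void afterCompletion() { }
    }

    // Hypothetical stand-in for the producer factory cache.
    static final Deque<ModelProducer> CACHE = new ArrayDeque<>();

    static ModelProducer borrow() {
        ModelProducer cached = CACHE.poll();
        return cached != null ? cached : new ModelProducer();
    }

    // Runs one "main" transaction; returns the exception thrown by an afterCommit callback, or null.
    static RuntimeException runTransaction(boolean userCallbackThrows, boolean commitInAfterCompletion) {
        ModelProducer producer = borrow();
        producer.beginTransaction(); // models the send joining the main transaction

        Sync userSync = new Sync() {
            @Override public void afterCommit() {
                if (userCallbackThrows) throw new RuntimeException("Any exception");
            }
        };
        Sync kafkaSync = new Sync() {
            @Override public void afterCommit() {
                if (!commitInAfterCompletion) producer.commitTransaction();
            }
            @Override public void afterCompletion() {
                if (commitInAfterCompletion) producer.commitTransaction();
                CACHE.push(producer); // released to the cache whatever its state is
            }
        };
        List<Sync> ordered = List.of(userSync, kafkaSync); // user callback has highest precedence

        RuntimeException failure = null;
        try {
            for (Sync s : ordered) s.afterCommit(); // the first exception skips the remaining callbacks
        } catch (RuntimeException e) {
            failure = e;
        } finally {
            for (Sync s : ordered) s.afterCompletion(); // always runs
        }
        return failure;
    }

    // Runs two transactions back to back and reports how the second one ends.
    static String secondTransactionOutcome(boolean userCallbackThrows, boolean commitInAfterCompletion) {
        CACHE.clear();
        runTransaction(userCallbackThrows, commitInAfterCompletion);
        try {
            runTransaction(false, commitInAfterCompletion);
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(secondTransactionOutcome(false, false)); // no failing callback
        System.out.println(secondTransactionOutcome(true, false));  // commit in afterCommit
        System.out.println(secondTransactionOutcome(true, true));   // commit in afterCompletion
    }
}
```

In this model, only the second scenario hands a producer that is still IN_TRANSACTION back to the cache, so the next beginTransaction fails with the same kind of message as in the stack trace. Whether the real classes behave exactly this way would need to be confirmed against the spring-kafka sources.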