diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java index 7d700a4390..d1169d6ae3 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaBroker.java @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 the original author or authors. + * Copyright 2023-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 9566c3f1a2..71aea844b3 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; import java.util.UUID; @@ -54,6 +55,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import org.jspecify.annotations.Nullable; import org.springframework.core.log.LogAccessor; import org.springframework.util.Assert; @@ -93,7 +95,7 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { private static final boolean IS_KAFKA_39_OR_LATER = ClassUtils.isPresent( "org.apache.kafka.server.config.AbstractKafkaConfig", EmbeddedKafkaKraftBroker.class.getClassLoader()); - private static final Method SET_CONFIG_METHOD; + private static final @Nullable Method SET_CONFIG_METHOD; static { if (IS_KAFKA_39_OR_LATER) { @@ -117,7 +119,7 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { private final AtomicBoolean initialized = new AtomicBoolean(); - private KafkaClusterTestKit cluster; + private @Nullable KafkaClusterTestKit cluster; private int[] kafkaPorts; @@ -131,7 +133,7 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker { * @param partitions partitions per topic. * @param topics the topics to create. */ - public EmbeddedKafkaKraftBroker(int count, int partitions, String... topics) { + public EmbeddedKafkaKraftBroker(int count, int partitions, String @Nullable ... topics) { this.count = count; this.kafkaPorts = new int[this.count]; // random ports by default. if (topics != null) { @@ -261,7 +263,9 @@ private void start() { private static void setConfigProperty(KafkaClusterTestKit.Builder clusterBuilder, String key, Object value) { if (IS_KAFKA_39_OR_LATER) { // For Kafka 3.9.0+: use reflection - ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value); + if (SET_CONFIG_METHOD != null) { + ReflectionUtils.invokeMethod(SET_CONFIG_METHOD, clusterBuilder, key, value); + } } else { // For Kafka 3.8.0: direct call @@ -484,10 +488,12 @@ public int getPartitionsPerTopic() { @Override public String getBrokersAsString() { - return (String) this.cluster.clientProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + Assert.notNull(this.cluster, "cluster cannot be null"); + String brokersString = (String) this.cluster.clientProperties().get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + return Objects.requireNonNull(brokersString); } - public KafkaClusterTestKit getCluster() { + public @Nullable KafkaClusterTestKit getCluster() { return this.cluster; } diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java index a12eac978b..be497d023f 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2024 the original author or authors. + * Copyright 2018-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -66,6 +66,7 @@ import org.apache.zookeeper.client.ZKClientConfig; import org.apache.zookeeper.server.NIOServerCnxnFactory; import org.apache.zookeeper.server.ZooKeeperServer; +import org.jspecify.annotations.Nullable; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.test.core.BrokerAddress; @@ -117,9 +118,9 @@ public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker { private final AtomicBoolean initialized = new AtomicBoolean(); - private EmbeddedZookeeper zookeeper; + private @Nullable EmbeddedZookeeper zookeeper; - private String zkConnect; + private @Nullable String zkConnect; private int zkPort; @@ -133,7 +134,7 @@ public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker { private String brokerListProperty = "spring.kafka.bootstrap-servers"; - private volatile ZooKeeperClient zooKeeperClient; + private volatile @Nullable ZooKeeperClient zooKeeperClient; public EmbeddedKafkaZKBroker(int count) { this(count, false); @@ -145,7 +146,7 @@ public EmbeddedKafkaZKBroker(int count) { * @param controlledShutdown passed into TestUtils.createBrokerConfig. * @param topics the topics to create (2 partitions per). */ - public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... topics) { + public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String @Nullable ... topics) { this(count, controlledShutdown, 2, topics); } @@ -156,7 +157,7 @@ public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, String... to * @param partitions partitions per topic. * @param topics the topics to create. */ - public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String... topics) { + public EmbeddedKafkaZKBroker(int count, boolean controlledShutdown, int partitions, String @Nullable ... topics) { this.count = count; this.kafkaPorts = new int[this.count]; // random ports by default. this.controlledShutdown = controlledShutdown; @@ -557,8 +558,10 @@ public void destroy() { } } try { - this.zookeeper.shutdown(); - this.zkConnect = null; + if (this.zookeeper != null) { + this.zookeeper.shutdown(); + this.zkConnect = null; + } } catch (Exception e) { // do nothing @@ -582,7 +585,7 @@ public KafkaServer getKafkaServer(int id) { return this.kafkaServers.get(id); } - public EmbeddedZookeeper getZookeeper() { + public @Nullable EmbeddedZookeeper getZookeeper() { return this.zookeeper; } @@ -599,7 +602,7 @@ public synchronized ZooKeeperClient getZooKeeperClient() { return this.zooKeeperClient; } - public String getZookeeperConnectionString() { + public @Nullable String getZookeeperConnectionString() { return this.zkConnect; } diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/assertj/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/assertj/package-info.java new file mode 100644 index 0000000000..e9d1eaf4e6 --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/assertj/package-info.java @@ -0,0 +1,5 @@ +/** + * Provides a class for assertj conditions. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.kafka.test.assertj; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/package-info.java index a2ac916502..9830a3fbb3 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/package-info.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/condition/package-info.java @@ -1,4 +1,5 @@ /** * Provides classes for JUnit5 conditions. */ +@org.jspecify.annotations.NullMarked package org.springframework.kafka.test.condition; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java index 26b7521ca1..718d0581e6 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2024 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,17 +16,13 @@ package org.springframework.kafka.test.context; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; -import org.springframework.beans.factory.support.BeanDefinitionRegistry; -import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry; -import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.support.GenericApplicationContext; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.EmbeddedKafkaBrokerFactory; import org.springframework.test.context.ContextCustomizer; import org.springframework.test.context.MergedContextConfiguration; -import org.springframework.util.Assert; /** * The {@link ContextCustomizer} implementation for the {@link EmbeddedKafkaBroker} bean registration. @@ -51,16 +47,15 @@ class EmbeddedKafkaContextCustomizer implements ContextCustomizer { @Override public void customizeContext(ConfigurableApplicationContext context, MergedContextConfiguration mergedConfig) { - ConfigurableListableBeanFactory beanFactory = context.getBeanFactory(); - Assert.isInstanceOf(DefaultSingletonBeanRegistry.class, beanFactory); - ConfigurableEnvironment environment = context.getEnvironment(); EmbeddedKafkaBroker embeddedKafkaBroker = EmbeddedKafkaBrokerFactory.create(this.embeddedKafka, environment::resolvePlaceholders); - ((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(EmbeddedKafkaBroker.BEAN_NAME, - new RootBeanDefinition(EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker)); + GenericApplicationContext genericApplicationContext = (GenericApplicationContext) context; + + genericApplicationContext.registerBean(EmbeddedKafkaBroker.BEAN_NAME, + EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker); } @Override diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerFactory.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerFactory.java index a2e4aa131c..fac7c171f6 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerFactory.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/EmbeddedKafkaContextCustomizerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2023 the original author or authors. + * Copyright 2017-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,8 @@ import java.util.List; +import org.jspecify.annotations.Nullable; + import org.springframework.test.context.ContextConfigurationAttributes; import org.springframework.test.context.ContextCustomizer; import org.springframework.test.context.ContextCustomizerFactory; @@ -35,6 +37,7 @@ class EmbeddedKafkaContextCustomizerFactory implements ContextCustomizerFactory { @Override + @Nullable public ContextCustomizer createContextCustomizer(Class testClass, List configAttributes) { EmbeddedKafka embeddedKafka = diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/package-info.java new file mode 100644 index 0000000000..e5e05f282b --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/context/package-info.java @@ -0,0 +1,5 @@ +/** + * Provides classes for EmbeddedKafka context customization. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.kafka.test.context; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/core/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/core/package-info.java new file mode 100644 index 0000000000..f9448348e4 --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/core/package-info.java @@ -0,0 +1,5 @@ +/** + * core package for spring-kafka-test module. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.kafka.test.core; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/package-info.java new file mode 100644 index 0000000000..73e0a1c0b4 --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/hamcrest/package-info.java @@ -0,0 +1,5 @@ +/** + * Provides hamcrest matchers. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.kafka.test.hamcrest; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java index 0e9bf06385..415e1e553a 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/GlobalEmbeddedKafkaTestExecutionListener.java @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 the original author or authors. + * Copyright 2022-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -90,8 +90,10 @@ public class GlobalEmbeddedKafkaTestExecutionListener implements TestExecutionLi public static final String BROKER_PROPERTIES_LOCATION_PROPERTY_NAME = "spring.kafka.embedded.broker.properties.location"; + @SuppressWarnings("NullAway.Init") private EmbeddedKafkaBroker embeddedKafkaBroker; + @SuppressWarnings("NullAway.Init") private Log logger; @Override diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/package-info.java new file mode 100644 index 0000000000..8efe8a11dd --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/junit/package-info.java @@ -0,0 +1,5 @@ +/** + * Provides JUnit specific extensions in spring-kafka-test. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.kafka.test.junit; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/package-info.java new file mode 100644 index 0000000000..179ec52bce --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/package-info.java @@ -0,0 +1,5 @@ +/** + * Provides top-level API for EmbeddedKafka. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.kafka.test; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/package-info.java new file mode 100644 index 0000000000..44664bc27b --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/rule/package-info.java @@ -0,0 +1,5 @@ +/** + * Provides JUnit rules. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.kafka.test.rule; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java index 7cd269e0c3..36b57a7e56 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/ContainerTestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; /** @@ -52,6 +53,7 @@ public static void waitForAssignment(Object container, int partitions) { // NOSO return; } List containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class); + Assert.notNull(containers, "Containers must not be null"); int n = 0; int count = 0; Method getAssignedPartitions = null; diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java index c4902dce2c..76b6cdbe7f 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2024 the original author or authors. + * Copyright 2016-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,11 +45,11 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsConfig; +import org.jspecify.annotations.Nullable; import org.springframework.beans.DirectFieldAccessor; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -66,6 +66,7 @@ public final class KafkaTestUtils { private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaTestUtils.class)); // NOSONAR + @SuppressWarnings("NullAway.Init") private static Properties defaults; private KafkaTestUtils() { @@ -261,7 +262,7 @@ public static ConsumerRecord getSingleRecord(Consumer consume * @throws Exception if an exception occurs. * @since 2.3 */ - public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition) + public static @Nullable OffsetAndMetadata getCurrentOffset(String brokerAddresses, String group, String topic, int partition) throws Exception { // NOSONAR try (AdminClient client = AdminClient @@ -281,7 +282,7 @@ public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String * @throws Exception if an exception occurs. * @since 3.0 */ - public static OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition) + public static @Nullable OffsetAndMetadata getCurrentOffset(AdminClient adminClient, String group, String topic, int partition) throws Exception { // NOSONAR return adminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get() // NOSONAR false positive @@ -395,7 +396,7 @@ public static ConsumerRecords getRecords(Consumer consumer, D * @param propertyPath The path. * @return The field. */ - public static Object getPropertyValue(Object root, String propertyPath) { + public static @Nullable Object getPropertyValue(Object root, String propertyPath) { Object value = null; DirectFieldAccessor accessor = new DirectFieldAccessor(root); String[] tokens = propertyPath.split("\\."); @@ -424,7 +425,7 @@ else if (i == tokens.length - 1) { * @see #getPropertyValue(Object, String) */ @SuppressWarnings("unchecked") - public static T getPropertyValue(Object root, String propertyPath, Class type) { + public static @Nullable T getPropertyValue(Object root, String propertyPath, Class type) { Object value = getPropertyValue(root, propertyPath); if (value != null) { Assert.isAssignable(type, value.getClass()); diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/package-info.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/package-info.java new file mode 100644 index 0000000000..648c47f4dd --- /dev/null +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/package-info.java @@ -0,0 +1,5 @@ +/** + * Utils package. + */ +@org.jspecify.annotations.NullMarked +package org.springframework.kafka.test.utils; diff --git a/src/checkstyle/checkstyle.xml b/src/checkstyle/checkstyle.xml index 59b4ecaf28..6ef53f9461 100644 --- a/src/checkstyle/checkstyle.xml +++ b/src/checkstyle/checkstyle.xml @@ -185,7 +185,9 @@ - + + +