diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index e90e16a4961b..9f16eb241734 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -170,7 +170,7 @@ public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException @Bean @ConditionalOnMissingBean KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails) { - Map properties = this.properties.buildAdminProperties(null); + Map properties = this.properties.buildAdminProperties(); applyKafkaConnectionDetailsForAdmin(properties, connectionDetails); KafkaAdmin kafkaAdmin = new KafkaAdmin(properties); KafkaProperties.Admin admin = this.properties.getAdmin(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 95113f0dbf95..08b5dd8027e0 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -38,7 +38,6 @@ import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; import org.springframework.boot.convert.DurationUnit; -import org.springframework.boot.ssl.SslBundles; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; @@ -162,7 +161,7 @@ public Retry getRetry() { return this.retry; } - private Map buildCommonProperties(SslBundles sslBundles) { + private Map buildCommonProperties() { Map properties = new HashMap<>(); if (this.bootstrapServers != null) { properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); @@ -170,7 +169,7 @@ private Map buildCommonProperties(SslBundles sslBundles) { if (this.clientId != null) { properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); } - properties.putAll(this.ssl.buildProperties(sslBundles)); + properties.putAll(this.ssl.buildProperties()); properties.putAll(this.security.buildProperties()); if (!CollectionUtils.isEmpty(this.properties)) { properties.putAll(this.properties); @@ -187,21 +186,8 @@ private Map buildCommonProperties(SslBundles sslBundles) { * instance */ public Map buildConsumerProperties() { - return buildConsumerProperties(null); - } - - /** - * Create an initial map of consumer properties from the state of this instance. - *

- * This allows you to add additional properties, if necessary, and override the - * default {@code kafkaConsumerFactory} bean. - * @param sslBundles bundles providing SSL trust material - * @return the consumer properties initialized with the customizations defined on this - * instance - */ - public Map buildConsumerProperties(SslBundles sslBundles) { - Map properties = buildCommonProperties(sslBundles); - properties.putAll(this.consumer.buildProperties(sslBundles)); + Map properties = buildCommonProperties(); + properties.putAll(this.consumer.buildProperties()); return properties; } @@ -214,21 +200,8 @@ public Map buildConsumerProperties(SslBundles sslBundles) { * instance */ public Map buildProducerProperties() { - return buildProducerProperties(null); - } - - /** - * Create an initial map of producer properties from the state of this instance. - *

- * This allows you to add additional properties, if necessary, and override the - * default {@code kafkaProducerFactory} bean. - * @param sslBundles bundles providing SSL trust material - * @return the producer properties initialized with the customizations defined on this - * instance - */ - public Map buildProducerProperties(SslBundles sslBundles) { - Map properties = buildCommonProperties(sslBundles); - properties.putAll(this.producer.buildProperties(sslBundles)); + Map properties = buildCommonProperties(); + properties.putAll(this.producer.buildProperties()); return properties; } @@ -237,13 +210,12 @@ public Map buildProducerProperties(SslBundles sslBundles) { *

* This allows you to add additional properties, if necessary, and override the * default {@code kafkaAdmin} bean. - * @param sslBundles bundles providing SSL trust material * @return the admin properties initialized with the customizations defined on this * instance */ - public Map buildAdminProperties(SslBundles sslBundles) { - Map properties = buildCommonProperties(sslBundles); - properties.putAll(this.admin.buildProperties(sslBundles)); + public Map buildAdminProperties() { + Map properties = buildCommonProperties(); + properties.putAll(this.admin.buildProperties()); return properties; } @@ -251,13 +223,12 @@ public Map buildAdminProperties(SslBundles sslBundles) { * Create an initial map of streams properties from the state of this instance. *

* This allows you to add additional properties, if necessary. - * @param sslBundles bundles providing SSL trust material * @return the streams properties initialized with the customizations defined on this * instance */ - public Map buildStreamsProperties(SslBundles sslBundles) { - Map properties = buildCommonProperties(sslBundles); - properties.putAll(this.streams.buildProperties(sslBundles)); + public Map buildStreamsProperties() { + Map properties = buildCommonProperties(); + properties.putAll(this.streams.buildProperties()); return properties; } @@ -473,7 +444,7 @@ public Map getProperties() { return this.properties; } - public Map buildProperties(SslBundles sslBundles) { + public Map buildProperties() { Properties properties = new Properties(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(this::getAutoCommitInterval) @@ -501,7 +472,7 @@ public Map buildProperties(SslBundles sslBundles) { map.from(this::getMaxPollInterval) .asInt(Duration::toMillis) .to(properties.in(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)); - return properties.with(this.ssl, this.security, this.properties, sslBundles); + return properties.with(this.ssl, this.security, this.properties); } } @@ -663,7 +634,7 @@ public Map getProperties() { return this.properties; } - public Map buildProperties(SslBundles sslBundles) { + public Map buildProperties() { Properties properties = new Properties(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG)); @@ -677,7 +648,7 @@ public Map buildProperties(SslBundles sslBundles) { map.from(this::getKeySerializer).to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG)); map.from(this::getValueSerializer).to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); - return properties.with(this.ssl, this.security, this.properties, sslBundles); + return properties.with(this.ssl, this.security, this.properties); } } @@ -784,11 +755,11 @@ public Map getProperties() { return this.properties; } - public Map buildProperties(SslBundles sslBundles) { + public Map buildProperties() { Properties properties = new Properties(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(this::getClientId).to(properties.in(ProducerConfig.CLIENT_ID_CONFIG)); - return properties.with(this.ssl, this.security, this.properties, sslBundles); + return properties.with(this.ssl, this.security, this.properties); } } @@ -918,7 +889,7 @@ public Map getProperties() { return this.properties; } - public Map buildProperties(SslBundles sslBundles) { + public Map buildProperties() { Properties properties = new Properties(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(this::getApplicationId).to(properties.in("application.id")); @@ -929,7 +900,7 @@ public Map buildProperties(SslBundles sslBundles) { map.from(this::getClientId).to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG)); map.from(this::getReplicationFactor).to(properties.in("replication.factor")); map.from(this::getStateDir).to(properties.in("state.dir")); - return properties.with(this.ssl, this.security, this.properties, sslBundles); + return properties.with(this.ssl, this.security, this.properties); } } @@ -1423,12 +1394,7 @@ public void setProtocol(String protocol) { this.protocol = protocol; } - @Deprecated(since = "3.2.0", forRemoval = true) public Map buildProperties() { - return buildProperties(null); - } - - public Map buildProperties(SslBundles sslBundles) { validate(); String bundleName = getBundle(); Properties properties = new Properties(); @@ -1794,8 +1760,8 @@ java.util.function.Consumer in(String key) { return (value) -> put(key, value); } - Properties with(Ssl ssl, Security security, Map properties, SslBundles sslBundles) { - putAll(ssl.buildProperties(sslBundles)); + Properties with(Ssl ssl, Security security, Map properties) { + putAll(ssl.buildProperties()); putAll(security.buildProperties()); putAll(properties); return this; diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index a3078a0dad36..fc2957277325 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -62,7 +62,7 @@ class KafkaStreamsAnnotationDrivenConfiguration { @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment, KafkaConnectionDetails connectionDetails) { - Map properties = this.properties.buildStreamsProperties(null); + Map properties = this.properties.buildStreamsProperties(); applyKafkaConnectionDetailsForStreams(properties, connectionDetails); if (this.properties.getStreams().getApplicationId() == null) { String applicationName = environment.getProperty("spring.application.name"); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java index a536e0786e74..ece4b6c83305 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java @@ -27,8 +27,6 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; -import org.springframework.boot.ssl.DefaultSslBundleRegistry; -import org.springframework.boot.ssl.SslBundle; import org.springframework.core.io.ClassPathResource; import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.core.KafkaAdmin; @@ -36,7 +34,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.mockito.Mockito.mock; /** * Tests for {@link KafkaProperties}. @@ -47,8 +44,6 @@ */ class KafkaPropertiesTests { - private final SslBundle sslBundle = mock(SslBundle.class); - @Test void isolationLevelEnumConsistentWithKafkaVersion() { org.apache.kafka.common.IsolationLevel[] original = org.apache.kafka.common.IsolationLevel.values(); @@ -101,15 +96,6 @@ void sslPemConfigurationWithEmptyBundle() { "-----BEGINchain"); } - @Test - void sslBundleConfiguration() { - KafkaProperties properties = new KafkaProperties(); - properties.getSsl().setBundle("myBundle"); - Map consumerProperties = properties - .buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)); - assertThat(consumerProperties).doesNotContainKey(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG); - } - @Test void sslPropertiesWhenKeyStoreLocationAndKeySetShouldThrowException() { KafkaProperties properties = new KafkaProperties(); @@ -133,8 +119,8 @@ void sslPropertiesWhenKeyStoreLocationAndBundleSetShouldThrowException() { KafkaProperties properties = new KafkaProperties(); properties.getSsl().setBundle("myBundle"); properties.getSsl().setKeyStoreLocation(new ClassPathResource("ksLoc")); - assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy( - () -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle))); + assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class) + .isThrownBy(properties::buildConsumerProperties); } @Test @@ -142,8 +128,8 @@ void sslPropertiesWhenKeyStoreKeyAndBundleSetShouldThrowException() { KafkaProperties properties = new KafkaProperties(); properties.getSsl().setBundle("myBundle"); properties.getSsl().setKeyStoreKey("-----BEGIN"); - assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy( - () -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle))); + assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class) + .isThrownBy(properties::buildConsumerProperties); } @Test @@ -151,8 +137,8 @@ void sslPropertiesWhenTrustStoreLocationAndBundleSetShouldThrowException() { KafkaProperties properties = new KafkaProperties(); properties.getSsl().setBundle("myBundle"); properties.getSsl().setTrustStoreLocation(new ClassPathResource("tsLoc")); - assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy( - () -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle))); + assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class) + .isThrownBy(properties::buildConsumerProperties); } @Test @@ -160,8 +146,8 @@ void sslPropertiesWhenTrustStoreCertificatesAndBundleSetShouldThrowException() { KafkaProperties properties = new KafkaProperties(); properties.getSsl().setBundle("myBundle"); properties.getSsl().setTrustStoreCertificates("-----BEGIN"); - assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy( - () -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle))); + assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class) + .isThrownBy(properties::buildConsumerProperties); } @Test