Skip to content

Commit a73973f

Browse files
committed
Upgrade to Kafka 3.4.0
Closes gh-34284
1 parent c0b5bfe commit a73973f

File tree

3 files changed

+35
-3
lines changed

3 files changed

+35
-3
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.kafka.common.serialization.StringSerializer;
3535

3636
import org.springframework.boot.context.properties.ConfigurationProperties;
37+
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
3738
import org.springframework.boot.context.properties.PropertyMapper;
3839
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
3940
import org.springframework.boot.convert.DurationUnit;
@@ -769,6 +770,11 @@ public static class Streams {
769770
*/
770771
private DataSize cacheMaxSizeBuffering;
771772

773+
/**
774+
* Maximum size of the in-memory state store cache across all threads.
775+
*/
776+
private DataSize stateStoreCacheMaxSize;
777+
772778
/**
773779
* ID to pass to the server when making requests. Used for server-side logging.
774780
*/
@@ -826,14 +832,25 @@ public void setBootstrapServers(List<String> bootstrapServers) {
826832
this.bootstrapServers = bootstrapServers;
827833
}
828834

835+
@DeprecatedConfigurationProperty(replacement = "spring.kafka.streams.state-store-cache-max-size")
836+
@Deprecated(since = "3.1.0", forRemoval = true)
829837
public DataSize getCacheMaxSizeBuffering() {
830838
return this.cacheMaxSizeBuffering;
831839
}
832840

841+
@Deprecated(since = "3.1.0", forRemoval = true)
833842
public void setCacheMaxSizeBuffering(DataSize cacheMaxSizeBuffering) {
834843
this.cacheMaxSizeBuffering = cacheMaxSizeBuffering;
835844
}
836845

846+
public DataSize getStateStoreCacheMaxSize() {
847+
return this.stateStoreCacheMaxSize;
848+
}
849+
850+
public void setStateStoreCacheMaxSize(DataSize stateStoreCacheMaxSize) {
851+
this.stateStoreCacheMaxSize = stateStoreCacheMaxSize;
852+
}
853+
837854
public String getClientId() {
838855
return this.clientId;
839856
}
@@ -869,6 +886,8 @@ public Map<String, Object> buildProperties() {
869886
map.from(this::getBootstrapServers).to(properties.in(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
870887
map.from(this::getCacheMaxSizeBuffering).asInt(DataSize::toBytes)
871888
.to(properties.in("cache.max.bytes.buffering"));
889+
map.from(this::getStateStoreCacheMaxSize).asInt(DataSize::toBytes)
890+
.to(properties.in("statestore.cache.max.bytes"));
872891
map.from(this::getClientId).to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
873892
map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
874893
map.from(this::getStateDir).to(properties.in("state.dir"));

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ void streamsProperties() {
261261
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class).withPropertyValues(
262262
"spring.kafka.client-id=cid", "spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
263263
"spring.application.name=appName", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
264-
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cache-max-size-buffering=1KB",
264+
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.state-store-cache-max-size=1KB",
265265
"spring.kafka.streams.client-id=override", "spring.kafka.streams.properties.fiz.buz=fix.fox",
266266
"spring.kafka.streams.replication-factor=2", "spring.kafka.streams.state-dir=/tmp/state",
267267
"spring.kafka.streams.security.protocol=SSL", "spring.kafka.streams.ssl.key-password=p7",
@@ -276,7 +276,7 @@ void streamsProperties() {
276276
.asProperties();
277277
assertThat((List<String>) configs.get(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG))
278278
.containsExactly("localhost:9092", "localhost:9093");
279-
assertThat(configs).containsEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024);
279+
assertThat(configs).containsEntry(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 1024);
280280
assertThat(configs).containsEntry(StreamsConfig.CLIENT_ID_CONFIG, "override");
281281
assertThat(configs).containsEntry(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
282282
assertThat(configs).containsEntry(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
@@ -299,6 +299,19 @@ void streamsProperties() {
299299
});
300300
}
301301

302+
@SuppressWarnings("deprecation")
303+
@Deprecated(since = "3.1.0", forRemoval = true)
304+
void streamsCacheMaxSizeBuffering() {
305+
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class)
306+
.withPropertyValues("spring.kafka.streams.cache-max-size-buffering=1KB").run((context) -> {
307+
Properties configs = context
308+
.getBean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
309+
KafkaStreamsConfiguration.class)
310+
.asProperties();
311+
assertThat(configs).containsEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024);
312+
});
313+
}
314+
302315
@SuppressWarnings("unchecked")
303316
@Test
304317
void streamsApplicationIdUsesMainApplicationNameByDefault() {

spring-boot-project/spring-boot-dependencies/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ bom {
700700
]
701701
}
702702
}
703-
library("Kafka", "3.3.2") {
703+
library("Kafka", "3.4.0") {
704704
group("org.apache.kafka") {
705705
modules = [
706706
"connect",

0 commit comments

Comments
 (0)