Skip to content

Commit 5410d15

Browse files
mschwrzgaryrussell
authored andcommitted
GH-1482: Support Admin.incrementalAlterConfigs
Resolves #1482 Renamed modifyTopics to createMissingParitions Typo Added method to detect mismatches in topic-config Added method to adjust mismatches in topic-config Added test for mismatch adjustment Refactor Added more testvalues
1 parent 460098a commit 5410d15

File tree

2 files changed

+160
-8
lines changed

2 files changed

+160
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,28 @@
2424
import java.util.HashMap;
2525
import java.util.List;
2626
import java.util.Map;
27+
import java.util.Optional;
2728
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.TimeoutException;
3031
import java.util.stream.Collectors;
3132

3233
import org.apache.commons.logging.LogFactory;
3334
import org.apache.kafka.clients.admin.AdminClient;
35+
import org.apache.kafka.clients.admin.AlterConfigOp;
36+
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
37+
import org.apache.kafka.clients.admin.AlterConfigsResult;
38+
import org.apache.kafka.clients.admin.Config;
39+
import org.apache.kafka.clients.admin.ConfigEntry;
3440
import org.apache.kafka.clients.admin.CreatePartitionsResult;
3541
import org.apache.kafka.clients.admin.CreateTopicsResult;
42+
import org.apache.kafka.clients.admin.DescribeConfigsResult;
3643
import org.apache.kafka.clients.admin.DescribeTopicsResult;
3744
import org.apache.kafka.clients.admin.NewPartitions;
3845
import org.apache.kafka.clients.admin.NewTopic;
3946
import org.apache.kafka.clients.admin.TopicDescription;
47+
import org.apache.kafka.common.config.ConfigResource;
48+
import org.apache.kafka.common.config.ConfigResource.Type;
4049
import org.apache.kafka.common.errors.InvalidPartitionsException;
4150
import org.apache.kafka.common.errors.TopicExistsException;
4251
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -237,13 +246,99 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTo
237246
.map(NewTopic::name)
238247
.collect(Collectors.toList()));
239248
List<NewTopic> topicsToAdd = new ArrayList<>();
240-
Map<String, NewPartitions> topicsToModify = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
249+
Map<String, NewPartitions> topicsWithPartitionMismatches =
250+
checkPartitions(topicNameToTopic, topicInfo, topicsToAdd);
241251
if (topicsToAdd.size() > 0) {
242252
addTopics(adminClient, topicsToAdd);
243253
}
244-
if (topicsToModify.size() > 0) {
245-
modifyTopics(adminClient, topicsToModify);
254+
if (topicsWithPartitionMismatches.size() > 0) {
255+
createMissingPartitions(adminClient, topicsWithPartitionMismatches);
246256
}
257+
Map<ConfigResource, List<ConfigEntry>> mismatchingConfigs =
258+
checkTopicsForConfigMismatches(adminClient, topics);
259+
if (!mismatchingConfigs.isEmpty()) {
260+
adjustConfigMismatches(adminClient, topics, mismatchingConfigs);
261+
}
262+
}
263+
}
264+
265+
private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
266+
AdminClient adminClient, Collection<NewTopic> topics) {
267+
List<ConfigResource> configResources = topics.stream()
268+
.map(topic -> new ConfigResource(Type.TOPIC, topic.name()))
269+
.collect(Collectors.toList());
270+
271+
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(configResources);
272+
try {
273+
Map<ConfigResource, Config> topicsConfig = describeConfigsResult.all()
274+
.get(this.operationTimeout, TimeUnit.SECONDS);
275+
276+
Map<ConfigResource, List<ConfigEntry>> configMismatches = new HashMap<>();
277+
for (Map.Entry<ConfigResource, Config> topicConfig : topicsConfig.entrySet()) {
278+
Optional<NewTopic> topicOptional = topics.stream()
279+
.filter(p -> p.name().equals(topicConfig.getKey().name()))
280+
.findFirst();
281+
282+
List<ConfigEntry> configMismatchesEntries = new ArrayList<>();
283+
if (topicOptional.isPresent() && topicOptional.get().configs() != null) {
284+
for (Map.Entry<String, String> desiredConfigParameter : topicOptional.get().configs().entrySet()) {
285+
ConfigEntry actualConfigParameter = topicConfig.getValue().get(desiredConfigParameter.getKey());
286+
if (!actualConfigParameter.value().equals(desiredConfigParameter.getValue())) {
287+
configMismatchesEntries.add(actualConfigParameter);
288+
}
289+
290+
if (configMismatchesEntries.size() > 0) {
291+
configMismatches.put(topicConfig.getKey(), configMismatchesEntries);
292+
}
293+
}
294+
}
295+
}
296+
return configMismatches;
297+
}
298+
catch (InterruptedException ie) {
299+
Thread.currentThread().interrupt();
300+
throw new KafkaException("Interrupted while getting topic descriptions", ie);
301+
}
302+
catch (ExecutionException | TimeoutException ex) {
303+
throw new KafkaException("Failed to obtain topic descriptions", ex);
304+
}
305+
}
306+
307+
private void adjustConfigMismatches(AdminClient adminClient, Collection<NewTopic> topics,
308+
Map<ConfigResource, List<ConfigEntry>> mismatchingConfigs) {
309+
for (Map.Entry<ConfigResource, List<ConfigEntry>> mismatchingConfigsOfTopic : mismatchingConfigs.entrySet()) {
310+
ConfigResource topicConfigResource = mismatchingConfigsOfTopic.getKey();
311+
312+
Optional<NewTopic> topicOptional = topics.stream().filter(p -> p.name().equals(topicConfigResource.name()))
313+
.findFirst();
314+
if (topicOptional.isPresent()) {
315+
for (ConfigEntry mismatchConfigEntry : mismatchingConfigsOfTopic.getValue()) {
316+
List<AlterConfigOp> alterConfigOperations = new ArrayList<>();
317+
Map<String, String> desiredConfigs = topicOptional.get().configs();
318+
if (desiredConfigs.get(mismatchConfigEntry.name()) != null) {
319+
alterConfigOperations.add(
320+
new AlterConfigOp(
321+
new ConfigEntry(mismatchConfigEntry.name(),
322+
desiredConfigs.get(mismatchConfigEntry.name())),
323+
OpType.SET));
324+
}
325+
if (alterConfigOperations.size() > 0) {
326+
try {
327+
AlterConfigsResult alterConfigsResult = adminClient
328+
.incrementalAlterConfigs(Map.of(topicConfigResource, alterConfigOperations));
329+
alterConfigsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);
330+
}
331+
catch (InterruptedException ie) {
332+
Thread.currentThread().interrupt();
333+
throw new KafkaException("Interrupted while getting topic descriptions", ie);
334+
}
335+
catch (ExecutionException | TimeoutException ex) {
336+
throw new KafkaException("Failed to obtain topic descriptions", ex);
337+
}
338+
}
339+
}
340+
}
341+
247342
}
248343
}
249344

@@ -304,7 +399,7 @@ private void addTopics(AdminClient adminClient, List<NewTopic> topicsToAdd) {
304399
}
305400
}
306401

307-
private void modifyTopics(AdminClient adminClient, Map<String, NewPartitions> topicsToModify) {
402+
private void createMissingPartitions(AdminClient adminClient, Map<String, NewPartitions> topicsToModify) {
308403
CreatePartitionsResult partitionsResult = adminClient.createPartitions(topicsToModify);
309404
try {
310405
partitionsResult.all().get(this.operationTimeout, TimeUnit.SECONDS);

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,26 @@
2323
import java.util.Arrays;
2424
import java.util.Collections;
2525
import java.util.HashMap;
26+
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Optional;
2829
import java.util.concurrent.ExecutionException;
2930
import java.util.concurrent.TimeUnit;
3031
import java.util.concurrent.atomic.AtomicReference;
3132

33+
import org.apache.kafka.clients.CommonClientConfigs;
3234
import org.apache.kafka.clients.admin.AdminClient;
3335
import org.apache.kafka.clients.admin.AdminClientConfig;
36+
import org.apache.kafka.clients.admin.AlterConfigOp;
37+
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
38+
import org.apache.kafka.clients.admin.ConfigEntry;
39+
import org.apache.kafka.clients.admin.DescribeConfigsResult;
3440
import org.apache.kafka.clients.admin.DescribeTopicsResult;
3541
import org.apache.kafka.clients.admin.NewPartitions;
3642
import org.apache.kafka.clients.admin.NewTopic;
3743
import org.apache.kafka.clients.admin.TopicDescription;
44+
import org.apache.kafka.common.config.ConfigResource;
45+
import org.apache.kafka.common.config.ConfigResource.Type;
3846
import org.apache.kafka.common.config.TopicConfig;
3947
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
4048
import org.junit.jupiter.api.Test;
@@ -54,7 +62,6 @@
5462
/**
5563
* @author Gary Russell
5664
* @since 1.3
57-
*
5865
*/
5966
@SpringJUnitConfig
6067
@DirtiesContext
@@ -63,6 +70,9 @@ public class KafkaAdminTests {
6370
@Autowired
6471
private KafkaAdmin admin;
6572

73+
@Autowired
74+
private AdminClient adminClient;
75+
6676
@Autowired
6777
private NewTopic topic1;
6878

@@ -72,6 +82,9 @@ public class KafkaAdminTests {
7282
@Autowired
7383
private NewTopic topic3;
7484

85+
@Autowired
86+
private NewTopic mismatchconfig;
87+
7588
@Test
7689
public void testTopicConfigs() {
7790
assertThat(topic1.configs()).containsEntry(
@@ -93,6 +106,7 @@ public void testAddTopicsAndAddPartitions() throws Exception {
93106
new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", Optional.of(4));
94107
new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", Optional.of(3));
95108
this.admin.initialize();
109+
96110
int n = 0;
97111
await().until(() -> {
98112
results.putAll(this.admin.describeTopics("foo", "bar"));
@@ -122,6 +136,33 @@ public void testAddTopicsAndAddPartitions() throws Exception {
122136
return foo.partitions().size() == 5;
123137
});
124138
results.forEach((name, td) -> assertThat(td.partitions()).hasSize(5));
139+
140+
await().until(() -> {
141+
adminClient.incrementalAlterConfigs(
142+
Map.of(
143+
new ConfigResource(Type.TOPIC, "mismatchconfig"),
144+
List.of(new AlterConfigOp(new ConfigEntry("retention.bytes", "10"), OpType.SET),
145+
new AlterConfigOp(new ConfigEntry("retention.ms", "11"), OpType.SET))));
146+
DescribeConfigsResult describeConfigsResult = this.adminClient
147+
.describeConfigs(List.of(new ConfigResource(Type.TOPIC, "mismatchconfig")));
148+
Map<ConfigResource, org.apache.kafka.clients.admin.Config> configResourceConfigMap = describeConfigsResult.all()
149+
.get();
150+
return configResourceConfigMap.get(new ConfigResource(Type.TOPIC, "mismatchconfig")).get("retention.bytes").value().equals("10")
151+
&& configResourceConfigMap.get(new ConfigResource(Type.TOPIC, "mismatchconfig")).get("retention.ms").value().equals("11");
152+
});
153+
154+
this.admin.createOrModifyTopics(mismatchconfig);
155+
156+
await().until(() -> {
157+
DescribeConfigsResult describeConfigsResult = this.adminClient
158+
.describeConfigs(List.of(new ConfigResource(Type.TOPIC, "mismatchconfig")));
159+
Map<ConfigResource, org.apache.kafka.clients.admin.Config> configResourceConfigMap = describeConfigsResult.all()
160+
.get();
161+
return configResourceConfigMap.get(new ConfigResource(Type.TOPIC, "mismatchconfig"))
162+
.get("retention.bytes").value().equals("1024")
163+
&& configResourceConfigMap.get(new ConfigResource(Type.TOPIC, "mismatchconfig"))
164+
.get("retention.ms").value().equals("1111");
165+
});
125166
}
126167

127168
@Test
@@ -175,11 +216,9 @@ public void alreadyExists() throws Exception {
175216
if (m.getName().equals("addTopics")) {
176217
addTopics.set(m);
177218
}
178-
else if (m.getName().equals("modifyTopics")) {
219+
else if (m.getName().equals("createMissingPartitions")) {
179220
modifyTopics.set(m);
180221
}
181-
}, m -> {
182-
return m.getName().endsWith("Topics");
183222
});
184223
try (AdminClient adminClient = AdminClient.create(this.admin.getConfigurationProperties())) {
185224
addTopics.get().invoke(this.admin, adminClient, Collections.singletonList(this.topic1));
@@ -223,6 +262,14 @@ public KafkaAdmin admin() {
223262
return admin;
224263
}
225264

265+
@Bean
266+
public AdminClient adminClient() {
267+
Map<String, Object> configs = new HashMap<>();
268+
configs.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
269+
StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
270+
return AdminClient.create(configs);
271+
}
272+
226273
@Bean
227274
public NewTopic topic1() {
228275
return TopicBuilder.name("foo")
@@ -250,6 +297,16 @@ public NewTopic topic3() {
250297
.build();
251298
}
252299

300+
@Bean
301+
public NewTopic mismatchconfig() {
302+
return TopicBuilder.name("mismatchconfig")
303+
.partitions(2)
304+
.replicas(1)
305+
.config("retention.bytes", "1024")
306+
.config("retention.ms", "1111")
307+
.build();
308+
}
309+
253310
@Bean
254311
public NewTopics topics456() {
255312
return new NewTopics(

0 commit comments

Comments
 (0)