@@ -92,6 +92,8 @@ public class KafkaAdmin extends KafkaResourceFactory
92
92
93
93
private boolean initializingContext ;
94
94
95
+ private boolean modifyTopicConfigs ;
96
+
95
97
/**
96
98
* Create an instance with an {@link AdminClient} based on the supplied
97
99
* configuration.
@@ -140,6 +142,16 @@ public void setAutoCreate(boolean autoCreate) {
140
142
this .autoCreate = autoCreate ;
141
143
}
142
144
145
+ /**
146
+ * Set to true to compare the current topic configuration properties with those in the
147
+ * {@link NewTopic} bean, and update if different.
148
+ * @param modifyTopicConfigs true to check and update configs if necessary.
149
+ * @since 2.8.7
150
+ */
151
+ public void setModifyTopicConfigs (boolean modifyTopicConfigs ) {
152
+ this .modifyTopicConfigs = modifyTopicConfigs ;
153
+ }
154
+
143
155
@ Override
144
156
public Map <String , Object > getConfigurationProperties () {
145
157
Map <String , Object > configs2 = new HashMap <>(this .configs );
@@ -254,16 +266,19 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTo
254
266
if (topicsWithPartitionMismatches .size () > 0 ) {
255
267
createMissingPartitions (adminClient , topicsWithPartitionMismatches );
256
268
}
257
- Map <ConfigResource , List <ConfigEntry >> mismatchingConfigs =
258
- checkTopicsForConfigMismatches (adminClient , topics );
259
- if (!mismatchingConfigs .isEmpty ()) {
260
- adjustConfigMismatches (adminClient , topics , mismatchingConfigs );
269
+ if (this .modifyTopicConfigs ) {
270
+ Map <ConfigResource , List <ConfigEntry >> mismatchingConfigs =
271
+ checkTopicsForConfigMismatches (adminClient , topics );
272
+ if (!mismatchingConfigs .isEmpty ()) {
273
+ adjustConfigMismatches (adminClient , topics , mismatchingConfigs );
274
+ }
261
275
}
262
276
}
263
277
}
264
278
265
279
private Map <ConfigResource , List <ConfigEntry >> checkTopicsForConfigMismatches (
266
280
AdminClient adminClient , Collection <NewTopic > topics ) {
281
+
267
282
List <ConfigResource > configResources = topics .stream ()
268
283
.map (topic -> new ConfigResource (Type .TOPIC , topic .name ()))
269
284
.collect (Collectors .toList ());
@@ -297,10 +312,10 @@ private Map<ConfigResource, List<ConfigEntry>> checkTopicsForConfigMismatches(
297
312
}
298
313
catch (InterruptedException ie ) {
299
314
Thread .currentThread ().interrupt ();
300
- throw new KafkaException ("Interrupted while getting topic descriptions" , ie );
315
+ throw new KafkaException ("Interrupted while getting topic descriptions:" + topics , ie );
301
316
}
302
317
catch (ExecutionException | TimeoutException ex ) {
303
- throw new KafkaException ("Failed to obtain topic descriptions" , ex );
318
+ throw new KafkaException ("Failed to obtain topic descriptions:" + topics , ex );
304
319
}
305
320
}
306
321
0 commit comments