Skip to content

Commit 65bbdb6

Browse files
authored
Add KafkaTestUtils methods using Duration for timeout (2.9) (#2468)
* Add KafkaTestUtils methods using Duration for timeout To adjust codebase to modern JVM (and Kafka API itself - e.g. poll(Duration)). Old methods with "long timeout" are deprecated. * Fix checkstyle error breaking the build * Remove annotation elements not compatible with JDK 8
1 parent c92ec76 commit 65bbdb6

File tree

3 files changed

+90
-22
lines changed

3 files changed

+90
-22
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 83 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ public static Map<String, Object> producerProps(String brokers) {
136136
* @param <V> the value type.
137137
* @return the record.
138138
* @throws IllegalStateException if exactly one record is not received.
139-
* @see #getSingleRecord(Consumer, String, long)
139+
* @see #getSingleRecord(Consumer, String, Duration)
140140
*/
141141
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) {
142-
return getSingleRecord(consumer, topic, 60000); // NOSONAR magic #
142+
return getSingleRecord(consumer, topic, Duration.ofSeconds(60)); // NOSONAR magic #
143143
}
144144

145145
/**
@@ -152,14 +152,31 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
152152
* @return the record.
153153
* @throws IllegalStateException if exactly one record is not received.
154154
* @since 2.0
155+
* @deprecated in favor of {@link #getSingleRecord(Consumer, String, Duration)}
155156
*/
157+
@Deprecated
156158
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, long timeout) {
157-
long expire = System.currentTimeMillis() + timeout;
159+
return getSingleRecord(consumer, topic, Duration.ofMillis(timeout));
160+
}
161+
162+
/**
163+
* Poll the consumer, expecting a single record for the specified topic.
164+
* @param consumer the consumer.
165+
* @param topic the topic.
166+
* @param timeout max duration to wait for records; forwarded to {@link Consumer#poll(Duration)}.
167+
* @param <K> the key type.
168+
* @param <V> the value type.
169+
* @return the record.
170+
* @throws IllegalStateException if exactly one record is not received.
171+
* @since 2.9.3
172+
*/
173+
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic, Duration timeout) {
174+
long expire = System.currentTimeMillis() + timeout.toMillis();
158175
ConsumerRecords<K, V> received;
159176
Iterator<ConsumerRecord<K, V>> iterator;
160-
long remaining = timeout;
177+
long remaining = timeout.toMillis();
161178
do {
162-
received = getRecords(consumer, remaining);
179+
received = getRecords(consumer, Duration.ofMillis(remaining));
163180
iterator = received.records(topic).iterator();
164181
Map<TopicPartition, Long> reset = new HashMap<>();
165182
received.forEach(rec -> {
@@ -198,11 +215,31 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
198215
* @param timeout the timeout.
199216
* @return the record or null if no record received.
200217
* @since 2.3
218+
* @deprecated in favor of {@link #getOneRecord(String, String, String, int, boolean, boolean, Duration)}
219+
*/
220+
@Nullable
221+
@Deprecated
222+
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
223+
boolean seekToLast, boolean commit, long timeout) {
224+
return getOneRecord(brokerAddresses, group, topic, partition, seekToLast, commit, Duration.ofMillis(timeout));
225+
}
226+
227+
/**
228+
* Get a single record for the group from the topic/partition. Optionally, seeking to the current last record.
229+
* @param brokerAddresses the broker address(es).
230+
* @param group the group.
231+
* @param topic the topic.
232+
* @param partition the partition.
233+
* @param seekToLast true to fetch an existing last record, if present.
234+
* @param commit commit offset after polling or not.
235+
* @param timeout the timeout.
236+
* @return the record or null if no record received.
237+
* @since 2.9.3
201238
*/
202239
@Nullable
203240
@SuppressWarnings({ "rawtypes", "unchecked" })
204241
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
205-
boolean seekToLast, boolean commit, long timeout) {
242+
boolean seekToLast, boolean commit, Duration timeout) {
206243

207244
Map<String, Object> consumerConfig = consumerProps(brokerAddresses, group, "false");
208245
consumerConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
@@ -215,7 +252,7 @@ public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consume
215252
consumer.seek(topicPart, consumer.position(topicPart) - 1);
216253
}
217254
}
218-
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(timeout));
255+
ConsumerRecords<?, ?> records = consumer.poll(timeout);
219256
ConsumerRecord<?, ?> record = records.count() == 1 ? records.iterator().next() : null;
220257
if (record != null && commit) {
221258
consumer.commitSync();
@@ -298,44 +335,74 @@ public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, S
298335
* @param <K> the key type.
299336
* @param <V> the value type.
300337
* @return the records.
301-
* @see #getRecords(Consumer, long)
338+
* @see #getRecords(Consumer, Duration)
302339
*/
303340
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) {
304-
return getRecords(consumer, 60000); // NOSONAR magic #
341+
return getRecords(consumer, Duration.ofSeconds(60)); // NOSONAR magic #
305342
}
306343

307344
/**
308345
* Poll the consumer for records.
309346
* @param consumer the consumer.
310-
* @param timeout max time in milliseconds to wait for records; forwarded to
311-
* {@link Consumer#poll(long)}.
347+
* @param timeout max time in milliseconds to wait for records; forwarded to {@link Consumer#poll(long)}.
312348
* @param <K> the key type.
313349
* @param <V> the value type.
314350
* @return the records.
315351
* @throws IllegalStateException if the poll returns null (since 2.3.4).
316352
* @since 2.0
353+
* @deprecated in favor of {@link #getRecords(Consumer, Duration)}
317354
*/
355+
@Deprecated
318356
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout) {
319357
return getRecords(consumer, timeout, -1);
320358
}
321359

322360
/**
323361
* Poll the consumer for records.
324362
* @param consumer the consumer.
325-
* @param timeout max time in milliseconds to wait for records; forwarded to
326-
* {@link Consumer#poll(long)}.
363+
* @param timeout max time in milliseconds to wait for records; forwarded to {@link Consumer#poll(Duration)}.
364+
* @param <K> the key type.
365+
* @param <V> the value type.
366+
* @return the records.
367+
* @throws IllegalStateException if the poll returns null (since 2.3.4).
368+
* @since 2.9.3
369+
*/
370+
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout) {
371+
return getRecords(consumer, timeout, -1);
372+
}
373+
374+
/**
375+
* Poll the consumer for records.
376+
* @param consumer the consumer.
377+
* @param timeout max time in milliseconds to wait for records; forwarded to {@link Consumer#poll(long)}.
327378
* @param <K> the key type.
328379
* @param <V> the value type.
329-
* @param minRecords wait until the timeout or at least this number of receords are
330-
* received.
380+
* @param minRecords wait until the timeout or at least this number of records are received.
331381
* @return the records.
332382
* @throws IllegalStateException if the poll returns null.
333383
* @since 2.4.2
384+
* @deprecated in favor of {#{@link #getRecords(Consumer, Duration, int)}}
334385
*/
386+
@Deprecated
335387
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, long timeout, int minRecords) {
388+
return getRecords(consumer, Duration.ofMillis(timeout), minRecords);
389+
}
390+
391+
/**
392+
* Poll the consumer for records.
393+
* @param consumer the consumer.
394+
* @param timeout max time in milliseconds to wait for records; forwarded to {@link Consumer#poll(Duration)}.
395+
* @param <K> the key type.
396+
* @param <V> the value type.
397+
* @param minRecords wait until the timeout or at least this number of records are received.
398+
* @return the records.
399+
* @throws IllegalStateException if the poll returns null.
400+
* @since 2.9.3
401+
*/
402+
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer, Duration timeout, int minRecords) {
336403
logger.debug("Polling...");
337404
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
338-
long remaining = timeout;
405+
long remaining = timeout.toMillis();
339406
int count = 0;
340407
do {
341408
long t1 = System.currentTimeMillis();

spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121

22+
import java.time.Duration;
2223
import java.util.Map;
2324

2425
import org.apache.kafka.clients.admin.AdminClient;
@@ -72,7 +73,7 @@ void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok
7273
broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5");
7374
long t1 = System.currentTimeMillis();
7475
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() ->
75-
KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", 2000L));
76+
KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", Duration.ofSeconds(2)));
7677
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(2000L);
7778
producer.send(new ProducerRecord<>("singleTopic5", 1, "foo"));
7879
producer.close();
@@ -92,14 +93,14 @@ public void testGetOneRecord(EmbeddedKafkaBroker broker) throws Exception {
9293
producer.send(new ProducerRecord<>("singleTopic3", 0, 1, "foo"));
9394
producer.close();
9495
ConsumerRecord<?, ?> oneRecord = KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "getOne",
95-
"singleTopic3", 0, false, true, 10_000L);
96+
"singleTopic3", 0, false, true, Duration.ofSeconds(10));
9697
assertThat(oneRecord.value()).isEqualTo("foo");
9798
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
9899
.isNotNull()
99100
.extracting(omd -> omd.offset())
100101
.isEqualTo(1L);
101102
oneRecord = KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "getOne",
102-
"singleTopic3", 0, true, true, 10_000L);
103+
"singleTopic3", 0, true, true, Duration.ofSeconds(10));
103104
assertThat(oneRecord.value()).isEqualTo("foo");
104105
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
105106
.isNotNull()
@@ -124,7 +125,7 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception {
124125
Thread.currentThread().interrupt();
125126
}
126127
}).start();
127-
ConsumerRecords<Integer, String> records = KafkaTestUtils.getRecords(consumer, 10_000L, 2);
128+
ConsumerRecords<Integer, String> records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(10), 2);
128129
assertThat(records.count()).isEqualTo(2);
129130
producer.close();
130131
consumer.close();
@@ -138,7 +139,7 @@ public void testGetCurrentOffsetWithAdminClient(EmbeddedKafkaBroker broker) thro
138139
producer.send(new ProducerRecord<>("singleTopic3", 0, 1, "foo"));
139140

140141
KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "testGetCurrentOffsetWithAdminClient",
141-
"singleTopic3", 0, false, true, 10_000L);
142+
"singleTopic3", 0, false, true, Duration.ofSeconds(10));
142143
assertThat(KafkaTestUtils.getCurrentOffset(adminClient, "testGetCurrentOffsetWithAdminClient", "singleTopic3", 0))
143144
.isNotNull()
144145
.extracting(omd -> omd.offset())

spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingCoverageTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
77
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
8+
* https://www.apache.org/licenses/LICENSE-2.0
99
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,

0 commit comments

Comments
 (0)