
Commit 390803b

Merge branch 'apache:trunk' into KAFKA-15983

2 parents 9838434 + af03353

22 files changed

+107 -787 lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java

Lines changed: 71 additions & 0 deletions
@@ -63,6 +63,7 @@
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -109,6 +110,7 @@
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;

 @ClusterTestDefaults(
     types = {Type.KRAFT},
@@ -1593,6 +1595,75 @@ private void sendCompressedMessages(int numRecords, TopicPartition tp) {
         }
     }

+    @ClusterTest
+    public void testClassicConsumerStallBetweenPoll() throws Exception {
+        testStallBetweenPoll(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerStallBetweenPoll() throws Exception {
+        testStallBetweenPoll(GroupProtocol.CONSUMER);
+    }
+
+    /**
+     * This test is to prove that the intermittent stalling experienced when using the asynchronous
+     * consumer, as filed under KAFKA-19259, has been fixed.
+     *
+     * <p/>
+     *
+     * The basic idea is to have one thread produce a record every 500 ms while the main thread consumes
+     * records without pausing between polls for much more than the produce delay. In the test case filed in
+     * KAFKA-19259, the consumer sometimes paused for 5-10 seconds despite records being produced every second.
+     */
+    private void testStallBetweenPoll(GroupProtocol groupProtocol) throws Exception {
+        var testTopic = "stall-test-topic";
+        var numPartitions = 6;
+        cluster.createTopic(testTopic, numPartitions, (short) BROKER_COUNT);
+
+        // The producer must produce slowly to tickle the scenario.
+        var produceDelay = 500;
+
+        var executor = Executors.newScheduledThreadPool(1);
+
+        try (var producer = cluster.producer()) {
+            // Start a thread running that produces records at a relative trickle.
+            executor.scheduleWithFixedDelay(
+                () -> producer.send(new ProducerRecord<>(testTopic, TestUtils.randomBytes(64))),
+                0,
+                produceDelay,
+                TimeUnit.MILLISECONDS
+            );
+
+            Map<String, Object> consumerConfig = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT));
+
+            // Assign a tolerance for how much time is allowed to pass between Consumer.poll() calls given that there
+            // should be *at least* one record to read every second.
+            var pollDelayTolerance = 2000;
+
+            try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+                consumer.subscribe(List.of(testTopic));
+
+                // This is here to allow the consumer time to settle the group membership/assignment.
+                awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
+
+                // Track the time around the poll invocations to ensure the delta between them doesn't
+                // exceed the delay threshold defined above.
+                var beforePoll = System.currentTimeMillis();
+                consumer.poll(Duration.ofSeconds(5));
+                consumer.poll(Duration.ofSeconds(5));
+                var afterPoll = System.currentTimeMillis();
+                var pollDelay = afterPoll - beforePoll;
+
+                if (pollDelay > pollDelayTolerance)
+                    fail("Detected a stall of " + pollDelay + " ms between Consumer.poll() invocations despite a Producer producing records every " + produceDelay + " ms");
+            } finally {
+                executor.shutdownNow();
+                // Wait for any active tasks to terminate to ensure the consumer is not closed while in use from another thread.
+                assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor did not terminate");
+            }
+        }
+    }
+
 private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
     Consumer<byte[], byte[]> consumer,
     TopicPartition tp

clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerOptions.java

Lines changed: 1 addition & 1 deletion
@@ -20,5 +20,5 @@
 /**
  * Options for {@link Admin#unregisterBroker(int, UnregisterBrokerOptions)}.
  */
-public class UnregisterBrokerOptions extends AbstractOptions<UpdateFeaturesOptions> {
+public class UnregisterBrokerOptions extends AbstractOptions<UnregisterBrokerOptions> {
 }
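
The one-character-class fix above is easy to miss: AbstractOptions uses a self-referential type parameter so that its fluent setters return the concrete subclass type, and the old declaration accidentally named UpdateFeaturesOptions instead. A minimal sketch of why the parameter matters, simplified from (and assumed to match the spirit of) the real AbstractOptions rather than copied from it:

    // The subclass passes ITSELF as the type parameter so fluent setters
    // return the concrete options type rather than the base class.
    abstract class AbstractOptions<O extends AbstractOptions<O>> {
        private Integer timeoutMs = null;

        @SuppressWarnings("unchecked")
        public O timeoutMs(Integer timeoutMs) {
            this.timeoutMs = timeoutMs;
            return (O) this; // with the wrong parameter, chained calls yield the wrong type
        }
    }

    // After the fix, new UnregisterBrokerOptions().timeoutMs(30_000) is typed
    // as UnregisterBrokerOptions, not UpdateFeaturesOptions.
    class UnregisterBrokerOptions extends AbstractOptions<UnregisterBrokerOptions> { }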

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java

Lines changed: 9 additions & 0 deletions
@@ -147,6 +147,7 @@ public boolean hasAvailableFetches() {
      * @param data {@link FetchSessionHandler.FetchRequestData} that represents the session data
      * @param resp {@link ClientResponse} from which the {@link FetchResponse} will be retrieved
      */
+    @SuppressWarnings("NPathComplexity")
     protected void handleFetchSuccess(final Node fetchTarget,
                                       final FetchSessionHandler.FetchRequestData data,
                                       final ClientResponse resp) {
@@ -174,6 +175,8 @@ protected void handleFetchSuccess(final Node fetchTarget,
         final Set<TopicPartition> partitions = new HashSet<>(responseData.keySet());
         final FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions);

+        boolean needsWakeup = true;
+
         Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
         for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
             TopicPartition partition = entry.getKey();
@@ -220,8 +223,14 @@ protected void handleFetchSuccess(final Node fetchTarget,
                 metricAggregator,
                 fetchOffset);
             fetchBuffer.add(completedFetch);
+            needsWakeup = false;
         }

+        // "Wake" the fetch buffer on any response, even if it's empty, so that the consumer does not block
+        // indefinitely waiting on the fetch buffer to receive data.
+        if (needsWakeup)
+            fetchBuffer.wakeup();
+
         if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {
             List<Node> leaderNodes = new ArrayList<>();

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 1 addition & 1 deletion
@@ -1811,7 +1811,7 @@ private Fetch<K, V> pollForFetches(Timer timer) {
         // use of a shorter, dedicated "pollTimer" here which updates "timer" so that calling method (poll) will
         // correctly handle the overall timeout.
         try {
-            fetchBuffer.awaitNotEmpty(pollTimer);
+            fetchBuffer.awaitWakeup(pollTimer);
         } catch (InterruptException e) {
             log.trace("Interrupt during fetch", e);
             throw e;

clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java

Lines changed: 14 additions & 18 deletions
@@ -27,6 +27,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -51,7 +52,7 @@ public class FetchBuffer implements AutoCloseable {
     private final Logger log;
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
     private final Lock lock;
-    private final Condition notEmptyCondition;
+    private final Condition blockingCondition;
     private final IdempotentCloser idempotentCloser = new IdempotentCloser();

     private final AtomicBoolean wokenup = new AtomicBoolean(false);
@@ -62,7 +63,7 @@ public FetchBuffer(final LogContext logContext) {
         this.log = logContext.logger(FetchBuffer.class);
         this.completedFetches = new ConcurrentLinkedQueue<>();
         this.lock = new ReentrantLock();
-        this.notEmptyCondition = lock.newCondition();
+        this.blockingCondition = lock.newCondition();
     }

     /**
@@ -95,13 +96,7 @@ boolean hasCompletedFetches(Predicate<CompletedFetch> predicate) {
     }

     void add(CompletedFetch completedFetch) {
-        try {
-            lock.lock();
-            completedFetches.add(completedFetch);
-            notEmptyCondition.signalAll();
-        } finally {
-            lock.unlock();
-        }
+        addAll(List.of(completedFetch));
     }

     void addAll(Collection<CompletedFetch> completedFetches) {
@@ -111,7 +106,8 @@ void addAll(Collection<CompletedFetch> completedFetches) {
         try {
             lock.lock();
             this.completedFetches.addAll(completedFetches);
-            notEmptyCondition.signalAll();
+            wokenup.set(true);
+            blockingCondition.signalAll();
         } finally {
             lock.unlock();
         }
@@ -154,23 +150,23 @@ CompletedFetch poll() {
     }

     /**
-     * Allows the caller to await presence of data in the buffer. The method will block, returning only
+     * Allows the caller to await a response from the broker for requested data. The method will block, returning only
      * under one of the following conditions:
      *
      * <ol>
-     *     <li>The buffer was already non-empty on entry</li>
-     *     <li>The buffer was populated during the wait</li>
+     *     <li>The buffer was already woken</li>
+     *     <li>The buffer was woken during the wait</li>
      *     <li>The remaining time on the {@link Timer timer} elapsed</li>
      *     <li>The thread was interrupted</li>
      * </ol>
      *
      * @param timer Timer that provides time to wait
      */
-    void awaitNotEmpty(Timer timer) {
+    void awaitWakeup(Timer timer) {
         try {
             lock.lock();

-            while (isEmpty() && !wokenup.compareAndSet(true, false)) {
+            while (!wokenup.compareAndSet(true, false)) {
                 // Update the timer before we head into the loop in case it took a while to get the lock.
                 timer.update();

@@ -185,7 +181,7 @@ void awaitNotEmpty(Timer timer) {
                     break;
                 }

-                if (!notEmptyCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {
+                if (!blockingCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {
                     break;
                 }
             }
@@ -198,10 +194,10 @@ void awaitNotEmpty(Timer timer) {
     }

     void wakeup() {
-        wokenup.set(true);
         try {
             lock.lock();
-            notEmptyCondition.signalAll();
+            wokenup.set(true);
+            blockingCondition.signalAll();
         } finally {
             lock.unlock();
         }
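
Taken together, the FetchBuffer changes replace "wait until the buffer is non-empty" with a lost-wakeup-safe "flag plus condition variable" handshake: the network thread sets the flag while holding the lock before signaling, and the waiter consumes the flag with an atomic compare-and-set, so a wakeup that fires just before the waiter reaches the lock is still observed. A self-contained sketch of that pattern, simplified from the diff (Kafka's version additionally integrates its Timer, interrupt, and close handling):

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class WakeupGate {
        private final Lock lock = new ReentrantLock();
        private final Condition blockingCondition = lock.newCondition();
        private final AtomicBoolean wokenup = new AtomicBoolean(false);

        // Network thread: called on every fetch response (with or without data).
        void wakeup() {
            lock.lock();
            try {
                // Setting the flag inside the lock means a waiter can never
                // check the flag, miss it, and then block after the signal.
                wokenup.set(true);
                blockingCondition.signalAll();
            } finally {
                lock.unlock();
            }
        }

        // Application thread: returns true if woken, false if the timeout elapsed.
        boolean awaitWakeup(long timeoutMs) throws InterruptedException {
            lock.lock();
            try {
                long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
                // compareAndSet consumes the wakeup, so one wakeup releases one wait.
                while (!wokenup.compareAndSet(true, false)) {
                    if (remainingNanos <= 0L)
                        return false;
                    remainingNanos = blockingCondition.awaitNanos(remainingNanos);
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
    }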

clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

Lines changed: 2 additions & 2 deletions
@@ -134,8 +134,8 @@ public enum ApiKeys {
     STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE),
     DESCRIBE_SHARE_GROUP_OFFSETS(ApiMessageType.DESCRIBE_SHARE_GROUP_OFFSETS),
     ALTER_SHARE_GROUP_OFFSETS(ApiMessageType.ALTER_SHARE_GROUP_OFFSETS),
-    DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS),
-    GET_REPLICA_LOG_INFO(ApiMessageType.GET_REPLICA_LOG_INFO);
+    DELETE_SHARE_GROUP_OFFSETS(ApiMessageType.DELETE_SHARE_GROUP_OFFSETS);

     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
         new EnumMap<>(ApiMessageType.ListenerType.class);

clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

Lines changed: 0 additions & 2 deletions
@@ -354,8 +354,6 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
                 return AlterShareGroupOffsetsRequest.parse(readable, apiVersion);
             case DELETE_SHARE_GROUP_OFFSETS:
                 return DeleteShareGroupOffsetsRequest.parse(readable, apiVersion);
-            case GET_REPLICA_LOG_INFO:
-                return GetReplicaLogInfoRequest.parse(readable, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                     "code should be updated to do so.", apiKey));

clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

Lines changed: 0 additions & 2 deletions
@@ -291,8 +291,6 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, Readable readable,
                 return AlterShareGroupOffsetsResponse.parse(readable, version);
             case DELETE_SHARE_GROUP_OFFSETS:
                 return DeleteShareGroupOffsetsResponse.parse(readable, version);
-            case GET_REPLICA_LOG_INFO:
-                return GetReplicaLogInfoResponse.parse(readable, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                     "code should be updated to do so.", apiKey));

clients/src/main/java/org/apache/kafka/common/requests/GetReplicaLogInfoRequest.java

Lines changed: 0 additions & 98 deletions
This file was deleted.
