Skip to content

Commit 9685314

Browse files
authored
HADOOP-17040. Fix intermittent failure of ITestBlockingThreadPoolExecutorService. (#2020)
1 parent 4734c77 commit 9685314

File tree

1 file changed

+24
-13
lines changed

1 file changed

+24
-13
lines changed

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@
3131
import org.slf4j.LoggerFactory;
3232

3333
import java.util.concurrent.Callable;
34+
import java.util.concurrent.CountDownLatch;
3435
import java.util.concurrent.ExecutorService;
3536
import java.util.concurrent.TimeUnit;
3637

3738
import static org.junit.Assert.assertEquals;
38-
import static org.junit.Assert.assertFalse;
3939

4040
/**
4141
* Basic test for S3A's blocking executor service.
@@ -92,11 +92,12 @@ public void testSubmitRunnable() throws Exception {
9292
*/
9393
protected void verifyQueueSize(ExecutorService executorService,
9494
int expectedQueueSize) {
95-
StopWatch stopWatch = new StopWatch().start();
95+
CountDownLatch latch = new CountDownLatch(1);
9696
for (int i = 0; i < expectedQueueSize; i++) {
97-
executorService.submit(sleeper);
98-
assertDidntBlock(stopWatch);
97+
executorService.submit(new LatchedSleeper(latch));
9998
}
99+
StopWatch stopWatch = new StopWatch().start();
100+
latch.countDown();
100101
executorService.submit(sleeper);
101102
assertDidBlock(stopWatch);
102103
}
@@ -124,15 +125,6 @@ public void testChainedQueue() throws Throwable {
124125

125126
// Helper functions, etc.
126127

127-
private void assertDidntBlock(StopWatch sw) {
128-
try {
129-
assertFalse("Non-blocking call took too long.",
130-
sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
131-
} finally {
132-
sw.reset().start();
133-
}
134-
}
135-
136128
private void assertDidBlock(StopWatch sw) {
137129
try {
138130
if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
@@ -164,6 +156,25 @@ public Integer call() throws Exception {
164156
}
165157
};
166158

159+
private class LatchedSleeper implements Runnable {
160+
private final CountDownLatch latch;
161+
162+
LatchedSleeper(CountDownLatch latch) {
163+
this.latch = latch;
164+
}
165+
166+
@Override
167+
public void run() {
168+
try {
169+
latch.await();
170+
Thread.sleep(TASK_SLEEP_MSEC);
171+
} catch (InterruptedException e) {
172+
LOG.info("Thread {} interrupted.", Thread.currentThread().getName());
173+
Thread.currentThread().interrupt();
174+
}
175+
}
176+
}
177+
167178
/**
168179
* Helper function to create thread pool under test.
169180
*/

0 commit comments

Comments
 (0)