From 77e69a73dac92766beebee1eda0a7cba65ef0c36 Mon Sep 17 00:00:00 2001 From: Masatake Iwasaki Date: Fri, 22 May 2020 18:50:19 +0900 Subject: [PATCH] HADOOP-17040. Fix intermittent failure of ITestBlockingThreadPoolExecutorService. (#2020) (cherry picked from commit 968531463375ebf29ba3186c13b5f8685df10d25) --- ...TestBlockingThreadPoolExecutorService.java | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java index 3dfe286bc7b..569f6fbabcd 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java @@ -31,11 +31,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; /** * Basic test for S3A's blocking executor service. @@ -92,11 +92,12 @@ public class ITestBlockingThreadPoolExecutorService { */ protected void verifyQueueSize(ExecutorService executorService, int expectedQueueSize) { - StopWatch stopWatch = new StopWatch().start(); + CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < expectedQueueSize; i++) { - executorService.submit(sleeper); - assertDidntBlock(stopWatch); + executorService.submit(new LatchedSleeper(latch)); } + StopWatch stopWatch = new StopWatch().start(); + latch.countDown(); executorService.submit(sleeper); assertDidBlock(stopWatch); } @@ -124,15 +125,6 @@ public class ITestBlockingThreadPoolExecutorService { // Helper functions, etc. - private void assertDidntBlock(StopWatch sw) { - try { - assertFalse("Non-blocking call took too long.", - sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC); - } finally { - sw.reset().start(); - } - } - private void assertDidBlock(StopWatch sw) { try { if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) { @@ -164,6 +156,25 @@ public class ITestBlockingThreadPoolExecutorService { } }; + private class LatchedSleeper implements Runnable { + private final CountDownLatch latch; + + LatchedSleeper(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void run() { + try { + latch.await(); + Thread.sleep(TASK_SLEEP_MSEC); + } catch (InterruptedException e) { + LOG.info("Thread {} interrupted.", Thread.currentThread().getName()); + Thread.currentThread().interrupt(); + } + } + } + /** * Helper function to create thread pool under test. */