diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 6de8c8219f6..e8af36ccb34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1678,7 +1678,7 @@ class AsyncProcess { synchronized (actionsInProgress) { if (actionsInProgress.get() == 0) break; if (!hasWait) { - actionsInProgress.wait(100); + actionsInProgress.wait(10); } else { long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); @@ -1770,9 +1770,16 @@ class AsyncProcess { /** Wait until the async does not have more than max tasks in progress. */ private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException { + waitForMaximumCurrentTasks(max, tasksInProgress, id); + } + + // Break out this method so testable + @VisibleForTesting + static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id) + throws InterruptedIOException { long lastLog = EnvironmentEdgeManager.currentTime(); long currentInProgress, oldInProgress = Long.MAX_VALUE; - while ((currentInProgress = this.tasksInProgress.get()) > max) { + while ((currentInProgress = tasksInProgress.get()) > max) { if (oldInProgress != currentInProgress) { // Wait for in progress to change. long now = EnvironmentEdgeManager.currentTime(); if (now > lastLog + 10000) { @@ -1783,9 +1790,10 @@ class AsyncProcess { } oldInProgress = currentInProgress; try { - synchronized (this.tasksInProgress) { - if (tasksInProgress.get() != oldInProgress) break; - this.tasksInProgress.wait(100); + synchronized (tasksInProgress) { + if (tasksInProgress.get() == oldInProgress) { + tasksInProgress.wait(10); + } } } catch (InterruptedException e) { throw new InterruptedIOException("#" + id + ", interrupted." + diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 1003d242a20..376c02a9100 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -63,6 +65,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; +import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -1129,4 +1132,43 @@ public class TestAsyncProcess { Assert.assertTrue(puts.isEmpty()); } + @Test + public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException { + final AtomicLong tasks = new AtomicLong(0); + final AtomicInteger max = new AtomicInteger(0); + final CyclicBarrier barrier = new CyclicBarrier(2); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + barrier.await(); + AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1); + } catch (InterruptedIOException e) { + Assert.fail(e.getMessage()); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }; + // First test that our runnable thread only exits when tasks is zero. + Thread t = new Thread(runnable); + t.start(); + barrier.await(); + t.join(); + // Now assert we stay running if max == zero and tasks is > 0. + barrier.reset(); + tasks.set(1000000); + t = new Thread(runnable); + t.start(); + barrier.await(); + while (tasks.get() > 0) { + assertTrue(t.isAlive()); + tasks.set(tasks.get() - 1); + } + t.join(); + } }