From 6904430a3d2bd87190b5f5d51a85d929684caae1 Mon Sep 17 00:00:00 2001 From: stack Date: Thu, 12 May 2016 14:37:29 -0700 Subject: [PATCH] HBASE-15811 Batch Get after batch Put does not fetch all Cells We were not waiting on all executors in a batch to complete. The test for no-more-executors was damaged by the 0.99/0.98.4 fix "HBASE-11403 Fix race conditions around Object#notify" It added this in AsyncProcess#waitForMaximumCurrentTasks: synchronized (this.tasksInProgress) { + if (tasksInProgress.get() != oldInProgress) break; this.tasksInProgress.wait(100); which added a break out of our waiting loop if any change in count of tasks; it seems that what was wanted was instead to avoid the wait if there was movement in the count of completed task. Reformats waitForMaximumCurrentTasks so it is testable. Adds test that we indeed wait on the specified parameter. --- .../hadoop/hbase/client/AsyncProcess.java | 18 +++++--- .../hadoop/hbase/client/TestAsyncProcess.java | 42 +++++++++++++++++++ 2 files changed, 55 insertions(+), 5 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 6de8c8219f6..e8af36ccb34 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1678,7 +1678,7 @@ class AsyncProcess { synchronized (actionsInProgress) { if (actionsInProgress.get() == 0) break; if (!hasWait) { - actionsInProgress.wait(100); + actionsInProgress.wait(10); } else { long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L)); TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond); @@ -1770,9 +1770,16 @@ class AsyncProcess { /** Wait until the async does not have more than max tasks in progress. */ private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException { + waitForMaximumCurrentTasks(max, tasksInProgress, id); + } + + // Break out this method so testable + @VisibleForTesting + static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id) + throws InterruptedIOException { long lastLog = EnvironmentEdgeManager.currentTime(); long currentInProgress, oldInProgress = Long.MAX_VALUE; - while ((currentInProgress = this.tasksInProgress.get()) > max) { + while ((currentInProgress = tasksInProgress.get()) > max) { if (oldInProgress != currentInProgress) { // Wait for in progress to change. long now = EnvironmentEdgeManager.currentTime(); if (now > lastLog + 10000) { @@ -1783,9 +1790,10 @@ class AsyncProcess { } oldInProgress = currentInProgress; try { - synchronized (this.tasksInProgress) { - if (tasksInProgress.get() != oldInProgress) break; - this.tasksInProgress.wait(100); + synchronized (tasksInProgress) { + if (tasksInProgress.get() == oldInProgress) { + tasksInProgress.wait(10); + } } } catch (InterruptedException e) { throw new InterruptedIOException("#" + id + ", interrupted." + diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 1003d242a20..376c02a9100 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -63,6 +65,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; +import static org.junit.Assert.assertTrue; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -1129,4 +1132,43 @@ public class TestAsyncProcess { Assert.assertTrue(puts.isEmpty()); } + @Test + public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException { + final AtomicLong tasks = new AtomicLong(0); + final AtomicInteger max = new AtomicInteger(0); + final CyclicBarrier barrier = new CyclicBarrier(2); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + barrier.await(); + AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1); + } catch (InterruptedIOException e) { + Assert.fail(e.getMessage()); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (BrokenBarrierException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }; + // First test that our runnable thread only exits when tasks is zero. + Thread t = new Thread(runnable); + t.start(); + barrier.await(); + t.join(); + // Now assert we stay running if max == zero and tasks is > 0. + barrier.reset(); + tasks.set(1000000); + t = new Thread(runnable); + t.start(); + barrier.await(); + while (tasks.get() > 0) { + assertTrue(t.isAlive()); + tasks.set(tasks.get() - 1); + } + t.join(); + } }