HBASE-15811 Batch Get after batch Put does not fetch all Cells We were not waiting on all executors in a batch to complete. The test for no-more-executors was damaged by the 0.99/0.98.4 fix "HBASE-11403 Fix race conditions around Object#notify"

It added this in AsyncProcess#waitForMaximumCurrentTasks:

synchronized (this.tasksInProgress) {
+          if (tasksInProgress.get() != oldInProgress) break;
           this.tasksInProgress.wait(100);

which added a break out of our waiting loop if any change in
count of tasks; it seems that what was wanted was instead to
avoid the wait if there was movement in the count of completed
task.

Reformats waitForMaximumCurrentTasks so it is testable. Adds
test that we indeed wait on the specified parameter.
This commit is contained in:
stack 2016-05-12 14:37:29 -07:00
parent 60e19f60a9
commit 6904430a3d
2 changed files with 55 additions and 5 deletions

View File

@ -1678,7 +1678,7 @@ class AsyncProcess {
synchronized (actionsInProgress) {
if (actionsInProgress.get() == 0) break;
if (!hasWait) {
actionsInProgress.wait(100);
actionsInProgress.wait(10);
} else {
long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
@ -1770,9 +1770,16 @@ class AsyncProcess {
/** Wait until the async does not have more than max tasks in progress. */
private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
waitForMaximumCurrentTasks(max, tasksInProgress, id);
}
// Break out this method so testable
@VisibleForTesting
static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id)
throws InterruptedIOException {
long lastLog = EnvironmentEdgeManager.currentTime();
long currentInProgress, oldInProgress = Long.MAX_VALUE;
while ((currentInProgress = this.tasksInProgress.get()) > max) {
while ((currentInProgress = tasksInProgress.get()) > max) {
if (oldInProgress != currentInProgress) { // Wait for in progress to change.
long now = EnvironmentEdgeManager.currentTime();
if (now > lastLog + 10000) {
@ -1783,9 +1790,10 @@ class AsyncProcess {
}
oldInProgress = currentInProgress;
try {
synchronized (this.tasksInProgress) {
if (tasksInProgress.get() != oldInProgress) break;
this.tasksInProgress.wait(100);
synchronized (tasksInProgress) {
if (tasksInProgress.get() == oldInProgress) {
tasksInProgress.wait(10);
}
}
} catch (InterruptedException e) {
throw new InterruptedIOException("#" + id + ", interrupted." +

View File

@ -30,6 +30,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@ -63,6 +65,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Assert;
import static org.junit.Assert.assertTrue;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@ -1129,4 +1132,43 @@ public class TestAsyncProcess {
Assert.assertTrue(puts.isEmpty());
}
@Test
public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException {
final AtomicLong tasks = new AtomicLong(0);
final AtomicInteger max = new AtomicInteger(0);
final CyclicBarrier barrier = new CyclicBarrier(2);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
barrier.await();
AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
} catch (InterruptedIOException e) {
Assert.fail(e.getMessage());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
// First test that our runnable thread only exits when tasks is zero.
Thread t = new Thread(runnable);
t.start();
barrier.await();
t.join();
// Now assert we stay running if max == zero and tasks is > 0.
barrier.reset();
tasks.set(1000000);
t = new Thread(runnable);
t.start();
barrier.await();
while (tasks.get() > 0) {
assertTrue(t.isAlive());
tasks.set(tasks.get() - 1);
}
t.join();
}
}