HBASE-15811 Batch Get after batch Put does not fetch all Cells We were not waiting on all executors in a batch to complete. The test for no-more-executors was damaged by the 0.99/0.98.4 fix "HBASE-11403 Fix race conditions around Object#notify"
It added this in AsyncProcess#waitForMaximumCurrentTasks: synchronized (this.tasksInProgress) { + if (tasksInProgress.get() != oldInProgress) break; this.tasksInProgress.wait(100); which added a break out of our waiting loop if any change in count of tasks; it seems that what was wanted was instead to avoid the wait if there was movement in the count of completed task. Reformats waitForMaximumCurrentTasks so it is testable. Adds test that we indeed wait on the specified parameter.
This commit is contained in:
parent
92f415976e
commit
77f511fceb
|
@ -1679,7 +1679,7 @@ class AsyncProcess {
|
|||
synchronized (actionsInProgress) {
|
||||
if (actionsInProgress.get() == 0) break;
|
||||
if (!hasWait) {
|
||||
actionsInProgress.wait(100);
|
||||
actionsInProgress.wait(10);
|
||||
} else {
|
||||
long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
|
||||
TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
|
||||
|
@ -1770,9 +1770,16 @@ class AsyncProcess {
|
|||
|
||||
/** Wait until the async does not have more than max tasks in progress. */
|
||||
private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
|
||||
waitForMaximumCurrentTasks(max, tasksInProgress, id);
|
||||
}
|
||||
|
||||
// Break out this method so testable
|
||||
@VisibleForTesting
|
||||
static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id)
|
||||
throws InterruptedIOException {
|
||||
long lastLog = EnvironmentEdgeManager.currentTime();
|
||||
long currentInProgress, oldInProgress = Long.MAX_VALUE;
|
||||
while ((currentInProgress = this.tasksInProgress.get()) > max) {
|
||||
while ((currentInProgress = tasksInProgress.get()) > max) {
|
||||
if (oldInProgress != currentInProgress) { // Wait for in progress to change.
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
if (now > lastLog + 10000) {
|
||||
|
@ -1783,9 +1790,10 @@ class AsyncProcess {
|
|||
}
|
||||
oldInProgress = currentInProgress;
|
||||
try {
|
||||
synchronized (this.tasksInProgress) {
|
||||
if (tasksInProgress.get() != oldInProgress) break;
|
||||
this.tasksInProgress.wait(100);
|
||||
synchronized (tasksInProgress) {
|
||||
if (tasksInProgress.get() == oldInProgress) {
|
||||
tasksInProgress.wait(10);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException("#" + id + ", interrupted." +
|
||||
|
|
|
@ -20,31 +20,7 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.mockito.Mockito;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
|
@ -56,6 +32,8 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
@ -68,6 +46,32 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.RegionLocations;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFuture;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestAsyncProcess {
|
||||
private final static Log LOG = LogFactory.getLog(TestAsyncProcess.class);
|
||||
|
@ -1134,4 +1138,43 @@ public class TestAsyncProcess {
|
|||
Assert.assertTrue(puts.isEmpty());
|
||||
}
|
||||
|
||||
}
|
||||
@Test
|
||||
public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException {
|
||||
final AtomicLong tasks = new AtomicLong(0);
|
||||
final AtomicInteger max = new AtomicInteger(0);
|
||||
final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
Runnable runnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
barrier.await();
|
||||
AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
|
||||
} catch (InterruptedIOException e) {
|
||||
Assert.fail(e.getMessage());
|
||||
} catch (InterruptedException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
} catch (BrokenBarrierException e) {
|
||||
// TODO Auto-generated catch block
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
// First test that our runnable thread only exits when tasks is zero.
|
||||
Thread t = new Thread(runnable);
|
||||
t.start();
|
||||
barrier.await();
|
||||
t.join();
|
||||
// Now assert we stay running if max == zero and tasks is > 0.
|
||||
barrier.reset();
|
||||
tasks.set(1000000);
|
||||
t = new Thread(runnable);
|
||||
t.start();
|
||||
barrier.await();
|
||||
while (tasks.get() > 0) {
|
||||
assertTrue(t.isAlive());
|
||||
tasks.set(tasks.get() - 1);
|
||||
}
|
||||
t.join();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue