HBASE-15931 Add log for long-running tasks in AsyncProcess

This commit is contained in:
Yu Li 2016-06-02 12:00:42 +08:00
parent cbb95cd3a9
commit 53eb27bb60
3 changed files with 49 additions and 12 deletions

View File

@ -120,6 +120,12 @@ class AsyncProcess {
*/
public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
private final int thresholdToLogUndoneTaskDetails;
private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
"hbase.client.threshold.log.details";
private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
/**
* The context used to wait for results from one submit call.
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
@ -332,6 +338,10 @@ class AsyncProcess {
this.rpcCallerFactory = rpcCaller;
this.rpcFactory = rpcFactory;
this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
this.thresholdToLogUndoneTaskDetails =
conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
}
/**
@ -389,7 +399,7 @@ class AsyncProcess {
List<Integer> locationErrorRows = null;
do {
// Wait until there is at least one slot for a new task.
waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
// Remember the previous decisions about regions or region servers we put in the
// final multi.
@ -1765,18 +1775,19 @@ class AsyncProcess {
@VisibleForTesting
/** Waits until all outstanding tasks are done. Used in tests. */
void waitUntilDone() throws InterruptedIOException {
waitForMaximumCurrentTasks(0);
waitForMaximumCurrentTasks(0, null);
}
/** Wait until the async does not have more than max tasks in progress. */
private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
waitForMaximumCurrentTasks(max, tasksInProgress, id);
private void waitForMaximumCurrentTasks(int max, String tableName)
throws InterruptedIOException {
waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
}
// Break out this method so testable
@VisibleForTesting
static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id)
throws InterruptedIOException {
void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
String tableName) throws InterruptedIOException {
long lastLog = EnvironmentEdgeManager.currentTime();
long currentInProgress, oldInProgress = Long.MAX_VALUE;
while ((currentInProgress = tasksInProgress.get()) > max) {
@ -1785,7 +1796,11 @@ class AsyncProcess {
if (now > lastLog + 10000) {
lastLog = now;
LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
+ max + ", tasksInProgress=" + currentInProgress);
+ max + ", tasksInProgress=" + currentInProgress +
" hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
logDetailsOfUndoneTasks(currentInProgress);
}
}
}
oldInProgress = currentInProgress;
@ -1802,6 +1817,25 @@ class AsyncProcess {
}
}
private void logDetailsOfUndoneTasks(long taskInProgress) {
ArrayList<ServerName> servers = new ArrayList<ServerName>();
for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
if (entry.getValue().get() > 0) {
servers.add(entry.getKey());
}
}
LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
ArrayList<String> regions = new ArrayList<String>();
for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
if (entry.getValue().get() > 0) {
regions.add(Bytes.toString(entry.getKey()));
}
}
LOG.info("Regions against which left over task(s) are processed: " + regions);
}
}
/**
* Only used w/useGlobalErrors ctor argument, for HTable backward compat.
* @return Whether there were any errors in any request since the last time
@ -1817,12 +1851,13 @@ class AsyncProcess {
* failed operations themselves.
* @param failedRows an optional list into which the rows that failed since the last time
* {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved.
* @param tableName name of the table
* @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
* was called, or AP was created.
*/
public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
List<Row> failedRows) throws InterruptedIOException {
waitForMaximumCurrentTasks(0);
List<Row> failedRows, String tableName) throws InterruptedIOException {
waitForMaximumCurrentTasks(0, tableName);
if (!globalErrors.hasErrors()) {
return null;
}

View File

@ -238,7 +238,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
while (!buffer.isEmpty()) {
ap.submit(tableName, buffer, true, null, false);
}
RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
RetriesExhaustedWithDetailsException error =
ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
if (error != null) {
if (listener == null) {
throw error;

View File

@ -1135,16 +1135,17 @@ public class TestAsyncProcess {
}
@Test
public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException {
public void testWaitForMaximumCurrentTasks() throws Exception {
final AtomicLong tasks = new AtomicLong(0);
final AtomicInteger max = new AtomicInteger(0);
final CyclicBarrier barrier = new CyclicBarrier(2);
final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
barrier.await();
AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
} catch (InterruptedIOException e) {
Assert.fail(e.getMessage());
} catch (InterruptedException e) {