HBASE-15931 Add log for long-running tasks in AsyncProcess

This commit is contained in:
Yu Li 2016-06-02 12:00:42 +08:00
parent 0cedd8b344
commit 36bb496e48
3 changed files with 49 additions and 12 deletions

View File

@ -119,6 +119,12 @@ class AsyncProcess {
*/ */
public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
private final int thresholdToLogUndoneTaskDetails;
private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
"hbase.client.threshold.log.details";
private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
/** /**
* The context used to wait for results from one submit call. * The context used to wait for results from one submit call.
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
@ -330,6 +336,10 @@ class AsyncProcess {
this.rpcCallerFactory = rpcCaller; this.rpcCallerFactory = rpcCaller;
this.rpcFactory = rpcFactory; this.rpcFactory = rpcFactory;
this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false); this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
this.thresholdToLogUndoneTaskDetails =
conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
} }
/** /**
@ -387,7 +397,7 @@ class AsyncProcess {
List<Integer> locationErrorRows = null; List<Integer> locationErrorRows = null;
do { do {
// Wait until there is at least one slot for a new task. // Wait until there is at least one slot for a new task.
waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1); waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
// Remember the previous decisions about regions or region servers we put in the // Remember the previous decisions about regions or region servers we put in the
// final multi. // final multi.
@ -1765,18 +1775,19 @@ class AsyncProcess {
@VisibleForTesting @VisibleForTesting
/** Waits until all outstanding tasks are done. Used in tests. */ /** Waits until all outstanding tasks are done. Used in tests. */
void waitUntilDone() throws InterruptedIOException { void waitUntilDone() throws InterruptedIOException {
waitForMaximumCurrentTasks(0); waitForMaximumCurrentTasks(0, null);
} }
/** Wait until the async does not have more than max tasks in progress. */ /** Wait until the async does not have more than max tasks in progress. */
private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException { private void waitForMaximumCurrentTasks(int max, String tableName)
waitForMaximumCurrentTasks(max, tasksInProgress, id); throws InterruptedIOException {
waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
} }
// Break out this method so testable // Break out this method so testable
@VisibleForTesting @VisibleForTesting
static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id) void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
throws InterruptedIOException { String tableName) throws InterruptedIOException {
long lastLog = EnvironmentEdgeManager.currentTime(); long lastLog = EnvironmentEdgeManager.currentTime();
long currentInProgress, oldInProgress = Long.MAX_VALUE; long currentInProgress, oldInProgress = Long.MAX_VALUE;
while ((currentInProgress = tasksInProgress.get()) > max) { while ((currentInProgress = tasksInProgress.get()) > max) {
@ -1785,7 +1796,11 @@ class AsyncProcess {
if (now > lastLog + 10000) { if (now > lastLog + 10000) {
lastLog = now; lastLog = now;
LOG.info("#" + id + ", waiting for some tasks to finish. Expected max=" LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
+ max + ", tasksInProgress=" + currentInProgress); + max + ", tasksInProgress=" + currentInProgress +
" hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
logDetailsOfUndoneTasks(currentInProgress);
}
} }
} }
oldInProgress = currentInProgress; oldInProgress = currentInProgress;
@ -1802,6 +1817,25 @@ class AsyncProcess {
} }
} }
private void logDetailsOfUndoneTasks(long taskInProgress) {
ArrayList<ServerName> servers = new ArrayList<ServerName>();
for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
if (entry.getValue().get() > 0) {
servers.add(entry.getKey());
}
}
LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
ArrayList<String> regions = new ArrayList<String>();
for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
if (entry.getValue().get() > 0) {
regions.add(Bytes.toString(entry.getKey()));
}
}
LOG.info("Regions against which left over task(s) are processed: " + regions);
}
}
/** /**
* Only used w/useGlobalErrors ctor argument, for HTable backward compat. * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
* @return Whether there were any errors in any request since the last time * @return Whether there were any errors in any request since the last time
@ -1817,12 +1851,13 @@ class AsyncProcess {
* failed operations themselves. * failed operations themselves.
* @param failedRows an optional list into which the rows that failed since the last time * @param failedRows an optional list into which the rows that failed since the last time
* {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved. * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved.
* @param tableName name of the table
* @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)} * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
* was called, or AP was created. * was called, or AP was created.
*/ */
public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
List<Row> failedRows) throws InterruptedIOException { List<Row> failedRows, String tableName) throws InterruptedIOException {
waitForMaximumCurrentTasks(0); waitForMaximumCurrentTasks(0, tableName);
if (!globalErrors.hasErrors()) { if (!globalErrors.hasErrors()) {
return null; return null;
} }

View File

@ -237,7 +237,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
while (!buffer.isEmpty()) { while (!buffer.isEmpty()) {
ap.submit(tableName, buffer, true, null, false); ap.submit(tableName, buffer, true, null, false);
} }
RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); RetriesExhaustedWithDetailsException error =
ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
if (error != null) { if (error != null) {
if (listener == null) { if (listener == null) {
throw error; throw error;

View File

@ -1139,16 +1139,17 @@ public class TestAsyncProcess {
} }
@Test @Test
public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException { public void testWaitForMaximumCurrentTasks() throws Exception {
final AtomicLong tasks = new AtomicLong(0); final AtomicLong tasks = new AtomicLong(0);
final AtomicInteger max = new AtomicInteger(0); final AtomicInteger max = new AtomicInteger(0);
final CyclicBarrier barrier = new CyclicBarrier(2); final CyclicBarrier barrier = new CyclicBarrier(2);
final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
Runnable runnable = new Runnable() { Runnable runnable = new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
barrier.await(); barrier.await();
AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1); ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
} catch (InterruptedIOException e) { } catch (InterruptedIOException e) {
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
} catch (InterruptedException e) { } catch (InterruptedException e) {