diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f9199a820de..fc1c44f7e19 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -119,6 +119,12 @@ class AsyncProcess { */ public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details"; + private final int thresholdToLogUndoneTaskDetails; + private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = + "hbase.client.threshold.log.details"; + private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; + private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2; + /** * The context used to wait for results from one submit call. * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts), @@ -330,6 +336,10 @@ class AsyncProcess { this.rpcCallerFactory = rpcCaller; this.rpcFactory = rpcFactory; this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false); + + this.thresholdToLogUndoneTaskDetails = + conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS, + DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); } /** @@ -387,7 +397,7 @@ class AsyncProcess { List locationErrorRows = null; do { // Wait until there is at least one slot for a new task. - waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1); + waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString()); // Remember the previous decisions about regions or region servers we put in the // final multi. @@ -1765,18 +1775,19 @@ class AsyncProcess { @VisibleForTesting /** Waits until all outstanding tasks are done. Used in tests. */ void waitUntilDone() throws InterruptedIOException { - waitForMaximumCurrentTasks(0); + waitForMaximumCurrentTasks(0, null); } /** Wait until the async does not have more than max tasks in progress. */ - private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException { - waitForMaximumCurrentTasks(max, tasksInProgress, id); + private void waitForMaximumCurrentTasks(int max, String tableName) + throws InterruptedIOException { + waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName); } // Break out this method so testable @VisibleForTesting - static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id) - throws InterruptedIOException { + void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id, + String tableName) throws InterruptedIOException { long lastLog = EnvironmentEdgeManager.currentTime(); long currentInProgress, oldInProgress = Long.MAX_VALUE; while ((currentInProgress = tasksInProgress.get()) > max) { @@ -1785,7 +1796,11 @@ class AsyncProcess { if (now > lastLog + 10000) { lastLog = now; LOG.info("#" + id + ", waiting for some tasks to finish. Expected max=" - + max + ", tasksInProgress=" + currentInProgress); + + max + ", tasksInProgress=" + currentInProgress + + " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName); + if (currentInProgress <= thresholdToLogUndoneTaskDetails) { + logDetailsOfUndoneTasks(currentInProgress); + } } } oldInProgress = currentInProgress; @@ -1802,6 +1817,25 @@ class AsyncProcess { } } + private void logDetailsOfUndoneTasks(long taskInProgress) { + ArrayList servers = new ArrayList(); + for (Map.Entry entry : taskCounterPerServer.entrySet()) { + if (entry.getValue().get() > 0) { + servers.add(entry.getKey()); + } + } + LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers); + if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) { + ArrayList regions = new ArrayList(); + for (Map.Entry entry : taskCounterPerRegion.entrySet()) { + if (entry.getValue().get() > 0) { + regions.add(Bytes.toString(entry.getKey())); + } + } + LOG.info("Regions against which left over task(s) are processed: " + regions); + } + } + /** * Only used w/useGlobalErrors ctor argument, for HTable backward compat. * @return Whether there were any errors in any request since the last time @@ -1817,12 +1851,13 @@ class AsyncProcess { * failed operations themselves. * @param failedRows an optional list into which the rows that failed since the last time * {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved. + * @param tableName name of the table * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)} * was called, or AP was created. */ public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset( - List failedRows) throws InterruptedIOException { - waitForMaximumCurrentTasks(0); + List failedRows, String tableName) throws InterruptedIOException { + waitForMaximumCurrentTasks(0, tableName); if (!globalErrors.hasErrors()) { return null; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 6220cd6f92b..273f2e40f9c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -237,7 +237,8 @@ public class BufferedMutatorImpl implements BufferedMutator { while (!buffer.isEmpty()) { ap.submit(tableName, buffer, true, null, false); } - RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null); + RetriesExhaustedWithDetailsException error = + ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString()); if (error != null) { if (listener == null) { throw error; diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 9809c2847b8..3a0c08df0ba 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -1139,16 +1139,17 @@ public class TestAsyncProcess { } @Test - public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException { + public void testWaitForMaximumCurrentTasks() throws Exception { final AtomicLong tasks = new AtomicLong(0); final AtomicInteger max = new AtomicInteger(0); final CyclicBarrier barrier = new CyclicBarrier(2); + final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf); Runnable runnable = new Runnable() { @Override public void run() { try { barrier.await(); - AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1); + ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null); } catch (InterruptedIOException e) { Assert.fail(e.getMessage()); } catch (InterruptedException e) {