diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java index 3cff04749d9..e6b64b479db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java @@ -132,6 +132,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { private final FsDelegationToken fsDelegationToken; private final UserProvider userProvider; private final int nrThreads; + private AtomicInteger numRetries; private final RpcControllerFactory rpcControllerFactory; private String bulkToken; @@ -178,6 +179,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32); nrThreads = conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors()); + numRetries = new AtomicInteger(0); rpcControllerFactory = new RpcControllerFactory(conf); } @@ -784,8 +786,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { protected List tryAtomicRegionLoad(ClientServiceCallable serviceCallable, final TableName tableName, final byte[] first, final Collection lqis) throws IOException { + List toRetry = new ArrayList<>(); try { - List toRetry = new ArrayList<>(); Configuration conf = getConf(); byte[] region = RpcRetryingCallerFactory.instantiate(conf, null). newCaller() .callWithRetries(serviceCallable, Integer.MAX_VALUE); @@ -799,8 +801,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool { return toRetry; } catch (IOException e) { LOG.error("Encountered unrecoverable error from region server, additional details: " + - serviceCallable.getExceptionMessageAdditionalDetail(), - e); + serviceCallable.getExceptionMessageAdditionalDetail(), + e); + LOG.warn( + "Received a " + e.getClass().getSimpleName() + + " from region server: " + + serviceCallable.getExceptionMessageAdditionalDetail(), e); + if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) + && numRetries.get() < getConf().getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { + LOG.warn("Will attempt to retry loading failed HFiles. Retry #" + + numRetries.incrementAndGet()); + toRetry.addAll(lqis); + return toRetry; + } + LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover"); throw e; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java index 7e051b38084..48a6d23c0c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java @@ -331,7 +331,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { @Test public void testRetryOnIOException() throws Exception { final TableName table = TableName.valueOf(name.getMethodName()); - final AtomicInteger calls = new AtomicInteger(1); + final AtomicInteger calls = new AtomicInteger(0); final Connection conn = ConnectionFactory.createConnection(util.getConfiguration()); util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true); @@ -340,9 +340,8 @@ public class TestLoadIncrementalHFilesSplitRecovery { protected List tryAtomicRegionLoad( ClientServiceCallable serverCallable, TableName tableName, final byte[] first, Collection lqis) throws IOException { - if (calls.getAndIncrement() < util.getConfiguration().getInt( - HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - - 1) { + if (calls.get() < util.getConfiguration().getInt( + HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) { ClientServiceCallable newServerCallable = new ClientServiceCallable(conn, tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) { @@ -351,6 +350,7 @@ public class TestLoadIncrementalHFilesSplitRecovery { throw new IOException("Error calling something on RegionServer"); } }; + calls.getAndIncrement(); return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis); } else { return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis); @@ -360,8 +360,8 @@ public class TestLoadIncrementalHFilesSplitRecovery { setupTable(conn, table, 10); Path dir = buildBulkFiles(table, 1); lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table)); + assertEquals(calls.get(), 2); util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false); - } private ClusterConnection getMockedConnection(final Configuration conf)