HBASE-17165 Make use of retry setting in LoadIncrementalHFiles & fix test

This commit is contained in:
Mike Grimes 2017-11-17 19:47:54 -08:00 committed by Michael Stack
parent 1b66444846
commit b16e03c130
2 changed files with 24 additions and 8 deletions

View File

@ -132,6 +132,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private final FsDelegationToken fsDelegationToken;
private final UserProvider userProvider;
private final int nrThreads;
private AtomicInteger numRetries;
private final RpcControllerFactory rpcControllerFactory;
private String bulkToken;
@ -178,6 +179,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
nrThreads = conf.getInt("hbase.loadincremental.threads.max",
Runtime.getRuntime().availableProcessors());
numRetries = new AtomicInteger(0);
rpcControllerFactory = new RpcControllerFactory(conf);
}
@ -784,8 +786,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
throws IOException {
List<LoadQueueItem> toRetry = new ArrayList<>();
try {
List<LoadQueueItem> toRetry = new ArrayList<>();
Configuration conf = getConf();
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
.callWithRetries(serviceCallable, Integer.MAX_VALUE);
@ -799,8 +801,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
return toRetry;
} catch (IOException e) {
LOG.error("Encountered unrecoverable error from region server, additional details: " +
serviceCallable.getExceptionMessageAdditionalDetail(),
e);
serviceCallable.getExceptionMessageAdditionalDetail(),
e);
LOG.warn(
"Received a " + e.getClass().getSimpleName()
+ " from region server: "
+ serviceCallable.getExceptionMessageAdditionalDetail(), e);
if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
&& numRetries.get() < getConf().getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
+ numRetries.incrementAndGet());
toRetry.addAll(lqis);
return toRetry;
}
LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
throw e;
}
}

View File

@ -331,7 +331,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
@Test
public void testRetryOnIOException() throws Exception {
final TableName table = TableName.valueOf(name.getMethodName());
final AtomicInteger calls = new AtomicInteger(1);
final AtomicInteger calls = new AtomicInteger(0);
final Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
@ -340,9 +340,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
protected List<LoadQueueItem> tryAtomicRegionLoad(
ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first,
Collection<LoadQueueItem> lqis) throws IOException {
if (calls.getAndIncrement() < util.getConfiguration().getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) -
1) {
if (calls.get() < util.getConfiguration().getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn,
tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(),
HConstants.PRIORITY_UNSET) {
@ -351,6 +350,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
throw new IOException("Error calling something on RegionServer");
}
};
calls.getAndIncrement();
return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
} else {
return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
@ -360,8 +360,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
setupTable(conn, table, 10);
Path dir = buildBulkFiles(table, 1);
lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table));
assertEquals(calls.get(), 2);
util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
}
private ClusterConnection getMockedConnection(final Configuration conf)