HBASE-17165 Make use of retry setting in LoadIncrementalHFiles & fix test
This commit is contained in:
parent
3757915dac
commit
a9d9fa35a2
|
@ -132,6 +132,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
private final FsDelegationToken fsDelegationToken;
|
||||
private final UserProvider userProvider;
|
||||
private final int nrThreads;
|
||||
private AtomicInteger numRetries;
|
||||
private final RpcControllerFactory rpcControllerFactory;
|
||||
|
||||
private String bulkToken;
|
||||
|
@ -178,6 +179,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
|
||||
nrThreads = conf.getInt("hbase.loadincremental.threads.max",
|
||||
Runtime.getRuntime().availableProcessors());
|
||||
numRetries = new AtomicInteger(0);
|
||||
rpcControllerFactory = new RpcControllerFactory(conf);
|
||||
}
|
||||
|
||||
|
@ -784,8 +786,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
|
||||
final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
|
||||
throws IOException {
|
||||
try {
|
||||
List<LoadQueueItem> toRetry = new ArrayList<>();
|
||||
try {
|
||||
Configuration conf = getConf();
|
||||
byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
|
||||
.callWithRetries(serviceCallable, Integer.MAX_VALUE);
|
||||
|
@ -801,6 +803,20 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
LOG.error("Encountered unrecoverable error from region server, additional details: " +
|
||||
serviceCallable.getExceptionMessageAdditionalDetail(),
|
||||
e);
|
||||
LOG.warn(
|
||||
"Received a " + e.getClass().getSimpleName()
|
||||
+ " from region server: "
|
||||
+ serviceCallable.getExceptionMessageAdditionalDetail(), e);
|
||||
if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
|
||||
&& numRetries.get() < getConf().getInt(
|
||||
HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
|
||||
LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
|
||||
+ numRetries.incrementAndGet());
|
||||
toRetry.addAll(lqis);
|
||||
return toRetry;
|
||||
}
|
||||
LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -331,7 +331,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
@Test
|
||||
public void testRetryOnIOException() throws Exception {
|
||||
final TableName table = TableName.valueOf(name.getMethodName());
|
||||
final AtomicInteger calls = new AtomicInteger(1);
|
||||
final AtomicInteger calls = new AtomicInteger(0);
|
||||
final Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
|
||||
util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||
util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
|
||||
|
@ -340,9 +340,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
protected List<LoadQueueItem> tryAtomicRegionLoad(
|
||||
ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first,
|
||||
Collection<LoadQueueItem> lqis) throws IOException {
|
||||
if (calls.getAndIncrement() < util.getConfiguration().getInt(
|
||||
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) -
|
||||
1) {
|
||||
if (calls.get() < util.getConfiguration().getInt(
|
||||
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
|
||||
ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn,
|
||||
tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(),
|
||||
HConstants.PRIORITY_UNSET) {
|
||||
|
@ -351,6 +350,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
throw new IOException("Error calling something on RegionServer");
|
||||
}
|
||||
};
|
||||
calls.getAndIncrement();
|
||||
return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
|
||||
} else {
|
||||
return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
|
||||
|
@ -360,8 +360,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
setupTable(conn, table, 10);
|
||||
Path dir = buildBulkFiles(table, 1);
|
||||
lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table));
|
||||
assertEquals(calls.get(), 2);
|
||||
util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
|
||||
|
||||
}
|
||||
|
||||
private ClusterConnection getMockedConnection(final Configuration conf)
|
||||
|
|
Loading…
Reference in New Issue