HBASE-16646 Enhance LoadIncrementalHFiles API to accept store file paths as input - addendum adheres to original cleanup logic

This commit is contained in:
tedyu 2016-09-20 09:38:18 -07:00
parent 348eb2834a
commit 08d9a2b662
1 changed files with 84 additions and 70 deletions

View File

@ -333,6 +333,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
doBulkLoad(hfofDir, admin, table, regionLocator, false); doBulkLoad(hfofDir, admin, table, regionLocator, false);
} }
void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
SecureBulkLoadClient secureClient) throws IOException {
fsDelegationToken.releaseDelegationToken();
if (bulkToken != null && secureClient != null) {
secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
}
if (pool != null) {
pool.shutdown();
}
if (!queue.isEmpty()) {
StringBuilder err = new StringBuilder();
err.append("-------------------------------------------------\n");
err.append("Bulk load aborted with some files not yet loaded:\n");
err.append("-------------------------------------------------\n");
for (LoadQueueItem q : queue) {
err.append(" ").append(q.hfilePath).append('\n');
}
LOG.error(err);
}
}
/** /**
* Perform a bulk load of the given directory into the given * Perform a bulk load of the given directory into the given
* pre-existing table. This method is not threadsafe. * pre-existing table. This method is not threadsafe.
@ -352,12 +372,20 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// LQI queue does not need to be threadsafe -- all operations on this queue // LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread // happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>(); Deque<LoadQueueItem> queue = new LinkedList<>();
prepareHFileQueue(map, table, queue, silence); ExecutorService pool = null;
if (queue.isEmpty()) { SecureBulkLoadClient secureClient = null;
LOG.warn("Bulk load operation did not get any files to load"); try {
return; prepareHFileQueue(map, table, queue, silence);
if (queue.isEmpty()) {
LOG.warn("Bulk load operation did not get any files to load");
return;
}
pool = createExecutorService();
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
} finally {
cleanup(admin, queue, pool, secureClient);
} }
performBulkLoad(admin, table, regionLocator, queue);
} }
/** /**
@ -392,87 +420,73 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
// LQI queue does not need to be threadsafe -- all operations on this queue // LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread // happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>(); Deque<LoadQueueItem> queue = new LinkedList<>();
prepareHFileQueue(hfofDir, table, queue, validateHFile, silence); ExecutorService pool = null;
SecureBulkLoadClient secureClient = null;
try {
prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
if (queue.isEmpty()) { if (queue.isEmpty()) {
LOG.warn("Bulk load operation did not find any files to load in " + LOG.warn("Bulk load operation did not find any files to load in " +
"directory " + hfofDir != null ? hfofDir.toUri() : "" + ". Does it contain files in " + "directory " + hfofDir != null ? hfofDir.toUri() : "" + ". Does it contain files in " +
"subdirectories that correspond to column family names?"); "subdirectories that correspond to column family names?");
return; return;
}
pool = createExecutorService();
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
} finally {
cleanup(admin, queue, pool, secureClient);
} }
performBulkLoad(admin, table, regionLocator, queue);
} }
void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator, void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
Deque<LoadQueueItem> queue) throws IOException { Deque<LoadQueueItem> queue, ExecutorService pool,
ExecutorService pool = createExecutorService(); SecureBulkLoadClient secureClient) throws IOException {
int count = 0;
SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); if(isSecureBulkLoadEndpointAvailable()) {
LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
LOG.warn("Secure bulk load has been integrated into HBase core.");
}
try { //If using secure bulk load, get source delegation token, and
int count = 0; //prepare staging directory and token
// fs is the source filesystem
fsDelegationToken.acquireDelegationToken(fs);
bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
if(isSecureBulkLoadEndpointAvailable()) { // Assumes that region splits can happen while this occurs.
LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); while (!queue.isEmpty()) {
LOG.warn("Secure bulk load has been integrated into HBase core."); // need to reload split keys each iteration.
final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
if (count != 0) {
LOG.info("Split occured while grouping HFiles, retry attempt " +
+ count + " with " + queue.size() + " files remaining to group or split");
} }
//If using secure bulk load, get source delegation token, and int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
//prepare staging directory and token maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
// fs is the source filesystem if (maxRetries != 0 && count >= maxRetries) {
fsDelegationToken.acquireDelegationToken(fs); throw new IOException("Retry attempted " + count +
bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
// Assumes that region splits can happen while this occurs.
while (!queue.isEmpty()) {
// need to reload split keys each iteration.
final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
if (count != 0) {
LOG.info("Split occured while grouping HFiles, retry attempt " +
+ count + " with " + queue.size() + " files remaining to group or split");
}
int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
if (maxRetries != 0 && count >= maxRetries) {
throw new IOException("Retry attempted " + count +
" times without completing, bailing out"); " times without completing, bailing out");
} }
count++; count++;
// Using ByteBuffer for byte[] equality semantics // Using ByteBuffer for byte[] equality semantics
Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table, Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
pool, queue, startEndKeys); pool, queue, startEndKeys);
if (!checkHFilesCountPerRegionPerFamily(regionGroups)) { if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
// Error is logged inside checkHFilesCountPerRegionPerFamily. // Error is logged inside checkHFilesCountPerRegionPerFamily.
throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
+ " hfiles to one family of one region"); + " hfiles to one family of one region");
}
bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
// NOTE: The next iteration's split / group could happen in parallel to
// atomic bulkloads assuming that there are splits and no merges, and
// that we can atomically pull out the groups we want to retry.
} }
} finally { bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
fsDelegationToken.releaseDelegationToken();
if(bulkToken != null) { // NOTE: The next iteration's split / group could happen in parallel to
secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken); // atomic bulkloads assuming that there are splits and no merges, and
} // that we can atomically pull out the groups we want to retry.
pool.shutdown();
if (!queue.isEmpty()) {
StringBuilder err = new StringBuilder();
err.append("-------------------------------------------------\n");
err.append("Bulk load aborted with some files not yet loaded:\n");
err.append("-------------------------------------------------\n");
for (LoadQueueItem q : queue) {
err.append(" ").append(q.hfilePath).append('\n');
}
LOG.error(err);
}
} }
if (!queue.isEmpty()) { if (!queue.isEmpty()) {