diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index f775b820b20..6dea477307a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -333,6 +333,26 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     doBulkLoad(hfofDir, admin, table, regionLocator, false);
   }
 
+  void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
+      SecureBulkLoadClient secureClient) throws IOException {
+    fsDelegationToken.releaseDelegationToken();
+    if (bulkToken != null && secureClient != null) {
+      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
+    }
+    if (pool != null) {
+      pool.shutdown();
+    }
+    if (!queue.isEmpty()) {
+      StringBuilder err = new StringBuilder();
+      err.append("-------------------------------------------------\n");
+      err.append("Bulk load aborted with some files not yet loaded:\n");
+      err.append("-------------------------------------------------\n");
+      for (LoadQueueItem q : queue) {
+        err.append(" ").append(q.hfilePath).append('\n');
+      }
+      LOG.error(err);
+    }
+  }
   /**
    * Perform a bulk load of the given directory into the given
    * pre-existing table. This method is not threadsafe.
@@ -352,12 +372,20 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
-    prepareHFileQueue(map, table, queue, silence);
-    if (queue.isEmpty()) {
-      LOG.warn("Bulk load operation did not get any files to load");
-      return;
+    ExecutorService pool = null;
+    SecureBulkLoadClient secureClient = null;
+    try {
+      prepareHFileQueue(map, table, queue, silence);
+      if (queue.isEmpty()) {
+        LOG.warn("Bulk load operation did not get any files to load");
+        return;
+      }
+      pool = createExecutorService();
+      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+    } finally {
+      cleanup(admin, queue, pool, secureClient);
     }
-    performBulkLoad(admin, table, regionLocator, queue);
   }
 
   /**
@@ -392,87 +420,73 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
     Deque<LoadQueueItem> queue = new LinkedList<>();
-    prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
+    ExecutorService pool = null;
+    SecureBulkLoadClient secureClient = null;
+    try {
+      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
 
-    if (queue.isEmpty()) {
-      LOG.warn("Bulk load operation did not find any files to load in " +
-          "directory " + hfofDir != null ? hfofDir.toUri() : "" + ". Does it contain files in " +
-          "subdirectories that correspond to column family names?");
-      return;
+      if (queue.isEmpty()) {
+        LOG.warn("Bulk load operation did not find any files to load in " +
+            "directory " + (hfofDir != null ? hfofDir.toUri() : "") + ". Does it contain files in " +
+            "subdirectories that correspond to column family names?");
+        return;
+      }
+      pool = createExecutorService();
+      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+    } finally {
+      cleanup(admin, queue, pool, secureClient);
     }
-    performBulkLoad(admin, table, regionLocator, queue);
   }
 
   void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
-      Deque<LoadQueueItem> queue) throws IOException {
-    ExecutorService pool = createExecutorService();
+      Deque<LoadQueueItem> queue, ExecutorService pool,
+      SecureBulkLoadClient secureClient) throws IOException {
+    int count = 0;
 
-    SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+    if(isSecureBulkLoadEndpointAvailable()) {
+      LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+      LOG.warn("Secure bulk load has been integrated into HBase core.");
+    }
 
-    try {
-      int count = 0;
+    //If using secure bulk load, get source delegation token, and
+    //prepare staging directory and token
+    // fs is the source filesystem
+    fsDelegationToken.acquireDelegationToken(fs);
+    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
 
-      if(isSecureBulkLoadEndpointAvailable()) {
-        LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
-        LOG.warn("Secure bulk load has been integrated into HBase core.");
+    // Assumes that region splits can happen while this occurs.
+    while (!queue.isEmpty()) {
+      // need to reload split keys each iteration.
+      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
+      if (count != 0) {
+        LOG.info("Split occured while grouping HFiles, retry attempt " +
+            + count + " with " + queue.size() + " files remaining to group or split");
      }
 
-      //If using secure bulk load, get source delegation token, and
-      //prepare staging directory and token
-      // fs is the source filesystem
-      fsDelegationToken.acquireDelegationToken(fs);
-      bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
-
-      // Assumes that region splits can happen while this occurs.
-      while (!queue.isEmpty()) {
-        // need to reload split keys each iteration.
-        final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
-        if (count != 0) {
-          LOG.info("Split occured while grouping HFiles, retry attempt " +
-              + count + " with " + queue.size() + " files remaining to group or split");
-        }
-
-        int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
-        maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
-        if (maxRetries != 0 && count >= maxRetries) {
-          throw new IOException("Retry attempted " + count +
+      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
+      if (maxRetries != 0 && count >= maxRetries) {
+        throw new IOException("Retry attempted " + count +
            " times without completing, bailing out");
-        }
-        count++;
+      }
+      count++;
 
-        // Using ByteBuffer for byte[] equality semantics
-        Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
-            pool, queue, startEndKeys);
+      // Using ByteBuffer for byte[] equality semantics
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
+          pool, queue, startEndKeys);
 
-        if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
-          // Error is logged inside checkHFilesCountPerRegionPerFamily.
-          throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
+      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+        // Error is logged inside checkHFilesCountPerRegionPerFamily.
+        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
            + " hfiles to one family of one region");
-        }
-
-        bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
-
-        // NOTE: The next iteration's split / group could happen in parallel to
-        // atomic bulkloads assuming that there are splits and no merges, and
-        // that we can atomically pull out the groups we want to retry.
      }
-    } finally {
-      fsDelegationToken.releaseDelegationToken();
-      if(bulkToken != null) {
-        secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
-      }
-      pool.shutdown();
-      if (!queue.isEmpty()) {
-        StringBuilder err = new StringBuilder();
-        err.append("-------------------------------------------------\n");
-        err.append("Bulk load aborted with some files not yet loaded:\n");
-        err.append("-------------------------------------------------\n");
-        for (LoadQueueItem q : queue) {
-          err.append(" ").append(q.hfilePath).append('\n');
-        }
-        LOG.error(err);
-      }
+      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
+
+      // NOTE: The next iteration's split / group could happen in parallel to
+      // atomic bulkloads assuming that there are splits and no merges, and
+      // that we can atomically pull out the groups we want to retry.
    }
 
    if (!queue.isEmpty()) {
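
Note: the heart of this patch is the acquire-inside-try, null-guarded-teardown-in-finally shape. pool and secureClient start as null, are assigned inside the try, and cleanup() checks each one before touching it, so an empty-queue early return or an exception thrown before the pool exists still releases the delegation token and logs any unloaded files. A minimal, self-contained sketch of the same shape (illustrative only; CleanupPatternDemo, load, and the String queue are hypothetical stand-ins, not HBase API):

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CleanupPatternDemo {

  // Mirrors the patch's cleanup(): every teardown step is guarded, so it is
  // safe to call no matter how far the try block got before exiting.
  static void cleanup(Deque<String> queue, ExecutorService pool) {
    if (pool != null) {
      pool.shutdown(); // only shut down a pool that was actually created
    }
    if (!queue.isEmpty()) {
      System.err.println("Load aborted with items not yet loaded: " + queue);
    }
  }

  // Mirrors the doBulkLoad() overloads: resources start as null, are acquired
  // inside the try, and the finally hands whatever exists to cleanup().
  static void load(Deque<String> queue) {
    ExecutorService pool = null;
    try {
      if (queue.isEmpty()) {
        return; // early return still runs the finally block
      }
      pool = Executors.newFixedThreadPool(2);
      queue.clear(); // stand-in for the real grouping/loading work
    } finally {
      cleanup(queue, pool);
    }
  }

  public static void main(String[] args) {
    load(new ArrayDeque<>());                    // empty queue: cleanup sees a null pool
    load(new ArrayDeque<>(Arrays.asList("f1"))); // normal path: pool created, then shut down
  }
}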