diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index ccf44da858c..3647637beb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -362,9 +362,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param regionLocator region locator
    * @param silence true to ignore unmatched column families
    * @param copyFile always copy hfiles if true
+   * @return List of filenames which were not found
    * @throws TableNotFoundException if table does not yet exist
    */
-  public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
+  public List<String> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
       RegionLocator regionLocator, boolean silence, boolean copyFile)
           throws TableNotFoundException, IOException {
     if (!admin.isTableAvailable(regionLocator.getName())) {
@@ -379,7 +380,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       prepareHFileQueue(map, table, queue, silence);
       if (queue.isEmpty()) {
         LOG.warn("Bulk load operation did not get any files to load");
-        return;
+        return null;
       }
       pool = createExecutorService();
       secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
@@ -389,7 +390,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           break;
         }
       }
-      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
+      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
     } finally {
       cleanup(admin, queue, pool, secureClient);
     }
@@ -448,7 +449,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }
 
-  void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
+  List<String> performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
       Deque<LoadQueueItem> queue, ExecutorService pool, SecureBulkLoadClient secureClient,
       boolean copyFile) throws IOException {
     int count = 0;
@@ -463,6 +464,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     // fs is the source filesystem
     fsDelegationToken.acquireDelegationToken(fs);
     bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
+    Pair<Multimap<ByteBuffer, LoadQueueItem>, List<String>> pair = null;
 
     // Assumes that region splits can happen while this occurs.
     while (!queue.isEmpty()) {
@@ -482,8 +484,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       count++;
 
       // Using ByteBuffer for byte[] equality semantics
-      Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(table,
-          pool, queue, startEndKeys);
+      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
 
       if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
         // Error is logged inside checkHFilesCountPerRegionPerFamily.
@@ -502,6 +504,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throw new RuntimeException("Bulk load aborted with some files not yet loaded."
+ "Please check log for more details."); } + if (pair == null) return null; + return pair.getSecond(); } /** @@ -625,7 +629,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { try { pool = createExecutorService(); Multimap regionGroups = - groupOrSplitPhase(table, pool, queue, startEndKeys); + groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst(); bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile); } finally { if (pool != null) { @@ -709,25 +713,34 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } /** - * @return A map that groups LQI by likely bulk load region targets. + * @param table the table to load into + * @param pool the ExecutorService + * @param queue the queue for LoadQueueItem + * @param startEndKeys start and end keys + * @return A map that groups LQI by likely bulk load region targets and List of missing hfiles. */ - private Multimap groupOrSplitPhase(final Table table, - ExecutorService pool, Deque queue, + private Pair, List> groupOrSplitPhase( + final Table table, ExecutorService pool, Deque queue, final Pair startEndKeys) throws IOException { // need synchronized only within this scope of this // phase because of the puts that happen in futures. Multimap rgs = HashMultimap.create(); final Multimap regionGroups = Multimaps.synchronizedMultimap(rgs); + List missingHFiles = new ArrayList<>(); + Pair, List> pair = new Pair<>(regionGroups, + missingHFiles); // drain LQIs and figure out bulk load groups - Set>> splittingFutures = new HashSet<>(); + Set, String>>> splittingFutures = new HashSet<>(); while (!queue.isEmpty()) { final LoadQueueItem item = queue.remove(); - final Callable> call = new Callable>() { + final Callable, String>> call = + new Callable, String>>() { @Override - public List call() throws Exception { - List splits = groupOrSplit(regionGroups, item, table, startEndKeys); + public Pair, String> call() throws Exception { + Pair, String> splits = groupOrSplit(regionGroups, item, table, + startEndKeys); return splits; } }; @@ -735,11 +748,15 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } // get all the results. All grouping and splitting must finish before // we can attempt the atomic loads. 
- for (Future> lqis : splittingFutures) { + for (Future, String>> lqis : splittingFutures) { try { - List splits = lqis.get(); + Pair, String> splits = lqis.get(); if (splits != null) { - queue.addAll(splits); + if (splits.getFirst() != null) { + queue.addAll(splits.getFirst()); + } else { + missingHFiles.add(splits.getSecond()); + } } } catch (ExecutionException e1) { Throwable t = e1.getCause(); @@ -754,7 +771,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { throw (InterruptedIOException)new InterruptedIOException().initCause(e1); } } - return regionGroups; + return pair; } // unique file name for the table @@ -817,17 +834,22 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * protected for testing * @throws IOException if an IO failure is encountered */ - protected List groupOrSplit(Multimap regionGroups, - final LoadQueueItem item, final Table table, - final Pair startEndKeys) - throws IOException { + protected Pair, String> groupOrSplit( + Multimap regionGroups, final LoadQueueItem item, final Table table, + final Pair startEndKeys) throws IOException { final Path hfilePath = item.hfilePath; // fs is the source filesystem if (fs == null) { fs = hfilePath.getFileSystem(getConf()); } - HFile.Reader hfr = HFile.createReader(fs, hfilePath, - new CacheConfig(getConf()), getConf()); + HFile.Reader hfr = null; + try { + hfr = HFile.createReader(fs, hfilePath, + new CacheConfig(getConf()), getConf()); + } catch (FileNotFoundException fnfe) { + LOG.debug("encountered", fnfe); + return new Pair<>(null, hfilePath.getName()); + } final byte[] first, last; try { hfr.loadFileInfo(); @@ -890,7 +912,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { List lqis = splitStoreFile(item, table, startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]); - return lqis; + return new Pair<>(lqis, null); } // group regions. 
@@ -1171,7 +1193,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     LOG.info("Table "+ tableName +" is available!!");
   }
 
-  public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
+  public List<String> run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception{
     initialize();
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
@@ -1197,13 +1219,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
       if (dirPath != null) {
         doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
+        return null;
       } else {
-        doBulkLoad(map, admin, table, locator, silence, copyFiles);
+        return doBulkLoad(map, admin, table, locator, silence, copyFiles);
       }
     }
   }
-
-    return 0;
   }
 
   @Override
@@ -1215,7 +1236,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     String dirPath = args[0];
     TableName tableName = TableName.valueOf(args[1]);
 
-    return run(dirPath, null, tableName);
+    List<String> missingHFiles = run(dirPath, null, tableName);
+    if (missingHFiles == null) return 0;
+    return -1;
   }
 
   public static void main(String[] args) throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 88b9247bb6a..fe7abcd6e65 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -323,12 +323,14 @@ public class TestLoadIncrementalHFiles {
       map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
       map.put(FAMILY, list);
     }
+    Path last = null;
     for (byte[][] range : hfileRanges) {
       byte[] from = range[0];
       byte[] to = range[1];
       Path path = new Path(familyDir, "hfile_" + hfileIdx++);
       HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000);
       if (useMap) {
+        last = path;
         list.add(path);
       }
     }
@@ -346,7 +348,10 @@ public class TestLoadIncrementalHFiles {
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
     String [] args= {dir.toString(), tableName.toString()};
     if (useMap) {
-      loader.run(null, map, tableName);
+      fs.delete(last);
+      List<String> missingHFiles = loader.run(null, map, tableName);
+      expectedRows -= 1000;
+      assertTrue(missingHFiles.contains(last.getName()));
     } else {
       loader.run(args);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 2060726d7f5..a1ed832b751 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -404,13 +404,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
     LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
         util.getConfiguration()) {
       @Override
-      protected List<LoadQueueItem> groupOrSplit(
+      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
           Multimap<ByteBuffer, LoadQueueItem> regionGroups,
           final LoadQueueItem item, final Table htable,
           final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-        List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
-        if (lqis != null) {
-          countedLqis.addAndGet(lqis.size());
+        Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
+            startEndKeys);
+        if (lqis != null && lqis.getFirst() != null) {
+          countedLqis.addAndGet(lqis.getFirst().size());
         }
         return lqis;
       }
@@ -479,7 +480,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
       int i = 0;
 
       @Override
-      protected List<LoadQueueItem> groupOrSplit(
+      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
           Multimap<ByteBuffer, LoadQueueItem> regionGroups,
           final LoadQueueItem item, final Table table,
           final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
@@ -521,13 +522,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
       @Override
-      protected List<LoadQueueItem> groupOrSplit(
+      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
           Multimap<ByteBuffer, LoadQueueItem> regionGroups,
           final LoadQueueItem item, final Table htable,
           final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-        List<LoadQueueItem> lqis = super.groupOrSplit(regionGroups, item, htable, startEndKeys);
-        if (lqis != null) {
-          countedLqis.addAndGet(lqis.size());
+        Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
+            startEndKeys);
+        if (lqis != null && lqis.getFirst() != null) {
+          countedLqis.addAndGet(lqis.getFirst().size());
         }
         return lqis;
       }
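For reviewers, a caller-side usage sketch (not part of this patch) of how the new List<String> return value might be consumed. The class name, table name, column family, and hfile path below are hypothetical; the snippet assumes the patched map-based doBulkLoad overload above and the standard HBase client API.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadMissingFilesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("exampleTable");  // hypothetical table
    // Map of column family to hfiles; the family and path are hypothetical.
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    family2Files.put(Bytes.toBytes("f"), Arrays.asList(new Path("/bulk/f/hfile_0")));

    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tableName);
        RegionLocator locator = conn.getRegionLocator(tableName)) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // With this patch, the map-based overload reports hfiles that hit
      // FileNotFoundException during the group/split phase instead of
      // aborting the whole load.
      List<String> missing =
          loader.doBulkLoad(family2Files, admin, table, locator, false, false);
      if (missing == null) {
        System.out.println("No files were queued for loading.");
      } else if (!missing.isEmpty()) {
        System.err.println("hfiles missing during bulk load: " + missing);
      }
    }
  }
}

Note that the missing-file reporting only applies to the map-based overload: when a directory path is passed (the command-line path), run(String, Map, TableName) still returns null and the tool's exit code remains 0.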