diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 6978e2367ff..f775b820b20 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -280,12 +280,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
   }
 
+  /*
+   * Populate the Queue with given HFiles
+   */
+  private void populateLoadQueue(final Deque<LoadQueueItem> ret,
+      Map<byte[], List<Path>> map) throws IOException {
+    for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
+      for (Path p : entry.getValue()) {
+        ret.add(new LoadQueueItem(entry.getKey(), p));
+      }
+    }
+  }
+
   /**
    * Walk the given directory for all HFiles, and return a Queue
    * containing all such files.
    */
   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
-    final boolean validateHFile) throws IOException {
+      final boolean validateHFile) throws IOException {
     fs = hfofDir.getFileSystem(getConf());
     visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
       @Override
@@ -321,6 +333,33 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     doBulkLoad(hfofDir, admin, table, regionLocator, false);
   }
 
+  /**
+   * Perform a bulk load of the given map of family to hfiles into the given
+   * pre-existing table. This method is not threadsafe.
+   *
+   * @param map map of family to List of hfiles
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param silence true to ignore unmatched column families
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
+      RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
+    if (!admin.isTableAvailable(regionLocator.getName())) {
+      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
+    }
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new LinkedList<>();
+    prepareHFileQueue(map, table, queue, silence);
+    if (queue.isEmpty()) {
+      LOG.warn("Bulk load operation did not get any files to load");
+      return;
+    }
+    performBulkLoad(admin, table, regionLocator, queue);
+  }
+
   /**
    * Perform a bulk load of the given directory into the given
    * pre-existing table. This method is not threadsafe.
@@ -335,41 +374,44 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
       RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
-
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
     }
 
-    ExecutorService pool = createExecutorService();
-
+    /*
+     * Checking hfile format is a time-consuming operation, we should have an option to skip
+     * this step when bulkloading millions of HFiles. See HBASE-13985.
+     */
+    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
+    if (!validateHFile) {
+      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
+        "are not correct. If you fail to read data from your table after using this " +
+        "option, consider removing the files and bulkload again without this option. " +
" + + "See HBASE-13985"); + } // LQI queue does not need to be threadsafe -- all operations on this queue // happen in this thread Deque queue = new LinkedList<>(); + prepareHFileQueue(hfofDir, table, queue, validateHFile, silence); + + if (queue.isEmpty()) { + LOG.warn("Bulk load operation did not find any files to load in " + + "directory " + hfofDir != null ? hfofDir.toUri() : "" + ". Does it contain files in " + + "subdirectories that correspond to column family names?"); + return; + } + performBulkLoad(admin, table, regionLocator, queue); + } + + void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator, + Deque queue) throws IOException { + ExecutorService pool = createExecutorService(); + SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table); try { - /* - * Checking hfile format is a time-consuming operation, we should have an option to skip - * this step when bulkloading millions of HFiles. See HBASE-13985. - */ - boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true); - if(!validateHFile) { - LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " + - "are not correct. If you fail to read data from your table after using this " + - "option, consider removing the files and bulkload again without this option. " + - "See HBASE-13985"); - } - prepareHFileQueue(hfofDir, table, queue, validateHFile, silence); - int count = 0; - if (queue.isEmpty()) { - LOG.warn("Bulk load operation did not find any files to load in " + - "directory " + hfofDir.toUri() + ". Does it contain files in " + - "subdirectories that correspond to column family names?"); - return; - } - if(isSecureBulkLoadEndpointAvailable()) { LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); LOG.warn("Secure bulk load has been integrated into HBase core."); @@ -421,7 +463,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken); } pool.shutdown(); - if (queue != null && !queue.isEmpty()) { + if (!queue.isEmpty()) { StringBuilder err = new StringBuilder(); err.append("-------------------------------------------------\n"); err.append("Bulk load aborted with some files not yet loaded:\n"); @@ -433,7 +475,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } } - if (queue != null && !queue.isEmpty()) { + if (!queue.isEmpty()) { throw new RuntimeException("Bulk load aborted with some files not yet loaded." + "Please check log for more details."); } @@ -465,12 +507,28 @@ public class LoadIncrementalHFiles extends Configured implements Tool { * @param silence true to ignore unmatched column families * @throws IOException If any I/O or network error occurred */ - public void prepareHFileQueue(Path hfilesDir, Table table, Deque queue, - boolean validateHFile, boolean silence) throws IOException { + public void prepareHFileQueue(Path hfilesDir, Table table, + Deque queue, boolean validateHFile, boolean silence) throws IOException { discoverLoadQueue(queue, hfilesDir, validateHFile); validateFamiliesInHFiles(table, queue, silence); } + /** + * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the + * passed directory and validates whether the prepared queue has all the valid table column + * families in it. 
+   * @param map map of family to List of hfiles
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param silence true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
+      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
+    populateLoadQueue(queue, map);
+    validateFamiliesInHFiles(table, queue, silence);
+  }
+
   // Initialize a thread pool
   private ExecutorService createExecutorService() {
     ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
@@ -1073,22 +1131,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     LOG.info("Table "+ tableName +" is available!!");
   }
 
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage();
-      return -1;
-    }
-
+  public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception {
     initialize();
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
-      String dirPath = args[0];
-      TableName tableName = TableName.valueOf(args[1]);
 
       boolean tableExists = admin.tableExists(tableName);
       if (!tableExists) {
-        if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
+        if (dirPath != null && "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
           this.createTable(tableName, dirPath, admin);
         } else {
           String errorMsg = format("Table '%s' does not exist.", tableName);
@@ -1096,19 +1146,37 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           throw new TableNotFoundException(errorMsg);
         }
       }
-
-      Path hfofDir = new Path(dirPath);
+      Path hfofDir = null;
+      if (dirPath != null) {
+        hfofDir = new Path(dirPath);
+      }
 
       try (Table table = connection.getTable(tableName);
           RegionLocator locator = connection.getRegionLocator(tableName)) {
         boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
-        doBulkLoad(hfofDir, admin, table, locator, silence);
+        if (dirPath != null) {
+          doBulkLoad(hfofDir, admin, table, locator, silence);
+        } else {
+          doBulkLoad(map, admin, table, locator, silence);
+        }
       }
     }
 
     return 0;
   }
 
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return -1;
+    }
+
+    String dirPath = args[0];
+    TableName tableName = TableName.valueOf(args[1]);
+    return run(dirPath, null, tableName);
+  }
+
   public static void main(String[] args) throws Exception {
     Configuration conf = HBaseConfiguration.create();
     int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 9f2596c86fc..cff80059c9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -24,7 +24,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -106,6 +109,15 @@ public class TestLoadIncrementalHFiles {
     util.shutdownMiniCluster();
   }
 
+  @Test(timeout = 120000)
+  public void testSimpleLoadWithMap() throws Exception {
+ runTest("testSimpleLoadWithMap", BloomType.NONE, + new byte[][][] { + new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") }, + new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, + }, true); + } + /** * Test case that creates some regions and loads * HFiles that fit snugly inside those regions @@ -249,50 +261,78 @@ public class TestLoadIncrementalHFiles { runTest(testName, bloomType, null, hfileRanges); } + private void runTest(String testName, BloomType bloomType, + byte[][][] hfileRanges, boolean useMap) throws Exception { + runTest(testName, bloomType, null, hfileRanges, useMap); + } + private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception { + runTest(testName, bloomType, tableSplitKeys, hfileRanges, false); + } + + private void runTest(String testName, BloomType bloomType, + byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception { final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName); final boolean preCreateTable = tableSplitKeys != null; // Run the test bulkloading the table to the default namespace final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME); - runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges); + runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, + useMap); // Run the test bulkloading the table to the specified namespace final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME); - runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges); + runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, + useMap); } private void runTest(String testName, TableName tableName, BloomType bloomType, - boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception { + boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) + throws Exception { HTableDescriptor htd = buildHTD(tableName, bloomType); - runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges); + runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap); } private void runTest(String testName, HTableDescriptor htd, BloomType bloomType, - boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception { + boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) + throws Exception { Path dir = util.getDataTestDirOnTestFS(testName); FileSystem fs = util.getTestFileSystem(); dir = dir.makeQualified(fs); Path familyDir = new Path(dir, Bytes.toString(FAMILY)); int hfileIdx = 0; + Map> map = null; + List list = null; + if (useMap) { + map = new TreeMap>(Bytes.BYTES_COMPARATOR); + list = new ArrayList<>(); + map.put(FAMILY, list); + } for (byte[][] range : hfileRanges) { byte[] from = range[0]; byte[] to = range[1]; - HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" - + hfileIdx++), FAMILY, QUALIFIER, from, to, 1000); + Path path = new Path(familyDir, "hfile_" + hfileIdx++); + HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000); + if (useMap) { + list.add(path); + } } int expectedRows = hfileIdx * 1000; - if (preCreateTable) { + if (preCreateTable || map != null) { util.getHBaseAdmin().createTable(htd, tableSplitKeys); } final TableName tableName = htd.getTableName(); 
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
     String [] args= {dir.toString(), tableName.toString()};
-    loader.run(args);
+    if (useMap) {
+      loader.run(null, map, tableName);
+    } else {
+      loader.run(args);
+    }
 
     Table table = util.getConnection().getTable(tableName);
     try {
@@ -379,7 +419,7 @@ public class TestLoadIncrementalHFiles {
     htd.addFamily(family);
 
     try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges);
+      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false);
       assertTrue("Loading into table with non-existent family should have failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);
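Usage sketch (illustrative, not part of the patch): a minimal caller driving the new map-based entry point added above. The table name, column family, and HFile paths below are made up for illustration; the target table must already exist, because with a null dirPath the new run() overload never creates it and throws TableNotFoundException instead.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class MapBulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Family -> list of HFiles, matching the Map<byte[], List<Path>> parameter introduced
    // by this patch. The family name and staging paths are hypothetical.
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    List<Path> hfiles = new ArrayList<>();
    hfiles.add(new Path("/staging/myfam/hfile_0"));
    hfiles.add(new Path("/staging/myfam/hfile_1"));
    family2Files.put(Bytes.toBytes("myfam"), hfiles);

    // Passing a null dirPath routes run() to doBulkLoad(map, ...) instead of the
    // directory-discovery path.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    int exitCode = loader.run(null, family2Files, TableName.valueOf("mytable"));
    System.exit(exitCode);
  }
}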