From ee3accb37095c61dec23b9216b1f6e160ce3414d Mon Sep 17 00:00:00 2001
From: Rahul Gidwani
Date: Wed, 6 Dec 2017 12:06:19 -0800
Subject: [PATCH] Add more flexibility for input directory structure to LoadIncrementalHFiles

---
 .../hbase/tool/LoadIncrementalHFiles.java     | 50 ++++++++++----
 .../hbase/tool/TestLoadIncrementalHFiles.java | 67 +++++++++++++------
 2 files changed, 85 insertions(+), 32 deletions(-)
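
With this change the tool accepts an optional third argument, -loadTable, which
tells it that the input directory is one level deeper than HFileOutputFormat
output: region directories containing family directories containing store
files. Below is a minimal sketch of driving the tool in the new mode; the
configuration, paths, and table name are hypothetical, and the target table is
assumed to already exist:

    // Programmatic equivalent of:
    //   hbase ... LoadIncrementalHFiles /bulk/output mytable -loadTable
    Configuration conf = HBaseConfiguration.create();
    int exitCode = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
        new String[] { "/bulk/output", "mytable", "-loadTable" });

Without -loadTable the tool keeps its old contract and expects family
directories directly under the input directory.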
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 119ac49e443..c14c944f5bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.tool;
 
-import static java.lang.String.format;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -48,7 +46,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-
+import static java.lang.String.format;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -60,9 +58,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ClientServiceCallable;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
@@ -94,14 +89,20 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
 import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
@@ -181,10 +182,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   private void usage() {
-    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D" +
-        CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
-        " Note: if you set this to 'no', then the target table must already exist in HBase\n -D" +
-        IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" +
+    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable" +
+        "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by " +
+        "this tool\n  Note: if you set this to 'no', then the target table must already exist " +
+        "in HBase\n -loadTable implies the input directory has a depth of 3 " +
+        "(region/family/hfile) and that the target table already exists\n -D" +
+        IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" +
         "\n");
   }
 
@@ -1150,7 +1153,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
       try (Table table = connection.getTable(tableName);
           RegionLocator locator = connection.getRegionLocator(tableName)) {
-        return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(), isAlwaysCopyFiles());
+        return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(),
+            isAlwaysCopyFiles());
       }
     }
   }
@@ -1178,13 +1182,37 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length < 2) {
+    if (args.length != 2 && args.length != 3) {
       usage();
       return -1;
     }
     String dirPath = args[0];
     TableName tableName = TableName.valueOf(args[1]);
-    return !run(dirPath, tableName).isEmpty() ? 0 : -1;
+
+    if (args.length == 2) {
+      return !run(dirPath, tableName).isEmpty() ? 0 : -1;
+    } else if (!"-loadTable".equals(args[2])) {
+      usage();
+      return -1;
+    } else {
+      // Depth-3 input (region/family/hfile): gather the store files under each
+      // region into one map keyed by family. The keys are byte[], so use a
+      // comparator-backed map to compare them by content, not identity.
+      Map<byte[], List<Path>> family2Files = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+      FileSystem fs = FileSystem.get(getConf());
+      for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
+        FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
+          Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
+          byte[] familyName = Bytes.toBytes(family);
+          if (family2Files.containsKey(familyName)) {
+            family2Files.get(familyName).add(path);
+          } else {
+            family2Files.put(familyName, Lists.newArrayList(path));
+          }
+        });
+      }
+      return !run(family2Files, tableName).isEmpty() ? 0 : -1;
+    }
   }
 
   public static void main(String[] args) throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
index 7e4d40edf38..8b1c96ed2b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -30,7 +29,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
@@ -131,10 +130,10 @@ public class TestLoadIncrementalHFiles {
   public void testSimpleLoadWithFileCopy() throws Exception {
     String testName = tn.getMethodName();
     final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
-    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
-      false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE),
+      false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
         new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
-      false, true);
+      false, true, 2);
   }
 
   /**
@@ -257,29 +256,55 @@ public class TestLoadIncrementalHFiles {
     // Run the test bulkloading the table to the default namespace
     final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
     runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
-      useMap);
+      useMap, 2);
+
+
+    /* Run the test bulkloading the table from a depth of 3;
+       the directory structure is now
+       baseDirectory
+         -- regionDir
+           -- familyDir
+             -- storeFileDir
+     */
+    if (preCreateTable) {
+      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges,
+        false, 3);
+    }
 
     // Run the test bulkloading the table to the specified namespace
     final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
     runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
-      useMap);
+      useMap, 2);
   }
 
-  private void runTest(String testName, TableName tableName, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
-      throws Exception {
+  private void runTest(String testName, TableName tableName, BloomType bloomType,
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges,
+      boolean useMap, int depth) throws Exception {
     TableDescriptor htd = buildHTD(tableName, bloomType);
-    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
+    runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
   }
 
   public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
       byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
       byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
       int initRowCount, int factor) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(testName);
+    return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges,
+      useMap, deleteFile, copyFiles, initRowCount, factor, 2);
+  }
+
+  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
+      byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
+      int initRowCount, int factor, int depth) throws Exception {
+    Path baseDirectory = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-    Path familyDir = new Path(dir, Bytes.toString(fam));
+    baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path parentDir = baseDirectory;
+    if (depth == 3) {
+      assert !useMap;
+      parentDir = new Path(baseDirectory, "someRegion");
+    }
+    Path familyDir = new Path(parentDir, Bytes.toString(fam));
 
     int hfileIdx = 0;
     Map<byte[], List<Path>> map = null;
@@ -314,7 +339,11 @@ public class TestLoadIncrementalHFiles {
       conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
     }
     LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
-    String[] args = { dir.toString(), tableName.toString() };
+    List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
+    if (depth == 3) {
+      args.add("-loadTable");
+    }
+
     if (useMap) {
       if (deleteFile) {
         fs.delete(last, true);
@@ -329,7 +358,7 @@ public class TestLoadIncrementalHFiles {
         }
       }
     } else {
-      loader.run(args);
+      loader.run(args.toArray(new String[]{}));
     }
 
     if (copyFiles) {
@@ -348,11 +377,11 @@ public class TestLoadIncrementalHFiles {
     return expectedRows;
   }
 
-  private void runTest(String testName, TableDescriptor htd, BloomType bloomType,
+  private void runTest(String testName, TableDescriptor htd,
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
-      boolean copyFiles) throws Exception {
+      boolean copyFiles, int depth) throws Exception {
     loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
-      useMap, true, copyFiles, 0, 1000);
+      useMap, true, copyFiles, 0, 1000, depth);
 
     final TableName tableName = htd.getTableName();
     // verify staging folder has been cleaned up
@@ -430,7 +459,7 @@ public class TestLoadIncrementalHFiles {
         .build();
 
     try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
+      runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
       assertTrue("Loading into table with non-existent family should have failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);
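
For reference, the depth-3 branch in run(String[]) above ultimately feeds the
pre-existing run(Map<byte[], List<Path>>, TableName) entry point. A minimal
sketch of calling that entry point directly (a fragment only; conf, the family
name, the store file path, and the table name are all hypothetical):

    // Map each column family to the store files to bulk load for it. A TreeMap
    // with Bytes.BYTES_COMPARATOR compares the byte[] keys by content.
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    family2Files.put(Bytes.toBytes("cf"),
        Lists.newArrayList(new Path("/bulk/output/region0/cf/hfile-0")));
    new LoadIncrementalHFiles(conf).run(family2Files, TableName.valueOf("mytable"));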