From 48025cc84b791c7d64e7c01c59a06a4e16897459 Mon Sep 17 00:00:00 2001
From: Rahul Gidwani
Date: Fri, 12 Jan 2018 13:35:26 -0800
Subject: [PATCH] HBASE-11409 - Add more flexibility for input directory
 structure to LoadIncrementalHFiles

---
 .../mapreduce/LoadIncrementalHFiles.java      | 50 +++++++++----
 .../mapreduce/TestLoadIncrementalHFiles.java  | 74 +++++++++++--------
 2 files changed, 79 insertions(+), 45 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 853b59d7e55..9d7d80b802c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import com.google.common.annotations.VisibleForTesting;
 import static java.lang.String.format;
 
 import java.io.FileNotFoundException;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -135,6 +137,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private String bulkToken;
   private UserProvider userProvider;
   private int nrThreads;
+  private int depth = 2;
 
   private LoadIncrementalHFiles() {}
 
@@ -143,6 +146,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     initialize();
   }
 
+  public void setDepth(int depth) {
+    this.depth = depth;
+  }
+
   private void initialize() throws Exception {
     if (hbAdmin == null) {
       // make a copy, just to be sure we're not overriding someone else's config
@@ -161,9 +168,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   private void usage() {
-    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
-        + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n"
+    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable"
+        + "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by "
+        + "this tool\n  Note: if you set this to 'no', then the target table must already exist "
+        + "in HBase\n -loadTable implies your baseDirectory has a depth of 3; the target table "
+        + "must already exist in HBase"
         + "\n");
   }
 
@@ -287,22 +296,32 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
       final boolean validateHFile) throws IOException {
     fs = hfofDir.getFileSystem(getConf());
-    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
-      @Override
-      public byte[] bulkFamily(final byte[] familyName) {
+    BulkHFileVisitor<byte[]> visitor = new BulkHFileVisitor<byte[]>() {
+      @Override public byte[] bulkFamily(final byte[] familyName) {
         return familyName;
       }
-      @Override
-      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+
+      @Override public void bulkHFile(final byte[] family, final FileStatus hfile)
+          throws IOException {
         long length = hfile.getLen();
-        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
-            HConstants.DEFAULT_MAX_FILE_SIZE)) {
-          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
-              length + " bytes can be problematic as it may lead to oversplitting.");
+        if (length > getConf()
+            .getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)) {
+          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length
+              + " bytes can be problematic as it may lead to oversplitting.");
         }
         ret.add(new LoadQueueItem(family, hfile.getPath()));
       }
-    }, validateHFile);
+    };
+    if (depth == 2) {
+      visitBulkHFiles(fs, hfofDir, visitor, validateHFile);
+    } else if (depth == 3) {
+      for (FileStatus fileStatus : FSUtils.listStatus(fs, hfofDir)) {
+        visitBulkHFiles(fs, fileStatus.getPath(), visitor, validateHFile);
+      }
+    } else {
+      throw new IllegalArgumentException("Depth of HFiles from directory must be 2 or 3");
+    }
+
   }
 
   /**
@@ -1096,7 +1115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    if (args.length != 2 && args.length != 3) {
       usage();
       return -1;
     }
@@ -1105,6 +1124,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
     String dirPath = args[0];
     TableName tableName = TableName.valueOf(args[1]);
+    if (args.length == 3) {
+      this.setDepth(3);
+    }
 
     boolean tableExists = this.doesTableExist(tableName);
     if (!tableExists) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index e200442a9a7..13a22747b57 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -22,11 +22,9 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.Locale;
 import java.util.TreeMap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -36,11 +34,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
@@ -118,7 +116,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
         new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
-    });
+    }, 2);
   }
 
   /**
@@ -131,7 +129,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
         new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
+    }, 2);
   }
 
   /**
@@ -143,7 +141,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
         new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
+    }, 2);
   }
 
   /**
@@ -155,7 +153,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
         new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
+    }, 2);
   }
 
   /**
@@ -172,8 +170,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
         new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
-      }
-    );
+      }, 2);
   }
 
   /**
@@ -221,8 +218,7 @@ public class TestLoadIncrementalHFiles {
       },
       new byte[][][] {
         new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") },
-      }
-    );
+      }, 2);
   }
 
   private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
@@ -234,8 +230,7 @@
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
         new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-      }
-    );
+      }, 2);
   }
 
   private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
@@ -246,39 +241,56 @@ public class TestLoadIncrementalHFiles {
     return htd;
   }
 
-  private void runTest(String testName, BloomType bloomType,
-      byte[][][] hfileRanges) throws Exception {
-    runTest(testName, bloomType, null, hfileRanges);
+  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges,
+      int depth) throws Exception {
+    runTest(testName, bloomType, null, hfileRanges, depth);
   }
 
-  private void runTest(String testName, BloomType bloomType,
-      byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, int depth) throws Exception {
     final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
     final boolean preCreateTable = tableSplitKeys != null;
 
     // Run the test bulkloading the table to the default namespace
     final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
-    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 2);
+
+    /* Run the test bulkloading the table from a depth of 3;
+       the directory structure is now
+       baseDirectory
+         -- regionDir
+            -- familyDir
+               -- storeFileDir
+     */
+    if (preCreateTable) {
+      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, 3);
+    }
 
     // Run the test bulkloading the table to the specified namespace
     final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
-    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, depth);
   }
 
   private void runTest(String testName, TableName tableName, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, int depth)
+      throws Exception {
     HTableDescriptor htd = buildHTD(tableName, bloomType);
-    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, depth);
   }
 
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, int depth)
+      throws Exception {
     for (boolean managed : new boolean[] { true, false }) {
-      Path dir = util.getDataTestDirOnTestFS(testName);
+      Path baseDirectory = util.getDataTestDirOnTestFS(testName);
       FileSystem fs = util.getTestFileSystem();
-      dir = dir.makeQualified(fs);
-      Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+      baseDirectory = baseDirectory.makeQualified(fs);
+      Path parentDir = baseDirectory;
+      if (depth == 3) {
+        parentDir = new Path(baseDirectory, "someRegion");
+      }
+      Path familyDir = new Path(parentDir, Bytes.toString(FAMILY));
 
       int hfileIdx = 0;
       for (byte[][] range : hfileRanges) {
@@ -298,16 +310,16 @@ public class TestLoadIncrementalHFiles {
         util.getHBaseAdmin().createTable(htd);
       }
       LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-
+      loader.setDepth(depth);
       if (managed) {
         try (HTable table = new HTable(util.getConfiguration(), tableName)) {
-          loader.doBulkLoad(dir, table);
+          loader.doBulkLoad(baseDirectory, table);
           assertEquals(expectedRows, util.countRows(table));
         }
       } else {
         try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
             HTable table = (HTable) conn.getTable(tableName)) {
-          loader.doBulkLoad(dir, table);
+          loader.doBulkLoad(baseDirectory, table);
         }
       }
 
@@ -390,7 +402,7 @@ public class TestLoadIncrementalHFiles {
     htd.addFamily(family);
 
     try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges);
+      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, 2);
       assertTrue("Loading into table with non-existent family should have failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);