HBASE-11409 - Add more flexibility for input directory structure to LoadIncrementalHFiles

Rahul Gidwani 2018-01-12 13:35:26 -08:00
parent 6f29a39d76
commit 48025cc84b
2 changed files with 79 additions and 45 deletions
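For readers trying the change out, here is a minimal sketch of how the new option is driven from client code: setDepth(3) tells the loader that the input directory contains an extra region-level directory above the column-family directories, while the default depth of 2 expects family directories directly under the input directory. The configuration, table name, and path below are placeholders, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class BulkLoadDepth3Example {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // New in this change: depth 3 means the input layout is
    //   baseDirectory/regionDir/familyDir/hfile
    // instead of the default baseDirectory/familyDir/hfile (depth 2).
    loader.setDepth(3);
    // "mytable" and the path are illustrative only.
    try (HTable table = new HTable(conf, TableName.valueOf("mytable"))) {
      loader.doBulkLoad(new Path("/path/to/hfileoutputformat-output"), table);
    }
  }
}

From the command line, the same effect comes from passing a third argument (the usage text calls it -loadTable), which the run() change below maps to setDepth(3); in that mode the target table must already exist.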

org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java

@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
+import com.google.common.annotations.VisibleForTesting;
 import static java.lang.String.format;
 import java.io.FileNotFoundException;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -135,6 +137,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private String bulkToken;
   private UserProvider userProvider;
   private int nrThreads;
+  private int depth = 2;

   private LoadIncrementalHFiles() {}
@@ -143,6 +146,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     initialize();
   }

+  public void setDepth(int depth) {
+    this.depth = depth;
+  }
+
   private void initialize() throws Exception {
     if (hbAdmin == null) {
       // make a copy, just to be sure we're not overriding someone else's config
@@ -161,9 +168,11 @@
   }

   private void usage() {
-    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
-        + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-        + " Note: if you set this to 'no', then the target table must already exist in HBase\n"
+    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable"
+        + "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by "
+        + "this tool\n Note: if you set this to 'no', then the target table must already exist "
+        + "in HBase\n -loadTable implies your baseDirectory to store file has a depth of 3 ,you"
+        + " must have an existing table"
         + "\n");
   }
@@ -287,22 +296,32 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
       final boolean validateHFile) throws IOException {
     fs = hfofDir.getFileSystem(getConf());
-    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
-      @Override
-      public byte[] bulkFamily(final byte[] familyName) {
+    BulkHFileVisitor<byte[]> visitor = new BulkHFileVisitor<byte[]>() {
+      @Override public byte[] bulkFamily(final byte[] familyName) {
         return familyName;
       }
-      @Override
-      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+
+      @Override public void bulkHFile(final byte[] family, final FileStatus hfile)
+          throws IOException {
         long length = hfile.getLen();
-        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
-            HConstants.DEFAULT_MAX_FILE_SIZE)) {
-          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
-              length + " bytes can be problematic as it may lead to oversplitting.");
+        if (length > getConf()
+            .getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)) {
+          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length
+              + " bytes can be problematic as it may lead to oversplitting.");
         }
         ret.add(new LoadQueueItem(family, hfile.getPath()));
       }
-    }, validateHFile);
+    };
+    if (depth == 2) {
+      visitBulkHFiles(fs, hfofDir, visitor, validateHFile);
+    } else if (depth == 3) {
+      for (FileStatus fileStatus : FSUtils.listStatus(fs, hfofDir)) {
+        visitBulkHFiles(fs, fileStatus.getPath(), visitor, validateHFile);
+      }
+    } else {
+      throw new IllegalArgumentException("Depth of HFiles from directory must be 2 or 3");
+    }
   }

   /**
@@ -1096,7 +1115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    if (args.length != 2 && args.length != 3) {
       usage();
       return -1;
     }
@@ -1105,6 +1124,9 @@
     String dirPath = args[0];
     TableName tableName = TableName.valueOf(args[1]);
+    if (args.length == 3) {
+      this.setDepth(3);
+    }
     boolean tableExists = this.doesTableExist(tableName);
     if (!tableExists) {
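Putting the two pieces of the main-class change together, the accepted input layouts are (directory names here are only illustrative):

depth 2 (default):     baseDirectory/familyDir/hfile
depth 3 (-loadTable):  baseDirectory/regionDir/familyDir/hfile

Passing a third command-line argument (the usage text names it -loadTable) switches run() to depth 3 via setDepth(3); in that mode the tool expects the target table to already exist.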

org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java

@@ -22,11 +22,9 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.Locale;
 import java.util.TreeMap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -36,11 +34,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
@@ -118,7 +116,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
         new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
-    });
+    }, 2);
   }

   /**
@@ -131,7 +129,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
         new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
+    }, 2);
   }

   /**
@@ -143,7 +141,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
         new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
+    }, 2);
   }

   /**
@@ -155,7 +153,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
         new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
+    }, 2);
   }

   /**
@@ -172,8 +170,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
         new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
-      }
-    );
+      }, 2);
   }

   /**
@@ -221,8 +218,7 @@ public class TestLoadIncrementalHFiles {
       },
       new byte[][][] {
         new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") },
-      }
-    );
+      }, 2);
   }

   private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
@@ -234,8 +230,7 @@ public class TestLoadIncrementalHFiles {
       new byte[][][] {
         new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
         new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-      }
-    );
+      }, 2);
   }

   private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
@@ -246,39 +241,56 @@ public class TestLoadIncrementalHFiles {
     return htd;
   }

-  private void runTest(String testName, BloomType bloomType,
-      byte[][][] hfileRanges) throws Exception {
-    runTest(testName, bloomType, null, hfileRanges);
+  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges,
+      int depth) throws Exception {
+    runTest(testName, bloomType, null, hfileRanges, depth);
   }

-  private void runTest(String testName, BloomType bloomType,
-      byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, int depth) throws Exception {
     final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
     final boolean preCreateTable = tableSplitKeys != null;

     // Run the test bulkloading the table to the default namespace
     final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
-    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 2);
+
+    /* Run the test bulkloading the table from a depth of 3
+       directory structure is now
+       baseDirectory
+           -- regionDir
+              -- familyDir
+                 -- storeFileDir
+    */
+    if (preCreateTable) {
+      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, 3);
+    }

     // Run the test bulkloading the table to the specified namespace
     final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
-    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, depth);
   }

   private void runTest(String testName, TableName tableName, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, int depth)
+      throws Exception {
     HTableDescriptor htd = buildHTD(tableName, bloomType);
-    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, depth);
   }

   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, int depth)
+      throws Exception {
     for (boolean managed : new boolean[] { true, false }) {
-      Path dir = util.getDataTestDirOnTestFS(testName);
+      Path baseDirectory = util.getDataTestDirOnTestFS(testName);
       FileSystem fs = util.getTestFileSystem();
-      dir = dir.makeQualified(fs);
-      Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+      baseDirectory = baseDirectory.makeQualified(fs);
+      Path parentDir = baseDirectory;
+      if (depth == 3) {
+        parentDir = new Path(baseDirectory, "someRegion");
+      }
+      Path familyDir = new Path(parentDir, Bytes.toString(FAMILY));

       int hfileIdx = 0;
       for (byte[][] range : hfileRanges) {
@@ -298,16 +310,16 @@ public class TestLoadIncrementalHFiles {
         util.getHBaseAdmin().createTable(htd);
       }
       LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+      loader.setDepth(depth);
       if (managed) {
         try (HTable table = new HTable(util.getConfiguration(), tableName)) {
-          loader.doBulkLoad(dir, table);
+          loader.doBulkLoad(baseDirectory, table);
           assertEquals(expectedRows, util.countRows(table));
         }
       } else {
         try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
             HTable table = (HTable) conn.getTable(tableName)) {
-          loader.doBulkLoad(dir, table);
+          loader.doBulkLoad(baseDirectory, table);
         }
       }
@@ -390,7 +402,7 @@ public class TestLoadIncrementalHFiles {
     htd.addFamily(family);
     try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges);
+      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, 2);
       assertTrue("Loading into table with non-existent family should have failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);