Add more flexibility for input directory structure to LoadIncrementalHFiles
This commit is contained in:
parent
efa911f56f
commit
d524768528
|
@ -17,8 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.tool;
|
package org.apache.hadoop.hbase.tool;
|
||||||
|
|
||||||
import static java.lang.String.format;
|
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
|
@ -48,7 +46,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import static java.lang.String.format;
|
||||||
import org.apache.commons.lang3.mutable.MutableInt;
|
import org.apache.commons.lang3.mutable.MutableInt;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.conf.Configured;
|
import org.apache.hadoop.conf.Configured;
|
||||||
|
@ -60,9 +58,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.ClientServiceCallable;
|
import org.apache.hadoop.hbase.client.ClientServiceCallable;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
@ -94,14 +89,20 @@ import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
|
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
|
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.FSHDFSUtils;
|
import org.apache.hadoop.hbase.util.FSHDFSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.FSVisitor;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tool to load the output of HFileOutputFormat into an existing table.
|
* Tool to load the output of HFileOutputFormat into an existing table.
|
||||||
|
@ -181,10 +182,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void usage() {
|
private void usage() {
|
||||||
System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D" +
|
System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable"
|
||||||
CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
|
+ "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by "
|
||||||
" Note: if you set this to 'no', then the target table must already exist in HBase\n -D" +
|
+ "this tool\n Note: if you set this to 'no', then the target table must already exist "
|
||||||
IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" +
|
+ "in HBase\n -loadTable implies your baseDirectory to store file has a depth of 3 ,you"
|
||||||
|
+ " must have an existing table\n-D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used "
|
||||||
|
+ "to ignore unmatched column families\n" +
|
||||||
"\n");
|
"\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1150,7 +1153,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
try (Table table = connection.getTable(tableName);
|
try (Table table = connection.getTable(tableName);
|
||||||
RegionLocator locator = connection.getRegionLocator(tableName)) {
|
RegionLocator locator = connection.getRegionLocator(tableName)) {
|
||||||
return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(), isAlwaysCopyFiles());
|
return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(),
|
||||||
|
isAlwaysCopyFiles());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1178,13 +1182,33 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] args) throws Exception {
|
public int run(String[] args) throws Exception {
|
||||||
if (args.length < 2) {
|
if (args.length != 2 && args.length != 3) {
|
||||||
usage();
|
usage();
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
String dirPath = args[0];
|
String dirPath = args[0];
|
||||||
TableName tableName = TableName.valueOf(args[1]);
|
TableName tableName = TableName.valueOf(args[1]);
|
||||||
return !run(dirPath, tableName).isEmpty() ? 0 : -1;
|
|
||||||
|
|
||||||
|
if (args.length == 2) {
|
||||||
|
return !run(dirPath, tableName).isEmpty() ? 0 : -1;
|
||||||
|
} else {
|
||||||
|
Map<byte[], List<Path>> family2Files = Maps.newHashMap();
|
||||||
|
FileSystem fs = FileSystem.get(getConf());
|
||||||
|
for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
|
||||||
|
FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
|
||||||
|
Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
|
||||||
|
byte[] familyName = Bytes.toBytes(family);
|
||||||
|
if (family2Files.containsKey(familyName)) {
|
||||||
|
family2Files.get(familyName).add(path);
|
||||||
|
} else {
|
||||||
|
family2Files.put(familyName, Lists.newArrayList(path));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return !run(family2Files, tableName).isEmpty() ? 0 : -1;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
|
@ -22,7 +22,6 @@ import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -30,7 +29,6 @@ import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -53,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
|
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
|
||||||
|
@ -131,10 +130,10 @@ public class TestLoadIncrementalHFiles {
|
||||||
public void testSimpleLoadWithFileCopy() throws Exception {
|
public void testSimpleLoadWithFileCopy() throws Exception {
|
||||||
String testName = tn.getMethodName();
|
String testName = tn.getMethodName();
|
||||||
final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
|
final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
|
||||||
runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
|
runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE),
|
||||||
false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
|
false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
|
||||||
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
|
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
|
||||||
false, true);
|
false, true, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -257,29 +256,55 @@ public class TestLoadIncrementalHFiles {
|
||||||
// Run the test bulkloading the table to the default namespace
|
// Run the test bulkloading the table to the default namespace
|
||||||
final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
|
final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
|
||||||
runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
|
runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
|
||||||
useMap);
|
useMap, 2);
|
||||||
|
|
||||||
|
|
||||||
|
/* Run the test bulkloading the table from a depth of 3
|
||||||
|
directory structure is now
|
||||||
|
baseDirectory
|
||||||
|
-- regionDir
|
||||||
|
-- familyDir
|
||||||
|
-- storeFileDir
|
||||||
|
*/
|
||||||
|
if (preCreateTable) {
|
||||||
|
runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges,
|
||||||
|
false, 3);
|
||||||
|
}
|
||||||
|
|
||||||
// Run the test bulkloading the table to the specified namespace
|
// Run the test bulkloading the table to the specified namespace
|
||||||
final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
|
final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
|
||||||
runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
|
runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
|
||||||
useMap);
|
useMap, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runTest(String testName, TableName tableName, BloomType bloomType,
|
private void runTest(String testName, TableName tableName, BloomType bloomType,
|
||||||
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
|
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges,
|
||||||
throws Exception {
|
boolean useMap, int depth) throws Exception {
|
||||||
TableDescriptor htd = buildHTD(tableName, bloomType);
|
TableDescriptor htd = buildHTD(tableName, bloomType);
|
||||||
runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
|
runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
|
public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
|
||||||
byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
|
byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
|
||||||
byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
|
byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
|
||||||
int initRowCount, int factor) throws Exception {
|
int initRowCount, int factor) throws Exception {
|
||||||
Path dir = util.getDataTestDirOnTestFS(testName);
|
return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges,
|
||||||
|
useMap, deleteFile, copyFiles, initRowCount, factor, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
|
||||||
|
byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
|
||||||
|
byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
|
||||||
|
int initRowCount, int factor, int depth) throws Exception {
|
||||||
|
Path baseDirectory = util.getDataTestDirOnTestFS(testName);
|
||||||
FileSystem fs = util.getTestFileSystem();
|
FileSystem fs = util.getTestFileSystem();
|
||||||
dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
baseDirectory = baseDirectory.makeQualified(fs.getUri(), fs.getWorkingDirectory());
|
||||||
Path familyDir = new Path(dir, Bytes.toString(fam));
|
Path parentDir = baseDirectory;
|
||||||
|
if (depth == 3) {
|
||||||
|
assert !useMap;
|
||||||
|
parentDir = new Path(baseDirectory, "someRegion");
|
||||||
|
}
|
||||||
|
Path familyDir = new Path(parentDir, Bytes.toString(fam));
|
||||||
|
|
||||||
int hfileIdx = 0;
|
int hfileIdx = 0;
|
||||||
Map<byte[], List<Path>> map = null;
|
Map<byte[], List<Path>> map = null;
|
||||||
|
@ -314,7 +339,11 @@ public class TestLoadIncrementalHFiles {
|
||||||
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
|
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
|
||||||
}
|
}
|
||||||
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
|
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
|
||||||
String[] args = { dir.toString(), tableName.toString() };
|
List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
|
||||||
|
if (depth == 3) {
|
||||||
|
args.add("-loadTable");
|
||||||
|
}
|
||||||
|
|
||||||
if (useMap) {
|
if (useMap) {
|
||||||
if (deleteFile) {
|
if (deleteFile) {
|
||||||
fs.delete(last, true);
|
fs.delete(last, true);
|
||||||
|
@ -329,7 +358,7 @@ public class TestLoadIncrementalHFiles {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
loader.run(args);
|
loader.run(args.toArray(new String[]{}));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (copyFiles) {
|
if (copyFiles) {
|
||||||
|
@ -348,11 +377,11 @@ public class TestLoadIncrementalHFiles {
|
||||||
return expectedRows;
|
return expectedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void runTest(String testName, TableDescriptor htd, BloomType bloomType,
|
private void runTest(String testName, TableDescriptor htd,
|
||||||
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
|
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
|
||||||
boolean copyFiles) throws Exception {
|
boolean copyFiles, int depth) throws Exception {
|
||||||
loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
|
loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
|
||||||
useMap, true, copyFiles, 0, 1000);
|
useMap, true, copyFiles, 0, 1000, depth);
|
||||||
|
|
||||||
final TableName tableName = htd.getTableName();
|
final TableName tableName = htd.getTableName();
|
||||||
// verify staging folder has been cleaned up
|
// verify staging folder has been cleaned up
|
||||||
|
@ -430,7 +459,7 @@ public class TestLoadIncrementalHFiles {
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
|
runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
|
||||||
assertTrue("Loading into table with non-existent family should have failed", false);
|
assertTrue("Loading into table with non-existent family should have failed", false);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
assertTrue("IOException expected", e instanceof IOException);
|
assertTrue("IOException expected", e instanceof IOException);
|
||||||
|
|
Loading…
Reference in New Issue