HBASE-16646 Enhance LoadIncrementalHFiles API to accept store file paths as input

tedyu 2016-09-20 07:42:01 -07:00
parent b2eac0da33
commit 348eb2834a
2 changed files with 159 additions and 51 deletions
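
The change adds map-based overloads of doBulkLoad, prepareHFileQueue, and run alongside the existing directory-walking path, so a caller can hand the tool an explicit list of store files per column family. A minimal caller sketch follows, assuming the mapreduce-package LoadIncrementalHFiles of this era; the table name, family, and file paths are illustrative placeholders, not part of the commit:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;

public class MapBulkLoadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("mytable"); // placeholder; table must exist
    // Map each column family to the exact HFiles to load (placeholder paths).
    Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    family2Files.put(Bytes.toBytes("cf"), Arrays.asList(
        new Path("/staging/cf/hfile_0"), new Path("/staging/cf/hfile_1")));
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(tableName);
        RegionLocator locator = conn.getRegionLocator(tableName)) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      // New overload from this commit: load from a family -> hfiles map instead
      // of walking a directory whose subdirectories are column family names.
      loader.doBulkLoad(family2Files, admin, table, locator, false);
    }
  }
}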

LoadIncrementalHFiles.java

@@ -280,12 +280,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
}
/*
* Populate the Queue with given HFiles
*/
private void populateLoadQueue(final Deque<LoadQueueItem> ret,
Map<byte[], List<Path>> map) throws IOException {
for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
for (Path p : entry.getValue()) {
ret.add(new LoadQueueItem(entry.getKey(), p));
}
}
}
/**
* Walk the given directory for all HFiles, and return a Queue
* containing all such files.
*/
private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
final boolean validateHFile) throws IOException {
fs = hfofDir.getFileSystem(getConf());
visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
@Override
@@ -321,6 +333,33 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
doBulkLoad(hfofDir, admin, table, regionLocator, false);
}
/**
* Perform a bulk load of the given map of family to hfile paths into the given
* pre-existing table. This method is not threadsafe.
*
* @param map map of family to List of hfiles
* @param admin the Admin
* @param table the table to load into
* @param regionLocator region locator
* @param silence true to ignore unmatched column families
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
}
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>();
prepareHFileQueue(map, table, queue, silence);
if (queue.isEmpty()) {
LOG.warn("Bulk load operation did not get any files to load");
return;
}
performBulkLoad(admin, table, regionLocator, queue);
}
/**
* Perform a bulk load of the given directory into the given
* pre-existing table. This method is not threadsafe.
@@ -335,41 +374,44 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
}
/*
* Checking hfile format is a time-consuming operation, we should have an option to skip
* this step when bulkloading millions of HFiles. See HBASE-13985.
*/
boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
if (!validateHFile) {
LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
"are not correct. If you fail to read data from your table after using this " +
"option, consider removing the files and bulkload again without this option. " +
"See HBASE-13985");
}
// LQI queue does not need to be threadsafe -- all operations on this queue
// happen in this thread
Deque<LoadQueueItem> queue = new LinkedList<>();
prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
if (queue.isEmpty()) {
LOG.warn("Bulk load operation did not find any files to load in " +
"directory " + hfofDir != null ? hfofDir.toUri() : "" + ". Does it contain files in " +
"subdirectories that correspond to column family names?");
return;
}
performBulkLoad(admin, table, regionLocator, queue);
}
void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
Deque<LoadQueueItem> queue) throws IOException {
ExecutorService pool = createExecutorService();
SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
try {
int count = 0;
if(isSecureBulkLoadEndpointAvailable()) {
LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
LOG.warn("Secure bulk load has been integrated into HBase core.");
@@ -421,7 +463,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
}
pool.shutdown();
if (!queue.isEmpty()) {
StringBuilder err = new StringBuilder();
err.append("-------------------------------------------------\n");
err.append("Bulk load aborted with some files not yet loaded:\n");
@@ -433,7 +475,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
}
if (!queue.isEmpty()) {
throw new RuntimeException("Bulk load aborted with some files not yet loaded."
+ "Please check log for more details.");
}
@@ -465,12 +507,28 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param silence true to ignore unmatched column families
* @throws IOException If any I/O or network error occurred
*/
public void prepareHFileQueue(Path hfilesDir, Table table,
Deque<LoadQueueItem> queue, boolean validateHFile, boolean silence) throws IOException {
discoverLoadQueue(queue, hfilesDir, validateHFile);
validateFamiliesInHFiles(table, queue, silence);
}
/**
* Prepare a collection of {@link LoadQueueItem} from the map of family to hfile paths
* passed in and validate whether the prepared queue has all the valid table column
* families in it.
* @param map map of family to List of hfiles
* @param table table to which hfiles should be loaded
* @param queue queue which needs to be loaded into the table
* @param silence true to ignore unmatched column families
* @throws IOException If any I/O or network error occurred
*/
public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
Deque<LoadQueueItem> queue, boolean silence) throws IOException {
populateLoadQueue(queue, map);
validateFamiliesInHFiles(table, queue, silence);
}
// Initialize a thread pool
private ExecutorService createExecutorService() {
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
@@ -1073,22 +1131,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
LOG.info("Table "+ tableName +" is available!!");
}
public int run(String dirPath, Map<byte[], List<Path>> map, TableName tableName) throws Exception {
initialize();
try (Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin()) {
boolean tableExists = admin.tableExists(tableName);
if (!tableExists) {
if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
if (dirPath != null && "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
this.createTable(tableName, dirPath, admin);
} else {
String errorMsg = format("Table '%s' does not exist.", tableName);
@@ -1096,19 +1146,37 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throw new TableNotFoundException(errorMsg);
}
}
Path hfofDir = null;
if (dirPath != null) {
hfofDir = new Path(dirPath);
}
try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) {
boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
if (dirPath != null) {
doBulkLoad(hfofDir, admin, table, locator, silence);
} else {
doBulkLoad(map, admin, table, locator, silence);
}
}
}
return 0;
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 2) {
usage();
return -1;
}
String dirPath = args[0];
TableName tableName = TableName.valueOf(args[1]);
return run(dirPath, null, tableName);
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(), args);
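
For invocation, the existing Tool entry point is kept: run(String[]) still parses a directory and table name and now delegates to the new three-argument run(dirPath, map, tableName). A short sketch, reusing the placeholder loader, map, and table name from the example above:

// Directory-based, unchanged CLI-equivalent behavior:
loader.run(new String[] { "/staging", "mytable" });
// Map-based, new in this commit: pass null for dirPath and supply the map.
loader.run(null, family2Files, TableName.valueOf("mytable"));

Note the guard in the new run: when dirPath is null, a missing table is not auto-created (createTable derives column families from the directory layout), so the map-based path requires a pre-existing table; the test below accounts for this by creating the table whenever a map is used.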

TestLoadIncrementalHFiles.java

@@ -24,7 +24,10 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
@@ -106,6 +109,15 @@ public class TestLoadIncrementalHFiles {
util.shutdownMiniCluster();
}
@Test(timeout = 120000)
public void testSimpleLoadWithMap() throws Exception {
runTest("testSimpleLoadWithMap", BloomType.NONE,
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
}, true);
}
/**
* Test case that creates some regions and loads
* HFiles that fit snugly inside those regions
@@ -249,50 +261,78 @@ public class TestLoadIncrementalHFiles {
runTest(testName, bloomType, null, hfileRanges);
}
private void runTest(String testName, BloomType bloomType,
byte[][][] hfileRanges, boolean useMap) throws Exception {
runTest(testName, bloomType, null, hfileRanges, useMap);
}
private void runTest(String testName, BloomType bloomType,
byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
}
private void runTest(String testName, BloomType bloomType,
byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception {
final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
final boolean preCreateTable = tableSplitKeys != null;
// Run the test bulkloading the table to the default namespace
final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
useMap);
// Run the test bulkloading the table to the specified namespace
final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
useMap);
}
private void runTest(String testName, TableName tableName, BloomType bloomType,
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
throws Exception {
HTableDescriptor htd = buildHTD(tableName, bloomType);
runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap);
}
private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
throws Exception {
Path dir = util.getDataTestDirOnTestFS(testName);
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(FAMILY));
int hfileIdx = 0;
Map<byte[], List<Path>> map = null;
List<Path> list = null;
if (useMap) {
map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
list = new ArrayList<>();
map.put(FAMILY, list);
}
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
Path path = new Path(familyDir, "hfile_" + hfileIdx++);
HFileTestUtil.createHFile(util.getConfiguration(), fs, path, FAMILY, QUALIFIER, from, to, 1000);
if (useMap) {
list.add(path);
}
}
int expectedRows = hfileIdx * 1000;
if (preCreateTable || map != null) {
util.getHBaseAdmin().createTable(htd, tableSplitKeys);
}
final TableName tableName = htd.getTableName();
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
String [] args= {dir.toString(), tableName.toString()};
if (useMap) {
loader.run(null, map, tableName);
} else {
loader.run(args);
}
Table table = util.getConnection().getTable(tableName);
try {
@@ -379,7 +419,7 @@ public class TestLoadIncrementalHFiles {
htd.addFamily(family);
try {
runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false);
assertTrue("Loading into table with non-existent family should have failed", false);
} catch (Exception e) {
assertTrue("IOException expected", e instanceof IOException);