HBASE-16509 Add option to LoadIncrementalHFiles which allows skipping unmatched column families

This commit is contained in:
tedyu 2016-08-30 03:00:59 -07:00
parent 0d05c75184
commit 9cb0094bdb
1 changed files with 51 additions and 11 deletions

View File

@ -114,6 +114,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
= "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
public final static String CREATE_TABLE_CONF_KEY = "create.table";
public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
// We use a '.' prefix which is ignored when walking directory trees
// above. It is invalid family name.
@ -121,6 +122,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private int maxFilesPerRegionPerFamily;
private boolean assignSeqIds;
private Set<String> unmatchedFamilies = new HashSet<String>();
// Source filesystem
private FileSystem fs;
@ -160,7 +162,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private void usage() {
System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
+ CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
+ " Note: if you set this to 'no', then the target table must already exist in HBase\n"
+ " Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
+ SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
+ "\n");
}
@ -308,11 +311,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*
* @param hfofDir the directory that was provided as the output path
* of a job using HFileOutputFormat
* @param admin the Admin
* @param table the table to load into
* @param regionLocator region locator
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator) throws TableNotFoundException, IOException {
doBulkLoad(hfofDir, admin, table, regionLocator, false);
}
/**
* Perform a bulk load of the given directory into the given
* pre-existing table. This method is not threadsafe.
*
* @param hfofDir the directory that was provided as the output path
* of a job using HFileOutputFormat
* @param admin the Admin
* @param table the table to load into
* @param regionLocator region locator
* @param silence true to ignore unmatched column families
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
@ -337,7 +359,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
"option, consider removing the files and bulkload again without this option. " +
"See HBASE-13985");
}
prepareHFileQueue(hfofDir, table, queue, validateHFile);
prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
int count = 0;
@ -429,8 +451,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
boolean validateHFile) throws IOException {
prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
}
/**
* Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
* passed directory and validates whether the prepared queue has all the valid table column
* families in it.
* @param hfilesDir directory containing list of hfiles to be loaded into the table
* @param table table to which hfiles should be loaded
* @param queue queue which needs to be loaded into the table
* @param validateHFile if true hfiles will be validated for its format
* @param silence true to ignore unmatched column families
* @throws IOException If any I/O or network error occurred
*/
public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
boolean validateHFile, boolean silence) throws IOException {
discoverLoadQueue(queue, hfilesDir, validateHFile);
validateFamiliesInHFiles(table, queue);
validateFamiliesInHFiles(table, queue, silence);
}
// Initialize a thread pool
@ -446,14 +484,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
/**
* Checks whether there is any invalid family name in HFiles to be bulk loaded.
*/
private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
throws IOException {
Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
List<String> familyNames = new ArrayList<>(families.size());
for (HColumnDescriptor family : families) {
familyNames.add(family.getNameAsString());
}
List<String> unmatchedFamilies = new ArrayList<String>();
Iterator<LoadQueueItem> queueIter = queue.iterator();
while (queueIter.hasNext()) {
LoadQueueItem lqi = queueIter.next();
@ -468,7 +505,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
+ unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
+ familyNames;
LOG.error(msg);
throw new IOException(msg);
if (!silence) throw new IOException(msg);
}
}
@ -781,8 +818,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throws IOException {
final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
for (LoadQueueItem lqi : lqis) {
if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
}
}
final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
rpcControllerFactory, tableName, first) {
@Override
@ -1036,7 +1075,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
if (args.length < 2) {
usage();
return -1;
}
@ -1062,7 +1101,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) {
doBulkLoad(hfofDir, admin, table, locator);
boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
doBulkLoad(hfofDir, admin, table, locator, silence);
}
}