From 9cb0094bdbecf95455388278430833188bf021ef Mon Sep 17 00:00:00 2001
From: tedyu
Date: Tue, 30 Aug 2016 03:00:59 -0700
Subject: [PATCH] HBASE-16509 Add option to LoadIncrementalHFiles which allows skipping unmatched column families

---
 .../mapreduce/LoadIncrementalHFiles.java      | 62 +++++++++++++++----
 1 file changed, 51 insertions(+), 11 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index a34dc0ab097..6978e2367ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -114,6 +114,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
+  public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
 
   // We use a '.' prefix which is ignored when walking directory trees
   // above. It is invalid family name.
@@ -121,6 +122,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
+  private Set<String> unmatchedFamilies = new HashSet<String>();
 
   // Source filesystem
   private FileSystem fs;
@@ -160,7 +162,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private void usage() {
     System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
         + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n"
+        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
+        + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
         + "\n");
   }
 
@@ -308,11 +311,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    *
    * @param hfofDir the directory that was provided as the output path
    * of a job using HFileOutputFormat
+   * @param admin the Admin
    * @param table the table to load into
+   * @param regionLocator region locator
    * @throws TableNotFoundException if table does not yet exist
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
-      RegionLocator regionLocator) throws TableNotFoundException, IOException {
+      RegionLocator regionLocator) throws TableNotFoundException, IOException {
+    doBulkLoad(hfofDir, admin, table, regionLocator, false);
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table. This method is not threadsafe.
+   *
+   * @param hfofDir the directory that was provided as the output path
+   * of a job using HFileOutputFormat
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param silence true to ignore unmatched column families
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
+      RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
     }
@@ -337,7 +359,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           "option, consider removing the files and bulkload again without this option. " +
           "See HBASE-13985");
     }
-    prepareHFileQueue(hfofDir, table, queue, validateHFile);
+    prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
 
     int count = 0;
 
@@ -429,8 +451,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
       boolean validateHFile) throws IOException {
+    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
+  }
+
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed directory and validates whether the prepared queue has all the valid table column
+   * families in it.
+   * @param hfilesDir directory containing list of hfiles to be loaded into the table
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param validateHFile if true hfiles will be validated for its format
+   * @param silence true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+      boolean validateHFile, boolean silence) throws IOException {
     discoverLoadQueue(queue, hfilesDir, validateHFile);
-    validateFamiliesInHFiles(table, queue);
+    validateFamiliesInHFiles(table, queue, silence);
   }
 
   // Initialize a thread pool
@@ -446,14 +484,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   /**
    * Checks whether there is any invalid family name in HFiles to be bulk loaded.
    */
-  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
       throws IOException {
     Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
     List<String> familyNames = new ArrayList<>(families.size());
     for (HColumnDescriptor family : families) {
       familyNames.add(family.getNameAsString());
     }
-    List<String> unmatchedFamilies = new ArrayList<String>();
     Iterator<LoadQueueItem> queueIter = queue.iterator();
     while (queueIter.hasNext()) {
       LoadQueueItem lqi = queueIter.next();
@@ -468,7 +505,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
           + familyNames;
       LOG.error(msg);
-      throw new IOException(msg);
+      if (!silence) throw new IOException(msg);
     }
   }
 
@@ -781,7 +818,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throws IOException {
     final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
-      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
+        famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      }
     }
     final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
         rpcControllerFactory, tableName, first) {
@@ -1036,7 +1075,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    if (args.length < 2) {
       usage();
       return -1;
     }
@@ -1062,7 +1101,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
       try (Table table = connection.getTable(tableName);
           RegionLocator locator = connection.getRegionLocator(tableName)) {
-        doBulkLoad(hfofDir, admin, table, locator);
+        boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
+        doBulkLoad(hfofDir, admin, table, locator, silence);
       }
     }
 
@@ -1087,4 +1127,4 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   public void setBulkToken(String stagingDir) {
     this.bulkToken = stagingDir;
   }
-}
\ No newline at end of file
+}
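
The new boolean is exposed both programmatically (the doBulkLoad and prepareHFileQueue overloads above) and through the ignore.unmatched.families configuration key read in run(). A minimal caller sketch of the new doBulkLoad overload follows; the connection wiring, table name, and output path are illustrative placeholders, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

    public class BulkLoadIgnoringUnmatchedFamilies {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        TableName tableName = TableName.valueOf("mytable");   // illustrative table name
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin();
             Table table = conn.getTable(tableName);
             RegionLocator locator = conn.getRegionLocator(tableName)) {
          // silence = true: HFiles under directories whose name does not match a column
          // family of the target table are skipped instead of aborting the whole load.
          loader.doBulkLoad(new Path("/path/to/hfileoutputformat-output"),
              admin, table, locator, true);
        }
      }
    }

On the command line, run() enables the same behavior only when the key is set to "yes" (case-insensitive), i.e. passing -Dignore.unmatched.families=yes to the LoadIncrementalHFiles tool, as documented in the updated usage() text.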