diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java index b54e3ea6e2d..6d3f4c59c48 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java @@ -43,10 +43,8 @@ import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.Bytes; @@ -99,6 +97,7 @@ public class ImportTsv extends Configured implements Tool { final static String DEFAULT_MULTIPLE_ATTRIBUTES_SEPERATOR = ","; final static Class DEFAULT_MAPPER = TsvImporterMapper.class; public final static String CREATE_TABLE_CONF_KEY = "create.table"; + public final static String NO_STRICT_COL_FAMILY = "no.strict"; public static class TsvParser { /** @@ -450,6 +449,32 @@ public class ImportTsv extends Configured implements Tool { } } try (HTable table = (HTable)connection.getTable(tableName)) { + boolean noStrict = conf.getBoolean(NO_STRICT_COL_FAMILY, false); + // if no.strict is false then check column family + if(!noStrict) { + ArrayList unmatchedFamilies = new ArrayList(); + Set cfSet = getColumnFamilies(columns); + HTableDescriptor tDesc = table.getTableDescriptor(); + for (String cf : cfSet) { + if(tDesc.getFamily(Bytes.toBytes(cf)) == null) { + unmatchedFamilies.add(cf); + } + } + if(unmatchedFamilies.size() > 0) { + ArrayList familyNames = new ArrayList(); + for (HColumnDescriptor family : table.getTableDescriptor().getFamilies()) { + familyNames.add(family.getNameAsString()); + } + String msg = + "Column Families " + unmatchedFamilies + " specified in " + COLUMNS_CONF_KEY + + " does not match with any of the table " + tableName + + " column families " + familyNames + ".\n" + + "To disable column family check, use -D" + NO_STRICT_COL_FAMILY + + "=true.\n"; + usage(msg); + System.exit(-1); + } + } job.setReducerClass(PutSortReducer.class); Path outputDir = new Path(hfileOutPath); FileOutputFormat.setOutputPath(job, outputDir); @@ -494,6 +519,17 @@ public class ImportTsv extends Configured implements Tool { private static void createTable(Admin admin, TableName tableName, String[] columns) throws IOException { HTableDescriptor htd = new HTableDescriptor(tableName); + Set cfSet = getColumnFamilies(columns); + for (String cf : cfSet) { + HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf)); + htd.addFamily(hcd); + } + LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.", + tableName, cfSet)); + admin.createTable(htd); + } + + private static Set getColumnFamilies(String[] columns) { Set cfSet = new HashSet(); for (String aColumn : columns) { if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn) @@ -504,13 +540,7 @@ public class ImportTsv extends Configured implements Tool { // we are only concerned with the first one (in case this is a cf:cq) cfSet.add(aColumn.split(":", 2)[0]); } - for (String cf : cfSet) { - HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf)); - htd.addFamily(hcd); - } - LOG.warn(format("Creating table '%s' with '%s' columns and default descriptors.", - tableName, cfSet)); - admin.createTable(htd); + return cfSet; } /* @@ -556,7 +586,8 @@ public class ImportTsv extends Configured implements Tool { " -D" + JOB_NAME_CONF_KEY + "=jobName - use the specified mapreduce job name for the import\n" + " -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" + " Note: if you set this to 'no', then the target table must already exist in HBase\n" + - "\n" + + " -D" + NO_STRICT_COL_FAMILY + "=true - ignore column family check in hbase table. " + + "Default is false\n\n" + "For performance consider the following options:\n" + " -Dmapreduce.map.speculative=false\n" + " -Dmapreduce.reduce.speculative=false"; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java index 7607c78381f..3844a64d5c4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java @@ -187,6 +187,23 @@ public class TestImportTsv implements Configurable { doMROnTableTest(util, FAMILY, null, args, 3); util.deleteTable(table); } + + @Test + public void testBulkOutputWithAnExistingTableNoStrictTrue() throws Exception { + String table = "test-" + UUID.randomUUID(); + // Prepare the arguments required for the test. + Path hfiles = new Path(util.getDataTestDirOnTestFS(table), "hfiles"); + String[] args = new String[] { + "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B", + "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", + "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=" + hfiles.toString(), + "-D" + ImportTsv.NO_STRICT_COL_FAMILY + "=true", + table + }; + util.createTable(TableName.valueOf(table), FAMILY); + doMROnTableTest(util, FAMILY, null, args, 3); + util.deleteTable(table); + } @Test public void testJobConfigurationsWithTsvImporterTextMapper() throws Exception {