diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index ab22fc4aa11..dc92e0ea91f 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -23,13 +23,18 @@ import org.apache.hadoop.hbase.util.Base64;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -65,6 +70,7 @@ public class ImportTsv {
   final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
   final static String DEFAULT_SEPARATOR = "\t";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
+  private static HBaseAdmin hbaseAdmin;
 
   static class TsvParser {
     /**
@@ -221,6 +227,9 @@ public class ImportTsv {
 
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
     if (hfileOutPath != null) {
+      if (!doesTableExist(tableName)) {
+        createTable(conf, tableName);
+      }
       HTable table = new HTable(conf, tableName);
       job.setReducerClass(PutSortReducer.class);
       Path outputDir = new Path(hfileOutPath);
@@ -241,6 +250,27 @@ public class ImportTsv {
     return job;
   }
 
+  private static boolean doesTableExist(String tableName) throws IOException {
+    return hbaseAdmin.tableExists(tableName.getBytes());
+  }
+
+  private static void createTable(Configuration conf, String tableName)
+      throws IOException {
+    HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());
+    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
+    Set<String> cfSet = new HashSet<String>();
+    for (String aColumn : columns) {
+      if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)) continue;
+      // we are only concerned with the first one (in case this is a cf:cq)
+      cfSet.add(aColumn.split(":", 2)[0]);
+    }
+    for (String cf : cfSet) {
+      HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
+      htd.addFamily(hcd);
+    }
+    hbaseAdmin.createTable(htd);
+  }
+
   /*
    * @param errorMsg Error message. Can be null.
    */
@@ -278,6 +308,14 @@ public class ImportTsv {
     System.err.println(usage);
   }
 
+  /**
+   * Used only by test method
+   * @param conf
+   */
+  static void createHbaseAdmin(Configuration conf) throws IOException {
+    hbaseAdmin = new HBaseAdmin(conf);
+  }
+
   /**
    * Main entry point.
    *
@@ -315,7 +353,7 @@ public class ImportTsv {
       usage("One or more columns in addition to the row key are required");
       System.exit(-1);
     }
-
+    hbaseAdmin = new HBaseAdmin(conf);
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index ac30a62677a..ff1ede3ec3d 100644
--- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -204,11 +204,13 @@ public class TestImportTsv {
     final byte[] TAB = Bytes.toBytes(tableName);
     final byte[] QA = Bytes.toBytes("A");
     final byte[] QB = Bytes.toBytes("B");
-
-    HTableDescriptor desc = new HTableDescriptor(TAB);
-    desc.addFamily(new HColumnDescriptor(FAM));
-    new HBaseAdmin(conf).createTable(desc);
-
+    if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
+      HTableDescriptor desc = new HTableDescriptor(TAB);
+      desc.addFamily(new HColumnDescriptor(FAM));
+      new HBaseAdmin(conf).createTable(desc);
+    } else { // set the hbaseAdmin as we are not going through main()
+      ImportTsv.createHbaseAdmin(conf);
+    }
     Job job = ImportTsv.createSubmittableJob(conf, args);
     job.waitForCompletion(false);
     assertTrue(job.isSuccessful());
@@ -255,6 +257,21 @@ public class TestImportTsv {
     }
   }
 
+  @Test
+  public void testBulkOutputWithoutAnExistingTable() throws Exception {
+    String TABLE_NAME = "TestTable";
+    String FAMILY = "FAM";
+    String INPUT_FILE = "InputFile2.esv";
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b",
+        "-D" + ImportTsv.BULK_OUTPUT_CONF_KEY + "=output", TABLE_NAME,
+        INPUT_FILE };
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+  }
+
   public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
     return new String(bytes, HConstants.UTF8_ENCODING);
   }
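
Note on the new createTable() path: the loop keeps only the family portion of each importtsv.columns entry, so a spec such as HBASE_ROW_KEY,FAM:A,FAM:B collapses to the single column family FAM. Below is a minimal standalone sketch of that derivation logic; the class name FamilySpecSketch and the helper parseFamilies are illustrative only and not part of the patch.

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class FamilySpecSketch {
      // Mirrors ImportTsv.TsvParser.ROWKEY_COLUMN_SPEC
      static final String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";

      // Illustrative helper: collects the distinct column families named in
      // importtsv.columns entries, the same way createTable() does above.
      static Set<String> parseFamilies(String[] columns) {
        Set<String> cfSet = new LinkedHashSet<String>();
        for (String aColumn : columns) {
          if (ROWKEY_COLUMN_SPEC.equals(aColumn)) continue;
          // keep only the family half of a "family:qualifier" spec
          cfSet.add(aColumn.split(":", 2)[0]);
        }
        return cfSet;
      }

      public static void main(String[] args) {
        String[] columns = { "HBASE_ROW_KEY", "FAM:A", "FAM:B" };
        System.out.println(parseFamilies(columns)); // prints [FAM]
      }
    }

The split(":", 2) limit matters: a qualifier containing further colons (e.g. "FAM:a:b") still resolves to the family FAM rather than producing spurious families.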