diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 63ec90296db..99639dde9f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -18,14 +18,19 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static java.lang.String.format;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,6 +47,8 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
@@ -57,20 +64,25 @@ import com.google.common.collect.Lists;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class ImportTsv {
+public class ImportTsv extends Configured implements Tool {
+
+  protected static final Log LOG = LogFactory.getLog(ImportTsv.class);
+
   final static String NAME = "importtsv";
 
-  final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
-  final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
-  final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
-  final static String COLUMNS_CONF_KEY = "importtsv.columns";
-  final static String SEPARATOR_CONF_KEY = "importtsv.separator";
-  final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
+  public final static String MAPPER_CONF_KEY = "importtsv.mapper.class";
+  public final static String BULK_OUTPUT_CONF_KEY = "importtsv.bulk.output";
+  public final static String TIMESTAMP_CONF_KEY = "importtsv.timestamp";
+  // TODO: the rest of these configs are used exclusively by TsvImporterMapper.
+  // Move them out of the tool and let the mapper handle its own validation.
+  public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
+  public final static String COLUMNS_CONF_KEY = "importtsv.columns";
+  public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
+
   final static String DEFAULT_SEPARATOR = "\t";
   final static Class DEFAULT_MAPPER = TsvImporterMapper.class;
-  private static HBaseAdmin hbaseAdmin;
 
-  static class TsvParser {
+  public static class TsvParser {
     /**
      * Column families and qualifiers mapped to the TSV columns
     */
@@ -245,7 +257,9 @@ public class ImportTsv {
    * @throws IOException When setting up the job fails.
    */
   public static Job createSubmittableJob(Configuration conf, String[] args)
-      throws IOException, ClassNotFoundException {
+  throws IOException, ClassNotFoundException {
+
+    HBaseAdmin admin = new HBaseAdmin(conf);
 
     // Support non-XML supported characters
     // by re-encoding the passed separator as a Base64 string.
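The hunks above turn ImportTsv from a class with a static main() into a standard Hadoop Tool (extends Configured implements Tool), so that ToolRunner parses the generic options and the job's exit status is returned to the caller instead of being swallowed by System.exit(). As a point of reference only, a minimal sketch of that pattern follows; MyTool is a hypothetical class and is not part of this patch.

// Minimal sketch of the Configured/Tool/ToolRunner pattern adopted by ImportTsv above.
// MyTool is hypothetical; only the pattern itself is taken from the patch.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyTool extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // ToolRunner has already applied generic options (-D key=value, -conf, -fs, ...)
    // to the Configuration returned by getConf().
    Configuration conf = getConf();
    // ... build and submit a MapReduce job here ...
    return 0; // return an exit code rather than calling System.exit(), so callers and tests can assert on it
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new MyTool(), args));
  }
}
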
@@ -272,9 +286,13 @@ public class ImportTsv {
     job.setMapperClass(mapperClass);
 
     String hfileOutPath = conf.get(BULK_OUTPUT_CONF_KEY);
+    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
     if (hfileOutPath != null) {
-      if (!doesTableExist(tableName)) {
-        createTable(conf, tableName);
+      if (!admin.tableExists(tableName)) {
+        LOG.warn(format("Table '%s' does not exist.", tableName));
+        // TODO: this is backwards. Instead of depending on the existence of a table,
+        // create a sane splits file for HFileOutputFormat based on data sampling.
+        createTable(admin, tableName, columns);
       }
       HTable table = new HTable(conf, tableName);
       job.setReducerClass(PutSortReducer.class);
@@ -285,7 +303,7 @@ public class ImportTsv {
       job.setCombinerClass(PutCombiner.class);
       HFileOutputFormat.configureIncrementalLoad(job, table);
     } else {
-      // No reducers.  Just write straight to table.  Call initTableReducerJob
+      // No reducers. Just write straight to table. Call initTableReducerJob
       // to set up the TableOutputFormat.
       TableMapReduceUtil.initTableReducerJob(tableName, null, job);
       job.setNumReduceTasks(0);
@@ -297,14 +315,9 @@ public class ImportTsv {
     return job;
   }
 
-  private static boolean doesTableExist(String tableName) throws IOException {
-    return hbaseAdmin.tableExists(tableName.getBytes());
-  }
-
-  private static void createTable(Configuration conf, String tableName)
+  private static void createTable(HBaseAdmin admin, String tableName, String[] columns)
   throws IOException {
     HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());
-    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
     Set<String> cfSet = new HashSet<String>();
     for (String aColumn : columns) {
       if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)) continue;
@@ -315,7 +328,9 @@ public class ImportTsv {
       HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes(cf));
       htd.addFamily(hcd);
     }
-    hbaseAdmin.createTable(htd);
+    LOG.warn(format("Creating table '%s' with '%s' column families and default descriptors.",
+        tableName, cfSet));
+    admin.createTable(htd);
   }
 
   /*
@@ -326,21 +341,23 @@ public class ImportTsv {
       System.err.println("ERROR: " + errorMsg);
     }
     String usage =
-      "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" +
+      "Usage: " + NAME + " -D"+ COLUMNS_CONF_KEY + "=a,b,c <tablename> <inputdir>\n" +
       "\n" +
       "Imports the given input directory of TSV data into the specified table.\n" +
       "\n" +
-      "The column names of the TSV data must be specified using the -Dimporttsv.columns\n" +
+      "The column names of the TSV data must be specified using the -D" + COLUMNS_CONF_KEY + "\n" +
       "option. This option takes the form of comma-separated column names, where each\n" +
       "column name is either a simple column family, or a columnfamily:qualifier. The special\n" +
-      "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
+      "column name " + TsvParser.ROWKEY_COLUMN_SPEC + " is used to designate that this column should be used\n" +
       "as the row key for each imported record. You must specify exactly one column\n" +
       "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data. Another special column HBASE_TS_KEY designates that this column should be\n" +
-      "used as timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional.\n" +
-      "You must specify atmost one column as timestamp key for each imported record.\n" +
+      "input data. Another special column " + TsvParser.TIMESTAMPKEY_COLUMN_SPEC +
+      " designates that this column should be\n" +
+      "used as timestamp for each record. Unlike " + TsvParser.ROWKEY_COLUMN_SPEC + ", " +
+      TsvParser.TIMESTAMPKEY_COLUMN_SPEC + " is optional.\n" +
+      "You must specify at most one column as timestamp key for each imported record.\n" +
       "Record with invalid timestamps (blank, non-numeric) will be treated as bad record.\n" +
-      "Note: if you use this option, then 'importtsv.timestamp' option will be ignored.\n" +
+      "Note: if you use this option, then '" + TIMESTAMP_CONF_KEY + "' option will be ignored.\n" +
       "\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -351,7 +368,8 @@ public class ImportTsv {
       " -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
       " '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
       " -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
-      " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " + DEFAULT_MAPPER.getName() + "\n" +
+      " -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
+      DEFAULT_MAPPER.getName() + "\n" +
       "For performance consider the following options:\n" +
       " -Dmapred.map.tasks.speculative.execution=false\n" +
       " -Dmapred.reduce.tasks.speculative.execution=false";
@@ -359,76 +377,71 @@ public class ImportTsv {
     System.err.println(usage);
   }
 
-  /**
-   * Used only by test method
-   * @param conf
-   */
-  static void createHbaseAdmin(Configuration conf) throws IOException {
-    hbaseAdmin = new HBaseAdmin(conf);
-  }
-
-  /**
-   * Main entry point.
-   *
-   * @param args The command line parameters.
-   * @throws Exception When running the job fails.
-   */
-  public static void main(String[] args) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
+  @Override
+  public int run(String[] args) throws Exception {
+    setConf(HBaseConfiguration.create(getConf()));
+    String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
     if (otherArgs.length < 2) {
       usage("Wrong number of arguments: " + otherArgs.length);
-      System.exit(-1);
+      return -1;
     }
 
-    // Make sure columns are specified
-    String columns[] = conf.getStrings(COLUMNS_CONF_KEY);
-    if (columns == null) {
-      usage("No columns specified. Please specify with -D" +
+    // When MAPPER_CONF_KEY is null, the user wants to use the provided TsvImporterMapper, so
+    // perform validation on these additional args. When it's not null, the user has provided
+    // their own mapper, so this validation is not relevant.
+    // TODO: validation for TsvImporterMapper, not this tool. Move elsewhere.
+    if (null == getConf().get(MAPPER_CONF_KEY)) {
+      // Make sure columns are specified
+      String columns[] = getConf().getStrings(COLUMNS_CONF_KEY);
+      if (columns == null) {
+        usage("No columns specified. Please specify with -D" +
            COLUMNS_CONF_KEY+"=...");
-      System.exit(-1);
-    }
+        return -1;
+      }
 
-    // Make sure they specify exactly one column as the row key
-    int rowkeysFound=0;
-    for (String col : columns) {
-      if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
-    }
-    if (rowkeysFound != 1) {
-      usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
-      System.exit(-1);
-    }
+      // Make sure they specify exactly one column as the row key
+      int rowkeysFound = 0;
+      for (String col : columns) {
+        if (col.equals(TsvParser.ROWKEY_COLUMN_SPEC)) rowkeysFound++;
+      }
+      if (rowkeysFound != 1) {
+        usage("Must specify exactly one column as " + TsvParser.ROWKEY_COLUMN_SPEC);
+        return -1;
+      }
 
-    // Make sure we have at most one column as the timestamp key
-    int tskeysFound = 0;
-    for (String col : columns) {
-      if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
-        tskeysFound++;
-    }
-    if (tskeysFound > 1) {
-      usage("Must specify at most one column as "
-          + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
-      System.exit(-1);
-    }
+      // Make sure we have at most one column as the timestamp key
+      int tskeysFound = 0;
+      for (String col : columns) {
+        if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC))
+          tskeysFound++;
+      }
+      if (tskeysFound > 1) {
+        usage("Must specify at most one column as "
+            + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
+        return -1;
+      }
 
-    // Make sure one or more columns are specified excluding rowkey and
-    // timestamp key
-    if (columns.length - (rowkeysFound + tskeysFound) < 1) {
-      usage("One or more columns in addition to the row key and timestamp(optional) are required");
-      System.exit(-1);
+      // Make sure one or more columns are specified excluding rowkey and
+      // timestamp key
+      if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+        usage("One or more columns in addition to the row key and timestamp (optional) are required");
+        return -1;
+      }
     }
 
     // If timestamp option is not specified, use current system time.
-    long timstamp = conf
-        .getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+    long timestamp = getConf().getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
 
     // Set it back to replace invalid timestamp (non-numeric) with current
     // system time
-    conf.setLong(TIMESTAMP_CONF_KEY, timstamp);
+    getConf().setLong(TIMESTAMP_CONF_KEY, timestamp);
 
-    hbaseAdmin = new HBaseAdmin(conf);
-    Job job = createSubmittableJob(conf, otherArgs);
-    System.exit(job.waitForCompletion(true) ? 0 : 1);
+    Job job = createSubmittableJob(getConf(), otherArgs);
+    return job.waitForCompletion(true) ? 0 : 1;
   }
 
+  public static void main(String[] args) throws Exception {
+    int status = ToolRunner.run(new ImportTsv(), args);
+    System.exit(status);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index 617634ad66f..172b759aa49 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -18,38 +18,44 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.UnsupportedEncodingException;
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.GenericOptionsParser;
-
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.Result;
-
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
-import org.junit.experimental.categories.Category;
-
-import static org.junit.Assert.*;
 
 @Category(MediumTests.class)
 public class TestImportTsv {
@@ -264,46 +270,39 @@ public class TestImportTsv {
     htu1.startMiniCluster();
     htu1.startMiniMapReduceCluster();
 
-    GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args);
-    Configuration conf = opts.getConfiguration();
-    args = opts.getRemainingArgs();
+    Tool tool = new ImportTsv();
+    tool.setConf(htu1.getConfiguration());
 
     try {
-      FileSystem fs = FileSystem.get(conf);
+      FileSystem fs = FileSystem.get(tool.getConf());
       FSDataOutputStream op = fs.create(new Path(inputFile), true);
       if (data == null) {
        data = "KEY\u001bVALUE1\u001bVALUE2\n";
      }
      op.write(Bytes.toBytes(data));
      op.close();
+      LOG.debug(String.format("Wrote test data to file: %s", fs.makeQualified(new Path(inputFile))));
 
-      final byte[] FAM = Bytes.toBytes(family);
-      final byte[] TAB = Bytes.toBytes(tableName);
-      if (conf.get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
-        HTableDescriptor desc = new HTableDescriptor(TAB);
-        desc.addFamily(new HColumnDescriptor(FAM));
-        HBaseAdmin admin = new HBaseAdmin(conf);
+      if (tool.getConf().get(ImportTsv.BULK_OUTPUT_CONF_KEY) == null) {
+        HTableDescriptor desc = new HTableDescriptor(tableName);
+        desc.addFamily(new HColumnDescriptor(family));
+        HBaseAdmin admin = new HBaseAdmin(tool.getConf());
         admin.createTable(desc);
         admin.close();
-      } else { // set the hbaseAdmin as we are not going through main()
-        LOG.info("set the hbaseAdmin");
-        ImportTsv.createHbaseAdmin(conf);
       }
 
       // force use of combiner for testing purposes
-      conf.setInt("min.num.spills.for.combine", 1);
-      Job job = ImportTsv.createSubmittableJob(conf, args);
-      job.waitForCompletion(false);
-      assertTrue(job.isSuccessful());
+      tool.getConf().setInt("min.num.spills.for.combine", 1);
+      assertEquals(0, ToolRunner.run(tool, args));
 
-      HTable table = new HTable(new Configuration(conf), TAB);
+      HTable table = new HTable(tool.getConf(), tableName);
       boolean verified = false;
-      long pause = conf.getLong("hbase.client.pause", 5 * 1000);
-      int numRetries = conf.getInt("hbase.client.retries.number", 5);
+      long pause = tool.getConf().getLong("hbase.client.pause", 5 * 1000);
+      int numRetries = tool.getConf().getInt("hbase.client.retries.number", 5);
       for (int i = 0; i < numRetries; i++) {
         try {
           Scan scan = new Scan();
           // Scan entire family.
-          scan.addFamily(FAM);
+          scan.addFamily(Bytes.toBytes(family));
           ResultScanner resScanner = table.getScanner(scan);
           for (Result res : resScanner) {
             assertTrue(res.size() == 2);
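With the conversion above, the test now drives ImportTsv through ToolRunner and asserts on the returned exit code instead of reaching into createSubmittableJob() and the old static hbaseAdmin. Below is a sketch of how another caller could do the same once this patch is applied; the driver class, table name, column mapping, and input path are illustrative assumptions, not part of the patch.

// Illustrative driver only: the class name, table, column mapping and input path are made up.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ImportTsvDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // COLUMNS_CONF_KEY is public after this patch, so callers no longer hard-code "importtsv.columns".
    conf.set(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,d:col1,d:col2");
    Tool tool = new ImportTsv();
    tool.setConf(conf);
    // Remaining arguments are <tablename> <inputdir>, as described in the usage text above.
    int exitCode = ToolRunner.run(tool, new String[] { "myTable", "/tmp/importtsv-input" });
    System.exit(exitCode);
  }
}

Because run() returns its status instead of exiting the JVM, the updated test can simply assert assertEquals(0, ToolRunner.run(tool, args)).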