diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index ab22fc4aa11..f26dc8d59df 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -77,7 +77,16 @@ public class ImportTsv {
 
     private int rowKeyColumnIndex;
 
-    public static String ROWKEY_COLUMN_SPEC="HBASE_ROW_KEY";
+    private int maxColumnCount;
+
+    // Default value must be negative
+    public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
+
+    private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
+
+    public static String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
+
+    public static String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
 
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
@@ -94,15 +103,20 @@ public class ImportTsv {
       ArrayList<String> columnStrings = Lists.newArrayList(
         Splitter.on(',').trimResults().split(columnsSpecification));
 
-      families = new byte[columnStrings.size()][];
-      qualifiers = new byte[columnStrings.size()][];
-
+      maxColumnCount = columnStrings.size();
+      families = new byte[maxColumnCount][];
+      qualifiers = new byte[maxColumnCount][];
       for (int i = 0; i < columnStrings.size(); i++) {
         String str = columnStrings.get(i);
         if (ROWKEY_COLUMN_SPEC.equals(str)) {
           rowKeyColumnIndex = i;
           continue;
         }
+        if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
+          timestampKeyColumnIndex = i;
+          continue;
+        }
+
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -117,6 +131,15 @@ public class ImportTsv {
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
+
+    public boolean hasTimestamp() {
+      return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
+    }
+
+    public int getTimestampKeyColumnIndex() {
+      return timestampKeyColumnIndex;
+    }
+
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -127,7 +150,7 @@ public class ImportTsv {
     public ParsedLine parse(byte[] lineBytes, int length)
     throws BadTsvLineException {
       // Enumerate separator offsets
-      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
+      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
       for (int i = 0; i < length; i++) {
         if (lineBytes[i] == separatorByte) {
           tabOffsets.add(i);
@@ -139,10 +162,12 @@ public class ImportTsv {
 
       tabOffsets.add(length);
 
-      if (tabOffsets.size() > families.length) {
+      if (tabOffsets.size() > maxColumnCount) {
        throw new BadTsvLineException("Excessive columns");
       } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
         throw new BadTsvLineException("No row key");
+      } else if (hasTimestamp() && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
+        throw new BadTsvLineException("No timestamp");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -162,6 +187,22 @@ public class ImportTsv {
       public int getRowKeyLength() {
         return getColumnLength(rowKeyColumnIndex);
       }
+
+      public long getTimestamp(long ts) throws BadTsvLineException {
+        // Return ts if HBASE_TS_KEY is not configured in column spec
+        if (!hasTimestamp()) {
+          return ts;
+        }
+
+        try {
+          return Long.parseLong(Bytes.toString(lineBytes, getColumnOffset(timestampKeyColumnIndex),
+              getColumnLength(timestampKeyColumnIndex)));
+        } catch (NumberFormatException nfe) {
+          // treat this record as a bad record
+          throw new BadTsvLineException("Invalid timestamp");
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -248,7 +289,7 @@ public class ImportTsv {
     if (errorMsg != null && errorMsg.length() > 0) {
       System.err.println("ERROR: " + errorMsg);
     }
-    String usage = 
+    String usage =
       "Usage: " + NAME + " -Dimporttsv.columns=a,b,c <tablename> <inputdir>\n" +
       "\n" +
       "Imports the given input directory of TSV data into the specified table.\n" +
@@ -259,7 +300,11 @@ public class ImportTsv {
       "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
       "as the row key for each imported record. You must specify exactly one column\n" +
       "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data.\n" +
+      "input data. Another special column HBASE_TS_KEY designates that this column should be\n" +
+      "used as the timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional.\n" +
+      "You must specify at most one column as the timestamp key for each imported record.\n" +
+      "Records with invalid timestamps (blank, non-numeric) will be treated as bad records.\n" +
+      "Note: if you use this option, then the 'importtsv.timestamp' option will be ignored.\n" +
       "\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
@@ -310,12 +355,28 @@ public class ImportTsv {
       System.exit(-1);
     }
 
-    // Make sure one or more columns are specified
-    if (columns.length < 2) {
-      usage("One or more columns in addition to the row key are required");
+    // Make sure we have at most one column as the timestamp key
+    int tskeysFound=0;
+    for (String col : columns) {
+      if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC)) tskeysFound++;
+    }
+    if (tskeysFound > 1) {
+      usage("Must specify at most one column as " + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
       System.exit(-1);
     }
 
+    // Make sure one or more columns are specified excluding rowkey and timestamp key
+    if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+      usage("One or more columns in addition to the row key and timestamp (optional) are required");
+      System.exit(-1);
+    }
+
+    // If timestamp option is not specified, use current system time.
+    long timestamp = conf.getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+    // Set it back to replace an invalid timestamp (non-numeric) with the current system time
+    conf.setLong(TIMESTAMP_CONF_KEY, timestamp);
+
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
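Note: the HBASE_TS_KEY handling added above means TsvParser consumes one designated column as the cell timestamp instead of emitting it as data. A minimal sketch of that behaviour, written as it might appear in TestImportTsv (which already refers to TsvParser, ParsedLine, and BadTsvLineException); the method name and sample values here are illustrative only and are not part of the patch:

  @Test
  public void exampleTimestampColumnUsage() throws BadTsvLineException {
    // Row key, a millisecond timestamp column, then two ordinary data columns.
    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,d:c1,d:c2", "\t");
    byte[] line = Bytes.toBytes("row1\t1348401983000\tv1\tv2");
    ParsedLine parsed = parser.parse(line, line.length);

    // HBASE_TS_KEY is present, so the parsed value wins over the default (-1 here);
    // without HBASE_TS_KEY in the spec, getTimestamp(-1) would simply return -1,
    // and a non-numeric timestamp field would raise BadTsvLineException.
    assertEquals(1, parser.getTimestampKeyColumnIndex());
    assertEquals(1348401983000L, parsed.getTimestamp(-1));
  }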
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index 9eb3642064e..a20d258a442 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -82,8 +82,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
     Configuration conf = context.getConfiguration();
 
-    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
-        separator);
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
     if (parser.getRowKeyColumnIndex() == -1) {
       throw new RuntimeException("No row key column specified");
     }
@@ -105,10 +104,10 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       separator = new String(Base64.decode(separator));
     }
 
-    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+    // Should never get 0 as we are setting this to a valid value in job configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
 
-    skipBadLines = context.getConfiguration().getBoolean(
-        ImportTsv.SKIP_LINES_CONF_KEY, true);
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
   }
 
@@ -116,22 +115,22 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
    * Convert a line of TSV text into an HBase table row.
    */
   @Override
-  public void map(LongWritable offset, Text value,
-    Context context)
-  throws IOException {
+  public void map(LongWritable offset, Text value, Context context) throws IOException {
+
     byte[] lineBytes = value.getBytes();
 
     try {
-      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
-          lineBytes, value.getLength());
-      ImmutableBytesWritable rowKey =
-        new ImmutableBytesWritable(lineBytes,
-            parsed.getRowKeyOffset(),
-            parsed.getRowKeyLength());
+      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, value.getLength());
+      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(lineBytes,
+          parsed.getRowKeyOffset(), parsed.getRowKeyLength());
+      // Retrieve timestamp if exists
+      ts = parsed.getTimestamp(ts);
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
-        if (i == parser.getRowKeyColumnIndex()) continue;
+        if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()) {
+          continue;
+        }
         KeyValue kv = new KeyValue(
             lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
             parser.getFamily(i), 0, parser.getFamily(i).length,
@@ -144,9 +143,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       context.write(rowKey, put);
     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
       if (skipBadLines) {
-        System.err.println(
-            "Bad line at offset: " + offset.get() + ":\n" +
-            badLine.getMessage());
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
         incrementBadLineCount(1);
         return;
       } else {
@@ -154,9 +151,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       }
     } catch (IllegalArgumentException e) {
       if (skipBadLines) {
-        System.err.println(
-            "Bad line at offset: " + offset.get() + ":\n" +
-            e.getMessage());
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
        incrementBadLineCount(1);
         return;
       } else {
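Note: taken together with the ImportTsv.main() change above, the effective cell timestamp is resolved as: a per-line HBASE_TS_KEY value when the column spec declares one, otherwise the job-level importtsv.timestamp value, otherwise the current system time that main() now writes back into the configuration (which is why the mapper can safely use 0 as its getLong default). A minimal driver-side sketch of the two knobs; the class name is hypothetical and only the configuration keys come from the patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ImportTsvTimestampConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Column spec: row key, per-line timestamp column, one data column.
    conf.set("importtsv.columns", "HBASE_ROW_KEY,HBASE_TS_KEY,d:c1");
    // Job-level default timestamp; with HBASE_TS_KEY present it is only a
    // fallback and, per the usage text above, effectively ignored for well-formed lines.
    conf.setLong("importtsv.timestamp", System.currentTimeMillis());
  }
}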
diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
index ac30a62677a..52c4db7f202 100644
--- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
+++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTsv.java
@@ -61,6 +61,7 @@ public class TestImportTsv {
     assertNull(parser.getFamily(0));
     assertNull(parser.getQualifier(0));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
     assertNull(parser.getFamily(0));
@@ -68,6 +69,7 @@ public class TestImportTsv {
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
     assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
     assertNull(parser.getFamily(0));
@@ -77,6 +79,18 @@ public class TestImportTsv {
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
     assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
+
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertTrue(parser.hasTimestamp());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
   }
 
   @Test
@@ -89,12 +103,31 @@ public class TestImportTsv {
     assertNull(parser.getFamily(2));
     assertNull(parser.getQualifier(2));
     assertEquals(2, parser.getRowKeyColumnIndex());
-
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex());
+
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
     ParsedLine parsed = parser.parse(line, line.length);
     checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
   }
 
+  @Test
+  public void testTsvParserWithTimestamp() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertNull(parser.getFamily(1));
+    assertNull(parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+
+    byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
+    ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(1234l, parsed.getTimestamp(-1));
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+  }
+
   private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
     ArrayList<String> parsedCols = new ArrayList<String>();
     for (int i = 0; i < parsed.getColumnCount(); i++) {
@@ -120,28 +153,46 @@ public class TestImportTsv {
   public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("key_only");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
     byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
+    parser.parse(line, line.length);
+  }
+
+  @Test(expected=BadTsvLineException.class)
+  public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
     ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(-1, parsed.getTimestamp(-1));
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+  }
+
+  @Test(expected=BadTsvLineException.class)
+  public void testTsvParserNoTimestampValue() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a");
+    parser.parse(line, line.length);
   }
 
   @Test
@@ -159,7 +210,22 @@ public class TestImportTsv {
         INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
+  }
+
+  @Test
+  public void testMROnTableWithTimestamp() throws Exception {
+    String TABLE_NAME = "TestTable";
+    String FAMILY = "FAM";
+    String INPUT_FILE = "InputFile1.csv";
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
+
+    String data = "KEY,1234,VALUE1,VALUE2\n";
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
   }
 
   @Test
@@ -171,21 +237,21 @@ public class TestImportTsv {
 
     // Prepare the arguments required for the test.
     String[] args = new String[] {
        "-D" + ImportTsv.MAPPER_CONF_KEY + "=org.apache.hadoop.hbase.mapreduce.TsvImporterCustomTestMapper",
        TABLE_NAME,
        INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
   }
 
-  private void doMROnTableTest(String inputFile, String family, String tableName,
-          String[] args, int valueMultiplier) throws Exception {
+  private void doMROnTableTest(String inputFile, String family, String tableName, String data,
+      String[] args, int valueMultiplier) throws Exception {
 
     // Cluster
     HBaseTestingUtility htu1 = new HBaseTestingUtility();
 
-    MiniHBaseCluster cluster = htu1.startMiniCluster();
+    htu1.startMiniCluster();
     htu1.startMiniMapReduceCluster();
 
     GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args);
@@ -196,14 +262,14 @@ public class TestImportTsv {
 
     FileSystem fs = FileSystem.get(conf);
     FSDataOutputStream op = fs.create(new Path(inputFile), true);
-    String line = "KEY\u001bVALUE1\u001bVALUE2\n";
-    op.write(line.getBytes(HConstants.UTF8_ENCODING));
+    if (data == null) {
+      data = "KEY\u001bVALUE1\u001bVALUE2\n";
+    }
+    op.write(Bytes.toBytes(data));
     op.close();
 
     final byte[] FAM = Bytes.toBytes(family);
     final byte[] TAB = Bytes.toBytes(tableName);
-    final byte[] QA = Bytes.toBytes("A");
-    final byte[] QB = Bytes.toBytes("B");
 
     HTableDescriptor desc = new HTableDescriptor(TAB);
     desc.addFamily(new HColumnDescriptor(FAM));
@@ -212,7 +278,7 @@ public class TestImportTsv {
     Job job = ImportTsv.createSubmittableJob(conf, args);
     job.waitForCompletion(false);
     assertTrue(job.isSuccessful());
-    
+
     HTable table = new HTable(new Configuration(conf), TAB);
     boolean verified = false;
     long pause = conf.getLong("hbase.client.pause", 5 * 1000);
@@ -254,9 +320,9 @@ public class TestImportTsv {
       htu1.shutdownMiniCluster();
     }
   }
-  
+
   public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
-    return new String(bytes, HConstants.UTF8_ENCODING);
+    return Bytes.toString(bytes);
   }
 
   @org.junit.Rule
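Possible follow-up (sketch only, not part of the patch): the shared verification loop in doMROnTableTest does not appear to assert cell timestamps, so testMROnTableWithTimestamp could additionally check that the value carried in the HBASE_TS_KEY column reached the stored cell. The snippet assumes the HTable instance and FAM array created above, the test data "KEY,1234,VALUE1,VALUE2", and the 0.9x-era Result/KeyValue API:

    // Sketch only: verify that the timestamp 1234 from the HBASE_TS_KEY column
    // was applied to the imported cell rather than the job-level default.
    Get get = new Get(Bytes.toBytes("KEY"));
    Result result = table.get(get);
    KeyValue kv = result.getColumnLatest(FAM, Bytes.toBytes("A"));
    assertEquals(1234L, kv.getTimestamp());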