HBASE-5564 Bulkload is discarding duplicate records
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1306907 13f79535-47bb-0310-9956-ffa450edef68
commit ec4d59770a
parent 8a864815de
@@ -77,8 +77,17 @@ public class ImportTsv {
 
     private int rowKeyColumnIndex;
 
+    private int maxColumnCount;
+
+    // Default value must be negative
+    public static final int DEFAULT_TIMESTAMP_COLUMN_INDEX = -1;
+
+    private int timestampKeyColumnIndex = DEFAULT_TIMESTAMP_COLUMN_INDEX;
+
     public static String ROWKEY_COLUMN_SPEC = "HBASE_ROW_KEY";
 
+    public static String TIMESTAMPKEY_COLUMN_SPEC = "HBASE_TS_KEY";
+
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -94,15 +103,20 @@ public class ImportTsv {
       ArrayList<String> columnStrings = Lists.newArrayList(
         Splitter.on(',').trimResults().split(columnsSpecification));
 
-      families = new byte[columnStrings.size()][];
-      qualifiers = new byte[columnStrings.size()][];
+      maxColumnCount = columnStrings.size();
+      families = new byte[maxColumnCount][];
+      qualifiers = new byte[maxColumnCount][];
       for (int i = 0; i < columnStrings.size(); i++) {
         String str = columnStrings.get(i);
         if (ROWKEY_COLUMN_SPEC.equals(str)) {
           rowKeyColumnIndex = i;
           continue;
         }
+        if (TIMESTAMPKEY_COLUMN_SPEC.equals(str)) {
+          timestampKeyColumnIndex = i;
+          continue;
+        }
 
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -117,6 +131,15 @@ public class ImportTsv {
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
+
+    public boolean hasTimestamp() {
+      return timestampKeyColumnIndex != DEFAULT_TIMESTAMP_COLUMN_INDEX;
+    }
+
+    public int getTimestampKeyColumnIndex() {
+      return timestampKeyColumnIndex;
+    }
+
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -127,7 +150,7 @@ public class ImportTsv {
     public ParsedLine parse(byte[] lineBytes, int length)
     throws BadTsvLineException {
       // Enumerate separator offsets
-      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(families.length);
+      ArrayList<Integer> tabOffsets = new ArrayList<Integer>(maxColumnCount);
       for (int i = 0; i < length; i++) {
         if (lineBytes[i] == separatorByte) {
           tabOffsets.add(i);
@@ -139,10 +162,12 @@ public class ImportTsv {
 
       tabOffsets.add(length);
 
-      if (tabOffsets.size() > families.length) {
+      if (tabOffsets.size() > maxColumnCount) {
         throw new BadTsvLineException("Excessive columns");
       } else if (tabOffsets.size() <= getRowKeyColumnIndex()) {
         throw new BadTsvLineException("No row key");
+      } else if (hasTimestamp() && tabOffsets.size() <= getTimestampKeyColumnIndex()) {
+        throw new BadTsvLineException("No timestamp");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -162,6 +187,22 @@ public class ImportTsv {
       public int getRowKeyLength() {
         return getColumnLength(rowKeyColumnIndex);
       }
+
+      public long getTimestamp(long ts) throws BadTsvLineException {
+        // Return ts if HBASE_TS_KEY is not configured in column spec
+        if (!hasTimestamp()) {
+          return ts;
+        }
+
+        try {
+          return Long.parseLong(Bytes.toString(lineBytes, getColumnOffset(timestampKeyColumnIndex),
+              getColumnLength(timestampKeyColumnIndex)));
+        } catch (NumberFormatException nfe) {
+          // treat this record as bad record
+          throw new BadTsvLineException("Invalid timestamp");
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
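For illustration only (not part of the patch): a minimal sketch of how the new per-record timestamp parsing is meant to be used. It assumes the same package as the tests further down, since TsvParser is a nested class of ImportTsv; the column spec and literal values here are made up.

package org.apache.hadoop.hbase.mapreduce;

import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.ParsedLine;
import org.apache.hadoop.hbase.util.Bytes;

public class TsvTimestampSketch {
  public static void main(String[] args) throws Exception {
    // Spec with a timestamp column: the second field becomes the cell timestamp.
    TsvParser withTs = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,d:c1", "\t");
    byte[] line = Bytes.toBytes("row1\t1262304000000\tvalue1");
    ParsedLine parsed = withTs.parse(line, line.length);
    System.out.println(parsed.getTimestamp(-1));  // 1262304000000

    // Spec without HBASE_TS_KEY: the supplied fallback is returned unchanged.
    TsvParser noTs = new TsvParser("HBASE_ROW_KEY,d:c1", "\t");
    byte[] line2 = Bytes.toBytes("row1\tvalue1");
    System.out.println(noTs.parse(line2, line2.length).getTimestamp(-1));  // -1
  }
}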
@@ -259,7 +300,11 @@ public class ImportTsv {
       "column name HBASE_ROW_KEY is used to designate that this column should be used\n" +
       "as the row key for each imported record. You must specify exactly one column\n" +
       "to be the row key, and you must specify a column name for every column that exists in the\n" +
-      "input data.\n" +
+      "input data. Another special column HBASE_TS_KEY designates that this column should be\n" +
+      "used as the timestamp for each record. Unlike HBASE_ROW_KEY, HBASE_TS_KEY is optional.\n" +
+      "You must specify at most one column as the timestamp key for each imported record.\n" +
+      "Records with an invalid timestamp (blank, non-numeric) will be treated as bad records.\n" +
+      "Note: if you use this option, then the 'importtsv.timestamp' option will be ignored.\n" +
       "\n" +
       "By default importtsv will load data directly into HBase. To instead generate\n" +
       "HFiles of data to prepare for a bulk data load, pass the option:\n" +
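As an illustration (not taken from the patch): with -Dimporttsv.columns=HBASE_ROW_KEY,HBASE_TS_KEY,d:c1 and the default tab separator, an input line of the form row1<TAB>1262304000000<TAB>value1 would be stored under row key row1, column d:c1, value value1, with the cell timestamp 1262304000000 taken from the second field rather than from the job-wide 'importtsv.timestamp' setting.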
@@ -310,12 +355,28 @@ public class ImportTsv {
       System.exit(-1);
     }
 
-    // Make sure one or more columns are specified
-    if (columns.length < 2) {
-      usage("One or more columns in addition to the row key are required");
+    // Make sure we have at most one column as the timestamp key
+    int tskeysFound = 0;
+    for (String col : columns) {
+      if (col.equals(TsvParser.TIMESTAMPKEY_COLUMN_SPEC)) tskeysFound++;
+    }
+    if (tskeysFound > 1) {
+      usage("Must specify at most one column as " + TsvParser.TIMESTAMPKEY_COLUMN_SPEC);
       System.exit(-1);
     }
 
+    // Make sure one or more columns are specified, excluding the row key and timestamp key
+    if (columns.length - (rowkeysFound + tskeysFound) < 1) {
+      usage("One or more columns in addition to the row key and timestamp (optional) are required");
+      System.exit(-1);
+    }
+
+    // If the timestamp option is not specified, use the current system time.
+    long timestamp = conf.getLong(TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+
+    // Set it back to replace an invalid (non-numeric) timestamp with the current system time
+    conf.setLong(TIMESTAMP_CONF_KEY, timestamp);
+
     Job job = createSubmittableJob(conf, otherArgs);
     System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
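Concretely, under these checks a column spec such as HBASE_ROW_KEY,HBASE_TS_KEY would now be rejected (no data columns remain once the row key and timestamp key are excluded), as would a spec naming HBASE_TS_KEY more than once; a spec like HBASE_ROW_KEY,HBASE_TS_KEY,d:c1 passes. (These example specs are illustrative, not from the patch.)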
@@ -82,8 +82,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
     Configuration conf = context.getConfiguration();
 
-    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY),
-        separator);
+    parser = new ImportTsv.TsvParser(conf.get(ImportTsv.COLUMNS_CONF_KEY), separator);
     if (parser.getRowKeyColumnIndex() == -1) {
       throw new RuntimeException("No row key column specified");
     }
@@ -105,10 +104,10 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       separator = new String(Base64.decode(separator));
     }
 
-    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, System.currentTimeMillis());
+    // Should never get 0 as we are setting this to a valid value in job configuration.
+    ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
 
-    skipBadLines = context.getConfiguration().getBoolean(
-        ImportTsv.SKIP_LINES_CONF_KEY, true);
+    skipBadLines = context.getConfiguration().getBoolean(ImportTsv.SKIP_LINES_CONF_KEY, true);
     badLineCount = context.getCounter("ImportTsv", "Bad Lines");
   }
 
@@ -116,22 +115,22 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
    * Convert a line of TSV text into an HBase table row.
    */
   @Override
-  public void map(LongWritable offset, Text value,
-      Context context)
-  throws IOException {
+  public void map(LongWritable offset, Text value, Context context) throws IOException {
     byte[] lineBytes = value.getBytes();
 
     try {
-      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(
-          lineBytes, value.getLength());
-      ImmutableBytesWritable rowKey =
-        new ImmutableBytesWritable(lineBytes,
-            parsed.getRowKeyOffset(),
-            parsed.getRowKeyLength());
+      ImportTsv.TsvParser.ParsedLine parsed = parser.parse(lineBytes, value.getLength());
+      ImmutableBytesWritable rowKey = new ImmutableBytesWritable(lineBytes,
+          parsed.getRowKeyOffset(), parsed.getRowKeyLength());
+      // Retrieve timestamp if exists
+      ts = parsed.getTimestamp(ts);
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
-        if (i == parser.getRowKeyColumnIndex()) continue;
+        if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()) {
+          continue;
+        }
         KeyValue kv = new KeyValue(
             lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
             parser.getFamily(i), 0, parser.getFamily(i).length,
@@ -144,9 +143,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       context.write(rowKey, put);
     } catch (ImportTsv.TsvParser.BadTsvLineException badLine) {
       if (skipBadLines) {
-        System.err.println(
-            "Bad line at offset: " + offset.get() + ":\n" +
-            badLine.getMessage());
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + badLine.getMessage());
         incrementBadLineCount(1);
         return;
       } else {
@@ -154,9 +151,7 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       }
     } catch (IllegalArgumentException e) {
       if (skipBadLines) {
-        System.err.println(
-            "Bad line at offset: " + offset.get() + ":\n" +
-            e.getMessage());
+        System.err.println("Bad line at offset: " + offset.get() + ":\n" + e.getMessage());
         incrementBadLineCount(1);
         return;
       } else {
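Reading the mapper changes together with the commit summary, the apparent effect is that each KeyValue is now written with the timestamp parsed from the HBASE_TS_KEY field (via ts = parsed.getTimestamp(ts)) instead of one job-wide timestamp. As a hypothetical example, two input lines that share a row key, say row1<TAB>1<TAB>v1 and row1<TAB>2<TAB>v2 under HBASE_ROW_KEY,HBASE_TS_KEY,d:c1, should end up as two distinct versions of d:c1 rather than colliding on the same timestamp and being discarded as duplicates during the bulk load.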
@@ -61,6 +61,7 @@ public class TestImportTsv {
     assertNull(parser.getFamily(0));
     assertNull(parser.getQualifier(0));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1", "\t");
     assertNull(parser.getFamily(0));
@@ -68,6 +69,7 @@ public class TestImportTsv {
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
     assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
 
     parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,col1:scol2", "\t");
     assertNull(parser.getFamily(0));
@@ -77,6 +79,18 @@ public class TestImportTsv {
     assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(2));
     assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(2));
     assertEquals(0, parser.getRowKeyColumnIndex());
+    assertFalse(parser.hasTimestamp());
+
+    parser = new TsvParser("HBASE_ROW_KEY,col1:scol1,HBASE_TS_KEY,col1:scol2", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(1));
+    assertBytesEquals(Bytes.toBytes("scol1"), parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col1"), parser.getFamily(3));
+    assertBytesEquals(Bytes.toBytes("scol2"), parser.getQualifier(3));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertTrue(parser.hasTimestamp());
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
   }
 
   @Test
@@ -89,12 +103,31 @@ public class TestImportTsv {
     assertNull(parser.getFamily(2));
     assertNull(parser.getQualifier(2));
     assertEquals(2, parser.getRowKeyColumnIndex());
+    assertEquals(TsvParser.DEFAULT_TIMESTAMP_COLUMN_INDEX, parser.getTimestampKeyColumnIndex());
 
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c\tval_d");
     ParsedLine parsed = parser.parse(line, line.length);
     checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
   }
 
+  @Test
+  public void testTsvParserWithTimestamp() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+    assertNull(parser.getFamily(0));
+    assertNull(parser.getQualifier(0));
+    assertNull(parser.getFamily(1));
+    assertNull(parser.getQualifier(1));
+    assertBytesEquals(Bytes.toBytes("col_a"), parser.getFamily(2));
+    assertBytesEquals(HConstants.EMPTY_BYTE_ARRAY, parser.getQualifier(2));
+    assertEquals(0, parser.getRowKeyColumnIndex());
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+
+    byte[] line = Bytes.toBytes("rowkey\t1234\tval_a");
+    ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(1234l, parsed.getTimestamp(-1));
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+  }
+
   private void checkParsing(ParsedLine parsed, Iterable<String> expected) {
     ArrayList<String> parsedCols = new ArrayList<String>();
     for (int i = 0; i < parsed.getColumnCount(); i++) {
@@ -120,28 +153,46 @@ public class TestImportTsv {
   public void testTsvParserBadTsvLineExcessiveColumns() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("val_a\tval_b\tval_c");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineZeroColumn() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineOnlyKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a", "\t");
     byte[] line = Bytes.toBytes("key_only");
-    ParsedLine parsed = parser.parse(line, line.length);
+    parser.parse(line, line.length);
   }
 
   @Test(expected=BadTsvLineException.class)
   public void testTsvParserBadTsvLineNoRowKey() throws BadTsvLineException {
     TsvParser parser = new TsvParser("col_a,HBASE_ROW_KEY", "\t");
     byte[] line = Bytes.toBytes("only_cola_data_and_no_row_key");
+    parser.parse(line, line.length);
+  }
+
+  @Test(expected=BadTsvLineException.class)
+  public void testTsvParserInvalidTimestamp() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,HBASE_TS_KEY,col_a,", "\t");
+    assertEquals(1, parser.getTimestampKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\ttimestamp\tval_a");
     ParsedLine parsed = parser.parse(line, line.length);
+    assertEquals(-1, parsed.getTimestamp(-1));
+    checkParsing(parsed, Splitter.on("\t").split(Bytes.toString(line)));
+  }
+
+  @Test(expected=BadTsvLineException.class)
+  public void testTsvParserNoTimestampValue() throws BadTsvLineException {
+    TsvParser parser = new TsvParser("HBASE_ROW_KEY,col_a,HBASE_TS_KEY", "\t");
+    assertEquals(2, parser.getTimestampKeyColumnIndex());
+    byte[] line = Bytes.toBytes("rowkey\tval_a");
+    parser.parse(line, line.length);
   }
 
   @Test
@@ -159,7 +210,22 @@ public class TestImportTsv {
         INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 1);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 1);
+  }
+
+  @Test
+  public void testMROnTableWithTimestamp() throws Exception {
+    String TABLE_NAME = "TestTable";
+    String FAMILY = "FAM";
+    String INPUT_FILE = "InputFile1.csv";
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=,", TABLE_NAME, INPUT_FILE };
+
+    String data = "KEY,1234,VALUE1,VALUE2\n";
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, data, args, 1);
   }
 
   @Test
@@ -176,16 +242,16 @@ public class TestImportTsv {
         INPUT_FILE
     };
 
-    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, args, 3);
+    doMROnTableTest(INPUT_FILE, FAMILY, TABLE_NAME, null, args, 3);
   }
 
-  private void doMROnTableTest(String inputFile, String family, String tableName,
+  private void doMROnTableTest(String inputFile, String family, String tableName, String data,
       String[] args, int valueMultiplier) throws Exception {
 
     // Cluster
     HBaseTestingUtility htu1 = new HBaseTestingUtility();
 
-    MiniHBaseCluster cluster = htu1.startMiniCluster();
+    htu1.startMiniCluster();
     htu1.startMiniMapReduceCluster();
 
     GenericOptionsParser opts = new GenericOptionsParser(htu1.getConfiguration(), args);
@@ -196,14 +262,14 @@ public class TestImportTsv {
 
     FileSystem fs = FileSystem.get(conf);
     FSDataOutputStream op = fs.create(new Path(inputFile), true);
-    String line = "KEY\u001bVALUE1\u001bVALUE2\n";
-    op.write(line.getBytes(HConstants.UTF8_ENCODING));
+    if (data == null) {
+      data = "KEY\u001bVALUE1\u001bVALUE2\n";
+    }
+    op.write(Bytes.toBytes(data));
     op.close();
 
     final byte[] FAM = Bytes.toBytes(family);
     final byte[] TAB = Bytes.toBytes(tableName);
-    final byte[] QA = Bytes.toBytes("A");
-    final byte[] QB = Bytes.toBytes("B");
 
     HTableDescriptor desc = new HTableDescriptor(TAB);
     desc.addFamily(new HColumnDescriptor(FAM));
@@ -256,7 +322,7 @@ public class TestImportTsv {
   }
 
   public static String toU8Str(byte[] bytes) throws UnsupportedEncodingException {
-    return new String(bytes, HConstants.UTF8_ENCODING);
+    return Bytes.toString(bytes);
   }
 
   @org.junit.Rule