HBASE-11262 Avoid empty columns while doing bulk-load (Ashish Kumar)
This commit is contained in:
parent
f9ce069e15
commit
8473fae1de
|
@ -92,6 +92,7 @@ public class ImportTsv extends Configured implements Tool {
|
|||
// If true, bad lines are logged to stderr. Default: false.
|
||||
public final static String LOG_BAD_LINES_CONF_KEY = "importtsv.log.bad.lines";
|
||||
public final static String SKIP_LINES_CONF_KEY = "importtsv.skip.bad.lines";
|
||||
public final static String SKIP_EMPTY_COLUMNS = "importtsv.skip.empty.columns";
|
||||
public final static String COLUMNS_CONF_KEY = "importtsv.columns";
|
||||
public final static String SEPARATOR_CONF_KEY = "importtsv.separator";
|
||||
public final static String ATTRIBUTE_SEPERATOR_CONF_KEY = "attributes.seperator";
|
||||
|
@ -687,6 +688,7 @@ public class ImportTsv extends Configured implements Tool {
|
|||
" table. If table does not exist, it is created but deleted in the end.\n" +
|
||||
" -D" + SKIP_LINES_CONF_KEY + "=false - fail if encountering an invalid line\n" +
|
||||
" -D" + LOG_BAD_LINES_CONF_KEY + "=true - logs invalid lines to stderr\n" +
|
||||
" -D" + SKIP_EMPTY_COLUMNS + "=false - If true then skip empty columns in bulk import\n" +
|
||||
" '-D" + SEPARATOR_CONF_KEY + "=|' - eg separate on pipes instead of tabs\n" +
|
||||
" -D" + TIMESTAMP_CONF_KEY + "=currentTimeAsLong - use the specified timestamp for the import\n" +
|
||||
" -D" + MAPPER_CONF_KEY + "=my.Mapper - A user-defined Mapper to use instead of " +
|
||||
|
|
|
@ -57,6 +57,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
|
|||
|
||||
/** Should skip bad lines */
|
||||
private boolean skipBadLines;
|
||||
/** Should skip empty columns*/
|
||||
private boolean skipEmptyColumns;
|
||||
private Counter badLineCount;
|
||||
private boolean logBadLines;
|
||||
|
||||
|
@ -128,6 +130,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
|
|||
// configuration.
|
||||
ts = conf.getLong(ImportTsv.TIMESTAMP_CONF_KEY, 0);
|
||||
|
||||
skipEmptyColumns = context.getConfiguration().getBoolean(
|
||||
ImportTsv.SKIP_EMPTY_COLUMNS, false);
|
||||
skipBadLines = context.getConfiguration().getBoolean(
|
||||
ImportTsv.SKIP_LINES_CONF_KEY, true);
|
||||
badLineCount = context.getCounter("ImportTsv", "Bad Lines");
|
||||
|
@ -160,7 +164,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
|
|||
for (int i = 0; i < parsed.getColumnCount(); i++) {
|
||||
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
|
||||
|| i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
|
||||
|| i == parser.getCellTTLColumnIndex()) {
|
||||
|| i == parser.getCellTTLColumnIndex() || (skipEmptyColumns
|
||||
&& parsed.getColumnLength(i) == 0)) {
|
||||
continue;
|
||||
}
|
||||
populatePut(lineBytes, parsed, put, i);
|
||||
|
|
|
@ -365,6 +365,19 @@ public class TestImportTsv implements Configurable {
|
|||
doMROnTableTest(data, 1, 4);
|
||||
util.deleteTable(table);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSkipEmptyColumns() throws Exception {
|
||||
Path bulkOutputPath = new Path(util.getDataTestDirOnTestFS(tn.getNameAsString()), "hfiles");
|
||||
args.put(ImportTsv.BULK_OUTPUT_CONF_KEY, bulkOutputPath.toString());
|
||||
args.put(ImportTsv.COLUMNS_CONF_KEY, "HBASE_ROW_KEY,HBASE_TS_KEY,FAM:A,FAM:B");
|
||||
args.put(ImportTsv.SEPARATOR_CONF_KEY, ",");
|
||||
args.put(ImportTsv.SKIP_EMPTY_COLUMNS, "true");
|
||||
// 2 Rows of data as input. Both rows are valid and only 3 columns are no-empty among 4
|
||||
String data = "KEY,1234,VALUE1,VALUE2\nKEY,1235,,VALUE2\n";
|
||||
doMROnTableTest(util, tn, FAMILY, data, args, 1, 3);
|
||||
util.deleteTable(tn);
|
||||
}
|
||||
|
||||
private Tool doMROnTableTest(String data, int valueMultiplier,int expectedKVCount)
|
||||
throws Exception {
|
||||
|
|
Loading…
Reference in New Issue