Honor ignoreInvalidRows in Hadoop indexer

The reducer of the hadoop indexer now ignores lines with parsing
exceptions (if enabled by the indexer config).
This commit is contained in:
Dia Kharrat 2015-03-19 17:33:54 -07:00
parent 46eae49bf5
commit 58d5f5e7f0
1 changed file with 15 additions and 3 deletions

View File

@@ -17,6 +17,7 @@
 package io.druid.indexer;
+import com.metamx.common.parsers.ParseException;
 import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
@@ -378,10 +379,21 @@ public class IndexGeneratorJob implements Jobby
         for (final Writable value : values) {
           context.progress();
-          final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
-          allDimensionNames.addAll(inputRow.getDimensions());
-          int numRows = index.add(inputRow);
+          int numRows;
+          try {
+            final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
+            allDimensionNames.addAll(inputRow.getDimensions());
+            numRows = index.add(inputRow);
+          } catch (ParseException e) {
+            if (config.isIgnoreInvalidRows()) {
+              log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
+              context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
+              continue;
+            } else {
+              throw e;
+            }
+          }
           ++lineCount;
           if (!index.canAppendRow()) {