Merge pull request #1226 from dkharrat/master

Honor ignoreInvalidRows in reducer of Hadoop indexer
Fangjin Yang 2015-03-20 08:36:09 -07:00
commit bb9118319f
2 changed files with 19 additions and 3 deletions
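
The flag this pull request honors works as follows: when ignoreInvalidRows is set in the indexer's tuning config, a row that fails to parse is logged, counted against the INVALID_ROW_COUNTER Hadoop counter, and skipped; when it is not set, the exception propagates and fails the task. Before this change the mapper already behaved that way, while the reducer in IndexGeneratorJob still failed on any ParseException. The sketch below is illustrative only, not Druid code: the mapper class, the counter enum, and the Long.parseLong stand-in for row parsing are all hypothetical, but the skip-or-throw shape mirrors what the patch applies.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SkipInvalidRowsMapper extends Mapper<LongWritable, Text, Text, NullWritable>
{
  // Hypothetical counter enum; Druid's real one is HadoopDruidIndexerConfig.IndexJobCounters.
  public enum RowCounters { INVALID_ROW_COUNTER }

  // Stand-in for config.isIgnoreInvalidRows(), which Druid reads from the tuning config.
  private final boolean ignoreInvalidRows = true;

  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
  {
    try {
      // Stand-in for real row parsing, which is what throws on malformed input.
      final long parsed = Long.parseLong(value.toString());
      context.write(new Text(Long.toString(parsed)), NullWritable.get());
    }
    catch (NumberFormatException e) {
      if (ignoreInvalidRows) {
        // Count the bad row so it stays visible in the job counters, then move on.
        context.getCounter(RowCounters.INVALID_ROW_COUNTER).increment(1);
        return;
      }
      // Without the flag, a single bad row still fails the task.
      throw new IOException("Failure on row[" + value + "]", e);
    }
  }
}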

HadoopDruidIndexerMapper.java

@@ -17,6 +17,7 @@
 package io.druid.indexer;

+import com.metamx.common.logger.Logger;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.data.input.impl.MapInputRowParser;
@@ -42,6 +43,8 @@ import com.metamx.common.RE;
 public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Writable, Writable, KEYOUT, VALUEOUT>
 {
+  private static final Logger log = new Logger(HadoopDruidIndexerMapper.class);
   private HadoopDruidIndexerConfig config;
   private InputRowParser parser;
   protected GranularitySpec granularitySpec;
@@ -77,6 +80,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
       }
       catch (Exception e) {
         if (config.isIgnoreInvalidRows()) {
+          log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
           context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
           return; // we're ignoring this invalid row
         } else {

IndexGeneratorJob.java

@@ -17,6 +17,7 @@
 package io.druid.indexer;

+import com.metamx.common.parsers.ParseException;
 import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
@@ -378,10 +379,21 @@ public class IndexGeneratorJob implements Jobby
       for (final Writable value : values) {
         context.progress();
-        final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
-        allDimensionNames.addAll(inputRow.getDimensions());
-        int numRows = index.add(inputRow);
+        int numRows;
+        try {
+          final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
+          allDimensionNames.addAll(inputRow.getDimensions());
+          numRows = index.add(inputRow);
+        } catch (ParseException e) {
+          if (config.isIgnoreInvalidRows()) {
+            log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
+            context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
+            continue;
+          } else {
+            throw e;
+          }
+        }
         ++lineCount;
         if (!index.canAppendRow()) {
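
Once a job has run, the effect of ignoreInvalidRows stays observable through the counter both code paths increment. A minimal read-back sketch, assuming "job" is the org.apache.hadoop.mapreduce.Job the indexer submitted and that the IndexJobCounters enum is visible to the calling code (it is referenced the same way inside the indexer itself):

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

import io.druid.indexer.HadoopDruidIndexerConfig;

public class InvalidRowReport
{
  // Returns how many rows were skipped because of ignoreInvalidRows.
  public static long skippedRows(Job job) throws Exception
  {
    final Counter counter = job.getCounters()
                               .findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER);
    return counter == null ? 0L : counter.getValue();
  }
}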