From 58d5f5e7f07068cf7a9de8ad0d17ccdaf9b0701d Mon Sep 17 00:00:00 2001
From: Dia Kharrat
Date: Thu, 19 Mar 2015 17:33:54 -0700
Subject: [PATCH 1/2] Honor ignoreInvalidRows in Hadoop indexer

The reducer of the Hadoop indexer now ignores rows with parsing
exceptions (if enabled by the indexer config).
---
 .../io/druid/indexer/IndexGeneratorJob.java | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 5f97f431482..033df56cdf5 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -17,6 +17,7 @@
 
 package io.druid.indexer;
 
+import com.metamx.common.parsers.ParseException;
 import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
@@ -378,10 +379,21 @@ public class IndexGeneratorJob implements Jobby
 
         for (final Writable value : values) {
           context.progress();
-          final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
-          allDimensionNames.addAll(inputRow.getDimensions());
+          int numRows;
+          try {
+            final InputRow inputRow = index.formatRow(HadoopDruidIndexerMapper.parseInputRow(value, parser));
+            allDimensionNames.addAll(inputRow.getDimensions());
 
-          int numRows = index.add(inputRow);
+            numRows = index.add(inputRow);
+          } catch (ParseException e) {
+            if (config.isIgnoreInvalidRows()) {
+              log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
+              context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
+              continue;
+            } else {
+              throw e;
+            }
+          }
           ++lineCount;
 
           if (!index.canAppendRow()) {

From 3a6dc99384d169e50a1837ea3cbab0c8302457ad Mon Sep 17 00:00:00 2001
From: Dia Kharrat
Date: Thu, 19 Mar 2015 17:34:37 -0700
Subject: [PATCH 2/2] Log invalid rows in mapper of Hadoop indexer

---
 .../main/java/io/druid/indexer/HadoopDruidIndexerMapper.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
index 4fd3cc93081..bd561106f70 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
@@ -17,6 +17,7 @@
 
 package io.druid.indexer;
 
+import com.metamx.common.logger.Logger;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.data.input.impl.MapInputRowParser;
@@ -42,6 +43,8 @@ import com.metamx.common.RE;
 
 public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Writable, Writable, KEYOUT, VALUEOUT>
 {
+  private static final Logger log = new Logger(HadoopDruidIndexerMapper.class);
+
   private HadoopDruidIndexerConfig config;
   private InputRowParser parser;
   protected GranularitySpec granularitySpec;
@@ -77,6 +80,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
     }
     catch (Exception e) {
       if (config.isIgnoreInvalidRows()) {
+        log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
         context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
         return; // we're ignoring this invalid row
       } else {
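
Both patches apply the same skip-and-count pattern: parse inside a try
block, and on a parse failure either increment INVALID_ROW_COUNTER and
move on (when ignoreInvalidRows is enabled, checked via
config.isIgnoreInvalidRows()) or rethrow and fail the task, which was
the previous behavior. The standalone sketch below reproduces that
control flow outside Hadoop; RowSkipper, parse(), and consume() are
hypothetical names for illustration, not Druid or Hadoop APIs.

    import java.util.Arrays;
    import java.util.List;

    public class RowSkipper
    {
      // Stand-in for com.metamx.common.parsers.ParseException.
      static class ParseException extends RuntimeException
      {
        ParseException(String message)
        {
          super(message);
        }
      }

      private final boolean ignoreInvalidRows;
      private long invalidRows = 0;

      public RowSkipper(boolean ignoreInvalidRows)
      {
        this.ignoreInvalidRows = ignoreInvalidRows;
      }

      // Fails the way a row parser would on a malformed line.
      private List<String> parse(String line)
      {
        String[] fields = line.split(",");
        if (fields.length < 2) {
          throw new ParseException("expected at least 2 fields: " + line);
        }
        return Arrays.asList(fields);
      }

      // Mirrors the reducer loop: parse each row, count and skip
      // invalid ones when ignoreInvalidRows is set, otherwise rethrow.
      public int consume(Iterable<String> lines)
      {
        int added = 0;
        for (String line : lines) {
          try {
            parse(line);
            added++;          // index.add(inputRow) in the real reducer
          }
          catch (ParseException e) {
            if (ignoreInvalidRows) {
              invalidRows++;  // INVALID_ROW_COUNTER.increment(1) in the patch
              continue;
            } else {
              throw e;        // fail the task, the pre-patch behavior
            }
          }
        }
        return added;
      }

      public static void main(String[] args)
      {
        RowSkipper skipper = new RowSkipper(true);
        int added = skipper.consume(Arrays.asList("a,1", "malformed", "b,2"));
        System.out.println("added=" + added + ", invalid=" + skipper.invalidRows);
        // prints: added=2, invalid=1
      }
    }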
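
The counter the patches increment is an ordinary Hadoop counter, so an
operator can read its final value back once the indexing job finishes.
A minimal sketch, assuming a completed org.apache.hadoop.mapreduce.Job
handle; InvalidRowReport and invalidRowCount() are hypothetical names,
while the IndexJobCounters enum is the one referenced in both patches.

    import io.druid.indexer.HadoopDruidIndexerConfig;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;

    public class InvalidRowReport
    {
      // Reads how many rows the mapper and reducer skipped because of
      // parse errors during the indexing job.
      public static long invalidRowCount(Job job) throws Exception
      {
        Counter counter = job.getCounters().findCounter(
            HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER
        );
        return counter.getValue();
      }
    }

Note that the mapper increments the counter and returns early, so rows
it skips never reach the reducer; the reducer-side catch covers rows
that fail later in its own parse/format/add path.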