From 6f330dc8163aa1bcaad72c1172eebbde100b60dd Mon Sep 17 00:00:00 2001
From: Nishant
Date: Wed, 22 Jun 2016 16:38:29 -0700
Subject: [PATCH] Better handling for parseExceptions for Batch Ingestion (#3171)

* Better handling for parseExceptions

* make parseException handling consistent with Realtime

* change combiner default val to true

* review comments

* review comments
---
 .../indexer/DetermineHashedPartitionsJob.java |  3 +-
 .../druid/indexer/DeterminePartitionsJob.java |  6 ++-
 .../indexer/HadoopDruidIndexerMapper.java     | 19 ++++++----
 .../io/druid/indexer/IndexGeneratorJob.java   | 10 +++--
 .../java/io/druid/indexer/InputRowSerde.java  | 14 ++++++-
 .../indexer/IndexGeneratorCombinerTest.java   |  4 +-
 .../io/druid/indexer/InputRowSerdeTest.java   | 38 ++++++++++++++++---
 7 files changed, 70 insertions(+), 24 deletions(-)

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
index 99c568770ac..4212e4d1d1a 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java
@@ -241,7 +241,8 @@ public class DetermineHashedPartitionsJob implements Jobby
     protected void innerMap(
         InputRow inputRow,
         Object value,
-        Context context
+        Context context,
+        boolean reportParseExceptions
     ) throws IOException, InterruptedException
     {

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index c57c58bba6a..dda55506348 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -268,7 +268,8 @@ public class DeterminePartitionsJob implements Jobby
     protected void innerMap(
         InputRow inputRow,
         Object value,
-        Context context
+        Context context,
+        boolean reportParseExceptions
     ) throws IOException, InterruptedException
     {
       final List<Object> groupKey = Rows.toGroupKey(
@@ -349,7 +350,8 @@ public class DeterminePartitionsJob implements Jobby
     protected void innerMap(
         InputRow inputRow,
         Object value,
-        Context context
+        Context context,
+        boolean reportParseExceptions
     ) throws IOException, InterruptedException
     {
       final Map<String, Iterable<String>> dims = Maps.newHashMap();

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
index 8ee4ba9f5c9..dd7a5b1dd66 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerMapper.java
@@ -21,6 +21,7 @@ package io.druid.indexer;
 
 import com.metamx.common.RE;
 import com.metamx.common.logger.Logger;
+import com.metamx.common.parsers.ParseException;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.data.input.impl.StringInputRowParser;
@@ -38,6 +39,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper<
   protected HadoopDruidIndexerConfig config;
   private InputRowParser parser;
   protected GranularitySpec granularitySpec;
+  private boolean reportParseExceptions;
 
   @Override
   protected void setup(Context context)
@@ -46,6 +48,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper<
     config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
     parser = config.getParser();
     granularitySpec = config.getGranularitySpec();
+    reportParseExceptions = !config.isIgnoreInvalidRows();
   }
 
   public HadoopDruidIndexerConfig getConfig()
@@ -68,20 +71,20 @@ public abstract class HadoopDruidIndexerMapper extends Mapper<
       try {
         inputRow = parseInputRow(value, parser);
       }
-      catch (Exception e) {
-        if (config.isIgnoreInvalidRows()) {
-          log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
-          context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
-          return; // we're ignoring this invalid row
-        } else {
+      catch (ParseException e) {
+        if (reportParseExceptions) {
           throw e;
         }
+        log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
+        context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
+        return; // we're ignoring this invalid row
+      }
 
       if (!granularitySpec.bucketIntervals().isPresent()
           || granularitySpec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
                             .isPresent()) {
-        innerMap(inputRow, value, context);
+        innerMap(inputRow, value, context, reportParseExceptions);
       }
     }
     catch (RuntimeException e) {
@@ -103,7 +106,7 @@ public abstract class HadoopDruidIndexerMapper extends Mapper<
     }
   }
 
-  abstract protected void innerMap(InputRow inputRow, Object value, Context context)
+  abstract protected void innerMap(InputRow inputRow, Object value, Context context, boolean reportParseExceptions)
       throws IOException, InterruptedException;
 
 }

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
index 414a9a4e3cc..00619e90db7 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java
@@ -263,7 +263,8 @@ public class IndexGeneratorJob implements Jobby
     protected void innerMap(
         InputRow inputRow,
         Object value,
-        Context context
+        Context context,
+        boolean reportParseExceptions
     ) throws IOException, InterruptedException
     {
       // Group by bucket, sort by timestamp
@@ -287,9 +288,9 @@ public class IndexGeneratorJob implements Jobby
       // and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
       // data
       byte[] serializedInputRow = inputRow instanceof SegmentInputRow ?
-                                  InputRowSerde.toBytes(inputRow, combiningAggs)
+                                  InputRowSerde.toBytes(inputRow, combiningAggs, reportParseExceptions)
                                   :
-                                  InputRowSerde.toBytes(inputRow, aggregators)
+                                  InputRowSerde.toBytes(inputRow, aggregators, reportParseExceptions);
 
       context.write(
           new SortableBytes(
@@ -369,9 +370,10 @@ public class IndexGeneratorJob implements Jobby
           context.progress();
           Row row = rows.next();
           InputRow inputRow = getInputRowFromRow(row, dimensions);
+          // reportParseExceptions is true as any unparseable data is already handled by the mapper.
           context.write(
               key,
-              new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs))
+              new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs, true))
           );
         }
         index.close();

diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java
index b179faefb73..20cf221a7f8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java
@@ -28,6 +28,7 @@ import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
 import com.metamx.common.IAE;
 import com.metamx.common.logger.Logger;
+import com.metamx.common.parsers.ParseException;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.query.aggregation.Aggregator;
@@ -50,7 +51,7 @@ public class InputRowSerde
 {
   private static final Logger log = new Logger(InputRowSerde.class);
 
-  public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs)
+  public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs, boolean reportParseExceptions)
   {
     try {
       ByteArrayDataOutput out = ByteStreams.newDataOutput();
@@ -91,7 +92,16 @@ public class InputRowSerde
                 true
             )
         );
-        agg.aggregate();
+        try {
+          agg.aggregate();
+        }
+        catch (ParseException e) {
+          // "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
+          if (reportParseExceptions) {
+            throw new ParseException(e, "Encountered parse error for aggregator[%s]", agg.getName());
+          }
+          log.debug(e, "Encountered parse error, skipping aggregator[%s].", agg.getName());
+        }
 
         String t = aggFactory.getTypeName();

diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java
index 151e1a43b93..855cfd973fa 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java
@@ -159,8 +159,8 @@ public class IndexGeneratorCombinerTest
         )
     );
     List rows = Lists.newArrayList(
-        new BytesWritable(InputRowSerde.toBytes(row1, aggregators)),
-        new BytesWritable(InputRowSerde.toBytes(row2, aggregators))
+        new BytesWritable(InputRowSerde.toBytes(row1, aggregators, true)),
+        new BytesWritable(InputRowSerde.toBytes(row2, aggregators, true))
     );
 
     Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class);

diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java
index 828dc1ab400..26ddf5a29d3 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java
@@ -21,6 +21,7 @@ package io.druid.indexer;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.metamx.common.parsers.ParseException;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.jackson.AggregatorsModule;
@@ -44,6 +45,10 @@ public class InputRowSerdeTest
   private List dims;
   private Map event;
 
+  {
+    new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique
+  }
+
   public InputRowSerdeTest()
   {
     this.timestamp = System.currentTimeMillis();
@@ -60,7 +65,7 @@ public class InputRowSerdeTest
   @Test
   public void testSerde()
   {
-    new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique
+
     InputRow in = new MapBasedInputRow(
         timestamp,
@@ -68,14 +73,15 @@ public class InputRowSerdeTest
         event
     );
 
-    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
         new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in"),
         new DoubleSumAggregatorFactory("m1out", "m1"),
         new LongSumAggregatorFactory("m2out", "m2"),
-        new HyperUniquesAggregatorFactory("m3out", "m3")
+        new HyperUniquesAggregatorFactory("m3out", "m3"),
+        new LongSumAggregatorFactory("unparseable", "m3") // Unparseable from String to Long
     };
 
-    byte[] data = InputRowSerde.toBytes(in, aggregatorFactories);
+    byte[] data = InputRowSerde.toBytes(in, aggregatorFactories, false); // Ignore Unparseable aggregator
     InputRow out = InputRowSerde.fromBytes(data, aggregatorFactories);
 
     Assert.assertEquals(timestamp, out.getTimestampFromEpoch());
@@ -87,6 +93,28 @@ public class InputRowSerdeTest
     Assert.assertEquals(0.0f, out.getFloatMetric("agg_non_existing"), 0.00001);
     Assert.assertEquals(5.0f, out.getFloatMetric("m1out"), 0.00001);
     Assert.assertEquals(100L, out.getLongMetric("m2out"));
-    Assert.assertEquals(1, ((HyperLogLogCollector)out.getRaw("m3out")).estimateCardinality(), 0.001);
+    Assert.assertEquals(1, ((HyperLogLogCollector) out.getRaw("m3out")).estimateCardinality(), 0.001);
+    Assert.assertEquals(0L, out.getLongMetric("unparseable"));
+
+  }
+
+  @Test(expected = ParseException.class)
+  public void testThrowParseExceptions()
+  {
+    InputRow in = new MapBasedInputRow(
+        timestamp,
+        dims,
+        event
+    );
+    AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
+        new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in"),
+        new DoubleSumAggregatorFactory("m1out", "m1"),
+        new LongSumAggregatorFactory("m2out", "m2"),
+        new HyperUniquesAggregatorFactory("m3out", "m3"),
+        new LongSumAggregatorFactory("unparseable", "m3") // Unparseable from String to Long
+    };
+
+    InputRowSerde.toBytes(in, aggregatorFactories, true);
+  }
 }
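
A minimal usage sketch, not part of the patch, showing how the new three-argument InputRowSerde.toBytes(...) behaves; it mirrors InputRowSerdeTest above. The class name, dimension and metric values here are illustrative only. With reportParseExceptions = false (i.e. ignoreInvalidRows = true) the unparseable longSum metric is skipped and comes back as its default (0, as asserted in testSerde); with reportParseExceptions = true the ParseException propagates and the batch task fails fast on bad data.

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.indexer.InputRowSerde;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

public class ReportParseExceptionsSketch
{
  public static void main(String[] args)
  {
    // "m3" holds a string, so a longSum over it cannot be parsed (same shape as the test above).
    InputRow row = new MapBasedInputRow(
        System.currentTimeMillis(),
        ImmutableList.of("dim1"),
        ImmutableMap.<String, Object>of("dim1", "d1v1", "m2", 100L, "m3", "m3v")
    );
    AggregatorFactory[] aggs = new AggregatorFactory[]{
        new LongSumAggregatorFactory("m2out", "m2"),
        new LongSumAggregatorFactory("unparseable", "m3")
    };

    // reportParseExceptions = false: the failing aggregator is skipped and logged;
    // serialization still succeeds and the metric defaults to 0 on deserialization.
    byte[] lenient = InputRowSerde.toBytes(row, aggs, false);
    System.out.println("serialized " + lenient.length + " bytes despite the unparseable metric");

    // reportParseExceptions = true: the ParseException is rethrown to the caller.
    try {
      InputRowSerde.toBytes(row, aggs, true);
    }
    catch (ParseException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}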