Better handling for parseExceptions for Batch Ingestion (#3171)

* Better handling for parseExceptions

* make parseException handling consistent with Realtime

* change combiner default value to true

* review comments

* review comments
Authored by Nishant on 2016-06-22 16:38:29 -07:00; committed by Fangjin Yang
parent 24860a1391
commit 6f330dc816
7 changed files with 70 additions and 24 deletions
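
The core of the change is visible in the HadoopDruidIndexerMapper hunk below: the mapper derives a reportParseExceptions flag from !config.isIgnoreInvalidRows(), catches ParseException (rather than any Exception), rethrows it when reporting is enabled, and otherwise increments the invalid-row counter and skips the row, matching the Realtime behaviour. The following is a minimal, self-contained sketch of that pattern only; SketchMapper, parseRow, and invalidRowCounter are illustrative stand-ins, not the actual Druid or Hadoop classes.

// A minimal sketch of the new mapper-level handling; all names here
// (SketchMapper, parseRow, invalidRowCounter) are illustrative, not Druid's API.
public class SketchMapper
{
  static class ParseException extends RuntimeException
  {
    ParseException(String message) { super(message); }
  }

  private final boolean reportParseExceptions; // Druid derives this from !config.isIgnoreInvalidRows()
  private long invalidRowCounter = 0;          // stands in for INVALID_ROW_COUNTER

  SketchMapper(boolean ignoreInvalidRows)
  {
    this.reportParseExceptions = !ignoreInvalidRows;
  }

  void map(String value)
  {
    final String inputRow;
    try {
      inputRow = parseRow(value);
    }
    catch (ParseException e) {
      if (reportParseExceptions) {
        throw e;               // strict mode: surface the bad row and fail the task
      }
      invalidRowCounter++;     // lenient mode: count the row and move on
      return;
    }
    innerMap(inputRow, reportParseExceptions);
  }

  private String parseRow(String value)
  {
    if (value == null || value.isEmpty()) {
      throw new ParseException("unparseable row: [" + value + "]");
    }
    return value;
  }

  private void innerMap(String inputRow, boolean reportParseExceptions)
  {
    // downstream work (serialization, aggregation) receives the same flag
  }

  public static void main(String[] args)
  {
    SketchMapper lenient = new SketchMapper(true);   // ignoreInvalidRows = true
    lenient.map("");                                 // skipped and counted
    System.out.println("lenient mapper skipped rows: " + lenient.invalidRowCounter);

    SketchMapper strict = new SketchMapper(false);   // ignoreInvalidRows = false
    try {
      strict.map("");
    }
    catch (ParseException e) {
      System.out.println("strict mapper rethrew: " + e.getMessage());
    }
  }
}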


@ -241,7 +241,8 @@ public class DetermineHashedPartitionsJob implements Jobby
protected void innerMap(
InputRow inputRow,
Object value,
Context context
Context context,
boolean reportParseExceptions
) throws IOException, InterruptedException
{


@ -268,7 +268,8 @@ public class DeterminePartitionsJob implements Jobby
protected void innerMap(
InputRow inputRow,
Object value,
Context context
Context context,
boolean reportParseExceptions
) throws IOException, InterruptedException
{
final List<Object> groupKey = Rows.toGroupKey(
@ -349,7 +350,8 @@ public class DeterminePartitionsJob implements Jobby
protected void innerMap(
InputRow inputRow,
Object value,
Context context
Context context,
boolean reportParseExceptions
) throws IOException, InterruptedException
{
final Map<String, Iterable<String>> dims = Maps.newHashMap();


@ -21,6 +21,7 @@ package io.druid.indexer;
import com.metamx.common.RE;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
@ -38,6 +39,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
protected HadoopDruidIndexerConfig config;
private InputRowParser parser;
protected GranularitySpec granularitySpec;
private boolean reportParseExceptions;
@Override
protected void setup(Context context)
@ -46,6 +48,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
parser = config.getParser();
granularitySpec = config.getGranularitySpec();
reportParseExceptions = !config.isIgnoreInvalidRows();
}
public HadoopDruidIndexerConfig getConfig()
@ -68,20 +71,20 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
try {
inputRow = parseInputRow(value, parser);
}
catch (Exception e) {
if (config.isIgnoreInvalidRows()) {
log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row
} else {
catch (ParseException e) {
if (reportParseExceptions) {
throw e;
}
log.debug(e, "Ignoring invalid row [%s] due to parsing error", value.toString());
context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1);
return; // we're ignoring this invalid row
}
if (!granularitySpec.bucketIntervals().isPresent()
|| granularitySpec.bucketInterval(new DateTime(inputRow.getTimestampFromEpoch()))
.isPresent()) {
innerMap(inputRow, value, context);
innerMap(inputRow, value, context, reportParseExceptions);
}
}
catch (RuntimeException e) {
@ -103,7 +106,7 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
}
}
abstract protected void innerMap(InputRow inputRow, Object value, Context context)
abstract protected void innerMap(InputRow inputRow, Object value, Context context, boolean reportParseExceptions)
throws IOException, InterruptedException;
}


@ -263,7 +263,8 @@ public class IndexGeneratorJob implements Jobby
protected void innerMap(
InputRow inputRow,
Object value,
Context context
Context context,
boolean reportParseExceptions
) throws IOException, InterruptedException
{
// Group by bucket, sort by timestamp
@ -287,9 +288,9 @@ public class IndexGeneratorJob implements Jobby
// and they contain the columns as they show up in the segment after ingestion, not what you would see in raw
// data
byte[] serializedInputRow = inputRow instanceof SegmentInputRow ?
InputRowSerde.toBytes(inputRow, combiningAggs)
InputRowSerde.toBytes(inputRow, combiningAggs, reportParseExceptions)
:
InputRowSerde.toBytes(inputRow, aggregators);
InputRowSerde.toBytes(inputRow, aggregators, reportParseExceptions);
context.write(
new SortableBytes(
@ -369,9 +370,10 @@ public class IndexGeneratorJob implements Jobby
context.progress();
Row row = rows.next();
InputRow inputRow = getInputRowFromRow(row, dimensions);
// reportParseExceptions is true as any unparseable data is already handled by the mapper.
context.write(
key,
new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs))
new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs, true))
);
}
index.close();
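
Two details in this file are worth spelling out. The mapper serializes with the combining aggregators when the row is a re-ingested SegmentInputRow (its columns already look like post-ingestion segment columns rather than raw data), and it passes the configured reportParseExceptions flag through. The reducer, by contrast, hard-codes true: any unparseable input was already rethrown or skipped by the mapper, so a parse error at this stage would indicate a bug rather than bad data. A compact sketch of the two call sites, assuming stand-in types (Row, SegmentRow, Agg, toBytes) in place of the real InputRow/SegmentInputRow/InputRowSerde API:

// Illustrative only: Row, SegmentRow, Agg and toBytes() are stand-ins for the
// InputRow/SegmentInputRow/InputRowSerde types used in IndexGeneratorJob.
public class CallSiteSketch
{
  interface Row {}
  static class RawRow implements Row {}
  static class SegmentRow implements Row {}   // stands in for SegmentInputRow
  enum Agg { FULL, COMBINING }

  static byte[] toBytes(Row row, Agg aggs, boolean reportParseExceptions)
  {
    return new byte[0];   // serialization elided; see InputRowSerde below
  }

  // Mapper: choose the aggregator set by row type and honour the configured flag.
  static byte[] mapperSerialize(Row row, boolean reportParseExceptions)
  {
    return row instanceof SegmentRow
           ? toBytes(row, Agg.COMBINING, reportParseExceptions)
           : toBytes(row, Agg.FULL, reportParseExceptions);
  }

  // Combiner/reducer: always strict, since the mapper has already filtered bad input.
  static byte[] reducerSerialize(Row row)
  {
    return toBytes(row, Agg.COMBINING, true);
  }

  public static void main(String[] args)
  {
    System.out.println(mapperSerialize(new RawRow(), false).length);   // lenient mapper path
    System.out.println(reducerSerialize(new SegmentRow()).length);     // strict reducer path
  }
}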


@ -28,6 +28,7 @@ import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.metamx.common.IAE;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.query.aggregation.Aggregator;
@ -50,7 +51,7 @@ public class InputRowSerde
{
private static final Logger log = new Logger(InputRowSerde.class);
public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs)
public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs, boolean reportParseExceptions)
{
try {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
@ -91,7 +92,16 @@ public class InputRowSerde
true
)
);
agg.aggregate();
try {
agg.aggregate();
}
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
if (reportParseExceptions) {
throw new ParseException(e, "Encountered parse error for aggregator[%s]", agg.getName());
}
log.debug(e, "Encountered parse error, skipping aggregator[%s].", agg.getName());
}
String t = aggFactory.getTypeName();
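
As the comment in this hunk notes, aggregate() can itself throw a ParseException when a column selector expects one type but the raw value is something else (for example a non-numeric string feeding a long sum). With reportParseExceptions set, the error is rethrown with the aggregator's name attached; otherwise the aggregator is simply skipped and its slot is serialized with its default value, which is why the test further down expects 0L for the "unparseable" metric. A toy, self-contained illustration of that guard; ToyLongSum, ParseError, and serializeMetric are hypothetical stand-ins, not Druid's Aggregator or ParseException:

// Toy illustration of the aggregate guard; ToyLongSum, ParseError and
// serializeMetric are hypothetical, not Druid's Aggregator/ParseException.
public class AggregateGuardSketch
{
  static class ParseError extends RuntimeException
  {
    ParseError(String message) { super(message); }
  }

  static class ToyLongSum
  {
    private long sum = 0;   // the default that gets serialized if aggregate() never succeeds

    void aggregate(String rawValue)
    {
      try {
        sum += Long.parseLong(rawValue);
      }
      catch (NumberFormatException e) {
        throw new ParseError("cannot read [" + rawValue + "] as a long");
      }
    }

    long get() { return sum; }
  }

  static long serializeMetric(String rawValue, boolean reportParseExceptions)
  {
    ToyLongSum agg = new ToyLongSum();
    try {
      agg.aggregate(rawValue);
    }
    catch (ParseError e) {
      if (reportParseExceptions) {
        throw new ParseError("Encountered parse error for aggregator[toyLongSum]");
      }
      // lenient path: skip this aggregator; its default value is what gets written
    }
    return agg.get();
  }

  public static void main(String[] args)
  {
    System.out.println(serializeMetric("100", false));          // 100
    System.out.println(serializeMetric("not-a-number", false)); // 0, silently skipped
    try {
      serializeMetric("not-a-number", true);                    // strict: rethrows
    }
    catch (ParseError e) {
      System.out.println("strict mode rethrew: " + e.getMessage());
    }
  }
}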


@ -159,8 +159,8 @@ public class IndexGeneratorCombinerTest
)
);
List<BytesWritable> rows = Lists.newArrayList(
new BytesWritable(InputRowSerde.toBytes(row1, aggregators)),
new BytesWritable(InputRowSerde.toBytes(row2, aggregators))
new BytesWritable(InputRowSerde.toBytes(row1, aggregators, true)),
new BytesWritable(InputRowSerde.toBytes(row2, aggregators, true))
);
Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class);


@ -21,6 +21,7 @@ package io.druid.indexer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.jackson.AggregatorsModule;
@ -44,6 +45,10 @@ public class InputRowSerdeTest
private List<String> dims;
private Map<String, Object> event;
{
new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique
}
public InputRowSerdeTest()
{
this.timestamp = System.currentTimeMillis();
@ -60,7 +65,7 @@ public class InputRowSerdeTest
@Test
public void testSerde()
{
new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique
InputRow in = new MapBasedInputRow(
timestamp,
@ -68,14 +73,15 @@ public class InputRowSerdeTest
event
);
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[] {
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in"),
new DoubleSumAggregatorFactory("m1out", "m1"),
new LongSumAggregatorFactory("m2out", "m2"),
new HyperUniquesAggregatorFactory("m3out", "m3")
new HyperUniquesAggregatorFactory("m3out", "m3"),
new LongSumAggregatorFactory("unparseable", "m3") // Unparseable from String to Long
};
byte[] data = InputRowSerde.toBytes(in, aggregatorFactories);
byte[] data = InputRowSerde.toBytes(in, aggregatorFactories, false); // Ignore Unparseable aggregator
InputRow out = InputRowSerde.fromBytes(data, aggregatorFactories);
Assert.assertEquals(timestamp, out.getTimestampFromEpoch());
@ -87,6 +93,28 @@ public class InputRowSerdeTest
Assert.assertEquals(0.0f, out.getFloatMetric("agg_non_existing"), 0.00001);
Assert.assertEquals(5.0f, out.getFloatMetric("m1out"), 0.00001);
Assert.assertEquals(100L, out.getLongMetric("m2out"));
Assert.assertEquals(1, ((HyperLogLogCollector)out.getRaw("m3out")).estimateCardinality(), 0.001);
Assert.assertEquals(1, ((HyperLogLogCollector) out.getRaw("m3out")).estimateCardinality(), 0.001);
Assert.assertEquals(0L, out.getLongMetric("unparseable"));
}
@Test(expected = ParseException.class)
public void testThrowParseExceptions()
{
InputRow in = new MapBasedInputRow(
timestamp,
dims,
event
);
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
new DoubleSumAggregatorFactory("agg_non_existing", "agg_non_existing_in"),
new DoubleSumAggregatorFactory("m1out", "m1"),
new LongSumAggregatorFactory("m2out", "m2"),
new HyperUniquesAggregatorFactory("m3out", "m3"),
new LongSumAggregatorFactory("unparseable", "m3") // Unparseable from String to Long
};
InputRowSerde.toBytes(in, aggregatorFactories, true);
}
}