diff --git a/api/src/main/java/io/druid/data/input/impl/MapInputRowParser.java b/api/src/main/java/io/druid/data/input/impl/MapInputRowParser.java
index 8847dea0278..1fafa37a624 100644
--- a/api/src/main/java/io/druid/data/input/impl/MapInputRowParser.java
+++ b/api/src/main/java/io/druid/data/input/impl/MapInputRowParser.java
@@ -23,11 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.java.util.common.parsers.ParseException;
-
 import org.joda.time.DateTime;
 
 import java.util.List;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index bf8e008288b..ce02f4adf00 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -261,6 +261,7 @@ public class IndexTask extends AbstractTask
     // determine intervals containing data and prime HLL collectors
     final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = Maps.newHashMap();
     int thrownAway = 0;
+    int unparseable = 0;
 
     log.info("Determining intervals and shardSpecs");
     long determineShardSpecsStartMillis = System.currentTimeMillis();
@@ -269,48 +270,61 @@ public class IndexTask extends AbstractTask
             firehoseTempDir)
     ) {
       while (firehose.hasMore()) {
-        final InputRow inputRow = firehose.nextRow();
+        try {
+          final InputRow inputRow = firehose.nextRow();
 
-        // The null inputRow means the caller must skip this row.
-        if (inputRow == null) {
-          continue;
-        }
-
-        final Interval interval;
-        if (determineIntervals) {
-          interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
-        } else {
-          final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-          if (!optInterval.isPresent()) {
-            thrownAway++;
+          // The null inputRow means the caller must skip this row.
+          if (inputRow == null) {
             continue;
           }
-          interval = optInterval.get();
-        }
 
-        if (!determineNumPartitions) {
-          // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
-          // for the interval and don't instantiate a HLL collector
-          if (!hllCollectors.containsKey(interval)) {
-            hllCollectors.put(interval, Optional.absent());
+          final Interval interval;
+          if (determineIntervals) {
+            interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+          } else {
+            final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+            if (!optInterval.isPresent()) {
+              thrownAway++;
+              continue;
+            }
+            interval = optInterval.get();
           }
-          continue;
-        }
 
-        if (!hllCollectors.containsKey(interval)) {
-          hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
-        }
+          if (!determineNumPartitions) {
+            // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
+            // for the interval and don't instantiate a HLL collector
+            if (!hllCollectors.containsKey(interval)) {
+              hllCollectors.put(interval, Optional.absent());
+            }
+            continue;
+          }
 
-        List<Object> groupKey = Rows.toGroupKey(
-            queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
-            inputRow
-        );
-        hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
+          if (!hllCollectors.containsKey(interval)) {
+            hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
+          }
+
+          List<Object> groupKey = Rows.toGroupKey(
+              queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
+              inputRow
+          );
+          hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
+        }
+        catch (ParseException e) {
+          if (ingestionSchema.getTuningConfig().isReportParseExceptions()) {
+            throw e;
+          } else {
+            unparseable++;
+          }
+        }
       }
     }
 
+    // These metrics are reported in generateAndPublishSegments()
     if (thrownAway > 0) {
-      log.warn("Unable to to find a matching interval for [%,d] events", thrownAway);
+      log.warn("Unable to find a matching interval for [%,d] events", thrownAway);
+    }
+    if (unparseable > 0) {
+      log.warn("Unable to parse [%,d] events", unparseable);
     }
 
     final ImmutableSortedMap<Interval, Optional<HyperLogLogCollector>> sortedMap = ImmutableSortedMap.copyOf(
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index 2a5e0b5dc08..22fddad373d 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -37,8 +37,10 @@ import io.druid.indexing.common.actions.SegmentAllocateAction;
 import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import io.druid.indexing.common.actions.TaskAction;
 import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.parsers.ParseException;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.segment.IndexIO;
@@ -61,6 +63,7 @@ import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedWriter;
@@ -78,6 +81,9 @@ public class IndexTaskTest
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
       new TimestampSpec(
           "ts",
@@ -443,7 +449,6 @@ public class IndexTaskTest
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
-
     IndexTask indexTask = new IndexTask(
         null,
         null,
@@ -484,6 +489,115 @@ public class IndexTaskTest
     Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
   }
 
+  @Test
+  public void testIgnoreParseException() throws Exception
+  {
+    final File tmpDir = temporaryFolder.newFolder();
+
+    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      writer.write("time,d,val\n");
+      writer.write("unparseable,a,1\n");
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+    }
+
+    // GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in
+    // IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments()
+    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
+        tmpDir,
+        new CSVParseSpec(
+            new TimestampSpec(
+                "time",
+                "auto",
+                null
+            ),
+            new DimensionsSpec(
+                null,
+                Lists.newArrayList(),
+                Lists.newArrayList()
+            ),
+            null,
+            Arrays.asList("time", "dim", "val"),
+            true,
+            0
+        ),
+        null,
+        2,
+        null,
+        false,
+        false,
+        false // ignore parse exception
+    );
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        parseExceptionIgnoreSpec,
+        null,
+        jsonMapper
+    );
+
+    final List<DataSegment> segments = runTask(indexTask);
+
+    Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions());
+    Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics());
+    Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
+  }
+
+  @Test
+  public void testReportParseException() throws Exception
+  {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("Unparseable timestamp found!");
+
+    final File tmpDir = temporaryFolder.newFolder();
+
+    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      writer.write("time,d,val\n");
+      writer.write("unparseable,a,1\n");
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+    }
+
+    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
+        tmpDir,
+        new CSVParseSpec(
+            new TimestampSpec(
+                "time",
+                "auto",
+                null
+            ),
+            new DimensionsSpec(
+                null,
+                Lists.newArrayList(),
+                Lists.newArrayList()
+            ),
+            null,
+            Arrays.asList("time", "dim", "val"),
+            true,
+            0
+        ),
+        null,
+        2,
+        null,
+        false,
+        false,
+        true // report parse exception
+    );
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        parseExceptionIgnoreSpec,
+        null,
+        jsonMapper
+    );
+
+    runTask(indexTask);
+  }
+
   private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
   {
     final List<DataSegment> segments = Lists.newArrayList();
@@ -574,6 +688,29 @@ public class IndexTaskTest
       boolean forceExtendableShardSpecs,
       boolean appendToExisting
   )
+  {
+    return createIngestionSpec(
+        baseDir,
+        parseSpec,
+        granularitySpec,
+        targetPartitionSize,
+        numShards,
+        forceExtendableShardSpecs,
+        appendToExisting,
+        true
+    );
+  }
+
+  private IndexTask.IndexIngestionSpec createIngestionSpec(
+      File baseDir,
+      ParseSpec parseSpec,
+      GranularitySpec granularitySpec,
+      Integer targetPartitionSize,
+      Integer numShards,
+      boolean forceExtendableShardSpecs,
+      boolean appendToExisting,
+      boolean reportParseException
+  )
   {
     return new IndexTask.IndexIngestionSpec(
         new DataSchema(
@@ -611,7 +748,7 @@ public class IndexTaskTest
             null,
             true,
             forceExtendableShardSpecs,
-            true,
+            reportParseException,
             null
         )
     );