diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 888518ae8e9..18a2c476c19 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -203,6 +203,7 @@ public class IndexTask extends AbstractFixedIntervalTask final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec(); SortedSet retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); + int unparsed = 0; try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) { while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); @@ -211,10 +212,13 @@ public class IndexTask extends AbstractFixedIntervalTask if (interval.isPresent()) { retVal.add(interval.get()); } else { - throw new ISE("Unable to to find a matching interval for [%s]", dt); + unparsed++; } } } + if (unparsed > 0) { + log.warn("Unable to to find a matching interval for [%,d] events", unparsed); + } return retVal; } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 39f5621e0a0..d7c51608a95 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -226,4 +226,65 @@ public class IndexTaskTest return segments; } + + @Test + public void testIntervalBucketing() throws Exception + { + File tmpDir = Files.createTempDir(); + tmpDir.deleteOnExit(); + + File tmpFile = File.createTempFile("druid", "index", tmpDir); + tmpFile.deleteOnExit(); + + PrintWriter writer = new PrintWriter(tmpFile); + writer.println("2015-03-01T07:59:59.977Z,a,1"); + writer.println("2015-03-01T08:00:00.000Z,b,1"); + writer.close(); + + IndexTask indexTask = new IndexTask( + null, + new IndexTask.IndexIngestionSpec( + new DataSchema( + "test", + new StringInputRowParser( + new CSVParseSpec( + new TimestampSpec( + "ts", + "auto" + ), + new DimensionsSpec( + Arrays.asList("dim"), + Lists.newArrayList(), + Lists.newArrayList() + ), + null, + Arrays.asList("ts", "dim", "val") + ) + ), + new AggregatorFactory[]{ + new LongSumAggregatorFactory("val", "val") + }, + new UniformGranularitySpec( + Granularity.HOUR, + QueryGranularity.HOUR, + Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z")) + ) + ), + new IndexTask.IndexIOConfig( + new LocalFirehoseFactory( + tmpDir, + "druid*", + null + ) + ), + null + ), + new DefaultObjectMapper() + ); + + final List segments = runTask(indexTask); + + Assert.assertEquals(1, segments.size()); + } + }