Merge pull request #1264 from druid-io/fix-index-task

Ignore rows with invalid interval for index task
This commit is contained in:
Xavier Léauté 2015-04-06 17:23:07 -07:00
commit e03709b396
2 changed files with 66 additions and 1 deletions

View File

@ -203,6 +203,7 @@ public class IndexTask extends AbstractFixedIntervalTask
final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec(); final GranularitySpec granularitySpec = ingestionSchema.getDataSchema().getGranularitySpec();
SortedSet<Interval> retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); SortedSet<Interval> retVal = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
int unparsed = 0;
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) { try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
while (firehose.hasMore()) { while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow(); final InputRow inputRow = firehose.nextRow();
@ -211,10 +212,13 @@ public class IndexTask extends AbstractFixedIntervalTask
if (interval.isPresent()) { if (interval.isPresent()) {
retVal.add(interval.get()); retVal.add(interval.get());
} else { } else {
throw new ISE("Unable to to find a matching interval for [%s]", dt); unparsed++;
} }
} }
} }
if (unparsed > 0) {
log.warn("Unable to to find a matching interval for [%,d] events", unparsed);
}
return retVal; return retVal;
} }

View File

@ -226,4 +226,65 @@ public class IndexTaskTest
return segments; return segments;
} }
@Test
public void testIntervalBucketing() throws Exception
{
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
tmpFile.deleteOnExit();
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2015-03-01T07:59:59.977Z,a,1");
writer.println("2015-03-01T08:00:00.000Z,b,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto"
),
new DimensionsSpec(
Arrays.asList("dim"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new UniformGranularitySpec(
Granularity.HOUR,
QueryGranularity.HOUR,
Arrays.asList(new Interval("2015-03-01T08:00:00Z/2015-03-01T09:00:00Z"))
)
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpDir,
"druid*",
null
)
),
null
),
new DefaultObjectMapper()
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(1, segments.size());
}
} }