From bfe10bd1564b470aa8b1f13165d70cbf5e343b65 Mon Sep 17 00:00:00 2001
From: fjy
Date: Tue, 17 Mar 2015 10:36:35 -0700
Subject: [PATCH] Fix arbitrary granularity spec breaking batch indexing

---
 docs/content/Ingestion.md                          | 17 +++++
 .../path/GranularUnprocessedPathSpec.java          |  6 +-
 .../druid/indexing/common/task/IndexTask.java      | 11 ++-
 .../indexing/common/task/IndexTaskTest.java        | 70 ++++++++++++++++++-
 4 files changed, 97 insertions(+), 7 deletions(-)

diff --git a/docs/content/Ingestion.md b/docs/content/Ingestion.md
index 09fb792bca9..efdb43c5cb5 100644
--- a/docs/content/Ingestion.md
+++ b/docs/content/Ingestion.md
@@ -152,12 +152,29 @@ This is a special variation of the JSON ParseSpec that lower cases all the colum
 
 ## GranularitySpec
 
+The default granularity spec is `uniform`.
+
+### Uniform Granularity Spec
+
+This spec is used to generate segments with uniform intervals.
+
 | Field | Type | Description | Required |
 |-------|------|-------------|----------|
+| type | string | The type of granularity spec. | no (default == 'uniform') |
 | segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
 | queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
 | intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
 
+### Arbitrary Granularity Spec
+
+This spec is used to generate segments with arbitrary intervals (it tries to create evenly sized segments). This spec is not supported for real-time processing.
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| type | string | The type of granularity spec. Set this to 'arbitrary' to use this spec. | no (default == 'uniform') |
+| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
+| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
+
 # IO Config
 
 Real-time Ingestion: See [Real-time ingestion](Realtime-ingestion.html).
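On the Java side, the two specs documented above correspond to `UniformGranularitySpec` and `ArbitraryGranularitySpec`. Below is a minimal sketch of constructing each: the `ArbitraryGranularitySpec` arguments mirror the new test added later in this patch, while the exact `UniformGranularitySpec` constructor shape and the `com.metamx.common.Granularity` import are assumptions about the 0.7-era API.

```java
import java.util.Arrays;

import com.metamx.common.Granularity;
import io.druid.granularity.QueryGranularity;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.Interval;

public class GranularitySpecExamples
{
  public static void main(String[] args)
  {
    // Arbitrary spec: only a query granularity plus the covering intervals.
    // Segments follow the configured intervals directly; these arguments
    // mirror the new IndexTaskTest below.
    GranularitySpec arbitrary = new ArbitraryGranularitySpec(
        QueryGranularity.MINUTE,
        Arrays.asList(new Interval("2014/2015"))
    );

    // Uniform spec: additionally takes a segment granularity, so rows are
    // bucketed into evenly aligned (here, daily) segment intervals.
    // Constructor shape is an assumption about the 0.7-era API.
    GranularitySpec uniform = new UniformGranularitySpec(
        Granularity.DAY,
        QueryGranularity.NONE,
        Arrays.asList(new Interval("2014/2015"))
    );
  }
}
```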
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java
index 7cb941d09f5..4613f934ac8 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularUnprocessedPathSpec.java
@@ -70,10 +70,10 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
 
     final Path betaInput = new Path(getInputPath());
     final FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
-    final Granularity segmentGranularity = ((UniformGranularitySpec) config.getGranularitySpec()).getSegmentGranularity();
+    final Granularity segmentGranularity = config.getGranularitySpec().getSegmentGranularity();
 
-    Map<DateTime, Long> inputModifiedTimes = new TreeMap<DateTime, Long>(
-        Comparators.inverse(Comparators.<Comparable>comparable())
+    Map<DateTime, Long> inputModifiedTimes = new TreeMap<>(
+        Comparators.inverse(Comparators.<Comparable>comparable())
     );
 
     for (FileStatus status : FSSpideringIterator.spiderIterable(fs, betaInput)) {
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index 7f7349690d4..888518ae8e9 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
@@ -205,9 +206,13 @@ public class IndexTask extends AbstractFixedIntervalTask
     try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
       while (firehose.hasMore()) {
         final InputRow inputRow = firehose.nextRow();
-        Interval interval = granularitySpec.getSegmentGranularity()
-                                           .bucket(new DateTime(inputRow.getTimestampFromEpoch()));
-        retVal.add(interval);
+        DateTime dt = new DateTime(inputRow.getTimestampFromEpoch());
+        Optional<Interval> interval = granularitySpec.bucketInterval(dt);
+        if (interval.isPresent()) {
+          retVal.add(interval.get());
+        } else {
+          throw new ISE("Unable to find a matching interval for [%s]", dt);
+        }
       }
     }
 
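The substance of the fix is the `IndexTask` hunk above: `determineIntervals` previously bucketed rows via `getSegmentGranularity()`, which an arbitrary spec cannot meaningfully answer, and now asks the spec itself through `bucketInterval`, which returns a Guava `Optional<Interval>` for both spec types. Here is a self-contained sketch of that pattern; `bucketOrThrow` is a hypothetical helper name, not part of the patch.

```java
import com.google.common.base.Optional;
import com.metamx.common.ISE;
import io.druid.data.input.InputRow;
import io.druid.segment.indexing.granularity.GranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class IntervalResolver
{
  // bucketOrThrow is a hypothetical helper; after this patch the same logic
  // lives inline in IndexTask.determineIntervals().
  public static Interval bucketOrThrow(GranularitySpec spec, InputRow row)
  {
    final DateTime dt = new DateTime(row.getTimestampFromEpoch());
    // bucketInterval() is on the GranularitySpec interface, so this works for
    // uniform and arbitrary specs alike; no cast to UniformGranularitySpec.
    final Optional<Interval> interval = spec.bucketInterval(dt);
    if (!interval.isPresent()) {
      // A row outside every configured interval is now a clear ingestion
      // error rather than a failure specific to the arbitrary spec.
      throw new ISE("Unable to find a matching interval for [%s]", dt);
    }
    return interval.get();
  }
}
```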
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index 6e54532c08d..39f5621e0a0 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -36,6 +36,7 @@ import io.druid.jackson.DefaultObjectMapper;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
@@ -113,6 +114,73 @@ public class IndexTaskTest
         new DefaultObjectMapper()
     );
 
+    final List<DataSegment> segments = runTask(indexTask);
+
+    Assert.assertEquals(2, segments.size());
+  }
+
+  @Test
+  public void testWithArbitraryGranularity() throws Exception
+  {
+    File tmpDir = Files.createTempDir();
+    tmpDir.deleteOnExit();
+
+    File tmpFile = File.createTempFile("druid", "index", tmpDir);
+    tmpFile.deleteOnExit();
+
+    PrintWriter writer = new PrintWriter(tmpFile);
+    writer.println("2014-01-01T00:00:10Z,a,1");
+    writer.println("2014-01-01T01:00:20Z,b,1");
+    writer.println("2014-01-01T02:00:30Z,c,1");
+    writer.close();
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        new IndexTask.IndexIngestionSpec(
+            new DataSchema(
+                "test",
+                new StringInputRowParser(
+                    new CSVParseSpec(
+                        new TimestampSpec(
+                            "ts",
+                            "auto"
+                        ),
+                        new DimensionsSpec(
+                            Arrays.asList("ts"),
+                            Lists.newArrayList(),
+                            Lists.newArrayList()
+                        ),
+                        null,
+                        Arrays.asList("ts", "dim", "val")
+                    )
+                ),
+                new AggregatorFactory[]{
+                    new LongSumAggregatorFactory("val", "val")
+                },
+                new ArbitraryGranularitySpec(
+                    QueryGranularity.MINUTE,
+                    Arrays.asList(new Interval("2014/2015"))
+                )
+            ),
+            new IndexTask.IndexIOConfig(
+                new LocalFirehoseFactory(
+                    tmpDir,
+                    "druid*",
+                    null
+                )
+            ),
+            null
+        ),
+        new DefaultObjectMapper()
+    );
+
+    List<DataSegment> segments = runTask(indexTask);
+
+    Assert.assertEquals(1, segments.size());
+  }
+
+  private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
+  {
     final List<DataSegment> segments = Lists.newArrayList();
 
     indexTask.run(
@@ -156,6 +224,6 @@ public class IndexTaskTest
         )
     );
 
-    Assert.assertEquals(2, segments.size());
+    return segments;
   }
 }
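Why `testWithArbitraryGranularity` expects exactly one segment: every row written to the temp CSV falls inside the single configured `2014/2015` interval, so `bucketInterval` resolves all three timestamps to the same bucket. A minimal standalone check of that bucketing, assuming the spec classes can be used outside a task exactly as in the test above:

```java
import java.util.Arrays;

import com.google.common.base.Optional;
import io.druid.granularity.QueryGranularity;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;

public class ArbitraryBucketCheck
{
  public static void main(String[] args)
  {
    // Same spec as testWithArbitraryGranularity: one interval, minute rollup.
    final ArbitraryGranularitySpec spec = new ArbitraryGranularitySpec(
        QueryGranularity.MINUTE,
        Arrays.asList(new Interval("2014/2015"))
    );

    for (String ts : new String[]{
        "2014-01-01T00:00:10Z", "2014-01-01T01:00:20Z", "2014-01-01T02:00:30Z"
    }) {
      // All three timestamps resolve to the same 2014/2015 bucket, which is
      // why the test asserts exactly one segment.
      final Optional<Interval> bucket = spec.bucketInterval(new DateTime(ts));
      System.out.println(ts + " -> " + bucket.get());
    }
  }
}
```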