Merge pull request #1214 from druid-io/fix-arb-gran

This fixes arbitrary gran spec breaking for the benchmarking blog post and other places
This commit is contained in:
Xavier Léauté 2015-03-17 12:28:19 -07:00
commit f60f77d0f5
4 changed files with 97 additions and 7 deletions

View File

@ -152,12 +152,29 @@ This is a special variation of the JSON ParseSpec that lower cases all the colum
## GranularitySpec
The default granularity spec is `uniform`.
### Uniform Granularity Spec
This spec is used to generated segments with uniform intervals.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | string | The type of granularity spec. | no (default == 'uniform') |
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
### Arbitrary Granularity Spec
This spec is used to generate segments with arbitrary intervals (it tries to create evenly sized segments). This spec is not supported for real-time processing.
| Field | Type | Description | Required |
|-------|------|-------------|----------|
| type | string | The type of granularity spec. | no (default == 'uniform') |
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
# IO Config
Real-time Ingestion: See [Real-time ingestion](Realtime-ingestion.html).

View File

@ -70,10 +70,10 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
final Path betaInput = new Path(getInputPath());
final FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
final Granularity segmentGranularity = ((UniformGranularitySpec) config.getGranularitySpec()).getSegmentGranularity();
final Granularity segmentGranularity = config.getGranularitySpec().getSegmentGranularity();
Map<DateTime, Long> inputModifiedTimes = new TreeMap<DateTime, Long>(
Comparators.inverse(Comparators.<Comparable>comparable())
Map<DateTime, Long> inputModifiedTimes = new TreeMap<>(
Comparators.inverse(Comparators.comparable())
);
for (FileStatus status : FSSpideringIterator.spiderIterable(fs, betaInput)) {

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@ -205,9 +206,13 @@ public class IndexTask extends AbstractFixedIntervalTask
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
while (firehose.hasMore()) {
final InputRow inputRow = firehose.nextRow();
Interval interval = granularitySpec.getSegmentGranularity()
.bucket(new DateTime(inputRow.getTimestampFromEpoch()));
retVal.add(interval);
DateTime dt = new DateTime(inputRow.getTimestampFromEpoch());
Optional<Interval> interval = granularitySpec.bucketInterval(dt);
if (interval.isPresent()) {
retVal.add(interval.get());
} else {
throw new ISE("Unable to to find a matching interval for [%s]", dt);
}
}
}

View File

@ -36,6 +36,7 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
@ -113,6 +114,73 @@ public class IndexTaskTest
new DefaultObjectMapper()
);
final List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(2, segments.size());
}
@Test
public void testWithArbitraryGranularity() throws Exception
{
File tmpDir = Files.createTempDir();
tmpDir.deleteOnExit();
File tmpFile = File.createTempFile("druid", "index", tmpDir);
tmpFile.deleteOnExit();
PrintWriter writer = new PrintWriter(tmpFile);
writer.println("2014-01-01T00:00:10Z,a,1");
writer.println("2014-01-01T01:00:20Z,b,1");
writer.println("2014-01-01T02:00:30Z,c,1");
writer.close();
IndexTask indexTask = new IndexTask(
null,
new IndexTask.IndexIngestionSpec(
new DataSchema(
"test",
new StringInputRowParser(
new CSVParseSpec(
new TimestampSpec(
"ts",
"auto"
),
new DimensionsSpec(
Arrays.asList("ts"),
Lists.<String>newArrayList(),
Lists.<SpatialDimensionSchema>newArrayList()
),
null,
Arrays.asList("ts", "dim", "val")
)
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("val", "val")
},
new ArbitraryGranularitySpec(
QueryGranularity.MINUTE,
Arrays.asList(new Interval("2014/2015"))
)
),
new IndexTask.IndexIOConfig(
new LocalFirehoseFactory(
tmpDir,
"druid*",
null
)
),
null
),
new DefaultObjectMapper()
);
List<DataSegment> segments = runTask(indexTask);
Assert.assertEquals(1, segments.size());
}
private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
{
final List<DataSegment> segments = Lists.newArrayList();
indexTask.run(
@ -156,6 +224,6 @@ public class IndexTaskTest
)
);
Assert.assertEquals(2, segments.size());
return segments;
}
}