mirror of https://github.com/apache/druid.git
This fixes arbitrary gran spec breaking
This commit is contained in:
parent
36b4c6a371
commit
bfe10bd156
|
@ -152,12 +152,29 @@ This is a special variation of the JSON ParseSpec that lower cases all the colum
|
|||
|
||||
## GranularitySpec
|
||||
|
||||
The default granularity spec is `uniform`.
|
||||
|
||||
### Uniform Granularity Spec
|
||||
|
||||
This spec is used to generated segments with uniform intervals.
|
||||
|
||||
| Field | Type | Description | Required |
|
||||
|-------|------|-------------|----------|
|
||||
| type | string | The type of granularity spec. | no (default == 'uniform') |
|
||||
| segmentGranularity | string | The granularity to create segments at. | no (default == 'DAY') |
|
||||
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
|
||||
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
|
||||
|
||||
### Arbitrary Granularity Spec
|
||||
|
||||
This spec is used to generate segments with arbitrary intervals (it tries to create evenly sized segments). This spec is not supported for real-time processing.
|
||||
|
||||
| Field | Type | Description | Required |
|
||||
|-------|------|-------------|----------|
|
||||
| type | string | The type of granularity spec. | no (default == 'uniform') |
|
||||
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
|
||||
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
|
||||
|
||||
# IO Config
|
||||
|
||||
Real-time Ingestion: See [Real-time ingestion](Realtime-ingestion.html).
|
||||
|
|
|
@ -70,10 +70,10 @@ public class GranularUnprocessedPathSpec extends GranularityPathSpec
|
|||
|
||||
final Path betaInput = new Path(getInputPath());
|
||||
final FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
|
||||
final Granularity segmentGranularity = ((UniformGranularitySpec) config.getGranularitySpec()).getSegmentGranularity();
|
||||
final Granularity segmentGranularity = config.getGranularitySpec().getSegmentGranularity();
|
||||
|
||||
Map<DateTime, Long> inputModifiedTimes = new TreeMap<DateTime, Long>(
|
||||
Comparators.inverse(Comparators.<Comparable>comparable())
|
||||
Map<DateTime, Long> inputModifiedTimes = new TreeMap<>(
|
||||
Comparators.inverse(Comparators.comparable())
|
||||
);
|
||||
|
||||
for (FileStatus status : FSSpideringIterator.spiderIterable(fs, betaInput)) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeName;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Iterables;
|
||||
|
@ -205,9 +206,13 @@ public class IndexTask extends AbstractFixedIntervalTask
|
|||
try (Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser())) {
|
||||
while (firehose.hasMore()) {
|
||||
final InputRow inputRow = firehose.nextRow();
|
||||
Interval interval = granularitySpec.getSegmentGranularity()
|
||||
.bucket(new DateTime(inputRow.getTimestampFromEpoch()));
|
||||
retVal.add(interval);
|
||||
DateTime dt = new DateTime(inputRow.getTimestampFromEpoch());
|
||||
Optional<Interval> interval = granularitySpec.bucketInterval(dt);
|
||||
if (interval.isPresent()) {
|
||||
retVal.add(interval.get());
|
||||
} else {
|
||||
throw new ISE("Unable to to find a matching interval for [%s]", dt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ import io.druid.jackson.DefaultObjectMapper;
|
|||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.segment.indexing.DataSchema;
|
||||
import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec;
|
||||
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
|
||||
import io.druid.segment.loading.DataSegmentPusher;
|
||||
import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
|
||||
|
@ -113,6 +114,73 @@ public class IndexTaskTest
|
|||
new DefaultObjectMapper()
|
||||
);
|
||||
|
||||
final List<DataSegment> segments = runTask(indexTask);
|
||||
|
||||
Assert.assertEquals(2, segments.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithArbitraryGranularity() throws Exception
|
||||
{
|
||||
File tmpDir = Files.createTempDir();
|
||||
tmpDir.deleteOnExit();
|
||||
|
||||
File tmpFile = File.createTempFile("druid", "index", tmpDir);
|
||||
tmpFile.deleteOnExit();
|
||||
|
||||
PrintWriter writer = new PrintWriter(tmpFile);
|
||||
writer.println("2014-01-01T00:00:10Z,a,1");
|
||||
writer.println("2014-01-01T01:00:20Z,b,1");
|
||||
writer.println("2014-01-01T02:00:30Z,c,1");
|
||||
writer.close();
|
||||
|
||||
IndexTask indexTask = new IndexTask(
|
||||
null,
|
||||
new IndexTask.IndexIngestionSpec(
|
||||
new DataSchema(
|
||||
"test",
|
||||
new StringInputRowParser(
|
||||
new CSVParseSpec(
|
||||
new TimestampSpec(
|
||||
"ts",
|
||||
"auto"
|
||||
),
|
||||
new DimensionsSpec(
|
||||
Arrays.asList("ts"),
|
||||
Lists.<String>newArrayList(),
|
||||
Lists.<SpatialDimensionSchema>newArrayList()
|
||||
),
|
||||
null,
|
||||
Arrays.asList("ts", "dim", "val")
|
||||
)
|
||||
),
|
||||
new AggregatorFactory[]{
|
||||
new LongSumAggregatorFactory("val", "val")
|
||||
},
|
||||
new ArbitraryGranularitySpec(
|
||||
QueryGranularity.MINUTE,
|
||||
Arrays.asList(new Interval("2014/2015"))
|
||||
)
|
||||
),
|
||||
new IndexTask.IndexIOConfig(
|
||||
new LocalFirehoseFactory(
|
||||
tmpDir,
|
||||
"druid*",
|
||||
null
|
||||
)
|
||||
),
|
||||
null
|
||||
),
|
||||
new DefaultObjectMapper()
|
||||
);
|
||||
|
||||
List<DataSegment> segments = runTask(indexTask);
|
||||
|
||||
Assert.assertEquals(1, segments.size());
|
||||
}
|
||||
|
||||
private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
|
||||
{
|
||||
final List<DataSegment> segments = Lists.newArrayList();
|
||||
|
||||
indexTask.run(
|
||||
|
@ -156,6 +224,6 @@ public class IndexTaskTest
|
|||
)
|
||||
);
|
||||
|
||||
Assert.assertEquals(2, segments.size());
|
||||
return segments;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue