diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java index 0d6ccf500fb..170cae5027d 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java @@ -438,6 +438,13 @@ public class HadoopDruidIndexerConfig ); } + public List<Interval> getInputIntervals() + { + return schema.getDataSchema() + .getGranularitySpec() + .inputIntervals(); + } + public Optional<Iterable<Bucket>> getAllBuckets() { Optional<SortedSet<Interval>> intervals = getSegmentGranularIntervals(); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java index 2d3c84ca1d5..e7659b1d528 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/GranularityPathSpec.java @@ -20,7 +20,6 @@ package io.druid.indexer.path; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Optional; import com.google.common.collect.Sets; import io.druid.indexer.HadoopDruidIndexerConfig; @@ -113,13 +112,10 @@ public class GranularityPathSpec implements PathSpec @Override public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException { - final Set<Interval> intervals = Sets.newTreeSet(Comparators.intervals()); - Optional<SortedSet<Interval>> optionalIntervals = config.getSegmentGranularIntervals(); - if (optionalIntervals.isPresent()) { - for (Interval segmentInterval : optionalIntervals.get()) { - for (Interval dataInterval : dataGranularity.getIterable(segmentInterval)) { - intervals.add(dataInterval); - } + final Set<Interval> intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd()); + for (Interval inputInterval : config.getInputIntervals()) { + for (Interval interval : dataGranularity.getIterable(inputInterval)) { + 
intervals.add(trim(inputInterval, interval)); } } @@ -158,4 +154,22 @@ public class GranularityPathSpec implements PathSpec return job; } + + private Interval trim(Interval inputInterval, Interval interval) + { + long start = interval.getStartMillis(); + long end = interval.getEndMillis(); + + boolean makeNew = false; + if (start < inputInterval.getStartMillis()) { + start = inputInterval.getStartMillis(); + makeNew = true; + } + if (end > inputInterval.getEndMillis()) { + end = inputInterval.getEndMillis(); + makeNew = true; + } + return makeNew ? new Interval(start, end) : interval; + } + } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index be59daee08f..0600c16bf5d 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -44,6 +44,9 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.IOException; +import java.util.Arrays; + public class GranularityPathSpecTest { private GranularityPathSpec granularityPathSpec; @@ -173,6 +176,67 @@ public class GranularityPathSpecTest Assert.assertEquals("Did not find expected input paths", expected, actual); } + @Test + public void testIntervalTrimming() throws Exception + { + UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"})); + HadoopIngestionSpec spec = new HadoopIngestionSpec( + new DataSchema( + "foo", + null, + new AggregatorFactory[0], + new UniformGranularitySpec( + Granularity.DAY, + QueryGranularity.ALL, + ImmutableList.of(new Interval("2015-01-01T11Z/2015-01-02T05Z")) + ), + jsonMapper + ), + new HadoopIOConfig(null, null, null), + new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null, 
null) + ); + + granularityPathSpec.setDataGranularity(Granularity.HOUR); + granularityPathSpec.setPathFormat("yyyy/MM/dd/HH"); + granularityPathSpec.setFilePattern(".*"); + granularityPathSpec.setInputFormat(TextInputFormat.class); + + Job job = Job.getInstance(); + String formatStr = "file:%s/%s;org.apache.hadoop.mapreduce.lib.input.TextInputFormat"; + + createFile( + testFolder, + "test/2015/01/01/00/file1", "test/2015/01/01/10/file2", "test/2015/01/01/18/file3", "test/2015/01/02/00/file1", + "test/2015/01/02/03/file2", "test/2015/01/02/05/file3", "test/2015/01/02/07/file4", "test/2015/01/02/09/file5" + ); + + granularityPathSpec.setInputPath(testFolder.getRoot().getPath() + "/test"); + + granularityPathSpec.addInputPaths(HadoopDruidIndexerConfig.fromSpec(spec), job); + + String actual = job.getConfiguration().get("mapreduce.input.multipleinputs.dir.formats"); + + String expected = Joiner.on(",").join( + Lists.newArrayList( + String.format(formatStr, testFolder.getRoot(), "test/2015/01/01/18/file3"), + String.format(formatStr, testFolder.getRoot(), "test/2015/01/02/00/file1"), + String.format(formatStr, testFolder.getRoot(), "test/2015/01/02/03/file2") + ) + ); + + Assert.assertEquals("Did not find expected input paths", expected, actual); + } + + private void createFile(TemporaryFolder folder, String... 
files) throws IOException + { + for (String file : files) { + String[] split = file.split("/"); + Assert.assertTrue(split.length > 1); + folder.newFolder(Arrays.copyOfRange(split, 0, split.length - 1)); + folder.newFile(file); + } + } + private void testSerde( String inputPath, String filePattern, diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java index b72ebba0da0..ee197368b73 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/ArbitraryGranularitySpec.java @@ -22,6 +22,7 @@ package io.druid.segment.indexing.granularity; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.PeekingIterator; @@ -107,7 +108,13 @@ public class ArbitraryGranularitySpec implements GranularitySpec @JsonProperty("intervals") public Optional<SortedSet<Interval>> bucketIntervals() { - return Optional.of((SortedSet<Interval>) intervals); + return Optional.<SortedSet<Interval>>of(intervals); + } + + @Override + public List<Interval> inputIntervals() + { + return ImmutableList.copyOf(intervals); + } + @Override diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java index 0e1981c91b6..aa4de777f8e 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/GranularitySpec.java @@ -29,11 +29,12 @@ import io.druid.java.util.common.Granularity; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.List; import 
java.util.SortedSet; /** * Tells the indexer how to group events based on timestamp. The events may then be further partitioned based - * on anything, using a ShardSpec. + * on anything, using a ShardSpec. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = UniformGranularitySpec.class) @JsonSubTypes(value = { @@ -47,14 +48,22 @@ public interface GranularitySpec * * @return set of all time groups */ - public Optional<SortedSet<Interval>> bucketIntervals(); + public Optional<SortedSet<Interval>> bucketIntervals(); + + /** + * Returns the user-provided input intervals as-is; used for configuring the granular path spec. + * + * @return the input intervals exactly as specified by the user + */ + public List<Interval> inputIntervals(); /** * Time-grouping interval corresponding to some instant, if any. * * @param dt instant to return time interval for + * * @return optional time interval - * */ + */ public Optional<Interval> bucketInterval(DateTime dt); public Granularity getSegmentGranularity(); diff --git a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java index 3a50248d084..a70affec76d 100644 --- a/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java +++ b/server/src/main/java/io/druid/segment/indexing/granularity/UniformGranularitySpec.java @@ -101,6 +101,12 @@ public class UniformGranularitySpec implements GranularitySpec } } + @Override + public List<Interval> inputIntervals() + { + return inputIntervals == null ? ImmutableList.<Interval>of() : ImmutableList.copyOf(inputIntervals); + } + @Override public Optional<Interval> bucketInterval(DateTime dt) {