Trimming out outside of given interval (#2798)

* Trimming out outside of given interval (Fix for #2659)

* addressed comments
Authored by Navis Ryu on 2016-12-08 11:05:50 +09:00; committed by Fangjin Yang
parent bb9e35e1af
commit f794246ec1
6 changed files with 119 additions and 12 deletions
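
Before this patch, GranularityPathSpec enumerated data-granularity (e.g. HOUR) buckets over the segment-granularity intervals, which are widened to whole segment buckets (e.g. whole days), so input paths outside the user's interval leaked in. A standalone Joda-Time sketch of that effect for the interval used in the test below; this is an illustration, not the actual Druid code path:

import org.joda.time.DateTime;
import org.joda.time.Interval;

public class WidenedBucketsSketch
{
  public static void main(String[] args)
  {
    Interval input = Interval.parse("2015-01-01T11Z/2015-01-02T05Z");

    // Widen to whole DAY buckets, the way segment-granular intervals are built
    // (this arithmetic is exact for this example only).
    DateTime dayStart = input.getStart().withTimeAtStartOfDay();
    DateTime dayEnd = input.getEnd().withTimeAtStartOfDay().plusDays(1);

    int leaked = 0;
    int total = 0;
    for (DateTime h = dayStart; h.isBefore(dayEnd); h = h.plusHours(1)) {
      total++;
      if (!input.overlaps(new Interval(h, h.plusHours(1)))) {
        leaked++;  // an hour bucket the user never asked for
      }
    }
    // Prints: 30 of 48 hour buckets lie outside the input interval
    System.out.println(leaked + " of " + total + " hour buckets lie outside the input interval");
  }
}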

File: HadoopDruidIndexerConfig.java

@@ -438,6 +438,13 @@ public class HadoopDruidIndexerConfig
     );
   }
 
+  public List<Interval> getInputIntervals()
+  {
+    return schema.getDataSchema()
+                 .getGranularitySpec()
+                 .inputIntervals();
+  }
+
   public Optional<Iterable<Bucket>> getAllBuckets()
   {
     Optional<Set<Interval>> intervals = getSegmentGranularIntervals();

File: GranularityPathSpec.java

@@ -20,7 +20,6 @@
 package io.druid.indexer.path;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import io.druid.indexer.HadoopDruidIndexerConfig;

@@ -113,13 +112,10 @@ public class GranularityPathSpec implements PathSpec
   @Override
   public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
   {
-    final Set<Interval> intervals = Sets.newTreeSet(Comparators.intervals());
-    Optional<Set<Interval>> optionalIntervals = config.getSegmentGranularIntervals();
-    if (optionalIntervals.isPresent()) {
-      for (Interval segmentInterval : optionalIntervals.get()) {
-        for (Interval dataInterval : dataGranularity.getIterable(segmentInterval)) {
-          intervals.add(dataInterval);
-        }
-      }
-    }
+    final Set<Interval> intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
+    for (Interval inputInterval : config.getInputIntervals()) {
+      for (Interval interval : dataGranularity.getIterable(inputInterval)) {
+        intervals.add(trim(inputInterval, interval));
+      }
+    }

@@ -158,4 +154,22 @@ public class GranularityPathSpec implements PathSpec
     return job;
   }
 
+  private Interval trim(Interval inputInterval, Interval interval)
+  {
+    long start = interval.getStartMillis();
+    long end = interval.getEndMillis();
+
+    boolean makeNew = false;
+    if (start < inputInterval.getStartMillis()) {
+      start = inputInterval.getStartMillis();
+      makeNew = true;
+    }
+    if (end > inputInterval.getEndMillis()) {
+      end = inputInterval.getEndMillis();
+      makeNew = true;
+    }
+    return makeNew ? new Interval(start, end) : interval;
+  }
 }
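
Note that trim() only allocates a new Interval when a bucket actually crosses an input boundary; fully contained buckets come back unchanged. A quick standalone check of that behavior (Joda-Time only; the trim body is copied verbatim from the hunk above):

import org.joda.time.Interval;

public class TrimSketch
{
  static Interval trim(Interval inputInterval, Interval interval)
  {
    long start = interval.getStartMillis();
    long end = interval.getEndMillis();

    boolean makeNew = false;
    if (start < inputInterval.getStartMillis()) {
      start = inputInterval.getStartMillis();
      makeNew = true;
    }
    if (end > inputInterval.getEndMillis()) {
      end = inputInterval.getEndMillis();
      makeNew = true;
    }
    return makeNew ? new Interval(start, end) : interval;
  }

  public static void main(String[] args)
  {
    Interval input = Interval.parse("2015-01-01T11:30Z/2015-01-02T05Z");

    // A bucket straddling the input start is clipped to 11:30Z-12:00Z ...
    System.out.println(trim(input, Interval.parse("2015-01-01T11Z/2015-01-01T12Z")));
    // ... while a fully contained bucket is returned unchanged (same instance).
    System.out.println(trim(input, Interval.parse("2015-01-01T12Z/2015-01-01T13Z")));
  }
}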

File: GranularityPathSpecTest.java

@@ -44,6 +44,9 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.IOException;
+import java.util.Arrays;
+
 public class GranularityPathSpecTest
 {
   private GranularityPathSpec granularityPathSpec;

@@ -173,6 +176,67 @@ public class GranularityPathSpecTest
     Assert.assertEquals("Did not find expected input paths", expected, actual);
   }
 
+  @Test
+  public void testIntervalTrimming() throws Exception
+  {
+    UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"}));
+    HadoopIngestionSpec spec = new HadoopIngestionSpec(
+        new DataSchema(
+            "foo",
+            null,
+            new AggregatorFactory[0],
+            new UniformGranularitySpec(
+                Granularity.DAY,
+                QueryGranularity.ALL,
+                ImmutableList.of(new Interval("2015-01-01T11Z/2015-01-02T05Z"))
+            ),
+            jsonMapper
+        ),
+        new HadoopIOConfig(null, null, null),
+        new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null, null)
+    );
+
+    granularityPathSpec.setDataGranularity(Granularity.HOUR);
+    granularityPathSpec.setPathFormat("yyyy/MM/dd/HH");
+    granularityPathSpec.setFilePattern(".*");
+    granularityPathSpec.setInputFormat(TextInputFormat.class);
+
+    Job job = Job.getInstance();
+    String formatStr = "file:%s/%s;org.apache.hadoop.mapreduce.lib.input.TextInputFormat";
+
+    createFile(
+        testFolder,
+        "test/2015/01/01/00/file1", "test/2015/01/01/10/file2", "test/2015/01/01/18/file3", "test/2015/01/02/00/file1",
+        "test/2015/01/02/03/file2", "test/2015/01/02/05/file3", "test/2015/01/02/07/file4", "test/2015/01/02/09/file5"
+    );
+
+    granularityPathSpec.setInputPath(testFolder.getRoot().getPath() + "/test");
+
+    granularityPathSpec.addInputPaths(HadoopDruidIndexerConfig.fromSpec(spec), job);
+
+    String actual = job.getConfiguration().get("mapreduce.input.multipleinputs.dir.formats");
+    String expected = Joiner.on(",").join(
+        Lists.newArrayList(
+            String.format(formatStr, testFolder.getRoot(), "test/2015/01/01/18/file3"),
+            String.format(formatStr, testFolder.getRoot(), "test/2015/01/02/00/file1"),
+            String.format(formatStr, testFolder.getRoot(), "test/2015/01/02/03/file2")
+        )
+    );
+
+    Assert.assertEquals("Did not find expected input paths", expected, actual);
+  }
+
+  private void createFile(TemporaryFolder folder, String... files) throws IOException
+  {
+    for (String file : files) {
+      String[] split = file.split("/");
+      Assert.assertTrue(split.length > 1);
+      folder.newFolder(Arrays.copyOfRange(split, 0, split.length - 1));
+      folder.newFile(file);
+    }
+  }
+
   private void testSerde(
       String inputPath,
       String filePattern,
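
Why exactly those three paths survive: with HOUR data granularity, only hour buckets overlapping [2015-01-01T11Z, 2015-01-02T05Z) are kept, and the end is exclusive, so 01/00 and 01/10 fall before the start while 02/05, 02/07 and 02/09 fall at or after the end. A sketch of that arithmetic (Joda-Time only; not part of the test):

import org.joda.time.DateTime;
import org.joda.time.Interval;

public class HourBucketFilter
{
  public static void main(String[] args)
  {
    Interval input = Interval.parse("2015-01-01T11Z/2015-01-02T05Z");
    // Start times of the hour directories created by the test.
    DateTime[] hours = {
        DateTime.parse("2015-01-01T00Z"), DateTime.parse("2015-01-01T10Z"),
        DateTime.parse("2015-01-01T18Z"), DateTime.parse("2015-01-02T00Z"),
        DateTime.parse("2015-01-02T03Z"), DateTime.parse("2015-01-02T05Z"),
        DateTime.parse("2015-01-02T07Z"), DateTime.parse("2015-01-02T09Z")
    };
    for (DateTime hour : hours) {
      // The interval end is exclusive, so the 05Z bucket abuts it and is dropped.
      boolean kept = input.overlaps(new Interval(hour, hour.plusHours(1)));
      System.out.println(hour + " -> " + (kept ? "kept" : "dropped"));  // keeps only 18Z, 00Z, 03Z
    }
  }
}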

File: ArbitraryGranularitySpec.java

@@ -22,6 +22,7 @@ package io.druid.segment.indexing.granularity;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.PeekingIterator;

@@ -107,7 +108,13 @@ public class ArbitraryGranularitySpec implements GranularitySpec
   @JsonProperty("intervals")
   public Optional<SortedSet<Interval>> bucketIntervals()
   {
-    return Optional.of((SortedSet<Interval>) intervals);
+    return Optional.<SortedSet<Interval>>of(intervals);
+  }
+
+  @Override
+  public List<Interval> inputIntervals()
+  {
+    return ImmutableList.copyOf(intervals);
   }
 
   @Override

File: GranularitySpec.java

@@ -29,6 +29,7 @@ import io.druid.java.util.common.Granularity;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 
+import java.util.List;
 import java.util.SortedSet;
 
 /**

@@ -49,12 +50,20 @@ public interface GranularitySpec
    */
   public Optional<SortedSet<Interval>> bucketIntervals();
 
+  /**
+   * Returns user provided intervals as-is state. used for configuring granular path spec
+   *
+   * @return
+   */
+  public List<Interval> inputIntervals();
+
   /**
    * Time-grouping interval corresponding to some instant, if any.
    *
    * @param dt instant to return time interval for
+   *
    * @return optional time interval
    */
   public Optional<Interval> bucketInterval(DateTime dt);
 
   public Granularity getSegmentGranularity();

File: UniformGranularitySpec.java

@@ -101,6 +101,12 @@ public class UniformGranularitySpec implements GranularitySpec
     }
   }
 
+  @Override
+  public List<Interval> inputIntervals()
+  {
+    return inputIntervals == null ? ImmutableList.<Interval>of() : ImmutableList.copyOf(inputIntervals);
+  }
+
   @Override
   public Optional<Interval> bucketInterval(DateTime dt)
   {
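
To close, a fragment (reusing the spec construction and imports from the test above) showing why GranularitySpec grew a second accessor: bucketIntervals() widens to whole segment-granularity buckets, while inputIntervals() hands back exactly what the user configured, which is what GranularityPathSpec now iterates before trimming.

UniformGranularitySpec spec = new UniformGranularitySpec(
    Granularity.DAY,
    QueryGranularity.ALL,
    ImmutableList.of(new Interval("2015-01-01T11Z/2015-01-02T05Z"))
);

// Widened DAY buckets: [2015-01-01/2015-01-02, 2015-01-02/2015-01-03]
System.out.println(spec.bucketIntervals().get());

// The raw user-provided interval: [2015-01-01T11:00Z/2015-01-02T05:00Z]
System.out.println(spec.inputIntervals());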