continue hadoop job for sparse intervals

dclim 2015-11-23 19:27:45 -07:00
parent 3048b1f0a5
commit 2308c8c07f
2 changed files with 78 additions and 5 deletions

View File

@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -31,12 +32,15 @@ import java.util.NoSuchElementException;
*/
public class FSSpideringIterator implements Iterator<FileStatus>
{
public static FSSpideringIterator spiderPathPropogateExceptions(FileSystem fs, Path path)
public static FSSpideringIterator spiderPathPropagateExceptions(FileSystem fs, Path path)
{
try {
final FileStatus[] statii = fs.listStatus(path);
return new FSSpideringIterator(fs, statii == null ? new FileStatus[]{} : statii);
}
catch (FileNotFoundException e) {
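// treat a missing path as an empty listing so callers can keep iterating instead of failing the whole job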
return new FSSpideringIterator(fs, new FileStatus[]{});
}
catch (IOException e) {
throw new RuntimeException(e);
}
@@ -48,7 +52,7 @@ public class FSSpideringIterator implements Iterator<FileStatus>
{
public Iterator<FileStatus> iterator()
{
return spiderPathPropogateExceptions(fs, path);
return spiderPathPropagateExceptions(fs, path);
}
};
}
@@ -82,7 +86,7 @@ public class FSSpideringIterator implements Iterator<FileStatus>
while (hasNext()) {
if (statii[index].isDir()) {
if (statuses == null) {
statuses = spiderPathPropogateExceptions(fs, statii[index].getPath());
statuses = spiderPathPropagateExceptions(fs, statii[index].getPath());
} else if (statuses.hasNext()) {
return statuses.next();
}
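For reference, a minimal sketch (not part of this commit) of the behavior the new FileNotFoundException catch enables: iterating a path that does not exist now yields no files instead of throwing. The local FileSystem, the example path, and the SparsePathExample class name are illustrative assumptions, and the import for FSSpideringIterator is omitted since its package is not shown in this diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SparsePathExample
{
  public static void main(String[] args) throws Exception
  {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // hypothetical hour directory with no data for the interval
    Path missingHour = new Path("/tmp/example/y=2015/m=11/d=06/H=01");

    // Before this commit the call below threw a RuntimeException wrapping
    // FileNotFoundException; with the new catch it returns an empty iterator,
    // so a job over a sparse interval simply skips the missing hour.
    FSSpideringIterator files = FSSpideringIterator.spiderPathPropagateExceptions(fs, missingHour);
    int count = 0;
    while (files.hasNext()) {
      files.next();
      count++;
    }
    System.out.println("files under missing hour: " + count); // prints 0
  }
}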

View File

@@ -20,14 +20,29 @@
package io.druid.indexer.path;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class GranularityPathSpecTest
{
@@ -38,6 +53,9 @@ public class GranularityPathSpecTest
private final ObjectMapper jsonMapper = new DefaultObjectMapper();
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder();
@Before public void setUp()
{
granularityPathSpec = new GranularityPathSpec();
@@ -70,7 +88,7 @@ public class GranularityPathSpecTest
{
Granularity granularity = Granularity.DAY;
granularityPathSpec.setDataGranularity(granularity);
Assert.assertEquals(granularity,granularityPathSpec.getDataGranularity());
Assert.assertEquals(granularity, granularityPathSpec.getDataGranularity());
}
@Test
@@ -85,6 +103,57 @@ public class GranularityPathSpecTest
testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, null);
}
@Test
public void testAddInputPath() throws Exception
{
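// regression test for sparse intervals: only the hour folders that actually exist should be added as input paths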
UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"}));
HadoopIngestionSpec spec = new HadoopIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(
Granularity.DAY,
QueryGranularity.MINUTE,
ImmutableList.of(new Interval("2015-11-06T00:00Z/2015-11-07T00:00Z"))
),
jsonMapper
),
new HadoopIOConfig(null, null, null),
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null)
);
granularityPathSpec.setDataGranularity(Granularity.HOUR);
granularityPathSpec.setFilePattern(".*");
granularityPathSpec.setInputFormat(TextInputFormat.class);
Job job = Job.getInstance();
String formatStr = "file:%s/%s;org.apache.hadoop.mapreduce.lib.input.TextInputFormat";
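// only hours 00, 02 and 05 of the day contain data; the remaining hours have no folders, modeling a sparse interval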
testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=00");
testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=02");
testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=05");
testFolder.newFile("test/y=2015/m=11/d=06/H=00/file1");
testFolder.newFile("test/y=2015/m=11/d=06/H=02/file2");
testFolder.newFile("test/y=2015/m=11/d=06/H=05/file3");
testFolder.newFile("test/y=2015/m=11/d=06/H=05/file4");
granularityPathSpec.setInputPath(testFolder.getRoot().getPath() + "/test");
granularityPathSpec.addInputPaths(HadoopDruidIndexerConfig.fromSpec(spec), job);
String actual = job.getConfiguration().get("mapreduce.input.multipleinputs.dir.formats");
String expected = Joiner.on(",").join(Lists.newArrayList(
String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=00/file1"),
String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=02/file2"),
String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=05/file3"),
String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=05/file4")
));
Assert.assertEquals("Did not find expected input paths", expected, actual);
}
private void testSerde(
String inputPath,
String filePattern,