MAPREDUCE-7086. Add config to allow FileInputFormat to ignore directories when recursive=false. Contributed by Sergey Shelukhin

This commit is contained in:
Jason Lowe 2018-05-01 16:19:53 -05:00
parent 24eeea8b18
commit 68c6ec719d
4 changed files with 54 additions and 8 deletions

View File

@@ -82,6 +82,9 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
   public static final String INPUT_DIR_RECURSIVE =
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;
+  public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS =
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS;
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
@@ -319,16 +322,24 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
     StopWatch sw = new StopWatch().start();
-    FileStatus[] files = listStatus(job);
+    FileStatus[] stats = listStatus(job);

     // Save the number of input files for metrics/loadgen
-    job.setLong(NUM_INPUT_FILES, files.length);
+    job.setLong(NUM_INPUT_FILES, stats.length);
     long totalSize = 0;                           // compute total size
-    for (FileStatus file: files) {                // check we have valid files
+    boolean ignoreDirs = !job.getBoolean(INPUT_DIR_RECURSIVE, false)
+      && job.getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
+    List<FileStatus> files = new ArrayList<>(stats.length);
+    for (FileStatus file: stats) {                // check we have valid files
       if (file.isDirectory()) {
-        throw new IOException("Not a file: "+ file.getPath());
+        if (!ignoreDirs) {
+          throw new IOException("Not a file: "+ file.getPath());
+        }
+      } else {
+        files.add(file);
+        totalSize += file.getLen();
       }
-      totalSize += file.getLen();
     }
     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

View File

@@ -76,6 +76,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
       "mapreduce.input.fileinputformat.numinputfiles";
   public static final String INPUT_DIR_RECURSIVE =
       "mapreduce.input.fileinputformat.input.dir.recursive";
+  public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS =
+      "mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs";
   public static final String LIST_STATUS_NUM_THREADS =
       "mapreduce.input.fileinputformat.list-status.num-threads";
   public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
@@ -392,7 +394,13 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     // generate splits
     List<InputSplit> splits = new ArrayList<InputSplit>();
     List<FileStatus> files = listStatus(job);
+
+    boolean ignoreDirs = !getInputDirRecursive(job)
+      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
     for (FileStatus file: files) {
+      if (ignoreDirs && file.isDirectory()) {
+        continue;
+      }
       Path path = file.getPath();
       long length = file.getLen();
       if (length != 0) {

View File

@@ -103,6 +103,21 @@ public class TestFileInputFormat {
     FileSystem.closeAll();
   }

+  @Test
+  public void testIgnoreDirs() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true);
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, "test:///a1");
+    MockFileSystem mockFs = (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    JobConf job = new JobConf(conf);
+    TextInputFormat fileInputFormat = new TextInputFormat();
+    fileInputFormat.configure(job);
+    InputSplit[] splits = fileInputFormat.getSplits(job, 1);
+    Assert.assertEquals("Input splits are not correct", 1, splits.length);
+    FileSystem.closeAll();
+  }
+
   @Test
   public void testSplitLocationInfo() throws Exception {
     Configuration conf = getConfiguration();

View File

@@ -123,6 +123,18 @@ public class TestFileInputFormat {
     verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits);
   }

+  @Test
+  public void testNumInputFilesIgnoreDirs() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+    conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true);
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 1, splits.size());
+    verifySplits(Lists.newArrayList("test:/a1/file1"), splits);
+  }
+
   @Test
   public void testListLocatedStatus() throws Exception {
     Configuration conf = getConfiguration();