svn merge -c 1499125 FIXES: MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the input path dir. Contributed by Devaraj K

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1499127 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-07-02 22:00:43 +00:00
parent 65bbb30101
commit 199485bcdf
6 changed files with 191 additions and 3 deletions

View File

@ -15,6 +15,9 @@ Release 2.3.0 - UNRELEASED
MAPREDUCE-5316. job -list-attempt-ids command does not handle illegal
task-state (Ashwin Shankar via jlowe)
MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
input path dir (Devaraj K via jlowe)
Release 2.2.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -1062,6 +1065,9 @@ Release 0.23.10 - UNRELEASED
BUG FIXES
MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
input path dir (Devaraj K via jlowe)
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES

View File

@ -70,6 +70,10 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
public static final String NUM_INPUT_FILES =
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
public static final String INPUT_DIR_RECURSIVE =
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;
private static final double SPLIT_SLOP = 1.1; // 10% slop
private long minSplitSize = 1;
@ -192,7 +196,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
// Whether we need to recursive look into the directory structure
boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);
boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
List<FileStatus> result = new ArrayList<FileStatus>();
List<IOException> errors = new ArrayList<IOException>();

View File

@ -64,6 +64,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
"mapreduce.input.pathFilter.class";
public static final String NUM_INPUT_FILES =
"mapreduce.input.fileinputformat.numinputfiles";
public static final String INPUT_DIR_RECURSIVE =
"mapreduce.input.fileinputformat.input.dir.recursive";
private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
@ -103,6 +105,27 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
}
}
/**
* @param job
* the job to modify
* @param inputDirRecursive
*/
public static void setInputDirRecursive(Job job,
boolean inputDirRecursive) {
job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
inputDirRecursive);
}
/**
* @param job
* the job to look at.
* @return should the files to be read recursively?
*/
public static boolean getInputDirRecursive(JobContext job) {
return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
false);
}
/**
* Get the lower bound on split size imposed by the format.
* @return the number of bytes of the minimal split for this format
@ -210,6 +233,9 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
job.getConfiguration());
// Whether we need to recursive look into the directory structure
boolean recursive = getInputDirRecursive(job);
List<IOException> errors = new ArrayList<IOException>();
// creates a MultiPathFilter with the hiddenFileFilter and the
@ -235,7 +261,11 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
if (globStat.isDirectory()) {
for(FileStatus stat: fs.listStatus(globStat.getPath(),
inputFilter)) {
result.add(stat);
if (recursive && stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
result.add(stat);
}
}
} else {
result.add(globStat);
@ -251,6 +281,31 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
return result;
}
/**
* Add files in the input path recursively into the results.
* @param result
* The List to store all files.
* @param fs
* The FileSystem.
* @param path
* The input path.
* @param inputFilter
* The input filter that can be used to filter files/dirs.
* @throws IOException
*/
protected void addInputPathRecursively(List<FileStatus> result,
FileSystem fs, Path path, PathFilter inputFilter)
throws IOException {
for(FileStatus stat: fs.listStatus(path, inputFilter)) {
if (stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
} else {
result.add(stat);
}
}
}
/**
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
@ -528,6 +529,8 @@ public class ConfigUtil {
MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST);
Configuration.addDeprecation(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
MRJobConfig.SPLIT_METAINFO_MAXSIZE);
Configuration.addDeprecation("mapred.input.dir.recursive",
FileInputFormat.INPUT_DIR_RECURSIVE);
}
public static void main(String[] args) {

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.lib.input;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;
public class TestFileInputFormat {
@Test
public void testNumInputFilesRecursively() throws Exception {
Configuration conf = getConfiguration();
conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
Assert.assertEquals("Input splits are not correct", 3, splits.size());
Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
.getPath().toString());
Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
.getPath().toString());
Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
.toString());
// Using the deprecated configuration
conf = getConfiguration();
conf.set("mapred.input.dir.recursive", "true");
job = Job.getInstance(conf);
splits = fileInputFormat.getSplits(job);
Assert.assertEquals("Input splits are not correct", 3, splits.size());
Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
.getPath().toString());
Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
.getPath().toString());
Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
.toString());
}
@Test
public void testNumInputFilesWithoutRecursively() throws Exception {
Configuration conf = getConfiguration();
Job job = Job.getInstance(conf);
FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
List<InputSplit> splits = fileInputFormat.getSplits(job);
Assert.assertEquals("Input splits are not correct", 2, splits.size());
Assert.assertEquals("test:/a1/a2", ((FileSplit) splits.get(0)).getPath()
.toString());
Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(1)).getPath()
.toString());
}
private Configuration getConfiguration() {
Configuration conf = new Configuration();
conf.set("fs.test.impl.disable.cache", "true");
conf.setClass("fs.test.impl", MockFileSystem.class, FileSystem.class);
conf.set(FileInputFormat.INPUT_DIR, "test:///a1");
return conf;
}
static class MockFileSystem extends RawLocalFileSystem {
@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
if (f.toString().equals("test:/a1")) {
return new FileStatus[] {
new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
} else if (f.toString().equals("test:/a1/a2")) {
return new FileStatus[] {
new FileStatus(10, false, 1, 150, 150,
new Path("test:/a1/a2/file2")),
new FileStatus(10, false, 1, 151, 150,
new Path("test:/a1/a2/file3")) };
}
return new FileStatus[0];
}
@Override
public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
throws IOException {
return new FileStatus[] { new FileStatus(10, true, 1, 150, 150,
pathPattern) };
}
@Override
public FileStatus[] listStatus(Path f, PathFilter filter)
throws FileNotFoundException, IOException {
return this.listStatus(f);
}
}
}

View File

@ -190,7 +190,7 @@ public class TestFileInputFormat extends TestCase {
+ "directory with directories inside.", exceptionThrown);
// Enable multi-level/recursive inputs
job.setBoolean("mapred.input.dir.recursive", true);
job.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
InputSplit[] splits = inFormat.getSplits(job, 1);
assertEquals(splits.length, 2);
}