svn merge -c 1612400 FIXES: MAPREDUCE-5756. CombineFileInputFormat.getSplits() including directories in its results. Contributed by Jason Dere

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1612402 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-07-21 21:30:33 +00:00
parent d038d161c2
commit 0e2fee873d
3 changed files with 59 additions and 1 deletions

View File

@ -26,6 +26,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader
enabled if custom output format/committer is used (Sangjin Lee via jlowe)
MAPREDUCE-5756. CombineFileInputFormat.getSplits() including directories
in its results (Jason Dere via jlowe)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -579,7 +579,7 @@ public abstract class CombineFileInputFormat<K, V>
blocks = new OneBlockInfo[0];
} else {
if(locations.length == 0) {
if(locations.length == 0 && !stat.isDirectory()) {
locations = new BlockLocation[] { new BlockLocation() };
}

View File

@ -1274,6 +1274,61 @@ public class TestCombineFileInputFormat extends TestCase {
fileSys.delete(file.getParent(), true);
}
/**
* Test that directories do not get included as part of getSplits()
*/
@Test
public void testGetSplitsWithDirectory() throws Exception {
MiniDFSCluster dfs = null;
try {
Configuration conf = new Configuration();
dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
.build();
dfs.waitActive();
dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
.build();
dfs.waitActive();
FileSystem fileSys = dfs.getFileSystem();
// Set up the following directory structure:
// /dir1/: directory
// /dir1/file: regular file
// /dir1/dir2/: directory
Path dir1 = new Path("/dir1");
Path file = new Path("/dir1/file1");
Path dir2 = new Path("/dir1/dir2");
if (!fileSys.mkdirs(dir1)) {
throw new IOException("Mkdirs failed to create " + dir1.toString());
}
FSDataOutputStream out = fileSys.create(file);
out.write(new byte[0]);
out.close();
if (!fileSys.mkdirs(dir2)) {
throw new IOException("Mkdirs failed to create " + dir2.toString());
}
// split it using a CombinedFile input format
DummyInputFormat inFormat = new DummyInputFormat();
Job job = Job.getInstance(conf);
FileInputFormat.setInputPaths(job, "/dir1");
List<InputSplit> splits = inFormat.getSplits(job);
// directories should be omitted from getSplits() - we should only see file1 and not dir2
assertEquals(1, splits.size());
CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
assertEquals(1, fileSplit.getNumPaths());
assertEquals(file.getName(), fileSplit.getPath(0).getName());
assertEquals(0, fileSplit.getOffset(0));
assertEquals(0, fileSplit.getLength(0));
} finally {
if (dfs != null) {
dfs.shutdown();
}
}
}
/**
* Test when input files are from non-default file systems
*/