diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java index b3e2b4ade80..b7380379fdd 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java @@ -193,7 +193,8 @@ protected void addInputPathRecursively(List result, if (stat.isDirectory()) { addInputPathRecursively(result, fs, stat.getPath(), inputFilter); } else { - result.add(stat); + result.add(org.apache.hadoop.mapreduce.lib.input. + FileInputFormat.shrinkStatus(stat)); } } } @@ -290,7 +291,8 @@ private List singleThreadedListStatus(JobConf job, Path[] dirs, addInputPathRecursively(result, fs, stat.getPath(), inputFilter); } else { - result.add(stat); + result.add(org.apache.hadoop.mapreduce.lib.input. + FileInputFormat.shrinkStatus(stat)); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java index a248f1401cb..4cb36a5576b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java @@ -259,7 +259,8 @@ public Result call() throws Exception { if (recursive && stat.isDirectory()) { result.dirsNeedingRecursiveCalls.add(stat); } else { - result.locatedFileStatuses.add(stat); + result.locatedFileStatuses.add(org.apache.hadoop.mapreduce.lib. + input.FileInputFormat.shrinkStatus(stat)); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java index 22efe1471f9..1b3365c2cf1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @@ -325,7 +325,7 @@ private List singleThreadedListStatus(JobContext job, Path[] dirs, addInputPathRecursively(result, fs, stat.getPath(), inputFilter); } else { - result.add(stat); + result.add(shrinkStatus(stat)); } } } @@ -364,13 +364,42 @@ protected void addInputPathRecursively(List result, if (stat.isDirectory()) { addInputPathRecursively(result, fs, stat.getPath(), inputFilter); } else { - result.add(stat); + result.add(shrinkStatus(stat)); } } } } - - + + /** + * The HdfsBlockLocation includes a LocatedBlock which contains messages + * for issuing more detailed queries to datanodes about a block, but these + * messages are useless during job submission currently. This method tries + * to exclude the LocatedBlock from HdfsBlockLocation by creating a new + * BlockLocation from original, reshaping the LocatedFileStatus, + * allowing {@link #listStatus(JobContext)} to scan more files with less + * memory footprint. + * @see BlockLocation + * @see org.apache.hadoop.fs.HdfsBlockLocation + * @param origStat The fat FileStatus. + * @return The FileStatus that has been shrunk. + */ + public static FileStatus shrinkStatus(FileStatus origStat) { + if (origStat.isDirectory() || origStat.getLen() == 0 || + !(origStat instanceof LocatedFileStatus)) { + return origStat; + } else { + BlockLocation[] blockLocations = + ((LocatedFileStatus)origStat).getBlockLocations(); + BlockLocation[] locs = new BlockLocation[blockLocations.length]; + int i = 0; + for (BlockLocation location : blockLocations) { + locs[i++] = new BlockLocation(location); + } + LocatedFileStatus newStat = new LocatedFileStatus(origStat, locs); + return newStat; + } + } + /** * A factory that makes the split for this class. It can be overridden * by sub-classes to make sub-types diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java index 3897a9b2b3b..ca30bf34750 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java @@ -32,11 +32,17 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.HdfsBlockLocation; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -238,6 +244,50 @@ public void testListStatusErrorOnNonExistantDir() throws IOException { } } + @Test + public void testShrinkStatus() throws IOException { + Configuration conf = getConfiguration(); + MockFileSystem mockFs = + (MockFileSystem) new Path("test:///").getFileSystem(conf); + Path dir1 = new Path("test:/a1"); + RemoteIterator statuses = mockFs.listLocatedStatus(dir1); + boolean verified = false; + while (statuses.hasNext()) { + LocatedFileStatus orig = statuses.next(); + LocatedFileStatus shrink = + (LocatedFileStatus)FileInputFormat.shrinkStatus(orig); + Assert.assertTrue(orig.equals(shrink)); + if (shrink.getBlockLocations() != null) { + Assert.assertEquals(orig.getBlockLocations().length, + shrink.getBlockLocations().length); + for (int i = 0; i < shrink.getBlockLocations().length; i++) { + verified = true; + BlockLocation location = shrink.getBlockLocations()[i]; + BlockLocation actual = orig.getBlockLocations()[i]; + Assert.assertNotNull(((HdfsBlockLocation)actual).getLocatedBlock()); + Assert.assertEquals(BlockLocation.class.getName(), + location.getClass().getName()); + Assert.assertArrayEquals(actual.getHosts(), location.getHosts()); + Assert.assertArrayEquals(actual.getCachedHosts(), + location.getCachedHosts()); + Assert.assertArrayEquals(actual.getStorageIds(), + location.getStorageIds()); + Assert.assertArrayEquals(actual.getStorageTypes(), + location.getStorageTypes()); + Assert.assertArrayEquals(actual.getTopologyPaths(), + location.getTopologyPaths()); + Assert.assertArrayEquals(actual.getNames(), location.getNames()); + Assert.assertEquals(actual.getLength(), location.getLength()); + Assert.assertEquals(actual.getOffset(), location.getOffset()); + Assert.assertEquals(actual.isCorrupt(), location.isCorrupt()); + } + } else { + Assert.assertTrue(orig.getBlockLocations() == null); + } + } + Assert.assertTrue(verified); + } + public static List configureTestSimple(Configuration conf, FileSystem localFs) throws IOException { Path base1 = new Path(TEST_ROOT_DIR, "input1"); @@ -437,10 +487,31 @@ public FileStatus[] listStatus(Path f, PathFilter filter) @Override public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException { - return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:9866", "otherhost:9866" }, - new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, - new String[0], 0, len, false) }; } + DatanodeInfo[] ds = new DatanodeInfo[2]; + ds[0] = new DatanodeDescriptor( + new DatanodeID("127.0.0.1", "localhost", "abcd", + 9866, 9867, 9868, 9869)); + ds[1] = new DatanodeDescriptor( + new DatanodeID("1.0.0.1", "otherhost", "efgh", + 9866, 9867, 9868, 9869)); + long blockLen = len / 3; + ExtendedBlock b1 = new ExtendedBlock("bpid", 0, blockLen, 0); + ExtendedBlock b2 = new ExtendedBlock("bpid", 1, blockLen, 1); + ExtendedBlock b3 = new ExtendedBlock("bpid", 2, len - 2 * blockLen, 2); + String[] names = new String[]{ "localhost:9866", "otherhost:9866" }; + String[] hosts = new String[]{ "localhost", "otherhost" }; + String[] cachedHosts = {"localhost"}; + BlockLocation loc1 = new BlockLocation(names, hosts, cachedHosts, + new String[0], 0, blockLen, false); + BlockLocation loc2 = new BlockLocation(names, hosts, cachedHosts, + new String[0], blockLen, blockLen, false); + BlockLocation loc3 = new BlockLocation(names, hosts, cachedHosts, + new String[0], 2 * blockLen, len - 2 * blockLen, false); + return new BlockLocation[]{ + new HdfsBlockLocation(loc1, new LocatedBlock(b1, ds)), + new HdfsBlockLocation(loc2, new LocatedBlock(b2, ds)), + new HdfsBlockLocation(loc3, new LocatedBlock(b3, ds)) }; + } @Override protected RemoteIterator listLocatedStatus(Path f,