From f6611d89c803e4ba2c11eedf2834f97e7ab216d7 Mon Sep 17 00:00:00 2001
From: Jason Darrell Lowe <jlowe@apache.org>
Date: Fri, 26 Jul 2013 18:22:00 +0000
Subject: [PATCH] svn merge -c 1507385 FIXES: MAPREDUCE-1981. Improve
 getSplits performance by using listLocatedStatus. Contributed by Hairong
 Kuang and Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1507387 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   6 +
 .../apache/hadoop/mapred/FileInputFormat.java |  89 ++++++++------
 .../lib/input/CombineFileInputFormat.java     |  69 +++++------
 .../mapreduce/lib/input/FileInputFormat.java  |  46 ++++---
 .../hadoop/mapred/TestFileInputFormat.java    | 113 ++++++++++++++
 .../lib/input/TestFileInputFormat.java        |  38 +++++-
 6 files changed, 271 insertions(+), 90 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 726bfa999e7..2d3b6386f0d 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -19,6 +19,9 @@ Release 2.3.0 - UNRELEASED
 
   OPTIMIZATIONS
 
+    MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
+    (Hairong Kuang and Jason Lowe via jlowe)
+
   BUG FIXES
 
     MAPREDUCE-5316. job -list-attempt-ids command does not handle illegal
@@ -1107,6 +1110,9 @@ Release 0.23.10 - UNRELEASED
 
   OPTIMIZATIONS
 
+    MAPREDUCE-1981. Improve getSplits performance by using listLocatedStatus
+    (Hairong Kuang and Jason Lowe via jlowe)
+
   BUG FIXES
 
     MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
index 1c9ae3b79e5..bb43e20cbee 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
@@ -36,8 +36,10 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
@@ -169,13 +171,17 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
   protected void addInputPathRecursively(List<FileStatus> result,
       FileSystem fs, Path path, PathFilter inputFilter)
       throws IOException {
-    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
-      if (stat.isDirectory()) {
-        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-      } else {
-        result.add(stat);
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = iter.next();
+      if (inputFilter.accept(stat.getPath())) {
+        if (stat.isDirectory()) {
+          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+        } else {
+          result.add(stat);
+        }
       }
-    }
+    }
   }
 
   /** List input directories.
@@ -221,14 +227,19 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
     } else {
       for (FileStatus globStat: matches) {
         if (globStat.isDirectory()) {
-          for(FileStatus stat: fs.listStatus(globStat.getPath(),
-              inputFilter)) {
-            if (recursive && stat.isDirectory()) {
-              addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-            } else {
-              result.add(stat);
+          RemoteIterator<LocatedFileStatus> iter =
+              fs.listLocatedStatus(globStat.getPath());
+          while (iter.hasNext()) {
+            LocatedFileStatus stat = iter.next();
+            if (inputFilter.accept(stat.getPath())) {
+              if (recursive && stat.isDirectory()) {
+                addInputPathRecursively(result, fs, stat.getPath(),
+                    inputFilter);
+              } else {
+                result.add(stat);
+              }
             }
-          }
+          }
         } else {
           result.add(globStat);
         }
@@ -254,7 +265,6 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
 
   /** Splits files returned by {@link #listStatus(JobConf)} when
    * they're too big.*/
-  @SuppressWarnings("deprecation")
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
     FileStatus[] files = listStatus(job);
@@ -278,31 +288,38 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
     NetworkTopology clusterMap = new NetworkTopology();
     for (FileStatus file: files) {
       Path path = file.getPath();
-      FileSystem fs = path.getFileSystem(job);
       long length = file.getLen();
-      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-      if ((length != 0) && isSplitable(fs, path)) {
-        long blockSize = file.getBlockSize();
-        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+      if (length != 0) {
+        FileSystem fs = path.getFileSystem(job);
+        BlockLocation[] blkLocations;
+        if (file instanceof LocatedFileStatus) {
+          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+        } else {
+          blkLocations = fs.getFileBlockLocations(file, 0, length);
+        }
+        if (isSplitable(fs, path)) {
+          long blockSize = file.getBlockSize();
+          long splitSize = computeSplitSize(goalSize, minSize, blockSize);
 
-        long bytesRemaining = length;
-        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-          String[] splitHosts = getSplitHosts(blkLocations,
-              length-bytesRemaining, splitSize, clusterMap);
-          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
-              splitHosts));
-          bytesRemaining -= splitSize;
+          long bytesRemaining = length;
+          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+            String[] splitHosts = getSplitHosts(blkLocations,
+                length-bytesRemaining, splitSize, clusterMap);
+            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
+                splitHosts));
+            bytesRemaining -= splitSize;
+          }
+
+          if (bytesRemaining != 0) {
+            String[] splitHosts = getSplitHosts(blkLocations, length
+                - bytesRemaining, bytesRemaining, clusterMap);
+            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
+                splitHosts));
+          }
+        } else {
+          String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
+          splits.add(makeSplit(path, 0, length, splitHosts));
         }
-
-        if (bytesRemaining != 0) {
-          String[] splitHosts = getSplitHosts(blkLocations, length
-              - bytesRemaining, bytesRemaining, clusterMap);
-          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
-              splitHosts));
-        }
-      } else if (length != 0) {
-        String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
-        splits.add(makeSplit(path, 0, length, splitHosts));
       } else {
         //Create empty hosts array for zero length files
         splits.add(makeSplit(path, 0, length, new String[0]));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
index 504c7b711a2..1d3d64d18be 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.mapreduce.lib.input;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.HashMap;
@@ -33,7 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
@@ -204,46 +203,33 @@ public abstract class CombineFileInputFormat<K, V>
     }
 
     // all the files in input set
-    Path[] paths = FileUtil.stat2Paths(
-        listStatus(job).toArray(new FileStatus[0]));
+    List<FileStatus> stats = listStatus(job);
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    if (paths.length == 0) {
+    if (stats.size() == 0) {
       return splits;
     }
 
-    // Convert them to Paths first. This is a costly operation and
-    // we should do it first, otherwise we will incur doing it multiple
-    // times, one time each for each pool in the next loop.
-    List<Path> newpaths = new LinkedList<Path>();
-    for (int i = 0; i < paths.length; i++) {
-      FileSystem fs = paths[i].getFileSystem(conf);
-      Path p = fs.makeQualified(paths[i]);
-      newpaths.add(p);
-    }
-
     // In one single iteration, process all the paths in a single pool.
     // Processing one pool at a time ensures that a split contains paths
     // from a single pool only.
     for (MultiPathFilter onepool : pools) {
-      ArrayList<Path> myPaths = new ArrayList<Path>();
+      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
 
       // pick one input path. If it matches all the filters in a pool,
       // add it to the output set
-      for (Iterator<Path> iter = newpaths.iterator(); iter.hasNext();) {
-        Path p = iter.next();
-        if (onepool.accept(p)) {
+      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
+        FileStatus p = iter.next();
+        if (onepool.accept(p.getPath())) {
           myPaths.add(p); // add it to my output set
           iter.remove();
         }
       }
       // create splits for all files in this pool.
-      getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
-                    maxSize, minSizeNode, minSizeRack, splits);
+      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
     }
 
     // create splits for all files that are not in any pool.
-    getMoreSplits(job, newpaths.toArray(new Path[newpaths.size()]),
-                  maxSize, minSizeNode, minSizeRack, splits);
+    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
 
     // free up rackToNodes map
     rackToNodes.clear();
@@ -253,7 +239,7 @@ public abstract class CombineFileInputFormat<K, V>
   /**
    * Return all the splits in the specified set of paths
    */
-  private void getMoreSplits(JobContext job, Path[] paths,
+  private void getMoreSplits(JobContext job, List<FileStatus> stats,
                              long maxSize, long minSizeNode, long minSizeRack,
                              List<InputSplit> splits)
     throws IOException {
@@ -274,15 +260,16 @@ public abstract class CombineFileInputFormat<K, V>
     HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
                               new HashMap<String, Set<OneBlockInfo>>();
 
-    files = new OneFileInfo[paths.length];
-    if (paths.length == 0) {
+    files = new OneFileInfo[stats.size()];
+    if (stats.size() == 0) {
       return;
     }
 
     // populate all the blocks for all files
     long totLength = 0;
-    for (int i = 0; i < paths.length; i++) {
-      files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
+    int i = 0;
+    for (FileStatus stat : stats) {
+      files[i] = new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
                                  rackToBlocks, blockToNodes, nodeToBlocks,
                                  rackToNodes, maxSize);
       totLength += files[i].getLength();
@@ -523,7 +510,7 @@ public abstract class CombineFileInputFormat<K, V>
     private long fileSize;               // size of the file
     private OneBlockInfo[] blocks;       // all blocks in this file
 
-    OneFileInfo(Path path, Configuration conf,
+    OneFileInfo(FileStatus stat, Configuration conf,
                 boolean isSplitable,
                 HashMap<String, List<OneBlockInfo>> rackToBlocks,
                 HashMap<OneBlockInfo, String[]> blockToNodes,
@@ -534,10 +521,13 @@ public abstract class CombineFileInputFormat<K, V>
       this.fileSize = 0;
 
       // get block locations from file system
-      FileSystem fs = path.getFileSystem(conf);
-      FileStatus stat = fs.getFileStatus(path);
-      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
-                                                           stat.getLen());
+      BlockLocation[] locations;
+      if (stat instanceof LocatedFileStatus) {
+        locations = ((LocatedFileStatus) stat).getBlockLocations();
+      } else {
+        FileSystem fs = stat.getPath().getFileSystem(conf);
+        locations = fs.getFileBlockLocations(stat, 0, stat.getLen());
+      }
       // create a list of all block and their locations
       if (locations == null) {
         blocks = new OneBlockInfo[0];
@@ -552,8 +542,8 @@ public abstract class CombineFileInputFormat<K, V>
           // full file length
           blocks = new OneBlockInfo[1];
           fileSize = stat.getLen();
-          blocks[0] = new OneBlockInfo(path, 0, fileSize, locations[0]
-              .getHosts(), locations[0].getTopologyPaths());
+          blocks[0] = new OneBlockInfo(stat.getPath(), 0, fileSize,
+              locations[0].getHosts(), locations[0].getTopologyPaths());
         } else {
           ArrayList<OneBlockInfo> blocksList = new ArrayList<OneBlockInfo>(
               locations.length);
@@ -579,9 +569,9 @@ public abstract class CombineFileInputFormat<K, V>
                 myLength = Math.min(maxSize, left);
               }
             }
-            OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
-                myLength, locations[i].getHosts(), locations[i]
-                .getTopologyPaths());
+            OneBlockInfo oneblock = new OneBlockInfo(stat.getPath(),
+                myOffset, myLength, locations[i].getHosts(),
+                locations[i].getTopologyPaths());
 
             left -= myLength;
             myOffset += myLength;
@@ -693,6 +683,9 @@ public abstract class CombineFileInputFormat<K, V>
 
   protected BlockLocation[] getFileBlockLocations(
     FileSystem fs, FileStatus stat) throws IOException {
+    if (stat instanceof LocatedFileStatus) {
+      return ((LocatedFileStatus) stat).getBlockLocations();
+    }
     return fs.getFileBlockLocations(stat, 0, stat.getLen());
   }
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
index 457e2c48384..e46703d7f9c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
@@ -29,9 +29,11 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -259,14 +261,19 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     } else {
       for (FileStatus globStat: matches) {
         if (globStat.isDirectory()) {
-          for(FileStatus stat: fs.listStatus(globStat.getPath(),
-              inputFilter)) {
-            if (recursive && stat.isDirectory()) {
-              addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-            } else {
-              result.add(stat);
+          RemoteIterator<LocatedFileStatus> iter =
+              fs.listLocatedStatus(globStat.getPath());
+          while (iter.hasNext()) {
+            LocatedFileStatus stat = iter.next();
+            if (inputFilter.accept(stat.getPath())) {
+              if (recursive && stat.isDirectory()) {
+                addInputPathRecursively(result, fs, stat.getPath(),
+                    inputFilter);
+              } else {
+                result.add(stat);
+              }
             }
-          }
+          }
         } else {
           result.add(globStat);
         }
@@ -296,13 +303,17 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
   protected void addInputPathRecursively(List<FileStatus> result,
       FileSystem fs, Path path, PathFilter inputFilter)
       throws IOException {
-    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
-      if (stat.isDirectory()) {
-        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
-      } else {
-        result.add(stat);
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = iter.next();
+      if (inputFilter.accept(stat.getPath())) {
+        if (stat.isDirectory()) {
+          addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+        } else {
+          result.add(stat);
+        }
       }
-    }
+    }
   }
 
 
@@ -331,8 +342,13 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
       Path path = file.getPath();
       long length = file.getLen();
       if (length != 0) {
-        FileSystem fs = path.getFileSystem(job.getConfiguration());
-        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+        BlockLocation[] blkLocations;
+        if (file instanceof LocatedFileStatus) {
+          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+        } else {
+          FileSystem fs = path.getFileSystem(job.getConfiguration());
+          blkLocations = fs.getFileBlockLocations(file, 0, length);
+        }
         if (isSplitable(job, path)) {
           long blockSize = file.getBlockSize();
           long splitSize = computeSplitSize(blockSize, minSize, maxSize);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
new file mode 100644
index 00000000000..876b718acf7
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestFileInputFormat {
+
+  @Test
+  public void testListLocatedStatus() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setBoolean("fs.test.impl.disable.cache", false);
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1/a2");
+    MockFileSystem mockFs =
+        (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    Assert.assertEquals("listLocatedStatus already called",
+        0, mockFs.numListLocatedStatusCalls);
+    JobConf job = new JobConf(conf);
+    TextInputFormat fileInputFormat = new TextInputFormat();
+    fileInputFormat.configure(job);
+    InputSplit[] splits = fileInputFormat.getSplits(job, 1);
+    Assert.assertEquals("Input splits are not correct", 2, splits.length);
+    Assert.assertEquals("listLocatedStatuss calls",
+        1, mockFs.numListLocatedStatusCalls);
+  }
+
+  private Configuration getConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set("fs.test.impl.disable.cache", "true");
+    conf.setClass("fs.test.impl", MockFileSystem.class, FileSystem.class);
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
+        "test:///a1");
+    return conf;
+  }
+
+  static class MockFileSystem extends RawLocalFileSystem {
+    int numListLocatedStatusCalls = 0;
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+        IOException {
+      if (f.toString().equals("test:/a1")) {
+        return new FileStatus[] {
+            new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
+            new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
+      } else if (f.toString().equals("test:/a1/a2")) {
+        return new FileStatus[] {
+            new FileStatus(10, false, 1, 150, 150,
+                new Path("test:/a1/a2/file2")),
+            new FileStatus(10, false, 1, 151, 150,
+                new Path("test:/a1/a2/file3")) };
+      }
+      return new FileStatus[0];
+    }
+
+    @Override
+    public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+        throws IOException {
+      return new FileStatus[] { new FileStatus(10, true, 1, 150, 150,
+          pathPattern) };
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f, PathFilter filter)
+        throws FileNotFoundException, IOException {
+      return this.listStatus(f);
+    }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+        throws IOException {
+      return new BlockLocation[] {
+          new BlockLocation(new String[] { "localhost:50010" },
+              new String[] { "localhost" }, 0, len) };
+    }
+
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+        PathFilter filter) throws FileNotFoundException, IOException {
+      ++numListLocatedStatusCalls;
+      return super.listLocatedStatus(f, filter);
+    }
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
index e9647829147..77445596d93 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
@@ -24,11 +24,14 @@ import java.util.List;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.junit.Test;
@@ -77,6 +80,23 @@ public class TestFileInputFormat {
         .toString());
   }
 
+  @Test
+  public void testListLocatedStatus() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setBoolean("fs.test.impl.disable.cache", false);
+    conf.set(FileInputFormat.INPUT_DIR, "test:///a1/a2");
+    MockFileSystem mockFs =
+        (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    Assert.assertEquals("listLocatedStatus already called",
+        0, mockFs.numListLocatedStatusCalls);
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 2, splits.size());
+    Assert.assertEquals("listLocatedStatuss calls",
+        1, mockFs.numListLocatedStatusCalls);
+  }
+
   private Configuration getConfiguration() {
     Configuration conf = new Configuration();
     conf.set("fs.test.impl.disable.cache", "true");
@@ -86,13 +106,14 @@ public class TestFileInputFormat {
   }
 
   static class MockFileSystem extends RawLocalFileSystem {
+    int numListLocatedStatusCalls = 0;
 
     @Override
     public FileStatus[] listStatus(Path f) throws FileNotFoundException,
         IOException {
       if (f.toString().equals("test:/a1")) {
         return new FileStatus[] {
-            new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
+            new FileStatus(0, true, 1, 150, 150, new Path("test:/a1/a2")),
             new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
       } else if (f.toString().equals("test:/a1/a2")) {
         return new FileStatus[] {
@@ -116,5 +137,20 @@ public class TestFileInputFormat {
         throws FileNotFoundException, IOException {
       return this.listStatus(f);
     }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(Path p, long start, long len)
+        throws IOException {
+      return new BlockLocation[] {
+          new BlockLocation(new String[] { "localhost:50010" },
+              new String[] { "localhost" }, 0, len) };
+    }
+
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,
+        PathFilter filter) throws FileNotFoundException, IOException {
+      ++numListLocatedStatusCalls;
+      return super.listLocatedStatus(f, filter);
+    }
   }
 }