diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index bd9d391afde..bcafb120af8 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -6,6 +6,8 @@ Release 2.7.0 - UNRELEASED NEW FEATURES + HADOOP-10987. Provide an iterator-based listing API for FileSystem (kihwal) + IMPROVEMENTS HADOOP-11156. DelegateToFileSystem should implement diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 894e9e064a5..745238cdbc6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -1701,6 +1701,36 @@ public abstract class FileSystem extends Configured implements Closeable { }; } + /** + * Returns a remote iterator so that followup calls are made on demand + * while consuming the entries. Each file system implementation should + * override this method and provide a more efficient implementation, if + * possible. 
+ * + * @param p target path + * @return remote iterator + */ + public RemoteIterator listStatusIterator(final Path p) + throws FileNotFoundException, IOException { + return new RemoteIterator() { + private final FileStatus[] stats = listStatus(p); + private int i = 0; + + @Override + public boolean hasNext() { + return i listStatusIterator(Path f) + throws IOException { + return fs.listStatusIterator(f); + } + @Override public Path getHomeDirectory() { return fs.getHomeDirectory(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java index 9a5f40edf67..933ad1a2358 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemTestWrapper.java @@ -334,37 +334,11 @@ public final class FileSystemTestWrapper extends FSTestWrapper { return fs.getFileChecksum(f); } - private class FakeRemoteIterator implements RemoteIterator { - - private E[] elements; - private int count; - - FakeRemoteIterator(E[] elements) { - this.elements = elements; - count = 0; - } - - @Override - public boolean hasNext() throws IOException { - return count < elements.length; - } - - @Override - public E next() throws IOException { - if (hasNext()) { - return elements[count++]; - } - return null; - } - } - @Override public RemoteIterator listStatusIterator(Path f) throws AccessControlException, FileNotFoundException, UnsupportedFileSystemException, IOException { - // Fake the RemoteIterator, because FileSystem has no such thing - FileStatus[] statuses = fs.listStatus(f); - return new FakeRemoteIterator(statuses); + return fs.listStatusIterator(f); } @Override diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java index 1e86439785b..374bb2e8eb3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java @@ -125,6 +125,7 @@ public class TestHarFileSystem { public Iterator listLocatedStatus(Path f); public Iterator listLocatedStatus(Path f, PathFilter filter); + public Iterator listStatusIterator(Path f); public void copyFromLocalFile(Path src, Path dst); public void moveFromLocalFile(Path[] srcs, Path dst); public void moveFromLocalFile(Path src, Path dst); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 067adce1fd6..8152527a71e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -766,66 +766,145 @@ public class DistributedFileSystem extends FileSystem { protected RemoteIterator listLocatedStatus(final Path p, final PathFilter filter) throws IOException { - final Path absF = fixRelativePart(p); - return new RemoteIterator() { - private DirectoryListing thisListing; - private int i; - private String src; - private LocatedFileStatus curStat = null; - - { // initializer - // Fully resolve symlinks in path first to avoid additional resolution - // round-trips as we fetch more batches of listings - src = getPathName(resolvePath(absF)); - // fetch the first batch of entries in the directory - thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true); - statistics.incrementReadOps(1); - if (thisListing == null) { // the directory does not exist - throw new FileNotFoundException("File " + p + " does not exist."); - } 
+ Path absF = fixRelativePart(p); + return new FileSystemLinkResolver>() { + @Override + public RemoteIterator doCall(final Path p) + throws IOException, UnresolvedLinkException { + return new DirListingIterator(p, filter, true); } @Override - public boolean hasNext() throws IOException { - while (curStat == null && hasNextNoFilter()) { - LocatedFileStatus next = - ((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++]) - .makeQualifiedLocated(getUri(), absF); - if (filter.accept(next.getPath())) { - curStat = next; - } + public RemoteIterator next(final FileSystem fs, final Path p) + throws IOException { + if (fs instanceof DistributedFileSystem) { + return ((DistributedFileSystem)fs).listLocatedStatus(p, filter); } - return curStat != null; + // symlink resolution for this method does not work across file systems + // because it is a protected method. + throw new IOException("Link resolution does not work with multiple " + + "file systems for listLocatedStatus(): " + p); + } + }.resolve(this, absF); + } + + + /** + * Returns a remote iterator so that followup calls are made on demand + * while consuming the entries. This reduces memory consumption during + * listing of a large directory. 
+ * + * @param p target path + * @return remote iterator + */ + @Override + public RemoteIterator listStatusIterator(final Path p) + throws IOException { + Path absF = fixRelativePart(p); + return new FileSystemLinkResolver>() { + @Override + public RemoteIterator doCall(final Path p) + throws IOException, UnresolvedLinkException { + return new DirListingIterator(p, false); + } + + @Override + public RemoteIterator next(final FileSystem fs, final Path p) + throws IOException { + return ((DistributedFileSystem)fs).listStatusIterator(p); + } + }.resolve(this, absF); + + } + + /** + * This class defines an iterator that returns + * the file status of each file/subdirectory of a directory + * + * if needLocation, status contains block location if it is a file + * throws a RuntimeException with the error as its cause. + * + * @param the type of the file status + */ + private class DirListingIterator + implements RemoteIterator { + private DirectoryListing thisListing; + private int i; + private Path p; + private String src; + private T curStat = null; + private PathFilter filter; + private boolean needLocation; + + private DirListingIterator(Path p, PathFilter filter, + boolean needLocation) throws IOException { + this.p = p; + this.src = getPathName(p); + this.filter = filter; + this.needLocation = needLocation; + // fetch the first batch of entries in the directory + thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, + needLocation); + statistics.incrementReadOps(1); + if (thisListing == null) { // the directory does not exist + throw new FileNotFoundException("File " + p + " does not exist."); + } + i = 0; + } + + private DirListingIterator(Path p, boolean needLocation) + throws IOException { + this(p, null, needLocation); + } + + @Override + @SuppressWarnings("unchecked") + public boolean hasNext() throws IOException { + while (curStat == null && hasNextNoFilter()) { + T next; + HdfsFileStatus fileStat = thisListing.getPartialListing()[i++]; + if 
(needLocation) { + next = (T)((HdfsLocatedFileStatus)fileStat) + .makeQualifiedLocated(getUri(), p); + } else { + next = (T)fileStat.makeQualified(getUri(), p); + } + // apply filter if not null + if (filter == null || filter.accept(next.getPath())) { + curStat = next; + } + } + return curStat != null; + } - /** Check if there is a next item before applying the given filter */ - private boolean hasNextNoFilter() throws IOException { + /** Check if there is a next item before applying the given filter */ + private boolean hasNextNoFilter() throws IOException { + if (thisListing == null) { + return false; + } + if (i >= thisListing.getPartialListing().length + && thisListing.hasMore()) { + // current listing is exhausted & fetch a new listing + thisListing = dfs.listPaths(src, thisListing.getLastName(), + needLocation); + statistics.incrementReadOps(1); if (thisListing == null) { return false; } - if (i>=thisListing.getPartialListing().length - && thisListing.hasMore()) { - // current listing is exhausted & fetch a new listing - thisListing = dfs.listPaths(src, thisListing.getLastName(), true); - statistics.incrementReadOps(1); - if (thisListing == null) { - return false; - } - i = 0; - } - return (i itor = fc.listStatus(dir); assertFalse(dir + " should be empty", itor.hasNext()); + itor = fs.listStatusIterator(dir); + assertFalse(dir + " should be empty", itor.hasNext()); + // create another file that is smaller than a block. Path file2 = new Path(dir, "filestatus2.dat"); writeFile(fs, file2, 1, blockSize/4, blockSize); @@ -264,6 +267,12 @@ public class TestFileStatus { assertEquals(file3.toString(), itor.next().getPath().toString()); assertFalse("Unexpected addtional file", itor.hasNext()); + itor = fs.listStatusIterator(dir); + assertEquals(file2.toString(), itor.next().getPath().toString()); + assertEquals(file3.toString(), itor.next().getPath().toString()); + assertFalse("Unexpected addtional file", itor.hasNext()); + + // Test iterative listing. 
Now dir has 2 entries, create one more. Path dir3 = fs.makeQualified(new Path(dir, "dir3")); fs.mkdirs(dir3); @@ -280,6 +289,12 @@ public class TestFileStatus { assertEquals(file3.toString(), itor.next().getPath().toString()); assertFalse("Unexpected addtional file", itor.hasNext()); + itor = fs.listStatusIterator(dir); + assertEquals(dir3.toString(), itor.next().getPath().toString()); + assertEquals(file2.toString(), itor.next().getPath().toString()); + assertEquals(file3.toString(), itor.next().getPath().toString()); + assertFalse("Unexpected addtional file", itor.hasNext()); + // Now dir has 3 entries, create two more Path dir4 = fs.makeQualified(new Path(dir, "dir4")); fs.mkdirs(dir4); @@ -303,6 +318,14 @@ public class TestFileStatus { assertEquals(file3.toString(), itor.next().getPath().toString()); assertFalse(itor.hasNext()); + itor = fs.listStatusIterator(dir); + assertEquals(dir3.toString(), itor.next().getPath().toString()); + assertEquals(dir4.toString(), itor.next().getPath().toString()); + assertEquals(dir5.toString(), itor.next().getPath().toString()); + assertEquals(file2.toString(), itor.next().getPath().toString()); + assertEquals(file3.toString(), itor.next().getPath().toString()); + assertFalse(itor.hasNext()); + { //test permission error on hftp fs.setPermission(dir, new FsPermission((short)0)); try {