HADOOP-10987. Provide an iterator-based listing API for FileSystem. Contributed by Kihwal Lee.

This commit is contained in:
Kihwal Lee 2014-11-03 08:23:34 -06:00
parent caae0a0a08
commit c8648bd27e
7 changed files with 192 additions and 76 deletions

View File

@ -6,6 +6,8 @@ Release 2.7.0 - UNRELEASED
NEW FEATURES
HADOOP-10987. Provide an iterator-based listing API for FileSystem (kihwal)
IMPROVEMENTS
HADOOP-11156. DelegateToFileSystem should implement

View File

@ -1701,6 +1701,36 @@ public abstract class FileSystem extends Configured implements Closeable {
};
}
/**
* Returns a remote iterator so that followup calls are made on demand
* while consuming the entries. Each file system implementation should
* override this method and provide a more efficient implementation, if
* possible.
*
* @param p target path
* @return remote iterator
*/
public RemoteIterator<FileStatus> listStatusIterator(final Path p)
throws FileNotFoundException, IOException {
return new RemoteIterator<FileStatus>() {
private final FileStatus[] stats = listStatus(p);
private int i = 0;
@Override
public boolean hasNext() {
return i<stats.length;
}
@Override
public FileStatus next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException("No more entry in " + p);
}
return stats[i++];
}
};
}
/**
* List the statuses and block locations of the files in the given path.
*

View File

@ -251,6 +251,13 @@ public class FilterFileSystem extends FileSystem {
return fs.listLocatedStatus(f);
}
/** Return a remote iterator for listing in a directory */
@Override
public RemoteIterator<FileStatus> listStatusIterator(Path f)
throws IOException {
return fs.listStatusIterator(f);
}
@Override
public Path getHomeDirectory() {
return fs.getHomeDirectory();

View File

@ -334,37 +334,11 @@ public final class FileSystemTestWrapper extends FSTestWrapper {
return fs.getFileChecksum(f);
}
private class FakeRemoteIterator<E> implements RemoteIterator<E> {
private E[] elements;
private int count;
FakeRemoteIterator(E[] elements) {
this.elements = elements;
count = 0;
}
@Override
public boolean hasNext() throws IOException {
return count < elements.length;
}
@Override
public E next() throws IOException {
if (hasNext()) {
return elements[count++];
}
return null;
}
}
@Override
public RemoteIterator<FileStatus> listStatusIterator(Path f)
throws AccessControlException, FileNotFoundException,
UnsupportedFileSystemException, IOException {
// Fake the RemoteIterator, because FileSystem has no such thing
FileStatus[] statuses = fs.listStatus(f);
return new FakeRemoteIterator<FileStatus>(statuses);
return fs.listStatusIterator(f);
}
@Override

View File

@ -125,6 +125,7 @@ public class TestHarFileSystem {
public Iterator<LocatedFileStatus> listLocatedStatus(Path f);
public Iterator<LocatedFileStatus> listLocatedStatus(Path f,
PathFilter filter);
public Iterator<FileStatus> listStatusIterator(Path f);
public void copyFromLocalFile(Path src, Path dst);
public void moveFromLocalFile(Path[] srcs, Path dst);
public void moveFromLocalFile(Path src, Path dst);

View File

@ -766,66 +766,145 @@ public class DistributedFileSystem extends FileSystem {
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
final PathFilter filter)
throws IOException {
final Path absF = fixRelativePart(p);
return new RemoteIterator<LocatedFileStatus>() {
private DirectoryListing thisListing;
private int i;
private String src;
private LocatedFileStatus curStat = null;
{ // initializer
// Fully resolve symlinks in path first to avoid additional resolution
// round-trips as we fetch more batches of listings
src = getPathName(resolvePath(absF));
// fetch the first batch of entries in the directory
thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true);
statistics.incrementReadOps(1);
if (thisListing == null) { // the directory does not exist
throw new FileNotFoundException("File " + p + " does not exist.");
}
Path absF = fixRelativePart(p);
return new FileSystemLinkResolver<RemoteIterator<LocatedFileStatus>>() {
@Override
public RemoteIterator<LocatedFileStatus> doCall(final Path p)
throws IOException, UnresolvedLinkException {
return new DirListingIterator<LocatedFileStatus>(p, filter, true);
}
@Override
public boolean hasNext() throws IOException {
while (curStat == null && hasNextNoFilter()) {
LocatedFileStatus next =
((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++])
.makeQualifiedLocated(getUri(), absF);
if (filter.accept(next.getPath())) {
curStat = next;
}
public RemoteIterator<LocatedFileStatus> next(final FileSystem fs, final Path p)
throws IOException {
if (fs instanceof DistributedFileSystem) {
return ((DistributedFileSystem)fs).listLocatedStatus(p, filter);
}
return curStat != null;
// symlink resolution for this methos does not work cross file systems
// because it is a protected method.
throw new IOException("Link resolution does not work with multiple " +
"file systems for listLocatedStatus(): " + p);
}
}.resolve(this, absF);
}
/**
* Returns a remote iterator so that followup calls are made on demand
* while consuming the entries. This reduces memory consumption during
* listing of a large directory.
*
* @param p target path
* @return remote iterator
*/
@Override
public RemoteIterator<FileStatus> listStatusIterator(final Path p)
throws IOException {
Path absF = fixRelativePart(p);
return new FileSystemLinkResolver<RemoteIterator<FileStatus>>() {
@Override
public RemoteIterator<FileStatus> doCall(final Path p)
throws IOException, UnresolvedLinkException {
return new DirListingIterator<FileStatus>(p, false);
}
@Override
public RemoteIterator<FileStatus> next(final FileSystem fs, final Path p)
throws IOException {
return ((DistributedFileSystem)fs).listStatusIterator(p);
}
}.resolve(this, absF);
}
/**
* This class defines an iterator that returns
* the file status of each file/subdirectory of a directory
*
* if needLocation, status contains block location if it is a file
* throws a RuntimeException with the error as its cause.
*
* @param <T> the type of the file status
*/
private class DirListingIterator<T extends FileStatus>
implements RemoteIterator<T> {
private DirectoryListing thisListing;
private int i;
private Path p;
private String src;
private T curStat = null;
private PathFilter filter;
private boolean needLocation;
private DirListingIterator(Path p, PathFilter filter,
boolean needLocation) throws IOException {
this.p = p;
this.src = getPathName(p);
this.filter = filter;
this.needLocation = needLocation;
// fetch the first batch of entries in the directory
thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
needLocation);
statistics.incrementReadOps(1);
if (thisListing == null) { // the directory does not exist
throw new FileNotFoundException("File " + p + " does not exist.");
}
i = 0;
}
private DirListingIterator(Path p, boolean needLocation)
throws IOException {
this(p, null, needLocation);
}
@Override
@SuppressWarnings("unchecked")
public boolean hasNext() throws IOException {
while (curStat == null && hasNextNoFilter()) {
T next;
HdfsFileStatus fileStat = thisListing.getPartialListing()[i++];
if (needLocation) {
next = (T)((HdfsLocatedFileStatus)fileStat)
.makeQualifiedLocated(getUri(), p);
} else {
next = (T)fileStat.makeQualified(getUri(), p);
}
// apply filter if not null
if (filter == null || filter.accept(next.getPath())) {
curStat = next;
}
}
return curStat != null;
}
/** Check if there is a next item before applying the given filter */
private boolean hasNextNoFilter() throws IOException {
/** Check if there is a next item before applying the given filter */
private boolean hasNextNoFilter() throws IOException {
if (thisListing == null) {
return false;
}
if (i >= thisListing.getPartialListing().length
&& thisListing.hasMore()) {
// current listing is exhausted & fetch a new listing
thisListing = dfs.listPaths(src, thisListing.getLastName(),
needLocation);
statistics.incrementReadOps(1);
if (thisListing == null) {
return false;
}
if (i>=thisListing.getPartialListing().length
&& thisListing.hasMore()) {
// current listing is exhausted & fetch a new listing
thisListing = dfs.listPaths(src, thisListing.getLastName(), true);
statistics.incrementReadOps(1);
if (thisListing == null) {
return false;
}
i = 0;
}
return (i<thisListing.getPartialListing().length);
i = 0;
}
return (i < thisListing.getPartialListing().length);
}
@Override
public LocatedFileStatus next() throws IOException {
if (hasNext()) {
LocatedFileStatus tmp = curStat;
curStat = null;
return tmp;
}
throw new java.util.NoSuchElementException("No more entry in " + p);
}
};
@Override
public T next() throws IOException {
if (hasNext()) {
T tmp = curStat;
curStat = null;
return tmp;
}
throw new java.util.NoSuchElementException("No more entry in " + p);
}
}
/**

View File

@ -227,6 +227,9 @@ public class TestFileStatus {
RemoteIterator<FileStatus> itor = fc.listStatus(dir);
assertFalse(dir + " should be empty", itor.hasNext());
itor = fs.listStatusIterator(dir);
assertFalse(dir + " should be empty", itor.hasNext());
// create another file that is smaller than a block.
Path file2 = new Path(dir, "filestatus2.dat");
writeFile(fs, file2, 1, blockSize/4, blockSize);
@ -264,6 +267,12 @@ public class TestFileStatus {
assertEquals(file3.toString(), itor.next().getPath().toString());
assertFalse("Unexpected addtional file", itor.hasNext());
itor = fs.listStatusIterator(dir);
assertEquals(file2.toString(), itor.next().getPath().toString());
assertEquals(file3.toString(), itor.next().getPath().toString());
assertFalse("Unexpected addtional file", itor.hasNext());
// Test iterative listing. Now dir has 2 entries, create one more.
Path dir3 = fs.makeQualified(new Path(dir, "dir3"));
fs.mkdirs(dir3);
@ -280,6 +289,12 @@ public class TestFileStatus {
assertEquals(file3.toString(), itor.next().getPath().toString());
assertFalse("Unexpected addtional file", itor.hasNext());
itor = fs.listStatusIterator(dir);
assertEquals(dir3.toString(), itor.next().getPath().toString());
assertEquals(file2.toString(), itor.next().getPath().toString());
assertEquals(file3.toString(), itor.next().getPath().toString());
assertFalse("Unexpected addtional file", itor.hasNext());
// Now dir has 3 entries, create two more
Path dir4 = fs.makeQualified(new Path(dir, "dir4"));
fs.mkdirs(dir4);
@ -303,6 +318,14 @@ public class TestFileStatus {
assertEquals(file3.toString(), itor.next().getPath().toString());
assertFalse(itor.hasNext());
itor = fs.listStatusIterator(dir);
assertEquals(dir3.toString(), itor.next().getPath().toString());
assertEquals(dir4.toString(), itor.next().getPath().toString());
assertEquals(dir5.toString(), itor.next().getPath().toString());
assertEquals(file2.toString(), itor.next().getPath().toString());
assertEquals(file3.toString(), itor.next().getPath().toString());
assertFalse(itor.hasNext());
{ //test permission error on hftp
fs.setPermission(dir, new FsPermission((short)0));
try {