HADOOP-6890. Part 2: Incoport the change made in FileContext into FileSystem. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@981676 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hairong Kuang 2010-08-02 20:25:44 +00:00
parent d0ba178800
commit 666a8e1600
6 changed files with 162 additions and 60 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
import java.io.*;
import java.util.Arrays;
import java.util.Iterator;
import java.util.zip.CRC32;
import org.apache.commons.logging.Log;
@ -493,6 +494,21 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
return fs.listStatus(f, DEFAULT_FILTER);
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f
* given path
* @return the statuses of the files/directories in the given patch
* @throws IOException
*/
@Override
public Iterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException {
return fs.listLocatedStatus(f, DEFAULT_FILTER);
}
@Override
public boolean mkdirs(Path f) throws IOException {
return fs.mkdirs(f);

View File

@ -1644,16 +1644,16 @@ public final class FileContext {
*/
@Override
public boolean hasNext() {
while (curFile == null) {
if (curItor.hasNext()) {
handleFileStat(curItor.next());
} else if (!itors.empty()) {
curItor = itors.pop();
} else {
return false;
}
while (curFile == null) {
if (curItor.hasNext()) {
handleFileStat(curItor.next());
} else if (!itors.empty()) {
curItor = itors.pop();
} else {
return false;
}
return true;
}
return true;
}
/**

View File

@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@ -1319,6 +1318,89 @@ public abstract class FileSystem extends Configured implements Closeable {
return globPathsLevel(parents, filePattern, level + 1, hasGlob);
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
* Return the file's status and block locations If the path is a file.
*
* If a returned status is a file, it contains the file's block locations.
*
* @param f is the path
* @param filter path filter
*
* @return an iterator that traverses statuses of the files/directories
* in the given path
* If any IO exception (for example the input directory gets deleted while
* listing is being executed), next() or hasNext() of the returned iterator
* may throw a RuntimeException with the IO exception as the cause.
*
* @throws FileNotFoundException If <code>f</code> does not exist
* @throws IOException If an I/O error occurred
*/
public Iterator<LocatedFileStatus> listLocatedStatus(final Path f)
throws FileNotFoundException, IOException {
return listLocatedStatus(f, DEFAULT_FILTER);
}
/**
* Listing a directory
* The returned results include its block location if it is a file
* The results are filtered by the given path filter
* @param f a path
* @param filter a path filter
* @return an iterator that traverses statuses of the files/directories
* in the given path
* @throws FileNotFoundException if <code>f</code> does not exist
* @throws IOException if any I/O error occurred
*/
protected Iterator<LocatedFileStatus> listLocatedStatus(final Path f,
final PathFilter filter)
throws FileNotFoundException, IOException {
return new Iterator<LocatedFileStatus>() {
private final FileStatus[] stats = listStatus(f, filter);
private int i = 0;
/**
* {@inheritDoc}
* @return {@inheritDog}
* @throws Runtimeexception if any IOException occurs during traversal;
* the IOException is set as the cause of the RuntimeException
*/
@Override
public boolean hasNext() {
return i<stats.length;
}
/**
* {@inheritDoc}
* @return {@inheritDoc}
* @throws Runtimeexception if any IOException occurs during traversal;
* the IOException is set as the cause of the RuntimeException
* @exception {@inheritDoc}
*/
@Override
public LocatedFileStatus next() {
if (!hasNext()) {
throw new NoSuchElementException("No more entry in " + f);
}
FileStatus result = stats[i++];
try {
BlockLocation[] locs = result.isFile() ?
getFileBlockLocations(result.getPath(), 0, result.getLen()) :
null;
return new LocatedFileStatus(result, locs);
} catch (IOException ioe) {
throw (RuntimeException)new RuntimeException().initCause(ioe);
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported");
}
};
}
/**
* List the statuses and block locations of the files in the given path.
*
@ -1326,7 +1408,6 @@ public abstract class FileSystem extends Configured implements Closeable {
* if recursive is false, returns files in the directory;
* if recursive is true, return files in the subtree rooted at the path.
* If the path is a file, return the file's status and block locations.
* Files across symbolic links are also returned.
*
* @param f is the path
* @param recursive if the subdirectories need to be traversed recursively
@ -1343,13 +1424,11 @@ public abstract class FileSystem extends Configured implements Closeable {
final Path f, final boolean recursive)
throws FileNotFoundException, IOException {
return new Iterator<LocatedFileStatus>() {
private LinkedList<FileStatus> fileStats = new LinkedList<FileStatus>();
private Stack<FileStatus> dirStats = new Stack<FileStatus>();
{ // initializer
list(f);
}
private Stack<Iterator<LocatedFileStatus>> itors =
new Stack<Iterator<LocatedFileStatus>>();
Iterator<LocatedFileStatus> curItor = listLocatedStatus(f);
LocatedFileStatus curFile;
/**
* {@inheritDoc}
* @return {@inheritDog}
@ -1358,42 +1437,40 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
@Override
public boolean hasNext() {
if (fileStats.isEmpty()) {
listDir();
}
return !fileStats.isEmpty();
}
/**
* list at least one directory until file list is not empty
*/
private void listDir() {
while (fileStats.isEmpty() && !dirStats.isEmpty()) {
FileStatus dir = dirStats.pop();
list(dir.getPath());
while (curFile == null) {
if (curItor.hasNext()) {
handleFileStat(curItor.next());
} else if (!itors.empty()) {
curItor = itors.pop();
} else {
return false;
}
}
return true;
}
/**
* List the given path
*
* @param dirPath a path
* Process the input stat.
* If it is a file, return the file stat.
* If it is a directory, tranverse the directory if recursive is true;
* ignore it if recursive is false.
* @param stat input status
* @throws RuntimeException if any io error occurs; the io exception
* is set as the cause of RuntimeException
*/
private void list(Path dirPath) {
private void handleFileStat(LocatedFileStatus stat) {
try {
FileStatus[] stats = listStatus(dirPath);
for (FileStatus stat : stats) {
if (stat.isFile()) {
fileStats.add(stat);
} else if (recursive) { // directory & recursive
dirStats.push(stat);
}
if (stat.isFile()) { // file
curFile = stat;
} else if (recursive) { // directory
itors.push(curItor);
curItor = listLocatedStatus(stat.getPath());
}
} catch (IOException ioe) {
throw (RuntimeException) new RuntimeException().initCause(ioe);
}
throw (RuntimeException)new RuntimeException().initCause(ioe);
}
}
/**
* {@inheritDoc}
* @return {@inheritDoc}
@ -1403,19 +1480,14 @@ public abstract class FileSystem extends Configured implements Closeable {
*/
@Override
public LocatedFileStatus next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
FileStatus status = fileStats.remove();
try {
BlockLocation[] locs = getFileBlockLocations(
status, 0, status.getLen());
return new LocatedFileStatus(status, locs);
} catch (IOException ioe) {
throw (RuntimeException) new RuntimeException().initCause(ioe);
}
if (hasNext()) {
LocatedFileStatus result = curFile;
curFile = null;
return result;
}
throw new java.util.NoSuchElementException("No more entry in " + f);
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove is not supported");

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs;
import java.io.*;
import java.net.URI;
import java.util.EnumSet;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -165,6 +166,12 @@ public class FilterFileSystem extends FileSystem {
return fs.listStatus(f);
}
/** List files and its block locations in a directory. */
public Iterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException {
return fs.listLocatedStatus(f);
}
public Path getHomeDirectory() {
return fs.getHomeDirectory();
}

View File

@ -115,6 +115,13 @@ public class TestFilterFileSystem extends TestCase {
final Path path, final boolean isRecursive) {
return null;
}
public Iterator<LocatedFileStatus> listLocatedStatus(Path f) {
return null;
}
public Iterator<LocatedFileStatus> listLocatedStatus(Path f,
final PathFilter filter) {
return null;
}
public void copyFromLocalFile(Path src, Path dst) { }
public void moveFromLocalFile(Path[] srcs, Path dst) { }
public void moveFromLocalFile(Path src, Path dst) { }

View File

@ -138,13 +138,13 @@ public class TestListFiles {
itor = fs.listFiles(TEST_DIR, true);
stat = itor.next();
assertTrue(stat.isFile());
assertEquals(fs.makeQualified(FILE1), stat.getPath());
stat = itor.next();
assertTrue(stat.isFile());
assertEquals(fs.makeQualified(FILE2), stat.getPath());
stat = itor.next();
assertTrue(stat.isFile());
assertEquals(fs.makeQualified(FILE3), stat.getPath());
stat = itor.next();
assertTrue(stat.isFile());
assertEquals(fs.makeQualified(FILE1), stat.getPath());
assertFalse(itor.hasNext());
itor = fs.listFiles(TEST_DIR, false);