HADOOP-17281 Implement FileSystem.listStatusIterator() in S3AFileSystem (#2354)

Contains HADOOP-17300: FileSystem.DirListingIterator.next() call should
return NoSuchElementException

Contributed by Mukund Thakur

Change-Id: I4e7e5c6e295525db9e2de6f416f32bbb81e146d3
This commit is contained in:
Mukund Thakur 2020-10-07 18:29:06 +05:30 committed by Steve Loughran
parent 89314a7bae
commit 475dba1ddf
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
7 changed files with 191 additions and 2 deletions

View File

@ -2228,7 +2228,9 @@ public abstract class FileSystem extends Configured
@Override
@SuppressWarnings("unchecked")
public T next() throws IOException {
Preconditions.checkState(hasNext(), "No more items in iterator");
if (!hasNext()) {
throw new NoSuchElementException("No more items in iterator");
}
if (i == entries.getEntries().length) {
fetchMore();
}

View File

@ -294,6 +294,24 @@ any optimizations.
The atomicity and consistency constraints are as for
`listStatus(Path, PathFilter)`.
### `RemoteIterator<FileStatus> listStatusIterator(Path p)`
Return an iterator enumerating the `FileStatus` entries under
a path. This is similar to `listStatus(Path)` except the fact that
rather than returning an entire list, an iterator is returned.
The result is exactly the same as `listStatus(Path)`, provided no other
caller updates the directory during the listing. Having said that, this does
not guarantee atomicity if other callers are adding/deleting the files
inside the directory while listing is being performed. Different filesystems
may provide a more efficient implementation, for example S3A does the
listing in pages and fetches the next pages asynchronously while a
page is getting processed.
Note that now since the initial listing is async, bucket/path existence
exception may show up later during next() call.
Callers should prefer using listStatusIterator over listStatus as it
is incremental in nature.
### `FileStatus[] listStatus(Path[] paths)`

View File

@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
@ -148,6 +150,7 @@ public abstract class AbstractContractGetFileStatusTest extends
public void testComplexDirActions() throws Throwable {
TreeScanResults tree = createTestTree();
checkListStatusStatusComplexDir(tree);
checkListStatusIteratorComplexDir(tree);
checkListLocatedStatusStatusComplexDir(tree);
checkListFilesComplexDirNonRecursive(tree);
checkListFilesComplexDirRecursive(tree);
@ -169,6 +172,34 @@ public abstract class AbstractContractGetFileStatusTest extends
listing.assertSizeEquals("listStatus()", TREE_FILES, TREE_WIDTH, 0);
}
/**
* Test {@link FileSystem#listStatusIterator(Path)} on a complex
* directory tree.
* @param tree directory tree to list.
* @throws Throwable
*/
protected void checkListStatusIteratorComplexDir(TreeScanResults tree)
throws Throwable {
describe("Expect listStatusIterator to list all entries in top dir only");
FileSystem fs = getFileSystem();
TreeScanResults listing = new TreeScanResults(
fs.listStatusIterator(tree.getBasePath()));
listing.assertSizeEquals("listStatus()", TREE_FILES, TREE_WIDTH, 0);
List<FileStatus> resWithoutCheckingHasNext =
iteratorToListThroughNextCallsAlone(fs
.listStatusIterator(tree.getBasePath()));
List<FileStatus> resWithCheckingHasNext = iteratorToList(fs
.listStatusIterator(tree.getBasePath()));
Assertions.assertThat(resWithCheckingHasNext)
.describedAs("listStatusIterator() should return correct " +
"results even if hasNext() calls are not made.")
.hasSameElementsAs(resWithoutCheckingHasNext);
}
/**
* Test {@link FileSystem#listLocatedStatus(Path)} on a complex
* directory tree.
@ -322,6 +353,45 @@ public abstract class AbstractContractGetFileStatusTest extends
verifyStatusArrayMatchesFile(f, getFileSystem().listStatus(f));
}
@Test
public void testListStatusIteratorFile() throws Throwable {
describe("test the listStatusIterator(path) on a file");
Path f = touchf("listStItrFile");
List<FileStatus> statusList = (List<FileStatus>) iteratorToList(
getFileSystem().listStatusIterator(f));
validateListingForFile(f, statusList, false);
List<FileStatus> statusList2 =
(List<FileStatus>) iteratorToListThroughNextCallsAlone(
getFileSystem().listStatusIterator(f));
validateListingForFile(f, statusList2, true);
}
/**
* Validate listing result for an input path which is file.
* @param f file.
* @param statusList list status of a file.
* @param nextCallAlone whether the listing generated just using
* next() calls.
*/
private void validateListingForFile(Path f,
List<FileStatus> statusList,
boolean nextCallAlone) {
String msg = String.format("size of file list returned using %s should " +
"be 1", nextCallAlone ?
"next() calls alone" : "hasNext() and next() calls");
Assertions.assertThat(statusList)
.describedAs(msg)
.hasSize(1);
Assertions.assertThat(statusList.get(0).getPath())
.describedAs("path returned should match with the input path")
.isEqualTo(f);
Assertions.assertThat(statusList.get(0).isFile())
.describedAs("path returned should be a file")
.isEqualTo(true);
}
@Test
public void testListFilesFile() throws Throwable {
describe("test the listStatus(path) on a file");

View File

@ -22,13 +22,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.RemoteIterator;
@ -39,6 +42,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
import static org.apache.hadoop.fs.contract.ContractTestUtils.iteratorToList;
import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@ -242,6 +246,13 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
+ "listStatus = " + listStatusResult
+ "listFiles = " + listFilesResult,
fileList.size() <= statuses.length);
List<FileStatus> statusList = (List<FileStatus>) iteratorToList(
fs.listStatusIterator(root));
Assertions.assertThat(statusList)
.describedAs("Result of listStatus(/) and listStatusIterator(/)"
+ " must match")
.hasSameElementsAs(Arrays.stream(statuses)
.collect(Collectors.toList()));
}
@Test

View File

@ -1453,6 +1453,52 @@ public class ContractTestUtils extends Assert {
return list;
}
/**
* Convert a remote iterator over file status results into a list.
* The utility equivalents in commons collection and guava cannot be
* used here, as this is a different interface, one whose operators
* can throw IOEs.
* @param iterator input iterator
* @return the file status entries as a list.
* @throws IOException
*/
public static <T extends FileStatus> List<T> iteratorToList(
RemoteIterator<T> iterator) throws IOException {
List<T> list = new ArrayList<>();
while (iterator.hasNext()) {
list.add(iterator.next());
}
return list;
}
/**
* Convert a remote iterator over file status results into a list.
* This uses {@link RemoteIterator#next()} calls only, expecting
* a raised {@link NoSuchElementException} exception to indicate that
* the end of the listing has been reached. This iteration strategy is
* designed to verify that the implementation of the remote iterator
* generates results and terminates consistently with the {@code hasNext/next}
* iteration. More succinctly "verifies that the {@code next()} operator
* isn't relying on {@code hasNext()} to always be called during an iteration.
* @param iterator input iterator
* @return the status entries as a list.
* @throws IOException IO problems
*/
@SuppressWarnings("InfiniteLoopStatement")
public static <T extends FileStatus> List<T> iteratorToListThroughNextCallsAlone(
RemoteIterator<T> iterator) throws IOException {
List<T> list = new ArrayList<>();
try {
while (true) {
list.add(iterator.next());
}
} catch (NoSuchElementException expected) {
// ignored
}
return list;
}
/**
* Convert a remote iterator over file status results into a list.
* This uses {@link RemoteIterator#next()} calls only, expecting
@ -1602,7 +1648,7 @@ public class ContractTestUtils extends Assert {
* @param results results of the listFiles/listStatus call.
* @throws IOException IO problems during the iteration.
*/
public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
public TreeScanResults(RemoteIterator<? extends FileStatus> results)
throws IOException {
while (results.hasNext()) {
add(results.next());

View File

@ -2643,6 +2643,30 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
}
/**
* Override subclass such that we benefit for async listing done
* in {@code S3AFileSystem}. See {@code Listing#ObjectListingIterator}.
* {@inheritDoc}
*
*/
@Override
public RemoteIterator<FileStatus> listStatusIterator(Path p)
throws FileNotFoundException, IOException {
RemoteIterator<S3AFileStatus> listStatusItr = once("listStatus",
p.toString(), () -> innerListStatus(p));
return new RemoteIterator<FileStatus>() {
@Override
public boolean hasNext() throws IOException {
return listStatusItr.hasNext();
}
@Override
public FileStatus next() throws IOException {
return listStatusItr.next();
}
};
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.

View File

@ -231,6 +231,24 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
"match with original list of files")
.hasSameElementsAs(originalListOfFiles)
.hasSize(numOfPutRequests);
// Validate listing using listStatusIterator().
NanoTimer timeUsingListStatusItr = new NanoTimer();
RemoteIterator<FileStatus> lsItr = fs.listStatusIterator(dir);
List<String> listUsingListStatusItr = new ArrayList<>();
while (lsItr.hasNext()) {
listUsingListStatusItr.add(lsItr.next().getPath().toString());
Thread.sleep(eachFileProcessingTime);
}
timeUsingListStatusItr.end("listing %d files using " +
"listStatusIterator() api with batch size of %d " +
"including %dms of processing time for each file",
numOfPutRequests, batchSize, eachFileProcessingTime);
Assertions.assertThat(listUsingListStatusItr)
.describedAs("Listing results using listStatusIterator() must" +
"match with original list of files")
.hasSameElementsAs(originalListOfFiles)
.hasSize(numOfPutRequests);
} finally {
executorService.shutdown();
}