HADOOP-16465 listLocatedStatus() optimisation (#1943)
Contributed by Mukund Thakur Optimize S3AFileSystem.listLocatedStatus() to perform list operations directly and then fallback to head checks for files
This commit is contained in:
parent
aeeebc5e79
commit
7b2d84d19c
|
@ -4283,41 +4283,68 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
RemoteIterator<? extends LocatedFileStatus> iterator =
|
RemoteIterator<? extends LocatedFileStatus> iterator =
|
||||||
once("listLocatedStatus", path.toString(),
|
once("listLocatedStatus", path.toString(),
|
||||||
() -> {
|
() -> {
|
||||||
// lookup dir triggers existence check
|
// Assuming the path to be a directory,
|
||||||
final S3AFileStatus fileStatus =
|
// trigger a list call directly.
|
||||||
(S3AFileStatus) getFileStatus(path);
|
final RemoteIterator<S3ALocatedFileStatus>
|
||||||
if (fileStatus.isFile()) {
|
locatedFileStatusIteratorForDir =
|
||||||
// simple case: File
|
getLocatedFileStatusIteratorForDir(path, filter);
|
||||||
LOG.debug("Path is a file");
|
|
||||||
return new Listing.SingleStatusRemoteIterator(
|
// If no listing is present then path might be a file.
|
||||||
filter.accept(path) ? toLocatedFileStatus(fileStatus) : null);
|
if (!locatedFileStatusIteratorForDir.hasNext()) {
|
||||||
} else {
|
final S3AFileStatus fileStatus =
|
||||||
// directory: trigger a lookup
|
(S3AFileStatus) getFileStatus(path);
|
||||||
final String key = maybeAddTrailingSlash(pathToKey(path));
|
if (fileStatus.isFile()) {
|
||||||
final Listing.FileStatusAcceptor acceptor =
|
// simple case: File
|
||||||
new Listing.AcceptAllButSelfAndS3nDirs(path);
|
LOG.debug("Path is a file");
|
||||||
boolean allowAuthoritative = allowAuthoritative(f);
|
return new Listing.SingleStatusRemoteIterator(
|
||||||
DirListingMetadata meta =
|
filter.accept(path)
|
||||||
S3Guard.listChildrenWithTtl(metadataStore, path,
|
? toLocatedFileStatus(fileStatus)
|
||||||
ttlTimeProvider, allowAuthoritative);
|
: null);
|
||||||
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
|
}
|
||||||
listing.createProvidedFileStatusIterator(
|
|
||||||
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
|
|
||||||
return (allowAuthoritative && meta != null
|
|
||||||
&& meta.isAuthoritative())
|
|
||||||
? listing.createLocatedFileStatusIterator(
|
|
||||||
cachedFileStatusIterator)
|
|
||||||
: listing.createLocatedFileStatusIterator(
|
|
||||||
listing.createFileStatusListingIterator(path,
|
|
||||||
createListObjectsRequest(key, "/"),
|
|
||||||
filter,
|
|
||||||
acceptor,
|
|
||||||
cachedFileStatusIterator));
|
|
||||||
}
|
}
|
||||||
|
// Either empty or non-empty directory.
|
||||||
|
return locatedFileStatusIteratorForDir;
|
||||||
});
|
});
|
||||||
return toLocatedFileStatusIterator(iterator);
|
return toLocatedFileStatusIterator(iterator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate list located status for a directory.
|
||||||
|
* Also performing tombstone reconciliation for guarded directories.
|
||||||
|
* @param dir directory to check.
|
||||||
|
* @param filter a path filter.
|
||||||
|
* @return an iterator that traverses statuses of the given dir.
|
||||||
|
* @throws IOException in case of failure.
|
||||||
|
*/
|
||||||
|
private RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
|
||||||
|
Path dir, PathFilter filter) throws IOException {
|
||||||
|
final String key = maybeAddTrailingSlash(pathToKey(dir));
|
||||||
|
final Listing.FileStatusAcceptor acceptor =
|
||||||
|
new Listing.AcceptAllButSelfAndS3nDirs(dir);
|
||||||
|
boolean allowAuthoritative = allowAuthoritative(dir);
|
||||||
|
DirListingMetadata meta =
|
||||||
|
S3Guard.listChildrenWithTtl(metadataStore, dir,
|
||||||
|
ttlTimeProvider, allowAuthoritative);
|
||||||
|
Set<Path> tombstones = meta != null
|
||||||
|
? meta.listTombstones()
|
||||||
|
: null;
|
||||||
|
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
|
||||||
|
listing.createProvidedFileStatusIterator(
|
||||||
|
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
|
||||||
|
return (allowAuthoritative && meta != null
|
||||||
|
&& meta.isAuthoritative())
|
||||||
|
? listing.createLocatedFileStatusIterator(
|
||||||
|
cachedFileStatusIterator)
|
||||||
|
: listing.createTombstoneReconcilingIterator(
|
||||||
|
listing.createLocatedFileStatusIterator(
|
||||||
|
listing.createFileStatusListingIterator(dir,
|
||||||
|
createListObjectsRequest(key, "/"),
|
||||||
|
filter,
|
||||||
|
acceptor,
|
||||||
|
cachedFileStatusIterator)),
|
||||||
|
tombstones);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build a {@link S3ALocatedFileStatus} from a {@link FileStatus} instance.
|
* Build a {@link S3ALocatedFileStatus} from a {@link FileStatus} instance.
|
||||||
* @param status file status
|
* @param status file status
|
||||||
|
|
|
@ -111,6 +111,63 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
|
||||||
skipDuringFaultInjection(fs);
|
skipDuringFaultInjection(fs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCostOfLocatedFileStatusOnFile() throws Throwable {
|
||||||
|
describe("performing listLocatedStatus on a file");
|
||||||
|
Path file = path(getMethodName() + ".txt");
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
touch(fs, file);
|
||||||
|
resetMetricDiffs();
|
||||||
|
fs.listLocatedStatus(file);
|
||||||
|
if (!fs.hasMetadataStore()) {
|
||||||
|
// Unguarded FS.
|
||||||
|
metadataRequests.assertDiffEquals(1);
|
||||||
|
}
|
||||||
|
listRequests.assertDiffEquals(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCostOfListLocatedStatusOnEmptyDir() throws Throwable {
|
||||||
|
describe("performing listLocatedStatus on an empty dir");
|
||||||
|
Path dir = path(getMethodName());
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
fs.mkdirs(dir);
|
||||||
|
resetMetricDiffs();
|
||||||
|
fs.listLocatedStatus(dir);
|
||||||
|
if (!fs.hasMetadataStore()) {
|
||||||
|
// Unguarded FS.
|
||||||
|
verifyOperationCount(2, 1);
|
||||||
|
} else {
|
||||||
|
if (fs.allowAuthoritative(dir)) {
|
||||||
|
verifyOperationCount(0, 0);
|
||||||
|
} else {
|
||||||
|
verifyOperationCount(0, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCostOfListLocatedStatusOnNonEmptyDir() throws Throwable {
|
||||||
|
describe("performing listLocatedStatus on a non empty dir");
|
||||||
|
Path dir = path(getMethodName() + "dir");
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
fs.mkdirs(dir);
|
||||||
|
Path file = new Path(dir, "file.txt");
|
||||||
|
touch(fs, file);
|
||||||
|
resetMetricDiffs();
|
||||||
|
fs.listLocatedStatus(dir);
|
||||||
|
if (!fs.hasMetadataStore()) {
|
||||||
|
// Unguarded FS.
|
||||||
|
verifyOperationCount(0, 1);
|
||||||
|
} else {
|
||||||
|
if(fs.allowAuthoritative(dir)) {
|
||||||
|
verifyOperationCount(0, 0);
|
||||||
|
} else {
|
||||||
|
verifyOperationCount(0, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCostOfGetFileStatusOnFile() throws Throwable {
|
public void testCostOfGetFileStatusOnFile() throws Throwable {
|
||||||
describe("performing getFileStatus on a file");
|
describe("performing getFileStatus on a file");
|
||||||
|
|
Loading…
Reference in New Issue