HADOOP-16801. S3Guard listFiles will not query S3 if all listings are authoritative (#1815). Contributed by Mustafa İman.
This commit is contained in:
parent
a5ef08b619
commit
5977360878
|
@ -1452,7 +1452,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
? Listing.ACCEPT_ALL_BUT_S3N
|
||||
: new Listing.AcceptAllButSelfAndS3nDirs(path),
|
||||
status,
|
||||
collectTombstones);
|
||||
collectTombstones,
|
||||
true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -3937,7 +3938,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
public RemoteIterator<LocatedFileStatus> listFiles(Path f,
|
||||
boolean recursive) throws FileNotFoundException, IOException {
|
||||
return toLocatedFileStatusIterator(innerListFiles(f, recursive,
|
||||
new Listing.AcceptFilesOnly(qualify(f)), null, true));
|
||||
new Listing.AcceptFilesOnly(qualify(f)), null, true, false));
|
||||
}
|
||||
|
||||
private static RemoteIterator<LocatedFileStatus> toLocatedFileStatusIterator(
|
||||
|
@ -3964,7 +3965,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
@Retries.RetryTranslated
|
||||
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
|
||||
Path f, boolean recursive) throws IOException {
|
||||
return innerListFiles(f, recursive, Listing.ACCEPT_ALL_BUT_S3N, null, true);
|
||||
return innerListFiles(f, recursive, Listing.ACCEPT_ALL_BUT_S3N,
|
||||
null, true, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Recursive List of files and empty directories, force metadatastore
|
||||
* to act like it is non-authoritative.
|
||||
* @param f path to list from
|
||||
* @param recursive
|
||||
* @return an iterator.
|
||||
* @throws IOException failure
|
||||
*/
|
||||
@Retries.RetryTranslated
|
||||
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectoriesForceNonAuth(
|
||||
Path f, boolean recursive) throws IOException {
|
||||
return innerListFiles(f, recursive, Listing.ACCEPT_ALL_BUT_S3N,
|
||||
null, true, true);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3989,11 +4006,19 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
* </li>
|
||||
* </ol>
|
||||
*
|
||||
* In case of recursive listing, if any of the directories reachable from
|
||||
* the path are not authoritative on the client, this method will query S3
|
||||
* for all the directories in the listing in addition to returning S3Guard
|
||||
* entries.
|
||||
*
|
||||
* @param f path
|
||||
* @param recursive recursive listing?
|
||||
* @param acceptor file status filter
|
||||
* @param status optional status of path to list.
|
||||
* @param collectTombstones should tombstones be collected from S3Guard?
|
||||
* @param forceNonAuthoritativeMS forces metadata store to act like non
|
||||
* authoritative. This is useful when
|
||||
* listFiles output is used by import tool.
|
||||
* @return an iterator over the listing.
|
||||
* @throws IOException failure
|
||||
*/
|
||||
|
@ -4003,7 +4028,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
final boolean recursive,
|
||||
final Listing.FileStatusAcceptor acceptor,
|
||||
final S3AFileStatus status,
|
||||
final boolean collectTombstones) throws IOException {
|
||||
final boolean collectTombstones,
|
||||
final boolean forceNonAuthoritativeMS) throws IOException {
|
||||
entryPoint(INVOCATION_LIST_FILES);
|
||||
Path path = qualify(f);
|
||||
LOG.debug("listFiles({}, {})", path, recursive);
|
||||
|
@ -4035,6 +4061,20 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
|||
new MetadataStoreListFilesIterator(metadataStore, pm,
|
||||
allowAuthoritative);
|
||||
tombstones = metadataStoreListFilesIterator.listTombstones();
|
||||
// if all of the below is true
|
||||
// - authoritative access is allowed for this metadatastore for this directory,
|
||||
// - all the directory listings are authoritative on the client
|
||||
// - the caller does not force non-authoritative access
|
||||
// return the listing without any further s3 access
|
||||
if (!forceNonAuthoritativeMS &&
|
||||
allowAuthoritative &&
|
||||
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
|
||||
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
|
||||
metadataStoreListFilesIterator, tombstones);
|
||||
cachedFilesIterator = listing.createProvidedFileStatusIterator(
|
||||
statuses, ACCEPT_ALL, acceptor);
|
||||
return listing.createLocatedFileStatusIterator(cachedFilesIterator);
|
||||
}
|
||||
cachedFilesIterator = metadataStoreListFilesIterator;
|
||||
} else {
|
||||
DirListingMetadata meta =
|
||||
|
|
|
@ -323,7 +323,8 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
|
|||
// list files including any under tombstones through S3Guard
|
||||
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
|
||||
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
|
||||
callbacks.listFilesAndEmptyDirectories(path, status, false, true);
|
||||
callbacks.listFilesAndEmptyDirectories(path, status,
|
||||
false, true);
|
||||
|
||||
// iterate through and delete. The next() call will block when a new S3
|
||||
// page is required; this any active delete submitted to the executor
|
||||
|
|
|
@ -146,7 +146,7 @@ class ImportOperation extends ExecutingStoreOperation<Long> {
|
|||
long countOfFilesWritten = 0;
|
||||
long countOfDirsWritten = 0;
|
||||
RemoteIterator<S3ALocatedFileStatus> it = getFilesystem()
|
||||
.listFilesAndEmptyDirectories(basePath, true);
|
||||
.listFilesAndEmptyDirectoriesForceNonAuth(basePath, true);
|
||||
while (it.hasNext()) {
|
||||
S3ALocatedFileStatus located = it.next();
|
||||
S3AFileStatus child;
|
||||
|
|
|
@ -92,6 +92,7 @@ public class MetadataStoreListFilesIterator implements
|
|||
private final boolean allowAuthoritative;
|
||||
private final MetadataStore metadataStore;
|
||||
private final Set<Path> tombstones = new HashSet<>();
|
||||
private final boolean recursivelyAuthoritative;
|
||||
private Iterator<S3AFileStatus> leafNodesIterator = null;
|
||||
|
||||
public MetadataStoreListFilesIterator(MetadataStore ms, PathMetadata meta,
|
||||
|
@ -99,24 +100,48 @@ public class MetadataStoreListFilesIterator implements
|
|||
Preconditions.checkNotNull(ms);
|
||||
this.metadataStore = ms;
|
||||
this.allowAuthoritative = allowAuthoritative;
|
||||
prefetch(meta);
|
||||
this.recursivelyAuthoritative = prefetch(meta);
|
||||
}
|
||||
|
||||
private void prefetch(PathMetadata meta) throws IOException {
|
||||
/**
|
||||
* Walks the listing tree, starting from given metadata path. All
|
||||
* encountered files and empty directories are added to
|
||||
* {@link leafNodesIterator} unless a directory seems to be empty
|
||||
* and at least one of the following conditions hold:
|
||||
* <ul>
|
||||
* <li>
|
||||
* The directory listing is not marked authoritative
|
||||
* </li>
|
||||
* <li>
|
||||
* Authoritative mode is not allowed
|
||||
* </li>
|
||||
* </ul>
|
||||
* @param meta starting point for tree walk
|
||||
* @return {@code true} if all encountered directory listings
|
||||
* are marked as authoritative
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean prefetch(PathMetadata meta) throws IOException {
|
||||
final Queue<PathMetadata> queue = new LinkedList<>();
|
||||
final Collection<S3AFileStatus> leafNodes = new ArrayList<>();
|
||||
|
||||
boolean allListingsAuthoritative = true;
|
||||
if (meta != null) {
|
||||
final Path path = meta.getFileStatus().getPath();
|
||||
if (path.isRoot()) {
|
||||
DirListingMetadata rootListing = metadataStore.listChildren(path);
|
||||
if (rootListing != null) {
|
||||
if (!rootListing.isAuthoritative()) {
|
||||
allListingsAuthoritative = false;
|
||||
}
|
||||
tombstones.addAll(rootListing.listTombstones());
|
||||
queue.addAll(rootListing.withoutTombstones().getListing());
|
||||
}
|
||||
} else {
|
||||
queue.add(meta);
|
||||
}
|
||||
} else {
|
||||
allListingsAuthoritative = false;
|
||||
}
|
||||
|
||||
while(!queue.isEmpty()) {
|
||||
|
@ -131,6 +156,9 @@ public class MetadataStoreListFilesIterator implements
|
|||
final Path path = nextStatus.getPath();
|
||||
DirListingMetadata children = metadataStore.listChildren(path);
|
||||
if (children != null) {
|
||||
if (!children.isAuthoritative()) {
|
||||
allListingsAuthoritative = false;
|
||||
}
|
||||
tombstones.addAll(children.listTombstones());
|
||||
Collection<PathMetadata> liveChildren =
|
||||
children.withoutTombstones().getListing();
|
||||
|
@ -142,6 +170,9 @@ public class MetadataStoreListFilesIterator implements
|
|||
} else if (allowAuthoritative && children.isAuthoritative()) {
|
||||
leafNodes.add(nextStatus);
|
||||
}
|
||||
} else {
|
||||
// we do not have a listing, so directory definitely non-authoritative
|
||||
allListingsAuthoritative = false;
|
||||
}
|
||||
}
|
||||
// Directories that *might* be empty are ignored for now, since we
|
||||
|
@ -151,6 +182,7 @@ public class MetadataStoreListFilesIterator implements
|
|||
// The only other possibility is a symlink, which is unsupported on S3A.
|
||||
}
|
||||
leafNodesIterator = leafNodes.iterator();
|
||||
return allListingsAuthoritative;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -163,6 +195,10 @@ public class MetadataStoreListFilesIterator implements
|
|||
return leafNodesIterator.next();
|
||||
}
|
||||
|
||||
public boolean isRecursivelyAuthoritative() {
|
||||
return recursivelyAuthoritative;
|
||||
}
|
||||
|
||||
public Set<Path> listTombstones() {
|
||||
return tombstones;
|
||||
}
|
||||
|
|
|
@ -36,6 +36,8 @@ import javax.annotation.Nullable;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -242,6 +244,30 @@ public final class S3Guard {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the data of an iterator of {@link S3AFileStatus} to
|
||||
* an array. Given tombstones are filtered out. If the iterator
|
||||
* does return any item, an empty array is returned.
|
||||
* @param iterator a non-null iterator
|
||||
* @param tombstones
|
||||
* @return a possibly-empty array of file status entries
|
||||
* @throws IOException
|
||||
*/
|
||||
public static S3AFileStatus[] iteratorToStatuses(
|
||||
RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
|
||||
throws IOException {
|
||||
List<FileStatus> statuses = new ArrayList<>();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
S3AFileStatus status = iterator.next();
|
||||
if (!tombstones.contains(status.getPath())) {
|
||||
statuses.add(status);
|
||||
}
|
||||
}
|
||||
|
||||
return statuses.toArray(new S3AFileStatus[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the data of a directory listing to an array of {@link FileStatus}
|
||||
* entries. Tombstones are filtered out at this point. If the listing is null
|
||||
|
|
|
@ -21,7 +21,9 @@ package org.apache.hadoop.fs.s3a.s3guard;
|
|||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -302,6 +304,90 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
|||
assertListDoesNotUpdateAuth(dir);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListFilesRecursiveWhenAllListingsAreAuthoritative()
|
||||
throws Exception {
|
||||
describe("listFiles does not make further calls to the fs when"
|
||||
+ "all nested directory listings are authoritative");
|
||||
Set<Path> originalFiles = new HashSet<>();
|
||||
|
||||
Path parentDir = dir;
|
||||
Path parentFile = dirFile;
|
||||
Path nestedDir1 = new Path(dir, "nested1");
|
||||
Path nestedFile1 = new Path(nestedDir1, "nestedFile1");
|
||||
Path nestedDir2 = new Path(nestedDir1, "nested2/");
|
||||
Path nestedFile2 = new Path(nestedDir2, "nestedFile2");
|
||||
|
||||
originalFiles.add(parentFile);
|
||||
originalFiles.add(nestedFile1);
|
||||
originalFiles.add(nestedFile2);
|
||||
|
||||
mkAuthDir(parentDir);
|
||||
mkAuthDir(nestedDir1);
|
||||
mkAuthDir(nestedDir2);
|
||||
touchFile(parentFile);
|
||||
touchFile(nestedFile1);
|
||||
touchFile(nestedFile2);
|
||||
|
||||
S3ATestUtils.MetricDiff objListRequests =
|
||||
new S3ATestUtils.MetricDiff(authFS, OBJECT_LIST_REQUESTS);
|
||||
|
||||
RemoteIterator<LocatedFileStatus> statusIterator =
|
||||
authFS.listFiles(dir, true);
|
||||
|
||||
List<Path> pathsFromStatusIterator = toPaths(statusIterator);
|
||||
|
||||
Assertions.assertThat(pathsFromStatusIterator)
|
||||
.as("listFiles should return all the items in actual"
|
||||
+ "S3 directory and nothing more")
|
||||
.hasSameElementsAs(originalFiles)
|
||||
.hasSameSizeAs(originalFiles);
|
||||
|
||||
objListRequests.assertDiffEquals("There must not be any OBJECT_LIST "
|
||||
+ "requests as all directory listings are authoritative", 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListFilesRecursiveWhenSomePathsAreNotAuthoritative()
|
||||
throws Exception {
|
||||
describe("listFiles correctly constructs recursive listing"
|
||||
+ "when authoritative and non-authoritative paths are mixed");
|
||||
List<Path> originalFiles = new ArrayList<>();
|
||||
Path parentDir = dir;
|
||||
Path parentFile = dirFile;
|
||||
Path nestedDir1 = new Path(dir, "nested1");
|
||||
Path nestedFile1 = new Path(nestedDir1, "nestedFile1");
|
||||
Path nestedDir2 = new Path(nestedDir1, "nested2/");
|
||||
Path nestedFile2 = new Path(nestedDir2, "nestedFile2");
|
||||
|
||||
originalFiles.add(parentFile);
|
||||
originalFiles.add(nestedFile1);
|
||||
originalFiles.add(nestedFile2);
|
||||
|
||||
mkAuthDir(parentDir);
|
||||
mkNonauthDir(nestedDir1);
|
||||
mkAuthDir(nestedDir2);
|
||||
touchFile(parentFile);
|
||||
touchFile(nestedFile1);
|
||||
touchFile(nestedFile2);
|
||||
|
||||
S3ATestUtils.MetricDiff objListRequests =
|
||||
new S3ATestUtils.MetricDiff(authFS, OBJECT_LIST_REQUESTS);
|
||||
|
||||
RemoteIterator<LocatedFileStatus> statusIterator =
|
||||
authFS.listFiles(dir, true);
|
||||
|
||||
List<Path> pathsFromStatusIterator = toPaths(statusIterator);
|
||||
|
||||
Assertions.assertThat(pathsFromStatusIterator)
|
||||
.as("listFiles should return all the items in actual"
|
||||
+ "S3 directory and nothing more")
|
||||
.hasSameElementsAs(originalFiles)
|
||||
.hasSameSizeAs(originalFiles);
|
||||
objListRequests.assertDiffEquals("Only 1 OBJECT_LIST call is expected"
|
||||
+ "as a nested directory listing is not authoritative", 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListStatusMakesDirAuth() throws Throwable {
|
||||
describe("Verify listStatus marks a dir as auth");
|
||||
|
@ -706,6 +792,24 @@ public class ITestDynamoDBMetadataStoreAuthoritativeMode
|
|||
return call;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Path array from all items retrieved from
|
||||
* {@link RemoteIterator<LocatedFileStatus>}.
|
||||
*
|
||||
* @param remoteIterator iterator
|
||||
* @return a list of Paths
|
||||
* @throws IOException
|
||||
*/
|
||||
private List<Path> toPaths(RemoteIterator<LocatedFileStatus> remoteIterator)
|
||||
throws IOException {
|
||||
List<Path> list = new ArrayList<>();
|
||||
while (remoteIterator.hasNext()) {
|
||||
LocatedFileStatus fileStatus = remoteIterator.next();
|
||||
list.add(fileStatus.getPath());
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that a listStatus call increments the
|
||||
* "s3guard_metadatastore_authoritative_directories_updated" counter.
|
||||
|
|
Loading…
Reference in New Issue