diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java new file mode 100644 index 00000000000..4120b20c1a1 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -0,0 +1,594 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.RemoteIterator; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; + +import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX; +import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus; +import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory; +import static org.apache.hadoop.fs.s3a.S3AUtils.stringify; +import static org.apache.hadoop.fs.s3a.S3AUtils.translateException; + +/** + * Place for the S3A listing classes; keeps all the small classes under control. + */ +public class Listing { + + private final S3AFileSystem owner; + private static final Logger LOG = S3AFileSystem.LOG; + + public Listing(S3AFileSystem owner) { + this.owner = owner; + } + + /** + * Create a FileStatus iterator against a path, with a given + * list object request. + * @param listPath path of the listing + * @param request initial request to make + * @param filter the filter on which paths to accept + * @param acceptor the class/predicate to decide which entries to accept + * in the listing based on the full file status. + * @return the iterator + * @throws IOException IO Problems + */ + FileStatusListingIterator createFileStatusListingIterator( + Path listPath, + ListObjectsRequest request, + PathFilter filter, + Listing.FileStatusAcceptor acceptor) throws IOException { + return new FileStatusListingIterator( + new ObjectListingIterator(listPath, request), + filter, + acceptor); + } + + /** + * Create a located status iterator over a file status iterator. + * @param statusIterator an iterator over the remote status entries + * @return a new remote iterator + */ + LocatedFileStatusIterator createLocatedFileStatusIterator( + RemoteIterator statusIterator) { + return new LocatedFileStatusIterator(statusIterator); + } + + /** + * Interface to implement by the logic deciding whether to accept a summary + * entry or path as a valid file or directory. + */ + interface FileStatusAcceptor { + + /** + * Predicate to decide whether or not to accept a summary entry. + * @param keyPath qualified path to the entry + * @param summary summary entry + * @return true if the entry is accepted (i.e. that a status entry + * should be generated. + */ + boolean accept(Path keyPath, S3ObjectSummary summary); + + /** + * Predicate to decide whether or not to accept a prefix. + * @param keyPath qualified path to the entry + * @param commonPrefix the prefix + * @return true if the entry is accepted (i.e. that a status entry + * should be generated.) + */ + boolean accept(Path keyPath, String commonPrefix); + } + + /** + * A remote iterator which only iterates over a single `LocatedFileStatus` + * value. + * + * If the status value is null, the iterator declares that it has no data. + * This iterator is used to handle {@link listStatus()} calls where the path + * handed in refers to a file, not a directory: this is the iterator + * returned. + */ + static final class SingleStatusRemoteIterator + implements RemoteIterator { + + /** + * The status to return; set to null after the first iteration. + */ + private LocatedFileStatus status; + + /** + * Constructor. + * @param status status value: may be null, in which case + * the iterator is empty. + */ + public SingleStatusRemoteIterator(LocatedFileStatus status) { + this.status = status; + } + + /** + * {@inheritDoc} + * @return true if there is a file status to return: this is always false + * for the second iteration, and may be false for the first. + * @throws IOException never + */ + @Override + public boolean hasNext() throws IOException { + return status != null; + } + + /** + * {@inheritDoc} + * @return the non-null status element passed in when the instance was + * constructed, if it ha not already been retrieved. + * @throws IOException never + * @throws NoSuchElementException if this is the second call, or it is + * the first call and a null {@link LocatedFileStatus} entry was passed + * to the constructor. + */ + @Override + public LocatedFileStatus next() throws IOException { + if (hasNext()) { + LocatedFileStatus s = this.status; + status = null; + return s; + } else { + throw new NoSuchElementException(); + } + } + } + + /** + * Wraps up object listing into a remote iterator which will ask for more + * listing data if needed. + * + * This is a complex operation, especially the process to determine + * if there are more entries remaining. If there are no more results + * remaining in the (filtered) results of the current listing request, then + * another request is made and those results filtered before the + * iterator can declare that there is more data available. + * + * The need to filter the results precludes the iterator from simply + * declaring that if the {@link S3AFileSystem.ObjectListingIterator#hasNext()} + * is true then there are more results. Instead the next batch of results must + * be retrieved and filtered. + * + * What does this mean? It means that remote requests to retrieve new + * batches of object listings are made in the {@link #hasNext()} call; + * the {@link #next()} call simply returns the filtered results of the last + * listing processed. However, do note that {@link #next()} calls + * {@link #hasNext()} during its operation. This is critical to ensure + * that a listing obtained through a sequence of {@link #next()} will + * complete with the same set of results as a classic + * {@code while(it.hasNext()} loop. + * + * Thread safety: None. + */ + class FileStatusListingIterator + implements RemoteIterator { + + /** Source of objects. */ + private final ObjectListingIterator source; + /** Filter of paths from API call. */ + private final PathFilter filter; + /** Filter of entries from file status. */ + private final FileStatusAcceptor acceptor; + /** request batch size. */ + private int batchSize; + /** Iterator over the current set of results. */ + private ListIterator statusBatchIterator; + + /** + * Create an iterator over file status entries. + * @param source the listing iterator from a listObjects call. + * @param filter the filter on which paths to accept + * @param acceptor the class/predicate to decide which entries to accept + * in the listing based on the full file status. + * @throws IOException IO Problems + */ + FileStatusListingIterator(ObjectListingIterator source, + PathFilter filter, + FileStatusAcceptor acceptor) throws IOException { + this.source = source; + this.filter = filter; + this.acceptor = acceptor; + // build the first set of results. This will not trigger any + // remote IO, assuming the source iterator is in its initial + // iteration + requestNextBatch(); + } + + /** + * Report whether or not there is new data available. + * If there is data in the local filtered list, return true. + * Else: request more data util that condition is met, or there + * is no more remote listing data. + * @return true if a call to {@link #next()} will succeed. + * @throws IOException + */ + @Override + public boolean hasNext() throws IOException { + return statusBatchIterator.hasNext() || requestNextBatch(); + } + + @Override + public FileStatus next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return statusBatchIterator.next(); + } + + /** + * Try to retrieve another batch. + * Note that for the initial batch, + * {@link S3AFileSystem.ObjectListingIterator} does not generate a request; + * it simply returns the initial set. + * + * @return true if a new batch was created. + * @throws IOException IO problems + */ + private boolean requestNextBatch() throws IOException { + // look for more object listing batches being available + while (source.hasNext()) { + // if available, retrieve it and build the next status + if (buildNextStatusBatch(source.next())) { + // this batch successfully generated entries matching the filters/ + // acceptors; declare that the request was successful + return true; + } else { + LOG.debug("All entries in batch were filtered...continuing"); + } + } + // if this code is reached, it means that all remaining + // object lists have been retrieved, and there are no new entries + // to return. + return false; + } + + /** + * Build the next status batch from a listing. + * @param objects the next object listing + * @return true if this added any entries after filtering + */ + private boolean buildNextStatusBatch(ObjectListing objects) { + // counters for debug logs + int added = 0, ignored = 0; + // list to fill in with results. Initial size will be list maximum. + List stats = new ArrayList<>( + objects.getObjectSummaries().size() + + objects.getCommonPrefixes().size()); + // objects + for (S3ObjectSummary summary : objects.getObjectSummaries()) { + String key = summary.getKey(); + Path keyPath = owner.keyToQualifiedPath(key); + if (LOG.isDebugEnabled()) { + LOG.debug("{}: {}", keyPath, stringify(summary)); + } + // Skip over keys that are ourselves and old S3N _$folder$ files + if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) { + FileStatus status = createFileStatus(keyPath, summary, + owner.getDefaultBlockSize(keyPath)); + LOG.debug("Adding: {}", status); + stats.add(status); + added++; + } else { + LOG.debug("Ignoring: {}", keyPath); + ignored++; + } + } + + // prefixes: always directories + for (String prefix : objects.getCommonPrefixes()) { + Path keyPath = owner.keyToQualifiedPath(prefix); + if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) { + FileStatus status = new S3AFileStatus(true, false, keyPath); + LOG.debug("Adding directory: {}", status); + added++; + stats.add(status); + } else { + LOG.debug("Ignoring directory: {}", keyPath); + ignored++; + } + } + + // finish up + batchSize = stats.size(); + statusBatchIterator = stats.listIterator(); + boolean hasNext = statusBatchIterator.hasNext(); + LOG.debug("Added {} entries; ignored {}; hasNext={}; hasMoreObjects={}", + added, ignored, hasNext, objects.isTruncated()); + return hasNext; + } + + /** + * Get the number of entries in the current batch. + * @return a number, possibly zero. + */ + public int getBatchSize() { + return batchSize; + } + } + + /** + * Wraps up AWS `ListObjects` requests in a remote iterator + * which will ask for more listing data if needed. + * + * That is: + * + * 1. The first invocation of the {@link #next()} call will return the results + * of the first request, the one created during the construction of the + * instance. + * + * 2. Second and later invocations will continue the ongoing listing, + * calling {@link #continueListObjects(ObjectListing)} to request the next + * batch of results. + * + * 3. The {@link #hasNext()} predicate returns true for the initial call, + * where {@link #next()} will return the initial results. It declares + * that it has future results iff the last executed request was truncated. + * + * Thread safety: none. + */ + class ObjectListingIterator implements RemoteIterator { + + /** The path listed. */ + private final Path listPath; + + /** The most recent listing results. */ + private ObjectListing objects; + + /** Indicator that this is the first listing. */ + private boolean firstListing = true; + + /** + * Count of how many listings have been requested + * (including initial result). + */ + private int listingCount = 1; + + /** + * Maximum keys in a request. + */ + private int maxKeys; + + /** + * Constructor -calls `listObjects()` on the request to populate the + * initial set of results/fail if there was a problem talking to the bucket. + * @param listPath path of the listing + * @param request initial request to make + * */ + ObjectListingIterator( + Path listPath, + ListObjectsRequest request) { + this.listPath = listPath; + this.maxKeys = owner.getMaxKeys(); + this.objects = owner.listObjects(request); + } + + /** + * Declare that the iterator has data if it is either is the initial + * iteration or it is a later one and the last listing obtained was + * incomplete. + * @throws IOException never: there is no IO in this operation. + */ + @Override + public boolean hasNext() throws IOException { + return firstListing || objects.isTruncated(); + } + + /** + * Ask for the next listing. + * For the first invocation, this returns the initial set, with no + * remote IO. For later requests, S3 will be queried, hence the calls + * may block or fail. + * @return the next object listing. + * @throws IOException if a query made of S3 fails. + * @throws NoSuchElementException if there is no more data to list. + */ + @Override + public ObjectListing next() throws IOException { + if (firstListing) { + // on the first listing, don't request more data. + // Instead just clear the firstListing flag so that it future calls + // will request new data. + firstListing = false; + } else { + try { + if (!objects.isTruncated()) { + // nothing more to request: fail. + throw new NoSuchElementException("No more results in listing of " + + listPath); + } + // need to request a new set of objects. + LOG.debug("[{}], Requesting next {} objects under {}", + listingCount, maxKeys, listPath); + objects = owner.continueListObjects(objects); + listingCount++; + LOG.debug("New listing status: {}", this); + } catch (AmazonClientException e) { + throw translateException("listObjects()", listPath, e); + } + } + return objects; + } + + @Override + public String toString() { + return "Object listing iterator against " + listPath + + "; listing count "+ listingCount + + "; isTruncated=" + objects.isTruncated(); + } + + /** + * Get the path listed. + * @return the path used in this listing. + */ + public Path getListPath() { + return listPath; + } + + /** + * Get the count of listing requests. + * @return the counter of requests made (including the initial lookup). + */ + public int getListingCount() { + return listingCount; + } + } + + /** + * Accept all entries except the base path and those which map to S3N + * pseudo directory markers. + */ + static class AcceptFilesOnly implements FileStatusAcceptor { + private final Path qualifiedPath; + + public AcceptFilesOnly(Path qualifiedPath) { + this.qualifiedPath = qualifiedPath; + } + + /** + * Reject a summary entry if the key path is the qualified Path, or + * it ends with {@code "_$folder$"}. + * @param keyPath key path of the entry + * @param summary summary entry + * @return true if the entry is accepted (i.e. that a status entry + * should be generated. + */ + @Override + public boolean accept(Path keyPath, S3ObjectSummary summary) { + return !keyPath.equals(qualifiedPath) + && !summary.getKey().endsWith(S3N_FOLDER_SUFFIX) + && !objectRepresentsDirectory(summary.getKey(), summary.getSize()); + } + + /** + * Accept no directory paths. + * @param keyPath qualified path to the entry + * @param prefix common prefix in listing. + * @return false, always. + */ + @Override + public boolean accept(Path keyPath, String prefix) { + return false; + } + } + + /** + * Take a remote iterator over a set of {@link FileStatus} instances and + * return a remote iterator of {@link LocatedFileStatus} instances. + */ + class LocatedFileStatusIterator + implements RemoteIterator { + private final RemoteIterator statusIterator; + + /** + * Constructor. + * @param statusIterator an iterator over the remote status entries + */ + LocatedFileStatusIterator(RemoteIterator statusIterator) { + this.statusIterator = statusIterator; + } + + @Override + public boolean hasNext() throws IOException { + return statusIterator.hasNext(); + } + + @Override + public LocatedFileStatus next() throws IOException { + return owner.toLocatedFileStatus(statusIterator.next()); + } + } + + /** + * Accept all entries except the base path and those which map to S3N + * pseudo directory markers. + */ + static class AcceptAllButSelfAndS3nDirs implements FileStatusAcceptor { + + /** Base path. */ + private final Path qualifiedPath; + + /** + * Constructor. + * @param qualifiedPath an already-qualified path. + */ + public AcceptAllButSelfAndS3nDirs(Path qualifiedPath) { + this.qualifiedPath = qualifiedPath; + } + + /** + * Reject a summary entry if the key path is the qualified Path, or + * it ends with {@code "_$folder$"}. + * @param keyPath key path of the entry + * @param summary summary entry + * @return true if the entry is accepted (i.e. that a status entry + * should be generated.) + */ + @Override + public boolean accept(Path keyPath, S3ObjectSummary summary) { + return !keyPath.equals(qualifiedPath) && + !summary.getKey().endsWith(S3N_FOLDER_SUFFIX); + } + + /** + * Accept all prefixes except the one for the base path, "self". + * @param keyPath qualified path to the entry + * @param prefix common prefix in listing. + * @return true if the entry is accepted (i.e. that a status entry + * should be generated. + */ + @Override + public boolean accept(Path keyPath, String prefix) { + return !keyPath.equals(qualifiedPath); + } + } + + /** + * A Path filter which accepts all filenames. + */ + static final PathFilter ACCEPT_ALL = new PathFilter() { + @Override + public boolean accept(Path file) { + return true; + } + + @Override + public String toString() { + return "ACCEPT_ALL"; + } + }; + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 902f73ab439..1cf26319767 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -84,6 +84,7 @@ import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.VersionInfo; import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL; import static org.apache.hadoop.fs.s3a.S3AUtils.*; import static org.apache.hadoop.fs.s3a.Statistic.*; @@ -115,6 +116,7 @@ public class S3AFileSystem extends FileSystem { private AmazonS3Client s3; private String bucket; private int maxKeys; + private Listing listing; private long partSize; private boolean enableMultiObjectsDelete; private TransferManager transfers; @@ -181,6 +183,7 @@ public class S3AFileSystem extends FileSystem { initAmazonS3Client(conf, credentials, awsConf); maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); + listing = new Listing(this); partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); if (partSize < 5 * 1024 * 1024) { LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); @@ -514,7 +517,11 @@ public class S3AFileSystem extends FileSystem { super(); } - /* Turns a path (relative or otherwise) into an S3 key + /** + * Turns a path (relative or otherwise) into an S3 key. + * + * @param path input path, may be relative to the working dir + * @return a key excluding the leading "/", or, if it is the root path, "" */ private String pathToKey(Path path) { if (!path.isAbsolute()) { @@ -528,10 +535,49 @@ public class S3AFileSystem extends FileSystem { return path.toUri().getPath().substring(1); } + /** + * Turns a path (relative or otherwise) into an S3 key, adding a trailing + * "/" if the path is not the root and does not already have a "/" + * at the end. + * + * @param key s3 key or "" + * @return the with a trailing "/", or, if it is the root key, "", + */ + private String maybeAddTrailingSlash(String key) { + if (!key.isEmpty() && !key.endsWith("/")) { + return key + '/'; + } else { + return key; + } + } + + /** + * Convert a path back to a key. + * @param key input key + * @return the path from this key + */ private Path keyToPath(String key) { return new Path("/" + key); } + /** + * Convert a key to a fully qualified path. + * @param key input key + * @return the fully qualified path including URI scheme and bucket name. + */ + Path keyToQualifiedPath(String key) { + return qualify(keyToPath(key)); + } + + /** + * Qualify a path. + * @param path path to qualify + * @return a qualified path. + */ + Path qualify(Path path) { + return path.makeQualified(uri, workingDir); + } + /** * Check that a Path belongs to this FileSystem. * Unlike the superclass, this version does not look at authority, @@ -550,10 +596,10 @@ public class S3AFileSystem extends FileSystem { } /** - * Opens an FSDataInputStream at the indicated Path. - * @param f the file name to open - * @param bufferSize the size of the buffer to be used. - */ + * Opens an FSDataInputStream at the indicated Path. + * @param f the file name to open + * @param bufferSize the size of the buffer to be used. + */ public FSDataInputStream open(Path f, int bufferSize) throws IOException { @@ -875,7 +921,7 @@ public class S3AFileSystem extends FileSystem { * @return the next result object */ protected ObjectListing continueListObjects(ObjectListing objects) { - incrementStatistic(OBJECT_LIST_REQUESTS); + incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS); incrementReadOperations(); return s3.listNextBatchOfObjects(objects); } @@ -1132,12 +1178,7 @@ public class S3AFileSystem extends FileSystem { } else { LOG.debug("Getting objects for directory prefix {} to delete", key); - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setPrefix(key); - // Hopefully not setting a delimiter will cause this to find everything - //request.setDelimiter("/"); - request.setMaxKeys(maxKeys); + ListObjectsRequest request = createListObjectsRequest(key, null); ObjectListing objects = listObjects(request); List keys = @@ -1211,66 +1252,57 @@ public class S3AFileSystem extends FileSystem { */ public FileStatus[] innerListStatus(Path f) throws FileNotFoundException, IOException, AmazonClientException { - String key = pathToKey(f); - LOG.debug("List status for path: {}", f); + Path path = qualify(f); + String key = pathToKey(path); + LOG.debug("List status for path: {}", path); incrementStatistic(INVOCATION_LIST_STATUS); - final List result = new ArrayList(); - final FileStatus fileStatus = getFileStatus(f); + List result; + final FileStatus fileStatus = getFileStatus(path); if (fileStatus.isDirectory()) { if (!key.isEmpty()) { - key = key + "/"; + key = key + '/'; } - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setPrefix(key); - request.setDelimiter("/"); - request.setMaxKeys(maxKeys); - + ListObjectsRequest request = createListObjectsRequest(key, "/"); LOG.debug("listStatus: doing listObjects for directory {}", key); - ObjectListing objects = listObjects(request); - - Path fQualified = f.makeQualified(uri, workingDir); - - while (true) { - for (S3ObjectSummary summary : objects.getObjectSummaries()) { - Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir); - // Skip over keys that are ourselves and old S3N _$folder$ files - if (keyPath.equals(fQualified) || - summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) { - LOG.debug("Ignoring: {}", keyPath); - } else { - S3AFileStatus status = createFileStatus(keyPath, summary, - getDefaultBlockSize(keyPath)); - result.add(status); - LOG.debug("Adding: {}", status); - } - } - - for (String prefix : objects.getCommonPrefixes()) { - Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir); - if (!keyPath.equals(f)) { - result.add(new S3AFileStatus(true, false, keyPath)); - LOG.debug("Adding: rd: {}", keyPath); - } - } - - if (objects.isTruncated()) { - LOG.debug("listStatus: list truncated - getting next batch"); - objects = continueListObjects(objects); - } else { - break; - } + Listing.FileStatusListingIterator files = + listing.createFileStatusListingIterator(path, + request, + ACCEPT_ALL, + new Listing.AcceptAllButSelfAndS3nDirs(path)); + result = new ArrayList<>(files.getBatchSize()); + while (files.hasNext()) { + result.add(files.next()); } + return result.toArray(new FileStatus[result.size()]); } else { - LOG.debug("Adding: rd (not a dir): {}", f); - result.add(fileStatus); + LOG.debug("Adding: rd (not a dir): {}", path); + FileStatus[] stats = new FileStatus[1]; + stats[0]= fileStatus; + return stats; } + } - return result.toArray(new FileStatus[result.size()]); + /** + * Create a {@code ListObjectsRequest} request against this bucket, + * with the maximum keys returned in a query set by {@link #maxKeys}. + * @param key key for request + * @param delimiter any delimiter + * @return the request + */ + private ListObjectsRequest createListObjectsRequest(String key, + String delimiter) { + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setMaxKeys(maxKeys); + request.setPrefix(key); + if (delimiter != null) { + request.setDelimiter(delimiter); + } + return request; } /** @@ -1371,11 +1403,11 @@ public class S3AFileSystem extends FileSystem { * @throws java.io.FileNotFoundException when the path does not exist; * @throws IOException on other problems. */ - public S3AFileStatus getFileStatus(Path f) throws IOException { - String key = pathToKey(f); + public S3AFileStatus getFileStatus(final Path f) throws IOException { incrementStatistic(INVOCATION_GET_FILE_STATUS); - LOG.debug("Getting path status for {} ({})", f , key); - + final Path path = qualify(f); + String key = pathToKey(path); + LOG.debug("Getting path status for {} ({})", path , key); if (!key.isEmpty()) { try { ObjectMetadata meta = getObjectMetadata(key); @@ -1383,20 +1415,20 @@ public class S3AFileSystem extends FileSystem { if (objectRepresentsDirectory(key, meta.getContentLength())) { LOG.debug("Found exact file: fake directory"); return new S3AFileStatus(true, true, - f.makeQualified(uri, workingDir)); + path); } else { LOG.debug("Found exact file: normal file"); return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()), - f.makeQualified(uri, workingDir), - getDefaultBlockSize(f.makeQualified(uri, workingDir))); + path, + getDefaultBlockSize(path)); } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { - throw translateException("getFileStatus", f, e); + throw translateException("getFileStatus", path, e); } } catch (AmazonClientException e) { - throw translateException("getFileStatus", f, e); + throw translateException("getFileStatus", path, e); } // Necessary? @@ -1407,14 +1439,14 @@ public class S3AFileSystem extends FileSystem { if (objectRepresentsDirectory(newKey, meta.getContentLength())) { LOG.debug("Found file (with /): fake directory"); - return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); + return new S3AFileStatus(true, true, path); } else { LOG.warn("Found file (with /): real file? should not happen: {}", key); return new S3AFileStatus(meta.getContentLength(), dateToLong(meta.getLastModified()), - f.makeQualified(uri, workingDir), - getDefaultBlockSize(f.makeQualified(uri, workingDir))); + path, + getDefaultBlockSize(path)); } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { @@ -1427,9 +1459,7 @@ public class S3AFileSystem extends FileSystem { } try { - if (!key.isEmpty() && !key.endsWith("/")) { - key = key + "/"; - } + key = maybeAddTrailingSlash(key); ListObjectsRequest request = new ListObjectsRequest(); request.setBucketName(bucket); request.setPrefix(key); @@ -1453,11 +1483,10 @@ public class S3AFileSystem extends FileSystem { } } - return new S3AFileStatus(true, false, - f.makeQualified(uri, workingDir)); + return new S3AFileStatus(true, false, path); } else if (key.isEmpty()) { LOG.debug("Found root directory"); - return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir)); + return new S3AFileStatus(true, true, path); } } catch (AmazonServiceException e) { if (e.getStatusCode() != 404) { @@ -1467,8 +1496,8 @@ public class S3AFileSystem extends FileSystem { throw translateException("getFileStatus", key, e); } - LOG.debug("Not Found: {}", f); - throw new FileNotFoundException("No such file or directory: " + f); + LOG.debug("Not Found: {}", path); + throw new FileNotFoundException("No such file or directory: " + path); } /** @@ -1829,7 +1858,15 @@ public class S3AFileSystem extends FileSystem { } /** - * Override superclass so as to add statistic collection. + * Get the maximum key count. + * @return a value, valid after initialization + */ + int getMaxKeys() { + return maxKeys; + } + + /** + * Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}. * {@inheritDoc} */ @Override @@ -1849,24 +1886,6 @@ public class S3AFileSystem extends FileSystem { return super.globStatus(pathPattern, filter); } - /** - * Override superclass so as to add statistic collection. - * {@inheritDoc} - */ - @Override - public RemoteIterator listLocatedStatus(Path f) - throws FileNotFoundException, IOException { - incrementStatistic(INVOCATION_LIST_LOCATED_STATUS); - return super.listLocatedStatus(f); - } - - @Override - public RemoteIterator listFiles(Path f, - boolean recursive) throws FileNotFoundException, IOException { - incrementStatistic(INVOCATION_LIST_FILES); - return super.listFiles(f, recursive); - } - /** * Override superclass so as to add statistic collection. * {@inheritDoc} @@ -1897,6 +1916,129 @@ public class S3AFileSystem extends FileSystem { return super.isFile(f); } + /** + * {@inheritDoc}. + * + * This implementation is optimized for S3, which can do a bulk listing + * off all entries under a path in one single operation. Thus there is + * no need to recursively walk the directory tree. + * + * Instead a {@link ListObjectsRequest} is created requesting a (windowed) + * listing of all entries under the given path. This is used to construct + * an {@code ObjectListingIterator} instance, iteratively returning the + * sequence of lists of elements under the path. This is then iterated + * over in a {@code FileStatusListingIterator}, which generates + * {@link S3AFileStatus} instances, one per listing entry. + * These are then translated into {@link LocatedFileStatus} instances. + * + * This is essentially a nested and wrapped set of iterators, with some + * generator classes; an architecture which may become less convoluted + * using lambda-expressions. + * @param f a path + * @param recursive if the subdirectories need to be traversed recursively + * + * @return an iterator that traverses statuses of the files/directories + * in the given path + * @throws FileNotFoundException if {@code path} does not exist + * @throws IOException if any I/O error occurred + */ + @Override + public RemoteIterator listFiles(Path f, + boolean recursive) throws FileNotFoundException, IOException { + incrementStatistic(INVOCATION_LIST_FILES); + Path path = qualify(f); + LOG.debug("listFiles({}, {})", path, recursive); + try { + // lookup dir triggers existence check + final FileStatus fileStatus = getFileStatus(path); + if (fileStatus.isFile()) { + // simple case: File + LOG.debug("Path is a file"); + return new Listing.SingleStatusRemoteIterator( + toLocatedFileStatus(fileStatus)); + } else { + // directory: do a bulk operation + String key = maybeAddTrailingSlash(pathToKey(path)); + String delimiter = recursive ? null : "/"; + LOG.debug("Requesting all entries under {} with delimiter '{}'", + key, delimiter); + return listing.createLocatedFileStatusIterator( + listing.createFileStatusListingIterator(path, + createListObjectsRequest(key, delimiter), + ACCEPT_ALL, + new Listing.AcceptFilesOnly(path))); + } + } catch (AmazonClientException e) { + throw translateException("listFiles", path, e); + } + } + + /** + * Override superclass so as to add statistic collection. + * {@inheritDoc} + */ + @Override + public RemoteIterator listLocatedStatus(Path f) + throws FileNotFoundException, IOException { + return listLocatedStatus(f, ACCEPT_ALL); + } + + /** + * {@inheritDoc}. + * + * S3 Optimized directory listing. The initial operation performs the + * first bulk listing; extra listings will take place + * when all the current set of results are used up. + * @param f a path + * @param filter a path filter + * @return an iterator that traverses statuses of the files/directories + * in the given path + * @throws FileNotFoundException if {@code path} does not exist + * @throws IOException if any I/O error occurred + */ + @Override + public RemoteIterator listLocatedStatus(final Path f, + final PathFilter filter) + throws FileNotFoundException, IOException { + incrementStatistic(INVOCATION_LIST_LOCATED_STATUS); + Path path = qualify(f); + LOG.debug("listLocatedStatus({}, {}", path, filter); + try { + // lookup dir triggers existence check + final FileStatus fileStatus = getFileStatus(path); + if (fileStatus.isFile()) { + // simple case: File + LOG.debug("Path is a file"); + return new Listing.SingleStatusRemoteIterator( + filter.accept(path) ? toLocatedFileStatus(fileStatus) : null); + } else { + // directory: trigger a lookup + String key = maybeAddTrailingSlash(pathToKey(path)); + return listing.createLocatedFileStatusIterator( + listing.createFileStatusListingIterator(path, + createListObjectsRequest(key, "/"), + filter, + new Listing.AcceptAllButSelfAndS3nDirs(path))); + } + } catch (AmazonClientException e) { + throw translateException("listLocatedStatus", path, e); + } + } + + /** + * Build a {@link LocatedFileStatus} from a {@link FileStatus} instance. + * @param status file status + * @return a located status with block locations set up from this FS. + * @throws IOException IO Problems. + */ + LocatedFileStatus toLocatedFileStatus(FileStatus status) + throws IOException { + return new LocatedFileStatus(status, + status.isFile() ? + getFileBlockLocations(status, 0, status.getLen()) + : null); + } + /** * Get a integer option >= the minimum allowed value. * @param conf configuration @@ -1934,38 +2076,4 @@ public class S3AFileSystem extends FileSystem { return v; } - /** - * This is a simple encapsulation of the - * S3 access key and secret. - */ - static class AWSAccessKeys { - private String accessKey = null; - private String accessSecret = null; - - /** - * Constructor. - * @param key - AWS access key - * @param secret - AWS secret key - */ - public AWSAccessKeys(String key, String secret) { - accessKey = key; - accessSecret = secret; - } - - /** - * Return the AWS access key. - * @return key - */ - public String getAccessKey() { - return accessKey; - } - - /** - * Return the AWS secret key. - * @return secret - */ - public String getAccessSecret() { - return accessSecret; - } - } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 514b9743245..b4c40639766 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -95,6 +95,7 @@ public class S3AInstrumentation { OBJECT_COPY_REQUESTS, OBJECT_DELETE_REQUESTS, OBJECT_LIST_REQUESTS, + OBJECT_CONTINUE_LIST_REQUESTS, OBJECT_METADATA_REQUESTS, OBJECT_MULTIPART_UPLOAD_ABORTED, OBJECT_PUT_BYTES, diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index 8033ac30cf3..3d48fcdc008 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -280,4 +280,16 @@ public final class S3AUtils { return val; } } + + /** + * String information about a summary entry for debug messages. + * @param summary summary object + * @return string value + */ + public static String stringify(S3ObjectSummary summary) { + StringBuilder builder = new StringBuilder(summary.getKey().length() + 100); + builder.append(summary.getKey()).append(' '); + builder.append("size=").append(summary.getSize()); + return builder.toString(); + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java index 36d163c5fc2..cbc34d665dc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java @@ -69,6 +69,8 @@ public enum Statistic { OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"), OBJECT_LIST_REQUESTS("object_list_requests", "Number of object listings made"), + OBJECT_CONTINUE_LIST_REQUESTS("object_continue_list_requests", + "Number of continued object listings made"), OBJECT_METADATA_REQUESTS("object_metadata_requests", "Number of requests for object metadata"), OBJECT_MULTIPART_UPLOAD_ABORTED("object_multipart_aborted", diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractGetFileStatus.java index d7b8fe33fdc..5937d490ab6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractGetFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/TestS3AContractGetFileStatus.java @@ -20,6 +20,8 @@ package org.apache.hadoop.fs.contract.s3a; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3ATestUtils; public class TestS3AContractGetFileStatus extends AbstractContractGetFileStatusTest { @@ -28,4 +30,18 @@ public class TestS3AContractGetFileStatus extends AbstractContractGetFileStatusT return new S3AContract(conf); } + @Override + public void teardown() throws Exception { + getLog().info("FS details {}", getFileSystem()); + super.teardown(); + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + // aggressively low page size forces tests to go multipage + conf.setInt(Constants.MAX_PAGING_KEYS, 2); + return conf; + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 39c60287d06..e45db483e72 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -124,7 +124,7 @@ public class S3ATestUtils { try { callback.call(); return; - } catch (FailFastException e) { + } catch (InterruptedException | FailFastException e) { throw e; } catch (Exception e) { lastException = e; @@ -321,7 +321,7 @@ public class S3ATestUtils { * @return true if the value is {@code ==} the other's */ public boolean diffEquals(MetricDiff that) { - return this.currentValue() == that.currentValue(); + return this.diff() == that.diff(); } /** @@ -330,7 +330,7 @@ public class S3ATestUtils { * @return true if the value is {@code <} the other's */ public boolean diffLessThan(MetricDiff that) { - return this.currentValue() < that.currentValue(); + return this.diff() < that.diff(); } /** @@ -339,7 +339,7 @@ public class S3ATestUtils { * @return true if the value is {@code <=} the other's */ public boolean diffLessThanOrEquals(MetricDiff that) { - return this.currentValue() <= that.currentValue(); + return this.diff() <= that.diff(); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java index 7ece394ea76..35ea3add55a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java @@ -18,7 +18,6 @@ package org.apache.hadoop.fs.s3a.scale; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.Statistic; import org.junit.Test; @@ -51,6 +50,8 @@ public class TestS3ADirectoryPerformance extends S3AScaleTestBase { int files = scale; MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS); MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS); + MetricDiff listContinueRequests = + new MetricDiff(fs, OBJECT_CONTINUE_LIST_REQUESTS); MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES); MetricDiff getFileStatusCalls = new MetricDiff(fs, INVOCATION_GET_FILE_STATUS); @@ -69,34 +70,29 @@ public class TestS3ADirectoryPerformance extends S3AScaleTestBase { printThenReset(LOG, metadataRequests, listRequests, + listContinueRequests, listStatusCalls, getFileStatusCalls); + describe("Listing files via treewalk"); try { // Scan the directory via an explicit tree walk. // This is the baseline for any listing speedups. - MetricDiff treewalkMetadataRequests = - new MetricDiff(fs, OBJECT_METADATA_REQUESTS); - MetricDiff treewalkListRequests = new MetricDiff(fs, - OBJECT_LIST_REQUESTS); - MetricDiff treewalkListStatusCalls = new MetricDiff(fs, - INVOCATION_LIST_FILES); - MetricDiff treewalkGetFileStatusCalls = - new MetricDiff(fs, INVOCATION_GET_FILE_STATUS); NanoTimer treeWalkTimer = new NanoTimer(); TreeScanResults treewalkResults = treeWalk(fs, listDir); - treeWalkTimer.end("List status via treewalk"); + treeWalkTimer.end("List status via treewalk of %s", created); - print(LOG, - treewalkMetadataRequests, - treewalkListRequests, - treewalkListStatusCalls, - treewalkGetFileStatusCalls); + printThenReset(LOG, + metadataRequests, + listRequests, + listContinueRequests, + listStatusCalls, + getFileStatusCalls); assertEquals("Files found in listFiles(recursive=true) " + " created=" + created + " listed=" + treewalkResults, created.getFileCount(), treewalkResults.getFileCount()); - + describe("Listing files via listFiles(recursive=true)"); // listFiles() does the recursion internally NanoTimer listFilesRecursiveTimer = new NanoTimer(); @@ -108,31 +104,33 @@ public class TestS3ADirectoryPerformance extends S3AScaleTestBase { " created=" + created + " listed=" + listFilesResults, created.getFileCount(), listFilesResults.getFileCount()); - treewalkListRequests.assertDiffEquals(listRequests); - printThenReset(LOG, - metadataRequests, listRequests, - listStatusCalls, getFileStatusCalls); - - NanoTimer globStatusTimer = new NanoTimer(); - FileStatus[] globStatusFiles = fs.globStatus(listDir); - globStatusTimer.end("Time to globStatus() %s", globStatusTimer); - LOG.info("Time for glob status {} entries: {}", - globStatusFiles.length, - toHuman(createTimer.duration())); - printThenReset(LOG, + // only two list operations should have taken place + print(LOG, metadataRequests, listRequests, + listContinueRequests, + listStatusCalls, + getFileStatusCalls); + assertEquals(listRequests.toString(), 2, listRequests.diff()); + reset(metadataRequests, + listRequests, + listContinueRequests, listStatusCalls, getFileStatusCalls); + } finally { + describe("deletion"); // deletion at the end of the run NanoTimer deleteTimer = new NanoTimer(); fs.delete(listDir, true); deleteTimer.end("Deleting directory tree"); printThenReset(LOG, - metadataRequests, listRequests, - listStatusCalls, getFileStatusCalls); + metadataRequests, + listRequests, + listContinueRequests, + listStatusCalls, + getFileStatusCalls); } }