HADOOP-13208. S3A listFiles(recursive=true) to do a bulk listObjects instead of walking the pseudo-tree of directories. Contributed by Steve Loughran.
This commit is contained in:
parent
869393643d
commit
822d661b8f
|
@ -0,0 +1,594 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import com.amazonaws.AmazonClientException;
|
||||||
|
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||||
|
import com.amazonaws.services.s3.model.ObjectListing;
|
||||||
|
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.ListIterator;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.stringify;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Place for the S3A listing classes; keeps all the small classes under control.
|
||||||
|
*/
|
||||||
|
public class Listing {
|
||||||
|
|
||||||
|
private final S3AFileSystem owner;
|
||||||
|
private static final Logger LOG = S3AFileSystem.LOG;
|
||||||
|
|
||||||
|
public Listing(S3AFileSystem owner) {
|
||||||
|
this.owner = owner;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a FileStatus iterator against a path, with a given
|
||||||
|
* list object request.
|
||||||
|
* @param listPath path of the listing
|
||||||
|
* @param request initial request to make
|
||||||
|
* @param filter the filter on which paths to accept
|
||||||
|
* @param acceptor the class/predicate to decide which entries to accept
|
||||||
|
* in the listing based on the full file status.
|
||||||
|
* @return the iterator
|
||||||
|
* @throws IOException IO Problems
|
||||||
|
*/
|
||||||
|
FileStatusListingIterator createFileStatusListingIterator(
|
||||||
|
Path listPath,
|
||||||
|
ListObjectsRequest request,
|
||||||
|
PathFilter filter,
|
||||||
|
Listing.FileStatusAcceptor acceptor) throws IOException {
|
||||||
|
return new FileStatusListingIterator(
|
||||||
|
new ObjectListingIterator(listPath, request),
|
||||||
|
filter,
|
||||||
|
acceptor);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a located status iterator over a file status iterator.
|
||||||
|
* @param statusIterator an iterator over the remote status entries
|
||||||
|
* @return a new remote iterator
|
||||||
|
*/
|
||||||
|
LocatedFileStatusIterator createLocatedFileStatusIterator(
|
||||||
|
RemoteIterator<FileStatus> statusIterator) {
|
||||||
|
return new LocatedFileStatusIterator(statusIterator);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface to implement by the logic deciding whether to accept a summary
|
||||||
|
* entry or path as a valid file or directory.
|
||||||
|
*/
|
||||||
|
interface FileStatusAcceptor {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Predicate to decide whether or not to accept a summary entry.
|
||||||
|
* @param keyPath qualified path to the entry
|
||||||
|
* @param summary summary entry
|
||||||
|
* @return true if the entry is accepted (i.e. that a status entry
|
||||||
|
* should be generated.
|
||||||
|
*/
|
||||||
|
boolean accept(Path keyPath, S3ObjectSummary summary);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Predicate to decide whether or not to accept a prefix.
|
||||||
|
* @param keyPath qualified path to the entry
|
||||||
|
* @param commonPrefix the prefix
|
||||||
|
* @return true if the entry is accepted (i.e. that a status entry
|
||||||
|
* should be generated.)
|
||||||
|
*/
|
||||||
|
boolean accept(Path keyPath, String commonPrefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A remote iterator which only iterates over a single `LocatedFileStatus`
|
||||||
|
* value.
|
||||||
|
*
|
||||||
|
* If the status value is null, the iterator declares that it has no data.
|
||||||
|
* This iterator is used to handle {@link listStatus()} calls where the path
|
||||||
|
* handed in refers to a file, not a directory: this is the iterator
|
||||||
|
* returned.
|
||||||
|
*/
|
||||||
|
static final class SingleStatusRemoteIterator
|
||||||
|
implements RemoteIterator<LocatedFileStatus> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The status to return; set to null after the first iteration.
|
||||||
|
*/
|
||||||
|
private LocatedFileStatus status;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param status status value: may be null, in which case
|
||||||
|
* the iterator is empty.
|
||||||
|
*/
|
||||||
|
public SingleStatusRemoteIterator(LocatedFileStatus status) {
|
||||||
|
this.status = status;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
* @return true if there is a file status to return: this is always false
|
||||||
|
* for the second iteration, and may be false for the first.
|
||||||
|
* @throws IOException never
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() throws IOException {
|
||||||
|
return status != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}
|
||||||
|
* @return the non-null status element passed in when the instance was
|
||||||
|
* constructed, if it ha not already been retrieved.
|
||||||
|
* @throws IOException never
|
||||||
|
* @throws NoSuchElementException if this is the second call, or it is
|
||||||
|
* the first call and a null {@link LocatedFileStatus} entry was passed
|
||||||
|
* to the constructor.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public LocatedFileStatus next() throws IOException {
|
||||||
|
if (hasNext()) {
|
||||||
|
LocatedFileStatus s = this.status;
|
||||||
|
status = null;
|
||||||
|
return s;
|
||||||
|
} else {
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps up object listing into a remote iterator which will ask for more
|
||||||
|
* listing data if needed.
|
||||||
|
*
|
||||||
|
* This is a complex operation, especially the process to determine
|
||||||
|
* if there are more entries remaining. If there are no more results
|
||||||
|
* remaining in the (filtered) results of the current listing request, then
|
||||||
|
* another request is made <i>and those results filtered</i> before the
|
||||||
|
* iterator can declare that there is more data available.
|
||||||
|
*
|
||||||
|
* The need to filter the results precludes the iterator from simply
|
||||||
|
* declaring that if the {@link S3AFileSystem.ObjectListingIterator#hasNext()}
|
||||||
|
* is true then there are more results. Instead the next batch of results must
|
||||||
|
* be retrieved and filtered.
|
||||||
|
*
|
||||||
|
* What does this mean? It means that remote requests to retrieve new
|
||||||
|
* batches of object listings are made in the {@link #hasNext()} call;
|
||||||
|
* the {@link #next()} call simply returns the filtered results of the last
|
||||||
|
* listing processed. However, do note that {@link #next()} calls
|
||||||
|
* {@link #hasNext()} during its operation. This is critical to ensure
|
||||||
|
* that a listing obtained through a sequence of {@link #next()} will
|
||||||
|
* complete with the same set of results as a classic
|
||||||
|
* {@code while(it.hasNext()} loop.
|
||||||
|
*
|
||||||
|
* Thread safety: None.
|
||||||
|
*/
|
||||||
|
class FileStatusListingIterator
|
||||||
|
implements RemoteIterator<FileStatus> {
|
||||||
|
|
||||||
|
/** Source of objects. */
|
||||||
|
private final ObjectListingIterator source;
|
||||||
|
/** Filter of paths from API call. */
|
||||||
|
private final PathFilter filter;
|
||||||
|
/** Filter of entries from file status. */
|
||||||
|
private final FileStatusAcceptor acceptor;
|
||||||
|
/** request batch size. */
|
||||||
|
private int batchSize;
|
||||||
|
/** Iterator over the current set of results. */
|
||||||
|
private ListIterator<FileStatus> statusBatchIterator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an iterator over file status entries.
|
||||||
|
* @param source the listing iterator from a listObjects call.
|
||||||
|
* @param filter the filter on which paths to accept
|
||||||
|
* @param acceptor the class/predicate to decide which entries to accept
|
||||||
|
* in the listing based on the full file status.
|
||||||
|
* @throws IOException IO Problems
|
||||||
|
*/
|
||||||
|
FileStatusListingIterator(ObjectListingIterator source,
|
||||||
|
PathFilter filter,
|
||||||
|
FileStatusAcceptor acceptor) throws IOException {
|
||||||
|
this.source = source;
|
||||||
|
this.filter = filter;
|
||||||
|
this.acceptor = acceptor;
|
||||||
|
// build the first set of results. This will not trigger any
|
||||||
|
// remote IO, assuming the source iterator is in its initial
|
||||||
|
// iteration
|
||||||
|
requestNextBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Report whether or not there is new data available.
|
||||||
|
* If there is data in the local filtered list, return true.
|
||||||
|
* Else: request more data util that condition is met, or there
|
||||||
|
* is no more remote listing data.
|
||||||
|
* @return true if a call to {@link #next()} will succeed.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() throws IOException {
|
||||||
|
return statusBatchIterator.hasNext() || requestNextBatch();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FileStatus next() throws IOException {
|
||||||
|
if (!hasNext()) {
|
||||||
|
throw new NoSuchElementException();
|
||||||
|
}
|
||||||
|
return statusBatchIterator.next();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to retrieve another batch.
|
||||||
|
* Note that for the initial batch,
|
||||||
|
* {@link S3AFileSystem.ObjectListingIterator} does not generate a request;
|
||||||
|
* it simply returns the initial set.
|
||||||
|
*
|
||||||
|
* @return true if a new batch was created.
|
||||||
|
* @throws IOException IO problems
|
||||||
|
*/
|
||||||
|
private boolean requestNextBatch() throws IOException {
|
||||||
|
// look for more object listing batches being available
|
||||||
|
while (source.hasNext()) {
|
||||||
|
// if available, retrieve it and build the next status
|
||||||
|
if (buildNextStatusBatch(source.next())) {
|
||||||
|
// this batch successfully generated entries matching the filters/
|
||||||
|
// acceptors; declare that the request was successful
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
LOG.debug("All entries in batch were filtered...continuing");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// if this code is reached, it means that all remaining
|
||||||
|
// object lists have been retrieved, and there are no new entries
|
||||||
|
// to return.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build the next status batch from a listing.
|
||||||
|
* @param objects the next object listing
|
||||||
|
* @return true if this added any entries after filtering
|
||||||
|
*/
|
||||||
|
private boolean buildNextStatusBatch(ObjectListing objects) {
|
||||||
|
// counters for debug logs
|
||||||
|
int added = 0, ignored = 0;
|
||||||
|
// list to fill in with results. Initial size will be list maximum.
|
||||||
|
List<FileStatus> stats = new ArrayList<>(
|
||||||
|
objects.getObjectSummaries().size() +
|
||||||
|
objects.getCommonPrefixes().size());
|
||||||
|
// objects
|
||||||
|
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
|
||||||
|
String key = summary.getKey();
|
||||||
|
Path keyPath = owner.keyToQualifiedPath(key);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("{}: {}", keyPath, stringify(summary));
|
||||||
|
}
|
||||||
|
// Skip over keys that are ourselves and old S3N _$folder$ files
|
||||||
|
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
|
||||||
|
FileStatus status = createFileStatus(keyPath, summary,
|
||||||
|
owner.getDefaultBlockSize(keyPath));
|
||||||
|
LOG.debug("Adding: {}", status);
|
||||||
|
stats.add(status);
|
||||||
|
added++;
|
||||||
|
} else {
|
||||||
|
LOG.debug("Ignoring: {}", keyPath);
|
||||||
|
ignored++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// prefixes: always directories
|
||||||
|
for (String prefix : objects.getCommonPrefixes()) {
|
||||||
|
Path keyPath = owner.keyToQualifiedPath(prefix);
|
||||||
|
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
|
||||||
|
FileStatus status = new S3AFileStatus(true, false, keyPath);
|
||||||
|
LOG.debug("Adding directory: {}", status);
|
||||||
|
added++;
|
||||||
|
stats.add(status);
|
||||||
|
} else {
|
||||||
|
LOG.debug("Ignoring directory: {}", keyPath);
|
||||||
|
ignored++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// finish up
|
||||||
|
batchSize = stats.size();
|
||||||
|
statusBatchIterator = stats.listIterator();
|
||||||
|
boolean hasNext = statusBatchIterator.hasNext();
|
||||||
|
LOG.debug("Added {} entries; ignored {}; hasNext={}; hasMoreObjects={}",
|
||||||
|
added, ignored, hasNext, objects.isTruncated());
|
||||||
|
return hasNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of entries in the current batch.
|
||||||
|
* @return a number, possibly zero.
|
||||||
|
*/
|
||||||
|
public int getBatchSize() {
|
||||||
|
return batchSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps up AWS `ListObjects` requests in a remote iterator
|
||||||
|
* which will ask for more listing data if needed.
|
||||||
|
*
|
||||||
|
* That is:
|
||||||
|
*
|
||||||
|
* 1. The first invocation of the {@link #next()} call will return the results
|
||||||
|
* of the first request, the one created during the construction of the
|
||||||
|
* instance.
|
||||||
|
*
|
||||||
|
* 2. Second and later invocations will continue the ongoing listing,
|
||||||
|
* calling {@link #continueListObjects(ObjectListing)} to request the next
|
||||||
|
* batch of results.
|
||||||
|
*
|
||||||
|
* 3. The {@link #hasNext()} predicate returns true for the initial call,
|
||||||
|
* where {@link #next()} will return the initial results. It declares
|
||||||
|
* that it has future results iff the last executed request was truncated.
|
||||||
|
*
|
||||||
|
* Thread safety: none.
|
||||||
|
*/
|
||||||
|
class ObjectListingIterator implements RemoteIterator<ObjectListing> {
|
||||||
|
|
||||||
|
/** The path listed. */
|
||||||
|
private final Path listPath;
|
||||||
|
|
||||||
|
/** The most recent listing results. */
|
||||||
|
private ObjectListing objects;
|
||||||
|
|
||||||
|
/** Indicator that this is the first listing. */
|
||||||
|
private boolean firstListing = true;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Count of how many listings have been requested
|
||||||
|
* (including initial result).
|
||||||
|
*/
|
||||||
|
private int listingCount = 1;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum keys in a request.
|
||||||
|
*/
|
||||||
|
private int maxKeys;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor -calls `listObjects()` on the request to populate the
|
||||||
|
* initial set of results/fail if there was a problem talking to the bucket.
|
||||||
|
* @param listPath path of the listing
|
||||||
|
* @param request initial request to make
|
||||||
|
* */
|
||||||
|
ObjectListingIterator(
|
||||||
|
Path listPath,
|
||||||
|
ListObjectsRequest request) {
|
||||||
|
this.listPath = listPath;
|
||||||
|
this.maxKeys = owner.getMaxKeys();
|
||||||
|
this.objects = owner.listObjects(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Declare that the iterator has data if it is either is the initial
|
||||||
|
* iteration or it is a later one and the last listing obtained was
|
||||||
|
* incomplete.
|
||||||
|
* @throws IOException never: there is no IO in this operation.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() throws IOException {
|
||||||
|
return firstListing || objects.isTruncated();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ask for the next listing.
|
||||||
|
* For the first invocation, this returns the initial set, with no
|
||||||
|
* remote IO. For later requests, S3 will be queried, hence the calls
|
||||||
|
* may block or fail.
|
||||||
|
* @return the next object listing.
|
||||||
|
* @throws IOException if a query made of S3 fails.
|
||||||
|
* @throws NoSuchElementException if there is no more data to list.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public ObjectListing next() throws IOException {
|
||||||
|
if (firstListing) {
|
||||||
|
// on the first listing, don't request more data.
|
||||||
|
// Instead just clear the firstListing flag so that it future calls
|
||||||
|
// will request new data.
|
||||||
|
firstListing = false;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
if (!objects.isTruncated()) {
|
||||||
|
// nothing more to request: fail.
|
||||||
|
throw new NoSuchElementException("No more results in listing of "
|
||||||
|
+ listPath);
|
||||||
|
}
|
||||||
|
// need to request a new set of objects.
|
||||||
|
LOG.debug("[{}], Requesting next {} objects under {}",
|
||||||
|
listingCount, maxKeys, listPath);
|
||||||
|
objects = owner.continueListObjects(objects);
|
||||||
|
listingCount++;
|
||||||
|
LOG.debug("New listing status: {}", this);
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
throw translateException("listObjects()", listPath, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return objects;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Object listing iterator against " + listPath
|
||||||
|
+ "; listing count "+ listingCount
|
||||||
|
+ "; isTruncated=" + objects.isTruncated();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the path listed.
|
||||||
|
* @return the path used in this listing.
|
||||||
|
*/
|
||||||
|
public Path getListPath() {
|
||||||
|
return listPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the count of listing requests.
|
||||||
|
* @return the counter of requests made (including the initial lookup).
|
||||||
|
*/
|
||||||
|
public int getListingCount() {
|
||||||
|
return listingCount;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accept all entries except the base path and those which map to S3N
|
||||||
|
* pseudo directory markers.
|
||||||
|
*/
|
||||||
|
static class AcceptFilesOnly implements FileStatusAcceptor {
|
||||||
|
private final Path qualifiedPath;
|
||||||
|
|
||||||
|
public AcceptFilesOnly(Path qualifiedPath) {
|
||||||
|
this.qualifiedPath = qualifiedPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reject a summary entry if the key path is the qualified Path, or
|
||||||
|
* it ends with {@code "_$folder$"}.
|
||||||
|
* @param keyPath key path of the entry
|
||||||
|
* @param summary summary entry
|
||||||
|
* @return true if the entry is accepted (i.e. that a status entry
|
||||||
|
* should be generated.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean accept(Path keyPath, S3ObjectSummary summary) {
|
||||||
|
return !keyPath.equals(qualifiedPath)
|
||||||
|
&& !summary.getKey().endsWith(S3N_FOLDER_SUFFIX)
|
||||||
|
&& !objectRepresentsDirectory(summary.getKey(), summary.getSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accept no directory paths.
|
||||||
|
* @param keyPath qualified path to the entry
|
||||||
|
* @param prefix common prefix in listing.
|
||||||
|
* @return false, always.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean accept(Path keyPath, String prefix) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Take a remote iterator over a set of {@link FileStatus} instances and
|
||||||
|
* return a remote iterator of {@link LocatedFileStatus} instances.
|
||||||
|
*/
|
||||||
|
class LocatedFileStatusIterator
|
||||||
|
implements RemoteIterator<LocatedFileStatus> {
|
||||||
|
private final RemoteIterator<FileStatus> statusIterator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param statusIterator an iterator over the remote status entries
|
||||||
|
*/
|
||||||
|
LocatedFileStatusIterator(RemoteIterator<FileStatus> statusIterator) {
|
||||||
|
this.statusIterator = statusIterator;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasNext() throws IOException {
|
||||||
|
return statusIterator.hasNext();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LocatedFileStatus next() throws IOException {
|
||||||
|
return owner.toLocatedFileStatus(statusIterator.next());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accept all entries except the base path and those which map to S3N
|
||||||
|
* pseudo directory markers.
|
||||||
|
*/
|
||||||
|
static class AcceptAllButSelfAndS3nDirs implements FileStatusAcceptor {
|
||||||
|
|
||||||
|
/** Base path. */
|
||||||
|
private final Path qualifiedPath;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor.
|
||||||
|
* @param qualifiedPath an already-qualified path.
|
||||||
|
*/
|
||||||
|
public AcceptAllButSelfAndS3nDirs(Path qualifiedPath) {
|
||||||
|
this.qualifiedPath = qualifiedPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reject a summary entry if the key path is the qualified Path, or
|
||||||
|
* it ends with {@code "_$folder$"}.
|
||||||
|
* @param keyPath key path of the entry
|
||||||
|
* @param summary summary entry
|
||||||
|
* @return true if the entry is accepted (i.e. that a status entry
|
||||||
|
* should be generated.)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean accept(Path keyPath, S3ObjectSummary summary) {
|
||||||
|
return !keyPath.equals(qualifiedPath) &&
|
||||||
|
!summary.getKey().endsWith(S3N_FOLDER_SUFFIX);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accept all prefixes except the one for the base path, "self".
|
||||||
|
* @param keyPath qualified path to the entry
|
||||||
|
* @param prefix common prefix in listing.
|
||||||
|
* @return true if the entry is accepted (i.e. that a status entry
|
||||||
|
* should be generated.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public boolean accept(Path keyPath, String prefix) {
|
||||||
|
return !keyPath.equals(qualifiedPath);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A Path filter which accepts all filenames.
|
||||||
|
*/
|
||||||
|
static final PathFilter ACCEPT_ALL = new PathFilter() {
|
||||||
|
@Override
|
||||||
|
public boolean accept(Path file) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ACCEPT_ALL";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
|
@ -84,6 +84,7 @@
|
||||||
import org.apache.hadoop.util.VersionInfo;
|
import org.apache.hadoop.util.VersionInfo;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||||
import static org.apache.hadoop.fs.s3a.Statistic.*;
|
import static org.apache.hadoop.fs.s3a.Statistic.*;
|
||||||
|
|
||||||
|
@ -115,6 +116,7 @@ public class S3AFileSystem extends FileSystem {
|
||||||
private AmazonS3Client s3;
|
private AmazonS3Client s3;
|
||||||
private String bucket;
|
private String bucket;
|
||||||
private int maxKeys;
|
private int maxKeys;
|
||||||
|
private Listing listing;
|
||||||
private long partSize;
|
private long partSize;
|
||||||
private boolean enableMultiObjectsDelete;
|
private boolean enableMultiObjectsDelete;
|
||||||
private TransferManager transfers;
|
private TransferManager transfers;
|
||||||
|
@ -181,6 +183,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
|
||||||
initAmazonS3Client(conf, credentials, awsConf);
|
initAmazonS3Client(conf, credentials, awsConf);
|
||||||
|
|
||||||
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
|
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
|
||||||
|
listing = new Listing(this);
|
||||||
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
|
||||||
if (partSize < 5 * 1024 * 1024) {
|
if (partSize < 5 * 1024 * 1024) {
|
||||||
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
|
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
|
||||||
|
@ -514,7 +517,11 @@ public S3AFileSystem() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Turns a path (relative or otherwise) into an S3 key
|
/**
|
||||||
|
* Turns a path (relative or otherwise) into an S3 key.
|
||||||
|
*
|
||||||
|
* @param path input path, may be relative to the working dir
|
||||||
|
* @return a key excluding the leading "/", or, if it is the root path, ""
|
||||||
*/
|
*/
|
||||||
private String pathToKey(Path path) {
|
private String pathToKey(Path path) {
|
||||||
if (!path.isAbsolute()) {
|
if (!path.isAbsolute()) {
|
||||||
|
@ -528,10 +535,49 @@ private String pathToKey(Path path) {
|
||||||
return path.toUri().getPath().substring(1);
|
return path.toUri().getPath().substring(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Turns a path (relative or otherwise) into an S3 key, adding a trailing
|
||||||
|
* "/" if the path is not the root <i>and</i> does not already have a "/"
|
||||||
|
* at the end.
|
||||||
|
*
|
||||||
|
* @param key s3 key or ""
|
||||||
|
* @return the with a trailing "/", or, if it is the root key, "",
|
||||||
|
*/
|
||||||
|
private String maybeAddTrailingSlash(String key) {
|
||||||
|
if (!key.isEmpty() && !key.endsWith("/")) {
|
||||||
|
return key + '/';
|
||||||
|
} else {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a path back to a key.
|
||||||
|
* @param key input key
|
||||||
|
* @return the path from this key
|
||||||
|
*/
|
||||||
private Path keyToPath(String key) {
|
private Path keyToPath(String key) {
|
||||||
return new Path("/" + key);
|
return new Path("/" + key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a key to a fully qualified path.
|
||||||
|
* @param key input key
|
||||||
|
* @return the fully qualified path including URI scheme and bucket name.
|
||||||
|
*/
|
||||||
|
Path keyToQualifiedPath(String key) {
|
||||||
|
return qualify(keyToPath(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Qualify a path.
|
||||||
|
* @param path path to qualify
|
||||||
|
* @return a qualified path.
|
||||||
|
*/
|
||||||
|
Path qualify(Path path) {
|
||||||
|
return path.makeQualified(uri, workingDir);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check that a Path belongs to this FileSystem.
|
* Check that a Path belongs to this FileSystem.
|
||||||
* Unlike the superclass, this version does not look at authority,
|
* Unlike the superclass, this version does not look at authority,
|
||||||
|
@ -550,10 +596,10 @@ protected URI canonicalizeUri(URI rawUri) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Opens an FSDataInputStream at the indicated Path.
|
* Opens an FSDataInputStream at the indicated Path.
|
||||||
* @param f the file name to open
|
* @param f the file name to open
|
||||||
* @param bufferSize the size of the buffer to be used.
|
* @param bufferSize the size of the buffer to be used.
|
||||||
*/
|
*/
|
||||||
public FSDataInputStream open(Path f, int bufferSize)
|
public FSDataInputStream open(Path f, int bufferSize)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
||||||
|
@ -875,7 +921,7 @@ protected ObjectListing listObjects(ListObjectsRequest request) {
|
||||||
* @return the next result object
|
* @return the next result object
|
||||||
*/
|
*/
|
||||||
protected ObjectListing continueListObjects(ObjectListing objects) {
|
protected ObjectListing continueListObjects(ObjectListing objects) {
|
||||||
incrementStatistic(OBJECT_LIST_REQUESTS);
|
incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS);
|
||||||
incrementReadOperations();
|
incrementReadOperations();
|
||||||
return s3.listNextBatchOfObjects(objects);
|
return s3.listNextBatchOfObjects(objects);
|
||||||
}
|
}
|
||||||
|
@ -1132,12 +1178,7 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive)
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Getting objects for directory prefix {} to delete", key);
|
LOG.debug("Getting objects for directory prefix {} to delete", key);
|
||||||
|
|
||||||
ListObjectsRequest request = new ListObjectsRequest();
|
ListObjectsRequest request = createListObjectsRequest(key, null);
|
||||||
request.setBucketName(bucket);
|
|
||||||
request.setPrefix(key);
|
|
||||||
// Hopefully not setting a delimiter will cause this to find everything
|
|
||||||
//request.setDelimiter("/");
|
|
||||||
request.setMaxKeys(maxKeys);
|
|
||||||
|
|
||||||
ObjectListing objects = listObjects(request);
|
ObjectListing objects = listObjects(request);
|
||||||
List<DeleteObjectsRequest.KeyVersion> keys =
|
List<DeleteObjectsRequest.KeyVersion> keys =
|
||||||
|
@ -1211,66 +1252,57 @@ public FileStatus[] listStatus(Path f) throws FileNotFoundException,
|
||||||
*/
|
*/
|
||||||
public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,
|
public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,
|
||||||
IOException, AmazonClientException {
|
IOException, AmazonClientException {
|
||||||
String key = pathToKey(f);
|
Path path = qualify(f);
|
||||||
LOG.debug("List status for path: {}", f);
|
String key = pathToKey(path);
|
||||||
|
LOG.debug("List status for path: {}", path);
|
||||||
incrementStatistic(INVOCATION_LIST_STATUS);
|
incrementStatistic(INVOCATION_LIST_STATUS);
|
||||||
|
|
||||||
final List<FileStatus> result = new ArrayList<FileStatus>();
|
List<FileStatus> result;
|
||||||
final FileStatus fileStatus = getFileStatus(f);
|
final FileStatus fileStatus = getFileStatus(path);
|
||||||
|
|
||||||
if (fileStatus.isDirectory()) {
|
if (fileStatus.isDirectory()) {
|
||||||
if (!key.isEmpty()) {
|
if (!key.isEmpty()) {
|
||||||
key = key + "/";
|
key = key + '/';
|
||||||
}
|
}
|
||||||
|
|
||||||
ListObjectsRequest request = new ListObjectsRequest();
|
ListObjectsRequest request = createListObjectsRequest(key, "/");
|
||||||
request.setBucketName(bucket);
|
|
||||||
request.setPrefix(key);
|
|
||||||
request.setDelimiter("/");
|
|
||||||
request.setMaxKeys(maxKeys);
|
|
||||||
|
|
||||||
LOG.debug("listStatus: doing listObjects for directory {}", key);
|
LOG.debug("listStatus: doing listObjects for directory {}", key);
|
||||||
|
|
||||||
ObjectListing objects = listObjects(request);
|
Listing.FileStatusListingIterator files =
|
||||||
|
listing.createFileStatusListingIterator(path,
|
||||||
Path fQualified = f.makeQualified(uri, workingDir);
|
request,
|
||||||
|
ACCEPT_ALL,
|
||||||
while (true) {
|
new Listing.AcceptAllButSelfAndS3nDirs(path));
|
||||||
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
|
result = new ArrayList<>(files.getBatchSize());
|
||||||
Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
|
while (files.hasNext()) {
|
||||||
// Skip over keys that are ourselves and old S3N _$folder$ files
|
result.add(files.next());
|
||||||
if (keyPath.equals(fQualified) ||
|
|
||||||
summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
|
|
||||||
LOG.debug("Ignoring: {}", keyPath);
|
|
||||||
} else {
|
|
||||||
S3AFileStatus status = createFileStatus(keyPath, summary,
|
|
||||||
getDefaultBlockSize(keyPath));
|
|
||||||
result.add(status);
|
|
||||||
LOG.debug("Adding: {}", status);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (String prefix : objects.getCommonPrefixes()) {
|
|
||||||
Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
|
|
||||||
if (!keyPath.equals(f)) {
|
|
||||||
result.add(new S3AFileStatus(true, false, keyPath));
|
|
||||||
LOG.debug("Adding: rd: {}", keyPath);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (objects.isTruncated()) {
|
|
||||||
LOG.debug("listStatus: list truncated - getting next batch");
|
|
||||||
objects = continueListObjects(objects);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return result.toArray(new FileStatus[result.size()]);
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Adding: rd (not a dir): {}", f);
|
LOG.debug("Adding: rd (not a dir): {}", path);
|
||||||
result.add(fileStatus);
|
FileStatus[] stats = new FileStatus[1];
|
||||||
|
stats[0]= fileStatus;
|
||||||
|
return stats;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return result.toArray(new FileStatus[result.size()]);
|
/**
|
||||||
|
* Create a {@code ListObjectsRequest} request against this bucket,
|
||||||
|
* with the maximum keys returned in a query set by {@link #maxKeys}.
|
||||||
|
* @param key key for request
|
||||||
|
* @param delimiter any delimiter
|
||||||
|
* @return the request
|
||||||
|
*/
|
||||||
|
private ListObjectsRequest createListObjectsRequest(String key,
|
||||||
|
String delimiter) {
|
||||||
|
ListObjectsRequest request = new ListObjectsRequest();
|
||||||
|
request.setBucketName(bucket);
|
||||||
|
request.setMaxKeys(maxKeys);
|
||||||
|
request.setPrefix(key);
|
||||||
|
if (delimiter != null) {
|
||||||
|
request.setDelimiter(delimiter);
|
||||||
|
}
|
||||||
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1371,11 +1403,11 @@ private boolean innerMkdirs(Path f, FsPermission permission)
|
||||||
* @throws java.io.FileNotFoundException when the path does not exist;
|
* @throws java.io.FileNotFoundException when the path does not exist;
|
||||||
* @throws IOException on other problems.
|
* @throws IOException on other problems.
|
||||||
*/
|
*/
|
||||||
public S3AFileStatus getFileStatus(Path f) throws IOException {
|
public S3AFileStatus getFileStatus(final Path f) throws IOException {
|
||||||
String key = pathToKey(f);
|
|
||||||
incrementStatistic(INVOCATION_GET_FILE_STATUS);
|
incrementStatistic(INVOCATION_GET_FILE_STATUS);
|
||||||
LOG.debug("Getting path status for {} ({})", f , key);
|
final Path path = qualify(f);
|
||||||
|
String key = pathToKey(path);
|
||||||
|
LOG.debug("Getting path status for {} ({})", path , key);
|
||||||
if (!key.isEmpty()) {
|
if (!key.isEmpty()) {
|
||||||
try {
|
try {
|
||||||
ObjectMetadata meta = getObjectMetadata(key);
|
ObjectMetadata meta = getObjectMetadata(key);
|
||||||
|
@ -1383,20 +1415,20 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
|
||||||
if (objectRepresentsDirectory(key, meta.getContentLength())) {
|
if (objectRepresentsDirectory(key, meta.getContentLength())) {
|
||||||
LOG.debug("Found exact file: fake directory");
|
LOG.debug("Found exact file: fake directory");
|
||||||
return new S3AFileStatus(true, true,
|
return new S3AFileStatus(true, true,
|
||||||
f.makeQualified(uri, workingDir));
|
path);
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Found exact file: normal file");
|
LOG.debug("Found exact file: normal file");
|
||||||
return new S3AFileStatus(meta.getContentLength(),
|
return new S3AFileStatus(meta.getContentLength(),
|
||||||
dateToLong(meta.getLastModified()),
|
dateToLong(meta.getLastModified()),
|
||||||
f.makeQualified(uri, workingDir),
|
path,
|
||||||
getDefaultBlockSize(f.makeQualified(uri, workingDir)));
|
getDefaultBlockSize(path));
|
||||||
}
|
}
|
||||||
} catch (AmazonServiceException e) {
|
} catch (AmazonServiceException e) {
|
||||||
if (e.getStatusCode() != 404) {
|
if (e.getStatusCode() != 404) {
|
||||||
throw translateException("getFileStatus", f, e);
|
throw translateException("getFileStatus", path, e);
|
||||||
}
|
}
|
||||||
} catch (AmazonClientException e) {
|
} catch (AmazonClientException e) {
|
||||||
throw translateException("getFileStatus", f, e);
|
throw translateException("getFileStatus", path, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Necessary?
|
// Necessary?
|
||||||
|
@ -1407,14 +1439,14 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
|
||||||
|
|
||||||
if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
|
if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
|
||||||
LOG.debug("Found file (with /): fake directory");
|
LOG.debug("Found file (with /): fake directory");
|
||||||
return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
|
return new S3AFileStatus(true, true, path);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Found file (with /): real file? should not happen: {}", key);
|
LOG.warn("Found file (with /): real file? should not happen: {}", key);
|
||||||
|
|
||||||
return new S3AFileStatus(meta.getContentLength(),
|
return new S3AFileStatus(meta.getContentLength(),
|
||||||
dateToLong(meta.getLastModified()),
|
dateToLong(meta.getLastModified()),
|
||||||
f.makeQualified(uri, workingDir),
|
path,
|
||||||
getDefaultBlockSize(f.makeQualified(uri, workingDir)));
|
getDefaultBlockSize(path));
|
||||||
}
|
}
|
||||||
} catch (AmazonServiceException e) {
|
} catch (AmazonServiceException e) {
|
||||||
if (e.getStatusCode() != 404) {
|
if (e.getStatusCode() != 404) {
|
||||||
|
@ -1427,9 +1459,7 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (!key.isEmpty() && !key.endsWith("/")) {
|
key = maybeAddTrailingSlash(key);
|
||||||
key = key + "/";
|
|
||||||
}
|
|
||||||
ListObjectsRequest request = new ListObjectsRequest();
|
ListObjectsRequest request = new ListObjectsRequest();
|
||||||
request.setBucketName(bucket);
|
request.setBucketName(bucket);
|
||||||
request.setPrefix(key);
|
request.setPrefix(key);
|
||||||
|
@ -1453,11 +1483,10 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new S3AFileStatus(true, false,
|
return new S3AFileStatus(true, false, path);
|
||||||
f.makeQualified(uri, workingDir));
|
|
||||||
} else if (key.isEmpty()) {
|
} else if (key.isEmpty()) {
|
||||||
LOG.debug("Found root directory");
|
LOG.debug("Found root directory");
|
||||||
return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
|
return new S3AFileStatus(true, true, path);
|
||||||
}
|
}
|
||||||
} catch (AmazonServiceException e) {
|
} catch (AmazonServiceException e) {
|
||||||
if (e.getStatusCode() != 404) {
|
if (e.getStatusCode() != 404) {
|
||||||
|
@ -1467,8 +1496,8 @@ public S3AFileStatus getFileStatus(Path f) throws IOException {
|
||||||
throw translateException("getFileStatus", key, e);
|
throw translateException("getFileStatus", key, e);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("Not Found: {}", f);
|
LOG.debug("Not Found: {}", path);
|
||||||
throw new FileNotFoundException("No such file or directory: " + f);
|
throw new FileNotFoundException("No such file or directory: " + path);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1829,7 +1858,15 @@ public long getMultiPartThreshold() {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Override superclass so as to add statistic collection.
|
* Get the maximum key count.
|
||||||
|
* @return a value, valid after initialization
|
||||||
|
*/
|
||||||
|
int getMaxKeys() {
|
||||||
|
return maxKeys;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}.
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -1849,24 +1886,6 @@ public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
|
||||||
return super.globStatus(pathPattern, filter);
|
return super.globStatus(pathPattern, filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Override superclass so as to add statistic collection.
|
|
||||||
* {@inheritDoc}
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
|
|
||||||
throws FileNotFoundException, IOException {
|
|
||||||
incrementStatistic(INVOCATION_LIST_LOCATED_STATUS);
|
|
||||||
return super.listLocatedStatus(f);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RemoteIterator<LocatedFileStatus> listFiles(Path f,
|
|
||||||
boolean recursive) throws FileNotFoundException, IOException {
|
|
||||||
incrementStatistic(INVOCATION_LIST_FILES);
|
|
||||||
return super.listFiles(f, recursive);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Override superclass so as to add statistic collection.
|
* Override superclass so as to add statistic collection.
|
||||||
* {@inheritDoc}
|
* {@inheritDoc}
|
||||||
|
@ -1897,6 +1916,129 @@ public boolean isFile(Path f) throws IOException {
|
||||||
return super.isFile(f);
|
return super.isFile(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}.
|
||||||
|
*
|
||||||
|
* This implementation is optimized for S3, which can do a bulk listing
|
||||||
|
* off all entries under a path in one single operation. Thus there is
|
||||||
|
* no need to recursively walk the directory tree.
|
||||||
|
*
|
||||||
|
* Instead a {@link ListObjectsRequest} is created requesting a (windowed)
|
||||||
|
* listing of all entries under the given path. This is used to construct
|
||||||
|
* an {@code ObjectListingIterator} instance, iteratively returning the
|
||||||
|
* sequence of lists of elements under the path. This is then iterated
|
||||||
|
* over in a {@code FileStatusListingIterator}, which generates
|
||||||
|
* {@link S3AFileStatus} instances, one per listing entry.
|
||||||
|
* These are then translated into {@link LocatedFileStatus} instances.
|
||||||
|
*
|
||||||
|
* This is essentially a nested and wrapped set of iterators, with some
|
||||||
|
* generator classes; an architecture which may become less convoluted
|
||||||
|
* using lambda-expressions.
|
||||||
|
* @param f a path
|
||||||
|
* @param recursive if the subdirectories need to be traversed recursively
|
||||||
|
*
|
||||||
|
* @return an iterator that traverses statuses of the files/directories
|
||||||
|
* in the given path
|
||||||
|
* @throws FileNotFoundException if {@code path} does not exist
|
||||||
|
* @throws IOException if any I/O error occurred
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public RemoteIterator<LocatedFileStatus> listFiles(Path f,
|
||||||
|
boolean recursive) throws FileNotFoundException, IOException {
|
||||||
|
incrementStatistic(INVOCATION_LIST_FILES);
|
||||||
|
Path path = qualify(f);
|
||||||
|
LOG.debug("listFiles({}, {})", path, recursive);
|
||||||
|
try {
|
||||||
|
// lookup dir triggers existence check
|
||||||
|
final FileStatus fileStatus = getFileStatus(path);
|
||||||
|
if (fileStatus.isFile()) {
|
||||||
|
// simple case: File
|
||||||
|
LOG.debug("Path is a file");
|
||||||
|
return new Listing.SingleStatusRemoteIterator(
|
||||||
|
toLocatedFileStatus(fileStatus));
|
||||||
|
} else {
|
||||||
|
// directory: do a bulk operation
|
||||||
|
String key = maybeAddTrailingSlash(pathToKey(path));
|
||||||
|
String delimiter = recursive ? null : "/";
|
||||||
|
LOG.debug("Requesting all entries under {} with delimiter '{}'",
|
||||||
|
key, delimiter);
|
||||||
|
return listing.createLocatedFileStatusIterator(
|
||||||
|
listing.createFileStatusListingIterator(path,
|
||||||
|
createListObjectsRequest(key, delimiter),
|
||||||
|
ACCEPT_ALL,
|
||||||
|
new Listing.AcceptFilesOnly(path)));
|
||||||
|
}
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
throw translateException("listFiles", path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override superclass so as to add statistic collection.
|
||||||
|
* {@inheritDoc}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
|
||||||
|
throws FileNotFoundException, IOException {
|
||||||
|
return listLocatedStatus(f, ACCEPT_ALL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc}.
|
||||||
|
*
|
||||||
|
* S3 Optimized directory listing. The initial operation performs the
|
||||||
|
* first bulk listing; extra listings will take place
|
||||||
|
* when all the current set of results are used up.
|
||||||
|
* @param f a path
|
||||||
|
* @param filter a path filter
|
||||||
|
* @return an iterator that traverses statuses of the files/directories
|
||||||
|
* in the given path
|
||||||
|
* @throws FileNotFoundException if {@code path} does not exist
|
||||||
|
* @throws IOException if any I/O error occurred
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
|
||||||
|
final PathFilter filter)
|
||||||
|
throws FileNotFoundException, IOException {
|
||||||
|
incrementStatistic(INVOCATION_LIST_LOCATED_STATUS);
|
||||||
|
Path path = qualify(f);
|
||||||
|
LOG.debug("listLocatedStatus({}, {}", path, filter);
|
||||||
|
try {
|
||||||
|
// lookup dir triggers existence check
|
||||||
|
final FileStatus fileStatus = getFileStatus(path);
|
||||||
|
if (fileStatus.isFile()) {
|
||||||
|
// simple case: File
|
||||||
|
LOG.debug("Path is a file");
|
||||||
|
return new Listing.SingleStatusRemoteIterator(
|
||||||
|
filter.accept(path) ? toLocatedFileStatus(fileStatus) : null);
|
||||||
|
} else {
|
||||||
|
// directory: trigger a lookup
|
||||||
|
String key = maybeAddTrailingSlash(pathToKey(path));
|
||||||
|
return listing.createLocatedFileStatusIterator(
|
||||||
|
listing.createFileStatusListingIterator(path,
|
||||||
|
createListObjectsRequest(key, "/"),
|
||||||
|
filter,
|
||||||
|
new Listing.AcceptAllButSelfAndS3nDirs(path)));
|
||||||
|
}
|
||||||
|
} catch (AmazonClientException e) {
|
||||||
|
throw translateException("listLocatedStatus", path, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Build a {@link LocatedFileStatus} from a {@link FileStatus} instance.
|
||||||
|
* @param status file status
|
||||||
|
* @return a located status with block locations set up from this FS.
|
||||||
|
* @throws IOException IO Problems.
|
||||||
|
*/
|
||||||
|
LocatedFileStatus toLocatedFileStatus(FileStatus status)
|
||||||
|
throws IOException {
|
||||||
|
return new LocatedFileStatus(status,
|
||||||
|
status.isFile() ?
|
||||||
|
getFileBlockLocations(status, 0, status.getLen())
|
||||||
|
: null);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a integer option >= the minimum allowed value.
|
* Get a integer option >= the minimum allowed value.
|
||||||
* @param conf configuration
|
* @param conf configuration
|
||||||
|
@ -1934,38 +2076,4 @@ static long longOption(Configuration conf,
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This is a simple encapsulation of the
|
|
||||||
* S3 access key and secret.
|
|
||||||
*/
|
|
||||||
static class AWSAccessKeys {
|
|
||||||
private String accessKey = null;
|
|
||||||
private String accessSecret = null;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor.
|
|
||||||
* @param key - AWS access key
|
|
||||||
* @param secret - AWS secret key
|
|
||||||
*/
|
|
||||||
public AWSAccessKeys(String key, String secret) {
|
|
||||||
accessKey = key;
|
|
||||||
accessSecret = secret;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the AWS access key.
|
|
||||||
* @return key
|
|
||||||
*/
|
|
||||||
public String getAccessKey() {
|
|
||||||
return accessKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the AWS secret key.
|
|
||||||
* @return secret
|
|
||||||
*/
|
|
||||||
public String getAccessSecret() {
|
|
||||||
return accessSecret;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,6 +95,7 @@ public class S3AInstrumentation {
|
||||||
OBJECT_COPY_REQUESTS,
|
OBJECT_COPY_REQUESTS,
|
||||||
OBJECT_DELETE_REQUESTS,
|
OBJECT_DELETE_REQUESTS,
|
||||||
OBJECT_LIST_REQUESTS,
|
OBJECT_LIST_REQUESTS,
|
||||||
|
OBJECT_CONTINUE_LIST_REQUESTS,
|
||||||
OBJECT_METADATA_REQUESTS,
|
OBJECT_METADATA_REQUESTS,
|
||||||
OBJECT_MULTIPART_UPLOAD_ABORTED,
|
OBJECT_MULTIPART_UPLOAD_ABORTED,
|
||||||
OBJECT_PUT_BYTES,
|
OBJECT_PUT_BYTES,
|
||||||
|
|
|
@ -280,4 +280,16 @@ private static String getPassword(Configuration conf, String key, String val)
|
||||||
return val;
|
return val;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* String information about a summary entry for debug messages.
|
||||||
|
* @param summary summary object
|
||||||
|
* @return string value
|
||||||
|
*/
|
||||||
|
public static String stringify(S3ObjectSummary summary) {
|
||||||
|
StringBuilder builder = new StringBuilder(summary.getKey().length() + 100);
|
||||||
|
builder.append(summary.getKey()).append(' ');
|
||||||
|
builder.append("size=").append(summary.getSize());
|
||||||
|
return builder.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,6 +69,8 @@ public enum Statistic {
|
||||||
OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
|
OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
|
||||||
OBJECT_LIST_REQUESTS("object_list_requests",
|
OBJECT_LIST_REQUESTS("object_list_requests",
|
||||||
"Number of object listings made"),
|
"Number of object listings made"),
|
||||||
|
OBJECT_CONTINUE_LIST_REQUESTS("object_continue_list_requests",
|
||||||
|
"Number of continued object listings made"),
|
||||||
OBJECT_METADATA_REQUESTS("object_metadata_requests",
|
OBJECT_METADATA_REQUESTS("object_metadata_requests",
|
||||||
"Number of requests for object metadata"),
|
"Number of requests for object metadata"),
|
||||||
OBJECT_MULTIPART_UPLOAD_ABORTED("object_multipart_aborted",
|
OBJECT_MULTIPART_UPLOAD_ABORTED("object_multipart_aborted",
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||||
import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
|
import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
|
||||||
|
import org.apache.hadoop.fs.s3a.Constants;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3ATestUtils;
|
||||||
|
|
||||||
public class TestS3AContractGetFileStatus extends AbstractContractGetFileStatusTest {
|
public class TestS3AContractGetFileStatus extends AbstractContractGetFileStatusTest {
|
||||||
|
|
||||||
|
@ -28,4 +30,18 @@ protected AbstractFSContract createContract(Configuration conf) {
|
||||||
return new S3AContract(conf);
|
return new S3AContract(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void teardown() throws Exception {
|
||||||
|
getLog().info("FS details {}", getFileSystem());
|
||||||
|
super.teardown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Configuration createConfiguration() {
|
||||||
|
Configuration conf = super.createConfiguration();
|
||||||
|
S3ATestUtils.disableFilesystemCaching(conf);
|
||||||
|
// aggressively low page size forces tests to go multipage
|
||||||
|
conf.setInt(Constants.MAX_PAGING_KEYS, 2);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,7 +124,7 @@ public static void eventually(int timeout, Callable<Void> callback)
|
||||||
try {
|
try {
|
||||||
callback.call();
|
callback.call();
|
||||||
return;
|
return;
|
||||||
} catch (FailFastException e) {
|
} catch (InterruptedException | FailFastException e) {
|
||||||
throw e;
|
throw e;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
lastException = e;
|
lastException = e;
|
||||||
|
@ -321,7 +321,7 @@ public void assertDiffEquals(MetricDiff that) {
|
||||||
* @return true if the value is {@code ==} the other's
|
* @return true if the value is {@code ==} the other's
|
||||||
*/
|
*/
|
||||||
public boolean diffEquals(MetricDiff that) {
|
public boolean diffEquals(MetricDiff that) {
|
||||||
return this.currentValue() == that.currentValue();
|
return this.diff() == that.diff();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -330,7 +330,7 @@ public boolean diffEquals(MetricDiff that) {
|
||||||
* @return true if the value is {@code <} the other's
|
* @return true if the value is {@code <} the other's
|
||||||
*/
|
*/
|
||||||
public boolean diffLessThan(MetricDiff that) {
|
public boolean diffLessThan(MetricDiff that) {
|
||||||
return this.currentValue() < that.currentValue();
|
return this.diff() < that.diff();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -339,7 +339,7 @@ public boolean diffLessThan(MetricDiff that) {
|
||||||
* @return true if the value is {@code <=} the other's
|
* @return true if the value is {@code <=} the other's
|
||||||
*/
|
*/
|
||||||
public boolean diffLessThanOrEquals(MetricDiff that) {
|
public boolean diffLessThanOrEquals(MetricDiff that) {
|
||||||
return this.currentValue() <= that.currentValue();
|
return this.diff() <= that.diff();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,7 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a.scale;
|
package org.apache.hadoop.fs.s3a.scale;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.s3a.Statistic;
|
import org.apache.hadoop.fs.s3a.Statistic;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -51,6 +50,8 @@ public void testListOperations() throws Throwable {
|
||||||
int files = scale;
|
int files = scale;
|
||||||
MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
|
MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
|
||||||
MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS);
|
MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS);
|
||||||
|
MetricDiff listContinueRequests =
|
||||||
|
new MetricDiff(fs, OBJECT_CONTINUE_LIST_REQUESTS);
|
||||||
MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES);
|
MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES);
|
||||||
MetricDiff getFileStatusCalls =
|
MetricDiff getFileStatusCalls =
|
||||||
new MetricDiff(fs, INVOCATION_GET_FILE_STATUS);
|
new MetricDiff(fs, INVOCATION_GET_FILE_STATUS);
|
||||||
|
@ -69,34 +70,29 @@ public void testListOperations() throws Throwable {
|
||||||
printThenReset(LOG,
|
printThenReset(LOG,
|
||||||
metadataRequests,
|
metadataRequests,
|
||||||
listRequests,
|
listRequests,
|
||||||
|
listContinueRequests,
|
||||||
listStatusCalls,
|
listStatusCalls,
|
||||||
getFileStatusCalls);
|
getFileStatusCalls);
|
||||||
|
|
||||||
|
describe("Listing files via treewalk");
|
||||||
try {
|
try {
|
||||||
// Scan the directory via an explicit tree walk.
|
// Scan the directory via an explicit tree walk.
|
||||||
// This is the baseline for any listing speedups.
|
// This is the baseline for any listing speedups.
|
||||||
MetricDiff treewalkMetadataRequests =
|
|
||||||
new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
|
|
||||||
MetricDiff treewalkListRequests = new MetricDiff(fs,
|
|
||||||
OBJECT_LIST_REQUESTS);
|
|
||||||
MetricDiff treewalkListStatusCalls = new MetricDiff(fs,
|
|
||||||
INVOCATION_LIST_FILES);
|
|
||||||
MetricDiff treewalkGetFileStatusCalls =
|
|
||||||
new MetricDiff(fs, INVOCATION_GET_FILE_STATUS);
|
|
||||||
NanoTimer treeWalkTimer = new NanoTimer();
|
NanoTimer treeWalkTimer = new NanoTimer();
|
||||||
TreeScanResults treewalkResults = treeWalk(fs, listDir);
|
TreeScanResults treewalkResults = treeWalk(fs, listDir);
|
||||||
treeWalkTimer.end("List status via treewalk");
|
treeWalkTimer.end("List status via treewalk of %s", created);
|
||||||
|
|
||||||
print(LOG,
|
printThenReset(LOG,
|
||||||
treewalkMetadataRequests,
|
metadataRequests,
|
||||||
treewalkListRequests,
|
listRequests,
|
||||||
treewalkListStatusCalls,
|
listContinueRequests,
|
||||||
treewalkGetFileStatusCalls);
|
listStatusCalls,
|
||||||
|
getFileStatusCalls);
|
||||||
assertEquals("Files found in listFiles(recursive=true) " +
|
assertEquals("Files found in listFiles(recursive=true) " +
|
||||||
" created=" + created + " listed=" + treewalkResults,
|
" created=" + created + " listed=" + treewalkResults,
|
||||||
created.getFileCount(), treewalkResults.getFileCount());
|
created.getFileCount(), treewalkResults.getFileCount());
|
||||||
|
|
||||||
|
describe("Listing files via listFiles(recursive=true)");
|
||||||
// listFiles() does the recursion internally
|
// listFiles() does the recursion internally
|
||||||
NanoTimer listFilesRecursiveTimer = new NanoTimer();
|
NanoTimer listFilesRecursiveTimer = new NanoTimer();
|
||||||
|
|
||||||
|
@ -108,31 +104,33 @@ public void testListOperations() throws Throwable {
|
||||||
" created=" + created + " listed=" + listFilesResults,
|
" created=" + created + " listed=" + listFilesResults,
|
||||||
created.getFileCount(), listFilesResults.getFileCount());
|
created.getFileCount(), listFilesResults.getFileCount());
|
||||||
|
|
||||||
treewalkListRequests.assertDiffEquals(listRequests);
|
// only two list operations should have taken place
|
||||||
printThenReset(LOG,
|
print(LOG,
|
||||||
metadataRequests, listRequests,
|
|
||||||
listStatusCalls, getFileStatusCalls);
|
|
||||||
|
|
||||||
NanoTimer globStatusTimer = new NanoTimer();
|
|
||||||
FileStatus[] globStatusFiles = fs.globStatus(listDir);
|
|
||||||
globStatusTimer.end("Time to globStatus() %s", globStatusTimer);
|
|
||||||
LOG.info("Time for glob status {} entries: {}",
|
|
||||||
globStatusFiles.length,
|
|
||||||
toHuman(createTimer.duration()));
|
|
||||||
printThenReset(LOG,
|
|
||||||
metadataRequests,
|
metadataRequests,
|
||||||
listRequests,
|
listRequests,
|
||||||
|
listContinueRequests,
|
||||||
|
listStatusCalls,
|
||||||
|
getFileStatusCalls);
|
||||||
|
assertEquals(listRequests.toString(), 2, listRequests.diff());
|
||||||
|
reset(metadataRequests,
|
||||||
|
listRequests,
|
||||||
|
listContinueRequests,
|
||||||
listStatusCalls,
|
listStatusCalls,
|
||||||
getFileStatusCalls);
|
getFileStatusCalls);
|
||||||
|
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
|
describe("deletion");
|
||||||
// deletion at the end of the run
|
// deletion at the end of the run
|
||||||
NanoTimer deleteTimer = new NanoTimer();
|
NanoTimer deleteTimer = new NanoTimer();
|
||||||
fs.delete(listDir, true);
|
fs.delete(listDir, true);
|
||||||
deleteTimer.end("Deleting directory tree");
|
deleteTimer.end("Deleting directory tree");
|
||||||
printThenReset(LOG,
|
printThenReset(LOG,
|
||||||
metadataRequests, listRequests,
|
metadataRequests,
|
||||||
listStatusCalls, getFileStatusCalls);
|
listRequests,
|
||||||
|
listContinueRequests,
|
||||||
|
listStatusCalls,
|
||||||
|
getFileStatusCalls);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue