HADOOP-13421. Switch to v2 of the S3 List Objects API in S3A.
Contributed by Aaron Fabbri
This commit is contained in:
parent
ab8368d2e0
commit
5bbca80428
|
@ -1428,6 +1428,15 @@
|
|||
<description>The implementation class of the S3A AbstractFileSystem.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.list.version</name>
|
||||
<value>2</value>
|
||||
<description>
|
||||
Select which version of the S3 SDK's List Objects API to use. Currently
|
||||
support 2 (default) and 1 (older API).
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<!-- Azure file system properties -->
|
||||
<property>
|
||||
<name>fs.wasb.impl</name>
|
||||
|
|
|
@ -451,4 +451,13 @@ public final class Constants {
|
|||
public static final String FAIL_INJECT_INCONSISTENCY_PROBABILITY =
|
||||
"fs.s3a.failinject.inconsistency.probability";
|
||||
|
||||
/**
|
||||
* S3 API level parameters.
|
||||
*/
|
||||
@InterfaceStability.Unstable
|
||||
public static final String LIST_VERSION = "fs.s3a.list.version";
|
||||
|
||||
@InterfaceStability.Unstable
|
||||
public static final int DEFAULT_LIST_VERSION = 2;
|
||||
|
||||
}
|
||||
|
|
|
@ -28,6 +28,8 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest;
|
|||
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.DeleteObjectsResult;
|
||||
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||
|
@ -109,8 +111,10 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
|
|||
}
|
||||
}
|
||||
|
||||
/** Map of key to delay -> time it was deleted + object summary (object
|
||||
* summary is null for prefixes. */
|
||||
/**
|
||||
* Map of key to delay -> time it was deleted + object summary (object summary
|
||||
* is null for prefixes.
|
||||
*/
|
||||
private Map<String, Delete> delayedDeletes = new HashMap<>();
|
||||
|
||||
/** Map of key to delay -> time it was created. */
|
||||
|
@ -196,17 +200,29 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
|
|||
return super.putObject(putObjectRequest);
|
||||
}
|
||||
|
||||
/* We should only need to override this version of listObjects() */
|
||||
/* We should only need to override these versions of listObjects() */
|
||||
@Override
|
||||
public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
|
||||
throws AmazonClientException, AmazonServiceException {
|
||||
LOG.debug("prefix {}", listObjectsRequest.getPrefix());
|
||||
ObjectListing listing = super.listObjects(listObjectsRequest);
|
||||
listing = filterListObjects(listObjectsRequest, listing);
|
||||
listing = filterListObjects(listing);
|
||||
listing = restoreListObjects(listObjectsRequest, listing);
|
||||
return listing;
|
||||
}
|
||||
|
||||
/* We should only need to override these versions of listObjects() */
|
||||
@Override
|
||||
public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request)
|
||||
throws AmazonClientException, AmazonServiceException {
|
||||
LOG.debug("prefix {}", request.getPrefix());
|
||||
ListObjectsV2Result listing = super.listObjectsV2(request);
|
||||
listing = filterListObjectsV2(listing);
|
||||
listing = restoreListObjectsV2(request, listing);
|
||||
return listing;
|
||||
}
|
||||
|
||||
|
||||
private void addSummaryIfNotPresent(List<S3ObjectSummary> list,
|
||||
S3ObjectSummary item) {
|
||||
// Behavior of S3ObjectSummary
|
||||
|
@ -282,21 +298,58 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
|
|||
// recursive list has no delimiter, returns everything that matches a
|
||||
// prefix.
|
||||
boolean recursiveObjectList = !("/".equals(request.getDelimiter()));
|
||||
String prefix = request.getPrefix();
|
||||
|
||||
restoreDeleted(outputList, outputPrefixes, recursiveObjectList, prefix);
|
||||
return new CustomObjectListing(rawListing, outputList, outputPrefixes);
|
||||
}
|
||||
|
||||
/**
|
||||
* V2 list API variant of
|
||||
* {@link #restoreListObjects(ListObjectsRequest, ObjectListing)}.
|
||||
* @param request original v2 list request
|
||||
* @param result raw s3 result
|
||||
*/
|
||||
private ListObjectsV2Result restoreListObjectsV2(ListObjectsV2Request request,
|
||||
ListObjectsV2Result result) {
|
||||
List<S3ObjectSummary> outputList = result.getObjectSummaries();
|
||||
List<String> outputPrefixes = result.getCommonPrefixes();
|
||||
// recursive list has no delimiter, returns everything that matches a
|
||||
// prefix.
|
||||
boolean recursiveObjectList = !("/".equals(request.getDelimiter()));
|
||||
String prefix = request.getPrefix();
|
||||
|
||||
restoreDeleted(outputList, outputPrefixes, recursiveObjectList, prefix);
|
||||
return new CustomListObjectsV2Result(result, outputList, outputPrefixes);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Main logic for
|
||||
* {@link #restoreListObjects(ListObjectsRequest, ObjectListing)} and
|
||||
* the v2 variant above.
|
||||
* @param summaries object summary list to modify.
|
||||
* @param prefixes prefix list to modify
|
||||
* @param recursive true if recursive list request
|
||||
* @param prefix prefix for original list request
|
||||
*/
|
||||
private void restoreDeleted(List<S3ObjectSummary> summaries,
|
||||
List<String> prefixes, boolean recursive, String prefix) {
|
||||
|
||||
// Go through all deleted keys
|
||||
for (String key : new HashSet<>(delayedDeletes.keySet())) {
|
||||
Delete delete = delayedDeletes.get(key);
|
||||
if (isKeyDelayed(delete.time(), key)) {
|
||||
if (isDescendant(request.getPrefix(), key, recursiveObjectList)) {
|
||||
if (isDescendant(prefix, key, recursive)) {
|
||||
if (delete.summary() != null) {
|
||||
addSummaryIfNotPresent(outputList, delete.summary());
|
||||
addSummaryIfNotPresent(summaries, delete.summary());
|
||||
}
|
||||
}
|
||||
// Non-recursive list has delimiter: will return rolled-up prefixes for
|
||||
// all keys that are not direct children
|
||||
if (!recursiveObjectList) {
|
||||
if (isDescendant(request.getPrefix(), key, true)) {
|
||||
addPrefixIfNotPresent(outputPrefixes, request.getPrefix(), key);
|
||||
if (!recursive) {
|
||||
if (isDescendant(prefix, key, true)) {
|
||||
addPrefixIfNotPresent(prefixes, prefix, key);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -304,31 +357,52 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
|
|||
delayedDeletes.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ObjectListing filterListObjects(ObjectListing rawListing) {
|
||||
|
||||
// Filter object listing
|
||||
List<S3ObjectSummary> outputList = filterSummaries(
|
||||
rawListing.getObjectSummaries());
|
||||
|
||||
// Filter prefixes (directories)
|
||||
List<String> outputPrefixes = filterPrefixes(
|
||||
rawListing.getCommonPrefixes());
|
||||
|
||||
return new CustomObjectListing(rawListing, outputList, outputPrefixes);
|
||||
}
|
||||
|
||||
private ObjectListing filterListObjects(ListObjectsRequest request,
|
||||
ObjectListing rawListing) {
|
||||
|
||||
private ListObjectsV2Result filterListObjectsV2(ListObjectsV2Result raw) {
|
||||
// Filter object listing
|
||||
List<S3ObjectSummary> outputList = filterSummaries(
|
||||
raw.getObjectSummaries());
|
||||
|
||||
// Filter prefixes (directories)
|
||||
List<String> outputPrefixes = filterPrefixes(raw.getCommonPrefixes());
|
||||
|
||||
return new CustomListObjectsV2Result(raw, outputList, outputPrefixes);
|
||||
}
|
||||
|
||||
private List<S3ObjectSummary> filterSummaries(
|
||||
List<S3ObjectSummary> summaries) {
|
||||
List<S3ObjectSummary> outputList = new ArrayList<>();
|
||||
for (S3ObjectSummary s : rawListing.getObjectSummaries()) {
|
||||
for (S3ObjectSummary s : summaries) {
|
||||
String key = s.getKey();
|
||||
if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
|
||||
outputList.add(s);
|
||||
}
|
||||
}
|
||||
return outputList;
|
||||
}
|
||||
|
||||
// Filter prefixes (directories)
|
||||
private List<String> filterPrefixes(List<String> prefixes) {
|
||||
List<String> outputPrefixes = new ArrayList<>();
|
||||
for (String key : rawListing.getCommonPrefixes()) {
|
||||
for (String key : prefixes) {
|
||||
if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
|
||||
outputPrefixes.add(key);
|
||||
}
|
||||
}
|
||||
|
||||
return new CustomObjectListing(rawListing, outputList, outputPrefixes);
|
||||
return outputPrefixes;
|
||||
}
|
||||
|
||||
private boolean isKeyDelayed(Long enqueueTime, String key) {
|
||||
|
@ -431,4 +505,37 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
|
|||
return customPrefixes;
|
||||
}
|
||||
}
|
||||
|
||||
private static class CustomListObjectsV2Result extends ListObjectsV2Result {
|
||||
|
||||
private final List<S3ObjectSummary> customListing;
|
||||
private final List<String> customPrefixes;
|
||||
|
||||
CustomListObjectsV2Result(ListObjectsV2Result raw,
|
||||
List<S3ObjectSummary> customListing, List<String> customPrefixes) {
|
||||
super();
|
||||
this.customListing = customListing;
|
||||
this.customPrefixes = customPrefixes;
|
||||
|
||||
this.setBucketName(raw.getBucketName());
|
||||
this.setCommonPrefixes(raw.getCommonPrefixes());
|
||||
this.setDelimiter(raw.getDelimiter());
|
||||
this.setEncodingType(raw.getEncodingType());
|
||||
this.setStartAfter(raw.getStartAfter());
|
||||
this.setMaxKeys(raw.getMaxKeys());
|
||||
this.setContinuationToken(raw.getContinuationToken());
|
||||
this.setPrefix(raw.getPrefix());
|
||||
this.setTruncated(raw.isTruncated());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<S3ObjectSummary> getObjectSummaries() {
|
||||
return customListing;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getCommonPrefixes() {
|
||||
return customPrefixes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,8 +19,6 @@
|
|||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.AmazonClientException;
|
||||
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -90,7 +88,7 @@ public class Listing {
|
|||
*/
|
||||
FileStatusListingIterator createFileStatusListingIterator(
|
||||
Path listPath,
|
||||
ListObjectsRequest request,
|
||||
S3ListRequest request,
|
||||
PathFilter filter,
|
||||
Listing.FileStatusAcceptor acceptor) throws IOException {
|
||||
return createFileStatusListingIterator(listPath, request, filter, acceptor,
|
||||
|
@ -112,7 +110,7 @@ public class Listing {
|
|||
*/
|
||||
FileStatusListingIterator createFileStatusListingIterator(
|
||||
Path listPath,
|
||||
ListObjectsRequest request,
|
||||
S3ListRequest request,
|
||||
PathFilter filter,
|
||||
Listing.FileStatusAcceptor acceptor,
|
||||
RemoteIterator<FileStatus> providedStatus) throws IOException {
|
||||
|
@ -432,7 +430,7 @@ public class Listing {
|
|||
* @param objects the next object listing
|
||||
* @return true if this added any entries after filtering
|
||||
*/
|
||||
private boolean buildNextStatusBatch(ObjectListing objects) {
|
||||
private boolean buildNextStatusBatch(S3ListResult objects) {
|
||||
// counters for debug logs
|
||||
int added = 0, ignored = 0;
|
||||
// list to fill in with results. Initial size will be list maximum.
|
||||
|
@ -512,13 +510,16 @@ public class Listing {
|
|||
*
|
||||
* Thread safety: none.
|
||||
*/
|
||||
class ObjectListingIterator implements RemoteIterator<ObjectListing> {
|
||||
class ObjectListingIterator implements RemoteIterator<S3ListResult> {
|
||||
|
||||
/** The path listed. */
|
||||
private final Path listPath;
|
||||
|
||||
/** The most recent listing results. */
|
||||
private ObjectListing objects;
|
||||
private S3ListResult objects;
|
||||
|
||||
/** The most recent listing request. */
|
||||
private S3ListRequest request;
|
||||
|
||||
/** Indicator that this is the first listing. */
|
||||
private boolean firstListing = true;
|
||||
|
@ -542,10 +543,11 @@ public class Listing {
|
|||
* */
|
||||
ObjectListingIterator(
|
||||
Path listPath,
|
||||
ListObjectsRequest request) {
|
||||
S3ListRequest request) {
|
||||
this.listPath = listPath;
|
||||
this.maxKeys = owner.getMaxKeys();
|
||||
this.objects = owner.listObjects(request);
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -569,7 +571,7 @@ public class Listing {
|
|||
* @throws NoSuchElementException if there is no more data to list.
|
||||
*/
|
||||
@Override
|
||||
public ObjectListing next() throws IOException {
|
||||
public S3ListResult next() throws IOException {
|
||||
if (firstListing) {
|
||||
// on the first listing, don't request more data.
|
||||
// Instead just clear the firstListing flag so that it future calls
|
||||
|
@ -585,7 +587,7 @@ public class Listing {
|
|||
// need to request a new set of objects.
|
||||
LOG.debug("[{}], Requesting next {} objects under {}",
|
||||
listingCount, maxKeys, listPath);
|
||||
objects = owner.continueListObjects(objects);
|
||||
objects = owner.continueListObjects(request, objects);
|
||||
listingCount++;
|
||||
LOG.debug("New listing status: {}", this);
|
||||
} catch (AmazonClientException e) {
|
||||
|
|
|
@ -53,8 +53,8 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest;
|
|||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PartETag;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
|
@ -167,6 +167,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
private String blockOutputBuffer;
|
||||
private S3ADataBlocks.BlockFactory blockFactory;
|
||||
private int blockOutputActiveBlocks;
|
||||
private boolean useListV1;
|
||||
|
||||
/** Add any deprecated keys. */
|
||||
@SuppressWarnings("deprecation")
|
||||
|
@ -261,6 +262,13 @@ public class S3AFileSystem extends FileSystem {
|
|||
BlockingThreadPoolExecutorService.newDaemonThreadFactory(
|
||||
"s3a-transfer-unbounded"));
|
||||
|
||||
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
|
||||
if (listVersion < 1 || listVersion > 2) {
|
||||
LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " +
|
||||
"version 2", listVersion);
|
||||
}
|
||||
useListV1 = (listVersion == 1);
|
||||
|
||||
initTransferManager();
|
||||
|
||||
initCannedAcls(conf);
|
||||
|
@ -1056,21 +1064,37 @@ public class S3AFileSystem extends FileSystem {
|
|||
* @param request request to initiate
|
||||
* @return the results
|
||||
*/
|
||||
protected ObjectListing listObjects(ListObjectsRequest request) {
|
||||
protected S3ListResult listObjects(S3ListRequest request) {
|
||||
incrementStatistic(OBJECT_LIST_REQUESTS);
|
||||
incrementReadOperations();
|
||||
return s3.listObjects(request);
|
||||
if (useListV1) {
|
||||
Preconditions.checkArgument(request.isV1());
|
||||
return S3ListResult.v1(s3.listObjects(request.getV1()));
|
||||
} else {
|
||||
Preconditions.checkArgument(!request.isV1());
|
||||
return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* List the next set of objects.
|
||||
* @param objects paged result
|
||||
* @param request last list objects request to continue
|
||||
* @param prevResult last paged result to continue from
|
||||
* @return the next result object
|
||||
*/
|
||||
protected ObjectListing continueListObjects(ObjectListing objects) {
|
||||
protected S3ListResult continueListObjects(S3ListRequest request,
|
||||
S3ListResult prevResult) {
|
||||
incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS);
|
||||
incrementReadOperations();
|
||||
return s3.listNextBatchOfObjects(objects);
|
||||
if (useListV1) {
|
||||
Preconditions.checkArgument(request.isV1());
|
||||
return S3ListResult.v1(s3.listNextBatchOfObjects(prevResult.getV1()));
|
||||
} else {
|
||||
Preconditions.checkArgument(!request.isV1());
|
||||
request.getV2().setContinuationToken(prevResult.getV2()
|
||||
.getNextContinuationToken());
|
||||
return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1464,9 +1488,9 @@ public class S3AFileSystem extends FileSystem {
|
|||
} else {
|
||||
LOG.debug("Getting objects for directory prefix {} to delete", key);
|
||||
|
||||
ListObjectsRequest request = createListObjectsRequest(key, null);
|
||||
S3ListRequest request = createListObjectsRequest(key, null);
|
||||
|
||||
ObjectListing objects = listObjects(request);
|
||||
S3ListResult objects = listObjects(request);
|
||||
List<DeleteObjectsRequest.KeyVersion> keys =
|
||||
new ArrayList<>(objects.getObjectSummaries().size());
|
||||
while (true) {
|
||||
|
@ -1481,7 +1505,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
if (objects.isTruncated()) {
|
||||
objects = continueListObjects(objects);
|
||||
objects = continueListObjects(request, objects);
|
||||
} else {
|
||||
if (!keys.isEmpty()) {
|
||||
// TODO: HADOOP-13761 S3Guard: retries
|
||||
|
@ -1589,7 +1613,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
return S3Guard.dirMetaToStatuses(dirMeta);
|
||||
}
|
||||
|
||||
ListObjectsRequest request = createListObjectsRequest(key, "/");
|
||||
S3ListRequest request = createListObjectsRequest(key, "/");
|
||||
LOG.debug("listStatus: doing listObjects for directory {}", key);
|
||||
|
||||
Listing.FileStatusListingIterator files =
|
||||
|
@ -1619,8 +1643,26 @@ public class S3AFileSystem extends FileSystem {
|
|||
* @return the request
|
||||
*/
|
||||
@VisibleForTesting
|
||||
ListObjectsRequest createListObjectsRequest(String key,
|
||||
S3ListRequest createListObjectsRequest(String key,
|
||||
String delimiter) {
|
||||
return createListObjectsRequest(key, delimiter, null);
|
||||
}
|
||||
|
||||
private S3ListRequest createListObjectsRequest(String key,
|
||||
String delimiter, Integer overrideMaxKeys) {
|
||||
if (!useListV1) {
|
||||
ListObjectsV2Request request =
|
||||
new ListObjectsV2Request().withBucketName(bucket)
|
||||
.withMaxKeys(maxKeys)
|
||||
.withPrefix(key);
|
||||
if (delimiter != null) {
|
||||
request.setDelimiter(delimiter);
|
||||
}
|
||||
if (overrideMaxKeys != null) {
|
||||
request.setMaxKeys(overrideMaxKeys);
|
||||
}
|
||||
return S3ListRequest.v2(request);
|
||||
} else {
|
||||
ListObjectsRequest request = new ListObjectsRequest();
|
||||
request.setBucketName(bucket);
|
||||
request.setMaxKeys(maxKeys);
|
||||
|
@ -1628,7 +1670,11 @@ public class S3AFileSystem extends FileSystem {
|
|||
if (delimiter != null) {
|
||||
request.setDelimiter(delimiter);
|
||||
}
|
||||
return request;
|
||||
if (overrideMaxKeys != null) {
|
||||
request.setMaxKeys(overrideMaxKeys);
|
||||
}
|
||||
return S3ListRequest.v1(request);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1885,13 +1931,9 @@ public class S3AFileSystem extends FileSystem {
|
|||
|
||||
try {
|
||||
key = maybeAddTrailingSlash(key);
|
||||
ListObjectsRequest request = new ListObjectsRequest();
|
||||
request.setBucketName(bucket);
|
||||
request.setPrefix(key);
|
||||
request.setDelimiter("/");
|
||||
request.setMaxKeys(1);
|
||||
S3ListRequest request = createListObjectsRequest(key, "/", 1);
|
||||
|
||||
ObjectListing objects = listObjects(request);
|
||||
S3ListResult objects = listObjects(request);
|
||||
|
||||
Collection<String> prefixes = objects.getCommonPrefixes();
|
||||
Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
|
||||
|
@ -2441,6 +2483,7 @@ public class S3AFileSystem extends FileSystem {
|
|||
}
|
||||
sb.append(", metastore=").append(metadataStore);
|
||||
sb.append(", authoritative=").append(allowAuthoritative);
|
||||
sb.append(", useListV1=").append(useListV1);
|
||||
sb.append(", boundedExecutor=").append(boundedThreadPool);
|
||||
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
|
||||
sb.append(", statistics {")
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
|
||||
/**
|
||||
* API version-independent container for S3 List requests.
|
||||
*/
|
||||
public class S3ListRequest {
|
||||
private ListObjectsRequest v1Request;
|
||||
private ListObjectsV2Request v2Request;
|
||||
|
||||
protected S3ListRequest(ListObjectsRequest v1, ListObjectsV2Request v2) {
|
||||
v1Request = v1;
|
||||
v2Request = v2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Restricted constructors to ensure v1 or v2, not both.
|
||||
* @param request v1 request
|
||||
* @return new list request container
|
||||
*/
|
||||
public static S3ListRequest v1(ListObjectsRequest request) {
|
||||
return new S3ListRequest(request, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Restricted constructors to ensure v1 or v2, not both.
|
||||
* @param request v2 request
|
||||
* @return new list request container
|
||||
*/
|
||||
public static S3ListRequest v2(ListObjectsV2Request request) {
|
||||
return new S3ListRequest(null, request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this a v1 API request or v2?
|
||||
* @return true if v1, false if v2
|
||||
*/
|
||||
public boolean isV1() {
|
||||
return v1Request != null;
|
||||
}
|
||||
|
||||
public ListObjectsRequest getV1() {
|
||||
return v1Request;
|
||||
}
|
||||
|
||||
public ListObjectsV2Request getV2() {
|
||||
return v2Request;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* API version-independent container for S3 List responses.
|
||||
*/
|
||||
public class S3ListResult {
|
||||
private ObjectListing v1Result;
|
||||
private ListObjectsV2Result v2Result;
|
||||
|
||||
protected S3ListResult(ObjectListing v1, ListObjectsV2Result v2) {
|
||||
v1Result = v1;
|
||||
v2Result = v2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Restricted constructors to ensure v1 or v2, not both.
|
||||
* @param result v1 result
|
||||
* @return new list result container
|
||||
*/
|
||||
public static S3ListResult v1(ObjectListing result) {
|
||||
return new S3ListResult(result, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Restricted constructors to ensure v1 or v2, not both.
|
||||
* @param result v2 result
|
||||
* @return new list result container
|
||||
*/
|
||||
public static S3ListResult v2(ListObjectsV2Result result) {
|
||||
return new S3ListResult(null, result);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this a v1 API result or v2?
|
||||
* @return true if v1, false if v2
|
||||
*/
|
||||
public boolean isV1() {
|
||||
return v1Result != null;
|
||||
}
|
||||
|
||||
public ObjectListing getV1() {
|
||||
return v1Result;
|
||||
}
|
||||
|
||||
public ListObjectsV2Result getV2() {
|
||||
return v2Result;
|
||||
}
|
||||
|
||||
public List<S3ObjectSummary> getObjectSummaries() {
|
||||
if (isV1()) {
|
||||
return v1Result.getObjectSummaries();
|
||||
} else {
|
||||
return v2Result.getObjectSummaries();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isTruncated() {
|
||||
if (isV1()) {
|
||||
return v1Result.isTruncated();
|
||||
} else {
|
||||
return v2Result.isTruncated();
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> getCommonPrefixes() {
|
||||
if (isV1()) {
|
||||
return v1Result.getCommonPrefixes();
|
||||
} else {
|
||||
return v2Result.getCommonPrefixes();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -895,6 +895,15 @@ from placing its declaration on the command line.
|
|||
any call to setReadahead() is made to an open stream.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>fs.s3a.list.version</name>
|
||||
<value>2</value>
|
||||
<description>
|
||||
Select which version of the S3 SDK's List Objects API to use. Currently
|
||||
support 2 (default) and 1 (older API).
|
||||
</description>
|
||||
</property>
|
||||
|
||||
### Configuring different S3 buckets
|
||||
|
||||
Different S3 buckets can be accessed with different S3A client configurations.
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
|
||||
import org.apache.hadoop.fs.contract.AbstractFSContract;
|
||||
import org.apache.hadoop.fs.contract.s3a.S3AContract;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.LIST_VERSION;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
|
||||
|
||||
/**
|
||||
* S3A contract tests for getFileStatus, using the v1 List Objects API.
|
||||
*/
|
||||
public class ITestS3AContractGetFileStatusV1List
|
||||
extends AbstractContractGetFileStatusTest {
|
||||
|
||||
|
||||
@Override
|
||||
protected AbstractFSContract createContract(Configuration conf) {
|
||||
return new S3AContract(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void teardown() throws Exception {
|
||||
getLog().info("FS details {}", getFileSystem());
|
||||
super.teardown();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
Configuration conf = super.createConfiguration();
|
||||
disableFilesystemCaching(conf);
|
||||
conf.setInt(Constants.MAX_PAGING_KEYS, 2);
|
||||
maybeEnableS3Guard(conf);
|
||||
|
||||
// Use v1 List Objects API
|
||||
conf.setInt(LIST_VERSION, 1);
|
||||
return conf;
|
||||
}
|
||||
}
|
|
@ -18,7 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.fs.s3a;
|
||||
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
|
@ -488,6 +488,10 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
|
|||
|
||||
@Test
|
||||
public void testInconsistentS3ClientDeletes() throws Throwable {
|
||||
// Test only implemented for v2 S3 list API
|
||||
Assume.assumeTrue(getConfiguration()
|
||||
.getInt(LIST_VERSION, DEFAULT_LIST_VERSION) == 2);
|
||||
|
||||
S3AFileSystem fs = getFileSystem();
|
||||
Path root = path("testInconsistentClient" + DEFAULT_DELAY_KEY_SUBSTRING);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
|
@ -502,17 +506,17 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
|
|||
AmazonS3 client = fs.getAmazonS3Client();
|
||||
String key = fs.pathToKey(root) + "/";
|
||||
|
||||
ObjectListing preDeleteDelimited = client.listObjects(
|
||||
fs.createListObjectsRequest(key, "/"));
|
||||
ObjectListing preDeleteUndelimited = client.listObjects(
|
||||
fs.createListObjectsRequest(key, null));
|
||||
ListObjectsV2Result preDeleteDelimited = client.listObjectsV2(
|
||||
fs.createListObjectsRequest(key, "/").getV2());
|
||||
ListObjectsV2Result preDeleteUndelimited = client.listObjectsV2(
|
||||
fs.createListObjectsRequest(key, null).getV2());
|
||||
|
||||
fs.delete(root, true);
|
||||
|
||||
ObjectListing postDeleteDelimited = client.listObjects(
|
||||
fs.createListObjectsRequest(key, "/"));
|
||||
ObjectListing postDeleteUndelimited = client.listObjects(
|
||||
fs.createListObjectsRequest(key, null));
|
||||
ListObjectsV2Result postDeleteDelimited = client.listObjectsV2(
|
||||
fs.createListObjectsRequest(key, "/").getV2());
|
||||
ListObjectsV2Result postDeleteUndelimited = client.listObjectsV2(
|
||||
fs.createListObjectsRequest(key, null).getV2());
|
||||
|
||||
assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
|
||||
"in a non-recursive listing",
|
||||
|
|
|
@ -25,9 +25,12 @@ import static org.mockito.Mockito.*;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.model.ObjectListing;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
|
@ -93,12 +96,7 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
|
|||
when(s3.getObjectMetadata(argThat(
|
||||
correctGetMetadataRequest(BUCKET, key + "/"))
|
||||
)).thenThrow(NOT_FOUND);
|
||||
ObjectListing objects = mock(ObjectListing.class);
|
||||
when(objects.getCommonPrefixes()).thenReturn(
|
||||
Collections.singletonList("dir/"));
|
||||
when(objects.getObjectSummaries()).thenReturn(
|
||||
Collections.<S3ObjectSummary>emptyList());
|
||||
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
|
||||
setupListMocks(Collections.singletonList("dir/"), Collections.emptyList());
|
||||
FileStatus stat = fs.getFileStatus(path);
|
||||
assertNotNull(stat);
|
||||
assertEquals(fs.makeQualified(path), stat.getPath());
|
||||
|
@ -118,12 +116,7 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
|
|||
when(s3.getObjectMetadata(argThat(
|
||||
correctGetMetadataRequest(BUCKET, key + "/")
|
||||
))).thenThrow(NOT_FOUND);
|
||||
ObjectListing objects = mock(ObjectListing.class);
|
||||
when(objects.getCommonPrefixes()).thenReturn(
|
||||
Collections.<String>emptyList());
|
||||
when(objects.getObjectSummaries()).thenReturn(
|
||||
Collections.<S3ObjectSummary>emptyList());
|
||||
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
|
||||
setupListMocks(Collections.emptyList(), Collections.emptyList());
|
||||
FileStatus stat = fs.getFileStatus(path);
|
||||
assertNotNull(stat);
|
||||
assertEquals(fs.makeQualified(path), stat.getPath());
|
||||
|
@ -140,16 +133,28 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
|
|||
when(s3.getObjectMetadata(argThat(
|
||||
correctGetMetadataRequest(BUCKET, key + "/")
|
||||
))).thenThrow(NOT_FOUND);
|
||||
ObjectListing objects = mock(ObjectListing.class);
|
||||
when(objects.getCommonPrefixes()).thenReturn(
|
||||
Collections.<String>emptyList());
|
||||
when(objects.getObjectSummaries()).thenReturn(
|
||||
Collections.<S3ObjectSummary>emptyList());
|
||||
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
|
||||
setupListMocks(Collections.emptyList(), Collections.emptyList());
|
||||
exception.expect(FileNotFoundException.class);
|
||||
fs.getFileStatus(path);
|
||||
}
|
||||
|
||||
private void setupListMocks(List<String> prefixes,
|
||||
List<S3ObjectSummary> summaries) {
|
||||
|
||||
// V1 list API mock
|
||||
ObjectListing objects = mock(ObjectListing.class);
|
||||
when(objects.getCommonPrefixes()).thenReturn(prefixes);
|
||||
when(objects.getObjectSummaries()).thenReturn(summaries);
|
||||
when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
|
||||
|
||||
// V2 list API mock
|
||||
ListObjectsV2Result v2Result = mock(ListObjectsV2Result.class);
|
||||
when(v2Result.getCommonPrefixes()).thenReturn(prefixes);
|
||||
when(v2Result.getObjectSummaries()).thenReturn(summaries);
|
||||
when(s3.listObjectsV2(any(ListObjectsV2Request.class)))
|
||||
.thenReturn(v2Result);
|
||||
}
|
||||
|
||||
private Matcher<GetObjectMetadataRequest> correctGetMetadataRequest(
|
||||
String bucket, String key) {
|
||||
return new BaseMatcher<GetObjectMetadataRequest>() {
|
||||
|
|
Loading…
Reference in New Issue