HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148)

Contributed by Mukund Thakur.
This commit is contained in:
Mukund Thakur 2020-08-04 20:30:02 +05:30 committed by GitHub
parent e072d33327
commit 8fd4f5490f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 521 additions and 106 deletions

View File

@ -30,11 +30,22 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -46,25 +57,31 @@ import java.util.NoSuchElementException;
import java.util.Set;
import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
import static org.apache.hadoop.fs.s3a.S3AUtils.maybeAddTrailingSlash;
import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory;
import static org.apache.hadoop.fs.s3a.S3AUtils.stringify;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.pathToKey;
/**
* Place for the S3A listing classes; keeps all the small classes under control.
*/
@InterfaceAudience.Private
public class Listing {
public class Listing extends AbstractStoreOperation {
private final S3AFileSystem owner;
private static final Logger LOG = S3AFileSystem.LOG;
static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
new AcceptAllButS3nDirs();
public Listing(S3AFileSystem owner) {
this.owner = owner;
private final ListingOperationCallbacks listingOperationCallbacks;
public Listing(ListingOperationCallbacks listingOperationCallbacks,
StoreContext storeContext) {
super(storeContext);
this.listingOperationCallbacks = listingOperationCallbacks;
}
/**
@ -156,6 +173,145 @@ public class Listing {
return new TombstoneReconcilingIterator(iterator, tombstones);
}
/**
* List files under a path assuming the path to be a directory.
* @param path input path.
* @param recursive recursive listing?
* @param acceptor file status filter
* @param collectTombstones should tombstones be collected from S3Guard?
* @param forceNonAuthoritativeMS forces metadata store to act like non
* authoritative. This is useful when
* listFiles output is used by import tool.
* @return an iterator over listing.
* @throws IOException any exception.
*/
public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
Path path,
boolean recursive, Listing.FileStatusAcceptor acceptor,
boolean collectTombstones,
boolean forceNonAuthoritativeMS) throws IOException {
String key = maybeAddTrailingSlash(pathToKey(path));
String delimiter = recursive ? null : "/";
LOG.debug("Requesting all entries under {} with delimiter '{}'",
key, delimiter);
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
final Set<Path> tombstones;
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(path);
if (recursive) {
final PathMetadata pm = getStoreContext()
.getMetadataStore()
.get(path, true);
if (pm != null) {
if (pm.isDeleted()) {
OffsetDateTime deletedAt = OffsetDateTime
.ofInstant(Instant.ofEpochMilli(
pm.getFileStatus().getModificationTime()),
ZoneOffset.UTC);
throw new FileNotFoundException("Path " + path + " is recorded as " +
"deleted by S3Guard at " + deletedAt);
}
}
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
new MetadataStoreListFilesIterator(
getStoreContext().getMetadataStore(),
pm,
allowAuthoritative);
tombstones = metadataStoreListFilesIterator.listTombstones();
// if all of the below is true
// - authoritative access is allowed for this metadatastore
// for this directory,
// - all the directory listings are authoritative on the client
// - the caller does not force non-authoritative access
// return the listing without any further s3 access
if (!forceNonAuthoritativeMS &&
allowAuthoritative &&
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
metadataStoreListFilesIterator, tombstones);
cachedFilesIterator = createProvidedFileStatusIterator(
statuses, ACCEPT_ALL, acceptor);
return createLocatedFileStatusIterator(cachedFilesIterator);
}
cachedFilesIterator = metadataStoreListFilesIterator;
} else {
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(
getStoreContext().getMetadataStore(),
path,
listingOperationCallbacks.getUpdatedTtlTimeProvider(),
allowAuthoritative);
if (meta != null) {
tombstones = meta.listTombstones();
} else {
tombstones = null;
}
cachedFilesIterator = createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
// metadata listing is authoritative, so return it directly
return createLocatedFileStatusIterator(cachedFilesIterator);
}
}
return createTombstoneReconcilingIterator(
createLocatedFileStatusIterator(
createFileStatusListingIterator(path,
listingOperationCallbacks
.createListObjectsRequest(key, delimiter),
ACCEPT_ALL,
acceptor,
cachedFilesIterator)),
collectTombstones ? tombstones : null);
}
/**
* Generate list located status for a directory.
* Also performing tombstone reconciliation for guarded directories.
* @param dir directory to check.
* @param filter a path filter.
* @return an iterator that traverses statuses of the given dir.
* @throws IOException in case of failure.
*/
public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
Path dir, PathFilter filter) throws IOException {
final String key = maybeAddTrailingSlash(pathToKey(dir));
final Listing.FileStatusAcceptor acceptor =
new Listing.AcceptAllButSelfAndS3nDirs(dir);
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(dir);
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(getStoreContext().getMetadataStore(),
dir,
listingOperationCallbacks
.getUpdatedTtlTimeProvider(),
allowAuthoritative);
Set<Path> tombstones = meta != null
? meta.listTombstones()
: null;
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
return (allowAuthoritative && meta != null
&& meta.isAuthoritative())
? createLocatedFileStatusIterator(
cachedFileStatusIterator)
: createTombstoneReconcilingIterator(
createLocatedFileStatusIterator(
createFileStatusListingIterator(dir,
listingOperationCallbacks
.createListObjectsRequest(key, "/"),
filter,
acceptor,
cachedFileStatusIterator)),
tombstones);
}
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
}
/**
* Interface to implement by the logic deciding whether to accept a summary
* entry or path as a valid file or directory.
@ -193,9 +349,9 @@ public class Listing {
* value.
*
* If the status value is null, the iterator declares that it has no data.
* This iterator is used to handle {@link S3AFileSystem#listStatus} calls
* where the path handed in refers to a file, not a directory: this is the
* iterator returned.
* This iterator is used to handle {@link S3AFileSystem#listStatus(Path)}
* calls where the path handed in refers to a file, not a directory:
* this is the iterator returned.
*/
static final class SingleStatusRemoteIterator
implements RemoteIterator<S3ALocatedFileStatus> {
@ -465,14 +621,15 @@ public class Listing {
// objects
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
String key = summary.getKey();
Path keyPath = owner.keyToQualifiedPath(key);
Path keyPath = getStoreContext().getContextAccessors().keyToPath(key);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: {}", keyPath, stringify(summary));
}
// Skip over keys that are ourselves and old S3N _$folder$ files
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
S3AFileStatus status = createFileStatus(keyPath, summary,
owner.getDefaultBlockSize(keyPath), owner.getUsername(),
listingOperationCallbacks.getDefaultBlockSize(keyPath),
getStoreContext().getUsername(),
summary.getETag(), null);
LOG.debug("Adding: {}", status);
stats.add(status);
@ -485,10 +642,12 @@ public class Listing {
// prefixes: always directories
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = owner.keyToQualifiedPath(prefix);
Path keyPath = getStoreContext()
.getContextAccessors()
.keyToPath(prefix);
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
owner.getUsername());
getStoreContext().getUsername());
LOG.debug("Adding directory: {}", status);
added++;
stats.add(status);
@ -573,8 +732,8 @@ public class Listing {
Path listPath,
S3ListRequest request) throws IOException {
this.listPath = listPath;
this.maxKeys = owner.getMaxKeys();
this.objects = owner.listObjects(request);
this.maxKeys = listingOperationCallbacks.getMaxKeys();
this.objects = listingOperationCallbacks.listObjects(request);
this.request = request;
}
@ -616,7 +775,8 @@ public class Listing {
// need to request a new set of objects.
LOG.debug("[{}], Requesting next {} objects under {}",
listingCount, maxKeys, listPath);
objects = owner.continueListObjects(request, objects);
objects = listingOperationCallbacks
.continueListObjects(request, objects);
listingCount++;
LOG.debug("New listing status: {}", this);
} catch (AmazonClientException e) {
@ -716,7 +876,8 @@ public class Listing {
@Override
public S3ALocatedFileStatus next() throws IOException {
return owner.toLocatedFileStatus(statusIterator.next());
return listingOperationCallbacks
.toLocatedFileStatus(statusIterator.next());
}
}

View File

@ -105,6 +105,7 @@ import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
@ -148,7 +149,6 @@ import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.select.SelectBinding;
import org.apache.hadoop.fs.s3a.select.SelectConstants;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
@ -293,6 +293,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private final S3AFileSystem.OperationCallbacksImpl
operationCallbacks = new OperationCallbacksImpl();
private final ListingOperationCallbacks listingOperationCallbacks =
new ListingOperationCallbacksImpl();
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@ -362,7 +365,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
FAIL_ON_METADATA_WRITE_ERROR_DEFAULT);
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
listing = new Listing(this);
partSize = getMultipartSizeProperty(conf,
MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
multiPartThreshold = getMultipartSizeProperty(conf,
@ -455,6 +457,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
listing = new Listing(listingOperationCallbacks, createStoreContext());
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
stopAllServices();
@ -589,6 +592,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return instrumentation;
}
/**
* Get current listing instance.
* @return this instance's listing.
*/
public Listing getListing() {
return listing;
}
/**
* Set up the client bindings.
* If delegation tokens are enabled, the FS first looks for a DT
@ -1599,6 +1610,61 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
}
protected class ListingOperationCallbacksImpl implements
ListingOperationCallbacks {
@Override
@Retries.RetryRaw
public S3ListResult listObjects(
S3ListRequest request)
throws IOException {
return S3AFileSystem.this.listObjects(request);
}
@Override
@Retries.RetryRaw
public S3ListResult continueListObjects(
S3ListRequest request,
S3ListResult prevResult)
throws IOException {
return S3AFileSystem.this.continueListObjects(request, prevResult);
}
@Override
public S3ALocatedFileStatus toLocatedFileStatus(
S3AFileStatus status)
throws IOException {
return S3AFileSystem.this.toLocatedFileStatus(status);
}
@Override
public S3ListRequest createListObjectsRequest(
String key,
String delimiter) {
return S3AFileSystem.this.createListObjectsRequest(key, delimiter);
}
@Override
public long getDefaultBlockSize(Path path) {
return S3AFileSystem.this.getDefaultBlockSize(path);
}
@Override
public int getMaxKeys() {
return S3AFileSystem.this.getMaxKeys();
}
@Override
public ITtlTimeProvider getUpdatedTtlTimeProvider() {
return S3AFileSystem.this.ttlTimeProvider;
}
@Override
public boolean allowAuthoritative(final Path p) {
return S3AFileSystem.this.allowAuthoritative(p);
}
}
/**
* Low-level call to get at the object metadata.
* @param path path to the object
@ -4216,7 +4282,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// Assuming the path to be a directory
// do a bulk operation.
RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir =
getListFilesAssumingDir(path,
listing.getListFilesAssumingDir(path,
recursive,
acceptor,
collectTombstones,
@ -4242,89 +4308,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
}
/**
* List files under a path assuming the path to be a directory.
* @param path input path.
* @param recursive recursive listing?
* @param acceptor file status filter
* @param collectTombstones should tombstones be collected from S3Guard?
* @param forceNonAuthoritativeMS forces metadata store to act like non
* authoritative. This is useful when
* listFiles output is used by import tool.
* @return an iterator over listing.
* @throws IOException any exception.
*/
private RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
Path path,
boolean recursive, Listing.FileStatusAcceptor acceptor,
boolean collectTombstones,
boolean forceNonAuthoritativeMS) throws IOException {
String key = maybeAddTrailingSlash(pathToKey(path));
String delimiter = recursive ? null : "/";
LOG.debug("Requesting all entries under {} with delimiter '{}'",
key, delimiter);
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
final Set<Path> tombstones;
boolean allowAuthoritative = allowAuthoritative(path);
if (recursive) {
final PathMetadata pm = metadataStore.get(path, true);
if (pm != null) {
if (pm.isDeleted()) {
OffsetDateTime deletedAt = OffsetDateTime
.ofInstant(Instant.ofEpochMilli(
pm.getFileStatus().getModificationTime()),
ZoneOffset.UTC);
throw new FileNotFoundException("Path " + path + " is recorded as " +
"deleted by S3Guard at " + deletedAt);
}
}
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
new MetadataStoreListFilesIterator(metadataStore, pm,
allowAuthoritative);
tombstones = metadataStoreListFilesIterator.listTombstones();
// if all of the below is true
// - authoritative access is allowed for this metadatastore
// for this directory,
// - all the directory listings are authoritative on the client
// - the caller does not force non-authoritative access
// return the listing without any further s3 access
if (!forceNonAuthoritativeMS &&
allowAuthoritative &&
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
metadataStoreListFilesIterator, tombstones);
cachedFilesIterator = listing.createProvidedFileStatusIterator(
statuses, ACCEPT_ALL, acceptor);
return listing.createLocatedFileStatusIterator(cachedFilesIterator);
}
cachedFilesIterator = metadataStoreListFilesIterator;
} else {
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
allowAuthoritative);
if (meta != null) {
tombstones = meta.listTombstones();
} else {
tombstones = null;
}
cachedFilesIterator = listing.createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
// metadata listing is authoritative, so return it directly
return listing.createLocatedFileStatusIterator(cachedFilesIterator);
}
}
return listing.createTombstoneReconcilingIterator(
listing.createLocatedFileStatusIterator(
listing.createFileStatusListingIterator(path,
createListObjectsRequest(key, delimiter),
ACCEPT_ALL,
acceptor,
cachedFilesIterator)),
collectTombstones ? tombstones : null);
}
/**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
@ -4363,7 +4346,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// trigger a list call directly.
final RemoteIterator<S3ALocatedFileStatus>
locatedFileStatusIteratorForDir =
getLocatedFileStatusIteratorForDir(path, filter);
listing.getLocatedFileStatusIteratorForDir(path, filter);
// If no listing is present then path might be a file.
if (!locatedFileStatusIteratorForDir.hasNext()) {
@ -4847,5 +4830,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
public Path makeQualified(final Path path) {
return S3AFileSystem.this.makeQualified(path);
}
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.impl;
import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3ListRequest;
import org.apache.hadoop.fs.s3a.S3ListResult;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
/**
* These are all the callbacks which
* {@link org.apache.hadoop.fs.s3a.Listing} operations
* need, derived from the actual appropriate S3AFileSystem
* methods.
*/
public interface ListingOperationCallbacks {
/**
* Initiate a {@code listObjects} operation, incrementing metrics
* in the process.
*
* Retry policy: retry untranslated.
* @param request request to initiate
* @return the results
* @throws IOException if the retry invocation raises one (it shouldn't).
*/
@Retries.RetryRaw
S3ListResult listObjects(
S3ListRequest request)
throws IOException;
/**
* List the next set of objects.
* Retry policy: retry untranslated.
* @param request last list objects request to continue
* @param prevResult last paged result to continue from
* @return the next result object
* @throws IOException none, just there for retryUntranslated.
*/
@Retries.RetryRaw
S3ListResult continueListObjects(
S3ListRequest request,
S3ListResult prevResult)
throws IOException;
/**
* Build a {@link S3ALocatedFileStatus} from a {@link FileStatus} instance.
* @param status file status
* @return a located status with block locations set up from this FS.
* @throws IOException IO Problems.
*/
S3ALocatedFileStatus toLocatedFileStatus(
S3AFileStatus status)
throws IOException;
/**
* Create a {@code ListObjectsRequest} request against this bucket,
* with the maximum keys returned in a query set by
* {@link this.getMaxKeys()}.
* @param key key for request
* @param delimiter any delimiter
* @return the request
*/
S3ListRequest createListObjectsRequest(
String key,
String delimiter);
/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time. The given path will be used to
* locate the actual filesystem. The full path does not have to exist.
* @param path path of file
* @return the default block size for the path's filesystem
*/
long getDefaultBlockSize(Path path);
/**
* Get the maximum key count.
* @return a value, valid after initialization
*/
int getMaxKeys();
/**
* Get the updated time provider for the current fs instance.
* @return implementation of {@link ITtlTimeProvider}
*/
ITtlTimeProvider getUpdatedTtlTimeProvider();
/**
* Is the path for this instance considered authoritative on the client,
* that is: will listing/status operations only be handled by the metastore,
* with no fallback to S3.
* @param p path
* @return true iff the path is authoritative on the client.
*/
boolean allowAuthoritative(Path p);
}

View File

@ -210,6 +210,10 @@ public class StoreContext {
return useListV1;
}
public ContextAccessors getContextAccessors() {
return contextAccessors;
}
/**
* Convert a key to a fully qualified path.
* @param key input key

View File

@ -348,8 +348,8 @@ public class DumpS3GuardDynamoTable extends AbstractS3GuardDynamoDBDiagnostic {
final CsvFile csv) throws IOException {
S3AFileSystem fs = getFilesystem();
Path rootPath = fs.qualify(new Path("/"));
Listing listing = new Listing(fs);
S3ListRequest request = fs.createListObjectsRequest("", null);
Listing listing = fs.getListing();
S3ListRequest request = listing.createListObjectsRequest("", null);
long count = 0;
RemoteIterator<S3AFileStatus> st =
listing.createFileStatusListingIterator(rootPath, request,

View File

@ -68,7 +68,7 @@ public class TestListing extends AbstractS3AMockTest {
Path[] allFiles = {parent, liveChild, deletedChild};
Path[] liveFiles = {parent, liveChild};
Listing listing = new Listing(fs);
Listing listing = fs.getListing();
Collection<FileStatus> statuses = new ArrayList<>();
statuses.add(blankFileStatus(parent));
statuses.add(blankFileStatus(liveChild));

View File

@ -32,8 +32,11 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.transfer.model.CopyResult;
import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.Before;
@ -42,14 +45,21 @@ import org.junit.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.S3ListRequest;
import org.apache.hadoop.fs.s3a.S3ListResult;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
@ -230,6 +240,142 @@ public class TestPartialDeleteFailures {
.build();
}
private static class MinimalListingOperationCallbacks
implements ListingOperationCallbacks {
@Override
public S3ListResult listObjects(S3ListRequest request)
throws IOException {
return null;
}
@Override
public S3ListResult continueListObjects(
S3ListRequest request,
S3ListResult prevResult)
throws IOException {
return null;
}
@Override
public S3ALocatedFileStatus toLocatedFileStatus(
S3AFileStatus status) throws IOException {
return null;
}
@Override
public S3ListRequest createListObjectsRequest(
String key,
String delimiter) {
return null;
}
@Override
public long getDefaultBlockSize(Path path) {
return 0;
}
@Override
public int getMaxKeys() {
return 0;
}
@Override
public ITtlTimeProvider getUpdatedTtlTimeProvider() {
return null;
}
@Override
public boolean allowAuthoritative(Path p) {
return false;
}
}
private static class MinimalOperationCallbacks
implements OperationCallbacks {
@Override
public S3ObjectAttributes createObjectAttributes(
Path path,
String eTag,
String versionId,
long len) {
return null;
}
@Override
public S3ObjectAttributes createObjectAttributes(
S3AFileStatus fileStatus) {
return null;
}
@Override
public S3AReadOpContext createReadContext(
FileStatus fileStatus) {
return null;
}
@Override
public void finishRename(
Path sourceRenamed,
Path destCreated)
throws IOException {
}
@Override
public void deleteObjectAtPath(
Path path,
String key,
boolean isFile,
BulkOperationState operationState)
throws IOException {
}
@Override
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
Path path,
S3AFileStatus status,
boolean collectTombstones,
boolean includeSelf)
throws IOException {
return null;
}
@Override
public CopyResult copyFile(
String srcKey,
String destKey,
S3ObjectAttributes srcAttributes,
S3AReadOpContext readContext)
throws IOException {
return null;
}
@Override
public DeleteObjectsResult removeKeys(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean deleteFakeDir,
List<Path> undeletedObjectsOnFailure,
BulkOperationState operationState,
boolean quiet)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
return null;
}
@Override
public boolean allowAuthoritative(Path p) {
return false;
}
@Override
public RemoteIterator<S3AFileStatus> listObjects(
Path path,
String key)
throws IOException {
return null;
}
}
private static class MinimalContextAccessor implements ContextAccessors {
@Override
@ -333,7 +479,8 @@ public class TestPartialDeleteFailures {
@Override
public void deletePaths(final Collection<Path> paths,
@Nullable final BulkOperationState operationState) throws IOException {
@Nullable final BulkOperationState operationState)
throws IOException {
deleted.addAll(paths);
}