HADOOP-17023. Tune S3AFileSystem.listStatus() (#2257)

S3AFileSystem.listStatus() is optimized for invocations
where the path supplied is a non-empty directory.
The number of S3 requests is significantly reduced, saving
time and money and lowering the risk of S3 throttling.
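
For orientation, a minimal sketch of the client-side call whose cost this change reduces; the bucket and directory names are purely illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListStatusExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative bucket and directory; with this change, listing a
    // non-empty directory needs one LIST request and no HEAD probes.
    Path dir = new Path("s3a://example-bucket/data/");
    try (FileSystem fs = FileSystem.get(dir.toUri(), conf)) {
      for (FileStatus status : fs.listStatus(dir)) {
        System.out.println(status.getPath() + " " + status.getLen());
      }
    }
  }
}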

Contributed by Mukund Thakur.

Change-Id: I7cc5f87aa16a4819e245e0fbd2aad226bd500f3f
Mukund Thakur 2020-09-21 21:50:16 +05:30 committed by Steve Loughran
parent e5e91397de
commit 7e642ec5a3
9 changed files with 611 additions and 453 deletions


@ -24,6 +24,7 @@ import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
@ -102,6 +103,19 @@ public class Listing extends AbstractStoreOperation {
return new ProvidedFileStatusIterator(fileStatuses, filter, acceptor);
}
/**
* Create a FileStatus iterator against a provided array of file statuses.
* @param fileStatuses array of file statuses.
* @return the file status iterator.
*/
@VisibleForTesting
public static ProvidedFileStatusIterator toProvidedFileStatusIterator(
S3AFileStatus[] fileStatuses) {
return new ProvidedFileStatusIterator(fileStatuses,
ACCEPT_ALL,
Listing.ACCEPT_ALL_BUT_S3N);
}
/**
* Create a FileStatus iterator against a path, with a given list object
* request.
@ -250,7 +264,7 @@ public class Listing extends AbstractStoreOperation {
if (!forceNonAuthoritativeMS &&
allowAuthoritative &&
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
S3AFileStatus[] statuses = S3AUtils.iteratorToStatuses(
metadataStoreListFilesIterator, tombstones);
cachedFilesIterator = createProvidedFileStatusIterator(
statuses, ACCEPT_ALL, acceptor);
@ -329,6 +343,56 @@ public class Listing extends AbstractStoreOperation {
tombstones);
}
/**
* Calculate the list of file statuses, assuming the path
* is a non-empty directory.
* @param path input path.
* @return triple of (file status iterator, directory listing metadata,
* authoritative-mode flag).
* @throws IOException any IO problem.
*/
public Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
getFileStatusesAssumingNonEmptyDir(Path path)
throws IOException {
String key = pathToKey(path);
List<S3AFileStatus> result;
if (!key.isEmpty()) {
key = key + '/';
}
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(path);
DirListingMetadata dirMeta =
S3Guard.listChildrenWithTtl(
getStoreContext().getMetadataStore(),
path,
listingOperationCallbacks.getUpdatedTtlTimeProvider(),
allowAuthoritative);
// In auth mode return directly with auth flag.
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
ProvidedFileStatusIterator mfsItr = createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(dirMeta),
ACCEPT_ALL,
Listing.ACCEPT_ALL_BUT_S3N);
return Triple.of(mfsItr,
dirMeta, Boolean.TRUE);
}
S3ListRequest request = createListObjectsRequest(key, "/");
LOG.debug("listStatus: doing listObjects for directory {}", key);
FileStatusListingIterator filesItr = createFileStatusListingIterator(
path,
request,
ACCEPT_ALL,
new Listing.AcceptAllButSelfAndS3nDirs(path));
// Return the results obtained from S3.
return Triple.of(
filesItr,
dirMeta,
Boolean.FALSE);
}
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
}
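
As a reading aid, a hedged sketch (not part of the patch) of how the Triple returned by getFileStatusesAssumingNonEmptyDir is consumed; it mirrors the rewritten innerListStatus in S3AFileSystem below, with illustrative variable names.

Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean> result =
    listing.getFileStatusesAssumingNonEmptyDir(path);
RemoteIterator<S3AFileStatus> statuses = result.getLeft();   // listing iterator
DirListingMetadata dirMeta = result.getMiddle();             // metadata store view
boolean authoritative = result.getRight();                   // auth-mode flag
if (!statuses.hasNext() && authoritative) {
  // The authoritative metadata store confirms an empty directory:
  // no further S3 requests are required.
}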


@ -35,6 +35,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -185,6 +186,7 @@ import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
@ -2652,7 +2654,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
return once("listStatus", f.toString(), () -> innerListStatus(f));
return once("listStatus",
f.toString(),
() -> iteratorToStatuses(innerListStatus(f), new HashSet<>()));
}
/**
@ -2665,51 +2669,52 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
* @throws IOException due to an IO problem.
* @throws AmazonClientException on failures inside the AWS SDK
*/
private S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException,
IOException, AmazonClientException {
private RemoteIterator<S3AFileStatus> innerListStatus(Path f)
throws FileNotFoundException,
IOException, AmazonClientException {
Path path = qualify(f);
String key = pathToKey(path);
LOG.debug("List status for path: {}", path);
entryPoint(INVOCATION_LIST_STATUS);
List<S3AFileStatus> result;
final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
StatusProbeEnum.ALL);
Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean>
statusesAssumingNonEmptyDir = listing
.getFileStatusesAssumingNonEmptyDir(path);
if (fileStatus.isDirectory()) {
if (!key.isEmpty()) {
key = key + '/';
if (!statusesAssumingNonEmptyDir.getLeft().hasNext() &&
statusesAssumingNonEmptyDir.getRight()) {
// We are sure that this is an empty directory in auth mode.
return statusesAssumingNonEmptyDir.getLeft();
} else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) {
// We may have an empty dir, a file, or nothing at all.
// So we call innerGetFileStatus to get the status; this may throw
// FileNotFoundException if nothing exists.
// Otherwise we are guaranteed to have either a dir marker or a file.
final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
StatusProbeEnum.ALL);
// If it is a file return directly.
if (fileStatus.isFile()) {
LOG.debug("Adding: rd (not a dir): {}", path);
S3AFileStatus[] stats = new S3AFileStatus[1];
stats[0] = fileStatus;
return listing.createProvidedFileStatusIterator(
stats,
ACCEPT_ALL,
Listing.ACCEPT_ALL_BUT_S3N);
}
boolean allowAuthoritative = allowAuthoritative(f);
DirListingMetadata dirMeta =
S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
allowAuthoritative);
if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
return S3Guard.dirMetaToStatuses(dirMeta);
}
S3ListRequest request = createListObjectsRequest(key, "/");
LOG.debug("listStatus: doing listObjects for directory {}", key);
Listing.FileStatusListingIterator files =
listing.createFileStatusListingIterator(path,
request,
ACCEPT_ALL,
new Listing.AcceptAllButSelfAndS3nDirs(path));
result = new ArrayList<>(files.getBatchSize());
while (files.hasNext()) {
result.add(files.next());
}
// merge the results. This will update the store as needed
return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
allowAuthoritative, ttlTimeProvider);
} else {
LOG.debug("Adding: rd (not a dir): {}", path);
S3AFileStatus[] stats = new S3AFileStatus[1];
stats[0]= fileStatus;
return stats;
}
// Here we have a directory which may or may not be empty.
// So we update the metastore and return.
return S3Guard.dirListingUnion(
metadataStore,
path,
statusesAssumingNonEmptyDir.getLeft(),
statusesAssumingNonEmptyDir.getMiddle(),
allowAuthoritative(path),
ttlTimeProvider, p ->
listing.createProvidedFileStatusIterator(
dirMetaToStatuses(statusesAssumingNonEmptyDir.getMiddle()),
ACCEPT_ALL,
Listing.ACCEPT_ALL_BUT_S3N));
}
/**
@ -4497,7 +4502,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
: null;
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
listing.createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
dirMetaToStatuses(meta), filter, acceptor);
return (allowAuthoritative && meta != null
&& meta.isAuthoritative())
? listing.createLocatedFileStatusIterator(


@ -42,6 +42,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
@ -1416,6 +1417,30 @@ public final class S3AUtils {
awsConf.setUserAgentPrefix(userAgent);
}
/**
* Convert the data of an iterator of {@link S3AFileStatus} to
* an array. Given tombstones are filtered out. If the iterator
* does not return any item, an empty array is returned.
* @param iterator a non-null iterator
* @param tombstones paths of deleted entries to filter out
* @return a possibly-empty array of file status entries
* @throws IOException failure raised by the iterator
*/
public static S3AFileStatus[] iteratorToStatuses(
RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
throws IOException {
List<FileStatus> statuses = new ArrayList<>();
while (iterator.hasNext()) {
S3AFileStatus status = iterator.next();
if (!tombstones.contains(status.getPath())) {
statuses.add(status);
}
}
return statuses.toArray(new S3AFileStatus[0]);
}
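
A small usage sketch, assuming an existing RemoteIterator named statusIterator and an illustrative tombstone path:

Set<Path> tombstones = new HashSet<>();
tombstones.add(new Path("s3a://example-bucket/dir/deleted-file"));
// Drain the iterator into an array, skipping tombstoned entries.
S3AFileStatus[] statuses =
    S3AUtils.iteratorToStatuses(statusIterator, tombstones);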
/**
* An interface for use in lambda-expressions working with
* directory tree listings.


@ -295,30 +295,6 @@ public final class S3Guard {
}
}
/**
* Convert the data of an iterator of {@link S3AFileStatus} to
* an array. Given tombstones are filtered out. If the iterator
* does return any item, an empty array is returned.
* @param iterator a non-null iterator
* @param tombstones
* @return a possibly-empty array of file status entries
* @throws IOException
*/
public static S3AFileStatus[] iteratorToStatuses(
RemoteIterator<S3AFileStatus> iterator, Set<Path> tombstones)
throws IOException {
List<FileStatus> statuses = new ArrayList<>();
while (iterator.hasNext()) {
S3AFileStatus status = iterator.next();
if (!tombstones.contains(status.getPath())) {
statuses.add(status);
}
}
return statuses.toArray(new S3AFileStatus[0]);
}
/**
* Convert the data of a directory listing to an array of {@link FileStatus}
* entries. Tombstones are filtered out at this point. If the listing is null
@ -359,17 +335,22 @@ public final class S3Guard {
* @param dirMeta Directory listing from MetadataStore. May be null.
* @param isAuthoritative State of authoritative mode
* @param timeProvider Time provider to use when updating entries
* @param toStatusItr function to convert an array of file statuses
* into a RemoteIterator.
* @return Final result of directory listing.
* @throws IOException if metadata store update failed
*/
public static S3AFileStatus[] dirListingUnion(MetadataStore ms, Path path,
List<S3AFileStatus> backingStatuses, DirListingMetadata dirMeta,
boolean isAuthoritative, ITtlTimeProvider timeProvider)
throws IOException {
public static RemoteIterator<S3AFileStatus> dirListingUnion(
MetadataStore ms, Path path,
RemoteIterator<S3AFileStatus> backingStatuses,
DirListingMetadata dirMeta, boolean isAuthoritative,
ITtlTimeProvider timeProvider,
Function<S3AFileStatus[], RemoteIterator<S3AFileStatus>> toStatusItr)
throws IOException {
// Fast-path for NullMetadataStore
if (isNullMetadataStore(ms)) {
return backingStatuses.toArray(new S3AFileStatus[backingStatuses.size()]);
return backingStatuses;
}
assertQualified(path);
@ -410,7 +391,7 @@ public final class S3Guard {
}
IOUtils.cleanupWithLogger(LOG, operationState);
return dirMetaToStatuses(dirMeta);
return toStatusItr.apply(dirMetaToStatuses(dirMeta));
}
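
For orientation, a hedged sketch of a call to the new signature; the lambda is modelled on the S3AFileSystem.listStatus() change elsewhere in this patch, and the local variable names are illustrative.

RemoteIterator<S3AFileStatus> merged = S3Guard.dirListingUnion(
    metadataStore,
    path,
    backingStatuses,      // RemoteIterator<S3AFileStatus> from the S3 LIST
    dirMeta,              // DirListingMetadata from the metadata store
    allowAuthoritative,
    ttlTimeProvider,
    statuses -> listing.createProvidedFileStatusIterator(
        statuses, ACCEPT_ALL, Listing.ACCEPT_ALL_BUT_S3N));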
/**
@ -429,7 +410,7 @@ public final class S3Guard {
private static void authoritativeUnion(
final MetadataStore ms,
final Path path,
final List<S3AFileStatus> backingStatuses,
final RemoteIterator<S3AFileStatus> backingStatuses,
final DirListingMetadata dirMeta,
final ITtlTimeProvider timeProvider,
final BulkOperationState operationState) throws IOException {
@ -440,7 +421,8 @@ public final class S3Guard {
Set<Path> deleted = dirMeta.listTombstones();
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
for (S3AFileStatus s : backingStatuses) {
while (backingStatuses.hasNext()) {
S3AFileStatus s = backingStatuses.next();
final Path statusPath = s.getPath();
if (deleted.contains(statusPath)) {
continue;
@ -493,16 +475,17 @@ public final class S3Guard {
private static void nonAuthoritativeUnion(
final MetadataStore ms,
final Path path,
final List<S3AFileStatus> backingStatuses,
final RemoteIterator<S3AFileStatus> backingStatuses,
final DirListingMetadata dirMeta,
final ITtlTimeProvider timeProvider,
final BulkOperationState operationState) throws IOException {
List<PathMetadata> entriesToAdd = new ArrayList<>(backingStatuses.size());
List<PathMetadata> entriesToAdd = new ArrayList<>();
Set<Path> deleted = dirMeta.listTombstones();
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
for (S3AFileStatus s : backingStatuses) {
while (backingStatuses.hasNext()) {
S3AFileStatus s = backingStatuses.next();
final Path statusPath = s.getPath();
if (deleted.contains(statusPath)) {
continue;


@ -176,6 +176,49 @@ public class ITestS3AFileOperationCost extends AbstractS3ACostTest {
.plus(GET_FILE_STATUS_FNFE)));
}
@Test
public void testCostOfListStatusOnFile() throws Throwable {
describe("Performing listStatus() on a file");
Path file = path(getMethodName() + ".txt");
S3AFileSystem fs = getFileSystem();
touch(fs, file);
verifyMetrics(() ->
fs.listStatus(file),
whenRaw(LIST_STATUS_LIST_OP
.plus(GET_FILE_STATUS_ON_FILE)),
whenAuthoritative(LIST_STATUS_LIST_OP),
whenNonauth(LIST_STATUS_LIST_OP));
}
@Test
public void testCostOfListStatusOnEmptyDir() throws Throwable {
describe("Performing listStatus() on an empty dir");
Path dir = path(getMethodName());
S3AFileSystem fs = getFileSystem();
fs.mkdirs(dir);
verifyMetrics(() ->
fs.listStatus(dir),
whenRaw(LIST_STATUS_LIST_OP
.plus(GET_FILE_STATUS_ON_EMPTY_DIR)),
whenAuthoritative(NO_IO),
whenNonauth(LIST_STATUS_LIST_OP));
}
@Test
public void testCostOfListStatusOnNonEmptyDir() throws Throwable {
describe("Performing listStatus() on a non empty dir");
Path dir = path(getMethodName());
S3AFileSystem fs = getFileSystem();
fs.mkdirs(dir);
Path file = new Path(dir, "file.txt");
touch(fs, file);
verifyMetrics(() ->
fs.listStatus(dir),
whenRaw(LIST_STATUS_LIST_OP),
whenAuthoritative(NO_IO),
whenNonauth(LIST_STATUS_LIST_OP));
}
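
A brief sketch of how the expected costs above compose, assuming OperationCost keeps tracking HEAD and LIST request counts and combining them with plus():

// Raw (unguarded) listStatus on a file: one LIST for the assumed
// directory, plus the HEAD probe(s) of the fallback getFileStatus call.
OperationCost expectedRawFileCost =
    LIST_STATUS_LIST_OP.plus(GET_FILE_STATUS_ON_FILE);
// With a guarded store the HEAD probes are skipped, so the expectation
// is LIST_STATUS_LIST_OP alone, or NO_IO when an authoritative listing
// is already cached.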
@Test
public void testCostOfGetFileStatusOnFile() throws Throwable {
describe("performing getFileStatus on a file");
@ -406,8 +449,7 @@ public class ITestS3AFileOperationCost extends AbstractS3ACostTest {
fs.globStatus(basePath.suffix("/*"));
// 2 head + 1 list from getFileStatus on path,
// plus 1 list to match the glob pattern
verifyRaw(GET_FILE_STATUS_ON_DIR
.plus(LIST_OPERATION),
verifyRaw(LIST_STATUS_LIST_OP,
() -> fs.globStatus(basePath.suffix("/*")));
}
@ -426,8 +468,7 @@ public class ITestS3AFileOperationCost extends AbstractS3ACostTest {
// unguarded: 2 head + 1 list from getFileStatus on path,
// plus 1 list to match the glob pattern
// no additional operations from symlink resolution
verifyRaw(GET_FILE_STATUS_ON_DIR
.plus(LIST_OPERATION),
verifyRaw(LIST_STATUS_LIST_OP,
() -> fs.globStatus(basePath.suffix("/*")));
}


@ -35,18 +35,39 @@ import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.ReflectionUtils;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.transfer.model.CopyResult;
import javax.annotation.Nullable;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Assume;
@ -61,12 +82,15 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@ -894,6 +918,49 @@ public final class S3ATestUtils {
StatusProbeEnum.ALL);
}
/**
* Create a mock implementation of the store context.
* @param multiDelete whether multi-object delete is enabled
* @param store metadata store to use
* @param accessors context accessors
* @return a store context suitable for tests
* @throws URISyntaxException failure to parse the test URI
* @throws IOException IO failure
*/
public static StoreContext createMockStoreContext(
boolean multiDelete,
OperationTrackingStore store,
ContextAccessors accessors)
throws URISyntaxException, IOException {
URI name = new URI("s3a://bucket");
Configuration conf = new Configuration();
return new StoreContextBuilder().setFsURI(name)
.setBucket("bucket")
.setConfiguration(conf)
.setUsername("alice")
.setOwner(UserGroupInformation.getCurrentUser())
.setExecutor(BlockingThreadPoolExecutorService.newInstance(
4,
4,
10, TimeUnit.SECONDS,
"s3a-transfer-shared"))
.setExecutorCapacity(DEFAULT_EXECUTOR_CAPACITY)
.setInvoker(
new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
.setInstrumentation(new S3AInstrumentation(name))
.setStorageStatistics(new S3AStorageStatistics())
.setInputPolicy(S3AInputPolicy.Normal)
.setChangeDetectionPolicy(
ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
ChangeDetectionPolicy.Source.ETag, false))
.setMultiObjectDeleteEnabled(multiDelete)
.setMetadataStore(store)
.setUseListV1(false)
.setContextAccessors(accessors)
.setTimeProvider(new S3Guard.TtlTimeProvider(conf))
.build();
}
/**
* Helper class to do diffs of metrics.
*/
@ -1472,4 +1539,293 @@ public final class S3ATestUtils {
needEmptyDirectoryFlag,
probes);
}
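/**
* Stub implementation of {@link OperationCallbacks} for tests:
* every method is a no-op or returns null/false.
*/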
public static class MinimalOperationCallbacks
implements OperationCallbacks {
@Override
public S3ObjectAttributes createObjectAttributes(
Path path,
String eTag,
String versionId,
long len) {
return null;
}
@Override
public S3ObjectAttributes createObjectAttributes(
S3AFileStatus fileStatus) {
return null;
}
@Override
public S3AReadOpContext createReadContext(
FileStatus fileStatus) {
return null;
}
@Override
public void finishRename(
Path sourceRenamed,
Path destCreated)
throws IOException {
}
@Override
public void deleteObjectAtPath(
Path path,
String key,
boolean isFile,
BulkOperationState operationState)
throws IOException {
}
@Override
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
Path path,
S3AFileStatus status,
boolean collectTombstones,
boolean includeSelf)
throws IOException {
return null;
}
@Override
public CopyResult copyFile(
String srcKey,
String destKey,
S3ObjectAttributes srcAttributes,
S3AReadOpContext readContext)
throws IOException {
return null;
}
@Override
public DeleteObjectsResult removeKeys(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean deleteFakeDir,
List<Path> undeletedObjectsOnFailure,
BulkOperationState operationState,
boolean quiet)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
return null;
}
@Override
public boolean allowAuthoritative(Path p) {
return false;
}
@Override
public RemoteIterator<S3AFileStatus> listObjects(
Path path,
String key)
throws IOException {
return null;
}
}
/**
* MetadataStore which tracks what is deleted and added.
*/
public static class OperationTrackingStore implements MetadataStore {
private final List<Path> deleted = new ArrayList<>();
private final List<Path> created = new ArrayList<>();
@Override
public void initialize(final FileSystem fs,
ITtlTimeProvider ttlTimeProvider) {
}
@Override
public void initialize(final Configuration conf,
ITtlTimeProvider ttlTimeProvider) {
}
@Override
public void forgetMetadata(final Path path) {
}
@Override
public PathMetadata get(final Path path) {
return null;
}
@Override
public PathMetadata get(final Path path,
final boolean wantEmptyDirectoryFlag) {
return null;
}
@Override
public DirListingMetadata listChildren(final Path path) {
return null;
}
@Override
public void put(final PathMetadata meta) {
put(meta, null);
}
@Override
public void put(final PathMetadata meta,
final BulkOperationState operationState) {
created.add(meta.getFileStatus().getPath());
}
@Override
public void put(final Collection<? extends PathMetadata> metas,
final BulkOperationState operationState) {
metas.stream().forEach(meta -> put(meta, null));
}
@Override
public void put(final DirListingMetadata meta,
final List<Path> unchangedEntries,
final BulkOperationState operationState) {
created.add(meta.getPath());
}
@Override
public void destroy() {
}
@Override
public void delete(final Path path,
final BulkOperationState operationState) {
deleted.add(path);
}
@Override
public void deletePaths(final Collection<Path> paths,
@Nullable final BulkOperationState operationState)
throws IOException {
deleted.addAll(paths);
}
@Override
public void deleteSubtree(final Path path,
final BulkOperationState operationState) {
}
@Override
public void move(@Nullable final Collection<Path> pathsToDelete,
@Nullable final Collection<PathMetadata> pathsToCreate,
@Nullable final BulkOperationState operationState) {
}
@Override
public void prune(final PruneMode pruneMode, final long cutoff) {
}
@Override
public long prune(final PruneMode pruneMode,
final long cutoff,
final String keyPrefix) {
return 0;
}
@Override
public BulkOperationState initiateBulkWrite(
final BulkOperationState.OperationType operation,
final Path dest) {
return null;
}
@Override
public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
}
@Override
public Map<String, String> getDiagnostics() {
return null;
}
@Override
public void updateParameters(final Map<String, String> parameters) {
}
@Override
public void close() {
}
public List<Path> getDeleted() {
return deleted;
}
public List<Path> getCreated() {
return created;
}
@Override
public RenameTracker initiateRenameOperation(
final StoreContext storeContext,
final Path source,
final S3AFileStatus sourceStatus,
final Path dest) {
throw new UnsupportedOperationException("unsupported");
}
@Override
public void addAncestors(final Path qualifiedPath,
@Nullable final BulkOperationState operationState) {
}
}
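/**
* Stub implementation of {@link ListingOperationCallbacks} for tests:
* every method is a no-op or returns null, zero or false.
*/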
public static class MinimalListingOperationCallbacks
implements ListingOperationCallbacks {
@Override
public CompletableFuture<S3ListResult> listObjectsAsync(
S3ListRequest request)
throws IOException {
return null;
}
@Override
public CompletableFuture<S3ListResult> continueListObjectsAsync(
S3ListRequest request,
S3ListResult prevResult)
throws IOException {
return null;
}
@Override
public S3ALocatedFileStatus toLocatedFileStatus(
S3AFileStatus status) throws IOException {
return null;
}
@Override
public S3ListRequest createListObjectsRequest(
String key,
String delimiter) {
return null;
}
@Override
public long getDefaultBlockSize(Path path) {
return 0;
}
@Override
public int getMaxKeys() {
return 0;
}
@Override
public ITtlTimeProvider getUpdatedTtlTimeProvider() {
return null;
}
@Override
public boolean allowAuthoritative(Path p) {
return false;
}
}
}


@ -18,26 +18,15 @@
package org.apache.hadoop.fs.s3a.impl;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.transfer.model.CopyResult;
import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.Before;
@ -45,32 +34,8 @@ import org.junit.Test;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
import org.apache.hadoop.fs.s3a.S3ListRequest;
import org.apache.hadoop.fs.s3a.S3ListResult;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.ACCESS_DENIED;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
@ -93,8 +58,8 @@ public class TestPartialDeleteFailures {
@Before
public void setUp() throws Exception {
context = createMockStoreContext(true,
new OperationTrackingStore());
context = S3ATestUtils.createMockStoreContext(true,
new S3ATestUtils.OperationTrackingStore(), CONTEXT_ACCESSORS);
}
@Test
@ -187,9 +152,10 @@ public class TestPartialDeleteFailures {
final List<Path> deleteAllowed = Lists.newArrayList(pathA, pathAC);
MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
deleteForbidden);
OperationTrackingStore store
= new OperationTrackingStore();
StoreContext storeContext = createMockStoreContext(true, store);
S3ATestUtils.OperationTrackingStore store
= new S3ATestUtils.OperationTrackingStore();
StoreContext storeContext = S3ATestUtils
.createMockStoreContext(true, store, CONTEXT_ACCESSORS);
MultiObjectDeleteSupport deleteSupport
= new MultiObjectDeleteSupport(storeContext, null);
Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
@ -210,174 +176,6 @@ public class TestPartialDeleteFailures {
}
private StoreContext createMockStoreContext(boolean multiDelete,
OperationTrackingStore store) throws URISyntaxException, IOException {
URI name = new URI("s3a://bucket");
Configuration conf = new Configuration();
return new StoreContextBuilder().setFsURI(name)
.setBucket("bucket")
.setConfiguration(conf)
.setUsername("alice")
.setOwner(UserGroupInformation.getCurrentUser())
.setExecutor(BlockingThreadPoolExecutorService.newInstance(
4,
4,
10, TimeUnit.SECONDS,
"s3a-transfer-shared"))
.setExecutorCapacity(Constants.DEFAULT_EXECUTOR_CAPACITY)
.setInvoker(
new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, Invoker.LOG_EVENT))
.setInstrumentation(new S3AInstrumentation(name))
.setStorageStatistics(new S3AStorageStatistics())
.setInputPolicy(S3AInputPolicy.Normal)
.setChangeDetectionPolicy(
ChangeDetectionPolicy.createPolicy(ChangeDetectionPolicy.Mode.None,
ChangeDetectionPolicy.Source.ETag, false))
.setMultiObjectDeleteEnabled(multiDelete)
.setMetadataStore(store)
.setUseListV1(false)
.setContextAccessors(CONTEXT_ACCESSORS)
.setTimeProvider(new S3Guard.TtlTimeProvider(conf))
.build();
}
private static class MinimalListingOperationCallbacks
implements ListingOperationCallbacks {
@Override
public CompletableFuture<S3ListResult> listObjectsAsync(
S3ListRequest request)
throws IOException {
return null;
}
@Override
public CompletableFuture<S3ListResult> continueListObjectsAsync(
S3ListRequest request,
S3ListResult prevResult)
throws IOException {
return null;
}
@Override
public S3ALocatedFileStatus toLocatedFileStatus(
S3AFileStatus status) throws IOException {
return null;
}
@Override
public S3ListRequest createListObjectsRequest(
String key,
String delimiter) {
return null;
}
@Override
public long getDefaultBlockSize(Path path) {
return 0;
}
@Override
public int getMaxKeys() {
return 0;
}
@Override
public ITtlTimeProvider getUpdatedTtlTimeProvider() {
return null;
}
@Override
public boolean allowAuthoritative(Path p) {
return false;
}
}
private static class MinimalOperationCallbacks
implements OperationCallbacks {
@Override
public S3ObjectAttributes createObjectAttributes(
Path path,
String eTag,
String versionId,
long len) {
return null;
}
@Override
public S3ObjectAttributes createObjectAttributes(
S3AFileStatus fileStatus) {
return null;
}
@Override
public S3AReadOpContext createReadContext(
FileStatus fileStatus) {
return null;
}
@Override
public void finishRename(
Path sourceRenamed,
Path destCreated)
throws IOException {
}
@Override
public void deleteObjectAtPath(
Path path,
String key,
boolean isFile,
BulkOperationState operationState)
throws IOException {
}
@Override
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
Path path,
S3AFileStatus status,
boolean collectTombstones,
boolean includeSelf)
throws IOException {
return null;
}
@Override
public CopyResult copyFile(
String srcKey,
String destKey,
S3ObjectAttributes srcAttributes,
S3AReadOpContext readContext)
throws IOException {
return null;
}
@Override
public DeleteObjectsResult removeKeys(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean deleteFakeDir,
List<Path> undeletedObjectsOnFailure,
BulkOperationState operationState,
boolean quiet)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
return null;
}
@Override
public boolean allowAuthoritative(Path p) {
return false;
}
@Override
public RemoteIterator<S3AFileStatus> listObjects(
Path path,
String key)
throws IOException {
return null;
}
}
private static class MinimalContextAccessor implements ContextAccessors {
@Override
@ -406,155 +204,5 @@ public class TestPartialDeleteFailures {
return path;
}
}
/**
* MetadataStore which tracks what is deleted and added.
*/
private static class OperationTrackingStore implements MetadataStore {
private final List<Path> deleted = new ArrayList<>();
private final List<Path> created = new ArrayList<>();
@Override
public void initialize(final FileSystem fs,
ITtlTimeProvider ttlTimeProvider) {
}
@Override
public void initialize(final Configuration conf,
ITtlTimeProvider ttlTimeProvider) {
}
@Override
public void forgetMetadata(final Path path) {
}
@Override
public PathMetadata get(final Path path) {
return null;
}
@Override
public PathMetadata get(final Path path,
final boolean wantEmptyDirectoryFlag) {
return null;
}
@Override
public DirListingMetadata listChildren(final Path path) {
return null;
}
@Override
public void put(final PathMetadata meta) {
put(meta, null);
}
@Override
public void put(final PathMetadata meta,
final BulkOperationState operationState) {
created.add(meta.getFileStatus().getPath());
}
@Override
public void put(final Collection<? extends PathMetadata> metas,
final BulkOperationState operationState) {
metas.stream().forEach(meta -> put(meta, null));
}
@Override
public void put(final DirListingMetadata meta,
final List<Path> unchangedEntries,
final BulkOperationState operationState) {
created.add(meta.getPath());
}
@Override
public void destroy() {
}
@Override
public void delete(final Path path,
final BulkOperationState operationState) {
deleted.add(path);
}
@Override
public void deletePaths(final Collection<Path> paths,
@Nullable final BulkOperationState operationState)
throws IOException {
deleted.addAll(paths);
}
@Override
public void deleteSubtree(final Path path,
final BulkOperationState operationState) {
}
@Override
public void move(@Nullable final Collection<Path> pathsToDelete,
@Nullable final Collection<PathMetadata> pathsToCreate,
@Nullable final BulkOperationState operationState) {
}
@Override
public void prune(final PruneMode pruneMode, final long cutoff) {
}
@Override
public long prune(final PruneMode pruneMode,
final long cutoff,
final String keyPrefix) {
return 0;
}
@Override
public BulkOperationState initiateBulkWrite(
final BulkOperationState.OperationType operation,
final Path dest) {
return null;
}
@Override
public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
}
@Override
public Map<String, String> getDiagnostics() {
return null;
}
@Override
public void updateParameters(final Map<String, String> parameters) {
}
@Override
public void close() {
}
public List<Path> getDeleted() {
return deleted;
}
public List<Path> getCreated() {
return created;
}
@Override
public RenameTracker initiateRenameOperation(
final StoreContext storeContext,
final Path source,
final S3AFileStatus sourceStatus,
final Path dest) {
throw new UnsupportedOperationException("unsupported");
}
@Override
public void addAncestors(final Path qualifiedPath,
@Nullable final BulkOperationState operationState) {
}
}
}


@ -107,9 +107,10 @@ public final class OperationCost {
new OperationCost(0, 1);
/** listFiles always does a LIST. */
public static final OperationCost LIST_FILES_LIST_OP =
new OperationCost(0, 1);
public static final OperationCost LIST_FILES_LIST_OP = LIST_OPERATION;
/** listStatus always does a LIST. */
public static final OperationCost LIST_STATUS_LIST_OP = LIST_OPERATION;
/**
* Metadata cost of a copy operation, as used during rename.
* This happens even if the store is guarded.


@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
@ -39,7 +40,9 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.service.launcher.LauncherExitCodes;
import org.apache.hadoop.test.LambdaTestUtils;
@ -47,6 +50,8 @@ import org.apache.hadoop.util.ExitUtil;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Listing.toProvidedFileStatusIterator;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@ -79,7 +84,6 @@ public class TestS3Guard extends Assert {
ms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
timeProvider = new S3Guard.TtlTimeProvider(
DEFAULT_METADATASTORE_METADATA_TTL);
}
@After
@ -108,9 +112,14 @@ public class TestS3Guard extends Assert {
List<S3AFileStatus> s3Listing = Arrays.asList(
s1Status,
s2Status);
FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing,
dirMeta, false, timeProvider);
RemoteIterator<S3AFileStatus> storeItr = toProvidedFileStatusIterator(
s3Listing.toArray(new S3AFileStatus[0]));
RemoteIterator<S3AFileStatus> resultItr = S3Guard.dirListingUnion(
ms, DIR_PATH, storeItr, dirMeta, false,
timeProvider, s3AFileStatuses ->
toProvidedFileStatusIterator(dirMetaToStatuses(dirMeta)));
S3AFileStatus[] result = S3AUtils.iteratorToStatuses(
resultItr, new HashSet<>());
assertEquals("listing length", 4, result.length);
assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
@ -124,9 +133,18 @@ public class TestS3Guard extends Assert {
S3AFileStatus f1Status2 = new S3AFileStatus(
200, System.currentTimeMillis(), new Path(MS_FILE_1),
1, null, "tag2", "ver2");
FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH,
Arrays.asList(f1Status2),
dirMeta, false, timeProvider);
S3AFileStatus[] f1Statuses = new S3AFileStatus[1];
f1Statuses[0] = f1Status2;
RemoteIterator<S3AFileStatus> itr = toProvidedFileStatusIterator(
f1Statuses);
FileStatus[] result2 = S3AUtils.iteratorToStatuses(
S3Guard.dirListingUnion(
ms, DIR_PATH, itr, dirMeta,
false, timeProvider,
s3AFileStatuses ->
toProvidedFileStatusIterator(
dirMetaToStatuses(dirMeta))),
new HashSet<>());
// the listing returns the new status
Assertions.assertThat(find(result2, MS_FILE_1))
.describedAs("Entry in listing results for %s", MS_FILE_1)
@ -159,9 +177,18 @@ public class TestS3Guard extends Assert {
ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
DEFAULT_METADATASTORE_METADATA_TTL);
FileStatus[] result = S3Guard.dirListingUnion(ms, DIR_PATH, s3Listing,
dirMeta, true, timeProvider);
RemoteIterator<S3AFileStatus> storeItr = toProvidedFileStatusIterator(
s3Listing.toArray(new S3AFileStatus[0]));
RemoteIterator<S3AFileStatus> resultItr = S3Guard
.dirListingUnion(ms, DIR_PATH, storeItr, dirMeta,
true, timeProvider,
s3AFileStatuses ->
toProvidedFileStatusIterator(
dirMetaToStatuses(dirMeta)));
S3AFileStatus[] result = S3AUtils.iteratorToStatuses(
resultItr, new HashSet<>());
assertEquals("listing length", 4, result.length);
assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
@ -181,13 +208,21 @@ public class TestS3Guard extends Assert {
S3AFileStatus s1Status2 = new S3AFileStatus(
200, System.currentTimeMillis(), new Path(S3_FILE_3),
1, null, "tag2", "ver2");
S3AFileStatus[] f1Statuses = new S3AFileStatus[1];
f1Statuses[0] = s1Status2;
RemoteIterator<S3AFileStatus> itr =
toProvidedFileStatusIterator(f1Statuses);
FileStatus[] result2 = S3AUtils.iteratorToStatuses(
S3Guard.dirListingUnion(ms, DIR_PATH, itr, dirMeta,
true, timeProvider,
s3AFileStatuses ->
toProvidedFileStatusIterator(
dirMetaToStatuses(dirMeta))),
new HashSet<>());
// but the result of the listing contains the old entry
// because auth mode doesn't pick up changes in S3 which
// didn't go through s3guard
FileStatus[] result2 = S3Guard.dirListingUnion(ms, DIR_PATH,
Arrays.asList(s1Status2),
dirMeta2, true, timeProvider);
Assertions.assertThat(find(result2, S3_FILE_3))
.describedAs("Entry in listing results for %s", S3_FILE_3)
.isSameAs(file3Meta.getFileStatus());