HADOOP-17244. S3A directory delete tombstones dir markers prematurely. (#2310)

This fixes the S3Guard/Directory Marker Retention integration so that when
fs.s3a.directory.marker.retention=keep, failures during multipart delete
are handled correctly, as are incremental deletes during
directory tree operations.

In both cases, when a directory marker with children is deleted from
S3, the directory entry in S3Guard is not deleted, because it is still
critical to representing the structure of the store.

Contributed by Steve Loughran.

Change-Id: I4ca133a23ea582cd42ec35dbf2dc85b286297d2f
This commit is contained in:
Steve Loughran 2020-11-18 12:15:52 +00:00
parent 425996eb4a
commit e3c08f285a
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
20 changed files with 1099 additions and 439 deletions

View File

@ -265,7 +265,7 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
fs.listFiles(root, true));
describe("verifying consistency with treewalk's files");
ContractTestUtils.TreeScanResults treeWalk = treeWalk(fs, root);
treeWalk.assertFieldsEquivalent("files", listing,
treeWalk.assertFieldsEquivalent("treewalk vs listFiles(/, true)", listing,
treeWalk.getFiles(),
listing.getFiles());
}

View File

@ -1576,7 +1576,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@Override
@Retries.RetryTranslated
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
final Path path,
final S3AFileStatus status,
final boolean collectTombstones,
@ -2081,6 +2081,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
DELETE_CONSIDERED_IDEMPOTENT,
()-> {
incrementStatistic(OBJECT_DELETE_REQUESTS);
incrementStatistic(OBJECT_DELETE_OBJECTS);
s3.deleteObject(bucket, key);
return null;
});
@ -2127,9 +2128,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
/**
* Perform a bulk object delete operation.
* Perform a bulk object delete operation against S3; leaves S3Guard
* alone.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics.
* operation statistics
* <p></p>
* {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
* of objects deleted in the request.
* <p></p>
* Retry policy: retry untranslated; delete considered idempotent.
* If the request is throttled, this is logged in the throttle statistics,
* with the counter set to the number of keys, rather than the number
@ -2150,9 +2156,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
incrementWriteOperations();
BulkDeleteRetryHandler retryHandler =
new BulkDeleteRetryHandler(createStoreContext());
int keyCount = deleteRequest.getKeys().size();
try(DurationInfo ignored =
new DurationInfo(LOG, false, "DELETE %d keys",
deleteRequest.getKeys().size())) {
keyCount)) {
return invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT,
(text, e, r, i) -> {
@ -2161,6 +2168,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
},
() -> {
incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
return s3.deleteObjects(deleteRequest);
});
} catch (MultiObjectDeleteException e) {
@ -2550,8 +2558,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// entries so we only process these failures on "real" deletes.
Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>> results =
new MultiObjectDeleteSupport(createStoreContext(), operationState)
.processDeleteFailure(ex, keysToDelete);
undeletedObjectsOnFailure.addAll(results.getMiddle());
.processDeleteFailure(ex, keysToDelete, new ArrayList<Path>());
undeletedObjectsOnFailure.addAll(results.getLeft());
}
throw ex;
} catch (AmazonClientException | IOException ex) {

View File

@ -156,6 +156,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
INVOCATION_RENAME,
OBJECT_COPY_REQUESTS,
OBJECT_DELETE_REQUESTS,
OBJECT_DELETE_OBJECTS,
OBJECT_LIST_REQUESTS,
OBJECT_CONTINUE_LIST_REQUESTS,
OBJECT_METADATA_REQUESTS,

View File

@ -85,6 +85,8 @@ public enum Statistic {
"Calls of rename()"),
OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
OBJECT_DELETE_OBJECTS("object_delete_objects",
"Objects deleted in delete requests"),
OBJECT_LIST_REQUESTS("object_list_requests",
"Number of object listings made"),
OBJECT_CONTINUE_LIST_REQUESTS("object_continue_list_requests",

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
@ -152,10 +153,13 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
/**
* List of keys built up for the next delete batch.
*/
private List<DeleteObjectsRequest.KeyVersion> keys;
private List<DeleteEntry> keys;
/**
* List of paths built up for deletion.
* List of paths built up for incremental deletion on tree delete.
* At the end of the entire delete the full tree is scanned in S3Guard
* and tombstones added. For this reason this list of paths <i>must not</i>
* include directory markers, as that will break the scan.
*/
private List<Path> paths;
@ -279,7 +283,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
LOG.debug("deleting simple file {}", path);
deleteObjectAtPath(path, key, true);
}
LOG.debug("Deleted {} files", filesDeleted);
LOG.debug("Deleted {} objects", filesDeleted);
return true;
}
@ -323,7 +327,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
// list files including any under tombstones through S3Guard
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
callbacks.listFilesAndEmptyDirectories(path, status,
callbacks.listFilesAndDirectoryMarkers(path, status,
false, true);
// iterate through and delete. The next() call will block when a new S3
@ -359,7 +363,10 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
while (objects.hasNext()) {
// get the next entry in the listing.
extraFilesDeleted++;
queueForDeletion(deletionKey(objects.next()), null);
S3AFileStatus next = objects.next();
LOG.debug("Found Unlisted entry {}", next);
queueForDeletion(deletionKey(next), null,
next.isDirectory());
}
if (extraFilesDeleted > 0) {
LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
@ -402,7 +409,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
*/
private void queueForDeletion(
final S3AFileStatus stat) throws IOException {
queueForDeletion(deletionKey(stat), stat.getPath());
queueForDeletion(deletionKey(stat), stat.getPath(), stat.isDirectory());
}
/**
@ -413,14 +420,18 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
*
* @param key key to delete
* @param deletePath nullable path of the key
* @param isDirMarker is the entry a directory?
* @throws IOException failure of the previous batch of deletions.
*/
private void queueForDeletion(final String key,
@Nullable final Path deletePath) throws IOException {
@Nullable final Path deletePath,
boolean isDirMarker) throws IOException {
LOG.debug("Adding object to delete: \"{}\"", key);
keys.add(new DeleteObjectsRequest.KeyVersion(key));
keys.add(new DeleteEntry(key, isDirMarker));
if (deletePath != null) {
paths.add(deletePath);
if (!isDirMarker) {
paths.add(deletePath);
}
}
if (keys.size() == pageSize) {
@ -484,7 +495,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
* @return the submitted future or null
*/
private CompletableFuture<Void> submitDelete(
final List<DeleteObjectsRequest.KeyVersion> keyList,
final List<DeleteEntry> keyList,
final List<Path> pathList) {
if (keyList.isEmpty() && pathList.isEmpty()) {
@ -514,31 +525,62 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
@Retries.RetryTranslated
private void asyncDeleteAction(
final BulkOperationState state,
final List<DeleteObjectsRequest.KeyVersion> keyList,
final List<DeleteEntry> keyList,
final List<Path> pathList,
final boolean auditDeletedKeys)
throws IOException {
List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
try (DurationInfo ignored =
new DurationInfo(LOG, false, "Delete page of keys")) {
new DurationInfo(LOG, false,
"Delete page of %d keys", keyList.size())) {
DeleteObjectsResult result = null;
List<Path> undeletedObjects = new ArrayList<>();
if (!keyList.isEmpty()) {
result = Invoker.once("Remove S3 Keys",
// first delete the files.
List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
.filter(e -> !e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
LOG.debug("Deleting of {} file objects", files.size());
result = Invoker.once("Remove S3 Files",
status.getPath().toString(),
() -> callbacks.removeKeys(
keyList,
files,
false,
undeletedObjects,
state,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
// now the dirs
List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
.filter(e -> e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
LOG.debug("Deleting of {} directory markers", dirs.size());
// This is invoked with deleteFakeDir = true, so
// S3Guard is not updated.
result = Invoker.once("Remove S3 Dir Markers",
status.getPath().toString(),
() -> callbacks.removeKeys(
dirs,
true,
undeletedObjects,
state,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
}
if (!pathList.isEmpty()) {
// delete file paths only. This stops tombstones
// being added until the final directory cleanup
// (HADOOP-17244)
metadataStore.deletePaths(pathList, state);
}
if (auditDeletedKeys && result != null) {
if (auditDeletedKeys) {
// audit the deleted keys
List<DeleteObjectsResult.DeletedObject> deletedObjects =
result.getDeletedObjects();
if (deletedObjects.size() != keyList.size()) {
// size mismatch
LOG.warn("Size mismatch in deletion operation. "
@ -549,7 +591,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
}
for (DeleteObjectsRequest.KeyVersion kv : keyList) {
for (DeleteEntry kv : keyList) {
LOG.debug("{}", kv.getKey());
}
}
@ -557,5 +599,31 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
}
}
/**
 * One object queued for deletion: the S3 key/version to remove plus a
 * flag recording whether that object is a directory marker.
 * The flag lets the delete logic treat files and markers differently
 * when deciding how S3Guard is to be updated.
 */
private static final class DeleteEntry {

  /** Key/version pair handed to the bulk delete request. */
  private final DeleteObjectsRequest.KeyVersion keyVersion;

  /** True if the object is a directory marker rather than a file. */
  private final boolean isDirMarker;

  /**
   * Build an entry.
   * @param objectKey key of the object in the bucket.
   * @param dirMarker is the object a directory marker?
   */
  private DeleteEntry(final String objectKey, final boolean dirMarker) {
    this.isDirMarker = dirMarker;
    this.keyVersion = new DeleteObjectsRequest.KeyVersion(objectKey);
  }

  /**
   * Get the object key.
   * @return the key held in the wrapped key/version entry.
   */
  public String getKey() {
    return keyVersion.getKey();
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("DeleteEntry{");
    sb.append("key='").append(getKey()).append('\'');
    sb.append(", isDirMarker=").append(isDirMarker);
    sb.append('}');
    return sb.toString();
  }
}
}

View File

@ -23,6 +23,7 @@ import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -37,8 +38,10 @@ import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AWSS3IOException;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
@ -84,15 +87,25 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
public static IOException translateDeleteException(
final String message,
final MultiObjectDeleteException deleteException) {
List<MultiObjectDeleteException.DeleteError> errors
= deleteException.getErrors();
LOG.warn("Bulk delete operation failed to delete all objects;"
+ " failure count = {}",
errors.size());
final StringBuilder result = new StringBuilder(
deleteException.getErrors().size() * 256);
errors.size() * 256);
result.append(message).append(": ");
String exitCode = "";
for (MultiObjectDeleteException.DeleteError error :
deleteException.getErrors()) {
String code = error.getCode();
result.append(String.format("%s: %s: %s%n", code, error.getKey(),
error.getMessage()));
String item = String.format("%s: %s%s: %s%n", code, error.getKey(),
(error.getVersionId() != null
? (" (" + error.getVersionId() + ")")
: ""),
error.getMessage());
LOG.warn(item);
result.append(item);
if (exitCode.isEmpty() || ACCESS_DENIED.equals(code)) {
exitCode = code;
}
@ -113,7 +126,7 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
* @param keysToDelete the keys in the delete request
* @return tuple of (undeleted, deleted) paths.
*/
public Pair<List<Path>, List<Path>> splitUndeletedKeys(
public Pair<List<KeyPath>, List<KeyPath>> splitUndeletedKeys(
final MultiObjectDeleteException deleteException,
final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete) {
LOG.debug("Processing delete failure; keys to delete count = {};"
@ -122,11 +135,11 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
deleteException.getErrors().size(),
deleteException.getDeletedObjects().size());
// convert the collection of keys being deleted into paths
final List<Path> pathsBeingDeleted = keysToPaths(keysToDelete);
// Take this is list of paths
final List<KeyPath> pathsBeingDeleted = keysToKeyPaths(keysToDelete);
// Take this list of paths
// extract all undeleted entries contained in the exception and
// then removes them from the original list.
List<Path> undeleted = removeUndeletedPaths(deleteException,
// then remove them from the original list.
List<KeyPath> undeleted = removeUndeletedPaths(deleteException,
pathsBeingDeleted,
getStoreContext()::keyToPath);
return Pair.of(undeleted, pathsBeingDeleted);
@ -139,7 +152,17 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
*/
public List<Path> keysToPaths(
final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete) {
return convertToPaths(keysToDelete,
return toPathList(keysToKeyPaths(keysToDelete));
}
/**
 * Convert the keys of a delete request into {@code KeyPath} entries,
 * resolving each key to a path through the store context.
 * @param keysToDelete list of keys for the delete operation.
 * @return list of keypath entries, one per key.
 */
public List<KeyPath> keysToKeyPaths(
    final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete) {
  // key -> path conversion is delegated to the store context
  final Function<String, Path> keyToPath = getStoreContext()::keyToPath;
  return convertToKeyPaths(keysToDelete, keyToPath);
}
@ -149,13 +172,17 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
* @param qualifier path qualifier
* @return the paths.
*/
public static List<Path> convertToPaths(
public static List<KeyPath> convertToKeyPaths(
final Collection<DeleteObjectsRequest.KeyVersion> keysToDelete,
final Function<String, Path> qualifier) {
return keysToDelete.stream()
.map((keyVersion) ->
qualifier.apply(keyVersion.getKey()))
.collect(Collectors.toList());
List<KeyPath> l = new ArrayList<>(keysToDelete.size());
for (DeleteObjectsRequest.KeyVersion kv : keysToDelete) {
String key = kv.getKey();
Path p = qualifier.apply(key);
boolean isDir = key.endsWith("/");
l.add(new KeyPath(key, p, isDir));
}
return l;
}
/**
@ -164,27 +191,59 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
* and the original list of files to delete declares to have been deleted.
* @param deleteException the delete exception.
* @param keysToDelete collection of keys which had been requested.
* @param retainedMarkers list built up of retained markers.
* @return a tuple of (undeleted, deleted, failures)
*/
public Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
processDeleteFailure(
final MultiObjectDeleteException deleteException,
final List<DeleteObjectsRequest.KeyVersion> keysToDelete) {
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
final List<Path> retainedMarkers) {
final MetadataStore metadataStore =
checkNotNull(getStoreContext().getMetadataStore(),
"context metadatastore");
final List<Pair<Path, IOException>> failures = new ArrayList<>();
final Pair<List<Path>, List<Path>> outcome =
final Pair<List<KeyPath>, List<KeyPath>> outcome =
splitUndeletedKeys(deleteException, keysToDelete);
List<Path> deleted = outcome.getRight();
List<Path> undeleted = outcome.getLeft();
// delete the paths but recover
// TODO: handle the case where a parent path is deleted but not a child.
// TODO: in a fake object delete, we don't actually want to delete
// metastore entries
deleted.forEach(path -> {
try {
metadataStore.delete(path, operationState);
List<KeyPath> deleted = outcome.getRight();
List<Path> deletedPaths = new ArrayList<>();
List<KeyPath> undeleted = outcome.getLeft();
retainedMarkers.clear();
List<Path> undeletedPaths = toPathList((List<KeyPath>) undeleted);
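// sort longer keys first:
// if the left key is longer than the right one it is considered
// smaller, so appears in the list first.
// a child's key is always longer than its parent's, so children are
// processed before their parent directory; thus when we look for a
// dir being empty, we know its deleted children have been handled.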
deleted.sort((l, r) -> r.getKey().length() - l.getKey().length());
// now go through and delete from S3Guard all paths listed in
// the result which are either files or directories with
// no children.
deleted.forEach(kp -> {
Path path = kp.getPath();
try{
boolean toDelete = true;
if (kp.isDirectoryMarker()) {
// its a dir marker, which could be an empty dir
// (which is then tombstoned), or a non-empty dir, which
// is not tombstoned.
// for this to be handled, we have to have removed children
// from the store first, which relies on the sort
PathMetadata pmentry = metadataStore.get(path, true);
if (pmentry != null && !pmentry.isDeleted()) {
toDelete = pmentry.getFileStatus().isEmptyDirectory()
== Tristate.TRUE;
} else {
toDelete = false;
}
}
if (toDelete) {
LOG.debug("Removing deleted object from S3Guard Store {}", path);
metadataStore.delete(path, operationState);
} else {
LOG.debug("Retaining S3Guard directory entry {}", path);
retainedMarkers.add(path);
}
} catch (IOException e) {
// trouble: we failed to delete the far end entry
// try with the next one.
@ -192,11 +251,25 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
LOG.warn("Failed to update S3Guard store with deletion of {}", path);
failures.add(Pair.of(path, e));
}
// irrespective of the S3Guard outcome, it is declared as deleted, as
// it is no longer in the S3 store.
deletedPaths.add(path);
});
if (LOG.isDebugEnabled()) {
undeleted.forEach(p -> LOG.debug("Deleted {}", p));
}
return Triple.of(undeleted, deleted, failures);
return Triple.of(undeletedPaths, deletedPaths, failures);
}
/**
 * Extract the path of every entry in a list of keypaths,
 * preserving the order of the source list.
 * @param keyPaths source list
 * @return a list of paths
 */
public static List<Path> toPathList(final List<KeyPath> keyPaths) {
  final List<Path> paths = new ArrayList<>(keyPaths.size());
  for (KeyPath entry : keyPaths) {
    paths.add(entry.getPath());
  }
  return paths;
}
/**
@ -211,8 +284,31 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
public static List<Path> extractUndeletedPaths(
final MultiObjectDeleteException deleteException,
final Function<String, Path> qualifierFn) {
return deleteException.getErrors().stream()
.map((e) -> qualifierFn.apply(e.getKey()))
return toPathList(extractUndeletedKeyPaths(deleteException, qualifierFn));
}
/**
 * Build the list of entries which a {@code MultiObjectDeleteException}
 * reports as not deleted: one {@code KeyPath} per error in the exception.
 * An entry is flagged as a directory marker when its key ends with "/".
 * Outside of unit tests, the qualifier function should be
 * {@link S3AFileSystem#keyToQualifiedPath(String)}.
 * @param deleteException the delete exception.
 * @param qualifierFn function to qualify paths
 * @return the possibly empty list of undeleted entries.
 */
@VisibleForTesting
public static List<KeyPath> extractUndeletedKeyPaths(
    final MultiObjectDeleteException deleteException,
    final Function<String, Path> qualifierFn) {
  final List<MultiObjectDeleteException.DeleteError> failures
      = deleteException.getErrors();
  final List<KeyPath> undeleted = new ArrayList<>(failures.size());
  for (MultiObjectDeleteException.DeleteError failure : failures) {
    final String key = failure.getKey();
    // qualify the key first, then infer the marker flag from its suffix
    undeleted.add(
        new KeyPath(key, qualifierFn.apply(key), key.endsWith("/")));
  }
  return undeleted;
}
@ -227,12 +323,17 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
* @return the list of undeleted entries
*/
@VisibleForTesting
static List<Path> removeUndeletedPaths(
static List<KeyPath> removeUndeletedPaths(
final MultiObjectDeleteException deleteException,
final Collection<Path> pathsBeingDeleted,
final Collection<KeyPath> pathsBeingDeleted,
final Function<String, Path> qualifier) {
List<Path> undeleted = extractUndeletedPaths(deleteException, qualifier);
pathsBeingDeleted.removeAll(undeleted);
// get the undeleted values
List<KeyPath> undeleted = extractUndeletedKeyPaths(deleteException,
qualifier);
// and remove them from the list of paths being deleted, matching on path
for (KeyPath undel : undeleted) {
pathsBeingDeleted.removeIf(kp -> kp.getPath().equals(undel.getPath()));
}
return undeleted;
}
@ -247,4 +348,70 @@ public final class MultiObjectDeleteSupport extends AbstractStoreOperation {
final List<DeleteObjectsRequest.KeyVersion> keysToDelete) {
return keysToPaths(keysToDelete);
}
/**
 * A (key, path) pair for an object named in a delete request, together
 * with a flag stating whether the object is a directory marker.
 * The class itself only stores the flag; the code which creates
 * instances decides how it is derived.
 * <p>
 * Added because Pairs of Lists of Triples was just too complex
 * for Java code.
 * </p>
 * Equality and hashing consider the key only.
 */
public static final class KeyPath {

  /** Key in bucket. */
  private final String key;

  /** Full path. */
  private final Path path;

  /** Is this a directory marker? */
  private final boolean directoryMarker;

  /**
   * Create an entry.
   * @param key object key in the bucket.
   * @param path path matching the key.
   * @param directoryMarker true if the object is a directory marker.
   */
  public KeyPath(final String key,
      final Path path,
      final boolean directoryMarker) {
    this.directoryMarker = directoryMarker;
    this.path = path;
    this.key = key;
  }

  /**
   * @return the key in the bucket.
   */
  public String getKey() {
    return key;
  }

  /**
   * @return the full path.
   */
  public Path getPath() {
    return path;
  }

  /**
   * @return true if the entry was created as a directory marker.
   */
  public boolean isDirectoryMarker() {
    return directoryMarker;
  }

  @Override
  public String toString() {
    final StringBuilder sb = new StringBuilder("KeyPath{");
    sb.append("key='").append(key).append('\'');
    sb.append(", path=").append(path);
    sb.append(", directoryMarker=").append(directoryMarker);
    sb.append('}');
    return sb.toString();
  }

  /**
   * Equals test is on key alone.
   */
  @Override
  public boolean equals(final Object o) {
    // the class is final, so an instanceof probe is an exact class match
    // and also rejects null.
    if (!(o instanceof KeyPath)) {
      return false;
    }
    return this == o || key.equals(((KeyPath) o).key);
  }

  @Override
  public int hashCode() {
    return Objects.hash(key);
  }
}
}

View File

@ -105,7 +105,7 @@ public interface OperationCallbacks {
throws IOException;
/**
* Recursive list of files and empty directories.
* Recursive list of files and directory markers.
*
* @param path path to list from
* @param status optional status of path to list.
@ -115,7 +115,7 @@ public interface OperationCallbacks {
* @throws IOException failure
*/
@Retries.RetryTranslated
RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
Path path,
S3AFileStatus status,
boolean collectTombstones,

View File

@ -211,14 +211,23 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
* Only queuing objects here whose copy operation has
* been submitted and so is in that thread pool.
* </li>
* <li>
* If a path is supplied, then after the delete is executed
* (and completes) the rename tracker from S3Guard will be
* told of its deletion. Do not set this for directory
* markers with children, as it may mistakenly add
* tombstones into the table.
* </li>
* </ol>
* This method must only be called from the primary thread.
* @param path path to the object
* @param path path to the object.
* @param key key of the object.
*/
private void queueToDelete(Path path, String key) {
LOG.debug("Queueing to delete {}", path);
pathsToDelete.add(path);
if (path != null) {
pathsToDelete.add(path);
}
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(key));
}
@ -234,7 +243,9 @@ public class RenameOperation extends ExecutingStoreOperation<Long> {
*/
private void queueToDelete(
List<DirMarkerTracker.Marker> markersToDelete) {
markersToDelete.forEach(this::queueToDelete);
markersToDelete.forEach(m -> queueToDelete(
null,
m.getKey()));
}
/**
@ -397,6 +408,7 @@ Are * @throws IOException failure
destStatus.getPath());
// Although the dir marker policy doesn't always need to do this,
// it's simplest just to be consistent here.
// note: updates the metastore as well as S3.
callbacks.deleteObjectAtPath(destStatus.getPath(), dstKey, false, null);
}
@ -408,7 +420,7 @@ Are * @throws IOException failure
false);
final RemoteIterator<S3ALocatedFileStatus> iterator =
callbacks.listFilesAndEmptyDirectories(parentPath,
callbacks.listFilesAndDirectoryMarkers(parentPath,
sourceStatus,
true,
true);

View File

@ -717,7 +717,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
public DDBPathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
throws IOException {
checkPath(path);
LOG.debug("Get from table {} in region {}: {}. wantEmptyDirectory={}",
LOG.debug("Get from table {} in region {}: {} ; wantEmptyDirectory={}",
tableName, region, path, wantEmptyDirectoryFlag);
DDBPathMetadata result = innerGet(path, wantEmptyDirectoryFlag);
LOG.debug("result of get {} is: {}", path, result);

View File

@ -12,7 +12,11 @@
limitations under the License. See accompanying LICENSE file.
-->
# Controlling the S3A Directory Marker Behavior
# Experimental: Controlling the S3A Directory Marker Behavior
This document discusses an experimental feature of the S3A
connector since Hadoop 3.3.1: the ability to retain directory
marker objects above paths containing files or subdirectories.
## <a name="compatibility"></a> Critical: this is not backwards compatible!
@ -26,15 +30,40 @@ Versions of Hadoop which are incompatible with other marker retention policies,
as of August 2020.
-------------------------------------------------------
| Branch | Compatible Since | Future Fix Planned? |
| Branch | Compatible Since | Supported |
|------------|------------------|---------------------|
| Hadoop 2.x | | NO |
| Hadoop 3.0 | | NO |
| Hadoop 3.1 | check | Yes |
| Hadoop 3.2 | check | Yes |
| Hadoop 2.x | n/a | WONTFIX |
| Hadoop 3.0 | check | Read-only |
| Hadoop 3.1 | check | Read-only |
| Hadoop 3.2 | check | Read-only |
| Hadoop 3.3 | 3.3.1 | Done |
-------------------------------------------------------
*WONTFIX*
The Hadoop branch-2 line will *not* be patched.
*Read-only*
These branches have read-only compatibility.
* They may list directories with directory markers, and correctly identify when
such directories have child entries.
* They will open files under directories with such markers.
However, they have limitations when writing/deleting directories.
Specifically: S3Guard tables may not be correctly updated in
all conditions, especially on the partial failure of delete
operations. In particular, they may mistakenly add a tombstone in
the DynamoDB table and so future directory/directory tree listings
will consider the directory to be nonexistent.
_It is not safe for Hadoop releases before Hadoop 3.3.1 to write
to S3 buckets which have directory markers when S3Guard is enabled_
## Verifying read compatibility.
The `s3guard bucket-info` tool [can be used to verify support](#bucket-info).
This allows for a command line check of compatibility, including
in scripts.
@ -49,6 +78,7 @@ It is only safe change the directory marker policy if the following
(including backing up) an S3 bucket.
2. You know all applications which read data from the bucket are compatible.
### <a name="backups"></a> Applications backing up data.
It is not enough to have a version of Apache Hadoop which is compatible, any
@ -240,7 +270,7 @@ can switch to the higher-performance mode for those specific directories.
Only the default setting, `fs.s3a.directory.marker.retention = delete`, is compatible with
every shipping Hadoop release.
## <a name="authoritative"></a> Directory Markers and S3Guard
## <a name="s3guard"></a> Directory Markers and S3Guard
Applications which interact with S3A in S3A clients with S3Guard enabled still
create and delete markers. There's no attempt to skip operations, such as by having
@ -256,6 +286,28 @@ then an S3A connector with a retention policy of `fs.s3a.directory.marker.retent
only use in managed applications where all clients are using the same version of
hadoop, and configured consistently.
After the directory marker feature [HADOOP-13230](https://issues.apache.org/jira/browse/HADOOP-13230)
was added, issues related to S3Guard integration surfaced:
1. The incremental update of the S3Guard table was inserting tombstones
over directories as the markers were deleted, hiding files underneath.
This happened during directory `rename()` and `delete()`.
1. The update of the S3Guard table after a partial failure of a bulk delete
operation would insert tombstones in S3Guard records of successfully
deleted markers, irrespective of the directory status.
Issue #1 is unique to Hadoop branch 3.3; however issue #2 is a critical
part of the S3Guard consistency handling.
Both issues have been fixed in Hadoop 3.3.x,
in [HADOOP-17244](https://issues.apache.org/jira/browse/HADOOP-17244).
Issue #2, delete failure handling, is not easily backported and is
not likely to be backported.
Accordingly: Hadoop releases with read-only compatibility must not be used
to rename or delete directories where markers are retained *when S3Guard is enabled.*
## <a name="bucket-info"></a> Verifying marker policy with `s3guard bucket-info`
The `bucket-info` command has been enhanced to support verification from the command

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.apache.hadoop.conf.Configuration;
@ -141,13 +142,14 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
Path markerPath = fs.keyToQualifiedPath(marker);
keys.add(new DeleteObjectsRequest.KeyVersion(marker));
Pair<List<Path>, List<Path>> pair =
Pair<List<KeyPath>, List<KeyPath>> pair =
new MultiObjectDeleteSupport(fs.createStoreContext(), null)
.splitUndeletedKeys(ex, keys);
assertEquals(undeleted, pair.getLeft());
List<Path> right = pair.getRight();
assertEquals("Wrong size for " + join(right), 1, right.size());
assertEquals(markerPath, right.get(0));
assertEquals(undeleted, toPathList(pair.getLeft()));
List<KeyPath> right = pair.getRight();
Assertions.assertThat(right)
.hasSize(1);
assertEquals(markerPath, right.get(0).getPath());
}
/**

View File

@ -32,6 +32,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
@ -286,4 +287,27 @@ public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase {
s3.putObject(putObjectRequest);
}
/**
 * Delete a directory tree which has both a child file and an explicit
 * directory marker object, then recreate the directory and verify that
 * it exists and lists as empty.
 */
@Test
public void testDirMarkerDelete() throws Throwable {
  S3AFileSystem fs = getFileSystem();
  assumeFilesystemHasMetadatastore(getFileSystem());
  Path dir = methodPath();
  Path childFile = new Path(dir, "subdir/file.txt");

  // creating the directory adds the s3guard entry
  fs.mkdirs(dir);
  touch(fs, childFile);

  // PUT a marker object directly
  String markerKey = fs.pathToKey(dir) + "/";
  createEmptyObject(fs, markerKey);

  // recursive delete of the whole tree
  fs.delete(dir, true);
  assertPathDoesNotExist("Should have been deleted", dir);

  // now create the dir again and check it is an empty directory
  fs.mkdirs(dir);
  Assertions.assertThat(fs.getFileStatus(dir))
      .matches(FileStatus::isDirectory, "Not a directory");
  Assertions.assertThat(fs.listStatus(dir))
      .describedAs("listing of %s", dir)
      .isEmpty();
}
}

View File

@ -37,19 +37,13 @@ import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@ -61,13 +55,7 @@ import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.ReflectionUtils;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.transfer.model.CopyResult;
import javax.annotation.Nullable;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Assume;
@ -82,8 +70,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -1024,8 +1010,14 @@ public final class S3ATestUtils {
* @param expected expected value.
*/
public void assertDiffEquals(String message, long expected) {
Assert.assertEquals(message + ": " + statistic.getSymbol(),
expected, diff());
String text = message + ": " + statistic.getSymbol();
long diff = diff();
if (expected != diff) {
// Log in error ensures that the details appear in the test output
LOG.error(text + " expected {}, actual {}", expected, diff);
}
Assert.assertEquals(text,
expected, diff);
}
/**
@ -1540,292 +1532,4 @@ public final class S3ATestUtils {
probes);
}
/**
 * Stub implementation of {@link OperationCallbacks}.
 * Every method is either a no-op or returns {@code null}/{@code false};
 * nothing is recorded and no store is contacted by this class itself.
 */
public static class MinimalOperationCallbacks
implements OperationCallbacks {
// stub: always returns null
@Override
public S3ObjectAttributes createObjectAttributes(
Path path,
String eTag,
String versionId,
long len) {
return null;
}
// stub: always returns null
@Override
public S3ObjectAttributes createObjectAttributes(
S3AFileStatus fileStatus) {
return null;
}
// stub: always returns null
@Override
public S3AReadOpContext createReadContext(
FileStatus fileStatus) {
return null;
}
// stub: no-op
@Override
public void finishRename(
Path sourceRenamed,
Path destCreated)
throws IOException {
}
// stub: no-op; nothing is deleted
@Override
public void deleteObjectAtPath(
Path path,
String key,
boolean isFile,
BulkOperationState operationState)
throws IOException {
}
// stub: returns null rather than an empty iterator
@Override
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
Path path,
S3AFileStatus status,
boolean collectTombstones,
boolean includeSelf)
throws IOException {
return null;
}
// stub: always returns null
@Override
public CopyResult copyFile(
String srcKey,
String destKey,
S3ObjectAttributes srcAttributes,
S3AReadOpContext readContext)
throws IOException {
return null;
}
// stub: always returns null; undeletedObjectsOnFailure is left untouched
@Override
public DeleteObjectsResult removeKeys(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean deleteFakeDir,
List<Path> undeletedObjectsOnFailure,
BulkOperationState operationState,
boolean quiet)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
return null;
}
// stub: no path is reported as authoritative
@Override
public boolean allowAuthoritative(Path p) {
return false;
}
// stub: returns null rather than an empty iterator
@Override
public RemoteIterator<S3AFileStatus> listObjects(
Path path,
String key)
throws IOException {
return null;
}
}
/**
 * MetadataStore which tracks what is deleted and added.
 * <p>
 * Paths handed to the {@code put} methods are appended to the created
 * list; paths handed to {@code delete} and {@code deletePaths} are
 * appended to the deleted list. Every other operation is a no-op or
 * returns {@code null}/0, apart from {@code initiateRenameOperation},
 * which is unsupported.
 */
public static class OperationTrackingStore implements MetadataStore {

  /** Paths passed to delete/deletePaths, in call order. */
  private final List<Path> deleted = new ArrayList<>();

  /** Paths of the entries passed to the put methods, in call order. */
  private final List<Path> created = new ArrayList<>();

  /**
   * Get the live list of deleted paths.
   * @return the internal list, not a copy.
   */
  public List<Path> getDeleted() {
    return deleted;
  }

  /**
   * Get the live list of created paths.
   * @return the internal list, not a copy.
   */
  public List<Path> getCreated() {
    return created;
  }

  @Override
  public void initialize(final FileSystem fs,
      ITtlTimeProvider ttlTimeProvider) {
  }

  @Override
  public void initialize(final Configuration conf,
      ITtlTimeProvider ttlTimeProvider) {
  }

  @Override
  public void forgetMetadata(final Path path) {
  }

  @Override
  public PathMetadata get(final Path path) {
    return null;
  }

  @Override
  public PathMetadata get(final Path path,
      final boolean wantEmptyDirectoryFlag) {
    return null;
  }

  @Override
  public DirListingMetadata listChildren(final Path path) {
    return null;
  }

  @Override
  public void put(final PathMetadata meta) {
    put(meta, null);
  }

  /** Record the path of the entry's file status as created. */
  @Override
  public void put(final PathMetadata meta,
      final BulkOperationState operationState) {
    created.add(meta.getFileStatus().getPath());
  }

  /** Record every entry; the operation state is not passed on. */
  @Override
  public void put(final Collection<? extends PathMetadata> metas,
      final BulkOperationState operationState) {
    for (PathMetadata meta : metas) {
      put(meta, null);
    }
  }

  /** Record the listing's own path as created; children are ignored. */
  @Override
  public void put(final DirListingMetadata meta,
      final List<Path> unchangedEntries,
      final BulkOperationState operationState) {
    created.add(meta.getPath());
  }

  @Override
  public void destroy() {
  }

  /** Record the path as deleted. */
  @Override
  public void delete(final Path path,
      final BulkOperationState operationState) {
    deleted.add(path);
  }

  /** Record all the paths as deleted. */
  @Override
  public void deletePaths(final Collection<Path> paths,
      @Nullable final BulkOperationState operationState)
      throws IOException {
    deleted.addAll(paths);
  }

  /** No-op: subtree deletions are not tracked. */
  @Override
  public void deleteSubtree(final Path path,
      final BulkOperationState operationState) {
  }

  /** No-op: moves are not tracked. */
  @Override
  public void move(@Nullable final Collection<Path> pathsToDelete,
      @Nullable final Collection<PathMetadata> pathsToCreate,
      @Nullable final BulkOperationState operationState) {
  }

  @Override
  public void prune(final PruneMode pruneMode, final long cutoff) {
  }

  @Override
  public long prune(final PruneMode pruneMode,
      final long cutoff,
      final String keyPrefix) {
    return 0;
  }

  @Override
  public BulkOperationState initiateBulkWrite(
      final BulkOperationState.OperationType operation,
      final Path dest) {
    return null;
  }

  @Override
  public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
  }

  @Override
  public Map<String, String> getDiagnostics() {
    return null;
  }

  @Override
  public void updateParameters(final Map<String, String> parameters) {
  }

  @Override
  public void close() {
  }

  @Override
  public RenameTracker initiateRenameOperation(
      final StoreContext storeContext,
      final Path source,
      final S3AFileStatus sourceStatus,
      final Path dest) {
    throw new UnsupportedOperationException("unsupported");
  }

  @Override
  public void addAncestors(final Path qualifiedPath,
      @Nullable final BulkOperationState operationState) {
  }
}
/**
 * Stub implementation of {@link ListingOperationCallbacks}.
 * Every method returns {@code null}, 0 or {@code false}.
 */
public static class MinimalListingOperationCallbacks
implements ListingOperationCallbacks {
// stub: returns null, not a completed future
@Override
public CompletableFuture<S3ListResult> listObjectsAsync(
S3ListRequest request)
throws IOException {
return null;
}
// stub: returns null, not a completed future
@Override
public CompletableFuture<S3ListResult> continueListObjectsAsync(
S3ListRequest request,
S3ListResult prevResult)
throws IOException {
return null;
}
// stub: always returns null
@Override
public S3ALocatedFileStatus toLocatedFileStatus(
S3AFileStatus status) throws IOException {
return null;
}
// stub: always returns null
@Override
public S3ListRequest createListObjectsRequest(
String key,
String delimiter) {
return null;
}
// stub: always 0
@Override
public long getDefaultBlockSize(Path path) {
return 0;
}
// stub: always 0
@Override
public int getMaxKeys() {
return 0;
}
// stub: always returns null
@Override
public ITtlTimeProvider getUpdatedTtlTimeProvider() {
return null;
}
// stub: no path is reported as authoritative
@Override
public boolean allowAuthoritative(Path p) {
return false;
}
}
}

View File

@ -42,7 +42,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@ -58,6 +57,7 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.MetricDiff;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
import static org.apache.hadoop.fs.s3a.Statistic.FILES_DELETE_REJECTED;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_OBJECTS;
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_DELETE_REQUESTS;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.Effects;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.Statement;
@ -72,6 +72,7 @@ import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.extractUndeletedPaths;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount;
import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@ -331,27 +332,37 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
removeBucketOverrides(bucketName, conf,
MAX_THREADS,
MAXIMUM_CONNECTIONS,
S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY);
S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
DIRECTORY_MARKER_POLICY,
BULK_DELETE_PAGE_SIZE);
conf.setInt(MAX_THREADS, EXECUTOR_THREAD_COUNT);
conf.setInt(MAXIMUM_CONNECTIONS, EXECUTOR_THREAD_COUNT * 2);
// turn off prune delays, so as to stop scale tests creating
// so much cruft that future CLI prune commands take forever
conf.setInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY, 0);
// use the keep policy to ensure that surplus markers exist
// to complicate failures
conf.set(DIRECTORY_MARKER_POLICY, DIRECTORY_MARKER_POLICY_KEEP);
// set the delete page size to its maximum to ensure that all
// entries are included in the same large delete, even on
// scale runs. This is needed for assertions on the result.
conf.setInt(BULK_DELETE_PAGE_SIZE, 1_000);
return conf;
}
/**
* Create a unique path, which includes method name,
* multidelete flag and a random UUID.
* multidelete flag and a timestamp.
* @return a string to use for paths.
* @throws IOException path creation failure.
*/
private Path uniquePath() throws IOException {
long now = System.currentTimeMillis();
return path(
String.format("%s-%s-%04d",
String.format("%s-%s-%06d.%03d",
getMethodName(),
multiDelete ? "multi" : "single",
System.currentTimeMillis() % 10000));
now / 1000, now % 1000));
}
/**
@ -477,8 +488,11 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
// create a set of files
// this is done in parallel as it is 10x faster on a long-haul test run.
List<Path> createdFiles = createFiles(fs, readOnlyDir, dirDepth, fileCount,
dirCount);
List<Path> dirs = new ArrayList<>(dirCount);
List<Path> createdFiles = createDirsAndFiles(fs, readOnlyDir, dirDepth,
fileCount, dirCount,
new ArrayList<>(fileCount),
dirs);
// are they all there?
int expectedFileCount = createdFiles.size();
assertFileCount("files ready to rename", roleFS,
@ -495,26 +509,36 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
MultiObjectDeleteException.class, deniedException);
final List<Path> undeleted
= extractUndeletedPaths(mde, fs::keyToQualifiedPath);
List<Path> expectedUndeletedFiles = new ArrayList<>(createdFiles);
if (getFileSystem().getDirectoryMarkerPolicy()
.keepDirectoryMarkers(readOnlyDir)) {
// directory markers are being retained,
// so will also be in the list of undeleted files
expectedUndeletedFiles.addAll(dirs);
}
Assertions.assertThat(undeleted)
.as("files which could not be deleted")
.hasSize(expectedFileCount)
.containsAll(createdFiles)
.containsExactlyInAnyOrderElementsOf(createdFiles);
.containsExactlyInAnyOrderElementsOf(expectedUndeletedFiles);
}
LOG.info("Result of renaming read-only files is as expected",
deniedException);
assertFileCount("files in the source directory", roleFS,
readOnlyDir, expectedFileCount);
// now lets look at the destination.
// even with S3Guard on, we expect the destination to match that of our
// even with S3Guard on, we expect the destination to match that of
// the remote state.
// the destination directory must exist
describe("Verify destination directory exists");
FileStatus st = roleFS.getFileStatus(writableDir);
assertTrue("Not a directory: " + st,
st.isDirectory());
assertIsDirectory(writableDir);
assertFileCount("files in the dest directory", roleFS,
writableDir, expectedFileCount);
// all directories in the source tree must still exist,
// which for S3Guard means no tombstone markers were added
LOG.info("Verifying all directories still exist");
for (Path dir : dirs) {
assertIsDirectory(dir);
}
}
@Test
@ -611,9 +635,14 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
// the full FS
S3AFileSystem fs = getFileSystem();
StoreContext storeContext = fs.createStoreContext();
List<Path> readOnlyFiles = createFiles(fs, readOnlyDir,
dirDepth, fileCount, dirCount);
List<Path> dirs = new ArrayList<>(dirCount);
List<Path> readOnlyFiles = createDirsAndFiles(
fs, readOnlyDir, dirDepth,
fileCount, dirCount,
new ArrayList<>(fileCount),
dirs);
List<Path> deletableFiles = createFiles(fs,
writableDir, dirDepth, fileCount, dirCount);
@ -625,20 +654,31 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
readOnlyFiles.stream(),
deletableFiles.stream())
.collect(Collectors.toList());
List<MultiObjectDeleteSupport.KeyPath> keyPaths = allFiles.stream()
.map(path ->
new MultiObjectDeleteSupport.KeyPath(
storeContext.pathToKey(path),
path,
false))
.collect(Collectors.toList());
// this set can be deleted by the role FS
MetricDiff rejectionCount = new MetricDiff(roleFS, FILES_DELETE_REJECTED);
MetricDiff deleteVerbCount = new MetricDiff(roleFS, OBJECT_DELETE_REQUESTS);
MetricDiff deleteObjectCount = new MetricDiff(roleFS,
OBJECT_DELETE_OBJECTS);
describe("Trying to delete read only directory");
AccessDeniedException ex = expectDeleteForbidden(readOnlyDir);
if (multiDelete) {
// multi-delete status checks
extractCause(MultiObjectDeleteException.class, ex);
deleteVerbCount.assertDiffEquals("Wrong delete request count", 1);
deleteObjectCount.assertDiffEquals("Number of keys in delete request",
readOnlyFiles.size());
rejectionCount.assertDiffEquals("Wrong rejection count",
readOnlyFiles.size());
deleteVerbCount.assertDiffEquals("Wrong delete count", 1);
reset(rejectionCount, deleteVerbCount);
reset(rejectionCount, deleteVerbCount, deleteObjectCount);
}
// all the files are still there? (avoid in scale test due to cost)
if (!scaleTest) {
@ -649,16 +689,20 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
ex = expectDeleteForbidden(basePath);
if (multiDelete) {
// multi-delete status checks
extractCause(MultiObjectDeleteException.class, ex);
deleteVerbCount.assertDiffEquals("Wrong delete count", 1);
MultiObjectDeleteException mde = extractCause(
MultiObjectDeleteException.class, ex);
final List<Path> undeleted
= removeUndeletedPaths(mde, allFiles, fs::keyToQualifiedPath);
List<MultiObjectDeleteSupport.KeyPath> undeletedKeyPaths =
removeUndeletedPaths(mde, keyPaths, storeContext::keyToPath);
final List<Path> undeleted = toPathList(
undeletedKeyPaths);
deleteObjectCount.assertDiffEquals(
"Wrong count of objects in delete request",
allFiles.size());
Assertions.assertThat(undeleted)
.as("files which could not be deleted")
.containsExactlyInAnyOrderElementsOf(readOnlyFiles);
Assertions.assertThat(allFiles)
Assertions.assertThat(toPathList(keyPaths))
.as("files which were deleted")
.containsExactlyInAnyOrderElementsOf(deletableFiles);
rejectionCount.assertDiffEquals("Wrong rejection count",
@ -677,7 +721,26 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
Assertions.assertThat(readOnlyListing)
.as("ReadOnly directory " + directoryList)
.containsAll(readOnlyFiles);
.containsExactlyInAnyOrderElementsOf(readOnlyFiles);
}
/**
 * Verifies the logic of handling directory markers in
 * delete operations, specifically:
 * <ol>
 *   <li>all markers above empty directories MUST be deleted</li>
 *   <li>all markers above non-empty directories MUST NOT be deleted</li>
 * </ol>
 * As the delete list may include subdirectories, we need to work up from
 * the bottom of the list of deleted files before probing the parents,
 * that being done by a s3guard get(path, need-empty-directory) call.
 * <p>
 * This is pretty sensitive code.
 * <p>
 * NOTE(review): the body currently only declares the test description and
 * the multi-object-delete precondition; it makes no assertions of its own.
 * @throws Throwable on any failure.
 */
@Test
public void testSubdirDeleteFailures() throws Throwable {
  describe("Multiobject delete handling of directory markers");
  // only meaningful for the multi-object delete variant of this suite
  assume("Multiobject delete only", multiDelete);
}
/**
@ -771,7 +834,7 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
}
/**
* Parallel-touch a set of files in the destination directory.
* Build a set of files in a directory tree.
* @param fs filesystem
* @param destDir destination
* @param depth file depth
@ -784,12 +847,48 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase {
final int depth,
final int fileCount,
final int dirCount) throws IOException {
List<CompletableFuture<Path>> futures = new ArrayList<>(fileCount);
List<Path> paths = new ArrayList<>(fileCount);
List<Path> dirs = new ArrayList<>(fileCount);
return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
new ArrayList<Path>(fileCount),
new ArrayList<Path>(dirCount));
}
/**
* Build a set of files in a directory tree.
* @param fs filesystem
* @param destDir destination
* @param depth file depth
* @param fileCount number of files to create.
* @param dirCount number of dirs to create at each level
* @param paths [out] list of file paths created
* @param dirs [out] list of directory paths created.
* @return the list of files created.
*/
public static List<Path> createDirsAndFiles(final FileSystem fs,
final Path destDir,
final int depth,
final int fileCount,
final int dirCount,
final List<Path> paths,
final List<Path> dirs) throws IOException {
buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
+ dirs.size());
// create directories. With dir marker retention, that adds more entries
// to cause deletion issues
try (DurationInfo ignore =
new DurationInfo(LOG, "Creating %d files", fileCount)) {
new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
for (Path path : dirs) {
futures.add(submit(EXECUTOR, () ->{
fs.mkdirs(path);
return path;
}));
}
waitForCompletion(futures);
}
try (DurationInfo ignore =
new DurationInfo(LOG, "Creating %d files", paths.size())) {
for (Path path : paths) {
futures.add(put(fs, path, path.getName()));
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.impl;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -36,9 +37,11 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.test.OperationTrackingStore;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.ACCESS_DENIED;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.removeUndeletedPaths;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.toPathList;
import static org.junit.Assert.assertEquals;
/**
@ -56,36 +59,42 @@ public class TestPartialDeleteFailures {
return new Path("s3a://bucket/" + k);
}
/**
 * Get the path component of a path's URI.
 * @param path path to convert.
 * @return the value of {@code path.toUri().getPath()}.
 */
private static String toKey(Path path) {
  final String uriPath = path.toUri().getPath();
  return uriPath;
}
@Before
public void setUp() throws Exception {
context = S3ATestUtils.createMockStoreContext(true,
new S3ATestUtils.OperationTrackingStore(), CONTEXT_ACCESSORS);
new OperationTrackingStore(), CONTEXT_ACCESSORS);
}
@Test
public void testDeleteExtraction() {
List<Path> src = pathList("a", "a/b", "a/c");
List<Path> rejected = pathList("a/b");
List<MultiObjectDeleteSupport.KeyPath> src = pathList("a", "a/b", "a/c");
List<MultiObjectDeleteSupport.KeyPath> rejected = pathList("a/b");
MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
rejected);
List<Path> undeleted = removeUndeletedPaths(ex, src,
TestPartialDeleteFailures::qualifyKey);
List<MultiObjectDeleteSupport.KeyPath> undeleted =
removeUndeletedPaths(ex, src,
TestPartialDeleteFailures::qualifyKey);
assertEquals("mismatch of rejected and undeleted entries",
rejected, undeleted);
}
@Test
public void testSplitKeysFromResults() throws Throwable {
List<Path> src = pathList("a", "a/b", "a/c");
List<Path> rejected = pathList("a/b");
List<DeleteObjectsRequest.KeyVersion> keys = keysToDelete(src);
List<MultiObjectDeleteSupport.KeyPath> src = pathList("a", "a/b", "a/c");
List<MultiObjectDeleteSupport.KeyPath> rejected = pathList("a/b");
List<DeleteObjectsRequest.KeyVersion> keys = keysToDelete(toPathList(src));
MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
rejected);
Pair<List<Path>, List<Path>> pair =
Pair<List<MultiObjectDeleteSupport.KeyPath>,
List<MultiObjectDeleteSupport.KeyPath>> pair =
new MultiObjectDeleteSupport(context, null)
.splitUndeletedKeys(ex, keys);
List<Path> undeleted = pair.getLeft();
List<Path> deleted = pair.getRight();
List<MultiObjectDeleteSupport.KeyPath> undeleted = pair.getLeft();
List<MultiObjectDeleteSupport.KeyPath> deleted = pair.getRight();
assertEquals(rejected, undeleted);
// now check the deleted list to verify that it is valid
src.remove(rejected.get(0));
@ -97,9 +106,12 @@ public class TestPartialDeleteFailures {
* @param paths paths to qualify and then convert to a list.
* @return same paths as a list.
*/
private List<Path> pathList(String... paths) {
private List<MultiObjectDeleteSupport.KeyPath> pathList(String... paths) {
return Arrays.stream(paths)
.map(TestPartialDeleteFailures::qualifyKey)
.map(k->
new MultiObjectDeleteSupport.KeyPath(k,
qualifyKey(k),
k.endsWith("/")))
.collect(Collectors.toList());
}
@ -111,12 +123,13 @@ public class TestPartialDeleteFailures {
*/
private MultiObjectDeleteException createDeleteException(
final String code,
final List<Path> rejected) {
final List<MultiObjectDeleteSupport.KeyPath> rejected) {
List<MultiObjectDeleteException.DeleteError> errors = rejected.stream()
.map((p) -> {
.map((kp) -> {
Path p = kp.getPath();
MultiObjectDeleteException.DeleteError e
= new MultiObjectDeleteException.DeleteError();
e.setKey(p.toUri().getPath());
e.setKey(kp.getKey());
e.setCode(code);
e.setMessage("forbidden");
return e;
@ -125,14 +138,33 @@ public class TestPartialDeleteFailures {
}
/**
* From a list of paths, build up the list of keys for a delete request.
* From a list of paths, build up the list of KeyVersion records
* for a delete request.
* All the entries will be files (i.e. no trailing /)
* @param paths path list
* @return a key list suitable for a delete request.
*/
public static List<DeleteObjectsRequest.KeyVersion> keysToDelete(
List<Path> paths) {
return paths.stream()
.map((p) -> p.toUri().getPath())
.map(p -> {
String uripath = p.toUri().getPath();
return uripath.substring(1);
})
.map(DeleteObjectsRequest.KeyVersion::new)
.collect(Collectors.toList());
}
/**
 * Wrap each key in a {@code KeyVersion} record for a delete request.
 * Keys are passed through unmodified: if a key has a trailing /, that
 * is retained, so it will be considered a directory during multi-object
 * delete failure handling.
 * @param keys key list
 * @return a key list suitable for a delete request.
 */
public static List<DeleteObjectsRequest.KeyVersion> toDeleteRequests(
    List<String> keys) {
  List<DeleteObjectsRequest.KeyVersion> requests =
      new ArrayList<>(keys.size());
  for (String key : keys) {
    requests.add(new DeleteObjectsRequest.KeyVersion(key));
  }
  return requests;
}
@ -143,23 +175,33 @@ public class TestPartialDeleteFailures {
*/
@Test
public void testProcessDeleteFailure() throws Throwable {
Path pathA = qualifyKey("/a");
Path pathAB = qualifyKey("/a/b");
Path pathAC = qualifyKey("/a/c");
String keyA = "/a/";
String keyAB = "/a/b";
String keyAC = "/a/c";
Path pathA = qualifyKey(keyA);
Path pathAB = qualifyKey(keyAB);
Path pathAC = qualifyKey(keyAC);
List<String> srcKeys = Lists.newArrayList(keyA, keyAB, keyAC);
List<Path> src = Lists.newArrayList(pathA, pathAB, pathAC);
List<DeleteObjectsRequest.KeyVersion> keyList = keysToDelete(src);
List<DeleteObjectsRequest.KeyVersion> keyList = toDeleteRequests(srcKeys);
List<Path> deleteForbidden = Lists.newArrayList(pathAB);
final List<Path> deleteAllowed = Lists.newArrayList(pathA, pathAC);
List<MultiObjectDeleteSupport.KeyPath> forbiddenKP =
Lists.newArrayList(
new MultiObjectDeleteSupport.KeyPath(keyAB, pathAB, true));
MultiObjectDeleteException ex = createDeleteException(ACCESS_DENIED,
deleteForbidden);
S3ATestUtils.OperationTrackingStore store
= new S3ATestUtils.OperationTrackingStore();
forbiddenKP);
OperationTrackingStore store
= new OperationTrackingStore();
StoreContext storeContext = S3ATestUtils
.createMockStoreContext(true, store, CONTEXT_ACCESSORS);
MultiObjectDeleteSupport deleteSupport
= new MultiObjectDeleteSupport(storeContext, null);
List<Path> retainedMarkers = new ArrayList<>();
Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>>
triple = deleteSupport.processDeleteFailure(ex, keyList);
triple = deleteSupport.processDeleteFailure(ex,
keyList,
retainedMarkers);
Assertions.assertThat(triple.getRight())
.as("failure list")
.isEmpty();
@ -173,6 +215,14 @@ public class TestPartialDeleteFailures {
as("undeleted store entries")
.containsAll(deleteForbidden)
.doesNotContainAnyElementsOf(deleteAllowed);
// because dir marker retention is on, we expect at least one retained
// marker
Assertions.assertThat(retainedMarkers).
as("Retained Markers")
.containsExactly(pathA);
Assertions.assertThat(store.getDeleted()).
as("List of tombstoned records")
.doesNotContain(pathA);
}

View File

@ -111,7 +111,10 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
keepMarkers
? DIRECTORY_MARKER_POLICY_KEEP
: DIRECTORY_MARKER_POLICY_DELETE);
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
if (isGuarded()) {
conf.set(S3_METADATA_STORE_IMPL, S3GUARD_METASTORE_DYNAMO);
conf.setBoolean(METADATASTORE_AUTHORITATIVE, authoritative);
}
disableFilesystemCaching(conf);
return conf;
}
@ -148,6 +151,7 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
INVOCATION_COPY_FROM_LOCAL_FILE,
OBJECT_COPY_REQUESTS,
OBJECT_DELETE_REQUESTS,
OBJECT_DELETE_OBJECTS,
OBJECT_LIST_REQUESTS,
OBJECT_METADATA_REQUESTS,
OBJECT_PUT_BYTES,

View File

@ -19,16 +19,20 @@
package org.apache.hadoop.fs.s3a.performance;
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collection;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Tristate;
@ -37,6 +41,7 @@ import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.*;
import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Use metrics to assert about the cost of file API calls.
@ -58,7 +63,9 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
{"raw-keep-markers", false, true, false},
{"raw-delete-markers", false, false, false},
{"nonauth-keep-markers", true, true, false},
{"auth-delete-markers", true, false, true}
{"nonauth-delete-markers", true, false, false},
{"auth-delete-markers", true, false, true},
{"auth-keep-markers", true, true, true}
});
}
@ -145,7 +152,7 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
boolean rawAndDeleting = isRaw() && isDeleting();
verifyMetrics(() -> {
fs.delete(file1, false);
return "after fs.delete(file1simpleFile) " + getMetricSummary();
return "after fs.delete(file1) " + getMetricSummary();
},
// delete file. For keeping: that's it
probe(rawAndKeeping, OBJECT_METADATA_REQUESTS,
@ -173,7 +180,24 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
public void testDirMarkersSubdir() throws Throwable {
describe("verify cost of deep subdir creation");
Path subDir = new Path(methodPath(), "1/2/3/4/5/6");
Path methodPath = methodPath();
Path parent = new Path(methodPath, "parent");
Path subDir = new Path(parent, "1/2/3/4/5/6");
S3AFileSystem fs = getFileSystem();
// this creates a peer of the parent dir, so ensures
// that when parent dir is deleted, no markers need to
// be recreated...that complicates all the metrics which
// are measured
Path sibling = new Path(methodPath, "sibling");
ContractTestUtils.touch(fs, sibling);
int dirsCreated = 2;
fs.delete(parent, true);
LOG.info("creating parent dir {}", parent);
fs.mkdirs(parent);
LOG.info("creating sub directory {}", subDir);
// one dir created, possibly a parent removed
verifyMetrics(() -> {
mkdirs(subDir);
@ -187,6 +211,47 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
// delete all possible fake dirs above the subdirectory
withWhenDeleting(FAKE_DIRECTORIES_DELETED,
directoriesInPath(subDir) - 1));
int dirDeleteRequests = 1;
int fileDeleteRequests = 0;
int totalDeleteRequests = dirDeleteRequests + fileDeleteRequests;
LOG.info("About to delete {}", parent);
// now delete the deep tree.
verifyMetrics(() -> {
fs.delete(parent, true);
return "deleting parent dir " + parent + " " + getMetricSummary();
},
// two directory markers will be deleted in a single request
with(OBJECT_DELETE_REQUESTS, totalDeleteRequests),
// keeping: the parent dir marker needs deletion alongside
// the subdir one.
withWhenKeeping(OBJECT_DELETE_OBJECTS, dirsCreated),
// deleting: only the marker at the bottom needs deleting
withWhenDeleting(OBJECT_DELETE_OBJECTS, 1));
// followup with list calls to make sure all is clear.
verifyNoListing(parent);
verifyNoListing(subDir);
// now reinstate the directory, which is where HADOOP-17244 was hitting problems
fs.mkdirs(parent);
FileStatus[] children = fs.listStatus(parent);
Assertions.assertThat(children)
.describedAs("Children of %s", parent)
.isEmpty();
}
/**
 * List a path and require the listing to fail with a
 * {@code FileNotFoundException}.
 * If the listing unexpectedly succeeds, the closure returns the entries
 * as a string (presumably used by {@code intercept} in its failure
 * report; confirm against LambdaTestUtils).
 * @param path path to scan
 * @throws Exception any failure of the probe.
 */
protected void verifyNoListing(final Path path) throws Exception {
  // block-bodied lambda: keeps the closure a value-returning one
  intercept(FileNotFoundException.class, () -> {
    return Arrays.deepToString(getFileSystem().listStatus(path));
  });
}
@Test

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3ListRequest;
import org.apache.hadoop.fs.s3a.S3ListResult;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
/**
 * Do-nothing implementation of {@link ListingOperationCallbacks}.
 * Every callback returns a fixed placeholder value: {@code null} for
 * object results, {@code 0} for numeric results and {@code false} for
 * boolean results. No callback performs any work.
 */
public class MinimalListingOperationCallbacks
    implements ListingOperationCallbacks {

  /**
   * Stub: no listing is started.
   * @param request ignored
   * @return always null
   */
  @Override
  public CompletableFuture<S3ListResult> listObjectsAsync(
      S3ListRequest request) throws IOException {
    return null;
  }

  /**
   * Stub: no listing is continued.
   * @param request ignored
   * @param prevResult ignored
   * @return always null
   */
  @Override
  public CompletableFuture<S3ListResult> continueListObjectsAsync(
      S3ListRequest request, S3ListResult prevResult) throws IOException {
    return null;
  }

  /**
   * Stub: no conversion takes place.
   * @param status ignored
   * @return always null
   */
  @Override
  public S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status)
      throws IOException {
    return null;
  }

  /**
   * Stub: no request is built.
   * @param key ignored
   * @param delimiter ignored
   * @return always null
   */
  @Override
  public S3ListRequest createListObjectsRequest(String key, String delimiter) {
    return null;
  }

  /**
   * Stub block size.
   * @param path ignored
   * @return always 0
   */
  @Override
  public long getDefaultBlockSize(Path path) {
    return 0;
  }

  /**
   * Stub page size.
   * @return always 0
   */
  @Override
  public int getMaxKeys() {
    return 0;
  }

  /**
   * Stub: there is no time provider.
   * @return always null
   */
  @Override
  public ITtlTimeProvider getUpdatedTtlTimeProvider() {
    return null;
  }

  /**
   * Stub: no path is treated as authoritative.
   * @param p ignored
   * @return always false
   */
  @Override
  public boolean allowAuthoritative(Path p) {
    return false;
  }
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.test;
import java.io.IOException;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.transfer.model.CopyResult;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
/**
 * Do-nothing implementation of {@link OperationCallbacks}.
 * Callbacks with a result return {@code null} (or {@code false} for the
 * boolean one); callbacks without a result have empty bodies.
 */
public class MinimalOperationCallbacks
    implements OperationCallbacks {

  /**
   * Stub: no attributes are built.
   * @param path ignored
   * @param eTag ignored
   * @param versionId ignored
   * @param len ignored
   * @return always null
   */
  @Override
  public S3ObjectAttributes createObjectAttributes(
      Path path, String eTag, String versionId, long len) {
    return null;
  }

  /**
   * Stub: no attributes are built.
   * @param fileStatus ignored
   * @return always null
   */
  @Override
  public S3ObjectAttributes createObjectAttributes(S3AFileStatus fileStatus) {
    return null;
  }

  /**
   * Stub: no read context is created.
   * @param fileStatus ignored
   * @return always null
   */
  @Override
  public S3AReadOpContext createReadContext(FileStatus fileStatus) {
    return null;
  }

  /**
   * Stub: does nothing.
   * @param sourceRenamed ignored
   * @param destCreated ignored
   */
  @Override
  public void finishRename(Path sourceRenamed, Path destCreated)
      throws IOException {
    // intentionally empty
  }

  /**
   * Stub: nothing is deleted.
   * @param path ignored
   * @param key ignored
   * @param isFile ignored
   * @param operationState ignored
   */
  @Override
  public void deleteObjectAtPath(
      Path path,
      String key,
      boolean isFile,
      BulkOperationState operationState) throws IOException {
    // intentionally empty
  }

  /**
   * Stub: nothing is listed.
   * @param path ignored
   * @param status ignored
   * @param collectTombstones ignored
   * @param includeSelf ignored
   * @return always null
   */
  @Override
  public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
      final Path path,
      final S3AFileStatus status,
      final boolean collectTombstones,
      final boolean includeSelf) throws IOException {
    return null;
  }

  /**
   * Stub: nothing is copied.
   * @param srcKey ignored
   * @param destKey ignored
   * @param srcAttributes ignored
   * @param readContext ignored
   * @return always null
   */
  @Override
  public CopyResult copyFile(
      String srcKey,
      String destKey,
      S3ObjectAttributes srcAttributes,
      S3AReadOpContext readContext) throws IOException {
    return null;
  }

  /**
   * Stub: no keys are removed.
   * @param keysToDelete ignored
   * @param deleteFakeDir ignored
   * @param undeletedObjectsOnFailure ignored; never updated
   * @param operationState ignored
   * @param quiet ignored
   * @return always null
   */
  @Override
  public DeleteObjectsResult removeKeys(
      List<DeleteObjectsRequest.KeyVersion> keysToDelete,
      boolean deleteFakeDir,
      List<Path> undeletedObjectsOnFailure,
      BulkOperationState operationState,
      boolean quiet)
      throws MultiObjectDeleteException, AmazonClientException, IOException {
    return null;
  }

  /**
   * Stub: no path is treated as authoritative.
   * @param p ignored
   * @return always false
   */
  @Override
  public boolean allowAuthoritative(Path p) {
    return false;
  }

  /**
   * Stub: nothing is listed.
   * @param path ignored
   * @param key ignored
   * @return always null
   */
  @Override
  public RemoteIterator<S3AFileStatus> listObjects(Path path, String key)
      throws IOException {
    return null;
  }
}

View File

@ -0,0 +1,189 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.test;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.RenameTracker;
/**
* MetadataStore which tracks what is deleted and added.
*/
public class OperationTrackingStore implements MetadataStore {
private final List<Path> deleted = new ArrayList<>();
private final List<Path> created = new ArrayList<>();
@Override
public void initialize(final FileSystem fs,
ITtlTimeProvider ttlTimeProvider) {
}
@Override
public void initialize(final Configuration conf,
ITtlTimeProvider ttlTimeProvider) {
}
@Override
public void forgetMetadata(final Path path) {
}
@Override
public PathMetadata get(final Path path) {
return null;
}
@Override
public PathMetadata get(final Path path,
final boolean wantEmptyDirectoryFlag) {
return null;
}
@Override
public DirListingMetadata listChildren(final Path path) {
return null;
}
@Override
public void put(final PathMetadata meta) {
put(meta, null);
}
@Override
public void put(final PathMetadata meta,
final BulkOperationState operationState) {
created.add(meta.getFileStatus().getPath());
}
@Override
public void put(final Collection<? extends PathMetadata> metas,
final BulkOperationState operationState) {
metas.stream().forEach(meta -> put(meta, null));
}
@Override
public void put(final DirListingMetadata meta,
final List<Path> unchangedEntries,
final BulkOperationState operationState) {
created.add(meta.getPath());
}
@Override
public void destroy() {
}
@Override
public void delete(final Path path,
final BulkOperationState operationState) {
deleted.add(path);
}
@Override
public void deletePaths(final Collection<Path> paths,
@Nullable final BulkOperationState operationState)
throws IOException {
deleted.addAll(paths);
}
@Override
public void deleteSubtree(final Path path,
final BulkOperationState operationState) {
}
@Override
public void move(@Nullable final Collection<Path> pathsToDelete,
@Nullable final Collection<PathMetadata> pathsToCreate,
@Nullable final BulkOperationState operationState) {
}
@Override
public void prune(final PruneMode pruneMode, final long cutoff) {
}
@Override
public long prune(final PruneMode pruneMode,
final long cutoff,
final String keyPrefix) {
return 0;
}
@Override
public BulkOperationState initiateBulkWrite(
final BulkOperationState.OperationType operation,
final Path dest) {
return null;
}
@Override
public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
}
@Override
public Map<String, String> getDiagnostics() {
return null;
}
@Override
public void updateParameters(final Map<String, String> parameters) {
}
@Override
public void close() {
}
public List<Path> getDeleted() {
return deleted;
}
public List<Path> getCreated() {
return created;
}
@Override
public RenameTracker initiateRenameOperation(
final StoreContext storeContext,
final Path source,
final S3AFileStatus sourceStatus,
final Path dest) {
throw new UnsupportedOperationException("unsupported");
}
@Override
public void addAncestors(final Path qualifiedPath,
@Nullable final BulkOperationState operationState) {
}
}