HADOOP-17244. S3A directory delete tombstones dir markers prematurely. (#2280)

This changes directory tree deletion so that only files are incrementally deleted
from S3Guard after the objects are deleted; the directories are left alone
until metadataStore.deleteSubtree(path) is invoke.

This avoids directory tombstones being added above files/child directories,
which stop the treewalk and delete phase from working.

Also:

* Callback to delete objects splits files and dirs so that
any problems deleting the dirs doesn't trigger s3guard updates
* New statistic to measure #of objects deleted, alongside request count.
* Callback listFilesAndEmptyDirectories renamed listFilesAndDirectoryMarkers
  to clarify behavior.
* Test enhancements to replicate the failure and verify the fix

Contributed by Steve Loughran

Change-Id: I0e6ea2c35e487267033b1664228c8837279a35c7
This commit is contained in:
Steve Loughran 2020-09-10 17:03:52 +01:00
parent ea37a05d4b
commit 0c82eb0324
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
13 changed files with 201 additions and 30 deletions

View File

@ -774,8 +774,13 @@ public abstract class AbstractFileSystem implements PathCapabilities {
if (dstStatus.isDirectory()) {
RemoteIterator<FileStatus> list = listStatusIterator(dst);
if (list != null && list.hasNext()) {
FileStatus child = list.next();
LOG.debug("Rename {}, {} failing as destination has"
+ " at least one child: {}",
src, dst, child);
throw new IOException(
"Rename cannot overwrite non empty destination directory " + dst);
"Rename cannot overwrite non empty destination directory " + dst
+ " containing child: " + child.getPath());
}
}
delete(dst, false);

View File

@ -1306,14 +1306,33 @@ public abstract class FileContextMainOperationsBaseTest {
protected void rename(Path src, Path dst, boolean srcExists,
boolean dstExists, Rename... options) throws IOException {
IOException ioe = null;
try {
fc.rename(src, dst, options);
} finally {
} catch (IOException ex) {
// lets not swallow this completely.
LOG.warn("Rename result: " + ex, ex);
ioe = ex;
}
// There's a bit of extra work in these assertions, as if they fail
// any IOE caught earlier is added as the cause. This
// gets the likely root cause to the test report
try {
LOG.debug("Probing source and destination");
Assert.assertEquals("Source exists", srcExists, exists(fc, src));
Assert.assertEquals("Destination exists", dstExists, exists(fc, dst));
} catch (AssertionError e) {
if (ioe != null && e.getCause() == null) {
e.initCause(ioe);
}
throw e;
}
if (ioe != null) {
throw ioe;
}
}
private boolean containsPath(Path path, FileStatus[] filteredPaths)
throws IOException {
for(int i = 0; i < filteredPaths.length; i ++) {

View File

@ -1574,7 +1574,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
@Override
@Retries.RetryTranslated
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
final Path path,
final S3AFileStatus status,
final boolean collectTombstones,
@ -2079,6 +2079,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
DELETE_CONSIDERED_IDEMPOTENT,
()-> {
incrementStatistic(OBJECT_DELETE_REQUESTS);
incrementStatistic(OBJECT_DELETE_OBJECTS);
s3.deleteObject(bucket, key);
return null;
});
@ -2125,9 +2126,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
/**
* Perform a bulk object delete operation.
* Perform a bulk object delete operation against S3; leaves S3Guard
* alone.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics.
* operation statistics
* <p></p>
* {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
* of objects deleted in the request.
* <p></p>
* Retry policy: retry untranslated; delete considered idempotent.
* If the request is throttled, this is logged in the throttle statistics,
* with the counter set to the number of keys, rather than the number
@ -2148,9 +2154,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
incrementWriteOperations();
BulkDeleteRetryHandler retryHandler =
new BulkDeleteRetryHandler(createStoreContext());
int keyCount = deleteRequest.getKeys().size();
try(DurationInfo ignored =
new DurationInfo(LOG, false, "DELETE %d keys",
deleteRequest.getKeys().size())) {
keyCount)) {
return invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT,
(text, e, r, i) -> {
@ -2159,6 +2166,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
},
() -> {
incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
return s3.deleteObjects(deleteRequest);
});
} catch (MultiObjectDeleteException e) {

View File

@ -156,6 +156,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
INVOCATION_RENAME,
OBJECT_COPY_REQUESTS,
OBJECT_DELETE_REQUESTS,
OBJECT_DELETE_OBJECTS,
OBJECT_LIST_REQUESTS,
OBJECT_CONTINUE_LIST_REQUESTS,
OBJECT_METADATA_REQUESTS,

View File

@ -85,6 +85,8 @@ public enum Statistic {
"Calls of rename()"),
OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
OBJECT_DELETE_OBJECTS("object_delete_objects",
"Objects deleted in delete requests"),
OBJECT_LIST_REQUESTS("object_list_requests",
"Number of object listings made"),
OBJECT_CONTINUE_LIST_REQUESTS("object_continue_list_requests",

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
@ -152,10 +153,13 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
/**
* List of keys built up for the next delete batch.
*/
private List<DeleteObjectsRequest.KeyVersion> keys;
private List<DeleteEntry> keys;
/**
* List of paths built up for deletion.
* List of paths built up for incremental deletion on tree delete.
* At the end of the entire delete the full tree is scanned in S3Guard
* and tombstones added. For this reason this list of paths <i>must not</i>
* include directory markers, as that will break the scan.
*/
private List<Path> paths;
@ -323,7 +327,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
// list files including any under tombstones through S3Guard
LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
callbacks.listFilesAndEmptyDirectories(path, status,
callbacks.listFilesAndDirectoryMarkers(path, status,
false, true);
// iterate through and delete. The next() call will block when a new S3
@ -359,7 +363,10 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
while (objects.hasNext()) {
// get the next entry in the listing.
extraFilesDeleted++;
queueForDeletion(deletionKey(objects.next()), null);
S3AFileStatus next = objects.next();
LOG.debug("Found Unlisted entry {}", next);
queueForDeletion(deletionKey(next), null,
next.isDirectory());
}
if (extraFilesDeleted > 0) {
LOG.debug("Raw S3 Scan found {} extra file(s) to delete",
@ -402,7 +409,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
*/
private void queueForDeletion(
final S3AFileStatus stat) throws IOException {
queueForDeletion(deletionKey(stat), stat.getPath());
queueForDeletion(deletionKey(stat), stat.getPath(), stat.isDirectory());
}
/**
@ -413,14 +420,18 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
*
* @param key key to delete
* @param deletePath nullable path of the key
* @param isDirMarker is the entry a directory?
* @throws IOException failure of the previous batch of deletions.
*/
private void queueForDeletion(final String key,
@Nullable final Path deletePath) throws IOException {
@Nullable final Path deletePath,
boolean isDirMarker) throws IOException {
LOG.debug("Adding object to delete: \"{}\"", key);
keys.add(new DeleteObjectsRequest.KeyVersion(key));
keys.add(new DeleteEntry(key, isDirMarker));
if (deletePath != null) {
paths.add(deletePath);
if (!isDirMarker) {
paths.add(deletePath);
}
}
if (keys.size() == pageSize) {
@ -484,7 +495,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
* @return the submitted future or null
*/
private CompletableFuture<Void> submitDelete(
final List<DeleteObjectsRequest.KeyVersion> keyList,
final List<DeleteEntry> keyList,
final List<Path> pathList) {
if (keyList.isEmpty() && pathList.isEmpty()) {
@ -514,31 +525,59 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
@Retries.RetryTranslated
private void asyncDeleteAction(
final BulkOperationState state,
final List<DeleteObjectsRequest.KeyVersion> keyList,
final List<DeleteEntry> keyList,
final List<Path> pathList,
final boolean auditDeletedKeys)
throws IOException {
List<DeleteObjectsResult.DeletedObject> deletedObjects = new ArrayList<>();
try (DurationInfo ignored =
new DurationInfo(LOG, false, "Delete page of keys")) {
DeleteObjectsResult result = null;
List<Path> undeletedObjects = new ArrayList<>();
if (!keyList.isEmpty()) {
result = Invoker.once("Remove S3 Keys",
// first delete the files.
List<DeleteObjectsRequest.KeyVersion> files = keyList.stream()
.filter(e -> !e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
result = Invoker.once("Remove S3 Files",
status.getPath().toString(),
() -> callbacks.removeKeys(
keyList,
files,
false,
undeletedObjects,
state,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
// now the dirs
List<DeleteObjectsRequest.KeyVersion> dirs = keyList.stream()
.filter(e -> e.isDirMarker)
.map(e -> e.keyVersion)
.collect(Collectors.toList());
// This is invoked with deleteFakeDir = true, so
// S3Guard is not updated.
result = Invoker.once("Remove S3 Dir Markers",
status.getPath().toString(),
() -> callbacks.removeKeys(
dirs,
true,
undeletedObjects,
state,
!auditDeletedKeys));
if (result != null) {
deletedObjects.addAll(result.getDeletedObjects());
}
}
if (!pathList.isEmpty()) {
// delete file paths only. This stops tombstones
// being added until the final directory cleanup
// (HADOOP-17244)
metadataStore.deletePaths(pathList, state);
}
if (auditDeletedKeys && result != null) {
if (auditDeletedKeys) {
// audit the deleted keys
List<DeleteObjectsResult.DeletedObject> deletedObjects =
result.getDeletedObjects();
if (deletedObjects.size() != keyList.size()) {
// size mismatch
LOG.warn("Size mismatch in deletion operation. "
@ -549,7 +588,7 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
for (DeleteObjectsResult.DeletedObject del : deletedObjects) {
keyList.removeIf(kv -> kv.getKey().equals(del.getKey()));
}
for (DeleteObjectsRequest.KeyVersion kv : keyList) {
for (DeleteEntry kv : keyList) {
LOG.debug("{}", kv.getKey());
}
}
@ -557,5 +596,31 @@ public class DeleteOperation extends ExecutingStoreOperation<Boolean> {
}
}
/**
* Deletion entry; dir marker state is tracked to control S3Guard
* update policy.
*/
private static final class DeleteEntry {
private final DeleteObjectsRequest.KeyVersion keyVersion;
private final boolean isDirMarker;
private DeleteEntry(final String key, final boolean isDirMarker) {
this.keyVersion = new DeleteObjectsRequest.KeyVersion(key);
this.isDirMarker = isDirMarker;
}
public String getKey() {
return keyVersion.getKey();
}
@Override
public String toString() {
return "DeleteEntry{" +
"key='" + getKey() + '\'' +
", isDirMarker=" + isDirMarker +
'}';
}
}
}

View File

@ -105,7 +105,7 @@ public interface OperationCallbacks {
throws IOException;
/**
* Recursive list of files and empty directories.
* Recursive list of files and directory markers.
*
* @param path path to list from
* @param status optional status of path to list.
@ -115,7 +115,7 @@ public interface OperationCallbacks {
* @throws IOException failure
*/
@Retries.RetryTranslated
RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
Path path,
S3AFileStatus status,
boolean collectTombstones,

View File

@ -400,6 +400,7 @@ Are * @throws IOException failure
destStatus.getPath());
// Although the dir marker policy doesn't always need to do this,
// it's simplest just to be consistent here.
// note: updates the metastore as well a S3.
callbacks.deleteObjectAtPath(destStatus.getPath(), dstKey, false, null);
}
@ -411,7 +412,7 @@ Are * @throws IOException failure
false);
final RemoteIterator<S3ALocatedFileStatus> iterator =
callbacks.listFilesAndEmptyDirectories(parentPath,
callbacks.listFilesAndDirectoryMarkers(parentPath,
sourceStatus,
true,
true);

View File

@ -717,7 +717,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
public DDBPathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
throws IOException {
checkPath(path);
LOG.debug("Get from table {} in region {}: {}. wantEmptyDirectory={}",
LOG.debug("Get from table {} in region {}: {} ; wantEmptyDirectory={}",
tableName, region, path, wantEmptyDirectoryFlag);
DDBPathMetadata result = innerGet(path, wantEmptyDirectoryFlag);
LOG.debug("result of get {} is: {}", path, result);

View File

@ -32,6 +32,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
@ -286,4 +287,27 @@ public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase {
s3.putObject(putObjectRequest);
}
@Test
public void testDirMarkerDelete() throws Throwable {
S3AFileSystem fs = getFileSystem();
assumeFilesystemHasMetadatastore(getFileSystem());
Path baseDir = methodPath();
Path subFile = new Path(baseDir, "subdir/file.txt");
// adds the s3guard entry
fs.mkdirs(baseDir);
touch(fs, subFile);
// PUT a marker
createEmptyObject(fs, fs.pathToKey(baseDir) + "/");
fs.delete(baseDir, true);
assertPathDoesNotExist("Should have been deleted", baseDir);
// now create the dir again
fs.mkdirs(baseDir);
FileStatus fileStatus = fs.getFileStatus(baseDir);
Assertions.assertThat(fileStatus)
.matches(FileStatus::isDirectory, "Not a directory");
Assertions.assertThat(fs.listStatus(baseDir))
.describedAs("listing of %s", baseDir)
.isEmpty();
}
}

View File

@ -333,7 +333,7 @@ public class TestPartialDeleteFailures {
}
@Override
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
Path path,
S3AFileStatus status,
boolean collectTombstones,

View File

@ -148,6 +148,7 @@ public class AbstractS3ACostTest extends AbstractS3ATestBase {
INVOCATION_COPY_FROM_LOCAL_FILE,
OBJECT_COPY_REQUESTS,
OBJECT_DELETE_REQUESTS,
OBJECT_DELETE_OBJECTS,
OBJECT_LIST_REQUESTS,
OBJECT_METADATA_REQUESTS,
OBJECT_PUT_BYTES,

View File

@ -19,15 +19,18 @@
package org.apache.hadoop.fs.s3a.performance;
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collection;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
@ -37,6 +40,7 @@ import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.*;
import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Use metrics to assert about the cost of file API calls.
@ -145,7 +149,7 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
boolean rawAndDeleting = isRaw() && isDeleting();
verifyMetrics(() -> {
fs.delete(file1, false);
return "after fs.delete(file1simpleFile) " + getMetricSummary();
return "after fs.delete(file1) " + getMetricSummary();
},
// delete file. For keeping: that's it
probe(rawAndKeeping, OBJECT_METADATA_REQUESTS,
@ -173,7 +177,11 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
public void testDirMarkersSubdir() throws Throwable {
describe("verify cost of deep subdir creation");
Path subDir = new Path(methodPath(), "1/2/3/4/5/6");
Path parent = methodPath();
Path subDir = new Path(parent, "1/2/3/4/5/6");
S3AFileSystem fs = getFileSystem();
int dirsCreated = 2;
fs.mkdirs(parent);
// one dir created, possibly a parent removed
verifyMetrics(() -> {
mkdirs(subDir);
@ -187,6 +195,43 @@ public class ITestS3ADeleteCost extends AbstractS3ACostTest {
// delete all possible fake dirs above the subdirectory
withWhenDeleting(FAKE_DIRECTORIES_DELETED,
directoriesInPath(subDir) - 1));
int dirDeleteRequests = 1;
int fileDeleteRequests = 0;
int totalDeleteRequests = dirDeleteRequests + fileDeleteRequests;
// now delete the deep tree.
verifyMetrics(() ->
fs.delete(parent, true),
// keeping: the parent dir marker needs deletion alongside
// the subdir one.
with(OBJECT_DELETE_REQUESTS, totalDeleteRequests),
withWhenKeeping(OBJECT_DELETE_OBJECTS, dirsCreated),
// deleting: only the marker at the bottom needs deleting
withWhenDeleting(OBJECT_DELETE_OBJECTS, 1));
// followup with list calls to make sure all is clear.
verifyNoListing(parent);
verifyNoListing(subDir);
// now reinstate the directory, which in HADOOP-17244 hitting problems
fs.mkdirs(parent);
FileStatus[] children = fs.listStatus(parent);
Assertions.assertThat(children)
.describedAs("Children of %s", parent)
.isEmpty();
}
/**
* List a path, verify that there are no direct child entries.
* @param path path to scan
*/
protected void verifyNoListing(final Path path) throws Exception {
intercept(FileNotFoundException.class, () -> {
FileStatus[] statuses = getFileSystem().listStatus(path);
return Arrays.deepToString(statuses);
});
}
@Test