From e0619b702a9ebc85e3a02260172a7a1fb3062161 Mon Sep 17 00:00:00 2001 From: Mukund Thakur Date: Fri, 11 Mar 2022 13:05:45 +0530 Subject: [PATCH] HADOOP-18112: Implement paging during multi object delete. (#4045) Multi object delete of size more than 1000 is not supported by S3 and fails with MalformedXML error. So implementing paging of requests to reduce the number of keys in a single request. Page size can be configured using "fs.s3a.bulk.delete.page.size" Contributed By: Mukund Thakur --- .../java/org/apache/hadoop/util/Lists.java | 24 ++++ .../org/apache/hadoop/util/TestLists.java | 44 +++++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 78 +++++-------- .../hadoop/fs/s3a/api/RequestFactory.java | 5 +- .../hadoop/fs/s3a/impl/DeleteOperation.java | 45 ++------ .../fs/s3a/impl/OperationCallbacks.java | 12 +- .../hadoop/fs/s3a/impl/RenameOperation.java | 8 +- .../fs/s3a/impl/RequestFactoryImpl.java | 5 +- .../hadoop/fs/s3a/tools/MarkerTool.java | 2 +- .../fs/s3a/tools/MarkerToolOperations.java | 9 +- .../s3a/tools/MarkerToolOperationsImpl.java | 10 +- .../fs/s3a/ITestS3AFailureHandling.java | 38 ++++++- .../apache/hadoop/fs/s3a/S3ATestUtils.java | 107 +++++++++++++++++- .../s3a/impl/ITestPartialRenamesDeletes.java | 105 ----------------- .../fs/s3a/impl/TestRequestFactory.java | 2 +- .../fs/s3a/scale/ITestS3ADeleteManyFiles.java | 2 +- .../s3a/test/MinimalOperationCallbacks.java | 9 +- 17 files changed, 273 insertions(+), 232 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Lists.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Lists.java index b6d74ee6792..5d9cc0502af 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Lists.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/Lists.java @@ -232,4 +232,28 @@ public final class Lists { return addAll(addTo, elementsToAdd.iterator()); } + /** + * Returns consecutive 
sub-lists of a list, each of the same size + * (the final list may be smaller). + * @param originalList original big list. + * @param pageSize desired size of each sublist ( last one + * may be smaller) + * @return a list of sub lists. + */ + public static List> partition(List originalList, int pageSize) { + + Preconditions.checkArgument(originalList != null && originalList.size() > 0, + "Invalid original list"); + Preconditions.checkArgument(pageSize > 0, "Page size should " + + "be greater than 0 for performing partition"); + + List> result = new ArrayList<>(); + int i=0; + while (i < originalList.size()) { + result.add(originalList.subList(i, + Math.min(i + pageSize, originalList.size()))); + i = i + pageSize; + } + return result; + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLists.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLists.java index 537e3781edc..53241da695c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLists.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestLists.java @@ -18,9 +18,11 @@ package org.apache.hadoop.util; +import org.assertj.core.api.Assertions; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -79,6 +81,48 @@ public class TestLists { Assert.assertEquals(4, list.size()); } + @Test + public void testListsPartition() { + List list = new ArrayList<>(); + list.add("a"); + list.add("b"); + list.add("c"); + list.add("d"); + list.add("e"); + List> res = Lists. 
+ partition(list, 2); + Assertions.assertThat(res) + .describedAs("Number of partitions post partition") + .hasSize(3); + Assertions.assertThat(res.get(0)) + .describedAs("Number of elements in first partition") + .hasSize(2); + Assertions.assertThat(res.get(2)) + .describedAs("Number of elements in last partition") + .hasSize(1); + + List> res2 = Lists. + partition(list, 1); + Assertions.assertThat(res2) + .describedAs("Number of partitions post partition") + .hasSize(5); + Assertions.assertThat(res2.get(0)) + .describedAs("Number of elements in first partition") + .hasSize(1); + Assertions.assertThat(res2.get(4)) + .describedAs("Number of elements in last partition") + .hasSize(1); + + List> res3 = Lists. + partition(list, 6); + Assertions.assertThat(res3) + .describedAs("Number of partitions post partition") + .hasSize(1); + Assertions.assertThat(res3.get(0)) + .describedAs("Number of elements in first partition") + .hasSize(5); + } + @Test public void testArrayListWithSize() { List list = Lists.newArrayListWithCapacity(3); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index f5c7dd9d671..a977fb35ee0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -135,6 +135,7 @@ import org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.LambdaUtils; +import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; @@ -225,6 +226,7 @@ import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDura import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation; import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; +import static org.apache.hadoop.util.Preconditions.checkArgument; import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator; /** @@ -550,6 +552,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE, BULK_DELETE_PAGE_SIZE_DEFAULT, 0); + checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE, + "page size out of range: %s", pageSize); listing = new Listing(listingOperationCallbacks, createStoreContext()); } catch (AmazonClientException e) { // amazon client exception: stop all services then throw the translation @@ -2021,14 +2025,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, } @Override - public DeleteObjectsResult removeKeys( - final List keysToDelete, - final boolean deleteFakeDir, - final boolean quiet) + public void removeKeys( + final List keysToDelete, + final boolean deleteFakeDir) throws MultiObjectDeleteException, AmazonClientException, IOException { auditSpan.activate(); - return S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir, - quiet); + S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir); } @Override @@ -2813,10 +2815,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, * @param keysToDelete collection of keys to delete on the s3-backend. * if empty, no request is made of the object store. * @param deleteFakeDir indicates whether this is for deleting fake dirs - * @param quiet should a bulk query be quiet, or should its result list - * all deleted keys? - * @return the deletion result if a multi object delete was invoked - * and it returned without a failure. 
* @throws InvalidRequestException if the request was rejected due to * a mistaken attempt to delete the root directory. * @throws MultiObjectDeleteException one or more of the keys could not @@ -2826,10 +2824,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, * @throws AmazonClientException other amazon-layer failure. */ @Retries.RetryRaw - private DeleteObjectsResult removeKeysS3( - List keysToDelete, - boolean deleteFakeDir, - boolean quiet) + private void removeKeysS3( + List keysToDelete, + boolean deleteFakeDir) throws MultiObjectDeleteException, AmazonClientException, IOException { if (LOG.isDebugEnabled()) { @@ -2842,16 +2839,28 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, } if (keysToDelete.isEmpty()) { // exit fast if there are no keys to delete - return null; + return; } for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) { blockRootDelete(keyVersion.getKey()); } - DeleteObjectsResult result = null; try { if (enableMultiObjectsDelete) { - result = deleteObjects( - getRequestFactory().newBulkDeleteRequest(keysToDelete, quiet)); + if (keysToDelete.size() <= pageSize) { + deleteObjects(getRequestFactory() + .newBulkDeleteRequest(keysToDelete)); + } else { + // Multi object deletion of more than 1000 keys is not supported + // by s3. So we are paging the keys by page size. + LOG.debug("Partitioning the keys to delete as it is more than " + + "page size. 
Number of keys: {}, Page size: {}", + keysToDelete.size(), pageSize); + for (List batchOfKeysToDelete : + Lists.partition(keysToDelete, pageSize)) { + deleteObjects(getRequestFactory() + .newBulkDeleteRequest(batchOfKeysToDelete)); + } + } } else { for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) { deleteObject(keyVersion.getKey()); @@ -2867,7 +2876,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, throw ex; } noteDeleted(keysToDelete.size(), deleteFakeDir); - return result; } /** @@ -2884,7 +2892,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, } /** - * Invoke {@link #removeKeysS3(List, boolean, boolean)}. + * Invoke {@link #removeKeysS3(List, boolean)}. * If a {@code MultiObjectDeleteException} is raised, the * relevant statistics are updated. * @@ -2905,35 +2913,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities, final boolean deleteFakeDir) throws MultiObjectDeleteException, AmazonClientException, IOException { - removeKeys(keysToDelete, deleteFakeDir, - true); - } - - /** - * Invoke {@link #removeKeysS3(List, boolean, boolean)}. - * @param keysToDelete collection of keys to delete on the s3-backend. - * if empty, no request is made of the object store. - * @param deleteFakeDir indicates whether this is for deleting fake dirs. - * @param quiet should a bulk query be quiet, or should its result list - * all deleted keys - * @return the deletion result if a multi object delete was invoked - * and it returned without a failure, else null. - * @throws InvalidRequestException if the request was rejected due to - * a mistaken attempt to delete the root directory. - * @throws MultiObjectDeleteException one or more of the keys could not - * be deleted in a multiple object delete operation. - * @throws AmazonClientException amazon-layer failure. - * @throws IOException other IO Exception. 
- */ - @Retries.RetryRaw - private DeleteObjectsResult removeKeys( - final List keysToDelete, - final boolean deleteFakeDir, - final boolean quiet) - throws MultiObjectDeleteException, AmazonClientException, IOException { try (DurationInfo ignored = new DurationInfo(LOG, false, - "Deleting %d keys", keysToDelete.size())) { - return removeKeysS3(keysToDelete, deleteFakeDir, quiet); + "Deleting %d keys", keysToDelete.size())) { + removeKeysS3(keysToDelete, deleteFakeDir); } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java index 9bffcc90d0b..f3e3e563c78 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java @@ -285,12 +285,9 @@ public interface RequestFactory { /** * Bulk delete request. * @param keysToDelete list of keys to delete. - * @param quiet should a bulk query be quiet, or should its result list - * all deleted keys? 
* @return the request */ DeleteObjectsRequest newBulkDeleteRequest( - List keysToDelete, - boolean quiet); + List keysToDelete); } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java index 877616c0990..0797c36c529 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java @@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsResult; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; @@ -365,8 +364,7 @@ public class DeleteOperation extends ExecutingStoreOperation { callableWithinAuditSpan( getAuditSpan(), () -> { asyncDeleteAction( - keyList, - LOG.isDebugEnabled()); + keyList); return null; })); } @@ -376,20 +374,16 @@ public class DeleteOperation extends ExecutingStoreOperation { * the keys from S3 and paths from S3Guard. * * @param keyList keys to delete. - * @param auditDeletedKeys should the results be audited and undeleted * entries logged? * @throws IOException failure */ @Retries.RetryTranslated private void asyncDeleteAction( - final List keyList, - final boolean auditDeletedKeys) + final List keyList) throws IOException { - List deletedObjects = new ArrayList<>(); try (DurationInfo ignored = new DurationInfo(LOG, false, "Delete page of %d keys", keyList.size())) { - DeleteObjectsResult result; if (!keyList.isEmpty()) { // first delete the files. 
List files = keyList.stream() @@ -397,15 +391,12 @@ public class DeleteOperation extends ExecutingStoreOperation { .map(e -> e.keyVersion) .collect(Collectors.toList()); LOG.debug("Deleting of {} file objects", files.size()); - result = Invoker.once("Remove S3 Files", + Invoker.once("Remove S3 Files", status.getPath().toString(), () -> callbacks.removeKeys( files, - false, - !auditDeletedKeys)); - if (result != null) { - deletedObjects.addAll(result.getDeletedObjects()); - } + false + )); // now the dirs List dirs = keyList.stream() .filter(e -> e.isDirMarker) @@ -413,32 +404,12 @@ public class DeleteOperation extends ExecutingStoreOperation { .collect(Collectors.toList()); LOG.debug("Deleting of {} directory markers", dirs.size()); // This is invoked with deleteFakeDir. - result = Invoker.once("Remove S3 Dir Markers", + Invoker.once("Remove S3 Dir Markers", status.getPath().toString(), () -> callbacks.removeKeys( dirs, - true, - !auditDeletedKeys)); - if (result != null) { - deletedObjects.addAll(result.getDeletedObjects()); - } - } - if (auditDeletedKeys) { - // audit the deleted keys - if (deletedObjects.size() != keyList.size()) { - // size mismatch - LOG.warn("Size mismatch in deletion operation. 
" - + "Expected count of deleted files: {}; " - + "actual: {}", - keyList.size(), deletedObjects.size()); - // strip out the deleted keys - for (DeleteObjectsResult.DeletedObject del : deletedObjects) { - keyList.removeIf(kv -> kv.getKey().equals(del.getKey())); - } - for (DeleteEntry kv : keyList) { - LOG.debug("{}", kv.getKey()); - } - } + true + )); } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java index a72dc7e10b3..ecfe2c0ba0a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/OperationCallbacks.java @@ -24,7 +24,6 @@ import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsResult; import com.amazonaws.services.s3.model.MultiObjectDeleteException; import com.amazonaws.services.s3.transfer.model.CopyResult; @@ -138,10 +137,6 @@ public interface OperationCallbacks { * @param keysToDelete collection of keys to delete on the s3-backend. * if empty, no request is made of the object store. * @param deleteFakeDir indicates whether this is for deleting fake dirs. - * @param quiet should a bulk query be quiet, or should its result list - * all deleted keys - * @return the deletion result if a multi object delete was invoked - * and it returned without a failure, else null. * @throws InvalidRequestException if the request was rejected due to * a mistaken attempt to delete the root directory. * @throws MultiObjectDeleteException one or more of the keys could not @@ -150,10 +145,9 @@ public interface OperationCallbacks { * @throws IOException other IO Exception. 
*/ @Retries.RetryRaw - DeleteObjectsResult removeKeys( - List keysToDelete, - boolean deleteFakeDir, - boolean quiet) + void removeKeys( + List keysToDelete, + boolean deleteFakeDir) throws MultiObjectDeleteException, AmazonClientException, IOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java index c1700ef389c..bc9ad669b56 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RenameOperation.java @@ -49,6 +49,7 @@ import static org.apache.hadoop.fs.store.audit.AuditingFunctions.callableWithinA import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; import static org.apache.hadoop.fs.s3a.impl.InternalConstants.RENAME_PARALLEL_LIMIT; +import static org.apache.hadoop.util.Preconditions.checkArgument; /** * A parallelized rename operation. 
@@ -155,6 +156,9 @@ public class RenameOperation extends ExecutingStoreOperation { this.destKey = destKey; this.destStatus = destStatus; this.callbacks = callbacks; + checkArgument(pageSize > 0 + && pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE, + "page size out of range: %s", pageSize); this.pageSize = pageSize; } @@ -586,8 +590,8 @@ public class RenameOperation extends ExecutingStoreOperation { sourcePath.toString(), () -> callbacks.removeKeys( keys, - false, - true)); + false + )); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java index f9ff08a5f65..daa9076cbc3 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java @@ -542,12 +542,11 @@ public class RequestFactoryImpl implements RequestFactory { @Override public DeleteObjectsRequest newBulkDeleteRequest( - List keysToDelete, - boolean quiet) { + List keysToDelete) { return prepareRequest( new DeleteObjectsRequest(bucket) .withKeys(keysToDelete) - .withQuiet(quiet)); + .withQuiet(true)); } @Override diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java index 7179c6b2f01..bbcbd130393 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerTool.java @@ -817,7 +817,7 @@ public final class MarkerTool extends S3GuardTool { end); once("Remove S3 Keys", tracker.getBasePath().toString(), () -> - operations.removeKeys(page, true, false)); + operations.removeKeys(page, true)); summary.deleteRequests++; // and move to the start of the next page start = end; diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperations.java index 7d7627dfc03..a701f86f7b0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperations.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperations.java @@ -23,7 +23,6 @@ import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsResult; import com.amazonaws.services.s3.model.MultiObjectDeleteException; import org.apache.hadoop.fs.InvalidRequestException; @@ -58,10 +57,7 @@ public interface MarkerToolOperations { * @param keysToDelete collection of keys to delete on the s3-backend. * if empty, no request is made of the object store. * @param deleteFakeDir indicates whether this is for deleting fake dirs. - * @param quiet should a bulk query be quiet, or should its result list * all deleted keys - * @return the deletion result if a multi object delete was invoked - * and it returned without a failure, else null. * @throws InvalidRequestException if the request was rejected due to * a mistaken attempt to delete the root directory. * @throws MultiObjectDeleteException one or more of the keys could not @@ -70,10 +66,9 @@ public interface MarkerToolOperations { * @throws IOException other IO Exception. 
*/ @Retries.RetryMixed - DeleteObjectsResult removeKeys( + void removeKeys( List keysToDelete, - boolean deleteFakeDir, - boolean quiet) + boolean deleteFakeDir) throws MultiObjectDeleteException, AmazonClientException, IOException; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperationsImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperationsImpl.java index 7ccbc41bbea..ccf80e1dde0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperationsImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/tools/MarkerToolOperationsImpl.java @@ -23,7 +23,6 @@ import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsResult; import com.amazonaws.services.s3.model.MultiObjectDeleteException; import org.apache.hadoop.fs.Path; @@ -55,13 +54,12 @@ public class MarkerToolOperationsImpl implements MarkerToolOperations { } @Override - public DeleteObjectsResult removeKeys( + public void removeKeys( final List keysToDelete, - final boolean deleteFakeDir, - final boolean quiet) + final boolean deleteFakeDir) throws MultiObjectDeleteException, AmazonClientException, IOException { - return operationCallbacks.removeKeys(keysToDelete, deleteFakeDir, - quiet); + operationCallbacks.removeKeys(keysToDelete, deleteFakeDir + ); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java index c34ba22b105..c0f6a4b2322 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java @@ -20,10 +20,13 @@ package org.apache.hadoop.fs.s3a; import 
com.amazonaws.services.s3.model.DeleteObjectsRequest; +import org.assertj.core.api.Assertions; import org.junit.Assume; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.statistics.StoreStatisticNames; import org.apache.hadoop.fs.store.audit.AuditSpan; @@ -37,9 +40,11 @@ import java.util.List; import java.nio.file.AccessDeniedException; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles; import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.failIf; -import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.*; import static org.apache.hadoop.test.LambdaTestUtils.*; +import static org.apache.hadoop.util.functional.RemoteIterators.mappingRemoteIterator; +import static org.apache.hadoop.util.functional.RemoteIterators.toList; /** * ITest for failure handling, primarily multipart deletion. @@ -72,6 +77,37 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase { removeKeys(getFileSystem(), "ITestS3AFailureHandling/missingFile"); } + /** + * See HADOOP-18112. + */ + @Test + public void testMultiObjectDeleteLargeNumKeys() throws Exception { + S3AFileSystem fs = getFileSystem(); + Path path = path("largeDir"); + mkdirs(path); + createFiles(fs, path, 1, 1005, 0); + RemoteIterator locatedFileStatusRemoteIterator = + fs.listFiles(path, false); + List keys = toList(mappingRemoteIterator(locatedFileStatusRemoteIterator, + locatedFileStatus -> fs.pathToKey(locatedFileStatus.getPath()))); + // After implementation of paging during multi object deletion, + // no exception is encountered. 
+ Long bulkDeleteReqBefore = getNumberOfBulkDeleteRequestsMadeTillNow(fs); + try (AuditSpan span = span()) { + fs.removeKeys(buildDeleteRequest(keys.toArray(new String[0])), false); + } + Long bulkDeleteReqAfter = getNumberOfBulkDeleteRequestsMadeTillNow(fs); + // number of delete requests is 5 as we have default page size of 250. + Assertions.assertThat(bulkDeleteReqAfter - bulkDeleteReqBefore) + .describedAs("Number of batched bulk delete requests") + .isEqualTo(5); + } + + private Long getNumberOfBulkDeleteRequestsMadeTillNow(S3AFileSystem fs) { + return fs.getIOStatistics().counters() + .get(StoreStatisticNames.OBJECT_BULK_DELETE_REQUEST); + } + private void removeKeys(S3AFileSystem fileSystem, String... keys) throws IOException { try (AuditSpan span = span()) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 55ddba9bbd1..d965e6e57a2 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -52,7 +52,11 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.service.ServiceOperations; +import org.apache.hadoop.thirdparty.com.google.common.base.Charsets; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.apache.hadoop.util.BlockingThreadPoolExecutorService; +import org.apache.hadoop.util.DurationInfo; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.functional.CallableRaisingIOE; @@ -70,6 +74,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.text.DateFormat; import java.text.SimpleDateFormat; 
+import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -78,6 +83,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; +import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; +import static org.apache.hadoop.test.GenericTestUtils.buildPaths; import static org.apache.hadoop.util.Preconditions.checkNotNull; import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_CREDENTIAL_PROVIDER_PATH; import static org.apache.commons.lang3.StringUtils.isNotEmpty; @@ -95,8 +104,23 @@ import static org.junit.Assert.*; @InterfaceAudience.Private @InterfaceStability.Unstable public final class S3ATestUtils { + private static final Logger LOG = LoggerFactory.getLogger( - S3ATestUtils.class); + S3ATestUtils.class); + + /** Many threads for scale performance: {@value}. */ + public static final int EXECUTOR_THREAD_COUNT = 64; + /** + * For submitting work. + */ + private static final ListeningExecutorService EXECUTOR = + MoreExecutors.listeningDecorator( + BlockingThreadPoolExecutorService.newInstance( + EXECUTOR_THREAD_COUNT, + EXECUTOR_THREAD_COUNT * 2, + 30, TimeUnit.SECONDS, + "test-operations")); + /** * Value to set a system property to (in maven) to declare that @@ -821,6 +845,87 @@ public final class S3ATestUtils { .build(); } + /** + * Write the text to a file asynchronously. Logs the operation duration. + * @param fs filesystem + * @param path path + * @return future to the path created. 
+ */ + private static CompletableFuture put(FileSystem fs, + Path path, String text) { + return submit(EXECUTOR, () -> { + try (DurationInfo ignore = + new DurationInfo(LOG, false, "Creating %s", path)) { + createFile(fs, path, true, text.getBytes(Charsets.UTF_8)); + return path; + } + }); + } + + /** + * Build a set of files in a directory tree. + * @param fs filesystem + * @param destDir destination + * @param depth file depth + * @param fileCount number of files to create. + * @param dirCount number of dirs to create at each level + * @return the list of files created. + */ + public static List createFiles(final FileSystem fs, + final Path destDir, + final int depth, + final int fileCount, + final int dirCount) throws IOException { + return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount, + new ArrayList<>(fileCount), + new ArrayList<>(dirCount)); + } + + /** + * Build a set of files in a directory tree. + * @param fs filesystem + * @param destDir destination + * @param depth file depth + * @param fileCount number of files to create. + * @param dirCount number of dirs to create at each level + * @param paths [out] list of file paths created + * @param dirs [out] list of directory paths created. + * @return the list of files created. + */ + public static List createDirsAndFiles(final FileSystem fs, + final Path destDir, + final int depth, + final int fileCount, + final int dirCount, + final List paths, + final List dirs) throws IOException { + buildPaths(paths, dirs, destDir, depth, fileCount, dirCount); + List> futures = new ArrayList<>(paths.size() + + dirs.size()); + + // create directories. 
With dir marker retention, that adds more entries + // to cause deletion issues + try (DurationInfo ignore = + new DurationInfo(LOG, "Creating %d directories", dirs.size())) { + for (Path path : dirs) { + futures.add(submit(EXECUTOR, () ->{ + fs.mkdirs(path); + return path; + })); + } + waitForCompletion(futures); + } + + try (DurationInfo ignore = + new DurationInfo(LOG, "Creating %d files", paths.size())) { + for (Path path : paths) { + futures.add(put(fs, path, path.getName())); + } + waitForCompletion(futures); + return paths; + } + } + /** * Helper class to do diffs of metrics. */ diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java index 068b7b2dda5..378f4a70433 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java @@ -26,14 +26,9 @@ import java.util.Collection; import java.util.List; import java.util.Set; import java.util.TreeSet; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import com.amazonaws.services.s3.model.MultiObjectDeleteException; -import org.apache.hadoop.thirdparty.com.google.common.base.Charsets; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; -import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; import org.assertj.core.api.Assertions; import org.junit.Test; import org.junit.runner.RunWith; @@ -42,13 +37,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; import 
org.apache.hadoop.fs.s3a.AbstractS3ATestBase; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.util.BlockingThreadPoolExecutorService; import org.apache.hadoop.util.DurationInfo; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; @@ -69,13 +62,10 @@ import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.bindRolePolicyStatemen import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.forbidden; import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.newAssumedRoleConfig; import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DELEGATION_TOKEN_BINDING; -import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; -import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.assertFileCount; import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; -import static org.apache.hadoop.test.GenericTestUtils.buildPaths; import static org.apache.hadoop.test.LambdaTestUtils.eval; /** @@ -112,20 +102,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase { private static final Statement STATEMENT_ALL_BUCKET_READ_ACCESS = statement(true, S3_ALL_BUCKETS, S3_BUCKET_READ_OPERATIONS); - /** Many threads for scale performance: {@value}. */ - public static final int EXECUTOR_THREAD_COUNT = 64; - - /** - * For submitting work. - */ - private static final ListeningExecutorService EXECUTOR = - MoreExecutors.listeningDecorator( - BlockingThreadPoolExecutorService.newInstance( - EXECUTOR_THREAD_COUNT, - EXECUTOR_THREAD_COUNT * 2, - 30, TimeUnit.SECONDS, - "test-operations")); - /** * The number of files in a non-scaled test. 
@@ -742,87 +718,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase { return files; } - /** - * Write the text to a file asynchronously. Logs the operation duration. - * @param fs filesystem - * @param path path - * @return future to the patch created. - */ - private static CompletableFuture put(FileSystem fs, - Path path, String text) { - return submit(EXECUTOR, () -> { - try (DurationInfo ignore = - new DurationInfo(LOG, false, "Creating %s", path)) { - createFile(fs, path, true, text.getBytes(Charsets.UTF_8)); - return path; - } - }); - } - - /** - * Build a set of files in a directory tree. - * @param fs filesystem - * @param destDir destination - * @param depth file depth - * @param fileCount number of files to create. - * @param dirCount number of dirs to create at each level - * @return the list of files created. - */ - public static List createFiles(final FileSystem fs, - final Path destDir, - final int depth, - final int fileCount, - final int dirCount) throws IOException { - return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount, - new ArrayList(fileCount), - new ArrayList(dirCount)); - } - - /** - * Build a set of files in a directory tree. - * @param fs filesystem - * @param destDir destination - * @param depth file depth - * @param fileCount number of files to create. - * @param dirCount number of dirs to create at each level - * @param paths [out] list of file paths created - * @param dirs [out] list of directory paths created. - * @return the list of files created. - */ - public static List createDirsAndFiles(final FileSystem fs, - final Path destDir, - final int depth, - final int fileCount, - final int dirCount, - final List paths, - final List dirs) throws IOException { - buildPaths(paths, dirs, destDir, depth, fileCount, dirCount); - List> futures = new ArrayList<>(paths.size() - + dirs.size()); - - // create directories. 
With dir marker retention, that adds more entries - // to cause deletion issues - try (DurationInfo ignore = - new DurationInfo(LOG, "Creating %d directories", dirs.size())) { - for (Path path : dirs) { - futures.add(submit(EXECUTOR, () ->{ - fs.mkdirs(path); - return path; - })); - } - waitForCompletion(futures); - } - - try (DurationInfo ignore = - new DurationInfo(LOG, "Creating %d files", paths.size())) { - for (Path path : paths) { - futures.add(put(fs, path, path.getName())); - } - waitForCompletion(futures); - return paths; - } - } - /** * Verifies that s3:DeleteObjectVersion is not required for rename. *

diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java index dbd89b960f9..9bc3aef83aa 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java @@ -164,7 +164,7 @@ public class TestRequestFactory extends AbstractHadoopTestBase { new ArrayList<>())); a(factory.newCopyObjectRequest(path, path2, md)); a(factory.newDeleteObjectRequest(path)); - a(factory.newBulkDeleteRequest(new ArrayList<>(), true)); + a(factory.newBulkDeleteRequest(new ArrayList<>())); a(factory.newDirectoryMarkerRequest(path)); a(factory.newGetObjectRequest(path)); a(factory.newGetObjectMetadataRequest(path)); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java index d5862bcb335..dbdd8b5da6a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java @@ -38,7 +38,7 @@ import static org.apache.hadoop.fs.s3a.Constants.BULK_DELETE_PAGE_SIZE; import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING; import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX; import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; -import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.createFiles; import static org.apache.hadoop.test.GenericTestUtils.filenameOfIndex; /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java index a2aebc82720..fa1ad2db62a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalOperationCallbacks.java @@ -23,7 +23,6 @@ import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.DeleteObjectsRequest; -import com.amazonaws.services.s3.model.DeleteObjectsResult; import com.amazonaws.services.s3.model.MultiObjectDeleteException; import com.amazonaws.services.s3.transfer.model.CopyResult; @@ -99,13 +98,11 @@ public class MinimalOperationCallbacks } @Override - public DeleteObjectsResult removeKeys( - List keysToDelete, - boolean deleteFakeDir, - boolean quiet) + public void removeKeys( + List keysToDelete, + boolean deleteFakeDir) throws MultiObjectDeleteException, AmazonClientException, IOException { - return null; } @Override