From c4e84e2b34b924fcda972615ec707fdd9393f2d3 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 16 Apr 2019 17:19:05 +0200 Subject: [PATCH] Add Bulk Delete Api to BlobStore (#40322) (#41253) * Adds Bulk delete API to blob container * Implement bulk delete API for S3 * Adjust S3Fixture to accept both path styles for bulk deletes since the S3 SDK uses both during our ITs * Closes #40250 --- .../repositories/s3/S3BlobContainer.java | 52 +++++++++++++ .../repositories/s3/AmazonS3Fixture.java | 7 +- .../repositories/s3/MockAmazonS3.java | 6 +- .../common/blobstore/BlobContainer.java | 31 +++++++- .../blobstore/BlobStoreRepository.java | 73 ++++++++----------- .../ESBlobStoreContainerTestCase.java | 18 +++++ 6 files changed, 136 insertions(+), 51 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index fc3f80b5b32..f98382e5526 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -23,6 +23,7 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AmazonS3Exception; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; @@ -56,6 +57,12 @@ import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING class S3BlobContainer extends AbstractBlobContainer { + /** + * Maximum number of deletes in a {@link DeleteObjectsRequest}. + * @see S3 Documentation. + */ + private static final int MAX_BULK_DELETES = 1000; + private final S3BlobStore blobStore; private final String keyPath; @@ -118,6 +125,51 @@ class S3BlobContainer extends AbstractBlobContainer { deleteBlobIgnoringIfNotExists(blobName); } + @Override + public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { + if (blobNames.isEmpty()) { + return; + } + try (AmazonS3Reference clientReference = blobStore.clientReference()) { + // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes + final List deleteRequests = new ArrayList<>(); + final List partition = new ArrayList<>(); + for (String blob : blobNames) { + partition.add(buildKey(blob)); + if (partition.size() == MAX_BULK_DELETES ) { + deleteRequests.add(bulkDelete(blobStore.bucket(), partition)); + partition.clear(); + } + } + if (partition.isEmpty() == false) { + deleteRequests.add(bulkDelete(blobStore.bucket(), partition)); + } + SocketAccess.doPrivilegedVoid(() -> { + AmazonClientException aex = null; + for (DeleteObjectsRequest deleteRequest : deleteRequests) { + try { + clientReference.client().deleteObjects(deleteRequest); + } catch (AmazonClientException e) { + if (aex == null) { + aex = e; + } else { + aex.addSuppressed(e); + } + } + } + if (aex != null) { + throw aex; + } + }); + } catch (final AmazonClientException e) { + throw new IOException("Exception when deleting blobs [" + blobNames + "]", e); + } + } + + private static DeleteObjectsRequest bulkDelete(String bucket, List blobs) { + return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true); + } + @Override public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException { try (AmazonS3Reference clientReference = blobStore.clientReference()) { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java index a411a1c53cf..51b1d5159ed 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java @@ -324,7 +324,7 @@ public class AmazonS3Fixture extends AbstractHttpFixture { // Delete Multiple Objects // // https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html - handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> { + final RequestHandler bulkDeleteHandler = request -> { final List deletes = new ArrayList<>(); final List errors = new ArrayList<>(); @@ -344,7 +344,6 @@ public class AmazonS3Fixture extends AbstractHttpFixture { if (closingOffset != -1) { offset = offset + startMarker.length(); final String objectName = requestBody.substring(offset, closingOffset); - boolean found = false; for (Bucket bucket : buckets.values()) { if (bucket.objects.containsKey(objectName)) { @@ -369,7 +368,9 @@ public class AmazonS3Fixture extends AbstractHttpFixture { } } return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request"); - }); + }; + handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler); + handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler); // non-authorized requests diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java index 9e0a6009659..37f5d9b03db 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java @@ -158,11 +158,7 @@ class MockAmazonS3 extends AbstractAmazonS3 { final List deletions = new ArrayList<>(); for (DeleteObjectsRequest.KeyVersion key : request.getKeys()) { - if (blobs.remove(key.getKey()) == null) { - AmazonS3Exception exception = new AmazonS3Exception("[" + key + "] does not exist."); - exception.setStatusCode(404); - throw exception; - } else { + if (blobs.remove(key.getKey()) != null) { DeleteObjectsResult.DeletedObject deletion = new DeleteObjectsResult.DeletedObject(); deletion.setKey(key.getKey()); deletions.add(deletion); diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index ab3971c3283..19d3a66a87d 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; +import java.util.List; import java.util.Map; /** @@ -96,8 +97,9 @@ public interface BlobContainer { * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException; + /** - * Deletes a blob with giving name, if the blob exists. If the blob does not exist, + * Deletes the blob with the given name, if the blob exists. If the blob does not exist, * this method throws a NoSuchFileException. * * @param blobName @@ -107,6 +109,33 @@ public interface BlobContainer { */ void deleteBlob(String blobName) throws IOException; + /** + * Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception + * when one or multiple of the given blobs don't exist and simply ignore this case. + * + * @param blobNames The names of the blob to delete. + * @throws IOException if a subset of blob exists but could not be deleted. + */ + default void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { + IOException ioe = null; + for (String blobName : blobNames) { + try { + deleteBlob(blobName); + } catch (NoSuchFileException e) { + // ignored + } catch (IOException e) { + if (ioe == null) { + ioe = e; + } else { + ioe.addSuppressed(e); + } + } + } + if (ioe != null) { + throw ioe; + } + } + /** * Deletes a blob with giving name, ignoring if the blob does not exist. * diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index c6eb7e5c6c2..04af29438d4 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -100,7 +100,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.file.DirectoryNotEmptyException; import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.ArrayList; @@ -464,22 +463,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final Collection indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values()); indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values()); final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices")); - for (final IndexId indexId : indicesToCleanUp) { try { - indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId()); - } catch (DirectoryNotEmptyException dnee) { - // if the directory isn't empty for some reason, it will fail to clean up; - // we'll ignore that and accept that cleanup didn't fully succeed. - // since we are using UUIDs for path names, this won't be an issue for - // snapshotting indices of the same name - logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " + - "but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee); + indicesBlobContainer.deleteBlobsIgnoringIfNotExists( + indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList())); } catch (IOException ioe) { // a different IOException occurred while trying to delete - will just log the issue for now - logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " + - "but failed to clean up its index folder.", metadata.name(), indexId), ioe); + logger.warn(() -> + new ParameterizedMessage( + "[{}] indices {} are no longer part of any snapshots in the repository, " + + "but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe); } - } } catch (IOException | ResourceNotFoundException ex) { throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex); } @@ -1016,16 +1009,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp try { // Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier // attempt to write an index file with this generation failed mid-way after creating the temporary file. - for (final String blobName : blobs.keySet()) { - if (FsBlobContainer.isTempBlobName(blobName)) { - try { - blobContainer.deleteBlobIgnoringIfNotExists(blobName); - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization", - snapshotId, shardId, blobName), e); - throw e; - } - } + final List blobNames = + blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList()); + try { + blobContainer.deleteBlobsIgnoringIfNotExists(blobNames); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization", + snapshotId, shardId, blobNames), e); + throw e; } // If we deleted all snapshots, we don't need to create a new index file @@ -1034,28 +1025,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } // Delete old index files - for (final String blobName : blobs.keySet()) { - if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) { - try { - blobContainer.deleteBlobIgnoringIfNotExists(blobName); - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization", - snapshotId, shardId, blobName), e); - throw e; - } - } + final List indexBlobs = + blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList()); + try { + blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization", + snapshotId, shardId, indexBlobs), e); + throw e; } // Delete all blobs that don't exist in a snapshot - for (final String blobName : blobs.keySet()) { - if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) { - try { - blobContainer.deleteBlobIgnoringIfNotExists(blobName); - } catch (IOException e) { - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization", - snapshotId, shardId, blobName), e); - } - } + final List orphanedBlobs = blobs.keySet().stream() + .filter(blobName -> + blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null) + .collect(Collectors.toList()); + try { + blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs); + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization", + snapshotId, shardId, orphanedBlobs), e); } } catch (IOException e) { String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]"; diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java index 3e4e639dd01..21071f7cb50 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java @@ -33,6 +33,7 @@ import java.nio.file.FileAlreadyExistsException; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes; @@ -136,6 +137,23 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase { } } + public void testDeleteBlobs() throws IOException { + try (BlobStore store = newBlobStore()) { + final List blobNames = Arrays.asList("foobar", "barfoo"); + final BlobContainer container = store.blobContainer(new BlobPath()); + container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist + byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16))); + final BytesArray bytesArray = new BytesArray(data); + for (String blobName : blobNames) { + writeBlob(container, blobName, bytesArray, randomBoolean()); + } + assertEquals(container.listBlobs().size(), 2); + container.deleteBlobsIgnoringIfNotExists(blobNames); + assertTrue(container.listBlobs().isEmpty()); + container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist + } + } + public void testDeleteBlobIgnoringIfNotExists() throws IOException { try (BlobStore store = newBlobStore()) { BlobPath blobPath = new BlobPath();