diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
index fc3f80b5b32..f98382e5526 100644
--- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
+++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
@@ -23,6 +23,7 @@ import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -56,6 +57,12 @@ import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING
class S3BlobContainer extends AbstractBlobContainer {
+ /**
+ * Maximum number of deletes in a {@link DeleteObjectsRequest}.
+ * @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
+ */
+ private static final int MAX_BULK_DELETES = 1000;
+
private final S3BlobStore blobStore;
private final String keyPath;
@@ -118,6 +125,51 @@ class S3BlobContainer extends AbstractBlobContainer {
deleteBlobIgnoringIfNotExists(blobName);
}
+ @Override
+ public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
+ if (blobNames.isEmpty()) {
+ return;
+ }
+ try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+ // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
+ final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
+ final List<String> partition = new ArrayList<>();
+ for (String blob : blobNames) {
+ partition.add(buildKey(blob));
+ if (partition.size() == MAX_BULK_DELETES) {
+ deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
+ partition.clear();
+ }
+ }
+ if (partition.isEmpty() == false) {
+ deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
+ }
+ SocketAccess.doPrivilegedVoid(() -> {
+ AmazonClientException aex = null;
+ for (DeleteObjectsRequest deleteRequest : deleteRequests) {
+ try {
+ clientReference.client().deleteObjects(deleteRequest);
+ } catch (AmazonClientException e) {
+ if (aex == null) {
+ aex = e;
+ } else {
+ aex.addSuppressed(e);
+ }
+ }
+ }
+ if (aex != null) {
+ throw aex;
+ }
+ });
+ } catch (final AmazonClientException e) {
+ throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
+ }
+ }
+
+ private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
+ return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
+ }
+
@Override
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java
index a411a1c53cf..51b1d5159ed 100644
--- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java
+++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/AmazonS3Fixture.java
@@ -324,7 +324,7 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
// Delete Multiple Objects
//
// https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
- handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> {
+ final RequestHandler bulkDeleteHandler = request -> {
final List<String> deletes = new ArrayList<>();
final List<String> errors = new ArrayList<>();
@@ -344,7 +344,6 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
if (closingOffset != -1) {
offset = offset + startMarker.length();
final String objectName = requestBody.substring(offset, closingOffset);
-
boolean found = false;
for (Bucket bucket : buckets.values()) {
if (bucket.objects.containsKey(objectName)) {
@@ -369,7 +368,9 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
}
}
return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
- });
+ };
+ handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
+ handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);
// non-authorized requests
diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java
index 9e0a6009659..37f5d9b03db 100644
--- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java
+++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/MockAmazonS3.java
@@ -158,11 +158,7 @@ class MockAmazonS3 extends AbstractAmazonS3 {
final List<DeleteObjectsResult.DeletedObject> deletions = new ArrayList<>();
for (DeleteObjectsRequest.KeyVersion key : request.getKeys()) {
- if (blobs.remove(key.getKey()) == null) {
- AmazonS3Exception exception = new AmazonS3Exception("[" + key + "] does not exist.");
- exception.setStatusCode(404);
- throw exception;
- } else {
+ if (blobs.remove(key.getKey()) != null) {
DeleteObjectsResult.DeletedObject deletion = new DeleteObjectsResult.DeletedObject();
deletion.setKey(key.getKey());
deletions.add(deletion);
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
index ab3971c3283..19d3a66a87d 100644
--- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
+import java.util.List;
import java.util.Map;
/**
@@ -96,8 +97,9 @@ public interface BlobContainer {
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
+
/**
- * Deletes a blob with giving name, if the blob exists. If the blob does not exist,
+ * Deletes the blob with the given name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException.
*
* @param blobName
@@ -107,6 +109,33 @@ public interface BlobContainer {
*/
void deleteBlob(String blobName) throws IOException;
+ /**
+ * Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
+ * when one or multiple of the given blobs don't exist and simply ignore this case.
+ *
+ * @param blobNames The names of the blobs to delete.
+ * @throws IOException if a subset of blob exists but could not be deleted.
+ */
+ default void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
+ IOException ioe = null;
+ for (String blobName : blobNames) {
+ try {
+ deleteBlob(blobName);
+ } catch (NoSuchFileException e) {
+ // ignored
+ } catch (IOException e) {
+ if (ioe == null) {
+ ioe = e;
+ } else {
+ ioe.addSuppressed(e);
+ }
+ }
+ }
+ if (ioe != null) {
+ throw ioe;
+ }
+ }
+
/**
* Deletes a blob with giving name, ignoring if the blob does not exist.
*
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index c6eb7e5c6c2..04af29438d4 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -100,7 +100,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.nio.file.DirectoryNotEmptyException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
@@ -464,22 +463,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
- for (final IndexId indexId : indicesToCleanUp) {
try {
- indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId());
- } catch (DirectoryNotEmptyException dnee) {
- // if the directory isn't empty for some reason, it will fail to clean up;
- // we'll ignore that and accept that cleanup didn't fully succeed.
- // since we are using UUIDs for path names, this won't be an issue for
- // snapshotting indices of the same name
- logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
- "but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
+ indicesBlobContainer.deleteBlobsIgnoringIfNotExists(
+ indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException ioe) {
// a different IOException occurred while trying to delete - will just log the issue for now
- logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
- "but failed to clean up its index folder.", metadata.name(), indexId), ioe);
+ logger.warn(() ->
+ new ParameterizedMessage(
+ "[{}] indices {} are no longer part of any snapshots in the repository, " +
+ "but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
}
- }
} catch (IOException | ResourceNotFoundException ex) {
throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);
}
@@ -1016,16 +1009,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
try {
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
- for (final String blobName : blobs.keySet()) {
- if (FsBlobContainer.isTempBlobName(blobName)) {
- try {
- blobContainer.deleteBlobIgnoringIfNotExists(blobName);
- } catch (IOException e) {
- logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
- snapshotId, shardId, blobName), e);
- throw e;
- }
- }
+ final List<String> blobNames =
+ blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
+ try {
+ blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
+ } catch (IOException e) {
+ logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
+ snapshotId, shardId, blobNames), e);
+ throw e;
}
// If we deleted all snapshots, we don't need to create a new index file
@@ -1034,28 +1025,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
// Delete old index files
- for (final String blobName : blobs.keySet()) {
- if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
- try {
- blobContainer.deleteBlobIgnoringIfNotExists(blobName);
- } catch (IOException e) {
- logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
- snapshotId, shardId, blobName), e);
- throw e;
- }
- }
+ final List<String> indexBlobs =
+ blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
+ try {
+ blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
+ } catch (IOException e) {
+ logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
+ snapshotId, shardId, indexBlobs), e);
+ throw e;
}
// Delete all blobs that don't exist in a snapshot
- for (final String blobName : blobs.keySet()) {
- if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) {
- try {
- blobContainer.deleteBlobIgnoringIfNotExists(blobName);
- } catch (IOException e) {
- logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization",
- snapshotId, shardId, blobName), e);
- }
- }
+ final List<String> orphanedBlobs = blobs.keySet().stream()
+ .filter(blobName ->
+ blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
+ .collect(Collectors.toList());
+ try {
+ blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
+ } catch (IOException e) {
+ logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
+ snapshotId, shardId, orphanedBlobs), e);
}
} catch (IOException e) {
String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java
index 3e4e639dd01..21071f7cb50 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java
@@ -33,6 +33,7 @@ import java.nio.file.FileAlreadyExistsException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
@@ -136,6 +137,23 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
}
}
+ public void testDeleteBlobs() throws IOException {
+ try (BlobStore store = newBlobStore()) {
+ final List<String> blobNames = Arrays.asList("foobar", "barfoo");
+ final BlobContainer container = store.blobContainer(new BlobPath());
+ container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
+ byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
+ final BytesArray bytesArray = new BytesArray(data);
+ for (String blobName : blobNames) {
+ writeBlob(container, blobName, bytesArray, randomBoolean());
+ }
+ assertEquals(container.listBlobs().size(), 2);
+ container.deleteBlobsIgnoringIfNotExists(blobNames);
+ assertTrue(container.listBlobs().isEmpty());
+ container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
+ }
+ }
+
public void testDeleteBlobIgnoringIfNotExists() throws IOException {
try (BlobStore store = newBlobStore()) {
BlobPath blobPath = new BlobPath();