* Add bulk delete API to the blob container
* Implement the bulk delete API for S3
* Adjust S3Fixture to accept both path styles for bulk deletes, since the S3 SDK uses both during our integration tests
* Closes #40250
parent c22a2cea12
commit c4e84e2b34
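At a glance, the change lets callers hand a whole list of blob names to a container in one call instead of looping over deleteBlob. A minimal sketch of the intended call site, assuming a BlobContainer obtained from the repository's BlobStore (the helper class, container variable, and blob names below are illustrative, not taken from the commit):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    import org.elasticsearch.common.blobstore.BlobContainer;

    class BulkDeleteUsageSketch {
        // Deletes a batch of blobs in one call; names that no longer exist are ignored.
        static void cleanUp(BlobContainer container) throws IOException {
            final List<String> staleBlobs = Arrays.asList("pending-index-1", "snap-1234.dat");
            // On S3 this becomes one DeleteObjectsRequest per 1,000 keys; other stores
            // fall back to the default per-blob deleteBlob(...) loop added in this commit.
            container.deleteBlobsIgnoringIfNotExists(staleBlobs);
        }
    }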
S3BlobContainer.java

@@ -23,6 +23,7 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;

@@ -56,6 +57,12 @@ import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING
 
 class S3BlobContainer extends AbstractBlobContainer {
 
+    /**
+     * Maximum number of deletes in a {@link DeleteObjectsRequest}.
+     * @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
+     */
+    private static final int MAX_BULK_DELETES = 1000;
+
     private final S3BlobStore blobStore;
     private final String keyPath;
 

@@ -118,6 +125,51 @@ class S3BlobContainer extends AbstractBlobContainer {
         deleteBlobIgnoringIfNotExists(blobName);
     }
 
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
+        if (blobNames.isEmpty()) {
+            return;
+        }
+        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
+            // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
+            final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
+            final List<String> partition = new ArrayList<>();
+            for (String blob : blobNames) {
+                partition.add(buildKey(blob));
+                if (partition.size() == MAX_BULK_DELETES) {
+                    deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
+                    partition.clear();
+                }
+            }
+            if (partition.isEmpty() == false) {
+                deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
+            }
+            SocketAccess.doPrivilegedVoid(() -> {
+                AmazonClientException aex = null;
+                for (DeleteObjectsRequest deleteRequest : deleteRequests) {
+                    try {
+                        clientReference.client().deleteObjects(deleteRequest);
+                    } catch (AmazonClientException e) {
+                        if (aex == null) {
+                            aex = e;
+                        } else {
+                            aex.addSuppressed(e);
+                        }
+                    }
+                }
+                if (aex != null) {
+                    throw aex;
+                }
+            });
+        } catch (final AmazonClientException e) {
+            throw new IOException("Exception when deleting blobs [" + blobNames + "]", e);
+        }
+    }
+
+    private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
+        return new DeleteObjectsRequest(bucket).withKeys(blobs.toArray(Strings.EMPTY_ARRAY)).withQuiet(true);
+    }
+
     @Override
     public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
         try (AmazonS3Reference clientReference = blobStore.clientReference()) {
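The heart of the S3 override above is chunking the key list so that no single DeleteObjectsRequest exceeds MAX_BULK_DELETES. A standalone sketch of just that partitioning step, written without the AWS SDK so it can be read and tested in isolation; the class name is hypothetical and the 1,000 limit mirrors the constant introduced above:

    import java.util.ArrayList;
    import java.util.List;

    class BulkDeletePartitioner {
        private static final int MAX_BULK_DELETES = 1000; // S3 multi-object delete limit

        // Splits keys into consecutive chunks of at most MAX_BULK_DELETES entries, mirroring
        // the loop in S3BlobContainer#deleteBlobsIgnoringIfNotExists (which reuses a single
        // list and copies it into each request instead of allocating new chunks).
        static List<List<String>> partition(List<String> keys) {
            final List<List<String>> chunks = new ArrayList<>();
            List<String> current = new ArrayList<>();
            for (String key : keys) {
                current.add(key);
                if (current.size() == MAX_BULK_DELETES) {
                    chunks.add(current);
                    current = new ArrayList<>();
                }
            }
            if (current.isEmpty() == false) {
                chunks.add(current);
            }
            return chunks;
        }
    }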
AmazonS3Fixture.java

@@ -324,7 +324,7 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
         // Delete Multiple Objects
         //
         // https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
-        handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), (request) -> {
+        final RequestHandler bulkDeleteHandler = request -> {
             final List<String> deletes = new ArrayList<>();
             final List<String> errors = new ArrayList<>();

@@ -344,7 +344,6 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
             if (closingOffset != -1) {
                 offset = offset + startMarker.length();
                 final String objectName = requestBody.substring(offset, closingOffset);
 
                 boolean found = false;
                 for (Bucket bucket : buckets.values()) {
                     if (bucket.objects.containsKey(objectName)) {

@@ -369,7 +368,9 @@ public class AmazonS3Fixture extends AbstractHttpFixture {
                 }
             }
             return newInternalError(request.getId(), "Something is wrong with this POST multiple deletes request");
-        });
+        };
+        handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/"), bulkDeleteHandler);
+        handlers.insert(nonAuthPath(HttpPost.METHOD_NAME, "/{bucket}"), bulkDeleteHandler);
 
         // non-authorized requests
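For reference, this is roughly the client-side call whose requests the fixture has to answer. Depending on whether the client is configured for path-style access, the SDK POSTs the multi-object delete either to /{bucket}?delete or to /?delete with the bucket encoded in the host name, which is why the handler is now registered under both paths. The sketch below uses only SDK calls that also appear in the diff; the class name, bucket, and keys are illustrative:

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.DeleteObjectsRequest;

    class MultiObjectDeleteSketch {
        // Issues a single Multi-Object Delete call for up to 1,000 keys; quiet mode asks S3
        // to omit per-key success entries from the response and report only failures.
        static void bulkDelete(AmazonS3 client, String bucket, String... keys) {
            client.deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keys).withQuiet(true));
        }
    }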
MockAmazonS3.java

@@ -158,11 +158,7 @@ class MockAmazonS3 extends AbstractAmazonS3 {
 
         final List<DeleteObjectsResult.DeletedObject> deletions = new ArrayList<>();
         for (DeleteObjectsRequest.KeyVersion key : request.getKeys()) {
-            if (blobs.remove(key.getKey()) == null) {
-                AmazonS3Exception exception = new AmazonS3Exception("[" + key + "] does not exist.");
-                exception.setStatusCode(404);
-                throw exception;
-            } else {
+            if (blobs.remove(key.getKey()) != null) {
                 DeleteObjectsResult.DeletedObject deletion = new DeleteObjectsResult.DeletedObject();
                 deletion.setKey(key.getKey());
                 deletions.add(deletion);
BlobContainer.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
+import java.util.List;
 import java.util.Map;
 
 /**

@@ -96,8 +97,9 @@ public interface BlobContainer {
      * @throws IOException if the input stream could not be read, or the target blob could not be written to.
      */
     void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
+
     /**
-     * Deletes a blob with giving name, if the blob exists. If the blob does not exist,
+     * Deletes the blob with the given name, if the blob exists. If the blob does not exist,
      * this method throws a NoSuchFileException.
      *
      * @param blobName

@@ -107,6 +109,33 @@ public interface BlobContainer {
      */
     void deleteBlob(String blobName) throws IOException;
 
+    /**
+     * Deletes the blobs with the given names. Unlike {@link #deleteBlob(String)}, this method does not throw
+     * an exception when one or more of the given blobs don't exist; such blobs are simply ignored.
+     *
+     * @param blobNames The names of the blobs to delete.
+     * @throws IOException if a subset of the blobs exists but could not be deleted.
+     */
+    default void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
+        IOException ioe = null;
+        for (String blobName : blobNames) {
+            try {
+                deleteBlob(blobName);
+            } catch (NoSuchFileException e) {
+                // ignored
+            } catch (IOException e) {
+                if (ioe == null) {
+                    ioe = e;
+                } else {
+                    ioe.addSuppressed(e);
+                }
+            }
+        }
+        if (ioe != null) {
+            throw ioe;
+        }
+    }
+
     /**
      * Deletes a blob with giving name, ignoring if the blob does not exist.
      *
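A note on the default implementation above: it deliberately keeps deleting after a failure and only rethrows at the end, attaching every later failure to the first via addSuppressed so no error is lost. The same aggregation pattern, pulled out of the BlobContainer context into a generic helper (the class and the IORunnable type are stand-ins introduced here purely for illustration):

    import java.io.IOException;
    import java.util.List;

    class SuppressedFailureSketch {
        @FunctionalInterface
        interface IORunnable {
            void run() throws IOException;
        }

        // Runs every action even if earlier ones fail, then rethrows the first failure
        // with all later failures attached as suppressed exceptions.
        static void runAll(List<IORunnable> actions) throws IOException {
            IOException first = null;
            for (IORunnable action : actions) {
                try {
                    action.run();
                } catch (IOException e) {
                    if (first == null) {
                        first = e;
                    } else {
                        first.addSuppressed(e);
                    }
                }
            }
            if (first != null) {
                throw first;
            }
        }
    }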
BlobStoreRepository.java

@@ -100,7 +100,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;

@@ -464,22 +463,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final Collection<IndexId> indicesToCleanUp = Sets.newHashSet(repositoryData.getIndices().values());
             indicesToCleanUp.removeAll(updatedRepositoryData.getIndices().values());
             final BlobContainer indicesBlobContainer = blobStore().blobContainer(basePath().add("indices"));
-            for (final IndexId indexId : indicesToCleanUp) {
             try {
-                indicesBlobContainer.deleteBlobIgnoringIfNotExists(indexId.getId());
-            } catch (DirectoryNotEmptyException dnee) {
-                // if the directory isn't empty for some reason, it will fail to clean up;
-                // we'll ignore that and accept that cleanup didn't fully succeed.
-                // since we are using UUIDs for path names, this won't be an issue for
-                // snapshotting indices of the same name
-                logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
-                    "but failed to clean up its index folder due to the directory not being empty.", metadata.name(), indexId), dnee);
+                indicesBlobContainer.deleteBlobsIgnoringIfNotExists(
+                    indicesToCleanUp.stream().map(IndexId::getId).collect(Collectors.toList()));
             } catch (IOException ioe) {
                 // a different IOException occurred while trying to delete - will just log the issue for now
-                logger.warn(() -> new ParameterizedMessage("[{}] index [{}] no longer part of any snapshots in the repository, " +
-                    "but failed to clean up its index folder.", metadata.name(), indexId), ioe);
+                logger.warn(() ->
+                    new ParameterizedMessage(
+                        "[{}] indices {} are no longer part of any snapshots in the repository, " +
+                        "but failed to clean up their index folders.", metadata.name(), indicesToCleanUp), ioe);
             }
-            }
         } catch (IOException | ResourceNotFoundException ex) {
             throw new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex);

@@ -1016,16 +1009,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         try {
             // Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
             // attempt to write an index file with this generation failed mid-way after creating the temporary file.
-            for (final String blobName : blobs.keySet()) {
-                if (FsBlobContainer.isTempBlobName(blobName)) {
-                    try {
-                        blobContainer.deleteBlobIgnoringIfNotExists(blobName);
-                    } catch (IOException e) {
-                        logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
-                            snapshotId, shardId, blobName), e);
-                        throw e;
-                    }
-                }
-            }
+            final List<String> blobNames =
+                blobs.keySet().stream().filter(FsBlobContainer::isTempBlobName).collect(Collectors.toList());
+            try {
+                blobContainer.deleteBlobsIgnoringIfNotExists(blobNames);
+            } catch (IOException e) {
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
+                    snapshotId, shardId, blobNames), e);
+                throw e;
+            }
 
             // If we deleted all snapshots, we don't need to create a new index file

@@ -1034,28 +1025,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
 
             // Delete old index files
-            for (final String blobName : blobs.keySet()) {
-                if (blobName.startsWith(SNAPSHOT_INDEX_PREFIX)) {
-                    try {
-                        blobContainer.deleteBlobIgnoringIfNotExists(blobName);
-                    } catch (IOException e) {
-                        logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blob [{}] during finalization",
-                            snapshotId, shardId, blobName), e);
-                        throw e;
-                    }
-                }
-            }
+            final List<String> indexBlobs =
+                blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
+            try {
+                blobContainer.deleteBlobsIgnoringIfNotExists(indexBlobs);
+            } catch (IOException e) {
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete index blobs {} during finalization",
+                    snapshotId, shardId, indexBlobs), e);
+                throw e;
+            }
 
             // Delete all blobs that don't exist in a snapshot
-            for (final String blobName : blobs.keySet()) {
-                if (blobName.startsWith(DATA_BLOB_PREFIX) && (updatedSnapshots.findNameFile(canonicalName(blobName)) == null)) {
-                    try {
-                        blobContainer.deleteBlobIgnoringIfNotExists(blobName);
-                    } catch (IOException e) {
-                        logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blob [{}] during finalization",
-                            snapshotId, shardId, blobName), e);
-                    }
-                }
-            }
+            final List<String> orphanedBlobs = blobs.keySet().stream()
+                .filter(blobName ->
+                    blobName.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blobName)) == null)
+                .collect(Collectors.toList());
+            try {
+                blobContainer.deleteBlobsIgnoringIfNotExists(orphanedBlobs);
+            } catch (IOException e) {
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete data blobs {} during finalization",
+                    snapshotId, shardId, orphanedBlobs), e);
+            }
         } catch (IOException e) {
             String message = "Failed to finalize " + reason + " with shard index [" + currentIndexGen + "]";
ESBlobStoreContainerTestCase.java

@@ -33,6 +33,7 @@ import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;

@@ -136,6 +137,23 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
         }
     }
 
+    public void testDeleteBlobs() throws IOException {
+        try (BlobStore store = newBlobStore()) {
+            final List<String> blobNames = Arrays.asList("foobar", "barfoo");
+            final BlobContainer container = store.blobContainer(new BlobPath());
+            container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
+            byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
+            final BytesArray bytesArray = new BytesArray(data);
+            for (String blobName : blobNames) {
+                writeBlob(container, blobName, bytesArray, randomBoolean());
+            }
+            assertEquals(container.listBlobs().size(), 2);
+            container.deleteBlobsIgnoringIfNotExists(blobNames);
+            assertTrue(container.listBlobs().isEmpty());
+            container.deleteBlobsIgnoringIfNotExists(blobNames); // does not raise when blobs don't exist
+        }
+    }
+
     public void testDeleteBlobIgnoringIfNotExists() throws IOException {
         try (BlobStore store = newBlobStore()) {
             BlobPath blobPath = new BlobPath();