Recursive Delete on BlobContainer (#43281) (#43920)

This is a prerequisite of #42189:

* Add directory delete method to blob container specific to each implementation:
  * Some notes on the implementations:
       * AWS + GCS: We can simply exploit the fact that both AWS and GCS return blobs in lexicographic order, which allows us to delete in the same order in which we receive the blobs from the listing request. For AWS this only required listing without the delimiter setting (so we get a deep listing), and for GCS the same behavior is achieved by not using the directory mode on the listing invocation. The nice thing about this is that, even for very large numbers of blobs, the memory requirements are now capped nicely since we go page by page when deleting.
       * For Azure I extended the parallelization to the listing calls as well and made it work recursively. I verified that this works with thread count `1` since we only block once in the initial thread and then fan out to a "graph" of child listeners that never block.
       * HDFS and FS are trivial since we have directory delete methods available for them
* Enhances third-party tests to ensure the new functionality works (I manually ran them for all cloud providers)
This commit is contained in:
Armin Braun 2019-07-03 17:14:57 +02:00 committed by GitHub
parent 49d69bf987
commit be20fb80e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 230 additions and 39 deletions

View File

@ -96,6 +96,11 @@ public class URLBlobContainer extends AbstractBlobContainer {
throw new UnsupportedOperationException("URL repository is read only");
}
/**
 * This operation is not supported by URLBlobContainer: the call always fails with an
 * {@link UnsupportedOperationException} stating that the URL repository is read only.
 */
@Override
public void delete() {
throw new UnsupportedOperationException("URL repository is read only");
}
/**
* This operation is not supported by URLBlobContainer
*/

View File

@ -23,6 +23,7 @@ import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
@ -38,7 +39,6 @@ import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@ -127,23 +127,34 @@ public class AzureBlobContainer extends AbstractBlobContainer {
}
@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
if (blobNames.isEmpty()) {
return;
public void delete() throws IOException {
try {
blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}
final PlainActionFuture<Collection<Void>> result = PlainActionFuture.newFuture();
final GroupedActionListener<Void> listener = new GroupedActionListener<>(result, blobNames.size());
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint.
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
for (String blobName : blobNames) {
executor.submit(new ActionRunnable<Void>(listener) {
@Override
protected void doRun() throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
listener.onResponse(null);
}
});
}
@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
if (blobNames.isEmpty()) {
result.onResponse(null);
} else {
final GroupedActionListener<Void> listener =
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
for (String blobName : blobNames) {
executor.execute(new ActionRunnable<Void>(listener) {
@Override
protected void doRun() throws IOException {
deleteBlobIgnoringIfNotExists(blobName);
listener.onResponse(null);
}
});
}
}
try {
result.actionGet();

View File

@ -36,6 +36,7 @@ import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -91,6 +92,10 @@ public class AzureBlobStore implements BlobStore {
service.deleteBlob(clientName, container, blob);
}
/**
 * Deletes the blob directory at {@code path} by delegating to the storage service, using this store's
 * client name and container.
 *
 * @param path     path handed to the service as the directory to delete
 * @param executor executor passed through to the service unchanged
 */
public void deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException {
service.deleteBlobDirectory(clientName, container, path, executor);
}
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
return service.getInputStream(clientName, container, blob);
}

View File

@ -40,6 +40,7 @@ import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
@ -49,6 +50,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import java.io.IOException;
import java.io.InputStream;
@ -57,11 +59,15 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileAlreadyExistsException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import static java.util.Collections.emptyMap;
@ -187,6 +193,50 @@ public class AzureStorageService {
});
}
/**
 * Lists the blobs found under {@code path} in the given container, schedules one delete task per listed blob on
 * {@code executor} and then waits on the calling thread until every scheduled task has finished.
 * Exceptions raised by individual delete tasks are collected and reported together as suppressed exceptions of a
 * single {@link IOException}.
 *
 * @param account   account name used to look up the client and passed on to {@code deleteBlob}
 * @param container name of the container that is listed
 * @param path      prefix passed to the listing call
 * @param executor  executor that runs the per-blob delete tasks
 * @throws IOException if at least one delete task reported a failure
 */
void deleteBlobDirectory(String account, String container, String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
// filled from the delete tasks, hence the synchronized wrapper
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
// Starts at 1: this extra count belongs to the calling thread and is only released after the listing loop below, so the
// counter cannot reach zero while blobs are still being enumerated.
final AtomicLong outstanding = new AtomicLong(1L);
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
SocketAccess.doPrivilegedVoidException(() -> {
// NOTE(review): the second argument is presumably the SDK's flat (non-hierarchical) listing switch - confirm
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
// count the task before it is handed to the executor so that it cannot be missed by the zero check
outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
deleteBlob(account, container, blobPath);
}
@Override
public void onFailure(Exception e) {
// only record the failure here; the counter is decremented in onAfter
exceptions.add(e);
}
@Override
public void onAfter() {
// NOTE(review): relies on AbstractRunnable calling onAfter after both success and failure - confirm
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
}
});
}
});
// release the calling thread's own count; completes the future right away if no task is outstanding any more
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
// wait for the last task (or the decrement above) to complete the future
result.actionGet();
if (exceptions.isEmpty() == false) {
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
exceptions.forEach(ex::addSuppressed);
throw ex;
}
}
public InputStream getInputStream(String account, String container, String blob)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);

View File

@ -86,6 +86,11 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
blobStore.deleteBlob(buildKey(blobName));
}
/**
 * Deletes this container by asking the blob store to delete everything under this container's path.
 *
 * @throws IOException if the blob store fails to delete the directory
 */
@Override
public void delete() throws IOException {
    final String directoryKey = path().buildAsString();
    blobStore.deleteDirectory(directoryKey);
}
@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));

View File

@ -19,6 +19,7 @@
package org.elasticsearch.repositories.gcs;
import com.google.api.gax.paging.Page;
import com.google.cloud.BatchResult;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
@ -306,6 +307,23 @@ class GoogleCloudStorageBlobStore implements BlobStore {
}
}
/**
 * Deletes every blob whose name starts with the given path, one listing page at a time.
 *
 * @param pathStr Name of path to delete; used as the prefix of the listing request
 * @throws IOException if listing or deleting fails
 */
void deleteDirectory(String pathStr) throws IOException {
    SocketAccess.doPrivilegedVoidIOException(() -> {
        Page<Blob> currentPage = client().get(bucketName).list(BlobListOption.prefix(pathStr));
        do {
            // only the names of the current page are held in memory at any time
            final Collection<String> pageBlobNames = new ArrayList<>();
            for (final Blob blob : currentPage.getValues()) {
                pageBlobNames.add(blob.getName());
            }
            deleteBlobsIgnoringIfNotExists(pageBlobNames);
            currentPage = currentPage.getNextPage();
        } while (currentPage != null);
    });
}
/**
* Deletes multiple blobs from the specific bucket using a batch request
*

View File

@ -78,6 +78,11 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
}
}
/**
 * Deletes this container's {@code path} through the store's file context.
 *
 * @throws IOException if the delete operation fails
 */
@Override
public void delete() throws IOException {
// NOTE(review): the second argument is presumably the recursive flag of the file context's delete - confirm
store.execute(fileContext -> fileContext.delete(path, true));
}
@Override
public InputStream readBlob(String blobName) throws IOException {
// FSDataInputStream does buffering internally

View File

@ -54,6 +54,7 @@ import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -130,12 +131,53 @@ class S3BlobContainer extends AbstractBlobContainer {
deleteBlobIgnoringIfNotExists(blobName);
}
/**
 * Deletes all objects whose key starts with this container's {@code keyPath}, as well as the {@code keyPath} key itself.
 * Objects are listed without a delimiter and deleted one listing page at a time; the {@code keyPath} key is added to
 * the delete request of the final page.
 *
 * @throws IOException if deleting fails or the S3 client raises an {@link AmazonClientException}
 */
@Override
public void delete() throws IOException {
    try (AmazonS3Reference clientReference = blobStore.clientReference()) {
        ObjectListing previousPage = null;
        boolean morePages = true;
        while (morePages) {
            final ObjectListing currentPage;
            if (previousPage == null) {
                // first round trip: start a fresh listing for everything below keyPath
                final ListObjectsRequest firstRequest = new ListObjectsRequest();
                firstRequest.setBucketName(blobStore.bucket());
                firstRequest.setPrefix(keyPath);
                currentPage = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(firstRequest));
            } else {
                // lambdas can only capture effectively final locals, so pin the cursor first
                final ObjectListing cursor = previousPage;
                currentPage = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(cursor));
            }
            final List<String> keysToDelete = new ArrayList<>();
            for (final S3ObjectSummary summary : currentPage.getObjectSummaries()) {
                keysToDelete.add(summary.getKey());
            }
            morePages = currentPage.isTruncated();
            if (morePages) {
                previousPage = currentPage;
            } else {
                // last page: also remove the key of the container itself
                keysToDelete.add(keyPath);
            }
            // keys are already absolute, so they must not be resolved against this container again
            doDeleteBlobs(keysToDelete, false);
        }
    } catch (final AmazonClientException e) {
        throw new IOException("Exception when deleting blob container [" + keyPath + "]", e);
    }
}
/**
 * Deletes the blobs with the given names by delegating to {@code doDeleteBlobs}.
 *
 * @param blobNames names of the blobs to delete
 * @throws IOException if deleting fails
 */
@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
// true: the names are relative to this container and are turned into full keys via buildKey
doDeleteBlobs(blobNames, true);
}
private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOException {
if (blobNames.isEmpty()) {
return;
}
final Set<String> outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
final Set<String> outstanding;
if (relative) {
outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
} else {
outstanding = new HashSet<>(blobNames);
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();

View File

@ -88,4 +88,11 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
// to become consistent.
assertBusy(() -> super.assertChildren(path, children), 10L, TimeUnit.MINUTES);
}
/**
 * Runs the inherited deletion assertion inside {@code assertBusy} with a timeout of 10 minutes.
 *
 * @param path path passed to the inherited assertion
 * @param name name passed to the inherited assertion
 */
@Override
protected void assertDeleted(BlobPath path, String name) throws Exception {
// AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
// to become consistent.
assertBusy(() -> super.assertDeleted(path, name), 10L, TimeUnit.MINUTES);
}
}

View File

@ -109,6 +109,12 @@ public interface BlobContainer {
*/
void deleteBlob(String blobName) throws IOException;
/**
 * Deletes this container and all its contents from the repository.
 * Containers of read-only repositories may reject the call with an {@link UnsupportedOperationException}.
 *
 * @throws IOException on failure
 */
void delete() throws IOException;
/**
* Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception
* when one or multiple of the given blobs don't exist and simply ignore this case.

View File

@ -19,6 +19,8 @@
package org.elasticsearch.common.blobstore;
import org.elasticsearch.common.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@ -68,6 +70,20 @@ public class BlobPath implements Iterable<String> {
return p + SEPARATOR;
}
/**
 * Returns the path that is one level above this one, i.e. this path without its last element.
 *
 * @return Parent path or {@code null} if this path has no elements
 */
@Nullable
public BlobPath parent() {
    final int depth = paths.size();
    if (depth == 0) {
        return null;
    }
    // copy the sub list so the parent does not share a view onto this path's list
    return new BlobPath(new ArrayList<>(paths.subList(0, depth - 1)));
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();

View File

@ -122,6 +122,11 @@ public class FsBlobContainer extends AbstractBlobContainer {
}
}
/**
 * Deletes this container by removing its {@code path} through {@code IOUtils.rm}.
 *
 * @throws IOException if the removal fails
 */
@Override
public void delete() throws IOException {
// NOTE(review): IOUtils.rm is presumably recursive, which is what removes the container's contents - confirm
IOUtils.rm(path);
}
@Override
public boolean blobExists(String blobName) {
return Files.exists(path.resolve(blobName));

View File

@ -64,6 +64,11 @@ public class BlobContainerWrapper implements BlobContainer {
delegate.deleteBlob(blobName);
}
/**
 * Forwards the call to the wrapped {@code delegate} container.
 *
 * @throws IOException if the delegate fails
 */
@Override
public void delete() throws IOException {
delegate.delete();
}
@Override
public void deleteBlobIgnoringIfNotExists(final String blobName) throws IOException {
delegate.deleteBlobIgnoringIfNotExists(blobName);

View File

@ -22,7 +22,6 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@ -34,20 +33,19 @@ import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeTestCase {
@ -67,27 +65,26 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
public void setUp() throws Exception {
super.setUp();
createRepository("test-repo");
deleteAndAssertEmpty(getRepository().basePath());
}
private void deleteAndAssertEmpty(BlobPath path) throws Exception {
final BlobStoreRepository repo = getRepository();
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
repo.threadPool().generic().execute(new ActionRunnable<Void>(future) {
@Override
protected void doRun() throws Exception {
deleteContents(repo.blobStore().blobContainer(repo.basePath()));
repo.blobStore().blobContainer(path).delete();
future.onResponse(null);
}
});
future.actionGet();
assertChildren(repo.basePath(), Collections.emptyList());
}
private static void deleteContents(BlobContainer container) throws IOException {
final List<String> toDelete = new ArrayList<>();
for (Map.Entry<String, BlobContainer> child : container.children().entrySet()) {
deleteContents(child.getValue());
toDelete.add(child.getKey());
final BlobPath parent = path.parent();
if (parent == null) {
assertChildren(path, Collections.emptyList());
} else {
assertDeleted(parent, path.toArray()[path.toArray().length - 1]);
}
toDelete.addAll(container.listBlobs().keySet());
container.deleteBlobsIgnoringIfNotExists(toDelete);
}
public void testCreateSnapshot() {
@ -159,6 +156,11 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
assertBlobsByPrefix(repo.basePath().add("foo"), "nest",
Collections.singletonMap("nested-blob", new PlainBlobMetaData("nested-blob", testBlobLen)));
assertChildren(repo.basePath().add("foo").add("nested"), Collections.emptyList());
if (randomBoolean()) {
deleteAndAssertEmpty(repo.basePath());
} else {
deleteAndAssertEmpty(repo.basePath().add("foo"));
}
}
protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception {
@ -182,7 +184,21 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
}
}
/**
 * Asserts that the container at {@code path} has no child named {@code name}.
 *
 * @param path container path whose children are listed
 * @param name child name that must be absent from the listing
 * @throws Exception declared so that overriding tests can wrap the check, e.g. in retries
 */
protected void assertDeleted(BlobPath path, String name) throws Exception {
    // hasItem checks membership. contains(name) only matches a listing that is exactly [name], so its negation would
    // pass even if name were still present next to other children.
    assertThat(listChildren(path), not(hasItem(name)));
}
protected void assertChildren(BlobPath path, Collection<String> children) throws Exception {
listChildren(path);
final Set<String> foundChildren = listChildren(path);
if (children.isEmpty()) {
assertThat(foundChildren, empty());
} else {
assertThat(foundChildren, containsInAnyOrder(children.toArray(Strings.EMPTY_ARRAY)));
}
}
private Set<String> listChildren(BlobPath path) {
final PlainActionFuture<Set<String>> future = PlainActionFuture.newFuture();
final BlobStoreRepository repository = getRepository();
repository.threadPool().generic().execute(new ActionRunnable<Set<String>>(future) {
@ -192,12 +208,7 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
future.onResponse(blobStore.blobContainer(path).children().keySet());
}
});
Set<String> foundChildren = future.actionGet();
if (children.isEmpty()) {
assertThat(foundChildren, empty());
} else {
assertThat(foundChildren, containsInAnyOrder(children.toArray(Strings.EMPTY_ARRAY)));
}
return future.actionGet();
}
private BlobStoreRepository getRepository() {