diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index 697b443c93a..f9406e45447 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -96,6 +96,11 @@ public class URLBlobContainer extends AbstractBlobContainer { throw new UnsupportedOperationException("URL repository is read only"); } + @Override + public void delete() { + throw new UnsupportedOperationException("URL repository is read only"); + } + /** * This operation is not supported by URLBlobContainer */ diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 513bcf50abc..3dfd5903721 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -23,6 +23,7 @@ import com.microsoft.azure.storage.LocationMode; import com.microsoft.azure.storage.StorageException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.action.support.PlainActionFuture; @@ -38,7 +39,6 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URISyntaxException; import java.nio.file.NoSuchFileException; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -127,23 +127,34 @@ public class AzureBlobContainer extends AbstractBlobContainer { } @Override - public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { - if (blobNames.isEmpty()) { - return; + public void delete() throws IOException { + try { + blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME)); + } catch (URISyntaxException | StorageException e) { + throw new IOException(e); } - final PlainActionFuture> result = PlainActionFuture.newFuture(); - final GroupedActionListener listener = new GroupedActionListener<>(result, blobNames.size()); - final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME); - // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint. - // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way. - for (String blobName : blobNames) { - executor.submit(new ActionRunnable(listener) { - @Override - protected void doRun() throws IOException { - deleteBlobIgnoringIfNotExists(blobName); - listener.onResponse(null); - } - }); + } + + @Override + public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { + final PlainActionFuture result = PlainActionFuture.newFuture(); + if (blobNames.isEmpty()) { + result.onResponse(null); + } else { + final GroupedActionListener listener = + new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size()); + final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME); + // Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint + // TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way. + for (String blobName : blobNames) { + executor.execute(new ActionRunnable(listener) { + @Override + protected void doRun() throws IOException { + deleteBlobIgnoringIfNotExists(blobName); + listener.onResponse(null); + } + }); + } } try { result.actionGet(); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 5d3f6c85703..a7d9bb93a51 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -36,6 +36,7 @@ import java.net.URISyntaxException; import java.nio.file.FileAlreadyExistsException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Collectors; @@ -91,6 +92,10 @@ public class AzureBlobStore implements BlobStore { service.deleteBlob(clientName, container, blob); } + public void deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException { + service.deleteBlobDirectory(clientName, container, path, executor); + } + public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException { return service.getInputStream(clientName, container, blob); } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index f153aa3031c..be98edda83d 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -40,6 +40,7 @@ import com.microsoft.azure.storage.blob.ListBlobItem; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; @@ -49,6 +50,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import java.io.IOException; import java.io.InputStream; @@ -57,11 +59,15 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.file.FileAlreadyExistsException; import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import static java.util.Collections.emptyMap; @@ -187,6 +193,50 @@ public class AzureStorageService { }); } + void deleteBlobDirectory(String account, String container, String path, Executor executor) + throws URISyntaxException, StorageException, IOException { + final Tuple> client = client(account); + final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); + final Collection exceptions = Collections.synchronizedList(new ArrayList<>()); + final AtomicLong outstanding = new AtomicLong(1L); + final PlainActionFuture result = PlainActionFuture.newFuture(); + SocketAccess.doPrivilegedVoidException(() -> { + for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) { + // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ + // this requires 1 + container.length() + 1, with each 1 corresponding to one of the / + final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1); + outstanding.incrementAndGet(); + executor.execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + deleteBlob(account, container, blobPath); + } + + @Override + public void onFailure(Exception e) { + exceptions.add(e); + } + + @Override + public void onAfter() { + if (outstanding.decrementAndGet() == 0) { + result.onResponse(null); + } + } + }); + } + }); + if (outstanding.decrementAndGet() == 0) { + result.onResponse(null); + } + result.actionGet(); + if (exceptions.isEmpty() == false) { + final IOException ex = new IOException("Deleting directory [" + path + "] failed"); + exceptions.forEach(ex::addSuppressed); + throw ex; + } + } + public InputStream getInputStream(String account, String container, String blob) throws URISyntaxException, StorageException, IOException { final Tuple> client = client(account); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index a281d83eb41..75d4ad92fbf 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -86,6 +86,11 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer { blobStore.deleteBlob(buildKey(blobName)); } + @Override + public void delete() throws IOException { + blobStore.deleteDirectory(path().buildAsString()); + } + @Override public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList())); diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 7449649e709..743b6ba30eb 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories.gcs; +import com.google.api.gax.paging.Page; import com.google.cloud.BatchResult; import com.google.cloud.ReadChannel; import com.google.cloud.WriteChannel; @@ -306,6 +307,23 @@ class GoogleCloudStorageBlobStore implements BlobStore { } } + /** + * Deletes the given path and all its children. + * + * @param pathStr Name of path to delete + */ + void deleteDirectory(String pathStr) throws IOException { + SocketAccess.doPrivilegedVoidIOException(() -> { + Page page = client().get(bucketName).list(BlobListOption.prefix(pathStr)); + do { + final Collection blobsToDelete = new ArrayList<>(); + page.getValues().forEach(b -> blobsToDelete.add(b.getName())); + deleteBlobsIgnoringIfNotExists(blobsToDelete); + page = page.getNextPage(); + } while (page != null); + }); + } + /** * Deletes multiple blobs from the specific bucket using a batch request * diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index fcf303dfc09..b050645f995 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -78,6 +78,11 @@ final class HdfsBlobContainer extends AbstractBlobContainer { } } + @Override + public void delete() throws IOException { + store.execute(fileContext -> fileContext.delete(path, true)); + } + @Override public InputStream readBlob(String blobName) throws IOException { // FSDataInputStream does buffering internally diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index c3eee2075ee..47f7ee26e83 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -54,6 +54,7 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -130,12 +131,53 @@ class S3BlobContainer extends AbstractBlobContainer { deleteBlobIgnoringIfNotExists(blobName); } + @Override + public void delete() throws IOException { + try (AmazonS3Reference clientReference = blobStore.clientReference()) { + ObjectListing prevListing = null; + while (true) { + ObjectListing list; + if (prevListing != null) { + final ObjectListing finalPrevListing = prevListing; + list = SocketAccess.doPrivileged(() -> clientReference.client().listNextBatchOfObjects(finalPrevListing)); + } else { + final ListObjectsRequest listObjectsRequest = new ListObjectsRequest(); + listObjectsRequest.setBucketName(blobStore.bucket()); + listObjectsRequest.setPrefix(keyPath); + list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest)); + } + final List blobsToDelete = + list.getObjectSummaries().stream().map(S3ObjectSummary::getKey).collect(Collectors.toList()); + if (list.isTruncated()) { + doDeleteBlobs(blobsToDelete, false); + prevListing = list; + } else { + final List lastBlobsToDelete = new ArrayList<>(blobsToDelete); + lastBlobsToDelete.add(keyPath); + doDeleteBlobs(lastBlobsToDelete, false); + break; + } + } + } catch (final AmazonClientException e) { + throw new IOException("Exception when deleting blob container [" + keyPath + "]", e); + } + } + @Override public void deleteBlobsIgnoringIfNotExists(List blobNames) throws IOException { + doDeleteBlobs(blobNames, true); + } + + private void doDeleteBlobs(List blobNames, boolean relative) throws IOException { if (blobNames.isEmpty()) { return; } - final Set outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet()); + final Set outstanding; + if (relative) { + outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet()); + } else { + outstanding = new HashSet<>(blobNames); + } try (AmazonS3Reference clientReference = blobStore.clientReference()) { // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes final List deleteRequests = new ArrayList<>(); diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index 6fd716328f3..bdaace00f80 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -88,4 +88,11 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes // to become consistent. assertBusy(() -> super.assertChildren(path, children), 10L, TimeUnit.MINUTES); } + + @Override + protected void assertDeleted(BlobPath path, String name) throws Exception { + // AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that + // to become consistent. + assertBusy(() -> super.assertDeleted(path, name), 10L, TimeUnit.MINUTES); + } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index b8f811295ed..a44d1fb0530 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -109,6 +109,12 @@ public interface BlobContainer { */ void deleteBlob(String blobName) throws IOException; + /** + * Deletes this container and all its contents from the repository. + * @throws IOException on failure + */ + void delete() throws IOException; + /** * Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception * when one or multiple of the given blobs don't exist and simply ignore this case. diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java index ea02aebb0aa..95ce3b27efe 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobPath.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.blobstore; +import org.elasticsearch.common.Nullable; + import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -68,6 +70,20 @@ public class BlobPath implements Iterable { return p + SEPARATOR; } + /** + * Returns this path's parent path. + * + * @return Parent path or {@code null} if there is none + */ + @Nullable + public BlobPath parent() { + if (paths.isEmpty()) { + return null; + } else { + return new BlobPath(new ArrayList<>(paths.subList(0, paths.size() - 1))); + } + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index a916515da9e..b51115b2466 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -122,6 +122,11 @@ public class FsBlobContainer extends AbstractBlobContainer { } } + @Override + public void delete() throws IOException { + IOUtils.rm(path); + } + @Override public boolean blobExists(String blobName) { return Files.exists(path.resolve(blobName)); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java index 935ae9f51b6..df1d003aa55 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java @@ -64,6 +64,11 @@ public class BlobContainerWrapper implements BlobContainer { delegate.deleteBlob(blobName); } + @Override + public void delete() throws IOException { + delegate.delete(); + } + @Override public void deleteBlobIgnoringIfNotExists(final String blobName) throws IOException { delegate.deleteBlobIgnoringIfNotExists(blobName); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index 8d1d3e4004c..28083f49e1a 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -22,7 +22,6 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.blobstore.BlobStore; @@ -34,20 +33,19 @@ import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESSingleNodeTestCase; import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.not; public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeTestCase { @@ -67,27 +65,26 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT public void setUp() throws Exception { super.setUp(); createRepository("test-repo"); + deleteAndAssertEmpty(getRepository().basePath()); + } + + private void deleteAndAssertEmpty(BlobPath path) throws Exception { final BlobStoreRepository repo = getRepository(); final PlainActionFuture future = PlainActionFuture.newFuture(); repo.threadPool().generic().execute(new ActionRunnable(future) { @Override protected void doRun() throws Exception { - deleteContents(repo.blobStore().blobContainer(repo.basePath())); + repo.blobStore().blobContainer(path).delete(); future.onResponse(null); } }); future.actionGet(); - assertChildren(repo.basePath(), Collections.emptyList()); - } - - private static void deleteContents(BlobContainer container) throws IOException { - final List toDelete = new ArrayList<>(); - for (Map.Entry child : container.children().entrySet()) { - deleteContents(child.getValue()); - toDelete.add(child.getKey()); + final BlobPath parent = path.parent(); + if (parent == null) { + assertChildren(path, Collections.emptyList()); + } else { + assertDeleted(parent, path.toArray()[path.toArray().length - 1]); } - toDelete.addAll(container.listBlobs().keySet()); - container.deleteBlobsIgnoringIfNotExists(toDelete); } public void testCreateSnapshot() { @@ -159,6 +156,11 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT assertBlobsByPrefix(repo.basePath().add("foo"), "nest", Collections.singletonMap("nested-blob", new PlainBlobMetaData("nested-blob", testBlobLen))); assertChildren(repo.basePath().add("foo").add("nested"), Collections.emptyList()); + if (randomBoolean()) { + deleteAndAssertEmpty(repo.basePath()); + } else { + deleteAndAssertEmpty(repo.basePath().add("foo")); + } } protected void assertBlobsByPrefix(BlobPath path, String prefix, Map blobs) throws Exception { @@ -182,7 +184,21 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT } } + protected void assertDeleted(BlobPath path, String name) throws Exception { + assertThat(listChildren(path), not(contains(name))); + } + protected void assertChildren(BlobPath path, Collection children) throws Exception { + listChildren(path); + final Set foundChildren = listChildren(path); + if (children.isEmpty()) { + assertThat(foundChildren, empty()); + } else { + assertThat(foundChildren, containsInAnyOrder(children.toArray(Strings.EMPTY_ARRAY))); + } + } + + private Set listChildren(BlobPath path) { final PlainActionFuture> future = PlainActionFuture.newFuture(); final BlobStoreRepository repository = getRepository(); repository.threadPool().generic().execute(new ActionRunnable>(future) { @@ -192,12 +208,7 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT future.onResponse(blobStore.blobContainer(path).children().keySet()); } }); - Set foundChildren = future.actionGet(); - if (children.isEmpty()) { - assertThat(foundChildren, empty()); - } else { - assertThat(foundChildren, containsInAnyOrder(children.toArray(Strings.EMPTY_ARRAY))); - } + return future.actionGet(); } private BlobStoreRepository getRepository() {