Repository Cleanup Endpoint (#43900) (#45780)

* Repository Cleanup Endpoint (#43900)

* Snapshot cleanup functionality via transport/REST endpoint.
* Added all the infrastructure for this with the HLRC and node client
* Made use of it in tests and resolved relevant TODO
* Added new `Custom` CS element that tracks the cleanup logic.
Kept it similar to the delete and in progress classes and gave it
some (for now) redundant way of handling multiple cleanups but only allow one
* Use the exact same mechanism used by deletes to have the combination
of CS entry and increment in repository state ID provide some
concurrency safety (the initial approach of just an entry in the CS
was not enough, we must increment the repository state ID to be safe
against concurrent modifications, otherwise we run the risk of "cleaning up"
blobs that just got created without noticing)
* Isolated the logic to the transport action class as much as I could.
It's not ideal, but we don't need to keep any state and do the same
for other repository operations
(like getting the detailed snapshot shard status)
This commit is contained in:
Armin Braun 2019-08-21 17:59:49 +02:00 committed by GitHub
parent fe2a7523ec
commit 6aaee8aa0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1229 additions and 64 deletions

View File

@ -20,6 +20,8 @@
package org.elasticsearch.client;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
@ -170,6 +172,35 @@ public final class SnapshotClient {
VerifyRepositoryResponse::fromXContent, listener, emptySet());
}
/**
* Cleans up a snapshot repository.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html"> Snapshot and Restore
* API on elastic.co</a>
* @param cleanupRepositoryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public CleanupRepositoryResponse cleanupRepository(CleanupRepositoryRequest cleanupRepositoryRequest, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(cleanupRepositoryRequest, SnapshotRequestConverters::cleanupRepository,
options, CleanupRepositoryResponse::fromXContent, emptySet());
}
/**
* Asynchronously cleans up a snapshot repository.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html"> Snapshot and Restore
* API on elastic.co</a>
* @param cleanupRepositoryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void cleanupRepositoryAsync(CleanupRepositoryRequest cleanupRepositoryRequest, RequestOptions options,
ActionListener<CleanupRepositoryResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(cleanupRepositoryRequest, SnapshotRequestConverters::cleanupRepository,
options, CleanupRepositoryResponse::fromXContent, listener, emptySet());
}
/**
* Creates a snapshot.
* <p>

View File

@ -23,6 +23,7 @@ import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
@ -94,6 +95,20 @@ final class SnapshotRequestConverters {
return request;
}
static Request cleanupRepository(CleanupRepositoryRequest cleanupRepositoryRequest) {
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_snapshot")
.addPathPart(cleanupRepositoryRequest.name())
.addPathPartAsIs("_cleanup")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
RequestConverters.Params parameters = new RequestConverters.Params();
parameters.withMasterTimeout(cleanupRepositoryRequest.masterNodeTimeout());
parameters.withTimeout(cleanupRepositoryRequest.timeout());
request.addParameters(parameters.asMap());
return request;
}
static Request createSnapshot(CreateSnapshotRequest createSnapshotRequest) throws IOException {
String endpoint = new RequestConverters.EndpointBuilder().addPathPart("_snapshot")
.addPathPart(createSnapshotRequest.repository())

View File

@ -20,6 +20,8 @@
package org.elasticsearch.client;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
@ -135,6 +137,17 @@ public class SnapshotIT extends ESRestHighLevelClientTestCase {
assertThat(response.getNodes().size(), equalTo(1));
}
public void testCleanupRepository() throws IOException {
AcknowledgedResponse putRepositoryResponse = createTestRepository("test", FsRepository.TYPE, "{\"location\": \".\"}");
assertTrue(putRepositoryResponse.isAcknowledged());
CleanupRepositoryRequest request = new CleanupRepositoryRequest("test");
CleanupRepositoryResponse response = execute(request, highLevelClient().snapshot()::cleanupRepository,
highLevelClient().snapshot()::cleanupRepositoryAsync);
assertThat(response.result().bytes(), equalTo(0L));
assertThat(response.result().blobs(), equalTo(0L));
}
public void testCreateSnapshot() throws IOException {
String repository = "test_repository";
assertTrue(createTestRepository(repository, FsRepository.TYPE, "{\"location\": \".\"}").isAcknowledged());

View File

@ -332,6 +332,42 @@ POST /_snapshot/my_unverified_backup/_verify
It returns a list of nodes where repository was successfully verified or an error message if verification process failed.
[float]
===== Repository Cleanup
Repositories can over time accumulate data that is not referenced by any existing snapshot. This is a result of the data safety guarantees
the snapshot functionality provides in failure scenarios during snapshot creation and the decentralized nature of the snapshot creation
process. This unreferenced data does in no way negatively impact the performance or safety of a snapshot repository but leads to higher
than necessary storage use. In order to clean up this unreferenced data, users can call the cleanup endpoint for a repository which will
trigger a complete accounting of the repositories contents and subsequent deletion of all unreferenced data that was found.
[source,js]
-----------------------------------
POST /_snapshot/my_repository/_cleanup
-----------------------------------
// CONSOLE
// TEST[continued]
The response to a cleanup request looks as follows:
[source,js]
--------------------------------------------------
{
"results": {
"deleted_bytes": 20,
"deleted_blobs": 5
}
}
--------------------------------------------------
// TESTRESPONSE
Depending on the concrete repository implementation the numbers shown for bytes free as well as the number of blobs removed will either
be an approximation or an exact result. Any non-zero value for the number of blobs removed implies that unreferenced blobs were found and
subsequently cleaned up.
Please note that most of the cleanup operations executed by this endpoint are automatically executed when deleting any snapshot from a
repository. If you regularly delete snapshots, you will in most cases not get any or only minor space savings from using this functionality
and should lower your frequency of invoking it accordingly.
[float]
[[snapshots-take-snapshot]]
=== Snapshot

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import java.io.BufferedInputStream;
@ -97,7 +98,7 @@ public class URLBlobContainer extends AbstractBlobContainer {
}
@Override
public void delete() {
public DeleteResult delete() {
throw new UnsupportedOperationException("URL repository is read only");
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.threadpool.ThreadPool;
@ -126,9 +127,9 @@ public class AzureBlobContainer extends AbstractBlobContainer {
}
@Override
public void delete() throws IOException {
public DeleteResult delete() throws IOException {
try {
blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}

View File

@ -21,12 +21,12 @@ package org.elasticsearch.repositories.azure;
import com.microsoft.azure.storage.LocationMode;
import com.microsoft.azure.storage.StorageException;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.threadpool.ThreadPool;
@ -92,8 +92,9 @@ public class AzureBlobStore implements BlobStore {
service.deleteBlob(clientName, container, blob);
}
public void deleteBlobDirectory(String path, Executor executor) throws URISyntaxException, StorageException, IOException {
service.deleteBlobDirectory(clientName, container, path, executor);
public DeleteResult deleteBlobDirectory(String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
return service.deleteBlobDirectory(clientName, container, path, executor);
}
public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {

View File

@ -43,6 +43,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
@ -193,13 +194,15 @@ public class AzureStorageService {
});
}
void deleteBlobDirectory(String account, String container, String path, Executor executor)
DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
final AtomicLong outstanding = new AtomicLong(1L);
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
final AtomicLong blobsDeleted = new AtomicLong();
final AtomicLong bytesDeleted = new AtomicLong();
SocketAccess.doPrivilegedVoidException(() -> {
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
@ -209,7 +212,17 @@ public class AzureStorageService {
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len;
if (blobItem instanceof CloudBlob) {
len = ((CloudBlob) blobItem).getProperties().getLength();
} else {
len = -1L;
}
deleteBlob(account, container, blobPath);
blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
}
}
@Override
@ -235,6 +248,7 @@ public class AzureStorageService {
exceptions.forEach(ex::addSuppressed);
throw ex;
}
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}
public InputStream getInputStream(String account, String container, String blob)

View File

@ -22,6 +22,7 @@ package org.elasticsearch.repositories.gcs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import java.io.IOException;
@ -77,8 +78,8 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
}
@Override
public void delete() throws IOException {
blobStore.deleteDirectory(path().buildAsString());
public DeleteResult delete() throws IOException {
return blobStore.deleteDirectory(path().buildAsString());
}
@Override

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.core.internal.io.Streams;
@ -55,6 +56,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -300,15 +302,24 @@ class GoogleCloudStorageBlobStore implements BlobStore {
*
* @param pathStr Name of path to delete
*/
void deleteDirectory(String pathStr) throws IOException {
SocketAccess.doPrivilegedVoidIOException(() -> {
DeleteResult deleteDirectory(String pathStr) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> {
DeleteResult deleteResult = DeleteResult.ZERO;
Page<Blob> page = client().get(bucketName).list(BlobListOption.prefix(pathStr));
do {
final Collection<String> blobsToDelete = new ArrayList<>();
page.getValues().forEach(b -> blobsToDelete.add(b.getName()));
final AtomicLong blobsDeleted = new AtomicLong(0L);
final AtomicLong bytesDeleted = new AtomicLong(0L);
page.getValues().forEach(b -> {
blobsToDelete.add(b.getName());
blobsDeleted.incrementAndGet();
bytesDeleted.addAndGet(b.getSize());
});
deleteBlobsIgnoringIfNotExists(blobsToDelete);
deleteResult = deleteResult.add(blobsDeleted.get(), bytesDeleted.get());
page = page.getNextPage();
} while (page != null);
return deleteResult;
});
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
@ -69,9 +70,13 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
}
}
// TODO: See if we can get precise result reporting.
private static final DeleteResult DELETE_RESULT = new DeleteResult(1L, 0L);
@Override
public void delete() throws IOException {
public DeleteResult delete() throws IOException {
store.execute(fileContext -> fileContext.delete(path, true));
return DELETE_RESULT;
}
@Override

View File

@ -19,6 +19,7 @@
package org.elasticsearch.repositories.hdfs;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.bootstrap.JavaVersion;
import org.elasticsearch.common.settings.MockSecureSettings;
@ -30,6 +31,7 @@ import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import java.util.Collection;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class)
public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {
@ -58,4 +60,14 @@ public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase {
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
}
// HDFS repository doesn't have precise cleanup stats so we only check whether or not any blobs were removed
@Override
protected void assertCleanupResponse(CleanupRepositoryResponse response, long bytes, long blobs) {
if (blobs > 0) {
assertThat(response.result().blobs(), greaterThan(0L));
} else {
assertThat(response.result().blobs(), equalTo(0L));
}
}
}

View File

@ -32,7 +32,6 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.lucene.util.SetOnce;
@ -42,6 +41,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.Tuple;
@ -54,6 +54,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -121,7 +122,9 @@ class S3BlobContainer extends AbstractBlobContainer {
}
@Override
public void delete() throws IOException {
public DeleteResult delete() throws IOException {
final AtomicLong deletedBlobs = new AtomicLong();
final AtomicLong deletedBytes = new AtomicLong();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
ObjectListing prevListing = null;
while (true) {
@ -135,8 +138,12 @@ class S3BlobContainer extends AbstractBlobContainer {
listObjectsRequest.setPrefix(keyPath);
list = SocketAccess.doPrivileged(() -> clientReference.client().listObjects(listObjectsRequest));
}
final List<String> blobsToDelete =
list.getObjectSummaries().stream().map(S3ObjectSummary::getKey).collect(Collectors.toList());
final List<String> blobsToDelete = new ArrayList<>();
list.getObjectSummaries().forEach(s3ObjectSummary -> {
deletedBlobs.incrementAndGet();
deletedBytes.addAndGet(s3ObjectSummary.getSize());
blobsToDelete.add(s3ObjectSummary.getKey());
});
if (list.isTruncated()) {
doDeleteBlobs(blobsToDelete, false);
prevListing = list;
@ -150,6 +157,7 @@ class S3BlobContainer extends AbstractBlobContainer {
} catch (final AmazonClientException e) {
throw new IOException("Exception when deleting blob container [" + keyPath + "]", e);
}
return new DeleteResult(deletedBlobs.get(), deletedBytes.get());
}
@Override

View File

@ -0,0 +1,34 @@
{
"snapshot.cleanup_repository": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/modules-snapshots.html",
"stability": "stable",
"url": {
"paths": [
{
"path": "/_snapshot/{repository}/_cleanup",
"methods": [
"POST"
],
"parts": {
"repository": {
"type": "string",
"required" : true,
"description": "A repository name"
}
}
}
]
},
"params": {
"master_timeout": {
"type" : "time",
"description" : "Explicit operation timeout for connection to master node"
},
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
}
},
"body": {}
}
}

View File

@ -38,6 +38,51 @@ setup:
- match: { acknowledged: true }
---
"Create a snapshot and clean up repository":
- skip:
version: " - 7.99.99"
reason: cleanup introduced in 8.0
- do:
snapshot.cleanup_repository:
repository: test_repo_create_1
- match: { results.deleted_bytes: 0 }
- match: { results.deleted_blobs: 0 }
- do:
snapshot.create:
repository: test_repo_create_1
snapshot: test_snapshot
wait_for_completion: true
- match: { snapshot.snapshot: test_snapshot }
- match: { snapshot.state : SUCCESS }
- match: { snapshot.shards.successful: 1 }
- match: { snapshot.shards.failed : 0 }
- do:
snapshot.cleanup_repository:
repository: test_repo_create_1
- match: { results.deleted_bytes: 0 }
- match: { results.deleted_blobs: 0 }
- do:
snapshot.delete:
repository: test_repo_create_1
snapshot: test_snapshot
- match: { acknowledged: true }
- do:
snapshot.cleanup_repository:
repository: test_repo_create_1
- match: { results.deleted_bytes: 0 }
- match: { results.deleted_blobs: 0 }
---
"Create a snapshot for missing index":
- skip:

View File

@ -48,6 +48,8 @@ import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageAction;
import org.elasticsearch.action.admin.cluster.node.usage.TransportNodesUsageAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
import org.elasticsearch.action.admin.cluster.remote.TransportRemoteInfoAction;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.delete.TransportDeleteRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction;
@ -226,6 +228,7 @@ import org.elasticsearch.rest.action.RestFieldCapabilitiesAction;
import org.elasticsearch.rest.action.RestMainAction;
import org.elasticsearch.rest.action.admin.cluster.RestAddVotingConfigExclusionAction;
import org.elasticsearch.rest.action.admin.cluster.RestCancelTasksAction;
import org.elasticsearch.rest.action.admin.cluster.RestCleanupRepositoryAction;
import org.elasticsearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterAllocationExplainAction;
import org.elasticsearch.rest.action.admin.cluster.RestClusterGetSettingsAction;
@ -455,6 +458,7 @@ public class ActionModule extends AbstractModule {
actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
actions.register(CleanupRepositoryAction.INSTANCE, TransportCleanupRepositoryAction.class);
actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
@ -577,6 +581,7 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestGetRepositoriesAction(restController, settingsFilter));
registerHandler.accept(new RestDeleteRepositoryAction(restController));
registerHandler.accept(new RestVerifyRepositoryAction(restController));
registerHandler.accept(new RestCleanupRepositoryAction(restController));
registerHandler.accept(new RestGetSnapshotsAction(restController));
registerHandler.accept(new RestCreateSnapshotAction(restController));
registerHandler.accept(new RestRestoreSnapshotAction(restController));

View File

@ -0,0 +1,31 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.cleanup;
import org.elasticsearch.action.ActionType;
public final class CleanupRepositoryAction extends ActionType<CleanupRepositoryResponse> {
public static final CleanupRepositoryAction INSTANCE = new CleanupRepositoryAction();
public static final String NAME = "cluster:admin/repository/_cleanup";
private CleanupRepositoryAction() {
super(NAME, CleanupRepositoryResponse::new);
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.cleanup;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.action.ValidateActions.addValidationError;
public class CleanupRepositoryRequest extends AcknowledgedRequest<CleanupRepositoryRequest> {
private String repository;
public CleanupRepositoryRequest(String repository) {
this.repository = repository;
}
public CleanupRepositoryRequest(StreamInput in) throws IOException {
repository = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(repository);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (repository == null) {
validationException = addValidationError("repository is null", null);
}
return validationException;
}
public String name() {
return repository;
}
public void name(String repository) {
this.repository = repository;
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.cleanup;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
public class CleanupRepositoryRequestBuilder extends MasterNodeOperationRequestBuilder<CleanupRepositoryRequest,
CleanupRepositoryResponse,
CleanupRepositoryRequestBuilder> {
public CleanupRepositoryRequestBuilder(ElasticsearchClient client, ActionType<CleanupRepositoryResponse> action,
String repository) {
super(client, action, new CleanupRepositoryRequest(repository));
}
public CleanupRepositoryRequestBuilder setName(String repository) {
request.name(repository);
return this;
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.cleanup;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import java.io.IOException;
public final class CleanupRepositoryResponse extends ActionResponse implements ToXContentObject {
private static final ObjectParser<CleanupRepositoryResponse, Void> PARSER =
new ObjectParser<>(CleanupRepositoryResponse.class.getName(), true, CleanupRepositoryResponse::new);
static {
PARSER.declareObject((response, cleanupResult) -> response.result = cleanupResult,
RepositoryCleanupResult.PARSER, new ParseField("results"));
}
private RepositoryCleanupResult result;
public CleanupRepositoryResponse() {
}
public CleanupRepositoryResponse(RepositoryCleanupResult result) {
this.result = result;
}
public CleanupRepositoryResponse(StreamInput in) throws IOException {
result = new RepositoryCleanupResult(in);
}
public RepositoryCleanupResult result() {
return result;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
result.writeTo(out);
}
public static CleanupRepositoryResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject().field("results");
result.toXContent(builder, params);
builder.endObject();
return builder;
}
}

View File

@ -0,0 +1,248 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.repositories.cleanup;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
/**
* Repository cleanup action for repository implementations based on {@link BlobStoreRepository}.
*
* The steps taken by the repository cleanup operation are as follows:
* <ol>
* <li>Check that there are no running repository cleanup, snapshot create, or snapshot delete actions
* and add an entry for the repository that is to be cleaned up to {@link RepositoryCleanupInProgress}</li>
* <li>Run cleanup actions on the repository. Note, these are executed exclusively on the master node.
* For the precise operations execute see {@link BlobStoreRepository#cleanup}</li>
* <li>Remove the entry in {@link RepositoryCleanupInProgress} in the first step.</li>
* </ol>
*
* On master failover during the cleanup operation it is simply removed from the cluster state. This is safe because the logic in
* {@link BlobStoreRepository#cleanup} ensures that the repository state id has not changed between creation of the cluster state entry
* and any delete/write operations. TODO: This will not work if we also want to clean up at the shard level as those will involve writes
* as well as deletes.
*/
public final class TransportCleanupRepositoryAction extends TransportMasterNodeAction<CleanupRepositoryRequest,
CleanupRepositoryResponse> {
private static final Logger logger = LogManager.getLogger(TransportCleanupRepositoryAction.class);
private static final Version MIN_VERSION = Version.V_7_4_0;
private final RepositoriesService repositoriesService;
@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
}
@Inject
public TransportCleanupRepositoryAction(TransportService transportService, ClusterService clusterService,
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(CleanupRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters,
CleanupRepositoryRequest::new, indexNameExpressionResolver);
this.repositoriesService = repositoriesService;
// We add a state applier that will remove any dangling repository cleanup actions on master failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
clusterService.addStateApplier(event -> {
if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) {
final RepositoryCleanupInProgress repositoryCleanupInProgress = event.state().custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.cleanupInProgress() == false) {
return;
}
clusterService.submitStateUpdateTask("clean up repository cleanup task after master failover",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return removeInProgressCleanup(currentState);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.debug("Removed repository cleanup task [{}] from cluster state", repositoryCleanupInProgress);
}
@Override
public void onFailure(String source, Exception e) {
logger.warn(
"Failed to remove repository cleanup task [{}] from cluster state", repositoryCleanupInProgress);
}
});
}
});
}
private static ClusterState removeInProgressCleanup(final ClusterState currentState) {
RepositoryCleanupInProgress cleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (cleanupInProgress != null) {
boolean changed = false;
if (cleanupInProgress.cleanupInProgress() == false) {
cleanupInProgress = new RepositoryCleanupInProgress();
changed = true;
}
if (changed) {
return ClusterState.builder(currentState).putCustom(
RepositoryCleanupInProgress.TYPE, cleanupInProgress).build();
}
}
return currentState;
}
@Override
protected CleanupRepositoryResponse read(StreamInput in) throws IOException {
return new CleanupRepositoryResponse(in);
}
@Override
protected void masterOperation(CleanupRepositoryRequest request, ClusterState state,
ActionListener<CleanupRepositoryResponse> listener) {
if (state.nodes().getMinNodeVersion().onOrAfter(MIN_VERSION)) {
cleanupRepo(request.name(), ActionListener.map(listener, CleanupRepositoryResponse::new));
} else {
throw new IllegalArgumentException("Repository cleanup is only supported from version [" + MIN_VERSION
+ "] but the oldest node version in the cluster is [" + state.nodes().getMinNodeVersion() + ']');
}
}
@Override
protected ClusterBlockException checkBlock(CleanupRepositoryRequest request, ClusterState state) {
// Cluster is not affected but we look up repositories in metadata
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
}
/**
* Runs cleanup operations on the given repository.
* @param repositoryName Repository to clean up
* @param listener Listener for cleanup result
*/
private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanupResult> listener) {
final Repository repository = repositoriesService.repository(repositoryName);
if (repository instanceof BlobStoreRepository == false) {
listener.onFailure(new IllegalArgumentException("Repository [" + repositoryName + "] does not support repository cleanup"));
return;
}
final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
final long repositoryStateId = repository.getRepositoryData().getGenId();
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
throw new IllegalStateException(
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress");
}
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new IllegalStateException("Cannot cleanup [" + repositoryName + "] - a snapshot is currently being deleted");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null && !snapshots.entries().isEmpty()) {
throw new IllegalStateException("Cannot cleanup [" + repositoryName + "] - a snapshot is currently running");
}
return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
new RepositoryCleanupInProgress(
RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
}
@Override
public void onFailure(String source, Exception e) {
after(e, null);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
repositoryStateId, ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
}
private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
if (failure == null) {
logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId);
} else {
logger.debug(() -> new ParameterizedMessage(
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
}
assert failure != null || result != null;
clusterService.submitStateUpdateTask(
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return removeInProgressCleanup(currentState);
}
@Override
public void onFailure(String source, Exception e) {
if (failure != null) {
e.addSuppressed(failure);
}
logger.warn(() ->
new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e);
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (failure == null) {
logger.info("Done with repository cleanup on [{}][{}] with result [{}]",
repositoryName, repositoryStateId, result);
listener.onResponse(result);
} else {
logger.warn(() -> new ParameterizedMessage("Failed to run repository cleanup operations on [{}][{}]",
repositoryName, repositoryStateId), failure);
listener.onFailure(failure);
}
}
});
}
});
}
}

View File

@ -49,6 +49,9 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequest;
import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageResponse;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequestBuilder;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
@ -453,6 +456,21 @@ public interface ClusterAdminClient extends ElasticsearchClient {
*/
GetRepositoriesRequestBuilder prepareGetRepositories(String... name);
/**
* Cleans up repository.
*/
CleanupRepositoryRequestBuilder prepareCleanupRepository(String repository);
/**
* Cleans up repository.
*/
ActionFuture<CleanupRepositoryResponse> cleanupRepository(CleanupRepositoryRequest repository);
/**
* Cleans up repository.
*/
void cleanupRepository(CleanupRepositoryRequest repository, ActionListener<CleanupRepositoryResponse> listener);
/**
* Verifies a repository.
*/

View File

@ -26,6 +26,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksReque
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
@ -471,6 +472,16 @@ public class Requests {
return new DeleteRepositoryRequest(name);
}
/**
* Cleanup repository
*
* @param name repository name
* @return cleanup repository request
*/
public static CleanupRepositoryRequest cleanupRepositoryRequest(String name) {
return new CleanupRepositoryRequest(name);
}
/**
* Verifies snapshot repository
*

View File

@ -64,6 +64,10 @@ import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageAction;
import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequest;
import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.usage.NodesUsageResponse;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequestBuilder;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequestBuilder;
@ -1019,6 +1023,21 @@ public abstract class AbstractClient implements Client {
return new GetRepositoriesRequestBuilder(this, GetRepositoriesAction.INSTANCE, name);
}
@Override
public CleanupRepositoryRequestBuilder prepareCleanupRepository(String repository) {
return new CleanupRepositoryRequestBuilder(this, CleanupRepositoryAction.INSTANCE, repository);
}
@Override
public ActionFuture<CleanupRepositoryResponse> cleanupRepository(CleanupRepositoryRequest request) {
return execute(CleanupRepositoryAction.INSTANCE, request);
}
@Override
public void cleanupRepository(CleanupRepositoryRequest request, ActionListener<CleanupRepositoryResponse> listener) {
execute(CleanupRepositoryAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<RestoreSnapshotResponse> restoreSnapshot(RestoreSnapshotRequest request) {
return execute(RestoreSnapshotAction.INSTANCE, request);

View File

@ -121,6 +121,8 @@ public class ClusterModule extends AbstractModule {
registerClusterCustom(entries, RestoreInProgress.TYPE, RestoreInProgress::new, RestoreInProgress::readDiffFrom);
registerClusterCustom(entries, SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress::new,
SnapshotDeletionsInProgress::readDiffFrom);
registerClusterCustom(entries, RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress::new,
RepositoryCleanupInProgress::readDiffFrom);
// Metadata
registerMetaDataCustom(entries, RepositoriesMetaData.TYPE, RepositoriesMetaData::new, RepositoriesMetaData::readDiffFrom);
registerMetaDataCustom(entries, IngestMetadata.TYPE, IngestMetadata::new, IngestMetadata::readDiffFrom);

View File

@ -0,0 +1,120 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
public static final String TYPE = "repository_cleanup";
private final List<Entry> entries;
public RepositoryCleanupInProgress(Entry... entries) {
this.entries = Arrays.asList(entries);
}
RepositoryCleanupInProgress(StreamInput in) throws IOException {
this.entries = in.readList(Entry::new);
}
public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(ClusterState.Custom.class, TYPE, in);
}
public static Entry startedEntry(String repository, long repositoryStateId) {
return new Entry(repository, repositoryStateId);
}
public boolean cleanupInProgress() {
// TODO: Should we allow parallelism across repositories here maybe?
return entries.isEmpty();
}
@Override
public String getWriteableName() {
return TYPE;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(entries);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(TYPE);
for (Entry entry : entries) {
builder.startObject();
{
builder.field("repository", entry.repository);
}
builder.endObject();
}
builder.endArray();
return builder;
}
@Override
public String toString() {
return Strings.toString(this);
}
@Override
public Version getMinimalSupportedVersion() {
return Version.V_7_4_0;
}
public static final class Entry implements Writeable {
private final String repository;
private final long repositoryStateId;
private Entry(StreamInput in) throws IOException {
repository = in.readString();
repositoryStateId = in.readLong();
}
private Entry(String repository, long repositoryStateId) {
this.repository = repository;
this.repositoryStateId = repositoryStateId;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(repository);
out.writeLong(repositoryStateId);
}
@Override
public String toString() {
return "{" + repository + '}' + '{' + repositoryStateId + '}';
}
}
}

View File

@ -102,9 +102,11 @@ public interface BlobContainer {
/**
* Deletes this container and all its contents from the repository.
*
* @return delete result
* @throws IOException on failure
*/
void delete() throws IOException;
DeleteResult delete() throws IOException;
/**
* Deletes the blobs with given names. Unlike {@link #deleteBlob(String)} this method will not throw an exception

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.blobstore;
/**
* The result of deleting multiple blobs from a {@link BlobStore}.
*/
public final class DeleteResult {
public static final DeleteResult ZERO = new DeleteResult(0, 0);
private final long blobsDeleted;
private final long bytesDeleted;
public DeleteResult(long blobsDeleted, long bytesDeleted) {
this.blobsDeleted = blobsDeleted;
this.bytesDeleted = bytesDeleted;
}
public long blobsDeleted() {
return blobsDeleted;
}
public long bytesDeleted() {
return bytesDeleted;
}
public DeleteResult add(DeleteResult other) {
return new DeleteResult(blobsDeleted + other.blobsDeleted(), bytesDeleted + other.bytesDeleted());
}
public DeleteResult add(long blobs, long bytes) {
return new DeleteResult(blobsDeleted + blobs, bytesDeleted + bytes);
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.core.internal.io.IOUtils;
@ -45,6 +46,7 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.unmodifiableMap;
@ -123,8 +125,26 @@ public class FsBlobContainer extends AbstractBlobContainer {
}
@Override
public void delete() throws IOException {
IOUtils.rm(path);
public DeleteResult delete() throws IOException {
final AtomicLong filesDeleted = new AtomicLong(0L);
final AtomicLong bytesDeleted = new AtomicLong(0L);
Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException impossible) throws IOException {
assert impossible == null;
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
filesDeleted.incrementAndGet();
bytesDeleted.addAndGet(attrs.size());
return FileVisitResult.CONTINUE;
}
});
return new DeleteResult(filesDeleted.get(), bytesDeleted.get());
}
@Override

View File

@ -0,0 +1,88 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
public final class RepositoryCleanupResult implements Writeable, ToXContentObject {
public static final ObjectParser<RepositoryCleanupResult, Void> PARSER =
new ObjectParser<>(RepositoryCleanupResult.class.getName(), true, RepositoryCleanupResult::new);
private static final String DELETED_BLOBS = "deleted_blobs";
private static final String DELETED_BYTES = "deleted_bytes";
static {
PARSER.declareLong((result, bytes) -> result.bytes = bytes, new ParseField(DELETED_BYTES));
PARSER.declareLong((result, blobs) -> result.blobs = blobs, new ParseField(DELETED_BLOBS));
}
private long bytes;
private long blobs;
private RepositoryCleanupResult() {
this(DeleteResult.ZERO);
}
public RepositoryCleanupResult(DeleteResult result) {
this.blobs = result.blobsDeleted();
this.bytes = result.bytesDeleted();
}
public RepositoryCleanupResult(StreamInput in) throws IOException {
bytes = in.readLong();
blobs = in.readLong();
}
public long bytes() {
return bytes;
}
public long blobs() {
return blobs;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeLong(bytes);
out.writeLong(blobs);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.startObject().field(DELETED_BYTES, bytes).field(DELETED_BLOBS, blobs).endObject();
}
@Override
public String toString() {
return Strings.toString(this);
}
}

View File

@ -44,6 +44,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -80,6 +81,7 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException;
@ -428,7 +430,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
foundIndices = blobStore().blobContainer(basePath().add("indices")).children();
foundIndices = blobStore().blobContainer(indicesPath()).children();
writeIndexGen(updatedRepositoryData, repositoryStateId);
} catch (Exception ex) {
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
@ -451,18 +454,61 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
.orElse(Collections.emptyList()),
snapshotId,
ActionListener.map(listener, v -> {
cleanupStaleIndices(foundIndices, survivingIndices);
cleanupStaleRootFiles(Sets.difference(rootBlobs, new HashSet<>(snapMetaFilesToDelete)), updatedRepositoryData);
cleanupStaleIndices(foundIndices, survivingIndices.values().stream().map(IndexId::getId).collect(Collectors.toSet()));
cleanupStaleRootFiles(
staleRootBlobs(updatedRepositoryData, Sets.difference(rootBlobs, new HashSet<>(snapMetaFilesToDelete))));
return null;
})
);
}
}
private void cleanupStaleRootFiles(Set<String> rootBlobNames, RepositoryData repositoryData) {
/**
* Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the
* repository.
* TODO: Add shard level cleanups
* <ul>
* <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
* <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
* </ul>
* @param repositoryStateId Current repository state id
* @param listener Lister to complete when done
*/
public void cleanup(long repositoryStateId, ActionListener<RepositoryCleanupResult> listener) {
ActionListener.completeWith(listener, () -> {
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
}
final RepositoryData repositoryData = getRepositoryData();
if (repositoryData.getGenId() != repositoryStateId) {
// Check that we are working on the expected repository version before gathering the data to clean up
throw new RepositoryException(metadata.name(), "concurrent modification of the repository before cleanup started, " +
"expected current generation [" + repositoryStateId + "], actual current generation ["
+ repositoryData.getGenId() + "]");
}
Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
final Set<String> survivingIndexIds =
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
final List<String> staleRootBlobs = staleRootBlobs(repositoryData, rootBlobs.keySet());
if (survivingIndexIds.equals(foundIndices.keySet()) && staleRootBlobs.isEmpty()) {
// Nothing to clean up we return
return new RepositoryCleanupResult(DeleteResult.ZERO);
}
// write new index-N blob to ensure concurrent operations will fail
writeIndexGen(repositoryData, repositoryStateId);
final DeleteResult deleteIndicesResult = cleanupStaleIndices(foundIndices, survivingIndexIds);
List<String> cleaned = cleanupStaleRootFiles(staleRootBlobs);
return new RepositoryCleanupResult(
deleteIndicesResult.add(cleaned.size(), cleaned.stream().mapToLong(name -> rootBlobs.get(name).length()).sum()));
});
}
// Finds all blobs directly under the repository root path that are not referenced by the current RepositoryData
private List<String> staleRootBlobs(RepositoryData repositoryData, Set<String> rootBlobNames) {
final Set<String> allSnapshotIds =
repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
final List<String> blobsToDelete = rootBlobNames.stream().filter(
return rootBlobNames.stream().filter(
blob -> {
if (FsBlobContainer.isTempBlobName(blob)) {
return true;
@ -483,12 +529,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return false;
}
).collect(Collectors.toList());
}
private List<String> cleanupStaleRootFiles(List<String> blobsToDelete) {
if (blobsToDelete.isEmpty()) {
return;
return blobsToDelete;
}
try {
logger.info("[{}] Found stale root level blobs {}. Cleaning them up", metadata.name(), blobsToDelete);
blobContainer().deleteBlobsIgnoringIfNotExists(blobsToDelete);
return blobsToDelete;
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] The following blobs are no longer part of any snapshot [{}] but failed to remove them",
@ -500,18 +550,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of root level blobs", metadata.name()), e);
}
return Collections.emptyList();
}
private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Map<String, IndexId> survivingIndices) {
private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
DeleteResult deleteResult = DeleteResult.ZERO;
try {
final Set<String> survivingIndexIds = survivingIndices.values().stream()
.map(IndexId::getId).collect(Collectors.toSet());
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
indexEntry.getValue().delete();
deleteResult = deleteResult.add(indexEntry.getValue().delete());
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
@ -527,6 +577,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
}
return deleteResult;
}
private void deleteIndices(RepositoryData repositoryData, List<IndexId> indices, SnapshotId snapshotId,

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import static org.elasticsearch.client.Requests.cleanupRepositoryRequest;
import static org.elasticsearch.rest.RestRequest.Method.POST;
/**
* Cleans up a repository
*/
public class RestCleanupRepositoryAction extends BaseRestHandler {
public RestCleanupRepositoryAction(RestController controller) {
controller.registerHandler(POST, "/_snapshot/{repository}/_cleanup", this);
}
@Override
public String getName() {
return "cleanup_repository_action";
}
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
CleanupRepositoryRequest cleanupRepositoryRequest = cleanupRepositoryRequest(request.param("repository"));
cleanupRepositoryRequest.timeout(request.paramAsTime("timeout", cleanupRepositoryRequest.timeout()));
cleanupRepositoryRequest.masterNodeTimeout(request.paramAsTime("master_timeout", cleanupRepositoryRequest.masterNodeTimeout()));
return channel -> client.admin().cluster().cleanupRepository(cleanupRepositoryRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
@ -264,6 +265,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a snapshot deletion is in-progress");
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a repository cleanup is in-progress");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null || snapshots.entries().isEmpty()) {
// Store newSnapshot here to be processed in clusterStateProcessed
@ -1133,6 +1139,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
throw new ConcurrentSnapshotExecutionException(snapshot,
"cannot delete - another snapshot is currently being deleted");
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) {
throw new ConcurrentSnapshotExecutionException(snapshot.getRepository(), snapshot.getSnapshotId().getName(),
"cannot delete snapshot while a repository cleanup is in-progress");
}
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null) {
// don't allow snapshot deletions while a restore is taking place,

View File

@ -20,7 +20,6 @@ package org.elasticsearch.snapshots;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -72,14 +71,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
@After
public void assertRepoConsistency() {
if (skipRepoConsistencyCheckReason == null) {
client().admin().cluster().prepareGetRepositories().get().repositories()
.stream()
.map(RepositoryMetaData::name)
.forEach(name -> {
final List<SnapshotInfo> snapshots = client().admin().cluster().prepareGetSnapshots(name).get().getSnapshots();
// Delete one random snapshot to trigger repository cleanup.
if (snapshots.isEmpty() == false) {
client().admin().cluster().prepareDeleteSnapshot(name, randomFrom(snapshots).snapshotId().getName()).get();
client().admin().cluster().prepareGetRepositories().get().repositories().forEach(repositoryMetaData -> {
final String name = repositoryMetaData.name();
if (repositoryMetaData.settings().getAsBoolean("readonly", false) == false) {
client().admin().cluster().prepareCleanupRepository(name).get();
}
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
});

View File

@ -488,13 +488,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
() -> client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")
.execute().actionGet());
// TODO: Replace this by repository cleanup endpoint call once that's available
logger.info("--> Go through a loop of creating and deleting a snapshot to trigger repository cleanup");
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-tmp")
.setWaitForCompletion(true)
.setIndices("test-idx")
.get();
client().admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-tmp").get();
client().admin().cluster().prepareCleanupRepository("test-repo").get();
// Subtract four files that will remain in the repository:
// (1) index-(N+1)

View File

@ -21,6 +21,7 @@ package org.elasticsearch.snapshots.mockstore;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import java.io.IOException;
import java.io.InputStream;
@ -60,8 +61,8 @@ public class BlobContainerWrapper implements BlobContainer {
}
@Override
public void delete() throws IOException {
delegate.delete();
public DeleteResult delete() throws IOException {
return delegate.delete();
}
@Override

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
@ -47,6 +48,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -219,13 +221,20 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
}
@Override
public void delete() {
public DeleteResult delete() {
ensureNotClosed();
final String thisPath = path.buildAsString();
final AtomicLong bytesDeleted = new AtomicLong(0L);
final AtomicLong blobsDeleted = new AtomicLong(0L);
synchronized (context.actions) {
consistentView(context.actions).stream().filter(action -> action.path.startsWith(thisPath))
.forEach(a -> context.actions.add(new BlobStoreAction(Operation.DELETE, a.path)));
.forEach(a -> {
context.actions.add(new BlobStoreAction(Operation.DELETE, a.path));
bytesDeleted.addAndGet(a.data.length);
blobsDeleted.incrementAndGet();
});
}
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}
@Override

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Setting;
@ -330,14 +331,20 @@ public class MockRepository extends FsRepository {
}
@Override
public void delete() throws IOException {
public DeleteResult delete() throws IOException {
DeleteResult deleteResult = DeleteResult.ZERO;
for (BlobContainer child : children().values()) {
child.delete();
deleteResult = deleteResult.add(child.delete());
}
for (String blob : listBlobs().values().stream().map(BlobMetaData::name).collect(Collectors.toList())) {
final Map<String, BlobMetaData> blobs = listBlobs();
long deleteBlobCount = blobs.size();
long deleteByteCount = 0L;
for (String blob : blobs.values().stream().map(BlobMetaData::name).collect(Collectors.toList())) {
deleteBlobIgnoringIfNotExists(blob);
deleteByteCount += blobs.get(blob).length();
}
blobStore().blobContainer(path().parent()).deleteBlob(path().toArray()[path().toArray().length - 1]);
return deleteResult.add(deleteBlobCount, deleteByteCount);
}
@Override

View File

@ -19,6 +19,7 @@
package org.elasticsearch.repositories;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.PlainActionFuture;
@ -208,30 +209,49 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
.state(),
equalTo(SnapshotState.SUCCESS));
logger.info("--> creating a dangling index folder");
final BlobStoreRepository repo =
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
final Executor genericExec = repo.threadPool().executor(ThreadPool.Names.GENERIC);
logger.info("--> creating a dangling index folder");
createDanglingIndex(repo, genericExec);
logger.info("--> deleting a snapshot to trigger repository cleanup");
client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet();
assertConsistentRepository(repo, genericExec);
logger.info("--> Create dangling index");
createDanglingIndex(repo, genericExec);
logger.info("--> Execute repository cleanup");
final CleanupRepositoryResponse response = client().admin().cluster().prepareCleanupRepository("test-repo").get();
assertCleanupResponse(response, 3L, 1L);
}
protected void assertCleanupResponse(CleanupRepositoryResponse response, long bytes, long blobs) {
assertThat(response.result().blobs(), equalTo(1L + 2L));
assertThat(response.result().bytes(), equalTo(3L + 2 * 3L));
}
private void createDanglingIndex(final BlobStoreRepository repo, final Executor genericExec) throws Exception {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
genericExec.execute(new ActionRunnable<Void>(future) {
@Override
protected void doRun() throws Exception {
final BlobStore blobStore = repo.blobStore();
blobStore.blobContainer(repo.basePath().add("indices").add("foo"))
.writeBlob("bar", new ByteArrayInputStream(new byte[0]), 0, false);
.writeBlob("bar", new ByteArrayInputStream(new byte[3]), 3, false);
for (String prefix : Arrays.asList("snap-", "meta-")) {
blobStore.blobContainer(repo.basePath())
.writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[0]), 0, false);
.writeBlob(prefix + "foo.dat", new ByteArrayInputStream(new byte[3]), 3, false);
}
future.onResponse(null);
}
});
future.actionGet();
assertTrue(assertCorruptionVisible(repo, genericExec));
logger.info("--> deleting a snapshot to trigger repository cleanup");
client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet();
assertConsistentRepository(repo, genericExec);
}
protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor executor) throws Exception {