* Allow Deleting Multiple Snapshots at Once (#55474) Adds deleting multiple snapshots in one go without significantly changing the mechanics of snapshot deletes otherwise. This change does not yet allow mixing snapshot delete and abort. Abort is still only allowed for a single snapshot delete by exact name.
This commit is contained in:
parent
2061652988
commit
3a64ecb6bf
|
@ -176,7 +176,7 @@ final class SnapshotRequestConverters {
|
|||
static Request deleteSnapshot(DeleteSnapshotRequest deleteSnapshotRequest) {
|
||||
String endpoint = new RequestConverters.EndpointBuilder().addPathPartAsIs("_snapshot")
|
||||
.addPathPart(deleteSnapshotRequest.repository())
|
||||
.addPathPart(deleteSnapshotRequest.snapshot())
|
||||
.addCommaSeparatedPathParts(deleteSnapshotRequest.snapshots())
|
||||
.build();
|
||||
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
|
||||
|
||||
|
|
|
@ -269,7 +269,7 @@ public class SnapshotRequestConvertersTests extends ESTestCase {
|
|||
|
||||
DeleteSnapshotRequest deleteSnapshotRequest = new DeleteSnapshotRequest();
|
||||
deleteSnapshotRequest.repository(repository);
|
||||
deleteSnapshotRequest.snapshot(snapshot);
|
||||
deleteSnapshotRequest.snapshots(snapshot);
|
||||
RequestConvertersTests.setRandomMasterTimeout(deleteSnapshotRequest, expectedParams);
|
||||
|
||||
Request request = SnapshotRequestConverters.deleteSnapshot(deleteSnapshotRequest);
|
||||
|
|
|
@ -753,7 +753,7 @@ public class SnapshotClientDocumentationIT extends ESRestHighLevelClientTestCase
|
|||
|
||||
// tag::delete-snapshot-request
|
||||
DeleteSnapshotRequest request = new DeleteSnapshotRequest(repositoryName);
|
||||
request.snapshot(snapshotName);
|
||||
request.snapshots(snapshotName);
|
||||
// end::delete-snapshot-request
|
||||
|
||||
// tag::delete-snapshot-request-masterTimeout
|
||||
|
|
|
@ -193,6 +193,15 @@ created the snapshotting process will be aborted and all files created as part o
|
|||
cleaned. Therefore, the delete snapshot operation can be used to cancel long running snapshot operations that were
|
||||
started by mistake.
|
||||
|
||||
It is also possible to delete multiple snapshots from a repository in one go, for example:
|
||||
|
||||
[source,console]
|
||||
-----------------------------------
|
||||
DELETE /_snapshot/my_backup/my_backup,my_fs_backup
|
||||
DELETE /_snapshot/my_backup/snap*
|
||||
-----------------------------------
|
||||
// TEST[skip:no my_fs_backup]
|
||||
|
||||
A repository can be unregistered using the following command:
|
||||
|
||||
[source,console]
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.elasticsearch.snapshots.SnapshotsService;
|
|||
import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -286,12 +287,12 @@ class S3Repository extends BlobStoreRepository {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
|
||||
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
|
||||
ActionListener<Void> listener) {
|
||||
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
|
||||
listener = delayedListener(listener);
|
||||
}
|
||||
super.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener);
|
||||
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
|
|||
import org.elasticsearch.action.support.master.MasterNodeRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -31,15 +32,14 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
|
|||
/**
|
||||
* Delete snapshot request
|
||||
* <p>
|
||||
* Delete snapshot request removes the snapshot record from the repository and cleans up all
|
||||
* files that are associated with this particular snapshot. All files that are shared with
|
||||
* at least one other existing snapshot are left intact.
|
||||
* Delete snapshot request removes snapshots from the repository and cleans up all files that are associated with the snapshots.
|
||||
* All files that are shared with at least one other existing snapshot are left intact.
|
||||
*/
|
||||
public class DeleteSnapshotRequest extends MasterNodeRequest<DeleteSnapshotRequest> {
|
||||
|
||||
private String repository;
|
||||
|
||||
private String snapshot;
|
||||
private String[] snapshots;
|
||||
|
||||
/**
|
||||
* Constructs a new delete snapshots request
|
||||
|
@ -48,14 +48,14 @@ public class DeleteSnapshotRequest extends MasterNodeRequest<DeleteSnapshotReque
|
|||
}
|
||||
|
||||
/**
|
||||
* Constructs a new delete snapshots request with repository and snapshot name
|
||||
* Constructs a new delete snapshots request with repository and snapshot names
|
||||
*
|
||||
* @param repository repository name
|
||||
* @param snapshot snapshot name
|
||||
* @param snapshots snapshot names
|
||||
*/
|
||||
public DeleteSnapshotRequest(String repository, String snapshot) {
|
||||
public DeleteSnapshotRequest(String repository, String... snapshots) {
|
||||
this.repository = repository;
|
||||
this.snapshot = snapshot;
|
||||
this.snapshots = snapshots;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -70,14 +70,26 @@ public class DeleteSnapshotRequest extends MasterNodeRequest<DeleteSnapshotReque
|
|||
public DeleteSnapshotRequest(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
repository = in.readString();
|
||||
snapshot = in.readString();
|
||||
if (in.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) {
|
||||
snapshots = in.readStringArray();
|
||||
} else {
|
||||
snapshots = new String[] {in.readString()};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(repository);
|
||||
out.writeString(snapshot);
|
||||
if (out.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) {
|
||||
out.writeStringArray(snapshots);
|
||||
} else {
|
||||
if (snapshots.length != 1) {
|
||||
throw new IllegalArgumentException(
|
||||
"Can't write snapshot delete with more than one snapshot to version [" + out.getVersion() + "]");
|
||||
}
|
||||
out.writeString(snapshots[0]);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -86,8 +98,8 @@ public class DeleteSnapshotRequest extends MasterNodeRequest<DeleteSnapshotReque
|
|||
if (repository == null) {
|
||||
validationException = addValidationError("repository is missing", validationException);
|
||||
}
|
||||
if (snapshot == null) {
|
||||
validationException = addValidationError("snapshot is missing", validationException);
|
||||
if (snapshots == null || snapshots.length == 0) {
|
||||
validationException = addValidationError("snapshots are missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
@ -108,21 +120,21 @@ public class DeleteSnapshotRequest extends MasterNodeRequest<DeleteSnapshotReque
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns repository name
|
||||
* Returns snapshot names
|
||||
*
|
||||
* @return repository name
|
||||
* @return snapshot names
|
||||
*/
|
||||
public String snapshot() {
|
||||
return this.snapshot;
|
||||
public String[] snapshots() {
|
||||
return this.snapshots;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets snapshot name
|
||||
* Sets snapshot names
|
||||
*
|
||||
* @return this request
|
||||
*/
|
||||
public DeleteSnapshotRequest snapshot(String snapshot) {
|
||||
this.snapshot = snapshot;
|
||||
public DeleteSnapshotRequest snapshots(String... snapshots) {
|
||||
this.snapshots = snapshots;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,8 +39,8 @@ public class DeleteSnapshotRequestBuilder extends MasterNodeOperationRequestBuil
|
|||
/**
|
||||
* Constructs delete snapshot request builder with specified repository and snapshot names
|
||||
*/
|
||||
public DeleteSnapshotRequestBuilder(ElasticsearchClient client, DeleteSnapshotAction action, String repository, String snapshot) {
|
||||
super(client, action, new DeleteSnapshotRequest(repository, snapshot));
|
||||
public DeleteSnapshotRequestBuilder(ElasticsearchClient client, DeleteSnapshotAction action, String repository, String... snapshots) {
|
||||
super(client, action, new DeleteSnapshotRequest(repository, snapshots));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -57,11 +57,11 @@ public class DeleteSnapshotRequestBuilder extends MasterNodeOperationRequestBuil
|
|||
/**
|
||||
* Sets the snapshot name
|
||||
*
|
||||
* @param snapshot snapshot name
|
||||
* @param snapshots snapshot names
|
||||
* @return this builder
|
||||
*/
|
||||
public DeleteSnapshotRequestBuilder setSnapshot(String snapshot) {
|
||||
request.snapshot(snapshot);
|
||||
public DeleteSnapshotRequestBuilder setSnapshots(String... snapshots) {
|
||||
request.snapshots(snapshots);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* Transport action for delete snapshot operation
|
||||
|
@ -70,7 +71,7 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
|
|||
@Override
|
||||
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state,
|
||||
final ActionListener<AcknowledgedResponse> listener) {
|
||||
snapshotsService.deleteSnapshot(request.repository(), request.snapshot(),
|
||||
snapshotsService.deleteSnapshots(request.repository(), Arrays.asList(request.snapshots()),
|
||||
ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -529,7 +529,7 @@ public interface ClusterAdminClient extends ElasticsearchClient {
|
|||
/**
|
||||
* Delete snapshot.
|
||||
*/
|
||||
DeleteSnapshotRequestBuilder prepareDeleteSnapshot(String repository, String snapshot);
|
||||
DeleteSnapshotRequestBuilder prepareDeleteSnapshot(String repository, String... snapshot);
|
||||
|
||||
/**
|
||||
* Restores a snapshot.
|
||||
|
|
|
@ -526,14 +526,14 @@ public class Requests {
|
|||
}
|
||||
|
||||
/**
|
||||
* Deletes a snapshot
|
||||
* Deletes snapshots
|
||||
*
|
||||
* @param snapshot snapshot name
|
||||
* @param snapshots snapshot names
|
||||
* @param repository repository name
|
||||
* @return delete snapshot request
|
||||
*/
|
||||
public static DeleteSnapshotRequest deleteSnapshotRequest(String repository, String snapshot) {
|
||||
return new DeleteSnapshotRequest(repository, snapshot);
|
||||
public static DeleteSnapshotRequest deleteSnapshotRequest(String repository, String... snapshots) {
|
||||
return new DeleteSnapshotRequest(repository, snapshots);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -976,8 +976,8 @@ public abstract class AbstractClient implements Client {
|
|||
}
|
||||
|
||||
@Override
|
||||
public DeleteSnapshotRequestBuilder prepareDeleteSnapshot(String repository, String name) {
|
||||
return new DeleteSnapshotRequestBuilder(this, DeleteSnapshotAction.INSTANCE, repository, name);
|
||||
public DeleteSnapshotRequestBuilder prepareDeleteSnapshot(String repository, String... names) {
|
||||
return new DeleteSnapshotRequestBuilder(this, DeleteSnapshotAction.INSTANCE, repository, names);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -29,10 +29,13 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.RepositoryOperation;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
|
@ -140,8 +143,12 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
|
|||
for (Entry entry : entries) {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field("repository", entry.snapshot.getRepository());
|
||||
builder.field("snapshot", entry.snapshot.getSnapshotId().getName());
|
||||
builder.field("repository", entry.repository());
|
||||
builder.startArray("snapshots");
|
||||
for (SnapshotId snapshot : entry.snapshots) {
|
||||
builder.value(snapshot.getName());
|
||||
}
|
||||
builder.endArray();
|
||||
builder.humanReadableField("start_time_millis", "start_time", new TimeValue(entry.startTime));
|
||||
builder.field("repository_state_id", entry.repositoryStateId);
|
||||
}
|
||||
|
@ -155,7 +162,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
|
|||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder("SnapshotDeletionsInProgress[");
|
||||
for (int i = 0; i < entries.size(); i++) {
|
||||
builder.append(entries.get(i).getSnapshot().getSnapshotId().getName());
|
||||
builder.append(entries.get(i).getSnapshots());
|
||||
if (i + 1 < entries.size()) {
|
||||
builder.append(",");
|
||||
}
|
||||
|
@ -167,12 +174,15 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
|
|||
* A class representing a snapshot deletion request entry in the cluster state.
|
||||
*/
|
||||
public static final class Entry implements Writeable, RepositoryOperation {
|
||||
private final Snapshot snapshot;
|
||||
private final List<SnapshotId> snapshots;
|
||||
private final String repoName;
|
||||
private final long startTime;
|
||||
private final long repositoryStateId;
|
||||
|
||||
public Entry(Snapshot snapshot, long startTime, long repositoryStateId) {
|
||||
this.snapshot = snapshot;
|
||||
public Entry(List<SnapshotId> snapshots, String repoName, long startTime, long repositoryStateId) {
|
||||
this.snapshots = snapshots;
|
||||
assert snapshots.size() == new HashSet<>(snapshots).size() : "Duplicate snapshot ids in " + snapshots;
|
||||
this.repoName = repoName;
|
||||
this.startTime = startTime;
|
||||
this.repositoryStateId = repositoryStateId;
|
||||
assert repositoryStateId > RepositoryData.EMPTY_REPO_GEN :
|
||||
|
@ -180,16 +190,20 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
|
|||
}
|
||||
|
||||
public Entry(StreamInput in) throws IOException {
|
||||
this.snapshot = new Snapshot(in);
|
||||
if (in.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) {
|
||||
this.repoName = in.readString();
|
||||
this.snapshots = in.readList(SnapshotId::new);
|
||||
} else {
|
||||
final Snapshot snapshot = new Snapshot(in);
|
||||
this.snapshots = Collections.singletonList(snapshot.getSnapshotId());
|
||||
this.repoName = snapshot.getRepository();
|
||||
}
|
||||
this.startTime = in.readVLong();
|
||||
this.repositoryStateId = in.readLong();
|
||||
}
|
||||
|
||||
/**
|
||||
* The snapshot to delete.
|
||||
*/
|
||||
public Snapshot getSnapshot() {
|
||||
return snapshot;
|
||||
public List<SnapshotId> getSnapshots() {
|
||||
return snapshots;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -208,26 +222,34 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
|
|||
return false;
|
||||
}
|
||||
Entry that = (Entry) o;
|
||||
return snapshot.equals(that.snapshot)
|
||||
return repoName.equals(that.repoName)
|
||||
&& snapshots.equals(that.snapshots)
|
||||
&& startTime == that.startTime
|
||||
&& repositoryStateId == that.repositoryStateId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(snapshot, startTime, repositoryStateId);
|
||||
return Objects.hash(snapshots, repoName, startTime, repositoryStateId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
snapshot.writeTo(out);
|
||||
if (out.getVersion().onOrAfter(SnapshotsService.MULTI_DELETE_VERSION)) {
|
||||
out.writeString(repoName);
|
||||
out.writeCollection(snapshots);
|
||||
} else {
|
||||
assert snapshots.size() == 1 : "Only single deletion allowed in mixed version cluster containing [" + out.getVersion() +
|
||||
"] but saw " + snapshots;
|
||||
new Snapshot(repoName, snapshots.get(0)).writeTo(out);
|
||||
}
|
||||
out.writeVLong(startTime);
|
||||
out.writeLong(repositoryStateId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String repository() {
|
||||
return snapshot.getRepository();
|
||||
return repoName;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.snapshots.SnapshotInfo;
|
|||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
@ -92,9 +93,9 @@ public class FilterRepository implements Repository {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
|
||||
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
|
||||
ActionListener<Void> listener) {
|
||||
in.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener);
|
||||
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -487,7 +487,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
|
|||
SnapshotDeletionsInProgress deletionsInProgress = clusterState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null) {
|
||||
for (SnapshotDeletionsInProgress.Entry entry : deletionsInProgress.getEntries()) {
|
||||
if (entry.getSnapshot().getRepository().equals(repository)) {
|
||||
if (entry.repository().equals(repository)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.snapshots.SnapshotInfo;
|
|||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
@ -153,15 +154,15 @@ public interface Repository extends LifecycleComponent {
|
|||
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener);
|
||||
|
||||
/**
|
||||
* Deletes snapshot
|
||||
* Deletes snapshots
|
||||
*
|
||||
* @param snapshotId snapshot id
|
||||
* @param snapshotIds snapshot ids
|
||||
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
|
||||
* @param repositoryMetaVersion version of the updated repository metadata to write
|
||||
* @param listener completion listener
|
||||
*/
|
||||
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, ActionListener<Void> listener);
|
||||
|
||||
void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
|
||||
ActionListener<Void> listener);
|
||||
/**
|
||||
* Returns snapshot throttle time in nanoseconds
|
||||
*/
|
||||
|
|
|
@ -179,14 +179,23 @@ public final class RepositoryData {
|
|||
* Returns the list of {@link IndexId} that have their snapshots updated but not removed (because they are still referenced by other
|
||||
* snapshots) after removing the given snapshot from the repository.
|
||||
*
|
||||
* @param snapshotId SnapshotId to remove
|
||||
* @param snapshotIds SnapshotId to remove
|
||||
* @return List of indices that are changed but not removed
|
||||
*/
|
||||
public List<IndexId> indicesToUpdateAfterRemovingSnapshot(SnapshotId snapshotId) {
|
||||
public List<IndexId> indicesToUpdateAfterRemovingSnapshot(Collection<SnapshotId> snapshotIds) {
|
||||
return indexSnapshots.entrySet().stream()
|
||||
.filter(entry -> entry.getValue().size() > 1 && entry.getValue().contains(snapshotId))
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(Collectors.toList());
|
||||
.filter(entry -> {
|
||||
final Collection<SnapshotId> existingIds = entry.getValue();
|
||||
if (snapshotIds.containsAll(existingIds)) {
|
||||
return existingIds.size() > snapshotIds.size();
|
||||
}
|
||||
for (SnapshotId snapshotId : snapshotIds) {
|
||||
if (entry.getValue().contains(snapshotId)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}).map(Map.Entry::getKey).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -244,44 +253,41 @@ public final class RepositoryData {
|
|||
}
|
||||
|
||||
/**
|
||||
* Remove a snapshot and remove any indices that no longer exist in the repository due to the deletion of the snapshot.
|
||||
* Remove snapshots and remove any indices that no longer exist in the repository due to the deletion of the snapshots.
|
||||
*
|
||||
* @param snapshotId Snapshot Id
|
||||
* @param snapshots Snapshot ids to remove
|
||||
* @param updatedShardGenerations Shard generations that changed as a result of removing the snapshot.
|
||||
* The {@code String[]} passed for each {@link IndexId} contains the new shard generation id for each
|
||||
* changed shard indexed by its shardId
|
||||
*/
|
||||
public RepositoryData removeSnapshot(final SnapshotId snapshotId, final ShardGenerations updatedShardGenerations) {
|
||||
Map<String, SnapshotId> newSnapshotIds = snapshotIds.values().stream()
|
||||
.filter(id -> !snapshotId.equals(id))
|
||||
public RepositoryData removeSnapshots(final Collection<SnapshotId> snapshots, final ShardGenerations updatedShardGenerations) {
|
||||
Map<String, SnapshotId> newSnapshotIds = snapshotIds.values().stream().filter(sn -> snapshots.contains(sn) == false)
|
||||
.collect(Collectors.toMap(SnapshotId::getUUID, Function.identity()));
|
||||
if (newSnapshotIds.size() == snapshotIds.size()) {
|
||||
throw new ResourceNotFoundException("Attempting to remove non-existent snapshot [{}] from repository data", snapshotId);
|
||||
if (newSnapshotIds.size() != snapshotIds.size() - snapshots.size()) {
|
||||
final Collection<SnapshotId> notFound = new HashSet<>(snapshots);
|
||||
notFound.removeAll(snapshotIds.values());
|
||||
throw new ResourceNotFoundException("Attempting to remove non-existent snapshots {} from repository data", notFound);
|
||||
}
|
||||
Map<String, SnapshotState> newSnapshotStates = new HashMap<>(snapshotStates);
|
||||
newSnapshotStates.remove(snapshotId.getUUID());
|
||||
final Map<String, Version> newSnapshotVersions = new HashMap<>(snapshotVersions);
|
||||
for (SnapshotId snapshotId : snapshots) {
|
||||
newSnapshotStates.remove(snapshotId.getUUID());
|
||||
newSnapshotVersions.remove(snapshotId.getUUID());
|
||||
}
|
||||
Map<IndexId, List<SnapshotId>> indexSnapshots = new HashMap<>();
|
||||
for (final IndexId indexId : indices.values()) {
|
||||
List<SnapshotId> remaining;
|
||||
List<SnapshotId> snapshotIds = this.indexSnapshots.get(indexId);
|
||||
assert snapshotIds != null;
|
||||
final int listIndex = snapshotIds.indexOf(snapshotId);
|
||||
if (listIndex > -1) {
|
||||
if (snapshotIds.size() == 1) {
|
||||
// removing the snapshot will mean no more snapshots
|
||||
// have this index, so just skip over it
|
||||
continue;
|
||||
}
|
||||
remaining = new ArrayList<>(snapshotIds);
|
||||
remaining.remove(listIndex);
|
||||
List<SnapshotId> remaining = new ArrayList<>(snapshotIds);
|
||||
if (remaining.removeAll(snapshots)) {
|
||||
remaining = Collections.unmodifiableList(remaining);
|
||||
} else {
|
||||
remaining = snapshotIds;
|
||||
}
|
||||
if (remaining.isEmpty() == false) {
|
||||
indexSnapshots.put(indexId, remaining);
|
||||
}
|
||||
}
|
||||
|
||||
return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots,
|
||||
ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
|
||||
|
|
|
@ -83,11 +83,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotException;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
|
||||
|
@ -108,8 +106,6 @@ import org.elasticsearch.repositories.RepositoryStats;
|
|||
import org.elasticsearch.repositories.RepositoryVerificationException;
|
||||
import org.elasticsearch.snapshots.SnapshotCreationException;
|
||||
import org.elasticsearch.repositories.ShardGenerations;
|
||||
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotException;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
|
@ -512,28 +508,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
|
||||
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
|
||||
ActionListener<Void> listener) {
|
||||
if (isReadOnly()) {
|
||||
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
|
||||
} else {
|
||||
final long latestKnownGen = latestKnownRepoGen.get();
|
||||
if (latestKnownGen > repositoryStateId) {
|
||||
listener.onFailure(new ConcurrentSnapshotExecutionException(
|
||||
new Snapshot(metadata.name(), snapshotId), "Another concurrent operation moved repo generation to [ " + latestKnownGen
|
||||
+ "] but this delete assumed generation [" + repositoryStateId + "]"));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
final Map<String, BlobMetadata> rootBlobs = blobContainer().listBlobs();
|
||||
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
|
||||
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
|
||||
// delete an index that was created by another master node after writing this index-N blob.
|
||||
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
|
||||
doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData,
|
||||
doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData,
|
||||
SnapshotsService.useShardGenerations(repositoryMetaVersion), listener);
|
||||
} catch (Exception ex) {
|
||||
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
|
||||
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, ex));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -577,7 +566,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
* After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
|
||||
* and then has all now unreferenced blobs in it deleted.
|
||||
*
|
||||
* @param snapshotId SnapshotId to delete
|
||||
* @param snapshotIds SnapshotIds to delete
|
||||
* @param repositoryStateId Expected repository state id
|
||||
* @param foundIndices All indices folders found in the repository before executing any writes to the repository during this
|
||||
* delete operation
|
||||
|
@ -586,28 +575,28 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
* @param repositoryData RepositoryData found the in the repository before executing this delete
|
||||
* @param listener Listener to invoke once finished
|
||||
*/
|
||||
private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices,
|
||||
private void doDeleteShardSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Map<String, BlobContainer> foundIndices,
|
||||
Map<String, BlobMetadata> rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
|
||||
ActionListener<Void> listener) {
|
||||
|
||||
if (writeShardGens) {
|
||||
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
|
||||
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetadataAndComputeDeletesStep = new StepListener<>();
|
||||
writeUpdatedShardMetadataAndComputeDeletes(snapshotId, repositoryData, true, writeShardMetadataAndComputeDeletesStep);
|
||||
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
|
||||
writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep);
|
||||
// Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
|
||||
// 1. Remove the snapshot from the list of existing snapshots
|
||||
// 1. Remove the snapshots from the list of existing snapshots
|
||||
// 2. Update the index shard generations of all updated shard folders
|
||||
//
|
||||
// Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created
|
||||
// index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
|
||||
// written if all shard paths have been successfully updated.
|
||||
final StepListener<RepositoryData> writeUpdatedRepoDataStep = new StepListener<>();
|
||||
writeShardMetadataAndComputeDeletesStep.whenComplete(deleteResults -> {
|
||||
writeShardMetaDataAndComputeDeletesStep.whenComplete(deleteResults -> {
|
||||
final ShardGenerations.Builder builder = ShardGenerations.builder();
|
||||
for (ShardSnapshotMetaDeleteResult newGen : deleteResults) {
|
||||
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
|
||||
}
|
||||
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
|
||||
final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build());
|
||||
writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(),
|
||||
ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure));
|
||||
}, listener::onFailure);
|
||||
|
@ -617,20 +606,20 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
final ActionListener<Void> afterCleanupsListener =
|
||||
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
|
||||
asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
|
||||
asyncCleanupUnlinkedShardLevelBlobs(snapshotId, writeShardMetadataAndComputeDeletesStep.result(), afterCleanupsListener);
|
||||
asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener);
|
||||
}, listener::onFailure);
|
||||
} else {
|
||||
// Write the new repository data first (with the removed snapshot), using no shard generations
|
||||
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
|
||||
final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
|
||||
writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> {
|
||||
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
|
||||
final ActionListener<Void> afterCleanupsListener =
|
||||
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
|
||||
asyncCleanupUnlinkedRootAndIndicesBlobs(foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
|
||||
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
|
||||
writeUpdatedShardMetadataAndComputeDeletes(snapshotId, repositoryData, false, writeMetaAndComputeDeletesStep);
|
||||
writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep);
|
||||
writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
|
||||
asyncCleanupUnlinkedShardLevelBlobs(snapshotId, deleteResults, afterCleanupsListener),
|
||||
asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, deleteResults, afterCleanupsListener),
|
||||
afterCleanupsListener::onFailure);
|
||||
}, listener::onFailure));
|
||||
}
|
||||
|
@ -643,17 +632,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null))));
|
||||
}
|
||||
|
||||
private void asyncCleanupUnlinkedShardLevelBlobs(SnapshotId snapshotId, Collection<ShardSnapshotMetaDeleteResult> deleteResults,
|
||||
private void asyncCleanupUnlinkedShardLevelBlobs(Collection<SnapshotId> snapshotIds,
|
||||
Collection<ShardSnapshotMetaDeleteResult> deleteResults,
|
||||
ActionListener<Void> listener) {
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
|
||||
listener,
|
||||
l -> {
|
||||
try {
|
||||
blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotId, deleteResults));
|
||||
blobContainer().deleteBlobsIgnoringIfNotExists(resolveFilesToDelete(snapshotIds, deleteResults));
|
||||
l.onResponse(null);
|
||||
} catch (Exception e) {
|
||||
logger.warn(
|
||||
() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId),
|
||||
() -> new ParameterizedMessage("{} Failed to delete some blobs during snapshot delete", snapshotIds),
|
||||
e);
|
||||
throw e;
|
||||
}
|
||||
|
@ -661,11 +651,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
|
||||
private void writeUpdatedShardMetadataAndComputeDeletes(SnapshotId snapshotId, RepositoryData oldRepositoryData,
|
||||
private void writeUpdatedShardMetaDataAndComputeDeletes(Collection<SnapshotId> snapshotIds, RepositoryData oldRepositoryData,
|
||||
boolean useUUIDs, ActionListener<Collection<ShardSnapshotMetaDeleteResult>> onAllShardsCompleted) {
|
||||
|
||||
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
|
||||
final List<IndexId> indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId);
|
||||
final List<IndexId> indices = oldRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds);
|
||||
|
||||
if (indices.isEmpty()) {
|
||||
onAllShardsCompleted.onResponse(Collections.emptyList());
|
||||
|
@ -679,66 +669,72 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
for (IndexId indexId : indices) {
|
||||
final Set<SnapshotId> survivingSnapshots = oldRepositoryData.getSnapshots(indexId).stream()
|
||||
.filter(id -> id.equals(snapshotId) == false).collect(Collectors.toSet());
|
||||
executor.execute(ActionRunnable.wrap(deleteIndexMetadataListener, deleteIdxMetaListener -> {
|
||||
final IndexMetadata indexMetadata;
|
||||
.filter(id -> snapshotIds.contains(id) == false).collect(Collectors.toSet());
|
||||
final StepListener<Collection<Integer>> shardCountListener = new StepListener<>();
|
||||
final ActionListener<Integer> allShardCountsListener = new GroupedActionListener<>(shardCountListener, snapshotIds.size());
|
||||
for (SnapshotId snapshotId : snapshotIds) {
|
||||
executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
|
||||
try {
|
||||
indexMetadata = getSnapshotIndexMetadata(snapshotId, indexId);
|
||||
return getSnapshotIndexMetadata(snapshotId, indexId).getNumberOfShards();
|
||||
} catch (Exception ex) {
|
||||
logger.warn(() ->
|
||||
new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
|
||||
logger.warn(() -> new ParameterizedMessage(
|
||||
"[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
|
||||
// Just invoke the listener without any shard generations to count it down, this index will be cleaned up
|
||||
// by the stale data cleanup in the end.
|
||||
// TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just ignoring
|
||||
// it and letting the cleanup deal with it.
|
||||
deleteIdxMetaListener.onResponse(null);
|
||||
// TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just
|
||||
// ignoring it and letting the cleanup deal with it.
|
||||
return null;
|
||||
}
|
||||
}));
|
||||
}
|
||||
shardCountListener.whenComplete(counts -> {
|
||||
final int shardCount = counts.stream().mapToInt(i -> i).max().orElse(0);
|
||||
if (shardCount == 0) {
|
||||
deleteIndexMetadataListener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
final int shardCount = indexMetadata.getNumberOfShards();
|
||||
assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
|
||||
// Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
|
||||
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
|
||||
new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
|
||||
final Index index = indexMetadata.getIndex();
|
||||
for (int shardId = 0; shardId < indexMetadata.getNumberOfShards(); shardId++) {
|
||||
final ShardId shard = new ShardId(index, shardId);
|
||||
new GroupedActionListener<>(deleteIndexMetadataListener, shardCount);
|
||||
for (int shardId = 0; shardId < shardCount; shardId++) {
|
||||
final int finalShardId = shardId;
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
final BlobContainer shardContainer = shardContainer(indexId, shard);
|
||||
final Set<String> blobs = getShardBlobs(shard, shardContainer);
|
||||
final BlobContainer shardContainer = shardContainer(indexId, finalShardId);
|
||||
final Set<String> blobs = shardContainer.listBlobs().keySet();
|
||||
final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
|
||||
final String newGen;
|
||||
if (useUUIDs) {
|
||||
newGen = UUIDs.randomBase64UUID();
|
||||
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(blobs, shardContainer,
|
||||
oldRepositoryData.shardGenerations().getShardGen(indexId, shard.getId())).v1();
|
||||
oldRepositoryData.shardGenerations().getShardGen(indexId, finalShardId)).v1();
|
||||
} else {
|
||||
Tuple<BlobStoreIndexShardSnapshots, Long> tuple =
|
||||
buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
|
||||
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
|
||||
newGen = Long.toString(tuple.v2() + 1);
|
||||
blobStoreIndexShardSnapshots = tuple.v1();
|
||||
}
|
||||
allShardsListener.onResponse(deleteFromShardSnapshotMeta(survivingSnapshots, indexId, shard, snapshotId,
|
||||
shardContainer, blobs, blobStoreIndexShardSnapshots, newGen));
|
||||
allShardsListener.onResponse(deleteFromShardSnapshotMeta(survivingSnapshots, indexId, finalShardId,
|
||||
snapshotIds, shardContainer, blobs, blobStoreIndexShardSnapshots, newGen));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception ex) {
|
||||
logger.warn(
|
||||
() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
|
||||
snapshotId, indexId.getName(), shard.id()), ex);
|
||||
() -> new ParameterizedMessage("{} failed to delete shard data for shard [{}][{}]",
|
||||
snapshotIds, indexId.getName(), finalShardId), ex);
|
||||
// Just passing null here to count down the listener instead of failing it, the stale data left behind
|
||||
// here will be retried in the next delete or repository cleanup
|
||||
allShardsListener.onResponse(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
}));
|
||||
}, deleteIndexMetadataListener::onFailure);
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> resolveFilesToDelete(SnapshotId snapshotId, Collection<ShardSnapshotMetaDeleteResult> deleteResults) {
|
||||
private List<String> resolveFilesToDelete(Collection<SnapshotId> snapshotIds,
|
||||
Collection<ShardSnapshotMetaDeleteResult> deleteResults) {
|
||||
final String basePath = basePath().buildAsString();
|
||||
final int basePathLen = basePath.length();
|
||||
return Stream.concat(
|
||||
|
@ -747,8 +743,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
|
||||
return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
|
||||
}),
|
||||
deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId ->
|
||||
indexContainer(indexId).path().buildAsString() + globalMetadataFormat.blobName(snapshotId.getUUID()))
|
||||
deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().flatMap(indexId -> {
|
||||
final String indexContainerPath = indexContainer(indexId).path().buildAsString();
|
||||
return snapshotIds.stream().map(snapshotId -> indexContainerPath + globalMetadataFormat.blobName(snapshotId.getUUID()));
|
||||
})
|
||||
).map(absolutePath -> {
|
||||
assert absolutePath.startsWith(basePath);
|
||||
return absolutePath.substring(basePathLen);
|
||||
|
@ -1940,9 +1938,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
* Delete snapshot from shard level metadata.
|
||||
*/
|
||||
private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(Set<SnapshotId> survivingSnapshots, IndexId indexId,
|
||||
ShardId snapshotShardId, SnapshotId snapshotId,
|
||||
int snapshotShardId, Collection<SnapshotId> snapshotIds,
|
||||
BlobContainer shardContainer, Set<String> blobs,
|
||||
BlobStoreIndexShardSnapshots snapshots, String indexGeneration) {
|
||||
BlobStoreIndexShardSnapshots snapshots,
|
||||
String indexGeneration) {
|
||||
// Build a list of snapshots that should be preserved
|
||||
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
|
||||
final Set<String> survivingSnapshotNames = survivingSnapshots.stream().map(SnapshotId::getName).collect(Collectors.toSet());
|
||||
|
@ -1953,18 +1952,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
try {
|
||||
if (newSnapshotsList.isEmpty()) {
|
||||
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), ShardGenerations.DELETED_SHARD_GEN, blobs);
|
||||
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, ShardGenerations.DELETED_SHARD_GEN, blobs);
|
||||
} else {
|
||||
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
|
||||
writeShardIndexBlob(shardContainer, indexGeneration, updatedSnapshots);
|
||||
final Set<String> survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
|
||||
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), indexGeneration,
|
||||
return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId, indexGeneration,
|
||||
unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardSnapshotFailedException(snapshotShardId,
|
||||
"Failed to finalize snapshot deletion [" + snapshotId + "] with shard index ["
|
||||
+ indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
|
||||
throw new RepositoryException(metadata.name(), "Failed to finalize snapshot deletion " + snapshotIds +
|
||||
" with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1975,16 +1973,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
|
||||
}
|
||||
|
||||
private static Set<String> getShardBlobs(final ShardId snapshotShardId, final BlobContainer shardContainer) {
|
||||
final Set<String> blobs;
|
||||
try {
|
||||
blobs = shardContainer.listBlobs().keySet();
|
||||
} catch (IOException e) {
|
||||
throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e);
|
||||
}
|
||||
return blobs;
|
||||
}
|
||||
|
||||
// Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all
|
||||
// temporary blobs
|
||||
private static List<String> unusedBlobs(Set<String> blobs, Set<String> survivingSnapshotUUIDs,
|
||||
|
|
|
@ -215,7 +215,7 @@
|
|||
* <h2>Deleting a Snapshot</h2>
|
||||
*
|
||||
* <p>Deleting a snapshot is an operation that is exclusively executed on the master node that runs through the following sequence of
|
||||
* action when {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#deleteSnapshot} is invoked:</p>
|
||||
* action when {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#deleteSnapshots} is invoked:</p>
|
||||
*
|
||||
* <ol>
|
||||
* <li>Get the current {@code RepositoryData} from the latest {@code index-N} blob at the repository root.</li>
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.admin.cluster;
|
|||
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
|
@ -49,7 +50,8 @@ public class RestDeleteSnapshotAction extends BaseRestHandler {
|
|||
|
||||
@Override
|
||||
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
|
||||
DeleteSnapshotRequest deleteSnapshotRequest = deleteSnapshotRequest(request.param("repository"), request.param("snapshot"));
|
||||
DeleteSnapshotRequest deleteSnapshotRequest = deleteSnapshotRequest(request.param("repository"),
|
||||
Strings.splitStringByCommaToArray(request.param("snapshot")));
|
||||
deleteSnapshotRequest.masterNodeTimeout(request.paramAsTime("master_timeout", deleteSnapshotRequest.masterNodeTimeout()));
|
||||
return channel -> client.admin().cluster().deleteSnapshot(deleteSnapshotRequest, new RestToXContentListener<>(channel));
|
||||
}
|
||||
|
|
|
@ -235,10 +235,10 @@ public class RestoreService implements ClusterStateApplier {
|
|||
// Check if the snapshot to restore is currently being deleted
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null
|
||||
&& deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshot().equals(snapshot))) {
|
||||
&& deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(snapshotId))) {
|
||||
throw new ConcurrentSnapshotExecutionException(snapshot,
|
||||
"cannot restore a snapshot while a snapshot deletion is in-progress [" +
|
||||
deletionsInProgress.getEntries().get(0).getSnapshot() + "]");
|
||||
deletionsInProgress.getEntries().get(0) + "]");
|
||||
}
|
||||
|
||||
// Updating cluster state
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.elasticsearch.common.UUIDs;
|
|||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
|
@ -84,10 +85,10 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
|
@ -111,6 +112,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
public static final Version OLD_SNAPSHOT_FORMAT = Version.V_7_5_0;
|
||||
|
||||
public static final Version MULTI_DELETE_VERSION = Version.V_7_8_0;
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(SnapshotsService.class);
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
@ -624,7 +627,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster";
|
||||
SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0);
|
||||
deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.repositoryStateId(), state.nodes().getMinNodeVersion());
|
||||
deleteSnapshotsFromRepository(entry.repository(), entry.getSnapshots(), null, entry.repositoryStateId(),
|
||||
state.nodes().getMinNodeVersion());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -992,17 +996,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
|
||||
/**
|
||||
* Deletes a snapshot from the repository or aborts a running snapshot.
|
||||
* First checks if the snapshot is still running and if so cancels the snapshot and then deletes it from the repository.
|
||||
* If the snapshot is not running, moves to trying to find a matching {@link Snapshot} for the given name in the repository and if
|
||||
* one is found deletes it by invoking {@link #deleteCompletedSnapshot}.
|
||||
* Deletes snapshots from the repository or aborts a running snapshot.
|
||||
* If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from
|
||||
* the repository.
|
||||
* If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the
|
||||
* given names in the repository and deletes them by invoking {@link #deleteCompletedSnapshots}.
|
||||
*
|
||||
* @param repositoryName repositoryName
|
||||
* @param snapshotName snapshotName
|
||||
* @param snapshotNames snapshotNames
|
||||
* @param listener listener
|
||||
*/
|
||||
public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener<Void> listener) {
|
||||
logger.info("deleting snapshot [{}] from repository [{}]", snapshotName, repositoryName);
|
||||
public void deleteSnapshots(final String repositoryName, final Collection<String> snapshotNames, final ActionListener<Void> listener) {
|
||||
logger.info("deleting snapshots {} from repository [{}]", snapshotNames, repositoryName);
|
||||
|
||||
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) {
|
||||
|
||||
|
@ -1012,8 +1017,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
if (snapshotNames.size() > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) {
|
||||
throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ "
|
||||
+ MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion()
|
||||
+ "]");
|
||||
}
|
||||
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
final SnapshotsInProgress.Entry snapshotEntry = findInProgressSnapshot(snapshots, snapshotName, repositoryName);
|
||||
final SnapshotsInProgress.Entry snapshotEntry;
|
||||
if (snapshotNames.size() == 1) {
|
||||
final String snapshotName = snapshotNames.iterator().next();
|
||||
if (Regex.isSimpleMatchPattern(snapshotName)) {
|
||||
snapshotEntry = null;
|
||||
} else {
|
||||
snapshotEntry = findInProgressSnapshot(snapshots, snapshotName, repositoryName);
|
||||
}
|
||||
} else {
|
||||
snapshotEntry = null;
|
||||
}
|
||||
if (snapshotEntry == null) {
|
||||
return currentState;
|
||||
}
|
||||
|
@ -1082,29 +1102,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (runningSnapshot == null) {
|
||||
threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
|
||||
repositoriesService.repository(repositoryName).getRepositoryData(
|
||||
ActionListener.wrap(repositoryData -> {
|
||||
Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
|
||||
.stream()
|
||||
.filter(s -> s.getName().equals(snapshotName))
|
||||
.findFirst();
|
||||
// If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in
|
||||
// the repository is not the one we expected to find when waiting for a finishing snapshot we fail.
|
||||
if (matchedEntry.isPresent()) {
|
||||
deleteCompletedSnapshot(
|
||||
new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), Priority.NORMAL, l);
|
||||
} else {
|
||||
l.onFailure(new SnapshotMissingException(repositoryName, snapshotName));
|
||||
}
|
||||
}, l::onFailure))));
|
||||
repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData ->
|
||||
deleteCompletedSnapshots(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName),
|
||||
repositoryName, repositoryData.getGenId(), Priority.NORMAL, l), l::onFailure))));
|
||||
return;
|
||||
}
|
||||
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
|
||||
addListener(runningSnapshot, ActionListener.wrap(
|
||||
result -> {
|
||||
logger.debug("deleted snapshot completed - deleting files");
|
||||
deleteCompletedSnapshot(
|
||||
new Snapshot(repositoryName, result.v2().snapshotId()), result.v1().getGenId(), Priority.IMMEDIATE, listener);
|
||||
deleteCompletedSnapshots(Collections.singletonList(result.v2().snapshotId()), repositoryName,
|
||||
result.v1().getGenId(), Priority.IMMEDIATE, listener);
|
||||
},
|
||||
e -> {
|
||||
if (abortedDuringInit) {
|
||||
|
@ -1128,6 +1136,30 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
});
|
||||
}
|
||||
|
||||
private static List<SnapshotId> matchingSnapshotIds(RepositoryData repositoryData, Collection<String> snapshotsOrPatterns,
|
||||
String repositoryName) {
|
||||
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds().stream().collect(
|
||||
Collectors.toMap(SnapshotId::getName, Function.identity()));
|
||||
final Set<SnapshotId> foundSnapshots = new HashSet<>();
|
||||
for (String snapshotOrPattern : snapshotsOrPatterns) {
|
||||
if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
|
||||
final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
|
||||
if (foundId == null) {
|
||||
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
|
||||
} else {
|
||||
foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
|
||||
}
|
||||
} else {
|
||||
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
|
||||
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
|
||||
foundSnapshots.add(entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableList(new ArrayList<>(foundSnapshots));
|
||||
}
|
||||
|
||||
// Return in-progress snapshot entry by name and repository in the given cluster state or null if none is found
|
||||
@Nullable
|
||||
private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String snapshotName,
|
||||
|
@ -1147,27 +1179,33 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
|
||||
/**
|
||||
* Deletes a snapshot that is assumed to be in the repository and not tracked as in-progress in the cluster state.
|
||||
* Deletes snapshots that are assumed to be in the repository and not tracked as in-progress in the cluster state.
|
||||
*
|
||||
* @param snapshot Snapshot to delete
|
||||
* @param snapshotIds Snapshots to delete
|
||||
* @param repoName Repository name
|
||||
* @param repositoryStateId Repository generation to base the delete on
|
||||
* @param listener Listener to complete when done
|
||||
*/
|
||||
private void deleteCompletedSnapshot(Snapshot snapshot, long repositoryStateId, Priority priority, ActionListener<Void> listener) {
|
||||
logger.debug("deleting snapshot [{}] assuming repository generation [{}] and with priority [{}]", snapshot, repositoryStateId,
|
||||
private void deleteCompletedSnapshots(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId, Priority priority,
|
||||
ActionListener<Void> listener) {
|
||||
if (snapshotIds.isEmpty()) {
|
||||
listener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
logger.debug("deleting snapshots {} assuming repository generation [{}] and with priority [{}]", snapshotIds, repositoryStateId,
|
||||
priority);
|
||||
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(snapshot,
|
||||
throw new ConcurrentSnapshotExecutionException(new Snapshot(repoName, snapshotIds.get(0)),
|
||||
"cannot delete - another snapshot is currently being deleted in [" + deletionsInProgress + "]");
|
||||
}
|
||||
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
|
||||
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(snapshot.getRepository(), snapshot.getSnapshotId().getName(),
|
||||
"cannot delete snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
|
||||
throw new ConcurrentSnapshotExecutionException(new Snapshot(repoName, snapshotIds.get(0)),
|
||||
"cannot delete snapshots while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
|
||||
}
|
||||
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
|
||||
if (restoreInProgress != null) {
|
||||
|
@ -1176,8 +1214,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
// and the files the restore depends on would all be gone
|
||||
|
||||
for (RestoreInProgress.Entry entry : restoreInProgress) {
|
||||
if (entry.snapshot().equals(snapshot)) {
|
||||
throw new ConcurrentSnapshotExecutionException(snapshot,
|
||||
if (repoName.equals(entry.snapshot().getRepository()) && snapshotIds.contains(entry.snapshot().getSnapshotId())) {
|
||||
throw new ConcurrentSnapshotExecutionException(new Snapshot(repoName, snapshotIds.get(0)),
|
||||
"cannot delete snapshot during a restore in progress in [" + restoreInProgress + "]");
|
||||
}
|
||||
}
|
||||
|
@ -1185,11 +1223,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots != null && snapshots.entries().isEmpty() == false) {
|
||||
// However other snapshots are running - cannot continue
|
||||
throw new ConcurrentSnapshotExecutionException(snapshot, "another snapshot is currently running cannot delete");
|
||||
throw new ConcurrentSnapshotExecutionException(
|
||||
repoName, snapshotIds.toString(), "another snapshot is currently running cannot delete");
|
||||
}
|
||||
// add the snapshot deletion to the cluster state
|
||||
SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(
|
||||
snapshot,
|
||||
snapshotIds,
|
||||
repoName,
|
||||
threadPool.absoluteTimeInMillis(),
|
||||
repositoryStateId
|
||||
);
|
||||
|
@ -1208,7 +1248,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
|
||||
deleteSnapshotsFromRepository(repoName, snapshotIds, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -1226,12 +1266,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* @return minimum node version that must still be able to read the repository metadata
|
||||
*/
|
||||
public Version minCompatibleVersion(Version minNodeVersion, String repositoryName, RepositoryData repositoryData,
|
||||
@Nullable SnapshotId excluded) {
|
||||
@Nullable Collection<SnapshotId> excluded) {
|
||||
Version minCompatVersion = minNodeVersion;
|
||||
final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
for (SnapshotId snapshotId :
|
||||
snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).collect(Collectors.toList())) {
|
||||
for (SnapshotId snapshotId : snapshotIds.stream().filter(excluded == null ? sn -> true : sn -> excluded.contains(sn) == false)
|
||||
.collect(Collectors.toList())) {
|
||||
final Version known = repositoryData.getVersion(snapshotId);
|
||||
// If we don't have the version cached in the repository data yet we load it from the snapshot info blobs
|
||||
if (known == null) {
|
||||
|
@ -1270,30 +1310,31 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
/**
|
||||
* Deletes snapshot from repository
|
||||
*
|
||||
* @param snapshot snapshot
|
||||
* @param repoName repository name
|
||||
* @param snapshotIds snapshot ids
|
||||
* @param listener listener
|
||||
* @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
|
||||
* @param minNodeVersion minimum node version in the cluster
|
||||
*/
|
||||
private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId,
|
||||
Version minNodeVersion) {
|
||||
private void deleteSnapshotsFromRepository(String repoName, Collection<SnapshotId> snapshotIds, @Nullable ActionListener<Void> listener,
|
||||
long repositoryStateId, Version minNodeVersion) {
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
|
||||
Repository repository = repositoriesService.repository(snapshot.getRepository());
|
||||
repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshot(snapshot.getSnapshotId(),
|
||||
Repository repository = repositoriesService.repository(repoName);
|
||||
repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshots(snapshotIds,
|
||||
repositoryStateId,
|
||||
minCompatibleVersion(minNodeVersion, snapshot.getRepository(), repositoryData, snapshot.getSnapshotId()),
|
||||
minCompatibleVersion(minNodeVersion, repoName, repositoryData, snapshotIds),
|
||||
ActionListener.wrap(v -> {
|
||||
logger.info("snapshot [{}] deleted", snapshot);
|
||||
removeSnapshotDeletionFromClusterState(snapshot, null, l);
|
||||
}, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)
|
||||
)), ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)));
|
||||
logger.info("snapshots {} deleted", snapshotIds);
|
||||
removeSnapshotDeletionFromClusterState(snapshotIds, null, l);
|
||||
}, ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l)
|
||||
)), ex -> removeSnapshotDeletionFromClusterState(snapshotIds, ex, l)));
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the snapshot deletion from {@link SnapshotDeletionsInProgress} in the cluster state.
|
||||
*/
|
||||
private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure,
|
||||
private void removeSnapshotDeletionFromClusterState(final Collection<SnapshotId> snapshotIds, @Nullable final Exception failure,
|
||||
@Nullable final ActionListener<Void> listener) {
|
||||
clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
|
@ -1316,7 +1357,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot deletion metadata", snapshot), e);
|
||||
logger.warn(() -> new ParameterizedMessage("{} failed to remove snapshot deletion metadata", snapshotIds), e);
|
||||
if (listener != null) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -1328,7 +1369,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
if (failure != null) {
|
||||
listener.onFailure(failure);
|
||||
} else {
|
||||
logger.info("Successfully deleted snapshot [{}]", snapshot);
|
||||
logger.info("Successfully deleted snapshots {}", snapshotIds);
|
||||
listener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -97,7 +97,7 @@
|
|||
* {@link org.elasticsearch.cluster.SnapshotDeletionsInProgress}.</li>
|
||||
*
|
||||
* <li>Once the cluster state contains the deletion entry in {@code SnapshotDeletionsInProgress} the {@code SnapshotsService} will invoke
|
||||
* {@link org.elasticsearch.repositories.Repository#deleteSnapshot} for the given snapshot, which will remove files associated with the
|
||||
* {@link org.elasticsearch.repositories.Repository#deleteSnapshots} for the given snapshot, which will remove files associated with the
|
||||
* snapshot from the repository as well as update its meta-data to reflect the deletion of the snapshot.</li>
|
||||
*
|
||||
* <li>After the deletion of the snapshot's data from the repository finishes, the {@code SnapshotsService} will submit a cluster state
|
||||
|
|
|
@ -123,7 +123,7 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
|
|||
.putCustom(SnapshotDeletionsInProgress.TYPE,
|
||||
SnapshotDeletionsInProgress.newInstance(
|
||||
new SnapshotDeletionsInProgress.Entry(
|
||||
new Snapshot("repo1", new SnapshotId("snap1", UUIDs.randomBase64UUID())),
|
||||
Collections.singletonList(new SnapshotId("snap1", UUIDs.randomBase64UUID())), "repo1",
|
||||
randomNonNegativeLong(), randomNonNegativeLong())
|
||||
));
|
||||
if (includeRestore) {
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.elasticsearch.transport.Transport;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -172,7 +173,7 @@ public class RepositoriesServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
|
||||
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
|
||||
ActionListener<Void> listener) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
|
|
@ -67,7 +67,8 @@ public class RepositoryDataTests extends ESTestCase {
|
|||
final List<SnapshotId> snapshotIds = repositoryData.getSnapshots(index);
|
||||
return snapshotIds.contains(randomSnapshot) && snapshotIds.size() > 1;
|
||||
}).toArray(IndexId[]::new);
|
||||
assertThat(repositoryData.indicesToUpdateAfterRemovingSnapshot(randomSnapshot), containsInAnyOrder(indicesToUpdate));
|
||||
assertThat(repositoryData.indicesToUpdateAfterRemovingSnapshot(
|
||||
Collections.singleton(randomSnapshot)), containsInAnyOrder(indicesToUpdate));
|
||||
}
|
||||
|
||||
public void testXContent() throws IOException {
|
||||
|
@ -152,7 +153,7 @@ public class RepositoryDataTests extends ESTestCase {
|
|||
List<SnapshotId> snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds());
|
||||
assertThat(snapshotIds.size(), greaterThan(0));
|
||||
SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1));
|
||||
RepositoryData newRepositoryData = repositoryData.removeSnapshot(removedSnapshotId, ShardGenerations.EMPTY);
|
||||
RepositoryData newRepositoryData = repositoryData.removeSnapshots(Collections.singleton(removedSnapshotId), ShardGenerations.EMPTY);
|
||||
// make sure the repository data's indices no longer contain the removed snapshot
|
||||
for (final IndexId indexId : newRepositoryData.getIndices().values()) {
|
||||
assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId));
|
||||
|
|
|
@ -184,8 +184,8 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
|||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration + 1L));
|
||||
|
||||
// removing a snapshot and writing to a new index generational file
|
||||
repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot(
|
||||
repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY);
|
||||
repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshots(
|
||||
Collections.singleton(repositoryData.getSnapshotIds().iterator().next()), ShardGenerations.EMPTY);
|
||||
writeIndexGen(repository, repositoryData, repositoryData.getGenId());
|
||||
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration + 2L));
|
||||
|
|
|
@ -134,6 +134,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertRequ
|
|||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
@ -1277,8 +1278,14 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
int numberOfFilesBeforeDeletion = numberOfFiles(repo);
|
||||
|
||||
logger.info("--> delete all snapshots except the first one and last one");
|
||||
|
||||
if (randomBoolean()) {
|
||||
for (int i = 1; i < numberOfSnapshots - 1; i++) {
|
||||
client.admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-" + i).get();
|
||||
client.admin().cluster().prepareDeleteSnapshot("test-repo", new String[]{"test-snap-" + i}).get();
|
||||
}
|
||||
} else {
|
||||
client.admin().cluster().prepareDeleteSnapshot(
|
||||
"test-repo", IntStream.range(1, numberOfSnapshots - 1).mapToObj(i -> "test-snap-" + i).toArray(String[]::new)).get();
|
||||
}
|
||||
|
||||
int numberOfFilesAfterDeletion = numberOfFiles(repo);
|
||||
|
@ -3691,6 +3698,40 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertHitCount(client().prepareSearch("restored-3").setSize(0).get(), expectedCount);
|
||||
}
|
||||
|
||||
public void testBulkDeleteWithOverlappingPatterns() {
|
||||
final int numberOfSnapshots = between(5, 15);
|
||||
Path repo = randomRepoPath();
|
||||
logger.info("--> creating repository at {}", repo.toAbsolutePath());
|
||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(Settings.builder()
|
||||
.put("location", repo)
|
||||
.put("compress", false)
|
||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||
|
||||
final String[] indices = {"test-idx-1", "test-idx-2", "test-idx-3"};
|
||||
createIndex(indices);
|
||||
ensureGreen();
|
||||
|
||||
logger.info("--> creating {} snapshots ", numberOfSnapshots);
|
||||
for (int i = 0; i < numberOfSnapshots; i++) {
|
||||
for (int j = 0; j < 10; j++) {
|
||||
index(randomFrom(indices), "_doc", Integer.toString(i * 10 + j), "foo", "bar" + i * 10 + j);
|
||||
}
|
||||
refresh();
|
||||
logger.info("--> snapshot {}", i);
|
||||
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-" + i)
|
||||
.setWaitForCompletion(true).get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
|
||||
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
}
|
||||
|
||||
logger.info("--> deleting all snapshots");
|
||||
client().admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-*", "*").get();
|
||||
final GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots("test-repo").get();
|
||||
assertThat(getSnapshotsResponse.getSnapshots(), empty());
|
||||
}
|
||||
|
||||
private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map<String, List<String>> indicesPerSnapshot) {
|
||||
for (SnapshotInfo snapshotInfo : response.getSnapshots()) {
|
||||
final List<String> expected = snapshotInfo.indices();
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.snapshots.SnapshotInfo;
|
|||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -109,7 +110,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
|
||||
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
|
||||
ActionListener<Void> listener) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
|
|
@ -85,6 +85,7 @@ import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionReque
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
|
@ -269,7 +270,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
}
|
||||
|
||||
@Override
|
||||
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
|
||||
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
|
||||
ActionListener<Void> listener) {
|
||||
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ import org.elasticsearch.xpack.core.ilm.Step.StepKey;
|
|||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.xpack.core.ilm.AbstractStepMasterTimeoutTestCase.emptyClusterState;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.arrayContaining;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class CleanupSnapshotStepTests extends AbstractStepTestCase<CleanupSnapshotStep> {
|
||||
|
@ -149,7 +149,7 @@ public class CleanupSnapshotStepTests extends AbstractStepTestCase<CleanupSnapsh
|
|||
ActionListener<Response> listener) {
|
||||
assertThat(action.name(), is(DeleteSnapshotAction.NAME));
|
||||
assertTrue(request instanceof DeleteSnapshotRequest);
|
||||
assertThat(((DeleteSnapshotRequest) request).snapshot(), equalTo(expectedSnapshotName));
|
||||
assertThat(((DeleteSnapshotRequest) request).snapshots(), arrayContaining(expectedSnapshotName));
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -379,6 +379,8 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
|
|||
for (SnapshotInfo info : snapshots) {
|
||||
final String policyId = getPolicyId(info);
|
||||
final long deleteStartTime = nowNanoSupplier.getAsLong();
|
||||
// TODO: Use snapshot multi-delete instead of this loop if all nodes in the cluster support it
|
||||
// i.e are newer or equal to SnapshotsService#MULTI_DELETE_VERSION
|
||||
deleteSnapshot(policyId, repo, info.snapshotId(), slmStats, ActionListener.wrap(acknowledgedResponse -> {
|
||||
deleted.incrementAndGet();
|
||||
if (acknowledgedResponse.isAcknowledged()) {
|
||||
|
|
|
@ -354,7 +354,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
|
|||
assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false));
|
||||
|
||||
SnapshotDeletionsInProgress delInProgress = new SnapshotDeletionsInProgress(
|
||||
Collections.singletonList(new SnapshotDeletionsInProgress.Entry(snapshot, 0, 0)));
|
||||
Collections.singletonList(new SnapshotDeletionsInProgress.Entry(
|
||||
Collections.singletonList(snapshot.getSnapshotId()), snapshot.getRepository(), 0, 0)));
|
||||
state = ClusterState.builder(new ClusterName("cluster"))
|
||||
.putCustom(SnapshotDeletionsInProgress.TYPE, delInProgress)
|
||||
.build();
|
||||
|
|
Loading…
Reference in New Issue