Synchronize snapshot deletions on the cluster state (#22313)

Before, snapshot/restore would synchronize all operations on the cluster
state except for deleting snapshots.  This meant that only one
snapshot/restore operation would be allowed in the cluster at any given
time, except for deletions - there could be two or more snapshot
deletions running at the same time, or a deletion could be running,
unbeknowest to the rest of the cluster, and thus a snapshot or restore
would be allowed at the same time as the snapshot deletion was still in
progress.  This could cause any number of synchronization issues,
including the situation where a snapshot that was deleted could reappear
in the index-N file, even though its data was no longer present in the
repository.

This commit introduces a new custom type to the cluster state to
represent deletions in progress.  Now, another deletion cannot start if
a deletion is currently in progress.  Similarily, a snapshot or restore
cannot be started if a deletion is currently in progress.  In each case,
if attempting to run another snapshot/restore operation while a deletion
is in progress, a ConcurrentSnapshotExecutionException will be thrown.
This is the same exception thrown if trying to snapshot while another
snapshot is in progress, or restore while a snapshot is in progress.

Closes #19957
This commit is contained in:
Ali Beyad 2016-12-25 19:00:20 -05:00 committed by GitHub
parent 5185a9734d
commit 8261bd358a
18 changed files with 798 additions and 116 deletions

View File

@ -75,6 +75,6 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}, false);
}
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -107,6 +106,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
// register non plugin custom parts
registerPrototype(SnapshotsInProgress.TYPE, SnapshotsInProgress.PROTO);
registerPrototype(RestoreInProgress.TYPE, RestoreInProgress.PROTO);
registerPrototype(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.PROTO);
}
public static <T extends Custom> T lookupPrototype(String type) {
@ -715,8 +715,18 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
routingTable.writeTo(out);
nodes.writeTo(out);
blocks.writeTo(out);
out.writeVInt(customs.size());
boolean omitSnapshotDeletions = false;
if (out.getVersion().before(SnapshotDeletionsInProgress.VERSION_INTRODUCED)
&& customs.containsKey(SnapshotDeletionsInProgress.TYPE)) {
// before the stated version, there were no SnapshotDeletionsInProgress, so
// don't transfer over the wire protocol
omitSnapshotDeletions = true;
}
out.writeVInt(omitSnapshotDeletions ? customs.size() - 1 : customs.size());
for (ObjectObjectCursor<String, Custom> cursor : customs) {
if (omitSnapshotDeletions && cursor.key.equals(SnapshotDeletionsInProgress.TYPE)) {
continue;
}
out.writeString(cursor.key);
cursor.value.writeTo(out);
}
@ -787,7 +797,21 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
nodes.writeTo(out);
metaData.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
Diff<ImmutableOpenMap<String, Custom>> customsDiff = customs;
if (out.getVersion().before(SnapshotDeletionsInProgress.VERSION_INTRODUCED)) {
customsDiff = removeSnapshotDeletionsCustomDiff(customsDiff);
}
customsDiff.writeTo(out);
}
private Diff<ImmutableOpenMap<String, Custom>> removeSnapshotDeletionsCustomDiff(Diff<ImmutableOpenMap<String, Custom>> customs) {
if (customs instanceof DiffableUtils.ImmutableOpenMapDiff) {
@SuppressWarnings("unchecked")
DiffableUtils.ImmutableOpenMapDiff customsDiff = ((DiffableUtils.ImmutableOpenMapDiff) customs)
.withKeyRemoved(SnapshotDeletionsInProgress.TYPE);
return customsDiff;
}
return customs;
}
@Override

View File

@ -214,12 +214,17 @@ public final class DiffableUtils {
*
* @param <T> the object type
*/
private static class ImmutableOpenMapDiff<K, T> extends MapDiff<K, T, ImmutableOpenMap<K, T>> {
public static class ImmutableOpenMapDiff<K, T> extends MapDiff<K, T, ImmutableOpenMap<K, T>> {
protected ImmutableOpenMapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
super(in, keySerializer, valueSerializer);
}
private ImmutableOpenMapDiff(KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer,
List<K> deletes, Map<K, Diff<T>> diffs, Map<K, T> upserts) {
super(keySerializer, valueSerializer, deletes, diffs, upserts);
}
public ImmutableOpenMapDiff(ImmutableOpenMap<K, T> before, ImmutableOpenMap<K, T> after,
KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) {
super(keySerializer, valueSerializer);
@ -245,6 +250,21 @@ public final class DiffableUtils {
}
}
/**
* Returns a new diff map with the given key removed, does not modify the invoking instance.
* If the key does not exist in the diff map, the same instance is returned.
*/
public ImmutableOpenMapDiff<K, T> withKeyRemoved(K key) {
if (this.diffs.containsKey(key) == false && this.upserts.containsKey(key) == false) {
return this;
}
Map<K, Diff<T>> newDiffs = new HashMap<>(this.diffs);
newDiffs.remove(key);
Map<K, T> newUpserts = new HashMap<>(this.upserts);
newUpserts.remove(key);
return new ImmutableOpenMapDiff<>(this.keySerializer, this.valueSerializer, this.deletes, newDiffs, newUpserts);
}
@Override
public ImmutableOpenMap<K, T> apply(ImmutableOpenMap<K, T> map) {
ImmutableOpenMap.Builder<K, T> builder = ImmutableOpenMap.builder();
@ -346,6 +366,15 @@ public final class DiffableUtils {
upserts = new HashMap<>();
}
protected MapDiff(KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer,
List<K> deletes, Map<K, Diff<T>> diffs, Map<K, T> upserts) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
this.deletes = deletes;
this.diffs = diffs;
this.upserts = upserts;
}
protected MapDiff(StreamInput in, KeySerializer<K> keySerializer, ValueSerializer<K, T> valueSerializer) throws IOException {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;

View File

@ -0,0 +1,217 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.snapshots.Snapshot;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
* A class that represents the snapshot deletions that are in progress in the cluster.
*/
public class SnapshotDeletionsInProgress extends AbstractDiffable<Custom> implements Custom {
public static final String TYPE = "snapshot_deletions";
public static final SnapshotDeletionsInProgress PROTO = new SnapshotDeletionsInProgress(Collections.emptyList());
// the version where SnapshotDeletionsInProgress was introduced
public static final Version VERSION_INTRODUCED = Version.V_6_0_0_alpha1_UNRELEASED;
// the list of snapshot deletion request entries
private final List<Entry> entries;
private SnapshotDeletionsInProgress(List<Entry> entries) {
this.entries = Collections.unmodifiableList(entries);
}
public SnapshotDeletionsInProgress(StreamInput in) throws IOException {
this.entries = Collections.unmodifiableList(in.readList(Entry::new));
}
/**
* Returns a new instance of {@link SnapshotDeletionsInProgress} with the given
* {@link Entry} added.
*/
public static SnapshotDeletionsInProgress newInstance(Entry entry) {
return new SnapshotDeletionsInProgress(Collections.singletonList(entry));
}
/**
* Returns a new instance of {@link SnapshotDeletionsInProgress} which adds
* the given {@link Entry} to the invoking instance.
*/
public SnapshotDeletionsInProgress withAddedEntry(Entry entry) {
List<Entry> entries = new ArrayList<>(getEntries());
entries.add(entry);
return new SnapshotDeletionsInProgress(entries);
}
/**
* Returns a new instance of {@link SnapshotDeletionsInProgress} which removes
* the given entry from the invoking instance.
*/
public SnapshotDeletionsInProgress withRemovedEntry(Entry entry) {
List<Entry> entries = new ArrayList<>(getEntries());
entries.remove(entry);
return new SnapshotDeletionsInProgress(entries);
}
/**
* Returns an unmodifiable list of snapshot deletion entries.
*/
public List<Entry> getEntries() {
return entries;
}
/**
* Returns {@code true} if there are snapshot deletions in progress in the cluster,
* returns {@code false} otherwise.
*/
public boolean hasDeletionsInProgress() {
return entries.isEmpty() == false;
}
@Override
public String type() {
return TYPE;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SnapshotDeletionsInProgress that = (SnapshotDeletionsInProgress) o;
return entries.equals(that.entries);
}
@Override
public int hashCode() {
return 31 + entries.hashCode();
}
@Override
public Custom readFrom(StreamInput in) throws IOException {
return new SnapshotDeletionsInProgress(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(entries);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(TYPE);
for (Entry entry : entries) {
builder.startObject();
{
builder.field("repository", entry.snapshot.getRepository());
builder.field("snapshot", entry.snapshot.getSnapshotId().getName());
builder.timeValueField("start_time_millis", "start_time", entry.startTime);
builder.field("repository_state_id", entry.repositoryStateId);
}
builder.endObject();
}
builder.endArray();
return builder;
}
/**
* A class representing a snapshot deletion request entry in the cluster state.
*/
public static final class Entry implements Writeable {
private final Snapshot snapshot;
private final long startTime;
private final long repositoryStateId;
public Entry(Snapshot snapshot, long startTime, long repositoryStateId) {
this.snapshot = snapshot;
this.startTime = startTime;
this.repositoryStateId = repositoryStateId;
}
public Entry(StreamInput in) throws IOException {
this.snapshot = new Snapshot(in);
this.startTime = in.readVLong();
this.repositoryStateId = in.readLong();
}
/**
* The snapshot to delete.
*/
public Snapshot getSnapshot() {
return snapshot;
}
/**
* The start time in milliseconds for deleting the snapshots.
*/
public long getStartTime() {
return startTime;
}
/**
* The repository state id at the time the snapshot deletion began.
*/
public long getRepositoryStateId() {
return repositoryStateId;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Entry that = (Entry) o;
return snapshot.equals(that.snapshot)
&& startTime == that.startTime
&& repositoryStateId == that.repositoryStateId;
}
@Override
public int hashCode() {
return Objects.hash(snapshot, startTime, repositoryStateId);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
snapshot.writeTo(out);
out.writeVLong(startTime);
out.writeLong(repositoryStateId);
}
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster;
import com.carrotsearch.hppc.ObjectContainer;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
@ -48,6 +49,12 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
public static final SnapshotsInProgress PROTO = new SnapshotsInProgress();
// denotes an undefined repository state id, which will happen when receiving a cluster state with
// a snapshot in progress from a pre 5.2.x node
public static final long UNDEFINED_REPOSITORY_STATE_ID = -2L;
// the version where repository state ids were introduced
private static final Version REPOSITORY_ID_INTRODUCED_VERSION = Version.V_6_0_0_alpha1_UNRELEASED;
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -74,9 +81,10 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
private final List<IndexId> indices;
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
private final long startTime;
private final long repositoryStateId;
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
@ -90,10 +98,12 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
this.shards = shards;
this.waitingIndices = findWaitingIndices(shards);
}
this.repositoryStateId = repositoryStateId;
}
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, shards);
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards);
}
public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@ -132,6 +142,10 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
return startTime;
}
public long getRepositoryStateId() {
return repositoryStateId;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
@ -147,6 +161,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
if (!snapshot.equals(entry.snapshot)) return false;
if (state != entry.state) return false;
if (!waitingIndices.equals(entry.waitingIndices)) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
return true;
}
@ -161,6 +176,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
result = 31 * result + indices.hashCode();
result = 31 * result + waitingIndices.hashCode();
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
return result;
}
@ -387,12 +403,17 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
State shardState = State.fromValue(in.readByte());
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
}
long repositoryStateId = UNDEFINED_REPOSITORY_STATE_ID;
if (in.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
repositoryStateId = in.readLong();
}
entries[i] = new Entry(snapshot,
includeGlobalState,
partial,
state,
Collections.unmodifiableList(indexBuilder),
startTime,
repositoryStateId,
builder.build());
}
return new SnapshotsInProgress(entries);
@ -417,6 +438,9 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
out.writeOptionalString(shardEntry.value.nodeId());
out.writeByte(shardEntry.value.state().value());
}
if (out.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
out.writeLong(entry.repositoryStateId);
}
}
}
@ -430,6 +454,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
private static final String INDICES = "indices";
private static final String START_TIME_MILLIS = "start_time_millis";
private static final String START_TIME = "start_time";
private static final String REPOSITORY_STATE_ID = "repository_state_id";
private static final String SHARDS = "shards";
private static final String INDEX = "index";
private static final String SHARD = "shard";
@ -461,6 +486,7 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
}
builder.endArray();
builder.timeValueField(START_TIME_MILLIS, START_TIME, entry.startTime());
builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId());
builder.startArray(SHARDS);
{
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards) {

View File

@ -115,16 +115,19 @@ public interface Repository extends LifecycleComponent {
* @param failure global failure reason or null
* @param totalShards total number of shards
* @param shardFailures list of shard failures
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
* @return snapshot description
*/
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures);
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId);
/**
* Deletes snapshot
*
* @param snapshotId snapshot id
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
*/
void deleteSnapshot(SnapshotId snapshotId);
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId);
/**
* Returns snapshot throttle time in nanoseconds

View File

@ -44,8 +44,19 @@ import java.util.stream.Collectors;
*/
public final class RepositoryData implements ToXContent {
public static final RepositoryData EMPTY = new RepositoryData(Collections.emptyList(), Collections.emptyMap());
/**
* The generation value indicating the repository has no index generational files.
*/
public static final long EMPTY_REPO_GEN = -1L;
/**
* An instance initialized for an empty repository.
*/
public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN, Collections.emptyList(), Collections.emptyMap());
/**
* The generational id of the index file from which the repository data was read.
*/
private final long genId;
/**
* The ids of the snapshots in the repository.
*/
@ -59,7 +70,8 @@ public final class RepositoryData implements ToXContent {
*/
private final Map<IndexId, Set<SnapshotId>> indexSnapshots;
public RepositoryData(List<SnapshotId> snapshotIds, Map<IndexId, Set<SnapshotId>> indexSnapshots) {
private RepositoryData(long genId, List<SnapshotId> snapshotIds, Map<IndexId, Set<SnapshotId>> indexSnapshots) {
this.genId = genId;
this.snapshotIds = Collections.unmodifiableList(snapshotIds);
this.indices = Collections.unmodifiableMap(indexSnapshots.keySet()
.stream()
@ -67,8 +79,22 @@ public final class RepositoryData implements ToXContent {
this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
}
/**
* Creates an instance of {@link RepositoryData} on a fresh repository (one that has no index-N files).
*/
public static RepositoryData initRepositoryData(List<SnapshotId> snapshotIds, Map<IndexId, Set<SnapshotId>> indexSnapshots) {
return new RepositoryData(EMPTY_REPO_GEN, snapshotIds, indexSnapshots);
}
protected RepositoryData copy() {
return new RepositoryData(snapshotIds, indexSnapshots);
return new RepositoryData(genId, snapshotIds, indexSnapshots);
}
/**
* Gets the generational index file id from which this instance was read.
*/
public long getGenId() {
return genId;
}
/**
@ -91,7 +117,10 @@ public final class RepositoryData implements ToXContent {
*/
public RepositoryData addSnapshot(final SnapshotId snapshotId, final List<IndexId> snapshottedIndices) {
if (snapshotIds.contains(snapshotId)) {
throw new IllegalArgumentException("[" + snapshotId + "] already exists in the repository data");
// if the snapshot id already exists in the repository data, it means an old master
// that is blocked from the cluster is trying to finalize a snapshot concurrently with
// the new master, so we make the operation idempotent
return this;
}
List<SnapshotId> snapshots = new ArrayList<>(snapshotIds);
snapshots.add(snapshotId);
@ -110,14 +139,7 @@ public final class RepositoryData implements ToXContent {
allIndexSnapshots.put(indexId, ids);
}
}
return new RepositoryData(snapshots, allIndexSnapshots);
}
/**
* Initializes the indices in the repository metadata; returns a new instance.
*/
public RepositoryData initIndices(final Map<IndexId, Set<SnapshotId>> indexSnapshots) {
return new RepositoryData(snapshotIds, indexSnapshots);
return new RepositoryData(genId, snapshots, allIndexSnapshots);
}
/**
@ -146,7 +168,7 @@ public final class RepositoryData implements ToXContent {
indexSnapshots.put(indexId, set);
}
return new RepositoryData(newSnapshotIds, indexSnapshots);
return new RepositoryData(genId, newSnapshotIds, indexSnapshots);
}
/**
@ -256,7 +278,7 @@ public final class RepositoryData implements ToXContent {
return builder;
}
public static RepositoryData fromXContent(final XContentParser parser) throws IOException {
public static RepositoryData fromXContent(final XContentParser parser, final long genId) throws IOException {
List<SnapshotId> snapshots = new ArrayList<>();
Map<IndexId, Set<SnapshotId>> indexSnapshots = new HashMap<>();
if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
@ -305,7 +327,7 @@ public final class RepositoryData implements ToXContent {
} else {
throw new ElasticsearchParseException("start object expected");
}
return new RepositoryData(snapshots, indexSnapshots);
return new RepositoryData(genId, snapshots, indexSnapshots);
}
}

View File

@ -39,6 +39,7 @@ import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@ -324,7 +325,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
public void deleteSnapshot(SnapshotId snapshotId) {
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository");
}
@ -352,7 +353,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
try {
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
writeIndexGen(updatedRepositoryData);
writeIndexGen(updatedRepositoryData, repositoryStateId);
// delete the snapshot file
safeSnapshotBlobDelete(snapshot, snapshotId.getUUID());
@ -454,7 +455,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final long startTime,
final String failure,
final int totalShards,
final List<SnapshotShardFailure> shardFailures) {
final List<SnapshotShardFailure> shardFailures,
final long repositoryStateId) {
try {
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
@ -467,7 +469,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final RepositoryData repositoryData = getRepositoryData();
List<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
if (!snapshotIds.contains(snapshotId)) {
writeIndexGen(repositoryData.addSnapshot(snapshotId, indices));
writeIndexGen(repositoryData.addSnapshot(snapshotId, indices), repositoryStateId);
}
return blobStoreSnapshot;
} catch (IOException ex) {
@ -628,7 +630,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Streams.copy(blob, out);
// EMPTY is safe here because RepositoryData#fromXContent calls namedObject
try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, out.bytes())) {
repositoryData = RepositoryData.fromXContent(parser);
repositoryData = RepositoryData.fromXContent(parser, indexGen);
}
}
return repositoryData;
@ -654,8 +656,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return snapshotsBlobContainer;
}
protected void writeIndexGen(final RepositoryData repositoryData) throws IOException {
protected void writeIndexGen(final RepositoryData repositoryData, final long repositoryStateId) throws IOException {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = latestIndexBlobId();
if (repositoryStateId != SnapshotsInProgress.UNDEFINED_REPOSITORY_STATE_ID && currentGen != repositoryStateId) {
// the index file was updated by a concurrent operation, so we were operating on stale
// repository data
throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" +
repositoryStateId + "], actual current generation [" + currentGen +
"] - possibly due to simultaneous snapshot deletion requests");
}
final long newGen = currentGen + 1;
final BytesReference snapshotsBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) {
@ -665,12 +676,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
snapshotsBytes = bStream.bytes();
}
final long gen = latestIndexBlobId() + 1;
// write the index file
writeAtomic(INDEX_FILE_PREFIX + Long.toString(gen), snapshotsBytes);
writeAtomic(INDEX_FILE_PREFIX + Long.toString(newGen), snapshotsBytes);
// delete the N-2 index file if it exists, keep the previous one around as a backup
if (isReadOnly() == false && gen - 2 >= 0) {
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(gen - 2);
if (isReadOnly() == false && newGen - 2 >= 0) {
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
if (snapshotsBlobContainer.blobExists(oldSnapshotIndexFile)) {
snapshotsBlobContainer.deleteBlob(oldSnapshotIndexFile);
}
@ -683,7 +693,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// write the current generation to the index-latest file
final BytesReference genBytes;
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
bStream.writeLong(gen);
bStream.writeLong(newGen);
genBytes = bStream.bytes();
}
if (snapshotsBlobContainer.blobExists(INDEX_LATEST_BLOB)) {
@ -719,7 +729,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// index-latest blob
// in a read-only repository, we can't know which of the two scenarios it is,
// but we will assume (1) because we can't do anything about (2) anyway
return -1;
return RepositoryData.EMPTY_REPO_GEN;
}
}
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -205,6 +206,13 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp
if (restoreInProgress != null && !restoreInProgress.entries().isEmpty()) {
throw new ConcurrentSnapshotExecutionException(snapshot, "Restore process is already running in this cluster");
}
// Check if the snapshot to restore is currently being deleted
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(snapshot,
"cannot restore a snapshot while a snapshot deletion is in-progress [" +
deletionsInProgress.getEntries().get(0).getSnapshot() + "]");
}
// Updating cluster state
ClusterState.Builder builder = ClusterState.builder(currentState);

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
@ -48,6 +49,7 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -232,7 +234,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
public ClusterState execute(ClusterState currentState) {
validate(request, currentState);
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
"cannot snapshot while a snapshot deletion is in-progress");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null || snapshots.entries().isEmpty()) {
// Store newSnapshot here to be processed in clusterStateProcessed
@ -245,10 +251,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
State.INIT,
snapshotIndices,
System.currentTimeMillis(),
repositoryData.getGenId(),
null);
snapshots = new SnapshotsInProgress(newSnapshot);
} else {
// TODO: What should we do if a snapshot is already running?
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, "a snapshot is already running");
}
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
@ -468,7 +474,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
snapshot.startTime(),
ExceptionsHelper.detailedMessage(exception),
0,
Collections.emptyList());
Collections.emptyList(),
snapshot.getRepositoryStateId());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] failed to close snapshot in repository", snapshot.snapshot()), inner);
@ -601,12 +608,35 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (event.routingTableChanged()) {
processStartedShards(event);
}
finalizeSnapshotDeletionFromPreviousMaster(event);
}
} catch (Exception e) {
logger.warn("Failed to update snapshot state ", e);
}
}
/**
* Finalizes a snapshot deletion in progress if the current node is the master but it
* was not master in the previous cluster state and there is still a lingering snapshot
* deletion in progress in the cluster state. This means that the old master failed
* before it could clean up an in-progress snapshot deletion. We attempt to delete the
* snapshot files and remove the deletion from the cluster state. It is possible that the
* old master was in a state of long GC and then it resumes and tries to delete the snapshot
* that has already been deleted by the current master. This is acceptable however, since
* the old master's snapshot deletion will just respond with an error but in actuality, the
* snapshot was deleted and a call to GET snapshots would reveal that the snapshot no longer exists.
*/
private void finalizeSnapshotDeletionFromPreviousMaster(ClusterChangedEvent event) {
if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) {
SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster";
SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0);
deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId());
}
}
}
/**
* Cleans up shard snapshots that were running on removed nodes
*
@ -667,7 +697,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void onFailure(Exception e) {
logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot());
}
});
}, updatedSnapshot.getRepositoryStateId(), false);
} else if (snapshot.state() == State.SUCCESS && newMaster) {
// Finalize the snapshot
endSnapshot(snapshot);
@ -875,7 +905,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
}
}
SnapshotInfo snapshotInfo = repository.finalizeSnapshot(snapshot.getSnapshotId(), entry.indices(), entry.startTime(), failure, entry.shards().size(), Collections.unmodifiableList(shardFailures));
SnapshotInfo snapshotInfo = repository.finalizeSnapshot(
snapshot.getSnapshotId(),
entry.indices(),
entry.startTime(),
failure,
entry.shards().size(),
Collections.unmodifiableList(shardFailures),
entry.getRepositoryStateId());
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
} catch (Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
@ -904,6 +941,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, final Exception failure,
@Nullable ActionListener<SnapshotInfo> listener) {
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
@ -961,10 +999,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param snapshotName snapshotName
* @param listener listener
*/
public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener) {
public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener,
final boolean immediatePriority) {
// First, look for the snapshot in the repository
final Repository repository = repositoriesService.repository(repositoryName);
Optional<SnapshotId> matchedEntry = repository.getRepositoryData().getSnapshotIds()
final RepositoryData repositoryData = repository.getRepositoryData();
Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
.stream()
.filter(s -> s.getName().equals(snapshotName))
.findFirst();
@ -976,7 +1016,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (matchedEntry.isPresent() == false) {
throw new SnapshotMissingException(repositoryName, snapshotName);
}
deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener);
deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repositoryData.getGenId(), immediatePriority);
}
/**
@ -984,16 +1024,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* <p>
* If the snapshot is still running cancels the snapshot first and then deletes it from the repository.
*
* @param snapshot snapshot
* @param listener listener
* @param snapshot snapshot
* @param listener listener
* @param repositoryStateId the unique id for the state of the repository
*/
public void deleteSnapshot(final Snapshot snapshot, final DeleteSnapshotListener listener) {
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask() {
private void deleteSnapshot(final Snapshot snapshot, final DeleteSnapshotListener listener, final long repositoryStateId,
final boolean immediatePriority) {
Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL;
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
boolean waitForSnapshot = false;
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete - another snapshot is currently being deleted");
}
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null) {
// don't allow snapshot deletions while a restore is taking place,
@ -1003,19 +1050,27 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete snapshot during a restore");
}
}
ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState);
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null) {
// No snapshots running - we can continue
return currentState;
}
SnapshotsInProgress.Entry snapshotEntry = snapshots.snapshot(snapshot);
SnapshotsInProgress.Entry snapshotEntry = snapshots != null ? snapshots.snapshot(snapshot) : null;
if (snapshotEntry == null) {
// This snapshot is not running - continue
if (!snapshots.entries().isEmpty()) {
// This snapshot is not running - delete
if (snapshots != null && !snapshots.entries().isEmpty()) {
// However other snapshots are running - cannot continue
throw new ConcurrentSnapshotExecutionException(snapshot, "another snapshot is currently running cannot delete");
}
return currentState;
// add the snapshot deletion to the cluster state
SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(
snapshot,
System.currentTimeMillis(),
repositoryStateId
);
if (deletionsInProgress != null) {
deletionsInProgress = deletionsInProgress.withAddedEntry(entry);
} else {
deletionsInProgress = SnapshotDeletionsInProgress.newInstance(entry);
}
clusterStateBuilder.putCustom(SnapshotDeletionsInProgress.TYPE, deletionsInProgress);
} else {
// This snapshot is currently running - stopping shards first
waitForSnapshot = true;
@ -1060,8 +1115,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards);
snapshots = new SnapshotsInProgress(newSnapshot);
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, snapshots);
}
return clusterStateBuilder.build();
}
@Override
@ -1079,7 +1135,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (completedSnapshot.equals(snapshot)) {
logger.trace("deleted snapshot completed - deleting files");
removeListener(this);
deleteSnapshotFromRepository(snapshot, listener);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() ->
deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(),
listener, true)
);
}
}
@ -1088,13 +1147,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (failedSnapshot.equals(snapshot)) {
logger.trace("deleted snapshot failed - deleting files", e);
removeListener(this);
deleteSnapshotFromRepository(snapshot, listener);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() ->
deleteSnapshot(failedSnapshot.getRepository(), failedSnapshot.getSnapshotId().getName(), listener, true)
);
}
}
});
} else {
logger.trace("deleted snapshot is not running - deleting files");
deleteSnapshotFromRepository(snapshot, listener);
deleteSnapshotFromRepository(snapshot, listener, repositoryStateId);
}
}
});
@ -1116,6 +1177,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
}
}
SnapshotDeletionsInProgress deletionsInProgress = clusterState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null) {
for (SnapshotDeletionsInProgress.Entry entry : deletionsInProgress.getEntries()) {
if (entry.getSnapshot().getRepository().equals(repository)) {
return true;
}
}
}
return false;
}
@ -1124,15 +1193,62 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
*
* @param snapshot snapshot
* @param listener listener
* @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
*/
private void deleteSnapshotFromRepository(final Snapshot snapshot, final DeleteSnapshotListener listener) {
private void deleteSnapshotFromRepository(final Snapshot snapshot, @Nullable final DeleteSnapshotListener listener,
long repositoryStateId) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
Repository repository = repositoriesService.repository(snapshot.getRepository());
repository.deleteSnapshot(snapshot.getSnapshotId());
listener.onResponse();
} catch (Exception t) {
listener.onFailure(t);
repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId);
removeSnapshotDeletionFromClusterState(snapshot, null, listener);
} catch (Exception ex) {
removeSnapshotDeletionFromClusterState(snapshot, ex, listener);
}
});
}
/**
* Removes the snapshot deletion from {@link SnapshotDeletionsInProgress} in the cluster state.
*/
private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure,
@Nullable final DeleteSnapshotListener listener) {
clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
SnapshotDeletionsInProgress deletions = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletions != null) {
boolean changed = false;
if (deletions.hasDeletionsInProgress()) {
assert deletions.getEntries().size() == 1 : "should have exactly one deletion in progress";
SnapshotDeletionsInProgress.Entry entry = deletions.getEntries().get(0);
deletions = deletions.withRemovedEntry(entry);
changed = true;
}
if (changed) {
return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE, deletions).build();
}
}
return currentState;
}
@Override
public void onFailure(String source, Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] failed to remove snapshot deletion metadata", snapshot), e);
if (listener != null) {
listener.onFailure(e);
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (listener != null) {
if (failure != null) {
listener.onFailure(failure);
} else {
listener.onResponse();
}
}
}
});
}

View File

@ -679,6 +679,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)),
Collections.emptyList(),
Math.abs(randomLong()),
(long) randomIntBetween(0, 1000),
ImmutableOpenMap.of()));
case 1:
return new RestoreInProgress(new RestoreInProgress.Entry(

View File

@ -59,7 +59,7 @@ public class MetaDataDeleteIndexServiceTests extends ESTestCase {
Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
SnapshotsInProgress snaps = new SnapshotsInProgress(new SnapshotsInProgress.Entry(snapshot, true, false,
SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")),
System.currentTimeMillis(), ImmutableOpenMap.of()));
System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of()));
ClusterState state = ClusterState.builder(clusterState(index))
.putCustom(SnapshotsInProgress.TYPE, snaps)
.build();

View File

@ -1490,7 +1490,7 @@ public class IndexShardTests extends IndexShardTestCase {
public RepositoryData getRepositoryData() {
Map<IndexId, Set<SnapshotId>> map = new HashMap<>();
map.put(new IndexId(indexName, "blah"), emptySet());
return new RepositoryData(Collections.emptyList(), map);
return RepositoryData.initRepositoryData(Collections.emptyList(), map);
}
@Override
@ -1498,12 +1498,13 @@ public class IndexShardTests extends IndexShardTestCase {
}
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, List<SnapshotShardFailure> shardFailures) {
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId) {
return null;
}
@Override
public void deleteSnapshot(SnapshotId snapshotId) {
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) {
}
@Override

View File

@ -56,7 +56,10 @@ public class RepositoryDataTests extends ESTestCase {
XContentBuilder builder = JsonXContent.contentBuilder();
repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS);
XContentParser parser = createParser(JsonXContent.jsonXContent, builder.bytes());
assertEquals(repositoryData, RepositoryData.fromXContent(parser));
long gen = (long) randomIntBetween(0, 500);
RepositoryData fromXContent = RepositoryData.fromXContent(parser, gen);
assertEquals(repositoryData, fromXContent);
assertEquals(gen, fromXContent.getGenId());
}
public void testAddSnapshots() {
@ -64,8 +67,6 @@ public class RepositoryDataTests extends ESTestCase {
// test that adding the same snapshot id to the repository data throws an exception
final SnapshotId snapshotId = repositoryData.getSnapshotIds().get(0);
Map<String, IndexId> indexIdMap = repositoryData.getIndices();
expectThrows(IllegalArgumentException.class,
() -> repositoryData.addSnapshot(new SnapshotId(snapshotId.getName(), snapshotId.getUUID()), Collections.emptyList()));
// test that adding a snapshot and its indices works
SnapshotId newSnapshot = new SnapshotId(randomAsciiOfLength(7), UUIDs.randomBase64UUID());
List<IndexId> indices = new ArrayList<>();
@ -91,22 +92,7 @@ public class RepositoryDataTests extends ESTestCase {
assertEquals(snapshotIds.size(), 1); // if it was a new index, only the new snapshot should be in its set
}
}
}
public void testInitIndices() {
final int numSnapshots = randomIntBetween(1, 30);
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap());
// test that initializing indices works
Map<IndexId, Set<SnapshotId>> indices = randomIndices(snapshotIds);
RepositoryData newRepoData = repositoryData.initIndices(indices);
assertEquals(repositoryData.getSnapshotIds(), newRepoData.getSnapshotIds());
for (IndexId indexId : indices.keySet()) {
assertEquals(indices.get(indexId), newRepoData.getSnapshots(indexId));
}
assertEquals(repositoryData.getGenId(), newRepoData.getGenId());
}
public void testRemoveSnapshot() {
@ -135,12 +121,8 @@ public class RepositoryDataTests extends ESTestCase {
}
public static RepositoryData generateRandomRepoData() {
return generateRandomRepoData(new ArrayList<>());
}
public static RepositoryData generateRandomRepoData(final List<SnapshotId> origSnapshotIds) {
List<SnapshotId> snapshotIds = randomSnapshots(origSnapshotIds);
return new RepositoryData(snapshotIds, randomIndices(snapshotIds));
List<SnapshotId> snapshotIds = randomSnapshots(new ArrayList<>());
return RepositoryData.initRepositoryData(snapshotIds, randomIndices(snapshotIds));
}
private static List<SnapshotId> randomSnapshots(final List<SnapshotId> origSnapshotIds) {

View File

@ -24,16 +24,18 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
import org.elasticsearch.client.Client;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@ -105,26 +107,22 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
// write to and read from a index file with no entries
assertThat(repository.getSnapshots().size(), equalTo(0));
final RepositoryData emptyData = RepositoryData.EMPTY;
repository.writeIndexGen(emptyData);
final RepositoryData readData = repository.getRepositoryData();
assertEquals(readData, emptyData);
assertEquals(readData.getIndices().size(), 0);
assertEquals(readData.getSnapshotIds().size(), 0);
repository.writeIndexGen(emptyData, emptyData.getGenId());
RepositoryData repoData = repository.getRepositoryData();
assertEquals(repoData, emptyData);
assertEquals(repoData.getIndices().size(), 0);
assertEquals(repoData.getSnapshotIds().size(), 0);
assertEquals(0L, repoData.getGenId());
// write to and read from an index file with snapshots but no indices
final int numSnapshots = randomIntBetween(1, 20);
final List<SnapshotId> snapshotIds = new ArrayList<>(numSnapshots);
for (int i = 0; i < numSnapshots; i++) {
snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap());
repository.writeIndexGen(repositoryData);
assertEquals(repository.getRepositoryData(), repositoryData);
repoData = addRandomSnapshotsToRepoData(repoData, false);
repository.writeIndexGen(repoData, repoData.getGenId());
assertEquals(repoData, repository.getRepositoryData());
// write to and read from a index file with random repository data
repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData);
assertThat(repository.getRepositoryData(), equalTo(repositoryData));
repoData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true);
repository.writeIndexGen(repoData, repoData.getGenId());
assertEquals(repoData, repository.getRepositoryData());
}
public void testIndexGenerationalFiles() throws Exception {
@ -132,26 +130,38 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData);
repository.writeIndexGen(repositoryData, repositoryData.getGenId());
assertThat(repository.getRepositoryData(), equalTo(repositoryData));
assertThat(repository.latestIndexBlobId(), equalTo(0L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));
// adding more and writing to a new index generational file
repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData);
repositoryData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true);
repository.writeIndexGen(repositoryData, repositoryData.getGenId());
assertEquals(repository.getRepositoryData(), repositoryData);
assertThat(repository.latestIndexBlobId(), equalTo(1L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));
// removing a snapshot and writing to a new index generational file
repositoryData = repositoryData.removeSnapshot(repositoryData.getSnapshotIds().get(0));
repository.writeIndexGen(repositoryData);
repositoryData = repository.getRepositoryData().removeSnapshot(repositoryData.getSnapshotIds().get(0));
repository.writeIndexGen(repositoryData, repositoryData.getGenId());
assertEquals(repository.getRepositoryData(), repositoryData);
assertThat(repository.latestIndexBlobId(), equalTo(2L));
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
}
public void testRepositoryDataConcurrentModificationNotAllowed() throws IOException {
final BlobStoreRepository repository = setupRepo();
// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
repository.writeIndexGen(repositoryData, repositoryData.getGenId());
// write repo data again to index generational file, errors because we already wrote to the
// N+1 generation from which this repository data instance was created
expectThrows(RepositoryException.class, () -> repository.writeIndexGen(repositoryData, repositoryData.getGenId()));
}
private BlobStoreRepository setupRepo() {
final Client client = client();
final Path location = ESIntegTestCase.randomRepoPath(node().settings());
@ -170,4 +180,18 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
return repository;
}
private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boolean inclIndices) {
int numSnapshots = randomIntBetween(1, 20);
for (int i = 0; i < numSnapshots; i++) {
SnapshotId snapshotId = new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID());
int numIndices = inclIndices ? randomIntBetween(0, 20) : 0;
List<IndexId> indexIds = new ArrayList<>(numIndices);
for (int j = 0; j < numIndices; j++) {
indexIds.add(new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()));
}
repoData = repoData.addSnapshot(snapshotId, indexIds);
}
return repoData;
}
}

View File

@ -0,0 +1,211 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
/**
* Tests for snapshot/restore that require at least 2 threads available
* in the thread pool (for example, tests that use the mock repository that
* block on master).
*/
public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.snapshot.core", 2)
.put("thread_pool.snapshot.max", 2)
.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(MockRepository.Plugin.class);
}
public void testConcurrentSnapshotDeletionsNotAllowed() throws Exception {
logger.info("--> creating repository");
final String repo = "test-repo";
assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(
Settings.builder()
.put("location", randomRepoPath())
.put("random", randomAsciiOfLength(10))
.put("wait_after_unblock", 200)).get());
logger.info("--> snapshot twice");
final String index = "test-idx1";
assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index, "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot1 = "test-snap1";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
final String index2 = "test-idx2";
assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index2, "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot2 = "test-snap2";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of first snapshot");
ListenableActionFuture<DeleteSnapshotResponse> future =
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
logger.info("--> try deleting the second snapshot, should fail because the first deletion is in progress");
try {
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).get();
fail("should not be able to delete snapshots concurrently");
} catch (ConcurrentSnapshotExecutionException e) {
assertThat(e.getMessage(), containsString("cannot delete - another snapshot is currently being deleted"));
}
logger.info("--> unblocking blocked node [{}]", blockedNode);
unblockNode(repo, blockedNode);
logger.info("--> wait until first snapshot is finished");
assertAcked(future.actionGet());
logger.info("--> delete second snapshot, which should now work");
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).get();
assertTrue(client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().isEmpty());
}
public void testSnapshottingWithInProgressDeletionNotAllowed() throws Exception {
logger.info("--> creating repository");
final String repo = "test-repo";
assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(
Settings.builder()
.put("location", randomRepoPath())
.put("random", randomAsciiOfLength(10))
.put("wait_after_unblock", 200)).get());
logger.info("--> snapshot");
final String index = "test-idx";
assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index, "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot1 = "test-snap1";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of snapshot");
ListenableActionFuture<DeleteSnapshotResponse> future =
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
logger.info("--> try creating a second snapshot, should fail because the deletion is in progress");
final String snapshot2 = "test-snap2";
try {
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
fail("should not be able to create a snapshot while another is being deleted");
} catch (ConcurrentSnapshotExecutionException e) {
assertThat(e.getMessage(), containsString("cannot snapshot while a snapshot deletion is in-progress"));
}
logger.info("--> unblocking blocked node [{}]", blockedNode);
unblockNode(repo, blockedNode);
logger.info("--> wait until snapshot deletion is finished");
assertAcked(future.actionGet());
logger.info("--> creating second snapshot, which should now work");
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().size());
}
public void testRestoreWithInProgressDeletionsNotAllowed() throws Exception {
logger.info("--> creating repository");
final String repo = "test-repo";
assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(
Settings.builder()
.put("location", randomRepoPath())
.put("random", randomAsciiOfLength(10))
.put("wait_after_unblock", 200)).get());
logger.info("--> snapshot");
final String index = "test-idx";
assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index, "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot1 = "test-snap1";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
final String index2 = "test-idx2";
assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index2, "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot2 = "test-snap2";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
client().admin().indices().prepareClose(index, index2).get();
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of snapshot");
ListenableActionFuture<DeleteSnapshotResponse> future =
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
logger.info("--> try restoring the other snapshot, should fail because the deletion is in progress");
try {
client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
fail("should not be able to restore a snapshot while another is being deleted");
} catch (ConcurrentSnapshotExecutionException e) {
assertThat(e.getMessage(), containsString("cannot restore a snapshot while a snapshot deletion is in-progress"));
}
logger.info("--> unblocking blocked node [{}]", blockedNode);
unblockNode(repo, blockedNode);
logger.info("--> wait until snapshot deletion is finished");
assertAcked(future.actionGet());
logger.info("--> restoring snapshot, which should now work");
client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().size());
}
}

View File

@ -2277,6 +2277,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
State.ABORTED,
Collections.singletonList(indexId),
System.currentTimeMillis(),
repositoryData.getGenId(),
shards.build()));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))).build();
}

View File

@ -30,3 +30,10 @@ setup:
- match: { snapshot.state : SUCCESS }
- match: { snapshot.shards.successful: 1 }
- match: { snapshot.shards.failed : 0 }
- do:
snapshot.delete:
repository: test_repo_create_1
snapshot: test_snapshot
- match: { acknowledged: true }