diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java index 3c37d1870e5..5cce5482ec5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java @@ -75,6 +75,6 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction { // register non plugin custom parts registerPrototype(SnapshotsInProgress.TYPE, SnapshotsInProgress.PROTO); registerPrototype(RestoreInProgress.TYPE, RestoreInProgress.PROTO); + registerPrototype(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.PROTO); } public static T lookupPrototype(String type) { @@ -715,8 +715,18 @@ public class ClusterState implements ToXContent, Diffable { routingTable.writeTo(out); nodes.writeTo(out); blocks.writeTo(out); - out.writeVInt(customs.size()); + boolean omitSnapshotDeletions = false; + if (out.getVersion().before(SnapshotDeletionsInProgress.VERSION_INTRODUCED) + && customs.containsKey(SnapshotDeletionsInProgress.TYPE)) { + // before the stated version, there were no SnapshotDeletionsInProgress, so + // don't transfer over the wire protocol + omitSnapshotDeletions = true; + } + out.writeVInt(omitSnapshotDeletions ? customs.size() - 1 : customs.size()); for (ObjectObjectCursor cursor : customs) { + if (omitSnapshotDeletions && cursor.key.equals(SnapshotDeletionsInProgress.TYPE)) { + continue; + } out.writeString(cursor.key); cursor.value.writeTo(out); } @@ -787,7 +797,21 @@ public class ClusterState implements ToXContent, Diffable { nodes.writeTo(out); metaData.writeTo(out); blocks.writeTo(out); - customs.writeTo(out); + Diff> customsDiff = customs; + if (out.getVersion().before(SnapshotDeletionsInProgress.VERSION_INTRODUCED)) { + customsDiff = removeSnapshotDeletionsCustomDiff(customsDiff); + } + customsDiff.writeTo(out); + } + + private Diff> removeSnapshotDeletionsCustomDiff(Diff> customs) { + if (customs instanceof DiffableUtils.ImmutableOpenMapDiff) { + @SuppressWarnings("unchecked") + DiffableUtils.ImmutableOpenMapDiff customsDiff = ((DiffableUtils.ImmutableOpenMapDiff) customs) + .withKeyRemoved(SnapshotDeletionsInProgress.TYPE); + return customsDiff; + } + return customs; } @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java b/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java index 1a3557890dd..234f22010fa 100644 --- a/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java +++ b/core/src/main/java/org/elasticsearch/cluster/DiffableUtils.java @@ -214,12 +214,17 @@ public final class DiffableUtils { * * @param the object type */ - private static class ImmutableOpenMapDiff extends MapDiff> { + public static class ImmutableOpenMapDiff extends MapDiff> { protected ImmutableOpenMapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { super(in, keySerializer, valueSerializer); } + private ImmutableOpenMapDiff(KeySerializer keySerializer, ValueSerializer valueSerializer, + List deletes, Map> diffs, Map upserts) { + super(keySerializer, valueSerializer, deletes, diffs, upserts); + } + public ImmutableOpenMapDiff(ImmutableOpenMap before, ImmutableOpenMap after, KeySerializer keySerializer, ValueSerializer valueSerializer) { super(keySerializer, valueSerializer); @@ -245,6 +250,21 @@ public final class DiffableUtils { } } + /** + * Returns a new diff map with the given key removed, does not modify the invoking instance. + * If the key does not exist in the diff map, the same instance is returned. + */ + public ImmutableOpenMapDiff withKeyRemoved(K key) { + if (this.diffs.containsKey(key) == false && this.upserts.containsKey(key) == false) { + return this; + } + Map> newDiffs = new HashMap<>(this.diffs); + newDiffs.remove(key); + Map newUpserts = new HashMap<>(this.upserts); + newUpserts.remove(key); + return new ImmutableOpenMapDiff<>(this.keySerializer, this.valueSerializer, this.deletes, newDiffs, newUpserts); + } + @Override public ImmutableOpenMap apply(ImmutableOpenMap map) { ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); @@ -346,6 +366,15 @@ public final class DiffableUtils { upserts = new HashMap<>(); } + protected MapDiff(KeySerializer keySerializer, ValueSerializer valueSerializer, + List deletes, Map> diffs, Map upserts) { + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + this.deletes = deletes; + this.diffs = diffs; + this.upserts = upserts; + } + protected MapDiff(StreamInput in, KeySerializer keySerializer, ValueSerializer valueSerializer) throws IOException { this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; diff --git a/core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java new file mode 100644 index 00000000000..f6257fd7a92 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -0,0 +1,217 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState.Custom; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.snapshots.Snapshot; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * A class that represents the snapshot deletions that are in progress in the cluster. + */ +public class SnapshotDeletionsInProgress extends AbstractDiffable implements Custom { + + public static final String TYPE = "snapshot_deletions"; + public static final SnapshotDeletionsInProgress PROTO = new SnapshotDeletionsInProgress(Collections.emptyList()); + // the version where SnapshotDeletionsInProgress was introduced + public static final Version VERSION_INTRODUCED = Version.V_6_0_0_alpha1_UNRELEASED; + + // the list of snapshot deletion request entries + private final List entries; + + private SnapshotDeletionsInProgress(List entries) { + this.entries = Collections.unmodifiableList(entries); + } + + public SnapshotDeletionsInProgress(StreamInput in) throws IOException { + this.entries = Collections.unmodifiableList(in.readList(Entry::new)); + } + + /** + * Returns a new instance of {@link SnapshotDeletionsInProgress} with the given + * {@link Entry} added. + */ + public static SnapshotDeletionsInProgress newInstance(Entry entry) { + return new SnapshotDeletionsInProgress(Collections.singletonList(entry)); + } + + /** + * Returns a new instance of {@link SnapshotDeletionsInProgress} which adds + * the given {@link Entry} to the invoking instance. + */ + public SnapshotDeletionsInProgress withAddedEntry(Entry entry) { + List entries = new ArrayList<>(getEntries()); + entries.add(entry); + return new SnapshotDeletionsInProgress(entries); + } + + /** + * Returns a new instance of {@link SnapshotDeletionsInProgress} which removes + * the given entry from the invoking instance. + */ + public SnapshotDeletionsInProgress withRemovedEntry(Entry entry) { + List entries = new ArrayList<>(getEntries()); + entries.remove(entry); + return new SnapshotDeletionsInProgress(entries); + } + + /** + * Returns an unmodifiable list of snapshot deletion entries. + */ + public List getEntries() { + return entries; + } + + /** + * Returns {@code true} if there are snapshot deletions in progress in the cluster, + * returns {@code false} otherwise. + */ + public boolean hasDeletionsInProgress() { + return entries.isEmpty() == false; + } + + @Override + public String type() { + return TYPE; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SnapshotDeletionsInProgress that = (SnapshotDeletionsInProgress) o; + return entries.equals(that.entries); + } + + @Override + public int hashCode() { + return 31 + entries.hashCode(); + } + + @Override + public Custom readFrom(StreamInput in) throws IOException { + return new SnapshotDeletionsInProgress(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(entries); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startArray(TYPE); + for (Entry entry : entries) { + builder.startObject(); + { + builder.field("repository", entry.snapshot.getRepository()); + builder.field("snapshot", entry.snapshot.getSnapshotId().getName()); + builder.timeValueField("start_time_millis", "start_time", entry.startTime); + builder.field("repository_state_id", entry.repositoryStateId); + } + builder.endObject(); + } + builder.endArray(); + return builder; + } + + /** + * A class representing a snapshot deletion request entry in the cluster state. + */ + public static final class Entry implements Writeable { + private final Snapshot snapshot; + private final long startTime; + private final long repositoryStateId; + + public Entry(Snapshot snapshot, long startTime, long repositoryStateId) { + this.snapshot = snapshot; + this.startTime = startTime; + this.repositoryStateId = repositoryStateId; + } + + public Entry(StreamInput in) throws IOException { + this.snapshot = new Snapshot(in); + this.startTime = in.readVLong(); + this.repositoryStateId = in.readLong(); + } + + /** + * The snapshot to delete. + */ + public Snapshot getSnapshot() { + return snapshot; + } + + /** + * The start time in milliseconds for deleting the snapshots. + */ + public long getStartTime() { + return startTime; + } + + /** + * The repository state id at the time the snapshot deletion began. + */ + public long getRepositoryStateId() { + return repositoryStateId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Entry that = (Entry) o; + return snapshot.equals(that.snapshot) + && startTime == that.startTime + && repositoryStateId == that.repositoryStateId; + } + + @Override + public int hashCode() { + return Objects.hash(snapshot, startTime, repositoryStateId); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + snapshot.writeTo(out); + out.writeVLong(startTime); + out.writeLong(repositoryStateId); + } + } +} diff --git a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 6df5f85987d..26ddbec7a2a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.ObjectContainer; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState.Custom; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; @@ -48,6 +49,12 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus public static final SnapshotsInProgress PROTO = new SnapshotsInProgress(); + // denotes an undefined repository state id, which will happen when receiving a cluster state with + // a snapshot in progress from a pre 5.2.x node + public static final long UNDEFINED_REPOSITORY_STATE_ID = -2L; + // the version where repository state ids were introduced + private static final Version REPOSITORY_ID_INTRODUCED_VERSION = Version.V_6_0_0_alpha1_UNRELEASED; + @Override public boolean equals(Object o) { if (this == o) return true; @@ -74,9 +81,10 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus private final List indices; private final ImmutableOpenMap> waitingIndices; private final long startTime; + private final long repositoryStateId; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, - long startTime, ImmutableOpenMap shards) { + long startTime, long repositoryStateId, ImmutableOpenMap shards) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -90,10 +98,12 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus this.shards = shards; this.waitingIndices = findWaitingIndices(shards); } + this.repositoryStateId = repositoryStateId; } public Entry(Entry entry, State state, ImmutableOpenMap shards) { - this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, shards); + this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, + entry.repositoryStateId, shards); } public Entry(Entry entry, ImmutableOpenMap shards) { @@ -132,6 +142,10 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus return startTime; } + public long getRepositoryStateId() { + return repositoryStateId; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -147,6 +161,7 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus if (!snapshot.equals(entry.snapshot)) return false; if (state != entry.state) return false; if (!waitingIndices.equals(entry.waitingIndices)) return false; + if (repositoryStateId != entry.repositoryStateId) return false; return true; } @@ -161,6 +176,7 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus result = 31 * result + indices.hashCode(); result = 31 * result + waitingIndices.hashCode(); result = 31 * result + Long.hashCode(startTime); + result = 31 * result + Long.hashCode(repositoryStateId); return result; } @@ -387,12 +403,17 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus State shardState = State.fromValue(in.readByte()); builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState)); } + long repositoryStateId = UNDEFINED_REPOSITORY_STATE_ID; + if (in.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) { + repositoryStateId = in.readLong(); + } entries[i] = new Entry(snapshot, includeGlobalState, partial, state, Collections.unmodifiableList(indexBuilder), startTime, + repositoryStateId, builder.build()); } return new SnapshotsInProgress(entries); @@ -417,6 +438,9 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus out.writeOptionalString(shardEntry.value.nodeId()); out.writeByte(shardEntry.value.state().value()); } + if (out.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) { + out.writeLong(entry.repositoryStateId); + } } } @@ -430,6 +454,7 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus private static final String INDICES = "indices"; private static final String START_TIME_MILLIS = "start_time_millis"; private static final String START_TIME = "start_time"; + private static final String REPOSITORY_STATE_ID = "repository_state_id"; private static final String SHARDS = "shards"; private static final String INDEX = "index"; private static final String SHARD = "shard"; @@ -461,6 +486,7 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus } builder.endArray(); builder.timeValueField(START_TIME_MILLIS, START_TIME, entry.startTime()); + builder.field(REPOSITORY_STATE_ID, entry.getRepositoryStateId()); builder.startArray(SHARDS); { for (ObjectObjectCursor shardEntry : entry.shards) { diff --git a/core/src/main/java/org/elasticsearch/repositories/Repository.java b/core/src/main/java/org/elasticsearch/repositories/Repository.java index b1f534bf684..462b7ea1dab 100644 --- a/core/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/core/src/main/java/org/elasticsearch/repositories/Repository.java @@ -115,16 +115,19 @@ public interface Repository extends LifecycleComponent { * @param failure global failure reason or null * @param totalShards total number of shards * @param shardFailures list of shard failures + * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began * @return snapshot description */ - SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures); + SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId); /** * Deletes snapshot * * @param snapshotId snapshot id + * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began */ - void deleteSnapshot(SnapshotId snapshotId); + void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId); /** * Returns snapshot throttle time in nanoseconds diff --git a/core/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/core/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 4927e2b41b7..eb0bbb2f868 100644 --- a/core/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/core/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -44,8 +44,19 @@ import java.util.stream.Collectors; */ public final class RepositoryData implements ToXContent { - public static final RepositoryData EMPTY = new RepositoryData(Collections.emptyList(), Collections.emptyMap()); + /** + * The generation value indicating the repository has no index generational files. + */ + public static final long EMPTY_REPO_GEN = -1L; + /** + * An instance initialized for an empty repository. + */ + public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN, Collections.emptyList(), Collections.emptyMap()); + /** + * The generational id of the index file from which the repository data was read. + */ + private final long genId; /** * The ids of the snapshots in the repository. */ @@ -59,7 +70,8 @@ public final class RepositoryData implements ToXContent { */ private final Map> indexSnapshots; - public RepositoryData(List snapshotIds, Map> indexSnapshots) { + private RepositoryData(long genId, List snapshotIds, Map> indexSnapshots) { + this.genId = genId; this.snapshotIds = Collections.unmodifiableList(snapshotIds); this.indices = Collections.unmodifiableMap(indexSnapshots.keySet() .stream() @@ -67,8 +79,22 @@ public final class RepositoryData implements ToXContent { this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots); } + /** + * Creates an instance of {@link RepositoryData} on a fresh repository (one that has no index-N files). + */ + public static RepositoryData initRepositoryData(List snapshotIds, Map> indexSnapshots) { + return new RepositoryData(EMPTY_REPO_GEN, snapshotIds, indexSnapshots); + } + protected RepositoryData copy() { - return new RepositoryData(snapshotIds, indexSnapshots); + return new RepositoryData(genId, snapshotIds, indexSnapshots); + } + + /** + * Gets the generational index file id from which this instance was read. + */ + public long getGenId() { + return genId; } /** @@ -91,7 +117,10 @@ public final class RepositoryData implements ToXContent { */ public RepositoryData addSnapshot(final SnapshotId snapshotId, final List snapshottedIndices) { if (snapshotIds.contains(snapshotId)) { - throw new IllegalArgumentException("[" + snapshotId + "] already exists in the repository data"); + // if the snapshot id already exists in the repository data, it means an old master + // that is blocked from the cluster is trying to finalize a snapshot concurrently with + // the new master, so we make the operation idempotent + return this; } List snapshots = new ArrayList<>(snapshotIds); snapshots.add(snapshotId); @@ -110,14 +139,7 @@ public final class RepositoryData implements ToXContent { allIndexSnapshots.put(indexId, ids); } } - return new RepositoryData(snapshots, allIndexSnapshots); - } - - /** - * Initializes the indices in the repository metadata; returns a new instance. - */ - public RepositoryData initIndices(final Map> indexSnapshots) { - return new RepositoryData(snapshotIds, indexSnapshots); + return new RepositoryData(genId, snapshots, allIndexSnapshots); } /** @@ -146,7 +168,7 @@ public final class RepositoryData implements ToXContent { indexSnapshots.put(indexId, set); } - return new RepositoryData(newSnapshotIds, indexSnapshots); + return new RepositoryData(genId, newSnapshotIds, indexSnapshots); } /** @@ -256,7 +278,7 @@ public final class RepositoryData implements ToXContent { return builder; } - public static RepositoryData fromXContent(final XContentParser parser) throws IOException { + public static RepositoryData fromXContent(final XContentParser parser, final long genId) throws IOException { List snapshots = new ArrayList<>(); Map> indexSnapshots = new HashMap<>(); if (parser.nextToken() == XContentParser.Token.START_OBJECT) { @@ -305,7 +327,7 @@ public final class RepositoryData implements ToXContent { } else { throw new ElasticsearchParseException("start object expected"); } - return new RepositoryData(snapshots, indexSnapshots); + return new RepositoryData(genId, snapshots, indexSnapshots); } } diff --git a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index c7abded5e0e..72855bc7f30 100644 --- a/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/core/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -39,6 +39,7 @@ import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -324,7 +325,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } @Override - public void deleteSnapshot(SnapshotId snapshotId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { if (isReadOnly()) { throw new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"); } @@ -352,7 +353,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp try { // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); - writeIndexGen(updatedRepositoryData); + writeIndexGen(updatedRepositoryData, repositoryStateId); // delete the snapshot file safeSnapshotBlobDelete(snapshot, snapshotId.getUUID()); @@ -454,7 +455,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final long startTime, final String failure, final int totalShards, - final List shardFailures) { + final List shardFailures, + final long repositoryStateId) { try { SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId, indices.stream().map(IndexId::getName).collect(Collectors.toList()), @@ -467,7 +469,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final RepositoryData repositoryData = getRepositoryData(); List snapshotIds = repositoryData.getSnapshotIds(); if (!snapshotIds.contains(snapshotId)) { - writeIndexGen(repositoryData.addSnapshot(snapshotId, indices)); + writeIndexGen(repositoryData.addSnapshot(snapshotId, indices), repositoryStateId); } return blobStoreSnapshot; } catch (IOException ex) { @@ -628,7 +630,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Streams.copy(blob, out); // EMPTY is safe here because RepositoryData#fromXContent calls namedObject try (XContentParser parser = XContentHelper.createParser(NamedXContentRegistry.EMPTY, out.bytes())) { - repositoryData = RepositoryData.fromXContent(parser); + repositoryData = RepositoryData.fromXContent(parser, indexGen); } } return repositoryData; @@ -654,8 +656,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp return snapshotsBlobContainer; } - protected void writeIndexGen(final RepositoryData repositoryData) throws IOException { + protected void writeIndexGen(final RepositoryData repositoryData, final long repositoryStateId) throws IOException { assert isReadOnly() == false; // can not write to a read only repository + final long currentGen = latestIndexBlobId(); + if (repositoryStateId != SnapshotsInProgress.UNDEFINED_REPOSITORY_STATE_ID && currentGen != repositoryStateId) { + // the index file was updated by a concurrent operation, so we were operating on stale + // repository data + throw new RepositoryException(metadata.name(), "concurrent modification of the index-N file, expected current generation [" + + repositoryStateId + "], actual current generation [" + currentGen + + "] - possibly due to simultaneous snapshot deletion requests"); + } + final long newGen = currentGen + 1; final BytesReference snapshotsBytes; try (BytesStreamOutput bStream = new BytesStreamOutput()) { try (StreamOutput stream = new OutputStreamStreamOutput(bStream)) { @@ -665,12 +676,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } snapshotsBytes = bStream.bytes(); } - final long gen = latestIndexBlobId() + 1; // write the index file - writeAtomic(INDEX_FILE_PREFIX + Long.toString(gen), snapshotsBytes); + writeAtomic(INDEX_FILE_PREFIX + Long.toString(newGen), snapshotsBytes); // delete the N-2 index file if it exists, keep the previous one around as a backup - if (isReadOnly() == false && gen - 2 >= 0) { - final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(gen - 2); + if (isReadOnly() == false && newGen - 2 >= 0) { + final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2); if (snapshotsBlobContainer.blobExists(oldSnapshotIndexFile)) { snapshotsBlobContainer.deleteBlob(oldSnapshotIndexFile); } @@ -683,7 +693,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // write the current generation to the index-latest file final BytesReference genBytes; try (BytesStreamOutput bStream = new BytesStreamOutput()) { - bStream.writeLong(gen); + bStream.writeLong(newGen); genBytes = bStream.bytes(); } if (snapshotsBlobContainer.blobExists(INDEX_LATEST_BLOB)) { @@ -719,7 +729,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // index-latest blob // in a read-only repository, we can't know which of the two scenarios it is, // but we will assume (1) because we can't do anything about (2) anyway - return -1; + return RepositoryData.EMPTY_REPO_GEN; } } } diff --git a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java index e2b389d1e05..ac01bc6fc5d 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.RestoreInProgress.ShardRestoreStatus; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -205,6 +206,13 @@ public class RestoreService extends AbstractComponent implements ClusterStateApp if (restoreInProgress != null && !restoreInProgress.entries().isEmpty()) { throw new ConcurrentSnapshotExecutionException(snapshot, "Restore process is already running in this cluster"); } + // Check if the snapshot to restore is currently being deleted + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new ConcurrentSnapshotExecutionException(snapshot, + "cannot restore a snapshot while a snapshot deletion is in-progress [" + + deletionsInProgress.getEntries().get(0).getSnapshot() + "]"); + } // Updating cluster state ClusterState.Builder builder = ClusterState.builder(currentState); diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 056b2e7b10d..1b5bfde167c 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.State; @@ -48,6 +49,7 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -232,7 +234,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override public ClusterState execute(ClusterState currentState) { validate(request, currentState); - + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, + "cannot snapshot while a snapshot deletion is in-progress"); + } SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots == null || snapshots.entries().isEmpty()) { // Store newSnapshot here to be processed in clusterStateProcessed @@ -245,10 +251,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus State.INIT, snapshotIndices, System.currentTimeMillis(), + repositoryData.getGenId(), null); snapshots = new SnapshotsInProgress(newSnapshot); } else { - // TODO: What should we do if a snapshot is already running? throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, "a snapshot is already running"); } return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); @@ -468,7 +474,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus snapshot.startTime(), ExceptionsHelper.detailedMessage(exception), 0, - Collections.emptyList()); + Collections.emptyList(), + snapshot.getRepositoryStateId()); } catch (Exception inner) { inner.addSuppressed(exception); logger.warn((Supplier) () -> new ParameterizedMessage("[{}] failed to close snapshot in repository", snapshot.snapshot()), inner); @@ -601,12 +608,35 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (event.routingTableChanged()) { processStartedShards(event); } + finalizeSnapshotDeletionFromPreviousMaster(event); } } catch (Exception e) { logger.warn("Failed to update snapshot state ", e); } } + /** + * Finalizes a snapshot deletion in progress if the current node is the master but it + * was not master in the previous cluster state and there is still a lingering snapshot + * deletion in progress in the cluster state. This means that the old master failed + * before it could clean up an in-progress snapshot deletion. We attempt to delete the + * snapshot files and remove the deletion from the cluster state. It is possible that the + * old master was in a state of long GC and then it resumes and tries to delete the snapshot + * that has already been deleted by the current master. This is acceptable however, since + * the old master's snapshot deletion will just respond with an error but in actuality, the + * snapshot was deleted and a call to GET snapshots would reveal that the snapshot no longer exists. + */ + private void finalizeSnapshotDeletionFromPreviousMaster(ClusterChangedEvent event) { + if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) { + SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; + SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); + deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId()); + } + } + } + /** * Cleans up shard snapshots that were running on removed nodes * @@ -667,7 +697,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public void onFailure(Exception e) { logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); } - }); + }, updatedSnapshot.getRepositoryStateId(), false); } else if (snapshot.state() == State.SUCCESS && newMaster) { // Finalize the snapshot endSnapshot(snapshot); @@ -875,7 +905,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason())); } } - SnapshotInfo snapshotInfo = repository.finalizeSnapshot(snapshot.getSnapshotId(), entry.indices(), entry.startTime(), failure, entry.shards().size(), Collections.unmodifiableList(shardFailures)); + SnapshotInfo snapshotInfo = repository.finalizeSnapshot( + snapshot.getSnapshotId(), + entry.indices(), + entry.startTime(), + failure, + entry.shards().size(), + Collections.unmodifiableList(shardFailures), + entry.getRepositoryStateId()); removeSnapshotFromClusterState(snapshot, snapshotInfo, null); } catch (Exception e) { logger.warn((Supplier) () -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e); @@ -904,6 +941,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, final Exception failure, @Nullable ActionListener listener) { clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { + @Override public ClusterState execute(ClusterState currentState) { SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); @@ -961,10 +999,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param snapshotName snapshotName * @param listener listener */ - public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener) { + public void deleteSnapshot(final String repositoryName, final String snapshotName, final DeleteSnapshotListener listener, + final boolean immediatePriority) { // First, look for the snapshot in the repository final Repository repository = repositoriesService.repository(repositoryName); - Optional matchedEntry = repository.getRepositoryData().getSnapshotIds() + final RepositoryData repositoryData = repository.getRepositoryData(); + Optional matchedEntry = repositoryData.getSnapshotIds() .stream() .filter(s -> s.getName().equals(snapshotName)) .findFirst(); @@ -976,7 +1016,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (matchedEntry.isPresent() == false) { throw new SnapshotMissingException(repositoryName, snapshotName); } - deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener); + deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repositoryData.getGenId(), immediatePriority); } /** @@ -984,16 +1024,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus *

* If the snapshot is still running cancels the snapshot first and then deletes it from the repository. * - * @param snapshot snapshot - * @param listener listener + * @param snapshot snapshot + * @param listener listener + * @param repositoryStateId the unique id for the state of the repository */ - public void deleteSnapshot(final Snapshot snapshot, final DeleteSnapshotListener listener) { - clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask() { + private void deleteSnapshot(final Snapshot snapshot, final DeleteSnapshotListener listener, final long repositoryStateId, + final boolean immediatePriority) { + Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL; + clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { boolean waitForSnapshot = false; @Override public ClusterState execute(ClusterState currentState) throws Exception { + SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { + throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete - another snapshot is currently being deleted"); + } RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE); if (restoreInProgress != null) { // don't allow snapshot deletions while a restore is taking place, @@ -1003,19 +1050,27 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus throw new ConcurrentSnapshotExecutionException(snapshot, "cannot delete snapshot during a restore"); } } + ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState); SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots == null) { - // No snapshots running - we can continue - return currentState; - } - SnapshotsInProgress.Entry snapshotEntry = snapshots.snapshot(snapshot); + SnapshotsInProgress.Entry snapshotEntry = snapshots != null ? snapshots.snapshot(snapshot) : null; if (snapshotEntry == null) { - // This snapshot is not running - continue - if (!snapshots.entries().isEmpty()) { + // This snapshot is not running - delete + if (snapshots != null && !snapshots.entries().isEmpty()) { // However other snapshots are running - cannot continue throw new ConcurrentSnapshotExecutionException(snapshot, "another snapshot is currently running cannot delete"); } - return currentState; + // add the snapshot deletion to the cluster state + SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry( + snapshot, + System.currentTimeMillis(), + repositoryStateId + ); + if (deletionsInProgress != null) { + deletionsInProgress = deletionsInProgress.withAddedEntry(entry); + } else { + deletionsInProgress = SnapshotDeletionsInProgress.newInstance(entry); + } + clusterStateBuilder.putCustom(SnapshotDeletionsInProgress.TYPE, deletionsInProgress); } else { // This snapshot is currently running - stopping shards first waitForSnapshot = true; @@ -1060,8 +1115,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards); snapshots = new SnapshotsInProgress(newSnapshot); - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); + clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, snapshots); } + return clusterStateBuilder.build(); } @Override @@ -1079,7 +1135,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (completedSnapshot.equals(snapshot)) { logger.trace("deleted snapshot completed - deleting files"); removeListener(this); - deleteSnapshotFromRepository(snapshot, listener); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> + deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(), + listener, true) + ); } } @@ -1088,13 +1147,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (failedSnapshot.equals(snapshot)) { logger.trace("deleted snapshot failed - deleting files", e); removeListener(this); - deleteSnapshotFromRepository(snapshot, listener); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> + deleteSnapshot(failedSnapshot.getRepository(), failedSnapshot.getSnapshotId().getName(), listener, true) + ); } } }); } else { logger.trace("deleted snapshot is not running - deleting files"); - deleteSnapshotFromRepository(snapshot, listener); + deleteSnapshotFromRepository(snapshot, listener, repositoryStateId); } } }); @@ -1116,6 +1177,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } } } + SnapshotDeletionsInProgress deletionsInProgress = clusterState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletionsInProgress != null) { + for (SnapshotDeletionsInProgress.Entry entry : deletionsInProgress.getEntries()) { + if (entry.getSnapshot().getRepository().equals(repository)) { + return true; + } + } + } return false; } @@ -1124,15 +1193,62 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * * @param snapshot snapshot * @param listener listener + * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began */ - private void deleteSnapshotFromRepository(final Snapshot snapshot, final DeleteSnapshotListener listener) { + private void deleteSnapshotFromRepository(final Snapshot snapshot, @Nullable final DeleteSnapshotListener listener, + long repositoryStateId) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { try { Repository repository = repositoriesService.repository(snapshot.getRepository()); - repository.deleteSnapshot(snapshot.getSnapshotId()); - listener.onResponse(); - } catch (Exception t) { - listener.onFailure(t); + repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId); + removeSnapshotDeletionFromClusterState(snapshot, null, listener); + } catch (Exception ex) { + removeSnapshotDeletionFromClusterState(snapshot, ex, listener); + } + }); + } + + /** + * Removes the snapshot deletion from {@link SnapshotDeletionsInProgress} in the cluster state. + */ + private void removeSnapshotDeletionFromClusterState(final Snapshot snapshot, @Nullable final Exception failure, + @Nullable final DeleteSnapshotListener listener) { + clusterService.submitStateUpdateTask("remove snapshot deletion metadata", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + SnapshotDeletionsInProgress deletions = currentState.custom(SnapshotDeletionsInProgress.TYPE); + if (deletions != null) { + boolean changed = false; + if (deletions.hasDeletionsInProgress()) { + assert deletions.getEntries().size() == 1 : "should have exactly one deletion in progress"; + SnapshotDeletionsInProgress.Entry entry = deletions.getEntries().get(0); + deletions = deletions.withRemovedEntry(entry); + changed = true; + } + if (changed) { + return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE, deletions).build(); + } + } + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("[{}] failed to remove snapshot deletion metadata", snapshot), e); + if (listener != null) { + listener.onFailure(e); + } + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (listener != null) { + if (failure != null) { + listener.onFailure(failure); + } else { + listener.onResponse(); + } + } } }); } diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index 9d44dbbca38..9fdbf13fc8c 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -679,6 +679,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase { SnapshotsInProgress.State.fromValue((byte) randomIntBetween(0, 6)), Collections.emptyList(), Math.abs(randomLong()), + (long) randomIntBetween(0, 1000), ImmutableOpenMap.of())); case 1: return new RestoreInProgress(new RestoreInProgress.Entry( diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java index a48f3ae3e10..0ad36713810 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java @@ -59,7 +59,7 @@ public class MetaDataDeleteIndexServiceTests extends ESTestCase { Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid")); SnapshotsInProgress snaps = new SnapshotsInProgress(new SnapshotsInProgress.Entry(snapshot, true, false, SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")), - System.currentTimeMillis(), ImmutableOpenMap.of())); + System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of())); ClusterState state = ClusterState.builder(clusterState(index)) .putCustom(SnapshotsInProgress.TYPE, snaps) .build(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 023dd49f2f0..d01aeb2bf00 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1490,7 +1490,7 @@ public class IndexShardTests extends IndexShardTestCase { public RepositoryData getRepositoryData() { Map> map = new HashMap<>(); map.put(new IndexId(indexName, "blah"), emptySet()); - return new RepositoryData(Collections.emptyList(), map); + return RepositoryData.initRepositoryData(Collections.emptyList(), map); } @Override @@ -1498,12 +1498,13 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures) { + public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, + List shardFailures, long repositoryStateId) { return null; } @Override - public void deleteSnapshot(SnapshotId snapshotId) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId) { } @Override diff --git a/core/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/core/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java index c1ac1abfdb5..97d415fe4f9 100644 --- a/core/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java +++ b/core/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java @@ -56,7 +56,10 @@ public class RepositoryDataTests extends ESTestCase { XContentBuilder builder = JsonXContent.contentBuilder(); repositoryData.toXContent(builder, ToXContent.EMPTY_PARAMS); XContentParser parser = createParser(JsonXContent.jsonXContent, builder.bytes()); - assertEquals(repositoryData, RepositoryData.fromXContent(parser)); + long gen = (long) randomIntBetween(0, 500); + RepositoryData fromXContent = RepositoryData.fromXContent(parser, gen); + assertEquals(repositoryData, fromXContent); + assertEquals(gen, fromXContent.getGenId()); } public void testAddSnapshots() { @@ -64,8 +67,6 @@ public class RepositoryDataTests extends ESTestCase { // test that adding the same snapshot id to the repository data throws an exception final SnapshotId snapshotId = repositoryData.getSnapshotIds().get(0); Map indexIdMap = repositoryData.getIndices(); - expectThrows(IllegalArgumentException.class, - () -> repositoryData.addSnapshot(new SnapshotId(snapshotId.getName(), snapshotId.getUUID()), Collections.emptyList())); // test that adding a snapshot and its indices works SnapshotId newSnapshot = new SnapshotId(randomAsciiOfLength(7), UUIDs.randomBase64UUID()); List indices = new ArrayList<>(); @@ -91,22 +92,7 @@ public class RepositoryDataTests extends ESTestCase { assertEquals(snapshotIds.size(), 1); // if it was a new index, only the new snapshot should be in its set } } - } - - public void testInitIndices() { - final int numSnapshots = randomIntBetween(1, 30); - final List snapshotIds = new ArrayList<>(numSnapshots); - for (int i = 0; i < numSnapshots; i++) { - snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); - } - RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap()); - // test that initializing indices works - Map> indices = randomIndices(snapshotIds); - RepositoryData newRepoData = repositoryData.initIndices(indices); - assertEquals(repositoryData.getSnapshotIds(), newRepoData.getSnapshotIds()); - for (IndexId indexId : indices.keySet()) { - assertEquals(indices.get(indexId), newRepoData.getSnapshots(indexId)); - } + assertEquals(repositoryData.getGenId(), newRepoData.getGenId()); } public void testRemoveSnapshot() { @@ -135,12 +121,8 @@ public class RepositoryDataTests extends ESTestCase { } public static RepositoryData generateRandomRepoData() { - return generateRandomRepoData(new ArrayList<>()); - } - - public static RepositoryData generateRandomRepoData(final List origSnapshotIds) { - List snapshotIds = randomSnapshots(origSnapshotIds); - return new RepositoryData(snapshotIds, randomIndices(snapshotIds)); + List snapshotIds = randomSnapshots(new ArrayList<>()); + return RepositoryData.initRepositoryData(snapshotIds, randomIndices(snapshotIds)); } private static List randomSnapshots(final List origSnapshotIds) { diff --git a/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 07e6aa0f16c..f5f036a2359 100644 --- a/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/core/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -24,16 +24,18 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes import org.elasticsearch.client.Client; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESSingleNodeTestCase; +import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -105,26 +107,22 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { // write to and read from a index file with no entries assertThat(repository.getSnapshots().size(), equalTo(0)); final RepositoryData emptyData = RepositoryData.EMPTY; - repository.writeIndexGen(emptyData); - final RepositoryData readData = repository.getRepositoryData(); - assertEquals(readData, emptyData); - assertEquals(readData.getIndices().size(), 0); - assertEquals(readData.getSnapshotIds().size(), 0); + repository.writeIndexGen(emptyData, emptyData.getGenId()); + RepositoryData repoData = repository.getRepositoryData(); + assertEquals(repoData, emptyData); + assertEquals(repoData.getIndices().size(), 0); + assertEquals(repoData.getSnapshotIds().size(), 0); + assertEquals(0L, repoData.getGenId()); // write to and read from an index file with snapshots but no indices - final int numSnapshots = randomIntBetween(1, 20); - final List snapshotIds = new ArrayList<>(numSnapshots); - for (int i = 0; i < numSnapshots; i++) { - snapshotIds.add(new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); - } - RepositoryData repositoryData = new RepositoryData(snapshotIds, Collections.emptyMap()); - repository.writeIndexGen(repositoryData); - assertEquals(repository.getRepositoryData(), repositoryData); + repoData = addRandomSnapshotsToRepoData(repoData, false); + repository.writeIndexGen(repoData, repoData.getGenId()); + assertEquals(repoData, repository.getRepositoryData()); // write to and read from a index file with random repository data - repositoryData = generateRandomRepoData(); - repository.writeIndexGen(repositoryData); - assertThat(repository.getRepositoryData(), equalTo(repositoryData)); + repoData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true); + repository.writeIndexGen(repoData, repoData.getGenId()); + assertEquals(repoData, repository.getRepositoryData()); } public void testIndexGenerationalFiles() throws Exception { @@ -132,26 +130,38 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { // write to index generational file RepositoryData repositoryData = generateRandomRepoData(); - repository.writeIndexGen(repositoryData); + repository.writeIndexGen(repositoryData, repositoryData.getGenId()); assertThat(repository.getRepositoryData(), equalTo(repositoryData)); assertThat(repository.latestIndexBlobId(), equalTo(0L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L)); // adding more and writing to a new index generational file - repositoryData = generateRandomRepoData(); - repository.writeIndexGen(repositoryData); + repositoryData = addRandomSnapshotsToRepoData(repository.getRepositoryData(), true); + repository.writeIndexGen(repositoryData, repositoryData.getGenId()); assertEquals(repository.getRepositoryData(), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(1L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L)); // removing a snapshot and writing to a new index generational file - repositoryData = repositoryData.removeSnapshot(repositoryData.getSnapshotIds().get(0)); - repository.writeIndexGen(repositoryData); + repositoryData = repository.getRepositoryData().removeSnapshot(repositoryData.getSnapshotIds().get(0)); + repository.writeIndexGen(repositoryData, repositoryData.getGenId()); assertEquals(repository.getRepositoryData(), repositoryData); assertThat(repository.latestIndexBlobId(), equalTo(2L)); assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L)); } + public void testRepositoryDataConcurrentModificationNotAllowed() throws IOException { + final BlobStoreRepository repository = setupRepo(); + + // write to index generational file + RepositoryData repositoryData = generateRandomRepoData(); + repository.writeIndexGen(repositoryData, repositoryData.getGenId()); + + // write repo data again to index generational file, errors because we already wrote to the + // N+1 generation from which this repository data instance was created + expectThrows(RepositoryException.class, () -> repository.writeIndexGen(repositoryData, repositoryData.getGenId())); + } + private BlobStoreRepository setupRepo() { final Client client = client(); final Path location = ESIntegTestCase.randomRepoPath(node().settings()); @@ -170,4 +180,18 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { return repository; } + private RepositoryData addRandomSnapshotsToRepoData(RepositoryData repoData, boolean inclIndices) { + int numSnapshots = randomIntBetween(1, 20); + for (int i = 0; i < numSnapshots; i++) { + SnapshotId snapshotId = new SnapshotId(randomAsciiOfLength(8), UUIDs.randomBase64UUID()); + int numIndices = inclIndices ? randomIntBetween(0, 20) : 0; + List indexIds = new ArrayList<>(numIndices); + for (int j = 0; j < numIndices; j++) { + indexIds.add(new IndexId(randomAsciiOfLength(8), UUIDs.randomBase64UUID())); + } + repoData = repoData.addSnapshot(snapshotId, indexIds); + } + return repoData; + } + } diff --git a/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java new file mode 100644 index 00000000000..d1759d83e34 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java @@ -0,0 +1,211 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.snapshots.mockstore.MockRepository; + +import java.util.Collection; +import java.util.Collections; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.containsString; + +/** + * Tests for snapshot/restore that require at least 2 threads available + * in the thread pool (for example, tests that use the mock repository that + * block on master). + */ +public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put("thread_pool.snapshot.core", 2) + .put("thread_pool.snapshot.max", 2) + .build(); + } + + @Override + protected Collection> nodePlugins() { + return Collections.singleton(MockRepository.Plugin.class); + } + + public void testConcurrentSnapshotDeletionsNotAllowed() throws Exception { + logger.info("--> creating repository"); + final String repo = "test-repo"; + assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings( + Settings.builder() + .put("location", randomRepoPath()) + .put("random", randomAsciiOfLength(10)) + .put("wait_after_unblock", 200)).get()); + + logger.info("--> snapshot twice"); + final String index = "test-idx1"; + assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); + for (int i = 0; i < 10; i++) { + index(index, "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + final String snapshot1 = "test-snap1"; + client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); + final String index2 = "test-idx2"; + assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); + for (int i = 0; i < 10; i++) { + index(index2, "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + final String snapshot2 = "test-snap2"; + client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get(); + + String blockedNode = internalCluster().getMasterName(); + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); + logger.info("--> start deletion of first snapshot"); + ListenableActionFuture future = + client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); + logger.info("--> waiting for block to kick in on node [{}]", blockedNode); + waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); + + logger.info("--> try deleting the second snapshot, should fail because the first deletion is in progress"); + try { + client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).get(); + fail("should not be able to delete snapshots concurrently"); + } catch (ConcurrentSnapshotExecutionException e) { + assertThat(e.getMessage(), containsString("cannot delete - another snapshot is currently being deleted")); + } + + logger.info("--> unblocking blocked node [{}]", blockedNode); + unblockNode(repo, blockedNode); + + logger.info("--> wait until first snapshot is finished"); + assertAcked(future.actionGet()); + + logger.info("--> delete second snapshot, which should now work"); + client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).get(); + assertTrue(client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().isEmpty()); + } + + public void testSnapshottingWithInProgressDeletionNotAllowed() throws Exception { + logger.info("--> creating repository"); + final String repo = "test-repo"; + assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings( + Settings.builder() + .put("location", randomRepoPath()) + .put("random", randomAsciiOfLength(10)) + .put("wait_after_unblock", 200)).get()); + + logger.info("--> snapshot"); + final String index = "test-idx"; + assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); + for (int i = 0; i < 10; i++) { + index(index, "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + final String snapshot1 = "test-snap1"; + client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); + + String blockedNode = internalCluster().getMasterName(); + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); + logger.info("--> start deletion of snapshot"); + ListenableActionFuture future = + client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute(); + logger.info("--> waiting for block to kick in on node [{}]", blockedNode); + waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); + + logger.info("--> try creating a second snapshot, should fail because the deletion is in progress"); + final String snapshot2 = "test-snap2"; + try { + client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get(); + fail("should not be able to create a snapshot while another is being deleted"); + } catch (ConcurrentSnapshotExecutionException e) { + assertThat(e.getMessage(), containsString("cannot snapshot while a snapshot deletion is in-progress")); + } + + logger.info("--> unblocking blocked node [{}]", blockedNode); + unblockNode(repo, blockedNode); + + logger.info("--> wait until snapshot deletion is finished"); + assertAcked(future.actionGet()); + + logger.info("--> creating second snapshot, which should now work"); + client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get(); + assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().size()); + } + + public void testRestoreWithInProgressDeletionsNotAllowed() throws Exception { + logger.info("--> creating repository"); + final String repo = "test-repo"; + assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings( + Settings.builder() + .put("location", randomRepoPath()) + .put("random", randomAsciiOfLength(10)) + .put("wait_after_unblock", 200)).get()); + + logger.info("--> snapshot"); + final String index = "test-idx"; + assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); + for (int i = 0; i < 10; i++) { + index(index, "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + final String snapshot1 = "test-snap1"; + client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); + final String index2 = "test-idx2"; + assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0))); + for (int i = 0; i < 10; i++) { + index(index2, "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + final String snapshot2 = "test-snap2"; + client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get(); + client().admin().indices().prepareClose(index, index2).get(); + + String blockedNode = internalCluster().getMasterName(); + ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); + logger.info("--> start deletion of snapshot"); + ListenableActionFuture future = + client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); + logger.info("--> waiting for block to kick in on node [{}]", blockedNode); + waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); + + logger.info("--> try restoring the other snapshot, should fail because the deletion is in progress"); + try { + client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); + fail("should not be able to restore a snapshot while another is being deleted"); + } catch (ConcurrentSnapshotExecutionException e) { + assertThat(e.getMessage(), containsString("cannot restore a snapshot while a snapshot deletion is in-progress")); + } + + logger.info("--> unblocking blocked node [{}]", blockedNode); + unblockNode(repo, blockedNode); + + logger.info("--> wait until snapshot deletion is finished"); + assertAcked(future.actionGet()); + + logger.info("--> restoring snapshot, which should now work"); + client().admin().cluster().prepareRestoreSnapshot(repo, snapshot1).setWaitForCompletion(true).get(); + assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().size()); + } +} diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index dda634023b6..ef6275e2ccd 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2277,6 +2277,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas State.ABORTED, Collections.singletonList(indexId), System.currentTimeMillis(), + repositoryData.getGenId(), shards.build())); return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries))).build(); } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.create/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.create/10_basic.yaml index 43db8e5206f..b12b9d09b6f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.create/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/snapshot.create/10_basic.yaml @@ -30,3 +30,10 @@ setup: - match: { snapshot.state : SUCCESS } - match: { snapshot.shards.successful: 1 } - match: { snapshot.shards.failed : 0 } + + - do: + snapshot.delete: + repository: test_repo_create_1 + snapshot: test_snapshot + + - match: { acknowledged: true }