Enable Fully Concurrent Snapshot Operations (#56911) (#59578)

Enables fully concurrent snapshot operations:
* Snapshot create and delete operations can be started in any order
* Delete operations wait for snapshot finalization to finish, are batched as much as possible to improve efficiency, and, once enqueued in the cluster state, prevent new snapshots from starting on data nodes until they have been executed
   * We could be even more concurrent here in a follow-up by interleaving deletes and snapshots on a per-shard level. I decided not to do this for now since it did not yet seem worth the added complexity. Thanks to the batching and deduplication of deletes, the pain of having a delete stuck behind a long-running snapshot seems manageable: dropped client connections and the resulting retries cause no issues because delete jobs are deduplicated, and batching allows enqueuing more and more deletes even while a snapshot blocks for a long time. All enqueued deletes are then executed in essentially constant time, since bulk snapshot deletion makes deleting multiple snapshots about as fast as deleting a single one.
* Snapshot creation is completely concurrent across shards, but per-shard snapshots are linearized for each repository, as are snapshot finalizations (see the sketch after this list)
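
As a rough sketch of what this enables (hypothetical code against the internal test client that also appears in the test changes below, assumed to run inside an `AbstractSnapshotIntegTestCase`; repository and snapshot names are made up):

```java
// Start a snapshot and, while it is still running, enqueue a delete of an
// older snapshot in the same repository. Previously the delete would have
// failed with ConcurrentSnapshotExecutionException; now both operations are
// accepted and the delete simply waits for the snapshot's finalization.
ActionFuture<CreateSnapshotResponse> snapshotFuture = client().admin().cluster()
        .prepareCreateSnapshot("test-repo", "long-running-snap")
        .setWaitForCompletion(true)
        .execute();
ActionFuture<AcknowledgedResponse> deleteFuture = client().admin().cluster()
        .prepareDeleteSnapshot("test-repo", "old-snap")
        .execute();
assertEquals(SnapshotState.SUCCESS, snapshotFuture.actionGet().getSnapshotInfo().state());
assertAcked(deleteFuture.actionGet());
```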

See the updated JavaDoc and the added test cases for more details and an illustration of the functionality.

Some notes:

The queuing of snapshot finalizations and deletes and the related locking/synchronization is a little awkward in this version but can be much simplified with some refactoring. The problem is that snapshot finalizations resolve their listeners on the `SNAPSHOT` pool while deletes resolve theirs on the master update thread. With some refactoring, both of these could be moved to the master update thread, effectively removing the need for any synchronization around the `SnapshotsService` state. I didn't do this refactoring here because it's a fairly large change that is not necessary for the functionality, but I plan to do it in a follow-up.
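
To illustrate the shape of the problem, here is a minimal sketch of such a guarded per-repository queue (all names are made up; this is not the actual `SnapshotsService` code). Completions may arrive on either the `SNAPSHOT` pool or the master update thread, so the queue has to be locked until both completion paths are refactored onto a single thread:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: linearizes finalizations/deletes per repository while
// their completions can arrive from two different threads.
final class PerRepositoryOperationQueue {
    private final Deque<Runnable> pending = new ArrayDeque<>();
    private boolean running;

    void enqueueOrRun(Runnable operation) {
        synchronized (pending) {
            if (running) {
                pending.addLast(operation); // queue behind the running operation
                return;
            }
            running = true;
        }
        operation.run();
    }

    // Called from whichever thread completed the previous operation.
    void onOperationDone() {
        final Runnable next;
        synchronized (pending) {
            next = pending.pollFirst();
            running = next != null;
        }
        if (next != null) {
            next.run();
        }
    }
}
```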

This change allows for completely removing any trickery around synchronizing deletes and snapshots from SLM and fully does away with SLM errors caused by collisions between deletes and snapshots.

Snapshotting a single index in parallel to a long-running full backup executes without having to wait for the long-running backup, as required by the ILM/SLM use case of moving indices to the "snapshot tier". Finalizations are linearized, but they are ordered according to which snapshot saw all of its shards complete first (see the sketch below).
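
For illustration, a hypothetical sketch of that use case (index, repository and snapshot names are made up):

```java
// Kick off a full backup without waiting for it ...
client().admin().cluster().prepareCreateSnapshot("test-repo", "full-backup")
        .setWaitForCompletion(false)
        .get();
// ... then snapshot a single index concurrently. Its shard snapshots only
// queue behind in-flight snapshots of the same shards, and its finalization
// runs as soon as all of its own shards have completed, so it does not wait
// for the full backup to finish.
CreateSnapshotResponse response = client().admin().cluster()
        .prepareCreateSnapshot("test-repo", "single-index-snap")
        .setIndices("index-to-move")
        .setWaitForCompletion(true)
        .get();
assertEquals(SnapshotState.SUCCESS, response.getSnapshotInfo().state());
```
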
Commit 2dd086445c (parent 06d94cbb2a) by Armin Braun, 2020-07-15 03:42:31 +02:00, committed by GitHub
23 changed files with 2967 additions and 579 deletions


@ -285,7 +285,7 @@ class S3Repository extends BlobStoreRepository {
@Override
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
ActionListener<RepositoryData> listener) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}


@ -1,155 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import java.util.Collection;
import java.util.Collections;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
/**
* Tests for snapshot/restore that require at least 2 threads available
* in the thread pool (for example, tests that use the mock repository that
* block on master).
*/
public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put("thread_pool.snapshot.core", 2)
.put("thread_pool.snapshot.max", 2)
.build();
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(MockRepository.Plugin.class);
}
public void testConcurrentSnapshotDeletionsNotAllowed() throws Exception {
logger.info("--> creating repository");
final String repo = "test-repo";
assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(
Settings.builder()
.put("location", randomRepoPath())
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)).get());
logger.info("--> snapshot twice");
final String index = "test-idx1";
assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot1 = "test-snap1";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
final String index2 = "test-idx2";
assertAcked(prepareCreate(index2, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index2, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot2 = "test-snap2";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of first snapshot");
ActionFuture<AcknowledgedResponse> future =
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
logger.info("--> try deleting the second snapshot, should fail because the first deletion is in progress");
try {
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).get();
fail("should not be able to delete snapshots concurrently");
} catch (ConcurrentSnapshotExecutionException e) {
assertThat(e.getMessage(), containsString("cannot delete - another snapshot is currently being deleted"));
}
logger.info("--> unblocking blocked node [{}]", blockedNode);
unblockNode(repo, blockedNode);
logger.info("--> wait until first snapshot is finished");
assertAcked(future.actionGet());
logger.info("--> delete second snapshot, which should now work");
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).get();
assertTrue(client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().isEmpty());
}
public void testSnapshottingWithInProgressDeletionNotAllowed() throws Exception {
logger.info("--> creating repository");
final String repo = "test-repo";
assertAcked(client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(
Settings.builder()
.put("location", randomRepoPath())
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)).get());
logger.info("--> snapshot");
final String index = "test-idx";
assertAcked(prepareCreate(index, 1, Settings.builder().put("number_of_shards", 1).put("number_of_replicas", 0)));
for (int i = 0; i < 10; i++) {
index(index, "_doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
final String snapshot1 = "test-snap1";
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of snapshot");
ActionFuture<AcknowledgedResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
logger.info("--> try creating a second snapshot, should fail because the deletion is in progress");
final String snapshot2 = "test-snap2";
try {
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
fail("should not be able to create a snapshot while another is being deleted");
} catch (ConcurrentSnapshotExecutionException e) {
assertThat(e.getMessage(), containsString("cannot snapshot while a snapshot deletion is in-progress"));
}
logger.info("--> unblocking blocked node [{}]", blockedNode);
unblockNode(repo, blockedNode);
logger.info("--> wait until snapshot deletion is finished");
assertAcked(future.actionGet());
logger.info("--> creating second snapshot, which should now work");
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setWaitForCompletion(true).get();
assertEquals(1, client().admin().cluster().prepareGetSnapshots(repo).setSnapshots("_all").get().getSnapshots().size());
}
}


@ -195,6 +195,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
break;
case INIT:
case WAITING:
case QUEUED:
stage = SnapshotIndexShardStage.STARTED;
break;
case SUCCESS:


@ -21,12 +21,13 @@ package org.elasticsearch.cluster;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
@ -34,10 +35,12 @@ import org.elasticsearch.snapshots.SnapshotsService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* A class that represents the snapshot deletions that are in progress in the cluster.
@ -53,6 +56,8 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
private SnapshotDeletionsInProgress(List<Entry> entries) {
this.entries = entries;
assert entries.size() == entries.stream().map(Entry::uuid).distinct().count() : "Found duplicate UUIDs in entries " + entries;
assert assertNoConcurrentDeletionsForSameRepository(entries);
}
public static SnapshotDeletionsInProgress of(List<SnapshotDeletionsInProgress.Entry> entries) {
@ -63,7 +68,18 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
}
public SnapshotDeletionsInProgress(StreamInput in) throws IOException {
this.entries = Collections.unmodifiableList(in.readList(Entry::new));
this(in.readList(Entry::new));
}
private static boolean assertNoConcurrentDeletionsForSameRepository(List<Entry> entries) {
final Set<String> activeRepositories = new HashSet<>();
for (Entry entry : entries) {
if (entry.state() == State.STARTED) {
final boolean added = activeRepositories.add(entry.repository());
assert added : "Found multiple running deletes for a single repository in " + entries;
}
}
return true;
}
/**
@ -85,13 +101,20 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
}
/**
* Returns a new instance of {@link SnapshotDeletionsInProgress} which removes
* the given entry from the invoking instance.
* Returns a new instance of {@link SnapshotDeletionsInProgress} that has the entry with the given {@code deleteUUID} removed from its
* entries.
*/
public SnapshotDeletionsInProgress withRemovedEntry(Entry entry) {
List<Entry> entries = new ArrayList<>(getEntries());
entries.remove(entry);
return SnapshotDeletionsInProgress.of(entries);
public SnapshotDeletionsInProgress withRemovedEntry(String deleteUUID) {
List<Entry> updatedEntries = new ArrayList<>(entries.size() - 1);
boolean removed = false;
for (Entry entry : entries) {
if (entry.uuid().equals(deleteUUID)) {
removed = true;
} else {
updatedEntries.add(entry);
}
}
return removed ? SnapshotDeletionsInProgress.of(updatedEntries) : this;
}
/**
@ -185,17 +208,23 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
public static final class Entry implements Writeable, RepositoryOperation {
private final List<SnapshotId> snapshots;
private final String repoName;
private final State state;
private final long startTime;
private final long repositoryStateId;
private final String uuid;
public Entry(List<SnapshotId> snapshots, String repoName, long startTime, long repositoryStateId) {
public Entry(List<SnapshotId> snapshots, String repoName, long startTime, long repositoryStateId, State state) {
this(snapshots, repoName, startTime, repositoryStateId, state, UUIDs.randomBase64UUID());
}
private Entry(List<SnapshotId> snapshots, String repoName, long startTime, long repositoryStateId, State state, String uuid) {
this.snapshots = snapshots;
assert snapshots.size() == new HashSet<>(snapshots).size() : "Duplicate snapshot ids in " + snapshots;
this.repoName = repoName;
this.startTime = startTime;
this.repositoryStateId = repositoryStateId;
assert repositoryStateId > RepositoryData.EMPTY_REPO_GEN :
"Can't delete based on an empty or unknown repository generation but saw [" + repositoryStateId + "]";
this.state = state;
this.uuid = uuid;
}
public Entry(StreamInput in) throws IOException {
@ -209,6 +238,45 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
}
this.startTime = in.readVLong();
this.repositoryStateId = in.readLong();
if (in.getVersion().onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) {
this.state = State.readFrom(in);
this.uuid = in.readString();
} else {
this.state = State.STARTED;
this.uuid = IndexMetadata.INDEX_UUID_NA_VALUE;
}
}
public Entry started() {
assert state == State.WAITING;
return new Entry(snapshots, repository(), startTime, repositoryStateId, State.STARTED, uuid);
}
public Entry withAddedSnapshots(Collection<SnapshotId> newSnapshots) {
assert state == State.WAITING;
final Collection<SnapshotId> updatedSnapshots = new HashSet<>(snapshots);
if (updatedSnapshots.addAll(newSnapshots) == false) {
return this;
}
return new Entry(Collections.unmodifiableList(new ArrayList<>(updatedSnapshots)), repository(), startTime, repositoryStateId,
State.WAITING, uuid);
}
public Entry withSnapshots(Collection<SnapshotId> snapshots) {
return new Entry(Collections.unmodifiableList(new ArrayList<>(snapshots)), repository(), startTime, repositoryStateId, state,
uuid);
}
public Entry withRepoGen(long repoGen) {
return new Entry(snapshots, repository(), startTime, repoGen, state, uuid);
}
public State state() {
return state;
}
public String uuid() {
return uuid;
}
public List<SnapshotId> getSnapshots() {
@ -234,12 +302,14 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
return repoName.equals(that.repoName)
&& snapshots.equals(that.snapshots)
&& startTime == that.startTime
&& repositoryStateId == that.repositoryStateId;
&& repositoryStateId == that.repositoryStateId
&& state == that.state
&& uuid.equals(that.uuid);
}
@Override
public int hashCode() {
return Objects.hash(snapshots, repoName, startTime, repositoryStateId);
return Objects.hash(snapshots, repoName, startTime, repositoryStateId, state, uuid);
}
@Override
@ -254,6 +324,10 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
}
out.writeVLong(startTime);
out.writeLong(repositoryStateId);
if (out.getVersion().onOrAfter(SnapshotsService.FULL_CONCURRENCY_VERSION)) {
state.writeTo(out);
out.writeString(uuid);
}
}
@Override
@ -265,5 +339,47 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
public long repositoryStateId() {
return repositoryStateId;
}
@Override
public String toString() {
return "SnapshotDeletionsInProgress.Entry[[" + uuid + "][" + state + "]" + snapshots + "]";
}
}
public enum State implements Writeable {
/**
* Delete is waiting to execute because there are snapshots and/or a delete operation that has to complete before this delete may
* run.
*/
WAITING((byte) 0),
/**
* Delete is physically executing on the repository.
*/
STARTED((byte) 1);
private final byte value;
State(byte value) {
this.value = value;
}
public static State readFrom(StreamInput in) throws IOException {
final byte value = in.readByte();
switch (value) {
case 0:
return WAITING;
case 1:
return STARTED;
default:
throw new IllegalArgumentException("No snapshot delete state for value [" + value + "]");
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte(value);
}
}
}
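
A minimal sketch of how the new `Entry` API above composes (snapshot names, start time and repository generation are made up): a delete enters the cluster state as `WAITING`, later delete requests are batched into it, and it transitions to `STARTED` once it may execute:

```java
// New delete enqueued behind running operations:
SnapshotDeletionsInProgress.Entry waiting = new SnapshotDeletionsInProgress.Entry(
        Collections.singletonList(new SnapshotId("snap-1", UUIDs.randomBase64UUID())),
        "test-repo", 0L /* startTime */, 42L /* repositoryStateId */,
        SnapshotDeletionsInProgress.State.WAITING);

// A second delete request for the same repository is merged into the existing
// WAITING entry instead of adding a new one:
SnapshotDeletionsInProgress.Entry batched = waiting.withAddedSnapshots(
        Collections.singletonList(new SnapshotId("snap-2", UUIDs.randomBase64UUID())));

// Once no shard snapshots or finalizations are running for the repository,
// the entry is started and all batched snapshots are deleted in bulk:
SnapshotDeletionsInProgress.Entry started = batched.started();
assert started.state() == SnapshotDeletionsInProgress.State.STARTED;
```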


@ -41,6 +41,7 @@ import org.elasticsearch.snapshots.SnapshotsService;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -168,6 +169,18 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
this(entry, entry.state, shards, entry.failure);
}
public Entry withRepoGen(long newRepoGen) {
assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen
+ "] must be higher than current generation [" + repositoryStateId + "]";
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, newRepoGen, shards, failure,
userMetadata, version);
}
public Entry withShards(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
failure, userMetadata, version);
}
@Override
public String repository() {
return snapshot.getRepository();
@ -325,7 +338,16 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
}
public static class ShardSnapshotStatus {
/**
* Shard snapshot status for shards that are waiting for another operation to finish before they can be assigned to a node.
*/
public static final ShardSnapshotStatus UNASSIGNED_QUEUED =
new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.QUEUED, null);
private final ShardState state;
@Nullable
private final String nodeId;
@Nullable
@ -338,17 +360,23 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
this(nodeId, ShardState.INIT, generation);
}
public ShardSnapshotStatus(String nodeId, ShardState state, String generation) {
public ShardSnapshotStatus(@Nullable String nodeId, ShardState state, @Nullable String generation) {
this(nodeId, state, null, generation);
}
public ShardSnapshotStatus(String nodeId, ShardState state, String reason, String generation) {
public ShardSnapshotStatus(@Nullable String nodeId, ShardState state, String reason, @Nullable String generation) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
this.generation = generation;
assert assertConsistent();
}
private boolean assertConsistent() {
// If the state is failed we have to have a reason for this failure
assert state.failed() == false || reason != null;
assert (state != ShardState.INIT && state != ShardState.WAITING) || nodeId != null : "Null node id for state [" + state + "]";
return true;
}
public ShardSnapshotStatus(StreamInput in) throws IOException {
@ -366,10 +394,12 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return state;
}
@Nullable
public String nodeId() {
return nodeId;
}
@Nullable
public String generation() {
return this.generation;
}
@ -378,6 +408,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return reason;
}
/**
* Checks if this shard snapshot is actively executing.
* A shard is defined as actively executing if it is either in a state that may write to the repository
* ({@link ShardState#INIT} or {@link ShardState#ABORTED}) or is about to write to it in state {@link ShardState#WAITING}.
*/
public boolean isActive() {
return state == ShardState.INIT || state == ShardState.ABORTED || state == ShardState.WAITING;
}
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
@ -455,6 +494,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
private final List<Entry> entries;
private static boolean assertConsistentEntries(List<Entry> entries) {
final Map<String, Set<ShardId>> assignedShardsByRepo = new HashMap<>();
for (Entry entry : entries) {
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : entry.shards()) {
if (shard.value.isActive()) {
assert assignedShardsByRepo.computeIfAbsent(entry.repository(), k -> new HashSet<>()).add(shard.key) :
"Found duplicate shard assignments in " + entries;
}
}
}
return true;
}
public static SnapshotsInProgress of(List<Entry> entries) {
if (entries.isEmpty()) {
return EMPTY;
@ -464,6 +516,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
private SnapshotsInProgress(List<Entry> entries) {
this.entries = entries;
assert assertConsistentEntries(entries);
}
public List<Entry> entries() {
@ -638,7 +691,14 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
FAILED((byte) 3, true, true),
ABORTED((byte) 4, false, true),
MISSING((byte) 5, true, true),
WAITING((byte) 6, false, false);
/**
* Shard snapshot is waiting for the primary shard to become available before it can be snapshotted.
*/
WAITING((byte) 6, false, false),
/**
* Shard snapshot is waiting for another snapshot of the same shard to the same repository to finish.
*/
QUEUED((byte) 7, false, false);
private final byte value;
@ -674,6 +734,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return MISSING;
case 6:
return WAITING;
case 7:
return QUEUED;
default:
throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]");
}
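
A small sketch of the invariant that `assertConsistentEntries` above enforces (node id and shard generation are made up): at most one snapshot holds an active status for a given shard and repository, while any concurrent snapshot of that shard sits in `QUEUED`:

```java
// The snapshot currently owning the shard is active:
SnapshotsInProgress.ShardSnapshotStatus active = new SnapshotsInProgress.ShardSnapshotStatus(
        "node-1", SnapshotsInProgress.ShardState.INIT, "shard-gen-uuid");
assert active.isActive();

// A concurrent snapshot of the same shard waits in QUEUED, which is not
// active and therefore does not trip the duplicate-assignment assertion:
SnapshotsInProgress.ShardSnapshotStatus queued = SnapshotsInProgress.ShardSnapshotStatus.UNASSIGNED_QUEUED;
assert queued.state() == SnapshotsInProgress.ShardState.QUEUED;
assert queued.isActive() == false;
```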


@ -116,6 +116,7 @@ import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ProxyConnectionStrategy;
import org.elasticsearch.transport.RemoteClusterService;
@ -553,6 +554,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
HandshakingTransportAddressConnector.PROBE_HANDSHAKE_TIMEOUT_SETTING,
DiscoveryUpgradeService.BWC_PING_TIMEOUT_SETTING,
DiscoveryUpgradeService.ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING,
SnapshotsService.MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING,
FsHealthService.ENABLED_SETTING,
FsHealthService.REFRESH_INTERVAL_SETTING,
FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING,
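
The newly registered setting can be updated dynamically like any other cluster setting; a hypothetical example (the setting key is my assumption based on the field name, and the value is made up):

```java
// Hypothetical: raise the cap on concurrently running snapshot operations.
client().admin().cluster().prepareUpdateSettings()
        .setPersistentSettings(Settings.builder()
                .put("snapshot.max_concurrent_operations", 10))
        .get();
```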


@ -92,7 +92,7 @@ public class FilterRepository implements Repository {
@Override
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
ActionListener<RepositoryData> listener) {
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
}


@ -154,7 +154,7 @@ public interface Repository extends LifecycleComponent {
* @param listener completion listener
*/
void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener);
ActionListener<RepositoryData> listener);
/**
* Returns snapshot throttle time in nanoseconds
*/
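
Since the listener now resolves with the updated `RepositoryData` instead of `Void`, callers can consume the resulting repository state directly. A hypothetical caller (`repository`, `snapshotIds`, `repositoryStateId`, `repositoryMetaVersion` and `logger` assumed to be in scope):

```java
// Sketch of a caller of the changed Repository#deleteSnapshots signature:
repository.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion,
        ActionListener.wrap(
                updatedRepositoryData -> logger.info("deleted snapshots, repository is now at generation [{}]",
                        updatedRepositoryData.getGenId()),
                e -> logger.warn("snapshot deletion failed", e)));
```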


@ -409,14 +409,18 @@ public final class RepositoryData {
/**
* Resolve the given index names to index ids, creating new index ids for
* new indices in the repository.
*
* @param indicesToResolve names of indices to resolve
* @param inFlightIds name-to-index mapping for the indices of currently in-flight snapshots that are not yet in the repository data, to fall back to
*/
public List<IndexId> resolveNewIndices(final List<String> indicesToResolve) {
public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String, IndexId> inFlightIds) {
List<IndexId> snapshotIndices = new ArrayList<>();
for (String index : indicesToResolve) {
final IndexId indexId;
if (indices.containsKey(index)) {
indexId = indices.get(index);
} else {
IndexId indexId = indices.get(index);
if (indexId == null) {
indexId = inFlightIds.get(index);
}
if (indexId == null) {
indexId = new IndexId(index, UUIDs.randomBase64UUID());
}
snapshotIndices.add(indexId);


@ -439,7 +439,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private long bestGeneration(Collection<? extends RepositoryOperation> operations) {
final String repoName = metadata.name();
assert operations.size() <= 1 : "Assumed one or no operations but received " + operations;
return operations.stream().filter(e -> e.repository().equals(repoName)).mapToLong(RepositoryOperation::repositoryStateId)
.max().orElse(RepositoryData.EMPTY_REPO_GEN);
}
@ -542,6 +541,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return metadata;
}
@Override
public RepositoryStats stats() {
final BlobStore store = blobStore.get();
if (store == null) {
@ -567,7 +567,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
ActionListener<RepositoryData> listener) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
} else {
@ -641,7 +641,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
private void doDeleteShardSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Map<String, BlobContainer> foundIndices,
Map<String, BlobMetadata> rootBlobs, RepositoryData repositoryData, Version repoMetaVersion,
ActionListener<Void> listener) {
ActionListener<RepositoryData> listener) {
if (SnapshotsService.useShardGenerations(repoMetaVersion)) {
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
@ -662,13 +662,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build());
writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(),
ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure));
ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure));
}, listener::onFailure);
// Once we have updated the repository, run the clean-ups
writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(updatedRepoData)), 2);
asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(),
afterCleanupsListener);
@ -676,11 +676,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} else {
// Write the new repository data first (with the removed snapshot), using no shard generations
final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(v -> {
writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(newRepoData -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(newRepoData)), 2);
asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener);
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep);
writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
@ -1594,10 +1594,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
"Tried to update from unexpected pending repo generation [" + meta.pendingGeneration() +
"] after write to generation [" + newGen + "]");
}
return stateFilter.apply(ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata())
.putCustom(RepositoriesMetadata.TYPE,
currentState.metadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE).withUpdatedGeneration(
metadata.name(), newGen, newGen))).build());
return updateRepositoryGenerationsIfNecessary(stateFilter.apply(ClusterState.builder(currentState)
.metadata(Metadata.builder(currentState.getMetadata()).putCustom(RepositoriesMetadata.TYPE,
currentState.metadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE)
.withUpdatedGeneration(metadata.name(), newGen, newGen))).build()), expectedGen, newGen);
}
@Override
@ -1662,6 +1662,45 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return true;
}
/**
* Updates the repository generation that running deletes and snapshot finalizations for this repository will be based on, if any such
* operations are found in the cluster state while setting the safe repository generation.
*
* @param state cluster state to update
* @param oldGen previous safe repository generation
* @param newGen new safe repository generation
* @return updated cluster state
*/
private ClusterState updateRepositoryGenerationsIfNecessary(ClusterState state, long oldGen, long newGen) {
final String repoName = metadata.name();
final SnapshotsInProgress updatedSnapshotsInProgress;
boolean changedSnapshots = false;
final List<SnapshotsInProgress.Entry> snapshotEntries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (entry.repository().equals(repoName) && entry.repositoryStateId() == oldGen) {
snapshotEntries.add(entry.withRepoGen(newGen));
changedSnapshots = true;
} else {
snapshotEntries.add(entry);
}
}
updatedSnapshotsInProgress = changedSnapshots ? SnapshotsInProgress.of(snapshotEntries) : null;
final SnapshotDeletionsInProgress updatedDeletionsInProgress;
boolean changedDeletions = false;
final List<SnapshotDeletionsInProgress.Entry> deletionEntries = new ArrayList<>();
for (SnapshotDeletionsInProgress.Entry entry :
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries()) {
if (entry.repository().equals(repoName) && entry.repositoryStateId() == oldGen) {
deletionEntries.add(entry.withRepoGen(newGen));
changedDeletions = true;
} else {
deletionEntries.add(entry);
}
}
updatedDeletionsInProgress = changedDeletions ? SnapshotDeletionsInProgress.of(deletionEntries) : null;
return SnapshotsService.updateWithSnapshots(state, updatedSnapshotsInProgress, updatedDeletionsInProgress);
}
private RepositoryMetadata getRepoMetadata(ClusterState state) {
final RepositoryMetadata repositoryMetadata =
state.getMetadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE).repository(metadata.name());
@ -1744,7 +1783,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
try {
final String generation = ShardGenerations.fixShardGeneration(snapshotStatus.generation());
final String generation = snapshotStatus.generation();
logger.debug("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
final BlobContainer shardContainer = shardContainer(indexId, shardId);
final Set<String> blobs;
@ -2223,8 +2262,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private Tuple<BlobStoreIndexShardSnapshots, String> buildBlobStoreIndexShardSnapshots(Set<String> blobs,
BlobContainer shardContainer,
@Nullable String generation) throws IOException {
assert ShardGenerations.fixShardGeneration(generation) == generation
: "Generation must not be numeric but received [" + generation + "]";
if (generation != null) {
if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);


@ -53,6 +53,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestDeduplicator;
@ -251,8 +252,10 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
assert SnapshotsService.useShardGenerations(entry.version()) || snapshotStatus.generation() == null :
"Found non-null shard generation [" + snapshotStatus.generation() + "] for snapshot with old-format compatibility";
assert SnapshotsService.useShardGenerations(entry.version()) ||
ShardGenerations.fixShardGeneration(snapshotStatus.generation()) == null :
"Found non-null, non-numeric shard generation [" + snapshotStatus.generation() +
"] for snapshot with old-format compatibility";
snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version(),
new ActionListener<String>() {
@Override


@ -98,5 +98,42 @@
* <li>After the deletion of the snapshot's data from the repository finishes, the {@code SnapshotsService} will submit a cluster state
* update to remove the deletion's entry in {@code SnapshotDeletionsInProgress} which concludes the process of deleting a snapshot.</li>
* </ol>
*
* <h2>Concurrent Snapshot Operations</h2>
*
* Snapshot create and delete operations may be started concurrently. Operations targeting different repositories run independently of
* each other. Multiple operations targeting the same repository are executed according to the following rules:
*
* <h3>Concurrent Snapshot Creation</h3>
*
* If multiple snapshot creation jobs are started at the same time, the data-node operations of multiple snapshots may run in parallel
* across different shards. If multiple snapshots want to snapshot a certain shard, then the shard snapshots for that shard will be
* executed one by one. This is enforced by the master node setting the shard's snapshot state to
* {@link org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus#UNASSIGNED_QUEUED} for all but one snapshot. The order of
* operations on a single shard is given by the order in which the snapshots were started.
* As soon as all shards for a given snapshot have finished, it will be finalized as explained above. Finalization will happen one snapshot
* at a time, working in the order in which snapshots had their shards completed.
*
* <h3>Concurrent Snapshot Deletes</h3>
*
* A snapshot delete will be executed as soon as there are no more shard snapshots or snapshot finalizations running for a given
* repository. Before a delete is executed on the repository it will be set to state
* {@link org.elasticsearch.cluster.SnapshotDeletionsInProgress.State#STARTED}. If it cannot be executed when it is received it will be
* set to state {@link org.elasticsearch.cluster.SnapshotDeletionsInProgress.State#WAITING} initially.
* If a delete is received for a given repository while there is already an ongoing delete for the same repository, there are two possible
* scenarios:
* 1. If the delete is in state {@code STARTED} (i.e. already running on the repository) then the new delete will be added in state
* {@code WAITING} and will be executed after the current delete. The only exception here would be the case where the new delete covers
* the exact same snapshots as the already running delete. In this case no new delete operation is added and the second delete request
* will simply wait for the existing delete to return.
* 2. If the existing delete is in state {@code WAITING} then the existing
* {@link org.elasticsearch.cluster.SnapshotDeletionsInProgress.Entry} in the cluster state will be updated to cover both the snapshots
* in the existing delete as well as additional snapshots that may be found in the second delete request.
*
* In either of the above scenarios, in-progress snapshots will be aborted in the same cluster state update that adds the delete to the
* cluster state, if the delete applies to them.
*
* If a snapshot request is received while there already is a delete in the cluster state for the same repository, that snapshot will not
* start doing any shard snapshots until the delete has been executed.
*/
package org.elasticsearch.snapshots;
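
As a concrete (hypothetical) illustration of scenario 2 above: assuming the repository is currently busy finalizing a snapshot, two delete requests end up in a single `WAITING` entry and resolve together when that entry executes (test-client sketch; names are made up):

```java
// Both deletes arrive while the repository is busy: the second request is
// merged into the WAITING entry created by the first, and both listeners
// resolve when the single batched delete completes.
ActionFuture<AcknowledgedResponse> deleteA = client().admin().cluster()
        .prepareDeleteSnapshot("test-repo", "snap-a").execute();
ActionFuture<AcknowledgedResponse> deleteB = client().admin().cluster()
        .prepareDeleteSnapshot("test-repo", "snap-b").execute();
assertAcked(deleteA.actionGet());
assertAcked(deleteB.actionGet());
```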


@ -121,10 +121,10 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
ClusterState.Builder builder = ClusterState.builder(ClusterState.EMPTY_STATE)
.putCustom(SnapshotDeletionsInProgress.TYPE,
SnapshotDeletionsInProgress.of(Collections.singletonList(
new SnapshotDeletionsInProgress.Entry(
Collections.singletonList(new SnapshotId("snap1", UUIDs.randomBase64UUID())), "repo1",
randomNonNegativeLong(), randomNonNegativeLong()))));
SnapshotDeletionsInProgress.of(Collections.singletonList(
new SnapshotDeletionsInProgress.Entry(
Collections.singletonList(new SnapshotId("snap1", UUIDs.randomBase64UUID())), "repo1",
randomNonNegativeLong(), randomNonNegativeLong(), SnapshotDeletionsInProgress.State.STARTED))));
if (includeRestore) {
builder.putCustom(RestoreInProgress.TYPE,
new RestoreInProgress.Builder().add(


@ -121,7 +121,7 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
return nodes;
}
static final Settings DEFAULT_SETTINGS = Settings.builder()
public static final Settings DEFAULT_SETTINGS = Settings.builder()
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "5s") // for hitting simulated network failures quickly
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1) // for hitting simulated network failures quickly
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "5s") // for hitting simulated network failures quickly


@ -172,7 +172,7 @@ public class RepositoriesServiceTests extends ESTestCase {
@Override
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
ActionListener<RepositoryData> listener) {
listener.onResponse(null);
}


@ -212,6 +212,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
@ -226,7 +227,6 @@ import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -554,31 +554,19 @@ public class SnapshotResiliencyTests extends ESTestCase {
createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2")
.execute(createOtherSnapshotResponseStepListener));
final StepListener<Boolean> deleteSnapshotStepListener = new StepListener<>();
final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
continueOrDie(createOtherSnapshotResponseStepListener,
createSnapshotResponse -> client().admin().cluster().deleteSnapshot(
new DeleteSnapshotRequest(repoName, snapshotName), ActionListener.wrap(
resp -> deleteSnapshotStepListener.onResponse(true),
e -> {
final Throwable unwrapped =
ExceptionsHelper.unwrap(e, ConcurrentSnapshotExecutionException.class);
assertThat(unwrapped, instanceOf(ConcurrentSnapshotExecutionException.class));
deleteSnapshotStepListener.onResponse(false);
})));
createSnapshotResponse -> client().admin().cluster().prepareDeleteSnapshot(
repoName, snapshotName).execute(deleteSnapshotStepListener));
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
continueOrDie(deleteSnapshotStepListener, deleted -> {
if (deleted) {
// The delete worked out, creating a third snapshot
client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
.execute(createAnotherSnapshotResponseStepListener);
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
} else {
createAnotherSnapshotResponseStepListener.onResponse(null);
}
});
deterministicTaskQueue.runAllRunnableTasks();
@ -616,11 +604,16 @@ public class SnapshotResiliencyTests extends ESTestCase {
createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener));
final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();
final int inProgressSnapshots = randomIntBetween(1, 5);
final StepListener<Collection<CreateSnapshotResponse>> createOtherSnapshotResponseStepListener = new StepListener<>();
final ActionListener<CreateSnapshotResponse> createSnapshotListener =
new GroupedActionListener<>(createOtherSnapshotResponseStepListener, inProgressSnapshots);
continueOrDie(createSnapshotResponseStepListener,
createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2")
.execute(createOtherSnapshotResponseStepListener));
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
for (int i = 0; i < inProgressSnapshots; i++) {
client().admin().cluster().prepareCreateSnapshot(repoName, "other-" + i).execute(createSnapshotListener);
}
});
final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
@ -1012,6 +1005,66 @@ public class SnapshotResiliencyTests extends ESTestCase {
assertEquals(0, snapshotInfo.failedShards());
}
public void testRunConcurrentSnapshots() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
final String repoName = "repo";
final List<String> snapshotNames = IntStream.range(1, randomIntBetween(2, 4))
.mapToObj(i -> "snapshot-" + i).collect(Collectors.toList());
final String index = "test";
final int shards = randomIntBetween(1, 10);
final int documents = randomIntBetween(1, 100);
final TestClusterNodes.TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final StepListener<Collection<CreateSnapshotResponse>> allSnapshotsListener = new StepListener<>();
final ActionListener<CreateSnapshotResponse> snapshotListener =
new GroupedActionListener<>(allSnapshotsListener, snapshotNames.size());
final AtomicBoolean doneIndexing = new AtomicBoolean(false);
continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
for (String snapshotName : snapshotNames) {
scheduleNow(() -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(snapshotListener));
}
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < documents; ++i) {
bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
}
final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
client().bulk(bulkRequest, bulkResponseStepListener);
continueOrDie(bulkResponseStepListener, bulkResponse -> {
assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
assertEquals(documents, bulkResponse.getItems().length);
doneIndexing.set(true);
});
});
final AtomicBoolean doneSnapshotting = new AtomicBoolean(false);
continueOrDie(allSnapshotsListener, createSnapshotResponses -> {
for (CreateSnapshotResponse createSnapshotResponse : createSnapshotResponses) {
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
}
doneSnapshotting.set(true);
});
runUntil(() -> doneIndexing.get() && doneSnapshotting.get(), TimeUnit.MINUTES.toMillis(5L));
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
assertThat(snapshotIds, hasSize(snapshotNames.size()));
for (SnapshotId snapshotId : snapshotIds) {
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
assertEquals(shards, snapshotInfo.successfulShards());
assertEquals(0, snapshotInfo.failedShards());
}
}
private RepositoryData getRepositoryData(Repository repository) {
final PlainActionFuture<RepositoryData> res = PlainActionFuture.newFuture();
repository.getRepositoryData(res);


@ -110,7 +110,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
@Override
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
ActionListener<RepositoryData> listener) {
listener.onResponse(null);
}


@ -55,8 +55,8 @@ import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import java.io.IOException;
@ -232,6 +232,12 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
return masterName;
}
public static void blockMasterFromDeletingIndexNFile(String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
.repository(repositoryName)).setBlockOnDeleteIndexFile();
}
public static String blockMasterFromFinalizingSnapshotOnSnapFile(final String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
@ -249,6 +255,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
return null;
}
public static void blockNodeOnAnyFiles(String repository, String nodeName) {
((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName)
.repository(repository)).setBlockOnAnyFiles(true);
}
public static void blockDataNode(String repository, String nodeName) {
((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName)
.repository(repository)).blockOnDataFiles(true);
@ -280,7 +291,8 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
assertTrue("No repository is blocked waiting on a data node", blocked);
}
public static void unblockNode(final String repository, final String node) {
public void unblockNode(final String repository, final String node) {
logger.info("--> unblocking [{}] on node [{}]", repository, node);
((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repository)).unblock();
}
@ -416,7 +428,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false);
}
private void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
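
A hypothetical test sketch combining the new helpers above (`blockMasterFromDeletingIndexNFile`, the now-protected `awaitClusterState` and the instance-level `unblockNode`; repository and snapshot names are made up):

```java
// Block the master from deleting index-N blobs so the first delete hangs in
// its repository write, then verify a second delete request is enqueued as a
// separate WAITING entry behind the running one.
blockMasterFromDeletingIndexNFile("test-repo");
ActionFuture<AcknowledgedResponse> firstDelete = client().admin().cluster()
        .prepareDeleteSnapshot("test-repo", "snap-1").execute();
waitForBlock(internalCluster().getMasterName(), "test-repo", TimeValue.timeValueSeconds(30L));
ActionFuture<AcknowledgedResponse> secondDelete = client().admin().cluster()
        .prepareDeleteSnapshot("test-repo", "snap-2").execute();
awaitClusterState(internalCluster().getMasterName(), state ->
        state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY)
                .getEntries().size() == 2);
unblockNode("test-repo", internalCluster().getMasterName());
assertAcked(firstDelete.actionGet());
assertAcked(secondDelete.actionGet());
```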


@ -42,6 +42,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.fs.FsRepository;
import java.io.IOException;
@ -104,10 +105,12 @@ public class MockRepository extends FsRepository {
private final Environment env;
private volatile boolean blockOnControlFiles;
private volatile boolean blockOnAnyFiles;
private volatile boolean blockOnDataFiles;
private volatile boolean blockOnDeleteIndexN;
/** Allows blocking on writing the index-N blob; this is a way to enforce blocking the
* finalization of a snapshot, while permitting other IO operations to proceed unblocked. */
private volatile boolean blockOnWriteIndexFile;
@ -125,7 +128,7 @@ public class MockRepository extends FsRepository {
randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
useLuceneCorruptionException = metadata.settings().getAsBoolean("use_lucene_corruption", false);
maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L);
blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false);
blockOnAnyFiles = metadata.settings().getAsBoolean("block_on_control", false);
blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
randomPrefix = metadata.settings().get("random", "default");
@ -171,9 +174,10 @@ public class MockRepository extends FsRepository {
blocked = false;
// Clean blocking flags, so we wouldn't try to block again
blockOnDataFiles = false;
blockOnControlFiles = false;
blockOnAnyFiles = false;
blockOnWriteIndexFile = false;
blockAndFailOnWriteSnapFile = false;
blockOnDeleteIndexN = false;
this.notifyAll();
}
@ -181,6 +185,10 @@ public class MockRepository extends FsRepository {
blockOnDataFiles = blocked;
}
public void setBlockOnAnyFiles(boolean blocked) {
blockOnAnyFiles = blocked;
}
public void setBlockAndFailOnWriteSnapFiles(boolean blocked) {
blockAndFailOnWriteSnapFile = blocked;
}
@ -189,6 +197,10 @@ public class MockRepository extends FsRepository {
blockOnWriteIndexFile = blocked;
}
public void setBlockOnDeleteIndexFile() {
blockOnDeleteIndexN = true;
}
public boolean blocked() {
return blocked;
}
@ -197,8 +209,8 @@ public class MockRepository extends FsRepository {
logger.debug("[{}] Blocking execution", metadata.name());
boolean wasBlocked = false;
try {
while (blockOnDataFiles || blockOnControlFiles || blockOnWriteIndexFile ||
blockAndFailOnWriteSnapFile) {
while (blockOnDataFiles || blockOnAnyFiles || blockOnWriteIndexFile ||
blockAndFailOnWriteSnapFile || blockOnDeleteIndexN) {
blocked = true;
this.wait();
wasBlocked = true;
@ -275,7 +287,7 @@ public class MockRepository extends FsRepository {
if (shouldFail(blobName, randomControlIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {
logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path());
throw new IOException("Random IOException");
} else if (blockOnControlFiles) {
} else if (blockOnAnyFiles) {
blockExecutionAndMaybeWait(blobName);
} else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) {
blockExecutionAndFail(blobName);
@ -339,6 +351,15 @@ public class MockRepository extends FsRepository {
return deleteResult.add(deleteBlobCount, deleteByteCount);
}
@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
if (blockOnDeleteIndexN && blobNames.stream().anyMatch(
name -> name.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX))) {
blockExecutionAndMaybeWait("index-{N}");
}
super.deleteBlobsIgnoringIfNotExists(blobNames);
}
@Override
public Map<String, BlobMetadata> listBlobs() throws IOException {
maybeIOExceptionOrBlock("");


@ -275,7 +275,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
public void deleteSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
ActionListener<RepositoryData> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}


@ -355,7 +355,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
SnapshotDeletionsInProgress delInProgress = SnapshotDeletionsInProgress.of(
Collections.singletonList(new SnapshotDeletionsInProgress.Entry(
Collections.singletonList(snapshot.getSnapshotId()), snapshot.getRepository(), 0, 0)));
Collections.singletonList(snapshot.getSnapshotId()), snapshot.getRepository(), 0, 0,
SnapshotDeletionsInProgress.State.STARTED)));
state = ClusterState.builder(new ClusterName("cluster"))
.putCustom(SnapshotDeletionsInProgress.TYPE, delInProgress)
.build();