This fixes a gap in testing and a bug that can occur in various forms: When we would start a snapshot or clone related to a shard that was done snapshotting/cloning but its overall operation was not yet finalized at the time of starting the operation, we would base the operation off of the wrong generation. This would not cause a corrupted repo, but would cause the operation to be `PARTIAL`. This commit fixes the state machine to take into account the correct generation in this case. Closes #63498
This commit is contained in:
parent
845ccc2264
commit
f70391c6cc
|
@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexStat
|
|||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
|
@ -469,6 +470,104 @@ public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
|
|||
"] because its snapshot was not successful."));
|
||||
}
|
||||
|
||||
public void testStartSnapshotWithSuccessfulShardClonePendingFinalization() throws Exception {
|
||||
final String masterName = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
|
||||
final String dataNode = internalCluster().startDataOnlyNode();
|
||||
final String repoName = "test-repo";
|
||||
createRepository(repoName, "mock");
|
||||
|
||||
final String indexName = "test-idx";
|
||||
createIndexWithContent(indexName);
|
||||
|
||||
final String sourceSnapshot = "source-snapshot";
|
||||
createFullSnapshot(repoName, sourceSnapshot);
|
||||
|
||||
blockMasterOnWriteIndexFile(repoName);
|
||||
final String cloneName = "clone-blocked";
|
||||
final ActionFuture<AcknowledgedResponse> blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName);
|
||||
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
|
||||
awaitNumberOfSnapshotsInProgress(1);
|
||||
blockNodeOnAnyFiles(repoName, dataNode);
|
||||
final ActionFuture<CreateSnapshotResponse> otherSnapshot = startFullSnapshot(repoName, "other-snapshot");
|
||||
awaitNumberOfSnapshotsInProgress(2);
|
||||
assertFalse(blockedClone.isDone());
|
||||
unblockNode(repoName, masterName);
|
||||
awaitNumberOfSnapshotsInProgress(1);
|
||||
awaitMasterFinishRepoOperations();
|
||||
unblockNode(repoName, dataNode);
|
||||
assertAcked(blockedClone.get());
|
||||
assertEquals(getSnapshot(repoName, cloneName).state(), SnapshotState.SUCCESS);
|
||||
assertSuccessful(otherSnapshot);
|
||||
}
|
||||
|
||||
public void testStartCloneWithSuccessfulShardClonePendingFinalization() throws Exception {
|
||||
final String masterName = internalCluster().startMasterOnlyNode();
|
||||
internalCluster().startDataOnlyNode();
|
||||
final String repoName = "test-repo";
|
||||
createRepository(repoName, "mock");
|
||||
|
||||
final String indexName = "test-idx";
|
||||
createIndexWithContent(indexName);
|
||||
|
||||
final String sourceSnapshot = "source-snapshot";
|
||||
createFullSnapshot(repoName, sourceSnapshot);
|
||||
|
||||
blockMasterOnWriteIndexFile(repoName);
|
||||
final String cloneName = "clone-blocked";
|
||||
final ActionFuture<AcknowledgedResponse> blockedClone = startClone(repoName, sourceSnapshot, cloneName, indexName);
|
||||
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
|
||||
awaitNumberOfSnapshotsInProgress(1);
|
||||
final String otherCloneName = "other-clone";
|
||||
final ActionFuture<AcknowledgedResponse> otherClone = startClone(repoName, sourceSnapshot, otherCloneName, indexName);
|
||||
awaitNumberOfSnapshotsInProgress(2);
|
||||
assertFalse(blockedClone.isDone());
|
||||
unblockNode(repoName, masterName);
|
||||
awaitNoMoreRunningOperations(masterName);
|
||||
awaitMasterFinishRepoOperations();
|
||||
assertAcked(blockedClone.get());
|
||||
assertAcked(otherClone.get());
|
||||
assertEquals(getSnapshot(repoName, cloneName).state(), SnapshotState.SUCCESS);
|
||||
assertEquals(getSnapshot(repoName, otherCloneName).state(), SnapshotState.SUCCESS);
|
||||
}
|
||||
|
||||
public void testStartCloneWithSuccessfulShardSnapshotPendingFinalization() throws Exception {
|
||||
final String masterName = internalCluster().startMasterOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
|
||||
internalCluster().startDataOnlyNode();
|
||||
final String repoName = "test-repo";
|
||||
createRepository(repoName, "mock");
|
||||
|
||||
final String indexName = "test-idx";
|
||||
createIndexWithContent(indexName);
|
||||
|
||||
final String sourceSnapshot = "source-snapshot";
|
||||
createFullSnapshot(repoName, sourceSnapshot);
|
||||
|
||||
blockMasterOnWriteIndexFile(repoName);
|
||||
final ActionFuture<CreateSnapshotResponse> blockedSnapshot = startFullSnapshot(repoName, "snap-blocked");
|
||||
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
|
||||
awaitNumberOfSnapshotsInProgress(1);
|
||||
final String cloneName = "clone";
|
||||
final ActionFuture<AcknowledgedResponse> clone = startClone(repoName, sourceSnapshot, cloneName, indexName);
|
||||
logger.info("--> wait for clone to start fully with shards assigned in the cluster state");
|
||||
try {
|
||||
awaitClusterState(clusterState -> {
|
||||
final List<SnapshotsInProgress.Entry> entries =
|
||||
clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries();
|
||||
return entries.size() == 2 && entries.get(1).clones().isEmpty() == false;
|
||||
});
|
||||
assertFalse(blockedSnapshot.isDone());
|
||||
} finally {
|
||||
unblockNode(repoName, masterName);
|
||||
}
|
||||
awaitNumberOfSnapshotsInProgress(1);
|
||||
|
||||
awaitMasterFinishRepoOperations();
|
||||
|
||||
assertSuccessful(blockedSnapshot);
|
||||
assertAcked(clone.get());
|
||||
assertEquals(getSnapshot(repoName, cloneName).state(), SnapshotState.SUCCESS);
|
||||
}
|
||||
|
||||
private ActionFuture<AcknowledgedResponse> startCloneFromDataNode(String repoName, String sourceSnapshot, String targetSnapshot,
|
||||
String... indices) {
|
||||
return startClone(dataNodeClient(), repoName, sourceSnapshot, targetSnapshot, indices);
|
||||
|
|
|
@ -435,7 +435,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
|
|||
final ActionFuture<AcknowledgedResponse> firstDeleteFuture = startDeleteFromNonMasterClient(repoName, firstSnapshot);
|
||||
awaitNDeletionsInProgress(1);
|
||||
|
||||
blockDataNode(repoName, dataNode2);
|
||||
blockNodeOnAnyFiles(repoName, dataNode2);
|
||||
final ActionFuture<CreateSnapshotResponse> snapshotThreeFuture = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three");
|
||||
waitForBlock(dataNode2, repoName, TimeValue.timeValueSeconds(30L));
|
||||
|
||||
|
@ -1248,6 +1248,33 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
|
|||
assertThat(getRepositoryData(repoName).getGenId(), is(0L));
|
||||
}
|
||||
|
||||
public void testStartWithSuccessfulShardSnapshotPendingFinalization() throws Exception {
|
||||
final String masterName = internalCluster().startMasterOnlyNode();
|
||||
final String dataNode = internalCluster().startDataOnlyNode();
|
||||
final String repoName = "test-repo";
|
||||
createRepository(repoName, "mock");
|
||||
|
||||
createIndexWithContent("test-idx");
|
||||
createFullSnapshot(repoName, "first-snapshot");
|
||||
|
||||
blockMasterOnWriteIndexFile(repoName);
|
||||
final ActionFuture<CreateSnapshotResponse> blockedSnapshot = startFullSnapshot(repoName, "snap-blocked");
|
||||
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
|
||||
awaitNumberOfSnapshotsInProgress(1);
|
||||
blockNodeOnAnyFiles(repoName, dataNode);
|
||||
final ActionFuture<CreateSnapshotResponse> otherSnapshot = startFullSnapshot(repoName, "other-snapshot");
|
||||
awaitNumberOfSnapshotsInProgress(2);
|
||||
assertFalse(blockedSnapshot.isDone());
|
||||
unblockNode(repoName, masterName);
|
||||
awaitNumberOfSnapshotsInProgress(1);
|
||||
|
||||
awaitMasterFinishRepoOperations();
|
||||
|
||||
unblockNode(repoName, dataNode);
|
||||
assertSuccessful(blockedSnapshot);
|
||||
assertSuccessful(otherSnapshot);
|
||||
}
|
||||
|
||||
private static String startDataNodeWithLargeSnapshotPool() {
|
||||
return internalCluster().startDataOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoryShardId;
|
||||
import org.elasticsearch.repositories.RepositoryOperation;
|
||||
import org.elasticsearch.snapshots.InFlightShardSnapshotStates;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
|
@ -743,6 +744,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
}
|
||||
}
|
||||
}
|
||||
for (String repoName : assignedShardsByRepo.keySet()) {
|
||||
// make sure in-flight-shard-states can be built cleanly for the entries without tripping assertions
|
||||
InFlightShardSnapshotStates.forRepo(repoName, entries);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.snapshots;
|
||||
|
||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.repositories.IndexId;
|
||||
import org.elasticsearch.repositories.RepositoryShardId;
|
||||
import org.elasticsearch.repositories.ShardGenerations;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Holds information about currently in-flight shard level snapshot or clone operations on a per-shard level.
|
||||
* Concretely, this means information on which shards are actively being written to in the repository currently
|
||||
* as well as the latest written shard generation per shard in case there is a shard generation for a shard that has
|
||||
* been cleanly written out to the repository but not yet made part of the current {@link org.elasticsearch.repositories.RepositoryData}
|
||||
* through a snapshot finalization.
|
||||
*/
|
||||
public final class InFlightShardSnapshotStates {
|
||||
|
||||
/**
|
||||
* Compute information about all shard ids that currently have in-flight state for the given repository.
|
||||
*
|
||||
* @param repoName repository name
|
||||
* @param snapshots snapshots in progress
|
||||
* @return in flight shard states for all snapshot operation running for the given repository name
|
||||
*/
|
||||
public static InFlightShardSnapshotStates forRepo(String repoName, List<SnapshotsInProgress.Entry> snapshots) {
|
||||
final Map<String, Map<Integer, String>> generations = new HashMap<>();
|
||||
final Map<String, Set<Integer>> busyIds = new HashMap<>();
|
||||
for (SnapshotsInProgress.Entry runningSnapshot : snapshots) {
|
||||
if (runningSnapshot.repository().equals(repoName) == false) {
|
||||
continue;
|
||||
}
|
||||
if (runningSnapshot.isClone()) {
|
||||
for (ObjectObjectCursor<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> clone : runningSnapshot.clones()) {
|
||||
final RepositoryShardId repoShardId = clone.key;
|
||||
addStateInformation(generations, busyIds, clone.value, repoShardId.shardId(), repoShardId.indexName());
|
||||
}
|
||||
} else {
|
||||
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : runningSnapshot.shards()) {
|
||||
final ShardId sid = shard.key;
|
||||
addStateInformation(generations, busyIds, shard.value, sid.id(), sid.getIndexName());
|
||||
}
|
||||
}
|
||||
}
|
||||
return new InFlightShardSnapshotStates(generations, busyIds);
|
||||
}
|
||||
|
||||
private static void addStateInformation(Map<String, Map<Integer, String>> generations, Map<String, Set<Integer>> busyIds,
|
||||
SnapshotsInProgress.ShardSnapshotStatus shardState, int shardId, String indexName) {
|
||||
if (shardState.isActive()) {
|
||||
busyIds.computeIfAbsent(indexName, k -> new HashSet<>()).add(shardId);
|
||||
assert assertGenerationConsistency(generations, indexName, shardId, shardState.generation());
|
||||
} else if (shardState.state() == SnapshotsInProgress.ShardState.SUCCESS) {
|
||||
assert busyIds.getOrDefault(indexName, Collections.emptySet()).contains(shardId) == false :
|
||||
"Can't have a successful operation queued after an in-progress operation";
|
||||
generations.computeIfAbsent(indexName, k -> new HashMap<>()).put(shardId, shardState.generation());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Map that maps index name to a nested map of shard id to most recent successful shard generation for that
|
||||
* shard id.
|
||||
*/
|
||||
private final Map<String, Map<Integer, String>> generations;
|
||||
|
||||
/**
|
||||
* Map of index name to a set of shard ids that currently are actively executing an operation on the repository.
|
||||
*/
|
||||
private final Map<String, Set<Integer>> activeShardIds;
|
||||
|
||||
|
||||
private InFlightShardSnapshotStates(Map<String, Map<Integer, String>> generations, Map<String, Set<Integer>> activeShardIds) {
|
||||
this.generations = generations;
|
||||
this.activeShardIds = activeShardIds;
|
||||
}
|
||||
|
||||
private static boolean assertGenerationConsistency(Map<String, Map<Integer, String>> generations, String indexName,
|
||||
int shardId, String activeGeneration) {
|
||||
final String bestGeneration = generations.getOrDefault(indexName, Collections.emptyMap()).get(shardId);
|
||||
assert bestGeneration == null || activeGeneration.equals(bestGeneration);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a given shard currently has an actively executing shard operation.
|
||||
*
|
||||
* @param indexName name of the shard's index
|
||||
* @param shardId shard id of the shard
|
||||
* @return true if shard has an actively executing shard operation
|
||||
*/
|
||||
boolean isActive(String indexName, int shardId) {
|
||||
return activeShardIds.getOrDefault(indexName, Collections.emptySet()).contains(shardId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine the current generation for a shard by first checking if any in-flight but successful new shard
|
||||
* snapshots or clones have set a relevant generation and then falling back to {@link ShardGenerations#getShardGen}
|
||||
* if not.
|
||||
*
|
||||
* @param indexId index id of the shard
|
||||
* @param shardId shard id of the shard
|
||||
* @param shardGenerations current shard generations in the repository data
|
||||
* @return most recent shard generation for the given shard
|
||||
*/
|
||||
@Nullable
|
||||
String generationForShard(IndexId indexId, int shardId, ShardGenerations shardGenerations) {
|
||||
final String inFlightBest = generations.getOrDefault(indexId.getName(), Collections.emptyMap()).get(shardId);
|
||||
if (inFlightBest != null) {
|
||||
return inFlightBest;
|
||||
}
|
||||
return shardGenerations.getShardGen(indexId, shardId);
|
||||
}
|
||||
}
|
|
@ -635,26 +635,22 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
boolean changed = false;
|
||||
final String localNodeId = currentState.nodes().getLocalNodeId();
|
||||
final String repoName = cloneEntry.repository();
|
||||
final Map<String, IndexId> indexIds = getInFlightIndexIds(updatedEntries, repoName);
|
||||
final ShardGenerations shardGenerations = repoData.shardGenerations();
|
||||
for (int i = 0; i < updatedEntries.size(); i++) {
|
||||
if (cloneEntry.equals(updatedEntries.get(i))) {
|
||||
if (cloneEntry.snapshot().equals(updatedEntries.get(i).snapshot())) {
|
||||
final ImmutableOpenMap.Builder<RepositoryShardId, ShardSnapshotStatus> clonesBuilder =
|
||||
ImmutableOpenMap.builder();
|
||||
// TODO: could be optimized by just dealing with repo shard id directly
|
||||
final Set<RepositoryShardId> busyShardsInRepo =
|
||||
busyShardsForRepo(repoName, snapshotsInProgress, currentState.metadata())
|
||||
.stream()
|
||||
.map(shardId -> new RepositoryShardId(indexIds.get(shardId.getIndexName()), shardId.getId()))
|
||||
.collect(Collectors.toSet());
|
||||
final InFlightShardSnapshotStates inFlightShardStates =
|
||||
InFlightShardSnapshotStates.forRepo(repoName, snapshotsInProgress.entries());
|
||||
for (Tuple<IndexId, Integer> count : counts) {
|
||||
for (int shardId = 0; shardId < count.v2(); shardId++) {
|
||||
final RepositoryShardId repoShardId = new RepositoryShardId(count.v1(), shardId);
|
||||
if (busyShardsInRepo.contains(repoShardId)) {
|
||||
final String indexName = repoShardId.indexName();
|
||||
if (inFlightShardStates.isActive(indexName, shardId)) {
|
||||
clonesBuilder.put(repoShardId, ShardSnapshotStatus.UNASSIGNED_QUEUED);
|
||||
} else {
|
||||
clonesBuilder.put(repoShardId,
|
||||
new ShardSnapshotStatus(localNodeId, shardGenerations.getShardGen(count.v1(), shardId)));
|
||||
clonesBuilder.put(repoShardId, new ShardSnapshotStatus(localNodeId,
|
||||
inFlightShardStates.generationForShard(repoShardId.index(), shardId, shardGenerations)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2617,12 +2613,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* @return list of shard to be included into current snapshot
|
||||
*/
|
||||
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(
|
||||
@Nullable SnapshotsInProgress snapshotsInProgress, @Nullable SnapshotDeletionsInProgress deletionsInProgress,
|
||||
SnapshotsInProgress snapshotsInProgress, @Nullable SnapshotDeletionsInProgress deletionsInProgress,
|
||||
Metadata metadata, RoutingTable routingTable, List<IndexId> indices, boolean useShardGenerations,
|
||||
RepositoryData repositoryData, String repoName) {
|
||||
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
|
||||
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
|
||||
final Set<ShardId> inProgressShards = busyShardsForRepo(repoName, snapshotsInProgress, metadata);
|
||||
final InFlightShardSnapshotStates inFlightShardStates =
|
||||
InFlightShardSnapshotStates.forRepo(repoName, snapshotsInProgress.entries());
|
||||
final boolean readyToExecute = deletionsInProgress == null || deletionsInProgress.getEntries().stream()
|
||||
.noneMatch(entry -> entry.repository().equals(repoName) && entry.state() == SnapshotDeletionsInProgress.State.STARTED);
|
||||
for (IndexId index : indices) {
|
||||
|
@ -2638,12 +2635,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
final ShardId shardId = indexRoutingTable.shard(i).shardId();
|
||||
final String shardRepoGeneration;
|
||||
if (useShardGenerations) {
|
||||
if (isNewIndex) {
|
||||
final String inFlightGeneration = inFlightShardStates.generationForShard(index, shardId.id(), shardGenerations);
|
||||
if (inFlightGeneration == null && isNewIndex) {
|
||||
assert shardGenerations.getShardGen(index, shardId.getId()) == null
|
||||
: "Found shard generation for new index [" + index + "]";
|
||||
shardRepoGeneration = ShardGenerations.NEW_SHARD_GEN;
|
||||
} else {
|
||||
shardRepoGeneration = shardGenerations.getShardGen(index, shardId.getId());
|
||||
shardRepoGeneration = inFlightGeneration;
|
||||
}
|
||||
} else {
|
||||
shardRepoGeneration = null;
|
||||
|
@ -2654,7 +2652,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
"missing routing table", shardRepoGeneration);
|
||||
} else {
|
||||
ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
|
||||
if (readyToExecute == false || inProgressShards.contains(shardId)) {
|
||||
if (readyToExecute == false || inFlightShardStates.isActive(indexName, i)) {
|
||||
shardSnapshotStatus = ShardSnapshotStatus.UNASSIGNED_QUEUED;
|
||||
} else if (primary == null || !primary.assignedToNode()) {
|
||||
shardSnapshotStatus = new ShardSnapshotStatus(null, ShardState.MISSING,
|
||||
|
@ -2679,45 +2677,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
return builder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute all shard ids that currently have an actively executing snapshot for the given repository.
|
||||
*
|
||||
* @param repoName repository name
|
||||
* @param snapshots snapshots in progress
|
||||
* @return shard ids that currently have an actively executing shard snapshot on a data node
|
||||
*/
|
||||
private static Set<ShardId> busyShardsForRepo(String repoName, @Nullable SnapshotsInProgress snapshots, Metadata metadata) {
|
||||
final List<SnapshotsInProgress.Entry> runningSnapshots = snapshots == null ? Collections.emptyList() : snapshots.entries();
|
||||
final Set<ShardId> inProgressShards = new HashSet<>();
|
||||
for (SnapshotsInProgress.Entry runningSnapshot : runningSnapshots) {
|
||||
if (runningSnapshot.repository().equals(repoName) == false) {
|
||||
continue;
|
||||
}
|
||||
if (runningSnapshot.isClone()) {
|
||||
for (ObjectObjectCursor<RepositoryShardId, ShardSnapshotStatus> clone : runningSnapshot.clones()) {
|
||||
final ShardSnapshotStatus shardState = clone.value;
|
||||
if (shardState.isActive()) {
|
||||
IndexMetadata indexMeta = metadata.index(clone.key.indexName());
|
||||
final Index index;
|
||||
if (indexMeta == null) {
|
||||
index = new Index(clone.key.indexName(), IndexMetadata.INDEX_UUID_NA_VALUE);
|
||||
} else {
|
||||
index = indexMeta.getIndex();
|
||||
}
|
||||
inProgressShards.add(new ShardId(index, clone.key.shardId()));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : runningSnapshot.shards()) {
|
||||
if (shard.value.isActive()) {
|
||||
inProgressShards.add(shard.key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return inProgressShards;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the data streams that are currently being snapshotted (with partial == false) and that are contained in the
|
||||
* indices-to-check set.
|
||||
|
|
|
@ -60,6 +60,7 @@ import org.elasticsearch.snapshots.mockstore.MockRepository;
|
|||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPoolStats;
|
||||
import org.junit.After;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -194,7 +195,8 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||
internalCluster().stopRandomNode(settings -> settings.get("node.name").equals(node));
|
||||
}
|
||||
|
||||
public static void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException {
|
||||
public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException {
|
||||
logger.info("--> waiting for [{}] to be blocked on node [{}]", repository, node);
|
||||
long start = System.currentTimeMillis();
|
||||
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
|
||||
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
|
||||
|
@ -236,7 +238,14 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||
public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) {
|
||||
final String masterName = internalCluster().getMasterName();
|
||||
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
|
||||
.repository(repositoryName)).setBlockOnWriteIndexFile(true);
|
||||
.repository(repositoryName)).setBlockAndFailOnWriteIndexFile();
|
||||
return masterName;
|
||||
}
|
||||
|
||||
public static String blockMasterOnWriteIndexFile(final String repositoryName) {
|
||||
final String masterName = internalCluster().getMasterName();
|
||||
((MockRepository)internalCluster().getMasterNodeInstance(RepositoriesService.class)
|
||||
.repository(repositoryName)).setBlockOnWriteIndexFile();
|
||||
return masterName;
|
||||
}
|
||||
|
||||
|
@ -570,4 +579,17 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||
assertThat(snapshotInfos, hasSize(1));
|
||||
return snapshotInfos.get(0);
|
||||
}
|
||||
|
||||
protected void awaitMasterFinishRepoOperations() throws Exception {
|
||||
logger.info("--> waiting for master to finish all repo operations on its SNAPSHOT pool");
|
||||
final ThreadPool masterThreadPool = internalCluster().getMasterNodeInstance(ThreadPool.class);
|
||||
assertBusy(() -> {
|
||||
for (ThreadPoolStats.Stats stat : masterThreadPool.stats()) {
|
||||
if (ThreadPool.Names.SNAPSHOT.equals(stat.getName())) {
|
||||
assertEquals(stat.getActive(), 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -111,8 +111,15 @@ public class MockRepository extends FsRepository {
|
|||
|
||||
private volatile boolean blockOnDeleteIndexN;
|
||||
|
||||
/** Allows blocking on writing the index-N blob; this is a way to enforce blocking the
|
||||
* finalization of a snapshot, while permitting other IO operations to proceed unblocked. */
|
||||
/**
|
||||
* Allows blocking on writing the index-N blob and subsequently failing it on unblock.
|
||||
* This is a way to enforce blocking the finalization of a snapshot, while permitting other IO operations to proceed unblocked.
|
||||
*/
|
||||
private volatile boolean blockAndFailOnWriteIndexFile;
|
||||
|
||||
/**
|
||||
* Same as {@link #blockAndFailOnWriteIndexFile} but proceeds without error after unblock.
|
||||
*/
|
||||
private volatile boolean blockOnWriteIndexFile;
|
||||
|
||||
/** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */
|
||||
|
@ -190,6 +197,7 @@ public class MockRepository extends FsRepository {
|
|||
// Clean blocking flags, so we wouldn't try to block again
|
||||
blockOnDataFiles = false;
|
||||
blockOnAnyFiles = false;
|
||||
blockAndFailOnWriteIndexFile = false;
|
||||
blockOnWriteIndexFile = false;
|
||||
blockAndFailOnWriteSnapFile = false;
|
||||
blockOnDeleteIndexN = false;
|
||||
|
@ -210,8 +218,14 @@ public class MockRepository extends FsRepository {
|
|||
blockAndFailOnWriteSnapFile = blocked;
|
||||
}
|
||||
|
||||
public void setBlockOnWriteIndexFile(boolean blocked) {
|
||||
blockOnWriteIndexFile = blocked;
|
||||
public void setBlockAndFailOnWriteIndexFile() {
|
||||
assert blockOnWriteIndexFile == false : "Either fail or wait after blocking on index-N not both";
|
||||
blockAndFailOnWriteIndexFile = true;
|
||||
}
|
||||
|
||||
public void setBlockOnWriteIndexFile() {
|
||||
assert blockAndFailOnWriteIndexFile == false : "Either fail or wait after blocking on index-N not both";
|
||||
blockOnWriteIndexFile = true;
|
||||
}
|
||||
|
||||
public void setBlockOnDeleteIndexFile() {
|
||||
|
@ -242,7 +256,7 @@ public class MockRepository extends FsRepository {
|
|||
logger.debug("[{}] Blocking execution", metadata.name());
|
||||
boolean wasBlocked = false;
|
||||
try {
|
||||
while (blockOnDataFiles || blockOnAnyFiles || blockOnWriteIndexFile ||
|
||||
while (blockOnDataFiles || blockOnAnyFiles || blockAndFailOnWriteIndexFile || blockOnWriteIndexFile ||
|
||||
blockAndFailOnWriteSnapFile || blockOnDeleteIndexN || blockOnWriteShardLevelMeta || blockOnReadIndexMeta) {
|
||||
blocked = true;
|
||||
this.wait();
|
||||
|
@ -467,8 +481,12 @@ public class MockRepository extends FsRepository {
|
|||
if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {
|
||||
throw new IOException("Random IOException");
|
||||
}
|
||||
if (blobName.startsWith("index-") && blockOnWriteIndexFile) {
|
||||
if (blobName.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX)) {
|
||||
if (blockAndFailOnWriteIndexFile) {
|
||||
blockExecutionAndFail(blobName);
|
||||
} else if (blockOnWriteIndexFile) {
|
||||
blockExecutionAndMaybeWait(blobName);
|
||||
}
|
||||
}
|
||||
if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
|
||||
// Simulate a failure between the write and move operation in FsBlobContainer
|
||||
|
|
Loading…
Reference in New Issue