Fix Concurrent Snapshot Ending And Stabilize Snapshot Finalization (#38368)
* The problem in #38226 is that in some corner cases multiple calls to `endSnapshot` were made concurrently, leading to non-deterministic behavior (`beginSnapshot` was triggering a repository finalization while one that was triggered by a `deleteSnapshot` was already in progress)
* Fixed by:
  * Making all `endSnapshot` calls originate from the cluster state being in a "completed" state (apart from one short-circuit when initializing an empty snapshot). This forced putting the failure string into `SnapshotsInProgress.Entry`.
  * Adding deduplication logic to `endSnapshot`
* Also:
  * Streamlined the init behavior to work the same way (keep state on the `SnapshotsService` to decide which snapshot entries are stale)
* closes #38226
This commit is contained in:
parent
d862453d68
commit
2f6afd290e
|
@ -24,6 +24,7 @@ import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterState.Custom;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -87,9 +88,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
private final ImmutableOpenMap<String, List<ShardId>> waitingIndices;
|
||||
private final long startTime;
|
||||
private final long repositoryStateId;
|
||||
@Nullable private final String failure;
|
||||
|
||||
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
|
||||
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
|
||||
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
|
||||
String failure) {
|
||||
this.state = state;
|
||||
this.snapshot = snapshot;
|
||||
this.includeGlobalState = includeGlobalState;
|
||||
|
@ -104,15 +107,26 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
this.waitingIndices = findWaitingIndices(shards);
|
||||
}
|
||||
this.repositoryStateId = repositoryStateId;
|
||||
this.failure = failure;
|
||||
}
|
||||
|
||||
/**
 * Convenience constructor for an entry that carries no failure message.
 * Delegates to the full constructor, passing {@code null} as the failure.
 */
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
|
||||
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
|
||||
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null);
|
||||
}
|
||||
|
||||
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
|
||||
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
|
||||
entry.repositoryStateId, shards);
|
||||
entry.repositoryStateId, shards, entry.failure);
|
||||
}
|
||||
|
||||
/**
 * Creates a copy of {@code entry} with a new state, new shard statuses and a new failure message.
 * The snapshot, includeGlobalState, partial, indices, startTime and repositoryStateId values
 * are taken unchanged from {@code entry}.
 *
 * @param entry   the entry to copy the unchanged values from
 * @param state   the state of the new entry
 * @param shards  the shard statuses of the new entry
 * @param failure the failure message of the new entry; replaces whatever {@code entry} carried
 */
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
|
||||
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
|
||||
entry.repositoryStateId, shards, failure);
|
||||
}
|
||||
|
||||
public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
|
||||
this(entry, entry.state, shards);
|
||||
this(entry, entry.state, shards, entry.failure);
|
||||
}
|
||||
|
||||
public Snapshot snapshot() {
|
||||
|
@ -151,6 +165,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
return repositoryStateId;
|
||||
}
|
||||
|
||||
/**
 * Returns the failure message stored on this entry.
 *
 * @return the failure message; may be {@code null} (the backing field is declared {@code @Nullable})
 */
public String failure() {
|
||||
return failure;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
@ -427,6 +445,12 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
}
|
||||
}
|
||||
long repositoryStateId = in.readLong();
|
||||
final String failure;
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
failure = in.readOptionalString();
|
||||
} else {
|
||||
failure = null;
|
||||
}
|
||||
entries[i] = new Entry(snapshot,
|
||||
includeGlobalState,
|
||||
partial,
|
||||
|
@ -434,7 +458,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
Collections.unmodifiableList(indexBuilder),
|
||||
startTime,
|
||||
repositoryStateId,
|
||||
builder.build());
|
||||
builder.build(),
|
||||
failure);
|
||||
}
|
||||
this.entries = Arrays.asList(entries);
|
||||
}
|
||||
|
@ -463,6 +488,9 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
|||
}
|
||||
}
|
||||
out.writeLong(entry.repositoryStateId);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
out.writeOptionalString(entry.failure);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -591,8 +591,6 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
// TODO: Add PARTIAL_SUCCESS status?
|
||||
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build());
|
||||
entries.add(updatedEntry);
|
||||
// Finalize snapshot in the repository
|
||||
snapshotsService.endSnapshot(updatedEntry);
|
||||
}
|
||||
} else {
|
||||
entries.add(entry);
|
||||
|
|
|
@ -83,7 +83,9 @@ import java.util.Set;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static java.util.Collections.unmodifiableList;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
|
||||
|
||||
|
@ -121,6 +123,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
private final Map<Snapshot, List<ActionListener<SnapshotInfo>>> snapshotCompletionListeners = new ConcurrentHashMap<>();
|
||||
|
||||
// Set of snapshots that are currently being initialized by this node
|
||||
private final Set<Snapshot> initializingSnapshots = Collections.synchronizedSet(new HashSet<>());
|
||||
|
||||
// Set of snapshots that are currently being ended by this node
|
||||
private final Set<Snapshot> endingSnapshots = Collections.synchronizedSet(new HashSet<>());
|
||||
|
||||
@Inject
|
||||
public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
RepositoriesService repositoriesService, ThreadPool threadPool) {
|
||||
|
@ -207,7 +215,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
final ArrayList<SnapshotInfo> snapshotList = new ArrayList<>(snapshotSet);
|
||||
CollectionUtil.timSort(snapshotList);
|
||||
return Collections.unmodifiableList(snapshotList);
|
||||
return unmodifiableList(snapshotList);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -223,7 +231,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
snapshotList.add(inProgressSnapshot(entry));
|
||||
}
|
||||
CollectionUtil.timSort(snapshotList);
|
||||
return Collections.unmodifiableList(snapshotList);
|
||||
return unmodifiableList(snapshotList);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -280,6 +288,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
System.currentTimeMillis(),
|
||||
repositoryData.getGenId(),
|
||||
null);
|
||||
initializingSnapshots.add(newSnapshot.snapshot());
|
||||
snapshots = new SnapshotsInProgress(newSnapshot);
|
||||
} else {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
|
||||
|
@ -290,6 +299,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
|
||||
if (newSnapshot != null) {
|
||||
initializingSnapshots.remove(newSnapshot.snapshot());
|
||||
}
|
||||
newSnapshot = null;
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -297,7 +309,21 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
|
||||
if (newSnapshot != null) {
|
||||
beginSnapshot(newState, newSnapshot, request.partial(), listener);
|
||||
final Snapshot current = newSnapshot.snapshot();
|
||||
assert initializingSnapshots.contains(current);
|
||||
beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<Snapshot>() {
|
||||
@Override
|
||||
public void onResponse(final Snapshot snapshot) {
|
||||
initializingSnapshots.remove(snapshot);
|
||||
listener.onResponse(snapshot);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
initializingSnapshots.remove(current);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -370,6 +396,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
protected void doRun() {
|
||||
assert initializingSnapshots.contains(snapshot.snapshot());
|
||||
Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());
|
||||
|
||||
MetaData metaData = clusterState.metaData();
|
||||
|
@ -394,9 +421,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
|
||||
|
||||
SnapshotsInProgress.Entry endSnapshot;
|
||||
String failure;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
|
@ -407,9 +431,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
continue;
|
||||
}
|
||||
|
||||
if (entry.state() != State.ABORTED) {
|
||||
// Replace the snapshot that was just intialized
|
||||
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards =
|
||||
if (entry.state() == State.ABORTED) {
|
||||
entries.add(entry);
|
||||
} else {
|
||||
// Replace the snapshot that was just initialized
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
|
||||
shards(currentState, entry.indices());
|
||||
if (!partial) {
|
||||
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
|
||||
|
@ -417,9 +443,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
Set<String> missing = indicesWithMissingShards.v1();
|
||||
Set<String> closed = indicesWithMissingShards.v2();
|
||||
if (missing.isEmpty() == false || closed.isEmpty() == false) {
|
||||
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
|
||||
entries.add(endSnapshot);
|
||||
|
||||
final StringBuilder failureMessage = new StringBuilder();
|
||||
if (missing.isEmpty() == false) {
|
||||
failureMessage.append("Indices don't have primary shards ");
|
||||
|
@ -432,24 +455,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
failureMessage.append("Indices are closed ");
|
||||
failureMessage.append(closed);
|
||||
}
|
||||
failure = failureMessage.toString();
|
||||
entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
|
||||
entries.add(updatedSnapshot);
|
||||
if (completed(shards.values())) {
|
||||
endSnapshot = updatedSnapshot;
|
||||
}
|
||||
} else {
|
||||
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
|
||||
failure = "snapshot was aborted during initialization";
|
||||
endSnapshot = entry;
|
||||
entries.add(endSnapshot);
|
||||
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards));
|
||||
}
|
||||
}
|
||||
return ClusterState.builder(currentState)
|
||||
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries)))
|
||||
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries)))
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -477,14 +491,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
// completion listener in this method. For the snapshot completion to work properly, the snapshot
|
||||
// should still exist when listener is registered.
|
||||
userCreateSnapshotListener.onResponse(snapshot.snapshot());
|
||||
|
||||
// Now that snapshot completion listener is registered we can end the snapshot if needed
|
||||
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
|
||||
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
|
||||
// go ahead and continue working on this snapshot rather then end here.
|
||||
if (endSnapshot != null) {
|
||||
endSnapshot(endSnapshot, failure);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -552,7 +558,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
}
|
||||
|
||||
private SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
|
||||
private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
|
||||
return new SnapshotInfo(entry.snapshot().getSnapshotId(),
|
||||
entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
|
||||
entry.startTime(), entry.includeGlobalState());
|
||||
|
@ -610,7 +616,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
builder.add(entry);
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableList(builder);
|
||||
return unmodifiableList(builder);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -666,7 +672,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
return unmodifiableMap(shardStatus);
|
||||
}
|
||||
|
||||
private SnapshotShardFailure findShardFailure(List<SnapshotShardFailure> shardFailures, ShardId shardId) {
|
||||
private static SnapshotShardFailure findShardFailure(List<SnapshotShardFailure> shardFailures, ShardId shardId) {
|
||||
for (SnapshotShardFailure shardFailure : shardFailures) {
|
||||
if (shardId.getIndexName().equals(shardFailure.index()) && shardId.getId() == shardFailure.shardId()) {
|
||||
return shardFailure;
|
||||
|
@ -680,15 +686,28 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
try {
|
||||
if (event.localNodeMaster()) {
|
||||
// We don't remove old master when master flips anymore. So, we need to check for change in master
|
||||
if (event.nodesRemoved() || event.previousState().nodes().isLocalNodeElectedMaster() == false) {
|
||||
processSnapshotsOnRemovedNodes(event);
|
||||
final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshotsInProgress != null) {
|
||||
if (removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes())) {
|
||||
processSnapshotsOnRemovedNodes();
|
||||
}
|
||||
if (event.routingTableChanged()) {
|
||||
processStartedShards(event);
|
||||
if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) {
|
||||
processStartedShards();
|
||||
}
|
||||
removeFinishedSnapshotFromClusterState(event);
|
||||
// Cleanup all snapshots that have no more work left:
|
||||
// 1. Completed snapshots
|
||||
// 2. Snapshots in state INIT that the previous master failed to start
|
||||
// 3. Snapshots in any other state that have all their shard tasks completed
|
||||
snapshotsInProgress.entries().stream().filter(
|
||||
entry -> entry.state().completed()
|
||||
|| entry.state() == State.INIT && initializingSnapshots.contains(entry.snapshot()) == false
|
||||
|| entry.state() != State.INIT && completed(entry.shards().values())
|
||||
).forEach(this::endSnapshot);
|
||||
}
|
||||
if (event.previousState().nodes().isLocalNodeElectedMaster() == false) {
|
||||
finalizeSnapshotDeletionFromPreviousMaster(event);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("Failed to update snapshot state ", e);
|
||||
}
|
||||
|
@ -706,7 +725,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* snapshot was deleted and a call to GET snapshots would reveal that the snapshot no longer exists.
|
||||
*/
|
||||
private void finalizeSnapshotDeletionFromPreviousMaster(ClusterChangedEvent event) {
|
||||
if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) {
|
||||
SnapshotDeletionsInProgress deletionsInProgress = event.state().custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster";
|
||||
|
@ -714,40 +732,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.getRepositoryStateId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a finished snapshot from the cluster state. This can happen if the previous
|
||||
* master node processed a cluster state update that marked the snapshot as finished,
|
||||
* but the previous master node died before removing the snapshot in progress from the
|
||||
* cluster state. It is then the responsibility of the new master node to end the
|
||||
* snapshot and remove it from the cluster state.
|
||||
*/
|
||||
private void removeFinishedSnapshotFromClusterState(ClusterChangedEvent event) {
|
||||
if (event.localNodeMaster() && !event.previousState().nodes().isLocalNodeElectedMaster()) {
|
||||
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshotsInProgress != null && !snapshotsInProgress.entries().isEmpty()) {
|
||||
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
|
||||
if (entry.state().completed()) {
|
||||
endSnapshot(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleans up shard snapshots that were running on removed nodes
|
||||
*
|
||||
* @param event cluster changed event
|
||||
*/
|
||||
private void processSnapshotsOnRemovedNodes(ClusterChangedEvent event) {
|
||||
if (removedNodesCleanupNeeded(event)) {
|
||||
// Check if we just became the master
|
||||
final boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster();
|
||||
private void processSnapshotsOnRemovedNodes() {
|
||||
clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
DiscoveryNodes nodes = currentState.nodes();
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots == null) {
|
||||
|
@ -757,9 +749,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
|
||||
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
|
||||
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
|
||||
boolean snapshotChanged = false;
|
||||
if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) {
|
||||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
|
||||
boolean snapshotChanged = false;
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards()) {
|
||||
ShardSnapshotStatus shardStatus = shardEntry.value;
|
||||
if (!shardStatus.state().completed() && shardStatus.nodeId() != null) {
|
||||
|
@ -780,13 +772,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shardsMap = shards.build();
|
||||
if (!snapshot.state().completed() && completed(shardsMap.values())) {
|
||||
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap);
|
||||
endSnapshot(updatedSnapshot);
|
||||
} else {
|
||||
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap);
|
||||
}
|
||||
}
|
||||
entries.add(updatedSnapshot);
|
||||
} else if (snapshot.state() == State.INIT && newMaster) {
|
||||
} else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) {
|
||||
changed = true;
|
||||
// Mark the snapshot as aborted as it failed to start from the previous master
|
||||
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards());
|
||||
|
@ -807,8 +798,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
}
|
||||
if (changed) {
|
||||
snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
|
||||
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
|
||||
return ClusterState.builder(currentState)
|
||||
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build();
|
||||
}
|
||||
return currentState;
|
||||
}
|
||||
|
@ -819,13 +810,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void processStartedShards(ClusterChangedEvent event) {
|
||||
if (waitingShardsStartedOrUnassigned(event)) {
|
||||
private void processStartedShards() {
|
||||
clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
RoutingTable routingTable = currentState.routingTable();
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots != null) {
|
||||
|
@ -840,7 +829,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
changed = true;
|
||||
if (!snapshot.state().completed() && completed(shards.values())) {
|
||||
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards);
|
||||
endSnapshot(updatedSnapshot);
|
||||
} else {
|
||||
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards);
|
||||
}
|
||||
|
@ -849,8 +837,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
}
|
||||
if (changed) {
|
||||
snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
|
||||
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
|
||||
return ClusterState.builder(currentState)
|
||||
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build();
|
||||
}
|
||||
}
|
||||
return currentState;
|
||||
|
@ -863,9 +851,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShards(
|
||||
private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShards(
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> snapshotShards, RoutingTable routingTable) {
|
||||
boolean snapshotChanged = false;
|
||||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
|
||||
|
@ -905,11 +892,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
}
|
||||
|
||||
private boolean waitingShardsStartedOrUnassigned(ClusterChangedEvent event) {
|
||||
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);
|
||||
if (curr != null) {
|
||||
for (SnapshotsInProgress.Entry entry : curr.entries()) {
|
||||
if (entry.state() == State.STARTED && !entry.waitingIndices().isEmpty()) {
|
||||
private static boolean waitingShardsStartedOrUnassigned(SnapshotsInProgress snapshotsInProgress, ClusterChangedEvent event) {
|
||||
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
|
||||
if (entry.state() == State.STARTED) {
|
||||
for (ObjectCursor<String> index : entry.waitingIndices().keys()) {
|
||||
if (event.indexRoutingTableChanged(index.value)) {
|
||||
IndexRoutingTable indexShardRoutingTable = event.state().getRoutingTable().index(index.value);
|
||||
|
@ -923,32 +908,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) {
|
||||
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshotsInProgress == null) {
|
||||
return false;
|
||||
}
|
||||
// Check if we just became the master
|
||||
boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster();
|
||||
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
|
||||
if (newMaster && (snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) {
|
||||
// We just replaced old master and snapshots in intermediate states needs to be cleaned
|
||||
return true;
|
||||
}
|
||||
for (DiscoveryNode node : event.nodesDelta().removedNodes()) {
|
||||
for (ObjectCursor<ShardSnapshotStatus> shardStatus : snapshot.shards().values()) {
|
||||
if (!shardStatus.value.state().completed() && node.getId().equals(shardStatus.value.nodeId())) {
|
||||
// At least one shard was running on the removed node - we need to fail it
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
/**
 * Checks whether any snapshot entry still has a not-yet-completed shard assigned to one of the removed nodes.
 *
 * @param snapshotsInProgress the in-progress snapshots whose shard statuses are inspected
 * @param removedNodes        the nodes that were removed; an empty list short-circuits to {@code false}
 * @return {@code true} if at least one shard status that is not completed has a node id equal to
 *         the id of a removed node, {@code false} otherwise
 */
private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List<DiscoveryNode> removedNodes) {
|
||||
// If at least one shard was running on a removed node - we need to fail it
|
||||
return removedNodes.isEmpty() == false && snapshotsInProgress.entries().stream().flatMap(snapshot ->
|
||||
StreamSupport.stream(((Iterable<ShardSnapshotStatus>) () -> snapshot.shards().valuesIt()).spliterator(), false)
|
||||
.filter(s -> s.state().completed() == false).map(ShardSnapshotStatus::nodeId))
|
||||
.anyMatch(removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())::contains);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -981,25 +949,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
*
|
||||
* @param entry snapshot
|
||||
*/
|
||||
void endSnapshot(final SnapshotsInProgress.Entry entry) {
|
||||
endSnapshot(entry, null);
|
||||
private void endSnapshot(final SnapshotsInProgress.Entry entry) {
|
||||
if (endingSnapshots.add(entry.snapshot()) == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Finalizes the shard in repository and then removes it from cluster state
|
||||
* <p>
|
||||
* This is non-blocking method that runs on a thread from SNAPSHOT thread pool
|
||||
*
|
||||
* @param entry snapshot
|
||||
* @param failure failure reason or null if snapshot was successful
|
||||
*/
|
||||
private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) {
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() {
|
||||
final Snapshot snapshot = entry.snapshot();
|
||||
final Repository repository = repositoriesService.repository(snapshot.getRepository());
|
||||
final String failure = entry.failure();
|
||||
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
|
||||
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardStatus : entry.shards()) {
|
||||
|
@ -1015,7 +974,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
entry.startTime(),
|
||||
failure,
|
||||
entry.shards().size(),
|
||||
Collections.unmodifiableList(shardFailures),
|
||||
unmodifiableList(shardFailures),
|
||||
entry.getRepositoryStateId(),
|
||||
entry.includeGlobalState());
|
||||
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
|
||||
|
@ -1047,7 +1006,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* @param failure exception if snapshot failed
|
||||
* @param listener listener to notify when snapshot information is removed from the cluster state
|
||||
*/
|
||||
private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, final Exception failure,
|
||||
private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable SnapshotInfo snapshotInfo, final Exception failure,
|
||||
@Nullable CleanupAfterErrorListener listener) {
|
||||
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
|
||||
|
||||
|
@ -1065,8 +1024,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
}
|
||||
if (changed) {
|
||||
snapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()]));
|
||||
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
|
||||
return ClusterState.builder(currentState)
|
||||
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build();
|
||||
}
|
||||
}
|
||||
return currentState;
|
||||
|
@ -1075,6 +1034,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot metadata", snapshot), e);
|
||||
endingSnapshots.remove(snapshot);
|
||||
if (listener != null) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
@ -1082,6 +1042,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
public void onNoLongerMaster(String source) {
|
||||
endingSnapshots.remove(snapshot);
|
||||
if (listener != null) {
|
||||
listener.onNoLongerMaster();
|
||||
}
|
||||
|
@ -1101,6 +1062,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
logger.warn("Failed to notify listeners", e);
|
||||
}
|
||||
}
|
||||
endingSnapshots.remove(snapshot);
|
||||
if (listener != null) {
|
||||
listener.onResponse(snapshotInfo);
|
||||
}
|
||||
|
@ -1207,13 +1169,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
|
||||
|
||||
final State state = snapshotEntry.state();
|
||||
final String failure;
|
||||
if (state == State.INIT) {
|
||||
// snapshot is still initializing, mark it as aborted
|
||||
shards = snapshotEntry.shards();
|
||||
assert shards.isEmpty();
|
||||
// No shards in this snapshot, we delete it right away since the SnapshotShardsService
|
||||
// has no work to do.
|
||||
endSnapshot(snapshotEntry);
|
||||
failure = "Snapshot was aborted during initialization";
|
||||
} else if (state == State.STARTED) {
|
||||
// snapshot is started - mark every non completed shard as aborted
|
||||
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
|
||||
|
@ -1225,7 +1186,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
shardsBuilder.put(shardEntry.key, status);
|
||||
}
|
||||
shards = shardsBuilder.build();
|
||||
|
||||
failure = "Snapshot was aborted by deletion";
|
||||
} else {
|
||||
boolean hasUncompletedShards = false;
|
||||
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
|
||||
|
@ -1246,10 +1207,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
// where we force to finish the snapshot
|
||||
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
|
||||
shards = snapshotEntry.shards();
|
||||
endSnapshot(snapshotEntry);
|
||||
}
|
||||
failure = snapshotEntry.failure();
|
||||
}
|
||||
SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards);
|
||||
SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure);
|
||||
clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(newSnapshot));
|
||||
}
|
||||
return clusterStateBuilder.build();
|
||||
|
@ -1400,7 +1361,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* @param indices list of indices to be snapshotted
|
||||
* @return list of shard to be included into current snapshot
|
||||
*/
|
||||
private ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState, List<IndexId> indices) {
|
||||
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState,
|
||||
List<IndexId> indices) {
|
||||
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
|
||||
MetaData metaData = clusterState.metaData();
|
||||
for (IndexId index : indices) {
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateListener;
|
||||
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
|
@ -52,7 +51,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
/**
|
||||
* Tests snapshot operations during disruptions.
|
||||
|
@ -156,9 +154,6 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
|
|||
logger.info("--> got exception from race in master operation retries");
|
||||
} else {
|
||||
logger.info("--> got exception from hanged master", ex);
|
||||
assertThat(cause, instanceOf(MasterNotDiscoveredException.class));
|
||||
cause = cause.getCause();
|
||||
assertThat(cause, instanceOf(FailedToCommitClusterStateException.class));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -988,7 +988,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
* can be restored when the node the shrunken index was created on is no longer part of
|
||||
* the cluster.
|
||||
*/
|
||||
@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/38226")
|
||||
public void testRestoreShrinkIndex() throws Exception {
|
||||
logger.info("--> starting a master node and a data node");
|
||||
internalCluster().startMasterOnlyNode();
|
||||
|
|
|
@ -3637,7 +3637,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
}
|
||||
|
||||
@TestLogging("org.elasticsearch.snapshots:TRACE")
|
||||
@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/38226")
|
||||
public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
|
||||
final Client client = client();
|
||||
|
||||
|
@ -3684,14 +3683,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
|
||||
// The deletion must set the snapshot in the ABORTED state
|
||||
assertBusy(() -> {
|
||||
try {
|
||||
SnapshotsStatusResponse status =
|
||||
client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get();
|
||||
assertThat(status.getSnapshots().iterator().next().getState(), equalTo(State.ABORTED));
|
||||
} catch (Exception e) {
|
||||
// Force assertBusy to retry on every exception
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
});
|
||||
|
||||
// Now unblock the repository
|
||||
|
|
Loading…
Reference in New Issue