mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
We were loading `RepositoryData` twice during snapshot initialization, redundantly checking if a snapshot existed already. The first snapshot existence check is somewhat redundant because a snapshot could be created between loading `RepositoryData` and updating the cluster state with the `INIT` state snapshot entry. Also, it is much safer to do the subsequent checks for index existence in the repo and the presence of old version snapshots once the `INIT` state entry prevents further snapshots from being created concurrently. While the current state of things will never lead to corruption on a concurrent snapshot creation, it could result in a situation (though unlikely) where all the snapshot's work is done on the data nodes, only to find out that the repository generation was off during snapshot finalization, failing there and leaving a bunch of dead data in the repository that won't be used in a subsequent snapshot (because the shard generation was never referenced due to the failed snapshot finalization). Note: This is a step on the way to parallel repository operations by making snapshot-related cluster state (CS) and repo-related CS more tightly correlated.
This commit is contained in:
parent
0a8d8d7ae3
commit
4e8ab43a3e
@ -140,6 +140,12 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||
useShardGenerations);
|
||||
}
|
||||
|
||||
/**
 * Copy constructor that derives a new entry from an existing one while replacing its state, indices,
 * repository state id, shards, shard-generation flag and failure message.
 * <p>
 * The snapshot, {@code includeGlobalState}, {@code partial}, {@code startTime} and {@code userMetadata}
 * values are taken unchanged from {@code entry}; everything else comes from the arguments and is
 * forwarded to the full constructor.
 *
 * @param entry               existing entry whose snapshot, flags, start time and user metadata are reused
 * @param state               state of the new entry
 * @param indices             indices of the new entry
 * @param repositoryStateId   repository state id of the new entry
 * @param shards              per-shard snapshot status of the new entry
 * @param useShardGenerations value of the shard-generations flag for the new entry
 * @param failure             failure message of the new entry; callers in this change pass {@code null}
 *                            for non-failed states
 */
public Entry(Entry entry, State state, List<IndexId> indices, long repositoryStateId,
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, boolean useShardGenerations, String failure) {
|
||||
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards,
|
||||
failure, entry.userMetadata, useShardGenerations);
|
||||
}
|
||||
|
||||
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
|
||||
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
|
||||
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
|
||||
|
@ -107,7 +107,7 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
|
||||
* <li>On the master node the {@link #createSnapshot(CreateSnapshotRequest, ActionListener)} is called and makes sure that
|
||||
* no snapshot is currently running and registers the new snapshot in cluster state</li>
|
||||
* <li>When cluster state is updated
|
||||
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes
|
||||
* the {@link #beginSnapshot} method kicks in and initializes
|
||||
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
|
||||
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
|
||||
* start processing them through {@link SnapshotShardsService#startNewSnapshots} method</li>
|
||||
@ -274,90 +274,85 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
|
||||
validate(repositoryName, snapshotName);
|
||||
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
|
||||
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
|
||||
repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener);
|
||||
repositoryDataListener.whenComplete(repositoryData -> {
|
||||
final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null);
|
||||
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
|
||||
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
|
||||
|
||||
private SnapshotsInProgress.Entry newSnapshot = null;
|
||||
private SnapshotsInProgress.Entry newSnapshot = null;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
validate(repositoryName, snapshotName, currentState);
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
|
||||
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
|
||||
}
|
||||
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
|
||||
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
|
||||
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
|
||||
}
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots == null || snapshots.entries().isEmpty()) {
|
||||
// Store newSnapshot here to be processed in clusterStateProcessed
|
||||
List<String> indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
|
||||
request.indicesOptions(), request.indices()));
|
||||
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
|
||||
List<IndexId> snapshotIndices = repositoryData.resolveNewIndices(indices);
|
||||
newSnapshot = new SnapshotsInProgress.Entry(
|
||||
new Snapshot(repositoryName, snapshotId),
|
||||
request.includeGlobalState(), request.partial(),
|
||||
State.INIT,
|
||||
snapshotIndices,
|
||||
threadPool.absoluteTimeInMillis(),
|
||||
repositoryData.getGenId(),
|
||||
null,
|
||||
request.userMetadata(),
|
||||
hasOldFormatSnapshots == false &&
|
||||
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
|
||||
initializingSnapshots.add(newSnapshot.snapshot());
|
||||
snapshots = new SnapshotsInProgress(newSnapshot);
|
||||
} else {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
|
||||
}
|
||||
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
|
||||
private List<String> indices;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
validate(repositoryName, snapshotName, currentState);
|
||||
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
|
||||
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
|
||||
"cannot snapshot while a snapshot deletion is in-progress in [" + deletionsInProgress + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
|
||||
if (newSnapshot != null) {
|
||||
initializingSnapshots.remove(newSnapshot.snapshot());
|
||||
}
|
||||
newSnapshot = null;
|
||||
listener.onFailure(e);
|
||||
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
|
||||
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName,
|
||||
"cannot snapshot while a repository cleanup is in-progress in [" + repositoryCleanupInProgress + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
|
||||
if (newSnapshot != null) {
|
||||
final Snapshot current = newSnapshot.snapshot();
|
||||
assert initializingSnapshots.contains(current);
|
||||
beginSnapshot(newState, newSnapshot, request.partial(), new ActionListener<Snapshot>() {
|
||||
@Override
|
||||
public void onResponse(final Snapshot snapshot) {
|
||||
initializingSnapshots.remove(snapshot);
|
||||
listener.onResponse(snapshot);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
initializingSnapshots.remove(current);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
if (snapshots != null && snapshots.entries().isEmpty() == false) {
|
||||
throw new ConcurrentSnapshotExecutionException(repositoryName, snapshotName, " a snapshot is already running");
|
||||
}
|
||||
// Store newSnapshot here to be processed in clusterStateProcessed
|
||||
indices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(currentState,
|
||||
request.indicesOptions(), request.indices()));
|
||||
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
|
||||
newSnapshot = new SnapshotsInProgress.Entry(
|
||||
new Snapshot(repositoryName, snapshotId),
|
||||
request.includeGlobalState(), request.partial(),
|
||||
State.INIT,
|
||||
Collections.emptyList(), // We'll resolve the list of indices when moving to the STARTED state in #beginSnapshot
|
||||
threadPool.absoluteTimeInMillis(),
|
||||
RepositoryData.UNKNOWN_REPO_GEN,
|
||||
null,
|
||||
request.userMetadata(), false
|
||||
);
|
||||
initializingSnapshots.add(newSnapshot.snapshot());
|
||||
snapshots = new SnapshotsInProgress(newSnapshot);
|
||||
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return request.masterNodeTimeout();
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
|
||||
if (newSnapshot != null) {
|
||||
initializingSnapshots.remove(newSnapshot.snapshot());
|
||||
}
|
||||
});
|
||||
}, listener::onFailure);
|
||||
newSnapshot = null;
|
||||
listener.onFailure(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
|
||||
if (newSnapshot != null) {
|
||||
final Snapshot current = newSnapshot.snapshot();
|
||||
assert initializingSnapshots.contains(current);
|
||||
assert indices != null;
|
||||
beginSnapshot(newState, newSnapshot, request.partial(), indices, new ActionListener<Snapshot>() {
|
||||
@Override
|
||||
public void onResponse(final Snapshot snapshot) {
|
||||
initializingSnapshots.remove(snapshot);
|
||||
listener.onResponse(snapshot);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
initializingSnapshots.remove(current);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return request.masterNodeTimeout();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) {
|
||||
@ -442,6 +437,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
private void beginSnapshot(final ClusterState clusterState,
|
||||
final SnapshotsInProgress.Entry snapshot,
|
||||
final boolean partial,
|
||||
final List<String> indices,
|
||||
final ActionListener<Snapshot> userCreateSnapshotListener) {
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
|
||||
|
||||
@ -474,13 +470,20 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
snapshot.snapshot().getSnapshotId(), snapshot.indices(),
|
||||
metaDataForSnapshot(snapshot, clusterState.metaData()));
|
||||
}
|
||||
|
||||
snapshotCreated = true;
|
||||
|
||||
logger.info("snapshot [{}] started", snapshot.snapshot());
|
||||
if (snapshot.indices().isEmpty()) {
|
||||
final boolean hasOldFormatSnapshots =
|
||||
hasOldVersionSnapshots(snapshot.snapshot().getRepository(), repositoryData, null);
|
||||
final boolean writeShardGenerations = hasOldFormatSnapshots == false &&
|
||||
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION);
|
||||
if (indices.isEmpty()) {
|
||||
// No indices in this snapshot - we are done
|
||||
userCreateSnapshotListener.onResponse(snapshot.snapshot());
|
||||
endSnapshot(snapshot, clusterState.metaData());
|
||||
endSnapshot(new SnapshotsInProgress.Entry(
|
||||
snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, writeShardGenerations,
|
||||
null), clusterState.metaData());
|
||||
return;
|
||||
}
|
||||
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
|
||||
@ -500,8 +503,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
assert entry.shards().isEmpty();
|
||||
hadAbortedInitializations = true;
|
||||
} else {
|
||||
final List<IndexId> indexIds = repositoryData.resolveNewIndices(indices);
|
||||
// Replace the snapshot that was just initialized
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = shards(currentState, entry, repositoryData);
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
|
||||
shards(currentState, indexIds, writeShardGenerations, repositoryData);
|
||||
if (!partial) {
|
||||
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
|
||||
currentState.metaData());
|
||||
@ -520,12 +525,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
failureMessage.append("Indices are closed ");
|
||||
failureMessage.append(closed);
|
||||
}
|
||||
entries.add(
|
||||
new SnapshotsInProgress.Entry(entry, State.FAILED, shards, failureMessage.toString()));
|
||||
entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds,
|
||||
repositoryData.getGenId(), shards, writeShardGenerations, failureMessage.toString()));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, shards));
|
||||
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(),
|
||||
shards, writeShardGenerations, null));
|
||||
}
|
||||
}
|
||||
return ClusterState.builder(currentState)
|
||||
@ -1507,17 +1513,19 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
/**
|
||||
* Calculates the list of shards that should be included into the current snapshot
|
||||
*
|
||||
* @param clusterState cluster state
|
||||
* @param snapshot SnapshotsInProgress Entry
|
||||
* @param clusterState cluster state
|
||||
* @param indices Indices to snapshot
|
||||
* @param useShardGenerations whether to write {@link ShardGenerations} during the snapshot
|
||||
* @return list of shard to be included into current snapshot
|
||||
*/
|
||||
private static ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards(ClusterState clusterState,
|
||||
SnapshotsInProgress.Entry snapshot,
|
||||
List<IndexId> indices,
|
||||
boolean useShardGenerations,
|
||||
RepositoryData repositoryData) {
|
||||
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> builder = ImmutableOpenMap.builder();
|
||||
MetaData metaData = clusterState.metaData();
|
||||
final ShardGenerations shardGenerations = repositoryData.shardGenerations();
|
||||
for (IndexId index : snapshot.indices()) {
|
||||
for (IndexId index : indices) {
|
||||
final String indexName = index.getName();
|
||||
final boolean isNewIndex = repositoryData.getIndices().containsKey(indexName) == false;
|
||||
IndexMetaData indexMetaData = metaData.index(indexName);
|
||||
@ -1530,7 +1538,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
|
||||
ShardId shardId = new ShardId(indexMetaData.getIndex(), i);
|
||||
final String shardRepoGeneration;
|
||||
if (snapshot.useShardGenerations()) {
|
||||
if (useShardGenerations) {
|
||||
if (isNewIndex) {
|
||||
assert shardGenerations.getShardGen(index, shardId.getId()) == null
|
||||
: "Found shard generation for new index [" + index + "]";
|
||||
|
Loading…
x
Reference in New Issue
Block a user