Fix index deletes during partial snapshots by filtering out shard generation updates for indices that were concurrently removed from the cluster state. This completely removes any reference to those shards from the snapshot.

Follow-up to #50202
Closes #50200
parent 2e7b1ab375
commit 4f24739fbe
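The diff below makes two changes: `SnapshotsService#buildGenerations` now skips shards whose index is no longer present in the cluster state, and `ShardGenerations` gains a `totalShards()` helper so a finalized partial snapshot reports the correct shard count. As a hedged illustration of the core idea only, here is a minimal, self-contained sketch using plain Java collections instead of the real `ShardGenerations`/`MetaData` types; every name in it is made up for the example.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy stand-ins for the real ShardGenerations/MetaData types, only to illustrate the idea:
// when finalizing a snapshot, drop every shard whose index is no longer in the cluster state.
public class FilterDeletedIndicesSketch {

    static Map<String, List<String>> buildGenerations(Map<String, List<String>> inProgress,
                                                      Set<String> indicesInClusterState,
                                                      boolean partial) {
        final Map<String, List<String>> result = new HashMap<>();
        inProgress.forEach((index, shardGens) -> {
            if (indicesInClusterState.contains(index) == false) {
                // The index was deleted while the snapshot ran; this is only legal for partial snapshots.
                assert partial : "Index [" + index + "] was deleted during a non-partial snapshot";
                return;
            }
            result.put(index, shardGens);
        });
        return result;
    }

    public static void main(String[] args) {
        // Shard generations recorded while the snapshot was running: index -> one generation per shard.
        Map<String, List<String>> inProgress = Map.of(
            "index-kept", List.of("gen-0"),
            "index-deleted", List.of("gen-0"));
        // Only "index-kept" still exists in the cluster state, so the other index is dropped entirely.
        System.out.println(buildGenerations(inProgress, Set.of("index-kept"), true));
        // -> {index-kept=[gen-0]}
    }
}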
@@ -54,6 +54,13 @@ public final class ShardGenerations {
         this.shardGenerations = shardGenerations;
     }
 
+    /**
+     * Returns the total number of shards tracked by this instance.
+     */
+    public int totalShards() {
+        return shardGenerations.values().stream().mapToInt(List::size).sum();
+    }
+
     /**
      * Returns all indices for which shard generations are tracked.
      *
@@ -582,16 +582,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         private void cleanupAfterError(Exception exception) {
             threadPool.generic().execute(() -> {
                 if (snapshotCreated) {
+                    final MetaData metaData = clusterService.state().metaData();
                     repositoriesService.repository(snapshot.snapshot().getRepository())
                         .finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
-                            buildGenerations(snapshot),
+                            buildGenerations(snapshot, metaData),
                             snapshot.startTime(),
                             ExceptionsHelper.stackTrace(exception),
                             0,
                             Collections.emptyList(),
                             snapshot.repositoryStateId(),
                             snapshot.includeGlobalState(),
-                            metaDataForSnapshot(snapshot, clusterService.state().metaData()),
+                            metaDataForSnapshot(snapshot, metaData),
                             snapshot.userMetadata(),
                             snapshot.useShardGenerations(),
                             ActionListener.runAfter(ActionListener.wrap(ignored -> {
@@ -607,11 +608,21 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             }
         }
 
-    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot) {
+    private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
         ShardGenerations.Builder builder = ShardGenerations.builder();
         final Map<String, IndexId> indexLookup = new HashMap<>();
         snapshot.indices().forEach(idx -> indexLookup.put(idx.getName(), idx));
-        snapshot.shards().forEach(c -> builder.put(indexLookup.get(c.key.getIndexName()), c.key.id(), c.value.generation()));
+        snapshot.shards().forEach(c -> {
+            if (metaData.index(c.key.getIndex()) == null) {
+                assert snapshot.partial() :
+                    "Index [" + c.key.getIndex() + "] was deleted during a snapshot but snapshot was not partial.";
+                return;
+            }
+            final IndexId indexId = indexLookup.get(c.key.getIndexName());
+            if (indexId != null) {
+                builder.put(indexId, c.key.id(), c.value.generation());
+            }
+        });
         return builder.build();
     }
 
@@ -1046,12 +1057,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                         shardFailures.add(new SnapshotShardFailure(status.nodeId(), shardId, status.reason()));
                     }
                 }
+                final ShardGenerations shardGenerations = buildGenerations(entry, metaData);
                 repository.finalizeSnapshot(
                     snapshot.getSnapshotId(),
-                    buildGenerations(entry),
+                    shardGenerations,
                     entry.startTime(),
                     failure,
-                    entry.shards().size(),
+                    entry.partial() ? shardGenerations.totalShards() : entry.shards().size(),
                     unmodifiableList(shardFailures),
                     entry.repositoryStateId(),
                     entry.includeGlobalState(),
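For the shard-count change just above (`entry.partial() ? shardGenerations.totalShards() : entry.shards().size()`), here is a rough, hedged sketch of that relationship with plain collections standing in for `ShardGenerations`; the names are illustrative only, not the actual API.

import java.util.List;
import java.util.Map;

public class TotalShardsSketch {
    public static void main(String[] args) {
        // Filtered shard generations after the deleted index was dropped: index -> one generation per shard.
        Map<String, List<String>> generations = Map.of("index-kept", List.of("gen-0", "gen-1"));
        int shardsInSnapshotEntry = 3;   // shards the in-progress entry originally tracked
        boolean partial = true;

        // Mirrors the change above: a partial snapshot reports only the shards
        // that actually made it into the filtered generations.
        int totalShards = partial
            ? generations.values().stream().mapToInt(List::size).sum()
            : shardsInSnapshotEntry;
        System.out.println(totalShards); // 2 -- the deleted index's shard is not counted
    }
}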
@@ -21,6 +21,7 @@ package org.elasticsearch.snapshots;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -143,6 +144,7 @@ import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.TestEnvironment;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
 import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
@@ -213,6 +215,7 @@ import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.mock;
 
@@ -505,7 +508,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         }
     }
 
-    public void testConcurrentSnapshotDeleteAndDeleteIndex() {
+    public void testConcurrentSnapshotDeleteAndDeleteIndex() throws IOException {
         setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
 
         String repoName = "repo";
@@ -516,11 +519,13 @@ public class SnapshotResiliencyTests extends ESTestCase {
         testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
 
         final StepListener<Collection<CreateIndexResponse>> createIndicesListener = new StepListener<>();
+        final int indices = randomIntBetween(5, 20);
 
+        final SetOnce<Index> firstIndex = new SetOnce<>();
         continueOrDie(createRepoAndIndex(repoName, index, 1), createIndexResponse -> {
+            firstIndex.set(masterNode.clusterService.state().metaData().index(index).getIndex());
             // create a few more indices to make it more likely that the subsequent index delete operation happens before snapshot
             // finalization
-            final int indices = randomIntBetween(5, 20);
             final GroupedActionListener<CreateIndexResponse> listener = new GroupedActionListener<>(createIndicesListener, indices);
             for (int i = 0; i < indices; ++i) {
                 client().admin().indices().create(new CreateIndexRequest("index-" + i), listener);
@@ -529,23 +534,55 @@ public class SnapshotResiliencyTests extends ESTestCase {
 
         final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
 
+        final boolean partialSnapshot = randomBoolean();
+
         continueOrDie(createIndicesListener, createIndexResponses ->
             client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(false)
-                .execute(createSnapshotResponseStepListener));
+                .setPartial(partialSnapshot).execute(createSnapshotResponseStepListener));
 
         continueOrDie(createSnapshotResponseStepListener,
-            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), noopListener()));
+            createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index),
+                new ActionListener<AcknowledgedResponse>() {
+                    @Override
+                    public void onResponse(AcknowledgedResponse acknowledgedResponse) {
+                        if (partialSnapshot) {
+                            // Recreate index by the same name to test that we don't snapshot conflicting metadata in this scenario
+                            client().admin().indices().create(new CreateIndexRequest(index), noopListener());
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Exception e) {
+                        if (partialSnapshot) {
+                            throw new AssertionError("Delete index should always work during partial snapshots", e);
+                        }
+                    }
+                }));
 
         deterministicTaskQueue.runAllRunnableTasks();
 
         SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
         assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
         final Repository repository = masterNode.repositoriesService.repository(repoName);
-        Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
+        final RepositoryData repositoryData = getRepositoryData(repository);
+        Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
         assertThat(snapshotIds, hasSize(1));
 
         final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
         assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
+        if (partialSnapshot) {
+            // Single shard for each index so we either get all indices or all except for the deleted index
+            assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices)));
+            if (snapshotInfo.successfulShards() == indices + 1) {
+                final IndexMetaData indexMetaData =
+                    repository.getSnapshotIndexMetaData(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
+                // Make sure we snapshotted the metadata of this index and not the recreated version
+                assertEquals(indexMetaData.getIndex(), firstIndex.get());
+            }
+        } else {
+            // Index delete must be blocked for non-partial snapshots and we get a snapshot for every index
+            assertEquals(snapshotInfo.successfulShards(), indices + 1);
+        }
         assertEquals(0, snapshotInfo.failedShards());
     }
 