RestoreService should update primary terms when restoring shards of existing indices (#38177)
When restoring shards of existing indices, the RestoreService also restores the values of primary terms stored in the snapshot index metadata. The primary terms are not updated and could potentially conflict with current index primary terms if the restored primary terms are lower than the existing ones. This situation is likely to happen with replicated closed indices (because primary terms are increased when the index is transitioning from open to closed state, and the snapshotted primary terms are the one at the time the index was opened) (see #38024) and maybe also with CCR. This commit changes the RestoreService so that it updates the primary terms using the maximum value between the snapshotted values and the existing values. Related to #33888
This commit is contained in:
parent
c1c4abae10
commit
da6269b456
|
@ -314,6 +314,12 @@ public class RestoreService implements ClusterStateApplier {
|
|||
currentIndexMetaData.getMappingVersion() + 1));
|
||||
indexMdBuilder.settingsVersion(Math.max(snapshotIndexMetaData.getSettingsVersion(),
|
||||
currentIndexMetaData.getSettingsVersion() + 1));
|
||||
|
||||
for (int shard = 0; shard < snapshotIndexMetaData.getNumberOfShards(); shard++) {
|
||||
indexMdBuilder.primaryTerm(shard,
|
||||
Math.max(snapshotIndexMetaData.primaryTerm(shard), currentIndexMetaData.primaryTerm(shard)));
|
||||
}
|
||||
|
||||
if (!request.includeAliases()) {
|
||||
// Remove all snapshot aliases
|
||||
if (!snapshotIndexMetaData.getAliases().isEmpty()) {
|
||||
|
|
|
@ -116,6 +116,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
|
@ -3704,6 +3705,48 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
}
|
||||
}
|
||||
|
||||
public void testRestoreIncreasesPrimaryTerms() {
|
||||
final String indexName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT);
|
||||
createIndex(indexName, Settings.builder()
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 2)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.build());
|
||||
ensureGreen(indexName);
|
||||
|
||||
if (randomBoolean()) {
|
||||
// open and close the index to increase the primary terms
|
||||
for (int i = 0; i < randomInt(3); i++) {
|
||||
assertAcked(client().admin().indices().prepareClose(indexName));
|
||||
assertAcked(client().admin().indices().prepareOpen(indexName));
|
||||
}
|
||||
}
|
||||
|
||||
final IndexMetaData indexMetaData = client().admin().cluster().prepareState().clear().setIndices(indexName)
|
||||
.setMetaData(true).get().getState().metaData().index(indexName);
|
||||
final int numPrimaries = getNumShards(indexName).numPrimaries;
|
||||
final Map<Integer, Long> primaryTerms = IntStream.range(0, numPrimaries)
|
||||
.boxed().collect(Collectors.toMap(shardId -> shardId, indexMetaData::primaryTerm));
|
||||
|
||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(randomRepoSettings()));
|
||||
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(true).setIndices(indexName).get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(numPrimaries));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().failedShards(), equalTo(0));
|
||||
|
||||
assertAcked(client().admin().indices().prepareClose(indexName));
|
||||
|
||||
final RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(true).get();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(numPrimaries));
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
|
||||
|
||||
final IndexMetaData restoredIndexMetaData = client().admin().cluster().prepareState().clear().setIndices(indexName)
|
||||
.setMetaData(true).get().getState().metaData().index(indexName);
|
||||
for (int shardId = 0; shardId < numPrimaries; shardId++) {
|
||||
assertThat(restoredIndexMetaData.primaryTerm(shardId), equalTo(primaryTerms.get(shardId) + 1));
|
||||
}
|
||||
}
|
||||
|
||||
private RepositoryData getRepositoryData(Repository repository) throws InterruptedException {
|
||||
ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName());
|
||||
final SetOnce<RepositoryData> repositoryData = new SetOnce<>();
|
||||
|
|
Loading…
Reference in New Issue