recovery from snapshot should fill gaps (#27850)
When snapshotting the primary we capture a Lucene commit at an arbitrary moment from a sequence number perspective. This means the commit may miss operations, leaving a gap between the local checkpoint recorded in the commit and the maximum sequence number. Restoring such a snapshot creates a primary that "misses" operations, which currently leaves the sequence number system stuck (i.e., the local checkpoint can never advance). To fix this, we fill in the gaps when we restore, in a similar fashion to normal store recovery.
This commit is contained in:
parent 26fc717ddd
commit 9cd69e7ec1
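The idea behind the fix, as a minimal self-contained sketch: walk the sequence numbers between the local checkpoint and the maximum sequence number, and record a no-op for every one that was never processed, so the checkpoint can advance past the gap. All names below are hypothetical stand-ins, not the actual Elasticsearch API; the real change simply calls IndexShard#getEngine().fillSeqNoGaps(primaryTerm), as in the diff that follows.

// Hypothetical sketch only: SeqNoTracker and its methods are stand-ins,
// not the real Elasticsearch Engine API.
interface SeqNoTracker {
    long localCheckpoint();          // highest seq no such that all lower seq nos are processed
    long maxSeqNo();                 // highest seq no ever issued on this shard
    boolean isProcessed(long seqNo); // whether an operation with this seq no was applied
    void markNoOp(long seqNo, long primaryTerm, String reason); // record a gap-filling no-op
}

final class FillGapsSketch {
    // Fill every unprocessed seq no in (localCheckpoint, maxSeqNo] with a no-op,
    // stamped with the restoring primary's term; afterwards the local checkpoint
    // can advance all the way to maxSeqNo.
    static int fillSeqNoGaps(SeqNoTracker tracker, long primaryTerm) {
        int filled = 0;
        for (long seqNo = tracker.localCheckpoint() + 1; seqNo <= tracker.maxSeqNo(); seqNo++) {
            if (tracker.isProcessed(seqNo) == false) {
                tracker.markNoOp(seqNo, primaryTerm, "gap filled during snapshot restore");
                filled++;
            }
        }
        return filled;
    }
}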
@@ -436,6 +436,8 @@ final class StoreRecovery {
             final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
             repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
             indexShard.skipTranslogRecovery();
+            assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
+            indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
             indexShard.finalizeRecovery();
             indexShard.postRecovery("restore done");
         } catch (Exception e) {
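A brief note on the two added lines: the assertion makes explicit that only a primary shard runs this recovery path, which matters because fillSeqNoGaps stamps the no-ops it writes with the shard's current primary term.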
@@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -71,8 +72,10 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
 import org.elasticsearch.indices.InvalidIndexNameException;
@@ -112,6 +115,7 @@ import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAlloc
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING;
 import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
+import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing;
@@ -3072,6 +3076,73 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         }
     }

+    public void testSnapshottingWithMissingSequenceNumbers() {
+        final String repositoryName = "test-repo";
+        final String snapshotName = "test-snap";
+        final String indexName = "test-idx";
+        final Client client = client();
+        final Path repo = randomRepoPath();
+
+        logger.info("--> creating repository at {}", repo.toAbsolutePath());
+        assertAcked(client.admin().cluster().preparePutRepository(repositoryName)
+            .setType("fs").setSettings(Settings.builder()
+                .put("location", repo)
+                .put("compress", false)
+                .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
+        logger.info("--> creating an index and indexing documents");
+        final String dataNode = internalCluster().getDataNodeInstance(ClusterService.class).localNode().getName();
+        final Settings settings =
+            Settings
+                .builder()
+                .put("index.number_of_shards", 1)
+                .put("index.number_of_replicas", 0)
+                .put("index.routing.allocation.include._name", dataNode)
+                .build();
+        createIndex(indexName, settings);
+        ensureGreen();
+        for (int i = 0; i < 5; i++) {
+            index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
+        }
+
+        final Index index = resolveIndex(indexName);
+        final IndexShard primary = internalCluster().getInstance(IndicesService.class, dataNode).getShardOrNull(new ShardId(index, 0));
+        // create a gap in the sequence numbers
+        getEngineFromShard(primary).seqNoService().generateSeqNo();
+
+        for (int i = 5; i < 10; i++) {
+            index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
+        }
+
+        refresh();
+
+        logger.info("--> snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repositoryName, snapshotName)
+            .setWaitForCompletion(true).setIndices(indexName).get();
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+        assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
+            equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+
+        logger.info("--> delete indices");
+        assertAcked(client.admin().indices().prepareDelete(indexName));
+
+        logger.info("--> restore all indices from the snapshot");
+        RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
+            .setWaitForCompletion(true).execute().actionGet();
+        assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
+
+        logger.info("--> indexing some more");
+        for (int i = 10; i < 15; i++) {
+            index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);
+        }
+
+        IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().get();
+        ShardStats shardStats = stats.getShards()[0];
+        assertTrue(shardStats.getShardRouting().primary());
+        assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(15L)); // 15 indexed docs and one "missing" op.
+        assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(15L));
+        assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L));
+    }
+
     private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map<String, List<String>> indicesPerSnapshot) {
         for (SnapshotInfo snapshotInfo : response.getSnapshots()) {
             final List<String> expected = snapshotInfo.indices();
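Why the assertions expect 15: documents 0-4 take sequence numbers 0-4, the generateSeqNo() call consumes sequence number 5 (the gap), documents 5-9 take 6-10, and the five documents indexed after the restore take 11-15. The restore fills the gap at 5 with a no-op, so all 16 sequence numbers (0 through 15) are accounted for and the local checkpoint, global checkpoint, and max seq no all land on 15.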
@@ -738,6 +738,13 @@ public abstract class ESIntegTestCase extends ESTestCase {
         }
     }

+    /**
+     * creates an index with the given setting
+     */
+    public final void createIndex(String name, Settings indexSettings) {
+        assertAcked(prepareCreate(name).setSettings(indexSettings));
+    }
+
     /**
      * Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}.
      */