diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 9216075e822..cfa2ff3dc28 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -436,6 +436,8 @@ final class StoreRecovery { final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); indexShard.skipTranslogRecovery(); + assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; + indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); } catch (Exception e) { diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 45ec0746a9b..7d6bf0ff912 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -71,8 +72,10 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; @@ -112,6 +115,7 @@ import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAlloc import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.IndexSettings.INDEX_REFRESH_INTERVAL_SETTING; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; +import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesExist; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAliasesMissing; @@ -3072,6 +3076,73 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } } + public void testSnapshottingWithMissingSequenceNumbers() { + final String repositoryName = "test-repo"; + final String snapshotName = "test-snap"; + final String indexName = "test-idx"; + final Client client = client(); + final Path repo = randomRepoPath(); + + logger.info("--> creating repository at {}", repo.toAbsolutePath()); + assertAcked(client.admin().cluster().preparePutRepository(repositoryName) + .setType("fs").setSettings(Settings.builder() + .put("location", repo) + .put("compress", false) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + logger.info("--> creating an index and indexing documents"); + final String dataNode = internalCluster().getDataNodeInstance(ClusterService.class).localNode().getName(); + final Settings settings = + Settings + .builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.routing.allocation.include._name", dataNode) + .build(); + createIndex(indexName, settings); + ensureGreen(); + for (int i = 0; i < 5; i++) { + index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i); + } + + final Index index = resolveIndex(indexName); + final IndexShard primary = internalCluster().getInstance(IndicesService.class, dataNode).getShardOrNull(new ShardId(index, 0)); + // create a gap in the sequence numbers + getEngineFromShard(primary).seqNoService().generateSeqNo(); + + for (int i = 5; i < 10; i++) { + index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i); + } + + refresh(); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repositoryName, snapshotName) + .setWaitForCompletion(true).setIndices(indexName).get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + logger.info("--> delete indices"); + assertAcked(client.admin().indices().prepareDelete(indexName)); + + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap") + .setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + logger.info("--> indexing some more"); + for (int i = 10; i < 15; i++) { + index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i); + } + + IndicesStatsResponse stats = client().admin().indices().prepareStats(indexName).clear().get(); + ShardStats shardStats = stats.getShards()[0]; + assertTrue(shardStats.getShardRouting().primary()); + assertThat(shardStats.getSeqNoStats().getLocalCheckpoint(), equalTo(15L)); // 15 indexed docs and one "missing" op. + assertThat(shardStats.getSeqNoStats().getGlobalCheckpoint(), equalTo(15L)); + assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L)); + } + private void verifySnapshotInfo(final GetSnapshotsResponse response, final Map> indicesPerSnapshot) { for (SnapshotInfo snapshotInfo : response.getSnapshots()) { final List expected = snapshotInfo.indices(); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index be6a1b29681..e633f5adb70 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -738,6 +738,13 @@ public abstract class ESIntegTestCase extends ESTestCase { } } + /** + * creates an index with the given setting + */ + public final void createIndex(String name, Settings indexSettings) { + assertAcked(prepareCreate(name).setSettings(indexSettings)); + } + /** * Creates a new {@link CreateIndexRequestBuilder} with the settings obtained from {@link #indexSettings()}. */