diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index 2bccb913a34..6066e78b49f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -28,10 +28,16 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.xcontent.DeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -40,6 +46,7 @@ import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; @@ -49,6 +56,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -351,6 +359,66 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo)); } + public void testHandleSnapshotErrorWithBwCFormat() throws IOException { + final String repoName = "test-repo"; + final Path repoPath = randomRepoPath(); + createRepository(repoName, "fs", repoPath); + + // Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard + // generations (the existence of which would short-circuit checks for the repo containing old version snapshots) + final String oldVersionSnapshot = "old-version-snapshot"; + final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster() + .prepareCreateSnapshot(repoName, oldVersionSnapshot).setIndices().setWaitForCompletion(true).get(); + assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), is(0)); + + logger.info("--> writing downgraded RepositoryData"); + final RepositoryData repositoryData = getRepositoryData(repoName); + final XContentBuilder jsonBuilder = JsonXContent.contentBuilder(); + repositoryData.snapshotsToXContent(jsonBuilder, false); + final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.THROW_UNSUPPORTED_OPERATION, + Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), SnapshotsService.OLD_SNAPSHOT_FORMAT.toString())), + repositoryData.getGenId()); + Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()), + BytesReference.toBytes(BytesReference.bytes( + downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), false))), + StandardOpenOption.TRUNCATE_EXISTING); + + logger.info("--> recreating repository to clear caches"); + client().admin().cluster().prepareDeleteRepository(repoName).get(); + createRepository(repoName, "fs", repoPath); + + final String indexName = "test-index"; + createIndex(indexName); + + assertCreateSnapshotSuccess(repoName, "snapshot-1"); + + // In the old metadata version the shard level metadata could be moved to the next generation for all sorts of reasons, this should + // not break subsequent repository operations + logger.info("--> move shard level metadata to new generation"); + final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName); + final Path shardPath = repoPath.resolve("indices").resolve(indexId.getId()).resolve("0"); + final Path initialShardMetaPath = shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "0"); + assertFileExists(initialShardMetaPath); + Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "1")); + + logger.info("--> delete old version snapshot"); + client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get(); + + assertCreateSnapshotSuccess(repoName, "snapshot-2"); + } + + private void assertCreateSnapshotSuccess(String repoName, String snapshotName) { + logger.info("--> create another snapshot"); + final SnapshotInfo snapshotInfo = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).get().getSnapshotInfo(); + assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS)); + final int successfulShards = snapshotInfo.successfulShards(); + assertThat(successfulShards, greaterThan(0)); + assertThat(successfulShards, equalTo(snapshotInfo.totalShards())); + } + private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) { logger.info("--> try to delete snapshot"); final RepositoryException repositoryException3 = expectThrows(RepositoryException.class, diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 098cfa775a5..bc8df1b1651 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1226,7 +1226,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp loaded = repositoryDataFromCachedEntry(cached); } else { loaded = getRepositoryData(genToLoad); - cacheRepositoryData(loaded); + // We can cache in the most recent version here without regard to the actual repository metadata version since we're + // only caching the information that we just wrote and thus won't accidentally cache any information that isn't safe + cacheRepositoryData(loaded, true); } listener.onResponse(loaded); return; @@ -1261,16 +1263,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp * modification can lead to moving from a higher {@code N} to a lower {@code N} value which mean we can't safely assume that a given * generation will always contain the same {@link RepositoryData}. * - * @param updated RepositoryData to cache if newer than the cache contents + * @param updated RepositoryData to cache if newer than the cache contents + * @param writeShardGens whether to cache shard generation values */ - private void cacheRepositoryData(RepositoryData updated) { + private void cacheRepositoryData(RepositoryData updated, boolean writeShardGens) { if (cacheRepositoryData && bestEffortConsistency == false) { final BytesReference serialized; BytesStreamOutput out = new BytesStreamOutput(); try { try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out); XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) { - updated.snapshotsToXContent(builder, true); + updated.snapshotsToXContent(builder, writeShardGens); } serialized = out.bytes(); final int len = serialized.length(); @@ -1556,7 +1559,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { final RepositoryData writtenRepositoryData = filteredRepositoryData.withGenId(newGen); - cacheRepositoryData(writtenRepositoryData); + cacheRepositoryData(writtenRepositoryData, writeShardGens); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { // Delete all now outdated index files up to 1000 blobs back from the new generation. // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them. diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 6636aaae1c7..39e49f52b97 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -93,6 +93,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { skipRepoConsistencyCheckReason = reason; } + protected RepositoryData getRepositoryData(String repository) { + return getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repository)); + } + protected RepositoryData getRepositoryData(Repository repository) { ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, internalCluster().getMasterName()); final PlainActionFuture repositoryData = PlainActionFuture.newFuture();