Fix Snapshot Repository Corruption in Downgrade Scenarios (#50692) (#50797)

* Fix Snapshot Repository Corruption in Downgrade Scenarios (#50692)

This PR introduces test infrastructure for downgrading a cluster while interacting with a given repository.
It fixes the fact that repository metadata in the new format could be written while there's still older snapshots in the repository that require the old-format metadata to be restorable.
This commit is contained in:
Armin Braun 2020-01-09 21:21:13 +01:00 committed by GitHub
parent 344c21813b
commit f70e8f6ab5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 522 additions and 9 deletions

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.gradle.Version
import org.elasticsearch.gradle.info.BuildParams
import org.elasticsearch.gradle.testclusters.RestTestRunnerTask
apply plugin: 'elasticsearch.testclusters'
apply plugin: 'elasticsearch.standalone-test'
tasks.register("bwcTest") {
description = 'Runs backwards compatibility tests.'
group = 'verification'
}
dependencies {
testCompile project(':client:rest-high-level')
}
for (Version bwcVersion : bwcVersions.indexCompatible) {
String baseName = "v${bwcVersion}"
String oldClusterName = "${baseName}-old"
String newClusterName = "${baseName}-new"
def clusterSettings = { v ->
return {
version = v
numberOfNodes = 2
setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
javaHome = BuildParams.runtimeJavaHome
}
}
testClusters {
"${oldClusterName}" clusterSettings(bwcVersion.toString())
"${newClusterName}" clusterSettings(project.version)
}
tasks.register("${baseName}#Step1OldClusterTest", RestTestRunnerTask) {
useCluster testClusters."${oldClusterName}"
mustRunAfter(precommit)
doFirst {
project.delete("${buildDir}/cluster/shared/repo/${baseName}")
}
systemProperty 'tests.rest.suite', 'step1'
}
tasks.register("${baseName}#Step2NewClusterTest", RestTestRunnerTask) {
useCluster testClusters."${newClusterName}"
dependsOn "${baseName}#Step1OldClusterTest"
systemProperty 'tests.rest.suite', 'step2'
}
tasks.register("${baseName}#Step3OldClusterTest", RestTestRunnerTask) {
useCluster testClusters."${oldClusterName}"
dependsOn "${baseName}#Step2NewClusterTest"
systemProperty 'tests.rest.suite', 'step3'
}
tasks.register("${baseName}#Step4NewClusterTest", RestTestRunnerTask) {
useCluster testClusters."${newClusterName}"
dependsOn "${baseName}#Step3OldClusterTest"
systemProperty 'tests.rest.suite', 'step4'
}
tasks.matching { it.name.startsWith(baseName) && it.name.endsWith("ClusterTest") }.configureEach {
it.systemProperty 'tests.old_cluster_version', bwcVersion.toString().minus("-SNAPSHOT")
it.systemProperty 'tests.path.repo', "${buildDir}/cluster/shared/repo/${baseName}"
def clusterName = it.name.contains("Step2") || it.name.contains("Step4") ? "${newClusterName}" : "${oldClusterName}"
it.nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${clusterName}".allHttpSocketURI.join(",")}")
it.nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${clusterName}".getName()}")
}
if (project.bwc_tests_enabled) {
bwcTest.dependsOn(
tasks.register("${baseName}#bwcTest") {
dependsOn tasks.named("${baseName}#Step4NewClusterTest")
}
)
}
}
task bwcTestSnapshots {
if (project.bwc_tests_enabled) {
for (final def version : bwcVersions.unreleasedIndexCompatible) {
dependsOn "v${version}#bwcTest"
}
}
}
check.dependsOn(bwcTestSnapshots)
configurations {
testArtifacts.extendsFrom testRuntime
}
task testJar(type: Jar) {
appendix 'test'
from sourceSets.test.output
}
artifacts {
testArtifacts testJar
}
test.enabled = false

View File

@ -0,0 +1,288 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.upgrades;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
/**
* Tests that verify that a snapshot repository is not getting corrupted and continues to function properly when accessed by multiple
* clusters of different versions. Concretely this test suite is simulating the following scenario:
* <ul>
* <li>Start and run against a cluster in an old version: {@link TestStep#STEP1_OLD_CLUSTER}</li>
* <li>Start and run against a cluster running the current version: {@link TestStep#STEP2_NEW_CLUSTER}</li>
* <li>Run against the old version cluster from the first step: {@link TestStep#STEP3_OLD_CLUSTER}</li>
* <li>Run against the current version cluster from the second step: {@link TestStep#STEP4_NEW_CLUSTER}</li>
* </ul>
* TODO: Add two more steps: delete all old version snapshots from the repository, then downgrade again and verify that the repository
* is not being corrupted. This requires first merging the logic for reading the min_version field in RepositoryData back to 7.6.
*/
public class MultiVersionRepositoryAccessIT extends ESRestTestCase {
private enum TestStep {
STEP1_OLD_CLUSTER("step1"),
STEP2_NEW_CLUSTER("step2"),
STEP3_OLD_CLUSTER("step3"),
STEP4_NEW_CLUSTER("step4");
private final String name;
TestStep(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
public static TestStep parse(String value) {
switch (value) {
case "step1":
return STEP1_OLD_CLUSTER;
case "step2":
return STEP2_NEW_CLUSTER;
case "step3":
return STEP3_OLD_CLUSTER;
case "step4":
return STEP4_NEW_CLUSTER;
default:
throw new AssertionError("unknown test step: " + value);
}
}
}
protected static final TestStep TEST_STEP = TestStep.parse(System.getProperty("tests.rest.suite"));
@Override
protected boolean preserveSnapshotsUponCompletion() {
return true;
}
@Override
protected boolean preserveReposUponCompletion() {
return true;
}
public void testCreateAndRestoreSnapshot() throws IOException {
final String repoName = getTestName();
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
final int shards = 3;
createIndex(client, "test-index", shards);
createRepository(client, repoName, false);
createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
final String snapshotToDeleteName = "snapshot-to-delete";
// Create a snapshot and delete it right away again to test the impact of each version's cleanup functionality that is run
// as part of the snapshot delete
createSnapshot(client, repoName, snapshotToDeleteName);
final List<Map<String, Object>> snapshotsIncludingToDelete = listSnapshots(repoName);
// Every step creates one snapshot and we have to add one more for the temporary snapshot
assertThat(snapshotsIncludingToDelete, hasSize(TEST_STEP.ordinal() + 1 + 1));
assertThat(snapshotsIncludingToDelete.stream().map(
sn -> (String) sn.get("snapshot")).collect(Collectors.toList()), hasItem(snapshotToDeleteName));
deleteSnapshot(client, repoName, snapshotToDeleteName);
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1));
switch (TEST_STEP) {
case STEP2_NEW_CLUSTER:
case STEP4_NEW_CLUSTER:
assertSnapshotStatusSuccessful(client, repoName,
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new));
break;
case STEP1_OLD_CLUSTER:
assertSnapshotStatusSuccessful(client, repoName, "snapshot-" + TEST_STEP);
break;
case STEP3_OLD_CLUSTER:
assertSnapshotStatusSuccessful(
client, repoName, "snapshot-" + TEST_STEP, "snapshot-" + TestStep.STEP3_OLD_CLUSTER);
break;
}
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
} else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
for (TestStep value : TestStep.values()) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + value, shards);
}
}
} finally {
deleteRepository(repoName);
}
}
public void testReadOnlyRepo() throws IOException {
final String repoName = getTestName();
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
final int shards = 3;
final boolean readOnly = TEST_STEP.ordinal() > 1; // only restore from read-only repo in steps 3 and 4
createRepository(client, repoName, readOnly);
if (readOnly == false) {
createIndex(client, "test-index", shards);
createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
}
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
switch (TEST_STEP) {
case STEP1_OLD_CLUSTER:
assertThat(snapshots, hasSize(1));
break;
case STEP2_NEW_CLUSTER:
case STEP4_NEW_CLUSTER:
case STEP3_OLD_CLUSTER:
assertThat(snapshots, hasSize(2));
break;
}
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER || TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
assertSnapshotStatusSuccessful(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER);
} else {
assertSnapshotStatusSuccessful(client, repoName,
"snapshot-" + TestStep.STEP1_OLD_CLUSTER, "snapshot-" + TestStep.STEP2_NEW_CLUSTER);
}
if (TEST_STEP == TestStep.STEP3_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
} else if (TEST_STEP == TestStep.STEP4_NEW_CLUSTER) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER, shards);
}
}
}
public void testUpgradeMovesRepoToNewMetaVersion() throws IOException {
if (TEST_STEP.ordinal() > 1) {
// Only testing the first 2 steps here
return;
}
final String repoName = getTestName();
try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(adminClient().getNodes().toArray(new Node[0])))) {
final int shards = 3;
createIndex(client, "test-index", shards);
createRepository(client, repoName, false);
createSnapshot(client, repoName, "snapshot-" + TEST_STEP);
final List<Map<String, Object>> snapshots = listSnapshots(repoName);
// Every step creates one snapshot
assertThat(snapshots, hasSize(TEST_STEP.ordinal() + 1));
assertSnapshotStatusSuccessful(client, repoName,
snapshots.stream().map(sn -> (String) sn.get("snapshot")).toArray(String[]::new));
if (TEST_STEP == TestStep.STEP1_OLD_CLUSTER) {
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER, shards);
} else {
deleteSnapshot(client, repoName, "snapshot-" + TestStep.STEP1_OLD_CLUSTER);
ensureSnapshotRestoreWorks(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER, shards);
createSnapshot(client, repoName, "snapshot-1");
ensureSnapshotRestoreWorks(client, repoName, "snapshot-1", shards);
deleteSnapshot(client, repoName, "snapshot-" + TestStep.STEP2_NEW_CLUSTER);
createSnapshot(client, repoName, "snapshot-2");
ensureSnapshotRestoreWorks(client, repoName, "snapshot-2", shards);
}
} finally {
deleteRepository(repoName);
}
}
private static void assertSnapshotStatusSuccessful(RestHighLevelClient client, String repoName,
String... snapshots) throws IOException {
final SnapshotsStatusResponse statusResponse = client.snapshot()
.status(new SnapshotsStatusRequest(repoName, snapshots), RequestOptions.DEFAULT);
for (SnapshotStatus status : statusResponse.getSnapshots()) {
assertThat(status.getShardsStats().getFailedShards(), is(0));
}
}
private void deleteSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException {
assertThat(client.snapshot().delete(new DeleteSnapshotRequest(repoName, name), RequestOptions.DEFAULT).isAcknowledged(), is(true));
}
@SuppressWarnings("unchecked")
private List<Map<String, Object>> listSnapshots(String repoName) throws IOException {
try (InputStream entity = client().performRequest(
new Request("GET", "/_snapshot/" + repoName + "/_all")).getEntity().getContent();
XContentParser parser = JsonXContent.jsonXContent.createParser(
xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, entity)) {
final Map<String, Object> raw = parser.map();
// Bwc lookup since the format of the snapshots response changed between versions
if (raw.containsKey("snapshots")) {
return (List<Map<String, Object>>) raw.get("snapshots");
} else {
return (List<Map<String, Object>>) ((List<Map<?, ?>>) raw.get("responses")).get(0).get("snapshots");
}
}
}
private static void ensureSnapshotRestoreWorks(RestHighLevelClient client, String repoName, String name,
int shards) throws IOException {
wipeAllIndices();
final RestoreInfo restoreInfo =
client.snapshot().restore(new RestoreSnapshotRequest().repository(repoName).snapshot(name).waitForCompletion(true),
RequestOptions.DEFAULT).getRestoreInfo();
assertThat(restoreInfo.failedShards(), is(0));
assertThat(restoreInfo.successfulShards(), equalTo(shards));
}
private static void createRepository(RestHighLevelClient client, String repoName, boolean readOnly) throws IOException {
assertThat(client.snapshot().createRepository(new PutRepositoryRequest(repoName).type("fs").settings(
Settings.builder().put("location", "./" + repoName).put("readonly", readOnly)), RequestOptions.DEFAULT).isAcknowledged(),
is(true));
}
private static void createSnapshot(RestHighLevelClient client, String repoName, String name) throws IOException {
client.snapshot().create(new CreateSnapshotRequest(repoName, name).waitForCompletion(true), RequestOptions.DEFAULT);
}
private void createIndex(RestHighLevelClient client, String name, int shards) throws IOException {
final Request putIndexRequest = new Request("PUT", "/" + name);
putIndexRequest.setJsonEntity("{\n" +
" \"settings\" : {\n" +
" \"index\" : {\n" +
" \"number_of_shards\" : " + shards + ", \n" +
" \"number_of_replicas\" : 0 \n" +
" }\n" +
" }\n" +
"}");
final Response response = client.getLowLevelClient().performRequest(putIndexRequest);
assertThat(response.getStatusLine().getStatusCode(), is(HttpURLConnection.HTTP_OK));
}
}

View File

@ -76,6 +76,8 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
private final RepositoriesService repositoriesService;
private final SnapshotsService snapshotsService;
@Override
protected String executor() {
return ThreadPool.Names.GENERIC;
@ -83,11 +85,13 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
@Inject
public TransportCleanupRepositoryAction(TransportService transportService, ClusterService clusterService,
RepositoriesService repositoriesService, ThreadPool threadPool, ActionFilters actionFilters,
RepositoriesService repositoriesService, SnapshotsService snapshotsService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(CleanupRepositoryAction.NAME, transportService, clusterService, threadPool, actionFilters,
CleanupRepositoryRequest::new, indexNameExpressionResolver);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
// We add a state applier that will remove any dangling repository cleanup actions on master failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
@ -215,8 +219,10 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
repositoryStateId,
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)
&& snapshotsService.hasOldVersionSnapshots(repositoryName, repositoryData, null) == false,
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))
));
}
private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.repositories;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -28,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParserUtils;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.snapshots.SnapshotsService;
import java.io.IOException;
import java.util.ArrayList;
@ -321,6 +323,7 @@ public final class RepositoryData {
private static final String NAME = "name";
private static final String UUID = "uuid";
private static final String STATE = "state";
private static final String MIN_VERSION = "min_version";
/**
* Writes the snapshots metadata and the related indices metadata to x-content.
@ -361,6 +364,12 @@ public final class RepositoryData {
builder.endObject();
}
builder.endObject();
if (shouldWriteShardGens) {
// TODO: write this field once 7.6 is able to read it and add tests to :qa:snapshot-repository-downgrade that make sure older
// ES versions can't corrupt the repository by writing to it and all the snapshots in it are v7.6 or newer
// Add min version field to make it impossible for older ES versions to deserialize this object
// builder.field(MIN_VERSION, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.toString());
}
builder.endObject();
return builder;
}
@ -468,6 +477,12 @@ public final class RepositoryData {
shardGenerations.put(indexId, i, gens.get(i));
}
}
} else if (MIN_VERSION.equals(field)) {
if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
throw new ElasticsearchParseException("version string expected [min_version]");
}
final Version version = Version.fromString(parser.text());
assert version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
} else {
throw new ElasticsearchParseException("unknown field name [" + field + "]");
}

View File

@ -80,6 +80,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -276,6 +277,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
repositoriesService.repository(repositoryName).getRepositoryData(repositoryDataListener);
repositoryDataListener.whenComplete(repositoryData -> {
final boolean hasOldFormatSnapshots = hasOldVersionSnapshots(repositoryName, repositoryData, null);
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
private SnapshotsInProgress.Entry newSnapshot = null;
@ -309,7 +311,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
repositoryData.getGenId(),
null,
request.userMetadata(),
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
hasOldFormatSnapshots == false &&
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION));
initializingSnapshots.add(newSnapshot.snapshot());
snapshots = new SnapshotsInProgress(newSnapshot);
} else {
@ -357,6 +360,31 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}, listener::onFailure);
}
public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) {
final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
final boolean hasOldFormatSnapshots;
if (snapshotIds.isEmpty()) {
hasOldFormatSnapshots = false;
} else {
if (repositoryData.shardGenerations().totalShards() > 0) {
hasOldFormatSnapshots = false;
} else {
try {
final Repository repository = repositoriesService.repository(repositoryName);
hasOldFormatSnapshots = snapshotIds.stream().map(repository::getSnapshotInfo).anyMatch(
snapshotInfo -> (excluded == null || snapshotInfo.snapshotId().equals(excluded) == false)
&& snapshotInfo.version().before(SHARD_GEN_IN_REPO_DATA_VERSION));
} catch (SnapshotMissingException e) {
logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
return true;
}
}
}
assert hasOldFormatSnapshots == false || repositoryData.shardGenerations().totalShards() == 0 :
"Found non-empty shard generations [" + repositoryData.shardGenerations() + "] but repository contained old version snapshots";
return hasOldFormatSnapshots;
}
/**
* Validates snapshot request
*
@ -1416,12 +1444,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
Version version) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
Repository repository = repositoriesService.repository(snapshot.getRepository());
repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, version.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION),
repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshot(snapshot.getSnapshotId(),
repositoryStateId,
version.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION) &&
hasOldVersionSnapshots(snapshot.getRepository(), repositoryData, snapshot.getSnapshotId()) == false,
ActionListener.wrap(v -> {
logger.info("snapshot [{}] deleted", snapshot);
removeSnapshotDeletionFromClusterState(snapshot, null, l);
}, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)
));
)), ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)));
}));
}

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
@ -33,9 +34,11 @@ import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Locale;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
@ -231,6 +234,50 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
.addSnapshots(snapshot).get());
}
public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
final String snapshotPrefix = "test-snap-";
final int snapshots = randomIntBetween(1, 2);
logger.info("--> creating [{}] snapshots", snapshots);
for (int i = 0; i < snapshots; ++i) {
// Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard
// generations (the existence of which would short-circuit checks for the repo containing old version snapshots)
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotPrefix + i)
.setIndices().setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), is(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
}
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
final RepositoryData repositoryData = getRepositoryData(repository);
final SnapshotId snapshotToCorrupt = randomFrom(repositoryData.getSnapshotIds());
logger.info("--> delete root level snapshot metadata blob for snapshot [{}]", snapshotToCorrupt);
Files.delete(repo.resolve(String.format(Locale.ROOT, BlobStoreRepository.SNAPSHOT_NAME_FORMAT, snapshotToCorrupt.getUUID())));
logger.info("--> verify that repo is assumed in old metadata format");
final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class);
final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, repositoryData, null)))), is(true));
logger.info("--> verify that snapshot with missing root level metadata can be deleted");
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get());
logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))),
is(false));
}
private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
logger.info("--> try to delete snapshot");
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,

View File

@ -1281,7 +1281,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
actionFilters, indexNameExpressionResolver
));
actions.put(CleanupRepositoryAction.INSTANCE, new TransportCleanupRepositoryAction(transportService, clusterService,
repositoriesService, threadPool, actionFilters, indexNameExpressionResolver));
repositoriesService, snapshotsService, threadPool, actionFilters, indexNameExpressionResolver));
actions.put(CreateSnapshotAction.INSTANCE,
new TransportCreateSnapshotAction(
transportService, clusterService, threadPool,

View File

@ -626,13 +626,17 @@ public abstract class ESRestTestCase extends ESTestCase {
}
}
if (preserveReposUponCompletion() == false) {
logger.debug("wiping snapshot repository [{}]", repoName);
adminClient().performRequest(new Request("DELETE", "_snapshot/" + repoName));
deleteRepository(repoName);
}
}
return inProgressSnapshots;
}
protected void deleteRepository(String repoName) throws IOException {
logger.debug("wiping snapshot repository [{}]", repoName);
adminClient().performRequest(new Request("DELETE", "_snapshot/" + repoName));
}
/**
* Remove any cluster settings.
*/