mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 18:35:25 +00:00
Use the the hack used in `CorruptedBlobStoreRepositoryIT` in more snapshot failure tests to verify that BwC repository metadata is handled properly in these so far not-test-covered scenarios. Also, some minor related dry-up of snapshot tests. Relates #57798
This commit is contained in:
parent
9f280621ba
commit
85f5c4192b
@ -28,15 +28,10 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|||||||
import org.elasticsearch.cluster.metadata.Metadata;
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
|
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Strings;
|
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
|
||||||
import org.elasticsearch.repositories.IndexId;
|
import org.elasticsearch.repositories.IndexId;
|
||||||
import org.elasticsearch.repositories.RepositoriesService;
|
import org.elasticsearch.repositories.RepositoriesService;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
@ -95,8 +90,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||||||
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||||
|
|
||||||
logger.info("--> move index-N blob to next generation");
|
logger.info("--> move index-N blob to next generation");
|
||||||
final RepositoryData repositoryData =
|
final RepositoryData repositoryData = getRepositoryData(repoName);
|
||||||
getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName));
|
|
||||||
Files.move(repo.resolve("index-" + repositoryData.getGenId()), repo.resolve("index-" + (repositoryData.getGenId() + 1)));
|
Files.move(repo.resolve("index-" + repositoryData.getGenId()), repo.resolve("index-" + (repositoryData.getGenId() + 1)));
|
||||||
|
|
||||||
assertRepositoryBlocked(client, repoName, snapshot);
|
assertRepositoryBlocked(client, repoName, snapshot);
|
||||||
@ -204,7 +198,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||||||
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
|
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
|
||||||
|
|
||||||
logger.info("--> move index-N blob to next generation");
|
logger.info("--> move index-N blob to next generation");
|
||||||
final RepositoryData repositoryData = getRepositoryData(repository);
|
final RepositoryData repositoryData = getRepositoryData(repoName);
|
||||||
final long beforeMoveGen = repositoryData.getGenId();
|
final long beforeMoveGen = repositoryData.getGenId();
|
||||||
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));
|
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));
|
||||||
|
|
||||||
@ -237,16 +231,14 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||||||
internalCluster().fullRestart();
|
internalCluster().fullRestart();
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
|
|
||||||
Repository repositoryAfterRestart = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
|
|
||||||
|
|
||||||
logger.info("--> verify index-N blob is found at the new location");
|
logger.info("--> verify index-N blob is found at the new location");
|
||||||
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 1));
|
assertThat(getRepositoryData(repoName).getGenId(), is(beforeMoveGen + 1));
|
||||||
|
|
||||||
logger.info("--> delete snapshot");
|
logger.info("--> delete snapshot");
|
||||||
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
|
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
|
||||||
|
|
||||||
logger.info("--> verify index-N blob is found at the expected location");
|
logger.info("--> verify index-N blob is found at the expected location");
|
||||||
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 2));
|
assertThat(getRepositoryData(repoName).getGenId(), is(beforeMoveGen + 2));
|
||||||
|
|
||||||
logger.info("--> make sure snapshot doesn't exist");
|
logger.info("--> make sure snapshot doesn't exist");
|
||||||
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster().prepareGetSnapshots(repoName)
|
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster().prepareGetSnapshots(repoName)
|
||||||
@ -277,8 +269,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
|
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
|
||||||
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||||
}
|
}
|
||||||
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
|
final RepositoryData repositoryData = getRepositoryData(repoName);
|
||||||
final RepositoryData repositoryData = getRepositoryData(repository);
|
|
||||||
|
|
||||||
final SnapshotId snapshotToCorrupt = randomFrom(repositoryData.getSnapshotIds());
|
final SnapshotId snapshotToCorrupt = randomFrom(repositoryData.getSnapshotIds());
|
||||||
logger.info("--> delete root level snapshot metadata blob for snapshot [{}]", snapshotToCorrupt);
|
logger.info("--> delete root level snapshot metadata blob for snapshot [{}]", snapshotToCorrupt);
|
||||||
@ -301,7 +292,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||||||
final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
|
final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
|
||||||
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
|
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
|
||||||
ActionRunnable.supply(f, () ->
|
ActionRunnable.supply(f, () ->
|
||||||
snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repository), null)))),
|
snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repoName), null)))),
|
||||||
is(SnapshotsService.OLD_SNAPSHOT_FORMAT));
|
is(SnapshotsService.OLD_SNAPSHOT_FORMAT));
|
||||||
|
|
||||||
logger.info("--> verify that snapshot with missing root level metadata can be deleted");
|
logger.info("--> verify that snapshot with missing root level metadata can be deleted");
|
||||||
@ -310,9 +301,9 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||||||
logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
|
logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
|
||||||
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
|
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
|
||||||
ActionRunnable.supply(f, () ->
|
ActionRunnable.supply(f, () ->
|
||||||
snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repository), null)))),
|
snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repoName), null)))),
|
||||||
is(Version.CURRENT));
|
is(Version.CURRENT));
|
||||||
final RepositoryData finalRepositoryData = getRepositoryData(repository);
|
final RepositoryData finalRepositoryData = getRepositoryData(repoName);
|
||||||
for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) {
|
for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) {
|
||||||
assertThat(finalRepositoryData.getVersion(snapshotId), is(Version.CURRENT));
|
assertThat(finalRepositoryData.getVersion(snapshotId), is(Version.CURRENT));
|
||||||
}
|
}
|
||||||
@ -342,7 +333,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||||||
|
|
||||||
logger.info("--> corrupt index-N blob");
|
logger.info("--> corrupt index-N blob");
|
||||||
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
|
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
|
||||||
final RepositoryData repositoryData = getRepositoryData(repository);
|
final RepositoryData repositoryData = getRepositoryData(repoName);
|
||||||
Files.write(repo.resolve("index-" + repositoryData.getGenId()), randomByteArrayOfLength(randomIntBetween(1, 100)));
|
Files.write(repo.resolve("index-" + repositoryData.getGenId()), randomByteArrayOfLength(randomIntBetween(1, 100)));
|
||||||
|
|
||||||
logger.info("--> verify loading repository data throws RepositoryException");
|
logger.info("--> verify loading repository data throws RepositoryException");
|
||||||
@ -364,27 +355,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||||||
final String repoName = "test-repo";
|
final String repoName = "test-repo";
|
||||||
final Path repoPath = randomRepoPath();
|
final Path repoPath = randomRepoPath();
|
||||||
createRepository(repoName, "fs", repoPath);
|
createRepository(repoName, "fs", repoPath);
|
||||||
|
final String oldVersionSnapshot = initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT);
|
||||||
// Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard
|
|
||||||
// generations (the existence of which would short-circuit checks for the repo containing old version snapshots)
|
|
||||||
final String oldVersionSnapshot = "old-version-snapshot";
|
|
||||||
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster()
|
|
||||||
.prepareCreateSnapshot(repoName, oldVersionSnapshot).setIndices().setWaitForCompletion(true).get();
|
|
||||||
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), is(0));
|
|
||||||
|
|
||||||
logger.info("--> writing downgraded RepositoryData");
|
|
||||||
final RepositoryData repositoryData = getRepositoryData(repoName);
|
|
||||||
final XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
|
|
||||||
repositoryData.snapshotsToXContent(jsonBuilder, false);
|
|
||||||
final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser(
|
|
||||||
NamedXContentRegistry.EMPTY,
|
|
||||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
|
|
||||||
Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), SnapshotsService.OLD_SNAPSHOT_FORMAT.toString())),
|
|
||||||
repositoryData.getGenId(), randomBoolean());
|
|
||||||
Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
|
|
||||||
BytesReference.toBytes(BytesReference.bytes(
|
|
||||||
downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), false))),
|
|
||||||
StandardOpenOption.TRUNCATE_EXISTING);
|
|
||||||
|
|
||||||
logger.info("--> recreating repository to clear caches");
|
logger.info("--> recreating repository to clear caches");
|
||||||
client().admin().cluster().prepareDeleteRepository(repoName).get();
|
client().admin().cluster().prepareDeleteRepository(repoName).get();
|
||||||
@ -414,27 +385,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
|||||||
final String repoName = "test-repo";
|
final String repoName = "test-repo";
|
||||||
final Path repoPath = randomRepoPath();
|
final Path repoPath = randomRepoPath();
|
||||||
createRepository(repoName, "fs", repoPath);
|
createRepository(repoName, "fs", repoPath);
|
||||||
|
final String oldVersionSnapshot = initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT);
|
||||||
// Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard
|
|
||||||
// generations (the existence of which would short-circuit checks for the repo containing old version snapshots)
|
|
||||||
final String oldVersionSnapshot = "old-version-snapshot";
|
|
||||||
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster()
|
|
||||||
.prepareCreateSnapshot(repoName, oldVersionSnapshot).setIndices().setWaitForCompletion(true).get();
|
|
||||||
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), is(0));
|
|
||||||
|
|
||||||
logger.info("--> writing downgraded RepositoryData");
|
|
||||||
final RepositoryData repositoryData = getRepositoryData(repoName);
|
|
||||||
final XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
|
|
||||||
repositoryData.snapshotsToXContent(jsonBuilder, false);
|
|
||||||
final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser(
|
|
||||||
NamedXContentRegistry.EMPTY,
|
|
||||||
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
|
|
||||||
Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), SnapshotsService.OLD_SNAPSHOT_FORMAT.toString())),
|
|
||||||
repositoryData.getGenId(), randomBoolean());
|
|
||||||
Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
|
|
||||||
BytesReference.toBytes(BytesReference.bytes(
|
|
||||||
downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), false))),
|
|
||||||
StandardOpenOption.TRUNCATE_EXISTING);
|
|
||||||
|
|
||||||
logger.info("--> recreating repository to clear caches");
|
logger.info("--> recreating repository to clear caches");
|
||||||
client().admin().cluster().prepareDeleteRepository(repoName).get();
|
client().admin().cluster().prepareDeleteRepository(repoName).get();
|
||||||
|
@ -386,14 +386,16 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||||||
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
|
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
|
||||||
|
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
final Path repoPath = randomRepoPath();
|
||||||
AcknowledgedResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
AcknowledgedResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
|
||||||
.setType("mock").setSettings(
|
.setType("mock").setSettings(
|
||||||
Settings.builder()
|
Settings.builder()
|
||||||
.put("location", randomRepoPath())
|
.put("location", repoPath)
|
||||||
.put("random", randomAlphaOfLength(10))
|
.put("random", randomAlphaOfLength(10))
|
||||||
.put("wait_after_unblock", 200)
|
.put("wait_after_unblock", 200)
|
||||||
).get();
|
).get();
|
||||||
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
|
||||||
|
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
|
||||||
|
|
||||||
// Pick one node and block it
|
// Pick one node and block it
|
||||||
String blockedNode = blockNodeWithIndex("test-repo", "test-idx");
|
String blockedNode = blockNodeWithIndex("test-repo", "test-idx");
|
||||||
@ -804,11 +806,13 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||||||
final Client client = client();
|
final Client client = client();
|
||||||
|
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
final Path repoPath = randomRepoPath();
|
||||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
||||||
.setType("fs").setSettings(Settings.builder()
|
.setType("fs").setSettings(Settings.builder()
|
||||||
.put("location", randomRepoPath())
|
.put("location", repoPath)
|
||||||
.put("compress", randomBoolean())
|
.put("compress", randomBoolean())
|
||||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||||
|
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
|
||||||
|
|
||||||
assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
|
assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
|
||||||
.put("number_of_replicas", 0)));
|
.put("number_of_replicas", 0)));
|
||||||
@ -841,7 +845,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||||||
assertTrue(snapshotInfo.state().completed());
|
assertTrue(snapshotInfo.state().completed());
|
||||||
}, 1, TimeUnit.MINUTES);
|
}, 1, TimeUnit.MINUTES);
|
||||||
|
|
||||||
logger.info("--> verify that snapshot was succesful");
|
logger.info("--> verify that snapshot was successful");
|
||||||
|
|
||||||
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo")
|
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo")
|
||||||
.setSnapshots("test-snap").get();
|
.setSnapshots("test-snap").get();
|
||||||
@ -859,11 +863,13 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||||||
final Client client = client();
|
final Client client = client();
|
||||||
|
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
final Path repoPath = randomRepoPath();
|
||||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
||||||
.setType("mock").setSettings(Settings.builder()
|
.setType("mock").setSettings(Settings.builder()
|
||||||
.put("location", randomRepoPath())
|
.put("location", repoPath)
|
||||||
.put("compress", randomBoolean())
|
.put("compress", randomBoolean())
|
||||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||||
|
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
|
||||||
|
|
||||||
assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
|
assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
|
||||||
.put("number_of_replicas", 0)));
|
.put("number_of_replicas", 0)));
|
||||||
@ -920,11 +926,13 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||||||
internalCluster().startDataOnlyNodes(2);
|
internalCluster().startDataOnlyNodes(2);
|
||||||
|
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
final Path repoPath = randomRepoPath();
|
||||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||||
.setType("mock").setSettings(Settings.builder()
|
.setType("mock").setSettings(Settings.builder()
|
||||||
.put("location", randomRepoPath())
|
.put("location", repoPath)
|
||||||
.put("compress", randomBoolean())
|
.put("compress", randomBoolean())
|
||||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||||
|
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
|
||||||
|
|
||||||
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
|
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
|
||||||
.put("number_of_shards", 6).put("number_of_replicas", 0)));
|
.put("number_of_shards", 6).put("number_of_replicas", 0)));
|
||||||
@ -1175,11 +1183,14 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||||||
internalCluster().startMasterOnlyNode();
|
internalCluster().startMasterOnlyNode();
|
||||||
internalCluster().startDataOnlyNodes(2);
|
internalCluster().startDataOnlyNodes(2);
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
final Path repoPath = randomRepoPath();
|
||||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||||
.setType("mock").setSettings(Settings.builder()
|
.setType("mock").setSettings(Settings.builder()
|
||||||
.put("location", randomRepoPath())
|
.put("location", repoPath)
|
||||||
.put("compress", randomBoolean())
|
.put("compress", randomBoolean())
|
||||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||||
|
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
|
||||||
|
|
||||||
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
|
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
|
||||||
.put("number_of_shards", 5).put("number_of_replicas", 0)));
|
.put("number_of_shards", 5).put("number_of_replicas", 0)));
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
@ -1224,11 +1235,14 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||||||
internalCluster().startMasterOnlyNode();
|
internalCluster().startMasterOnlyNode();
|
||||||
final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
|
final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
final Path repoPath = randomRepoPath();
|
||||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||||
.setType("mock").setSettings(Settings.builder()
|
.setType("mock").setSettings(Settings.builder()
|
||||||
.put("location", randomRepoPath())
|
.put("location", repoPath)
|
||||||
.put("compress", randomBoolean())
|
.put("compress", randomBoolean())
|
||||||
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
|
||||||
|
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
|
||||||
|
|
||||||
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
|
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
|
||||||
.put("number_of_shards", 2).put("number_of_replicas", 0)));
|
.put("number_of_shards", 2).put("number_of_replicas", 0)));
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
|
@ -83,7 +83,6 @@ import org.elasticsearch.ingest.IngestTestPlugin;
|
|||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.repositories.IndexId;
|
import org.elasticsearch.repositories.IndexId;
|
||||||
import org.elasticsearch.repositories.RepositoriesService;
|
import org.elasticsearch.repositories.RepositoriesService;
|
||||||
import org.elasticsearch.repositories.Repository;
|
|
||||||
import org.elasticsearch.repositories.RepositoryData;
|
import org.elasticsearch.repositories.RepositoryData;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
@ -1336,10 +1335,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||||||
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
|
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
|
||||||
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
|
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
|
||||||
|
|
||||||
RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
|
final Map<String, IndexId> indexIds = getRepositoryData("test-repo").getIndices();
|
||||||
Repository repository = service.repository("test-repo");
|
|
||||||
|
|
||||||
final Map<String, IndexId> indexIds = getRepositoryData(repository).getIndices();
|
|
||||||
final Path indicesPath = repo.resolve("indices");
|
final Path indicesPath = repo.resolve("indices");
|
||||||
|
|
||||||
logger.info("--> delete index metadata and shard metadata");
|
logger.info("--> delete index metadata and shard metadata");
|
||||||
@ -1347,7 +1343,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||||||
Path shardZero = indicesPath.resolve(indexIds.get(index).getId()).resolve("0");
|
Path shardZero = indicesPath.resolve(indexIds.get(index).getId()).resolve("0");
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
Files.delete(
|
Files.delete(
|
||||||
shardZero.resolve("index-" + getRepositoryData(repository).shardGenerations().getShardGen(indexIds.get(index), 0)));
|
shardZero.resolve("index-" + getRepositoryData("test-repo").shardGenerations().getShardGen(indexIds.get(index), 0)));
|
||||||
}
|
}
|
||||||
Files.delete(shardZero.resolve("snap-" + snapshotInfo.snapshotId().getUUID() + ".dat"));
|
Files.delete(shardZero.resolve("snap-" + snapshotInfo.snapshotId().getUUID() + ".dat"));
|
||||||
}
|
}
|
||||||
@ -2785,10 +2781,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||||||
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
|
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
|
||||||
assertThat(snapshotInfo.indices(), hasSize(nbIndices));
|
assertThat(snapshotInfo.indices(), hasSize(nbIndices));
|
||||||
|
|
||||||
RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
|
final RepositoryData repositoryData = getRepositoryData("test-repo");
|
||||||
Repository repository = service.repository("test-repo");
|
|
||||||
|
|
||||||
final RepositoryData repositoryData = getRepositoryData(repository);
|
|
||||||
final Map<String, IndexId> indexIds = repositoryData.getIndices();
|
final Map<String, IndexId> indexIds = repositoryData.getIndices();
|
||||||
assertThat(indexIds.size(), equalTo(nbIndices));
|
assertThat(indexIds.size(), equalTo(nbIndices));
|
||||||
|
|
||||||
@ -2860,10 +2853,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||||||
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
|
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
|
||||||
assertThat(snapshotInfo.indices(), hasSize(1));
|
assertThat(snapshotInfo.indices(), hasSize(1));
|
||||||
|
|
||||||
RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
|
final RepositoryData repositoryData = getRepositoryData("test-repo");
|
||||||
Repository repository = service.repository("test-repo");
|
|
||||||
|
|
||||||
final RepositoryData repositoryData = getRepositoryData(repository);
|
|
||||||
final Map<String, IndexId> indexIds = repositoryData.getIndices();
|
final Map<String, IndexId> indexIds = repositoryData.getIndices();
|
||||||
assertThat(indexIds.size(), equalTo(1));
|
assertThat(indexIds.size(), equalTo(1));
|
||||||
|
|
||||||
|
@ -29,8 +29,6 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
|
|||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.core.internal.io.IOUtils;
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
import org.elasticsearch.repositories.RepositoriesService;
|
|
||||||
import org.elasticsearch.repositories.Repository;
|
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -160,9 +158,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
|
|||||||
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
|
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
|
||||||
|
|
||||||
logger.info("--> delete shard-level snap-${uuid}.dat file for one shard in this snapshot to simulate concurrent delete");
|
logger.info("--> delete shard-level snap-${uuid}.dat file for one shard in this snapshot to simulate concurrent delete");
|
||||||
final RepositoriesService service = internalCluster().getMasterNodeInstance(RepositoriesService.class);
|
final String indexRepoId = getRepositoryData("test-repo").resolveIndexId(response.getSnapshotInfo().indices().get(0)).getId();
|
||||||
final Repository repository = service.repository("test-repo");
|
|
||||||
final String indexRepoId = getRepositoryData(repository).resolveIndexId(response.getSnapshotInfo().indices().get(0)).getId();
|
|
||||||
IOUtils.rm(repoPath.resolve("indices").resolve(indexRepoId).resolve("0").resolve(
|
IOUtils.rm(repoPath.resolve("indices").resolve(indexRepoId).resolve("0").resolve(
|
||||||
BlobStoreRepository.SNAPSHOT_PREFIX + response.getSnapshotInfo().snapshotId().getUUID() + ".dat"));
|
BlobStoreRepository.SNAPSHOT_PREFIX + response.getSnapshotInfo().snapshotId().getUUID() + ".dat"));
|
||||||
|
|
||||||
|
@ -18,20 +18,31 @@
|
|||||||
*/
|
*/
|
||||||
package org.elasticsearch.snapshots;
|
package org.elasticsearch.snapshots;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||||
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
|
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.repositories.RepositoriesService;
|
import org.elasticsearch.repositories.RepositoriesService;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
import org.elasticsearch.repositories.RepositoryData;
|
import org.elasticsearch.repositories.RepositoryData;
|
||||||
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||||
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.elasticsearch.test.VersionUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
@ -39,6 +50,7 @@ import java.nio.file.FileVisitResult;
|
|||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.SimpleFileVisitor;
|
import java.nio.file.SimpleFileVisitor;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
import java.nio.file.attribute.BasicFileAttributes;
|
import java.nio.file.attribute.BasicFileAttributes;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
@ -48,10 +60,14 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
||||||
|
|
||||||
|
private static final String OLD_VERSION_SNAPSHOT_PREFIX = "old-version-snapshot-";
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Settings nodeSettings(int nodeOrdinal) {
|
protected Settings nodeSettings(int nodeOrdinal) {
|
||||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||||
@ -79,6 +95,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||||||
client().admin().cluster().prepareGetRepositories().get().repositories().forEach(repositoryMetadata -> {
|
client().admin().cluster().prepareGetRepositories().get().repositories().forEach(repositoryMetadata -> {
|
||||||
final String name = repositoryMetadata.name();
|
final String name = repositoryMetadata.name();
|
||||||
if (repositoryMetadata.settings().getAsBoolean("readonly", false) == false) {
|
if (repositoryMetadata.settings().getAsBoolean("readonly", false) == false) {
|
||||||
|
client().admin().cluster().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
|
||||||
client().admin().cluster().prepareCleanupRepository(name).get();
|
client().admin().cluster().prepareCleanupRepository(name).get();
|
||||||
}
|
}
|
||||||
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
|
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
|
||||||
@ -249,4 +266,42 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
|||||||
.setType(type)
|
.setType(type)
|
||||||
.setSettings(Settings.builder().put("location", location)));
|
.setSettings(Settings.builder().put("location", location)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Randomly write an empty snapshot of an older version to an empty repository to simulate an older repository metadata format.
|
||||||
|
*/
|
||||||
|
protected void maybeInitWithOldSnapshotVersion(String repoName, Path repoPath) throws IOException {
|
||||||
|
if (randomBoolean() && randomBoolean()) {
|
||||||
|
initWithSnapshotVersion(repoName, repoPath, VersionUtils.randomIndexCompatibleVersion(random()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Workaround to simulate BwC situation: taking a snapshot without indices here so that we don't create any new version shard
|
||||||
|
* generations (the existence of which would short-circuit checks for the repo containing old version snapshots)
|
||||||
|
*/
|
||||||
|
protected String initWithSnapshotVersion(String repoName, Path repoPath, Version version) throws IOException {
|
||||||
|
assertThat("This hack only works on an empty repository", getRepositoryData(repoName).getSnapshotIds(), empty());
|
||||||
|
final String oldVersionSnapshot = OLD_VERSION_SNAPSHOT_PREFIX + version.id;
|
||||||
|
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster()
|
||||||
|
.prepareCreateSnapshot(repoName, oldVersionSnapshot).setIndices("does-not-exist-for-sure-*")
|
||||||
|
.setWaitForCompletion(true).get();
|
||||||
|
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), is(0));
|
||||||
|
|
||||||
|
logger.info("--> writing downgraded RepositoryData for repository metadata version [{}]", version);
|
||||||
|
final RepositoryData repositoryData = getRepositoryData(repoName);
|
||||||
|
final XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
|
||||||
|
final boolean writeShardGens = version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
|
||||||
|
repositoryData.snapshotsToXContent(jsonBuilder, writeShardGens);
|
||||||
|
final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser(
|
||||||
|
NamedXContentRegistry.EMPTY,
|
||||||
|
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
|
||||||
|
Strings.toString(jsonBuilder).replace(Version.CURRENT.toString(), version.toString())),
|
||||||
|
repositoryData.getGenId(), randomBoolean());
|
||||||
|
Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
|
||||||
|
BytesReference.toBytes(BytesReference.bytes(
|
||||||
|
downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens))),
|
||||||
|
StandardOpenOption.TRUNCATE_EXISTING);
|
||||||
|
return oldVersionSnapshot;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user