Cleanup Duplication in Snapshot ITs (#58818) (#58915)

Just a few obvious static cleanups of duplication to push back against the ever increasing complexity of these tests.
This commit is contained in:
Armin Braun 2020-07-02 16:00:01 +02:00 committed by GitHub
parent 9569375ae7
commit 62152852dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 230 additions and 480 deletions

View File

@@ -166,12 +166,10 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
createRandomIndex(idxName);
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
final String masterNode1 = internalCluster().getMasterName();
Set<String> otherNodes = new HashSet<>(allMasterEligibleNodes);
@@ -256,21 +254,18 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
index(idxName, "type", JsonXContent.contentBuilder().startObject().field("foo", "bar").endObject());
final String repoName = "test-repo";
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository(repoName).setType("mock")
.setSettings(Settings.builder().put("location", randomRepoPath())));
createRepository(repoName, "mock", randomRepoPath());
final String masterNode = internalCluster().getMasterName();
AbstractSnapshotIntegTestCase.blockAllDataNodes(repoName);
blockAllDataNodes(repoName);
final String snapshot = "test-snap";
logger.info("--> starting snapshot");
ActionFuture<CreateSnapshotResponse> future = client(masterNode).admin().cluster()
.prepareCreateSnapshot(repoName, snapshot).setWaitForCompletion(true).execute();
AbstractSnapshotIntegTestCase.waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueSeconds(10L));
waitForBlockOnAnyDataNode(repoName, TimeValue.timeValueSeconds(10L));
NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode), Collections.singleton(dataNode)),
@@ -284,7 +279,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
logger.info("--> stopping disrupting");
networkDisruption.stopDisrupting();
AbstractSnapshotIntegTestCase.unblockAllDataNodes(repoName);
unblockAllDataNodes(repoName);
ensureStableCluster(2, masterNode);
logger.info("--> done");
@@ -295,11 +290,11 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
index(idxName, "type", JsonXContent.contentBuilder().startObject().field("foo", "bar").endObject());
logger.info("--> run a snapshot that fails to finalize but succeeds on the data node");
AbstractSnapshotIntegTestCase.blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final ActionFuture<CreateSnapshotResponse> snapshotFuture =
client(masterNode).admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2").setWaitForCompletion(true).execute();
AbstractSnapshotIntegTestCase.waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(10L));
AbstractSnapshotIntegTestCase.unblockNode(repoName, masterNode);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(10L));
unblockNode(repoName, masterNode);
assertFutureThrows(snapshotFuture, SnapshotException.class);
logger.info("--> create a snapshot expected to be successful");

View File

@@ -34,7 +34,6 @@ import java.io.ByteArrayInputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
import static org.hamcrest.Matchers.is;
@@ -82,12 +81,9 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
internalCluster().startMasterOnlyNodes(2);
internalCluster().startDataOnlyNodes(1);
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository(repoName, "mock", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
logger.info("--> snapshot");
client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
@@ -116,11 +112,8 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
internalCluster().startNodes(Settings.EMPTY);
final String repoName = "test-repo";
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository(repoName).setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository(repoName, "fs", Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
logger.info("--> create three snapshots");
for (int i = 0; i < 3; ++i) {

View File

@@ -65,14 +65,11 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository(repoName, "fs", Settings.builder()
.put("location", repo).put("compress", false)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
@@ -173,12 +170,9 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
public void testFindDanglingLatestGeneration() throws Exception {
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository(repoName, "fs", Settings.builder()
.put("location", repo).put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
@@ -248,14 +242,12 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
public void testHandlingMissingRootLevelSnapshotMetadata() throws Exception {
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository(repoName, "fs", Settings.builder()
.put("location", repo)
.put("compress", false)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
final String snapshotPrefix = "test-snap-";
final int snapshots = randomIntBetween(1, 2);
@@ -315,13 +307,11 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("compress", false)));
createRepository(repoName, "fs", Settings.builder()
.put("location", repo)
// Don't cache repository data because the test manually modifies the repository data
.put(BlobStoreRepository.CACHE_REPOSITORY_DATA.getKey(), false)
.put("compress", false));
final String snapshot = "test-snap";
@@ -339,12 +329,9 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
logger.info("--> verify loading repository data throws RepositoryException");
expectThrows(RepositoryException.class, () -> getRepositoryData(repository));
logger.info("--> mount repository path in a new repository");
final String otherRepoName = "other-repo";
assertAcked(client.admin().cluster().preparePutRepository(otherRepoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)));
createRepository(otherRepoName, "fs", Settings.builder()
.put("location", repo).put("compress", false));
final Repository otherRepo = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(otherRepoName);
logger.info("--> verify loading repository data from newly mounted repository throws RepositoryException");
@@ -364,7 +351,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
final String indexName = "test-index";
createIndex(indexName);
assertCreateSnapshotSuccess(repoName, "snapshot-1");
createFullSnapshot(repoName, "snapshot-1");
// In the old metadata version the shard level metadata could be moved to the next generation for all sorts of reasons, this should
// not break subsequent repository operations
@@ -378,7 +365,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
logger.info("--> delete old version snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get();
assertCreateSnapshotSuccess(repoName, "snapshot-2");
createFullSnapshot(repoName, "snapshot-2");
}
public void testRepairBrokenShardGenerations() throws IOException {
@@ -394,7 +381,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
final String indexName = "test-index";
createIndex(indexName);
assertCreateSnapshotSuccess(repoName, "snapshot-1");
createFullSnapshot(repoName, "snapshot-1");
logger.info("--> delete old version snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get();
@@ -427,17 +414,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
client().admin().cluster().prepareDeleteRepository(repoName).get();
createRepository(repoName, "fs", repoPath);
assertCreateSnapshotSuccess(repoName, "snapshot-2");
}
private void assertCreateSnapshotSuccess(String repoName, String snapshotName) {
logger.info("--> create another snapshot");
final SnapshotInfo snapshotInfo = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).get().getSnapshotInfo();
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
final int successfulShards = snapshotInfo.successfulShards();
assertThat(successfulShards, greaterThan(0));
assertThat(successfulShards, equalTo(snapshotInfo.totalShards()));
createFullSnapshot(repoName, "snapshot-2");
}
private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {

View File

@@ -78,7 +78,6 @@ import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction;
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -227,12 +226,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertSettingValue.accept("new value");
createRepository("test-repo", "fs", randomRepoPath());
logger.info("--> start snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).execute().actionGet();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
createFullSnapshot("test-repo", "test-snap");
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet()
.getSnapshots().get(0).state(),
equalTo(SnapshotState.SUCCESS));
@@ -276,13 +270,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
});
createRepository("test-repo", "fs", tempDir);
logger.info("--> start snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).execute().actionGet();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().successfulShards()));
createFullSnapshot("test-repo", "test-snap");
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet()
.getSnapshots().get(0).state(),
equalTo(SnapshotState.SUCCESS));
@@ -395,16 +383,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
refresh();
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
logger.info("--> creating repository");
final Path repoPath = randomRepoPath();
AcknowledgedResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", repoPath)
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createRepository("test-repo", "mock",
Settings.builder().put("location", repoPath).put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200));
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
// Pick one node and block it
@@ -448,16 +430,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
refresh();
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
logger.info("--> creating repository");
Path repo = randomRepoPath();
AcknowledgedResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", repo)
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createRepository("test-repo", "mock",
Settings.builder().put("location", repo).put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200));
// Pick one node and block it
String blockedNode = blockNodeWithIndex("test-repo", "test-idx");
@@ -484,8 +460,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> stopping node [{}]", blockedNode);
stopNode(blockedNode);
try {
AcknowledgedResponse deleteSnapshotResponse = deleteSnapshotResponseFuture.actionGet();
assertThat(deleteSnapshotResponse.isAcknowledged(), equalTo(true));
assertAcked(deleteSnapshotResponseFuture.actionGet());
} catch (SnapshotMissingException ex) {
// When master node is closed during this test, it sometime manages to delete the snapshot files before
// completely stopping. In this case the retried delete snapshot operation on the new master can fail
@@ -755,13 +730,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
Settings nodeSettings = Settings.EMPTY;
logger.info("--> start two nodes");
internalCluster().startNodes(2, nodeSettings);
// Register mock repositories
client().admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put(MockRepository.Plugin.USERNAME_SETTING.getKey(), "notsecretusername")
.put(MockRepository.Plugin.PASSWORD_SETTING.getKey(), "verysecretpassword")
).get();
createRepository("test-repo", "mock", Settings.builder()
.put("location", randomRepoPath())
.put(MockRepository.Plugin.USERNAME_SETTING.getKey(), "notsecretusername")
.put(MockRepository.Plugin.PASSWORD_SETTING.getKey(), "verysecretpassword"));
NodeClient nodeClient = internalCluster().getInstance(NodeClient.class);
RestGetRepositoriesAction getRepoAction = new RestGetRepositoriesAction(internalCluster().getInstance(SettingsFilter.class));
@@ -813,15 +785,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
internalCluster().startMasterOnlyNodes(2);
internalCluster().startDataOnlyNodes(2);
final Client client = client();
logger.info("--> creating repository");
final Path repoPath = randomRepoPath();
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repoPath)
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", repoPath).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
@@ -870,15 +837,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
internalCluster().startMasterOnlyNodes(3);
internalCluster().startDataOnlyNodes(2);
final Client client = client();
logger.info("--> creating repository");
final Path repoPath = randomRepoPath();
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", repoPath)
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "mock", Settings.builder()
.put("location", repoPath).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
@@ -1077,12 +1039,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
final String expression1 = nameExpressionResolver.resolveDateMathExpression(snapshotName);
logger.info("--> creating date math snapshot");
CreateSnapshotResponse snapshotResponse =
admin.cluster().prepareCreateSnapshot(repo, snapshotName)
.setIncludeGlobalState(true)
.setWaitForCompletion(true)
.get();
assertThat(snapshotResponse.status(), equalTo(RestStatus.OK));
createFullSnapshot(repo, snapshotName);
// snapshot could be taken before or after a day rollover
final String expression2 = nameExpressionResolver.resolveDateMathExpression(snapshotName);
@@ -1111,12 +1068,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
final Path repoPath = randomRepoPath();
createRepository(repositoryName, "fs", repoPath);
logger.info("--> create a snapshot");
client.admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0)
.setIncludeGlobalState(true)
.setWaitForCompletion(true)
.get();
createFullSnapshot(repositoryName, snapshot0);
SnapshotsStatusResponse response = client.admin().cluster().prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot0)
@@ -1149,15 +1101,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
// create another snapshot
// total size has to grow and has to be equal to files on fs
assertThat(client.admin().cluster()
.prepareCreateSnapshot(repositoryName, snapshot1)
.setWaitForCompletion(true).get().status(),
equalTo(RestStatus.OK));
createFullSnapshot(repositoryName, snapshot1);
// drop 1st one to avoid miscalculation as snapshot reuses some files of prev snapshot
assertTrue(client.admin().cluster()
.prepareDeleteSnapshot(repositoryName, snapshot0)
.get().isAcknowledged());
assertAcked(client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0).get());
response = client.admin().cluster().prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot1)
@@ -1244,13 +1191,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> starting a master node and two data nodes");
internalCluster().startMasterOnlyNode();
final List<String> dataNodes = internalCluster().startDataOnlyNodes(2);
logger.info("--> creating repository");
final Path repoPath = randomRepoPath();
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", repoPath)
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "mock", Settings.builder()
.put("location", repoPath).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
maybeInitWithOldSnapshotVersion("test-repo", repoPath);
assertAcked(prepareCreate("test-idx", 0, Settings.builder()
@@ -1294,11 +1238,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
public void testRetentionLeasesClearedOnRestore() throws Exception {
final String repoName = "test-repo-retention-leases";
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("fs")
.setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())));
createRepository(repoName, "fs", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean()));
final String indexName = "index-retention-leases";
final int shardCount = randomIntBetween(1, 5);

View File

@@ -47,7 +47,6 @@ import org.elasticsearch.action.ingest.GetPipelineResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
@@ -109,7 +108,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
@@ -170,7 +168,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.build();
}
private Settings randomRepoSettings() {
private Settings.Builder randomRepoSettings() {
Settings.Builder repoSettings = Settings.builder();
repoSettings.put("location", randomRepoPath());
if (randomBoolean()) {
@@ -185,14 +183,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
repoSettings.put("chunk_size", (String) null);
}
}
return repoSettings.build();
return repoSettings;
}
public void testBasicWorkFlow() throws Exception {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(randomRepoSettings()));
createRepository("test-repo", "fs", randomRepoSettings());
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen();
@@ -372,8 +369,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testFreshIndexUUID() {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(randomRepoSettings()));
createRepository("test-repo", "fs", randomRepoSettings());
createIndex("test");
String originalIndexUUID = client().admin().indices().prepareGetSettings("test").get()
@@ -425,8 +421,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testRestoreWithDifferentMappingsAndSettings() throws Exception {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(randomRepoSettings()));
createRepository("test-repo", "fs", randomRepoSettings());
logger.info("--> create index with foo type");
assertAcked(prepareCreate("test-idx", 2, Settings.builder()
@@ -438,7 +433,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
ensureGreen();
logger.info("--> snapshot it");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices("test-idx").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
@@ -456,7 +451,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
client.admin().indices().prepareClose("test-idx").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
@@ -474,10 +469,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testEmptySnapshot() throws Exception {
Client client = client();
logger.info("--> creating repository");
AcknowledgedResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createRepository("test-repo", "fs", Settings.builder().put("location", randomRepoPath()));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
@@ -843,13 +835,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", randomRepoPath())
.put("random", randomAlphaOfLength(10))
.put("random_data_file_io_exception_rate", 0.3)));
createRepository("test-repo", "mock",
Settings.builder().put("location", randomRepoPath())
.put("random", randomAlphaOfLength(10)).put("random_data_file_io_exception_rate", 0.3));
createIndex("test-idx");
ensureGreen();
@@ -935,13 +923,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().successfulShards()));
logger.info("--> update repository with mock version");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", repositoryLocation)
.put("random", randomAlphaOfLength(10))
.put("random_data_file_io_exception_rate", 0.3)));
createRepository("test-repo", "mock",
Settings.builder().put("location", repositoryLocation).put("random", randomAlphaOfLength(10))
.put("random_data_file_io_exception_rate", 0.3));
// Test restore after index deletion
logger.info("--> delete index");
@@ -997,15 +981,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().successfulShards()));
logger.info("--> update repository with mock version");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", repositoryLocation)
.put("random", randomAlphaOfLength(10))
.put("use_lucene_corruption", true)
.put("max_failure_number", 10000000L)
.put("random_data_file_io_exception_rate", 1.0)));
createRepository("test-repo", "mock",
Settings.builder().put("location", repositoryLocation).put("random", randomAlphaOfLength(10))
.put("use_lucene_corruption", true).put("max_failure_number", 10000000L)
.put("random_data_file_io_exception_rate", 1.0));
// Test restore after index deletion
logger.info("--> delete index");
@@ -1194,13 +1173,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
equalTo(createSnapshotResponse.getSnapshotInfo().successfulShards()));
logger.info("--> update repository with mock version");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", repositoryLocation)
.put("random", randomAlphaOfLength(10))
.put("random_data_file_io_exception_rate", 1.0) // Fail completely
));
createRepository("test-repo", "mock", Settings.builder()
.put("location", repositoryLocation).put("random", randomAlphaOfLength(10))
.put("random_data_file_io_exception_rate", 1.0) // Fail completely
);
// Test restore after index deletion
logger.info("--> delete index");
@@ -1257,12 +1233,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", repo).put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx");
ensureGreen();
@ -1322,12 +1295,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", repo).put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
final String[] indices = {"test-idx-1", "test-idx-2"};
createIndex(indices);
@ -1372,12 +1342,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", repo).put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
@ -1408,12 +1375,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", repo).put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
@ -1452,11 +1416,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testDeleteSnapshotWithCorruptedGlobalState() throws Exception {
final Path repo = randomRepoPath();
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs")
.setSettings(Settings.builder()
.put("location", repo)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", repo)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx-1", "test-idx-2");
indexRandom(true,
@ -1465,13 +1427,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
client().prepareIndex("test-idx-2", "_doc").setSource("foo", "bar"));
flushAndRefresh("test-idx-1", "test-idx-2");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIncludeGlobalState(true)
.setWaitForCompletion(true)
.get();
SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
SnapshotInfo snapshotInfo = createFullSnapshot("test-repo", "test-snap");
CreateSnapshotResponse createSnapshotResponse;
final Path globalStatePath = repo.resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat");
if (randomBoolean()) {
@ -1500,22 +1457,14 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertRequestBuilderThrows(client().admin().cluster().prepareSnapshotStatus("test-repo").addSnapshots("test-snap"),
SnapshotMissingException.class);
createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIncludeGlobalState(true)
.setWaitForCompletion(true)
.get();
snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
createFullSnapshot("test-repo", "test-snap");
}
public void testSnapshotWithMissingShardLevelIndexFile() throws Exception {
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(
Settings.builder().put("location", repo).put("compress", false)));
createRepository("test-repo", "fs", Settings.builder().put("location", repo).put("compress", false));
createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
@ -1698,13 +1647,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testMoveShardWhileSnapshotting() throws Exception {
Client client = client();
Path repositoryLocation = randomRepoPath();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", repositoryLocation)
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)));
createRepository("test-repo", "mock", Settings.builder().put("location", repositoryLocation)
.put("random", randomAlphaOfLength(10)).put("wait_after_unblock", 200));
// Create index on 2 nodes and make sure each node has a primary by setting no replicas
assertAcked(prepareCreate("test-idx", 2, Settings.builder().put("number_of_replicas", 0)));
@ -1761,15 +1705,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
Client client = client();
Path repositoryLocation = randomRepoPath();
logger.info("--> creating repository");
AcknowledgedResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", repositoryLocation)
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createRepository("test-repo", "mock",
Settings.builder().put("location", repositoryLocation).put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200));
// Create index on 2 nodes and make sure each node has a primary by setting no replicas
assertAcked(prepareCreate("test-idx", 2, Settings.builder().put("number_of_replicas", 0)));
@ -1844,13 +1782,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testReadonlyRepository() throws Exception {
Client client = client();
logger.info("--> creating repository");
Path repositoryLocation = randomRepoPath();
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", repositoryLocation).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx");
ensureGreen();
@ -1874,13 +1809,11 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> delete index");
cluster().wipeIndices("test-idx");
logger.info("--> create read-only URL repository");
assertAcked(client.admin().cluster().preparePutRepository("readonly-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("readonly", true)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("readonly-repo", "fs", Settings.builder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("readonly", true)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
logger.info("--> restore index after deletion");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("readonly-repo", "test-snap")
.setWaitForCompletion(true).setIndices("test-idx").execute().actionGet();
@ -1907,19 +1840,16 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testThrottling() throws Exception {
Client client = client();
logger.info("--> creating repository");
Path repositoryLocation = randomRepoPath();
boolean throttleSnapshot = randomBoolean();
boolean throttleRestore = randomBoolean();
boolean throttleRestoreViaRecoverySettings = throttleRestore && randomBoolean();
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
.put("max_restore_bytes_per_sec",
throttleRestore && (throttleRestoreViaRecoverySettings == false) ? "10k" : "0")
.put("max_snapshot_bytes_per_sec", throttleSnapshot ? "10k" : "0")));
createRepository("test-repo", "fs", Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
.put("max_restore_bytes_per_sec",
throttleRestore && (throttleRestoreViaRecoverySettings == false) ? "10k" : "0")
.put("max_snapshot_bytes_per_sec", throttleSnapshot ? "10k" : "0"));
createIndex("test-idx");
ensureGreen();
@ -1976,13 +1906,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testDynamicRestoreThrottling() throws Exception {
Client client = client();
logger.info("--> creating repository");
Path repositoryLocation = randomRepoPath();
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repositoryLocation)
.put("compress", randomBoolean())
.put("chunk_size", 100, ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", 100, ByteSizeUnit.BYTES));
createIndex("test-idx");
ensureGreen();
@ -2030,16 +1956,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testSnapshotStatus() throws Exception {
Client client = client();
Path repositoryLocation = randomRepoPath();
logger.info("--> creating repository");
AcknowledgedResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", repositoryLocation)
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
createRepository("test-repo", "mock",
Settings.builder().put("location", randomRepoPath())
.put("random", randomAlphaOfLength(10)).put("wait_after_unblock", 200));
// Create index on 2 nodes and make sure each node has a primary by setting no replicas
assertAcked(prepareCreate("test-idx", 2, Settings.builder().put("number_of_replicas", 0)));
@ -2151,12 +2070,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testSnapshotRelocatingPrimary() throws Exception {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
// Create index on two nodes and make sure each node has a primary by setting no replicas
assertAcked(prepareCreate("test-idx", 2, Settings.builder()
@ -2231,15 +2147,12 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> done");
}
public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedException {
public void testSnapshotMoreThanOnce() throws InterruptedException {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
// only one shard
final Settings indexSettings = Settings.builder()
@ -2317,12 +2230,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testChangeSettingsOnRestore() throws Exception {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
logger.info("--> create test index with case-preserving search analyzer");
@ -2430,12 +2341,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testRecreateBlocksOnRestore() throws Exception {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
Settings.Builder indexSettings = Settings.builder()
.put(indexSettings())
@ -2520,14 +2428,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testDeleteDataStreamDuringSnapshot() throws Exception {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_data", true)));
createRepository("test-repo", "mock", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_data", true));
String dataStream = "test-ds";
@ -2577,14 +2481,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_data", true)));
createRepository("test-repo", "mock", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_data", true));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
@ -2640,13 +2540,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testCloseIndexDuringRestore() throws Exception {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
));
createRepository("test-repo", "mock", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx-1", "test-idx-2");
ensureGreen();
@ -2773,12 +2669,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
final Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
expectThrows(InvalidSnapshotNameException.class,
() -> client.admin().cluster().prepareCreateSnapshot("test-repo", "_foo").get());
@ -2795,11 +2688,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", repo).put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
logger.info("--> indexing some data");
@ -2845,12 +2735,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
/** Tests that a snapshot with a corrupted global state file can still be restored */
public void testRestoreSnapshotWithCorruptedGlobalState() throws Exception {
final Path repo = randomRepoPath();
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs")
.setSettings(Settings.builder()
.put("location", repo)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
final String repoName = "test-repo";
createRepository(repoName, "fs", Settings.builder()
.put("location", repo)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
createIndex("test-idx-1", "test-idx-2");
indexRandom(true,
@ -2859,41 +2747,36 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
client().prepareIndex("test-idx-2", "_doc").setSource("foo", "bar"));
flushAndRefresh("test-idx-1", "test-idx-2");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIncludeGlobalState(true)
.setWaitForCompletion(true)
.get();
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
final String snapshotName = "test-snap";
final SnapshotInfo snapshotInfo = createFullSnapshot(repoName, snapshotName);
final Path globalStatePath = repo.resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat");
try(SeekableByteChannel outChan = Files.newByteChannel(globalStatePath, StandardOpenOption.WRITE)) {
outChan.truncate(randomInt(10));
}
List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo("test-snap"));
assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo(snapshotName));
SnapshotsStatusResponse snapshotStatusResponse =
client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get();
client().admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotName).get();
assertThat(snapshotStatusResponse.getSnapshots(), hasSize(1));
assertThat(snapshotStatusResponse.getSnapshots().get(0).getSnapshot().getSnapshotId().getName(), equalTo("test-snap"));
assertThat(snapshotStatusResponse.getSnapshots().get(0).getSnapshot().getSnapshotId().getName(), equalTo(snapshotName));
assertAcked(client().admin().indices().prepareDelete("test-idx-1", "test-idx-2"));
SnapshotException ex = expectThrows(SnapshotException.class, () -> client().admin().cluster()
.prepareRestoreSnapshot("test-repo", "test-snap")
.prepareRestoreSnapshot(repoName, snapshotName)
.setRestoreGlobalState(true)
.setWaitForCompletion(true)
.get());
assertThat(ex.getRepositoryName(), equalTo("test-repo"));
assertThat(ex.getSnapshotName(), equalTo("test-snap"));
assertThat(ex.getRepositoryName(), equalTo(repoName));
assertThat(ex.getSnapshotName(), equalTo(snapshotName));
assertThat(ex.getMessage(), containsString("failed to read global metadata"));
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.get();
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
@ -2932,13 +2815,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
createRepository("test-repo", "fs", repo);
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.get();
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
assertThat(snapshotInfo.failedShards(), equalTo(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
final SnapshotInfo snapshotInfo = createFullSnapshot("test-repo", "test-snap");
assertThat(snapshotInfo.indices(), hasSize(nbIndices));
final RepositoryData repositoryData = getRepositoryData("test-repo");
@ -3004,13 +2881,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
createRepository("test-repo", "fs", repo);
final String snapshot1 = "test-snap-1";
logger.info("--> creating snapshot [{}]", snapshot1);
final SnapshotInfo snapshotInfo = client().admin().cluster().prepareCreateSnapshot("test-repo", snapshot1)
.setWaitForCompletion(true)
.get()
.getSnapshotInfo();
assertThat(snapshotInfo.failedShards(), equalTo(0));
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
final SnapshotInfo snapshotInfo = createFullSnapshot("test-repo", snapshot1);
assertThat(snapshotInfo.indices(), hasSize(1));
final RepositoryData repositoryData = getRepositoryData("test-repo");
@ -3070,12 +2941,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
final Client client = client();
final Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repositoryName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository(repositoryName, "fs", Settings.builder()
.put("location", repo).put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
logger.info("--> creating an index and indexing documents");
createIndex(indexName);
ensureGreen();
@ -3132,15 +3000,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
final String repositoryName = "test-repo";
final String indexName = "test-idx";
final Client client = client();
final Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repositoryName)
.setType("mock").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("wait_after_unblock", 200)));
createRepository(repositoryName, "mock", Settings.builder()
.put("location", randomRepoPath()).put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES).put("wait_after_unblock", 200));
logger.info("--> get snapshots on an empty repository");
expectThrows(SnapshotMissingException.class, () -> client.admin()
@ -3297,14 +3160,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
refresh();
logger.info("--> creating repository");
AcknowledgedResponse putRepositoryResponse =
client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("random", randomAlphaOfLength(10))
.put("wait_after_unblock", 200)
).get();
assertTrue(putRepositoryResponse.isAcknowledged());
createRepository(repo, "mock", Settings.builder()
.put("location", randomRepoPath()).put("random", randomAlphaOfLength(10)).put("wait_after_unblock", 200));
String blockedNode = blockNodeWithIndex(repo, index);
@ -3550,14 +3407,11 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
final String snapshotName = "test-snap";
final String indexName = "test-idx";
final Client client = client();
final Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repositoryName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository(repositoryName, "fs", Settings.builder()
.put("location", randomRepoPath())
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
logger.info("--> creating an index and indexing documents");
final String dataNode = internalCluster().getDataNodeInstance(ClusterService.class).localNode().getName();
final Settings settings =
@ -3754,7 +3608,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
final Map<Integer, Long> primaryTerms = IntStream.range(0, numPrimaries)
.boxed().collect(Collectors.toMap(shardId -> shardId, indexMetadata::primaryTerm));
assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("fs").setSettings(randomRepoSettings()));
createRepository("test-repo", "fs", randomRepoSettings());
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices(indexName).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(numPrimaries));
@ -3794,9 +3648,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
createRepository(repoName, "fs", absolutePath);
logger.info("--> snapshot with [{}] shards", initialShardCount);
final SnapshotInfo snapshot1 =
client().admin().cluster().prepareCreateSnapshot(repoName, "snap-1").setWaitForCompletion(true).get().getSnapshotInfo();
assertThat(snapshot1.state(), is(SnapshotState.SUCCESS));
final SnapshotInfo snapshot1 = createFullSnapshot(repoName, "snap-1");
assertThat(snapshot1.successfulShards(), is(initialShardCount));
logger.info("--> delete index");
@ -3813,9 +3665,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
logger.info("--> snapshot with [{}] shards", newShardCount);
final SnapshotInfo snapshot2 =
client().admin().cluster().prepareCreateSnapshot(repoName, "snap-2").setWaitForCompletion(true).get().getSnapshotInfo();
assertThat(snapshot2.state(), is(SnapshotState.SUCCESS));
final SnapshotInfo snapshot2 = createFullSnapshot(repoName, "snap-2");
assertThat(snapshot2.successfulShards(), is(newShardCount));
logger.info("--> restoring snapshot 1");
@ -3854,13 +3704,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testBulkDeleteWithOverlappingPatterns() {
final int numberOfSnapshots = between(5, 15);
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "fs", Settings.builder()
.put("location", randomRepoPath()).put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
final String[] indices = {"test-idx-1", "test-idx-2", "test-idx-3"};
createIndex(indices);
@ -3893,8 +3739,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
final String dottedHiddenIndex = ".index-hidden";
final String repoName = "test-repo";
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository(repoName).setType("fs").setSettings(randomRepoSettings()));
createRepository(repoName, "fs", randomRepoSettings());
logger.info("--> creating indices");
createIndex(normalIndex, Settings.builder()

View File

@ -53,12 +53,9 @@ public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase {
String masterNode = internalCluster().startMasterOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createRepository("test-repo", "mock", Settings.builder()
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES));
final int shards = between(1, 10);
assertAcked(prepareCreate("test-index", 0, Settings.builder().put("number_of_shards", shards).put("number_of_replicas", 0)));

View File

@ -40,7 +40,6 @@ import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
@ -64,12 +63,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
}
refresh();
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
createFullSnapshot("test-repo", "test-snap");
List<SnapshotInfo> snapshotInfos = client.admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
@ -88,9 +82,7 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
public void testStatusAPICallInProgressSnapshot() throws Exception {
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("mock").setSettings(
Settings.builder().put("location", randomRepoPath()).put("block_on_data", true)));
createRepository("test-repo", "mock", Settings.builder().put("location", randomRepoPath()).put("block_on_data", true));
createIndex("test-idx-1");
ensureGreen();
@ -131,12 +123,9 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
final Path repoPath = randomRepoPath();
createRepository("test-repo", "fs", repoPath);
logger.info("--> snapshot");
final CreateSnapshotResponse response =
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
final SnapshotInfo snapshotInfo = createFullSnapshot("test-repo", "test-snap");
logger.info("--> delete snap-${uuid}.dat file for this snapshot to simulate concurrent delete");
IOUtils.rm(repoPath.resolve(BlobStoreRepository.SNAPSHOT_PREFIX + response.getSnapshotInfo().snapshotId().getUUID() + ".dat"));
IOUtils.rm(repoPath.resolve(BlobStoreRepository.SNAPSHOT_PREFIX + snapshotInfo.snapshotId().getUUID() + ".dat"));
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster()
.getSnapshots(new GetSnapshotsRequest("test-repo", new String[] {"test-snap"})).actionGet());
@ -157,14 +146,12 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
}
refresh();
logger.info("--> snapshot");
final CreateSnapshotResponse response =
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
final SnapshotInfo snapshotInfo = createFullSnapshot("test-repo", "test-snap");
logger.info("--> delete shard-level snap-${uuid}.dat file for one shard in this snapshot to simulate concurrent delete");
final String indexRepoId = getRepositoryData("test-repo").resolveIndexId(response.getSnapshotInfo().indices().get(0)).getId();
final String indexRepoId = getRepositoryData("test-repo").resolveIndexId(snapshotInfo.indices().get(0)).getId();
IOUtils.rm(repoPath.resolve("indices").resolve(indexRepoId).resolve("0").resolve(
BlobStoreRepository.SNAPSHOT_PREFIX + response.getSnapshotInfo().snapshotId().getUUID() + ".dat"));
BlobStoreRepository.SNAPSHOT_PREFIX + snapshotInfo.snapshotId().getUUID() + ".dat"));
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster()
.prepareSnapshotStatus("test-repo").setSnapshots("test-snap").execute().actionGet());

View File

@ -270,11 +270,14 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repository)).unblock();
}
protected void createRepository(String repoName, String type, Path location) {
logger.info("--> creating repository");
protected void createRepository(String repoName, String type, Settings.Builder settings) {
logger.info("--> creating repository [{}] [{}]", repoName, type);
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType(type)
.setSettings(Settings.builder().put("location", location)));
.setType(type).setSettings(settings));
}
protected void createRepository(String repoName, String type, Path location) {
    // Convenience overload: the repository settings consist solely of the
    // filesystem location; delegates to the Settings.Builder variant.
    final Settings.Builder repoSettings = Settings.builder().put("location", location);
    createRepository(repoName, type, repoSettings);
}
/**
@ -314,4 +317,16 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
StandardOpenOption.TRUNCATE_EXISTING);
return oldVersionSnapshot;
}
protected SnapshotInfo createFullSnapshot(String repoName, String snapshotName) {
    // Takes a snapshot of all indices plus the global cluster state, blocks until
    // it completes, and asserts that it finished successfully on every shard.
    logger.info("--> creating full snapshot [{}] in [{}]", snapshotName, repoName);
    final CreateSnapshotResponse response = client().admin()
        .cluster()
        .prepareCreateSnapshot(repoName, snapshotName)
        .setIncludeGlobalState(true)
        .setWaitForCompletion(true)
        .get();
    final SnapshotInfo info = response.getSnapshotInfo();
    // A fully successful snapshot reports every shard as successful and ends in SUCCESS.
    assertThat(info.successfulShards(), is(info.totalShards()));
    assertThat(info.state(), is(SnapshotState.SUCCESS));
    return info;
}
}