Dry up Snapshot Integ Tests some More (#62856) (#63248)

* Just some obvious drying up of these super complex tests.
* Mainly just shortening the diff of #61839 here by moving test utilities
to the abstract test case.
Also, making use of the now available functionality to simplify existing tests
and improve logging in them.
This commit is contained in:
Armin Braun 2020-10-05 18:33:59 +02:00 committed by GitHub
parent a522e932e8
commit de6eeecbd3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 309 additions and 476 deletions

View File

@ -31,7 +31,6 @@ import org.elasticsearch.test.ESIntegTestCase;
import java.io.ByteArrayInputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
import static org.hamcrest.Matchers.is;
@ -49,11 +48,8 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
ensureStableCluster(nodeCount - 1);
logger.info("--> wait for cleanup to finish and disappear from cluster state");
assertBusy(() -> {
RepositoryCleanupInProgress cleanupInProgress =
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
assertFalse(cleanupInProgress.hasCleanupInProgress());
}, 30, TimeUnit.SECONDS);
awaitClusterState(state ->
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false);
}
public void testRepeatCleanupsDontRemove() throws Exception {
@ -71,11 +67,8 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
unblockNode("test-repo", masterNode);
logger.info("--> wait for cleanup to finish and disappear from cluster state");
assertBusy(() -> {
RepositoryCleanupInProgress cleanupInProgress =
client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE);
assertFalse(cleanupInProgress.hasCleanupInProgress());
}, 30, TimeUnit.SECONDS);
awaitClusterState(state ->
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress() == false);
}
private String startBlockedCleanup(String repoName) throws Exception {

View File

@ -242,7 +242,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
logger.info("--> waiting for concurrent snapshot(s) to finish");
createNSnapshots(otherRepoName, randomIntBetween(1, 5));
assertAcked(startDelete(otherRepoName, "*").get());
assertAcked(startDeleteSnapshot(otherRepoName, "*").get());
unblockNode(blockedRepoName, dataNode);
assertSuccessful(createSlowFuture);
@ -259,11 +259,11 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
createFullSnapshot( blockedRepoName, "blocked-snapshot");
blockNodeOnAnyFiles(blockedRepoName, masterNode);
final ActionFuture<AcknowledgedResponse> slowDeleteFuture = startDelete(blockedRepoName, "*");
final ActionFuture<AcknowledgedResponse> slowDeleteFuture = startDeleteSnapshot(blockedRepoName, "*");
logger.info("--> waiting for concurrent snapshot(s) to finish");
createNSnapshots(otherRepoName, randomIntBetween(1, 5));
assertAcked(startDelete(otherRepoName, "*").get());
assertAcked(startDeleteSnapshot(otherRepoName, "*").get());
unblockNode(blockedRepoName, masterNode);
assertAcked(slowDeleteFuture.actionGet());
@ -282,7 +282,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
createFullSnapshot(repoName, firstSnapshot);
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDelete(repoName, firstSnapshot);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, firstSnapshot);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "second-snapshot");
@ -320,7 +320,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress);
});
final ActionFuture<AcknowledgedResponse> deleteSnapshotsResponse = startDelete(repoName, firstSnapshot);
final ActionFuture<AcknowledgedResponse> deleteSnapshotsResponse = startDeleteSnapshot(repoName, firstSnapshot);
awaitNDeletionsInProgress(1);
logger.info("--> start third snapshot");
@ -369,7 +369,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
return snapshotsInProgress.entries().size() == 2 && snapshotHasCompletedShard(secondSnapshot, snapshotsInProgress);
});
final ActionFuture<AcknowledgedResponse> deleteSnapshotsResponse = startDelete(repoName, firstSnapshot);
final ActionFuture<AcknowledgedResponse> deleteSnapshotsResponse = startDeleteSnapshot(repoName, firstSnapshot);
awaitNDeletionsInProgress(1);
final ActionFuture<CreateSnapshotResponse> thirdSnapshotResponse = startFullSnapshot(repoName, "snapshot-three");
@ -380,7 +380,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
logger.info("--> waiting for all three snapshots to show up as in-progress");
assertBusy(() -> assertThat(currentSnapshots(repoName), hasSize(3)), 30L, TimeUnit.SECONDS);
final ActionFuture<AcknowledgedResponse> allDeletedResponse = startDelete(repoName, "*");
final ActionFuture<AcknowledgedResponse> allDeletedResponse = startDeleteSnapshot(repoName, "*");
logger.info("--> waiting for second and third snapshot to finish");
assertBusy(() -> {
@ -532,13 +532,13 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
createNSnapshots(repoName, randomIntBetween(2, 5));
blockMasterFromFinalizingSnapshotOnIndexFile(repoName);
final ActionFuture<AcknowledgedResponse> firstDeleteFuture = startDelete(repoName, "*");
final ActionFuture<AcknowledgedResponse> firstDeleteFuture = startDeleteSnapshot(repoName, "*");
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-queued");
awaitNSnapshotsInProgress(1);
awaitNumberOfSnapshotsInProgress(1);
final ActionFuture<AcknowledgedResponse> secondDeleteFuture = startDelete(repoName, "*");
final ActionFuture<AcknowledgedResponse> secondDeleteFuture = startDeleteSnapshot(repoName, "*");
awaitNDeletionsInProgress(2);
unblockNode(repoName, masterNode);
@ -562,9 +562,9 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final ActionFuture<AcknowledgedResponse> firstDeleteFuture = startAndBlockOnDeleteSnapshot(repoName, "*");
final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, "snapshot-queued");
awaitNSnapshotsInProgress(1);
awaitNumberOfSnapshotsInProgress(1);
final ActionFuture<AcknowledgedResponse> secondDeleteFuture = startDelete(repoName, "*");
final ActionFuture<AcknowledgedResponse> secondDeleteFuture = startDeleteSnapshot(repoName, "*");
awaitNDeletionsInProgress(2);
unblockNode(repoName, masterNode);
@ -590,7 +590,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-three").setWaitForCompletion(false).get();
startDelete(repoName, "*");
startDeleteSnapshot(repoName, "*");
awaitNDeletionsInProgress(2);
internalCluster().stopCurrentMasterNode();
@ -618,7 +618,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final ActionFuture<CreateSnapshotResponse> createThirdSnapshot = client(masterNode).admin().cluster()
.prepareCreateSnapshot(repoName, "snapshot-three").setWaitForCompletion(true).execute();
awaitNSnapshotsInProgress(1);
awaitNumberOfSnapshotsInProgress(1);
final ActionFuture<AcknowledgedResponse> secondDeleteFuture =
client(masterNode).admin().cluster().prepareDeleteSnapshot(repoName, "*").execute();
@ -655,7 +655,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
final ActionFuture<CreateSnapshotResponse> secondFailedSnapshotFuture =
startFullSnapshotFromMasterClient(repoName, "failing-snapshot-2");
awaitNSnapshotsInProgress(2);
awaitNumberOfSnapshotsInProgress(2);
final ActionFuture<AcknowledgedResponse> failedDeleteFuture =
client(masterNode).admin().cluster().prepareDeleteSnapshot(repoName, "*").execute();
@ -751,7 +751,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
corruptIndexN(repoPath, generation);
final ActionFuture<CreateSnapshotResponse> snapshotFour = startFullSnapshotFromNonMasterClient(repoName, "snapshot-four");
awaitNSnapshotsInProgress(2);
awaitNumberOfSnapshotsInProgress(2);
final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.DISCONNECT);
internalCluster().setDisruptionScheme(networkDisruption);
@ -786,18 +786,18 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
awaitNDeletionsInProgress(1);
final ActionFuture<CreateSnapshotResponse> createBlockedSnapshot =
startFullSnapshotFromNonMasterClient(blockedRepoName, "queued-snapshot");
awaitNSnapshotsInProgress(1);
awaitNumberOfSnapshotsInProgress(1);
final long generation = getRepositoryData(repoName).getGenId();
blockNodeOnAnyFiles(repoName, masterNode);
final ActionFuture<CreateSnapshotResponse> snapshotThree = startFullSnapshotFromNonMasterClient(repoName, "snapshot-three");
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
awaitNSnapshotsInProgress(2);
awaitNumberOfSnapshotsInProgress(2);
corruptIndexN(repoPath, generation);
final ActionFuture<CreateSnapshotResponse> snapshotFour = startFullSnapshotFromNonMasterClient(repoName, "snapshot-four");
awaitNSnapshotsInProgress(3);
awaitNumberOfSnapshotsInProgress(3);
internalCluster().stopCurrentMasterNode();
ensureStableCluster(3);
@ -844,7 +844,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final ActionFuture<AcknowledgedResponse> deleteFuture = startAndBlockOnDeleteSnapshot(repoName, "*");
final ActionFuture<CreateSnapshotResponse> snapshotThree = startFullSnapshot(repoName, "snapshot-three", true);
final ActionFuture<CreateSnapshotResponse> snapshotFour = startFullSnapshot(repoName, "snapshot-four", true);
awaitNSnapshotsInProgress(2);
awaitNumberOfSnapshotsInProgress(2);
assertAcked(client().admin().indices().prepareDelete("index-two"));
unblockNode(repoName, masterNode);
@ -910,7 +910,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final String snapshotTwo = snapshots.get(1);
final ActionFuture<AcknowledgedResponse> deleteSnapshotOne = startAndBlockOnDeleteSnapshot(repoName, snapshotOne);
final ActionFuture<AcknowledgedResponse> deleteSnapshotTwo = startDelete(repoName, snapshotTwo);
final ActionFuture<AcknowledgedResponse> deleteSnapshotTwo = startDeleteSnapshot(repoName, snapshotTwo);
awaitNDeletionsInProgress(2);
unblockNode(repoName, masterName);
@ -937,7 +937,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final String masterName = internalCluster().getMasterName();
final String snapshotOne = snapshotNames.get(0);
final ActionFuture<AcknowledgedResponse> deleteSnapshotOne = startDelete(repoName, snapshotOne);
final ActionFuture<AcknowledgedResponse> deleteSnapshotOne = startDeleteSnapshot(repoName, snapshotOne);
awaitNDeletionsInProgress(1);
unblockNode(repoName, masterName);
@ -957,7 +957,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
blockMasterFromDeletingIndexNFile(repoName);
final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, snapshotName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
final ActionFuture<AcknowledgedResponse> deleteFuture = startDelete(repoName, snapshotName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, snapshotName);
awaitNDeletionsInProgress(1);
unblockNode(repoName, masterName);
assertSuccessful(snapshotFuture);
@ -1006,7 +1006,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
final String snapshotOne = snapshotNames.get(0);
final ActionFuture<AcknowledgedResponse> deleteSnapshotOne = startDelete(repoName, snapshotOne);
final ActionFuture<AcknowledgedResponse> deleteSnapshotOne = startDeleteSnapshot(repoName, snapshotOne);
awaitNDeletionsInProgress(1);
networkDisruption.startDisrupting();
ensureStableCluster(3, dataNode);
@ -1041,14 +1041,14 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
startFullSnapshotBlockedOnDataNode("blocked-snapshot-2", blockedRepoName, dataNode);
final ActionFuture<CreateSnapshotResponse> createSlowFuture3 =
startFullSnapshotBlockedOnDataNode("other-blocked-snapshot", otherBlockedRepoName, dataNode);
awaitNSnapshotsInProgress(3);
awaitNumberOfSnapshotsInProgress(3);
assertSnapshotStatusCountOnRepo("_all", 3);
assertSnapshotStatusCountOnRepo(blockedRepoName, 2);
assertSnapshotStatusCountOnRepo(otherBlockedRepoName, 1);
unblockNode(blockedRepoName, dataNode);
awaitNSnapshotsInProgress(1);
awaitNumberOfSnapshotsInProgress(1);
assertSnapshotStatusCountOnRepo("_all", 1);
assertSnapshotStatusCountOnRepo(blockedRepoName, 0);
assertSnapshotStatusCountOnRepo(otherBlockedRepoName, 1);
@ -1076,7 +1076,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
startFullSnapshotBlockedOnDataNode("blocked-snapshot-2", blockedRepoName, dataNode);
final ActionFuture<CreateSnapshotResponse> createSlowFuture3 =
startFullSnapshotBlockedOnDataNode("other-blocked-snapshot", otherBlockedRepoName, dataNode);
awaitNSnapshotsInProgress(3);
awaitNumberOfSnapshotsInProgress(3);
unblockNode(blockedRepoName, dataNode);
unblockNode(otherBlockedRepoName, dataNode);
@ -1110,7 +1110,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
client().admin().cluster().prepareCreateSnapshot(otherRepoName, "snapshot-other-blocked-1").setWaitForCompletion(false).get();
client().admin().cluster().prepareCreateSnapshot(otherRepoName, "snapshot-other-blocked-2").setWaitForCompletion(false).get();
awaitNSnapshotsInProgress(4);
awaitNumberOfSnapshotsInProgress(4);
final String initialMaster = internalCluster().getMasterName();
waitForBlock(initialMaster, repoName, TimeValue.timeValueSeconds(30L));
waitForBlock(initialMaster, otherRepoName, TimeValue.timeValueSeconds(30L));
@ -1146,10 +1146,10 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
++blockedSnapshots;
} else {
blockedDelete = true;
deleteFuture = startDelete(repoName, randomFrom(snapshotNames));
deleteFuture = startDeleteSnapshot(repoName, randomFrom(snapshotNames));
}
}
awaitNSnapshotsInProgress(blockedSnapshots);
awaitNumberOfSnapshotsInProgress(blockedSnapshots);
if (blockedDelete) {
awaitNDeletionsInProgress(1);
}
@ -1215,7 +1215,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
final String snapshotName = "snap-1";
final ActionFuture<CreateSnapshotResponse> snapshotFuture = startFullSnapshot(repoName, snapshotName);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(30L));
final ActionFuture<AcknowledgedResponse> deleteFuture = startDelete(repoName, snapshotName);
final ActionFuture<AcknowledgedResponse> deleteFuture = startDeleteSnapshot(repoName, snapshotName);
awaitNDeletionsInProgress(1);
unblockNode(repoName, masterNode);
assertAcked(deleteFuture.get());
@ -1255,11 +1255,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
return internalCluster().nonMasterClient().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute();
}
private ActionFuture<AcknowledgedResponse> startDelete(String repoName, String snapshotName) {
logger.info("--> deleting snapshot [{}] from repo [{}]", snapshotName, repoName);
return client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute();
}
private ActionFuture<CreateSnapshotResponse> startFullSnapshotFromNonMasterClient(String repoName, String snapshotName) {
logger.info("--> creating full snapshot [{}] to repo [{}] from non master client", snapshotName, repoName);
return internalCluster().nonMasterClient().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
@ -1272,40 +1267,12 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
.setWaitForCompletion(true).execute();
}
private ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName) {
return startFullSnapshot(repoName, snapshotName, false);
}
private ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName, boolean partial) {
logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName);
return client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
.setPartial(partial).execute();
}
// Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
// threads so that blocking some threads on one repository doesn't block other repositories from doing work
private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
.put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build();
private static final Settings SINGLE_SHARD_NO_REPLICA = indexSettingsNoReplicas(1).build();
private void createIndexWithContent(String indexName) {
createIndexWithContent(indexName, SINGLE_SHARD_NO_REPLICA);
}
private void createIndexWithContent(String indexName, String nodeInclude, String nodeExclude) {
createIndexWithContent(indexName, indexSettingsNoReplicas(1)
.put("index.routing.allocation.include._name", nodeInclude)
.put("index.routing.allocation.exclude._name", nodeExclude).build());
}
private void createIndexWithContent(String indexName, Settings indexSettings) {
logger.info("--> creating index [{}]", indexName);
createIndex(indexName, indexSettings);
ensureGreen(indexName);
index(indexName, "_doc", "some_id", "foo", "bar");
}
private static boolean snapshotHasCompletedShard(String snapshot, SnapshotsInProgress snapshotsInProgress) {
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
if (entry.snapshot().getSnapshotId().getName().equals(snapshot)) {
@ -1332,12 +1299,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries().size() == count);
}
private void awaitNSnapshotsInProgress(int count) throws Exception {
logger.info("--> wait for [{}] snapshots to show up in the cluster state", count);
awaitClusterState(state ->
state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() == count);
}
private static List<SnapshotInfo> currentSnapshots(String repoName) {
return client().admin().cluster().prepareGetSnapshots(repoName).setSnapshots(GetSnapshotsRequest.CURRENT_SNAPSHOT)
.get().getSnapshots();
@ -1347,7 +1308,7 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
throws InterruptedException {
final String masterName = internalCluster().getMasterName();
blockNodeOnAnyFiles(repoName, masterName);
final ActionFuture<AcknowledgedResponse> fut = startDelete(repoName, snapshotName);
final ActionFuture<AcknowledgedResponse> fut = startDeleteSnapshot(repoName, snapshotName);
waitForBlock(masterName, repoName, TimeValue.timeValueSeconds(30L));
return fut;
}
@ -1359,12 +1320,4 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
waitForBlock(internalCluster().getMasterName(), blockedRepoName, TimeValue.timeValueSeconds(30L));
return fut;
}
private ActionFuture<CreateSnapshotResponse> startFullSnapshotBlockedOnDataNode(String snapshotName, String repoName, String dataNode)
throws InterruptedException {
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> fut = startFullSnapshot(repoName, snapshotName);
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
return fut;
}
}

View File

@ -27,10 +27,8 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -55,6 +53,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -120,8 +119,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
startDeleteSnapshot(repoName, snapshot).get();
logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
@ -211,29 +209,10 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));
logger.info("--> set next generation as pending in the cluster state");
final PlainActionFuture<Void> csUpdateFuture = PlainActionFuture.newFuture();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata())
.putCustom(RepositoriesMetadata.TYPE,
currentState.metadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE).withUpdatedGeneration(
repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build();
}
@Override
public void onFailure(String source, Exception e) {
csUpdateFuture.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
csUpdateFuture.onResponse(null);
}
}
);
csUpdateFuture.get();
updateClusterState(currentState -> ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata())
.putCustom(RepositoriesMetadata.TYPE,
currentState.metadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE).withUpdatedGeneration(
repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build());
logger.info("--> full cluster restart");
internalCluster().fullRestart();
@ -242,8 +221,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
logger.info("--> verify index-N blob is found at the new location");
assertThat(getRepositoryData(repoName).getGenId(), is(beforeMoveGen + 1));
logger.info("--> delete snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
startDeleteSnapshot(repoName, snapshot).get();
logger.info("--> verify index-N blob is found at the expected location");
assertThat(getRepositoryData(repoName).getGenId(), is(beforeMoveGen + 2));
@ -303,7 +281,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
is(SnapshotsService.OLD_SNAPSHOT_FORMAT));
logger.info("--> verify that snapshot with missing root level metadata can be deleted");
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get());
assertAcked(startDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get());
logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
@ -353,7 +331,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo));
}
public void testHandleSnapshotErrorWithBwCFormat() throws IOException {
public void testHandleSnapshotErrorWithBwCFormat() throws IOException, ExecutionException, InterruptedException {
final String repoName = "test-repo";
final Path repoPath = randomRepoPath();
createRepository(repoName, "fs", repoPath);
@ -377,13 +355,12 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
assertFileExists(initialShardMetaPath);
Files.move(initialShardMetaPath, shardPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + "1"));
logger.info("--> delete old version snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get();
startDeleteSnapshot(repoName, oldVersionSnapshot).get();
createFullSnapshot(repoName, "snapshot-2");
}
public void testRepairBrokenShardGenerations() throws IOException {
public void testRepairBrokenShardGenerations() throws Exception {
final String repoName = "test-repo";
final Path repoPath = randomRepoPath();
createRepository(repoName, "fs", repoPath);
@ -398,8 +375,7 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
createFullSnapshot(repoName, "snapshot-1");
logger.info("--> delete old version snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, oldVersionSnapshot).get();
startDeleteSnapshot(repoName, oldVersionSnapshot).get();
logger.info("--> move shard level metadata to new generation and make RepositoryData point at an older generation");
final IndexId indexId = getRepositoryData(repoName).resolveIndexId(indexName);

View File

@ -21,7 +21,6 @@ package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
@ -36,13 +35,9 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -51,7 +46,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
@ -229,9 +223,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
createRepository("test-repo", "fs");
createFullSnapshot("test-repo", "test-snap");
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet()
.getSnapshots().get(0).state(),
equalTo(SnapshotState.SUCCESS));
assertThat(getSnapshot("test-repo", "test-snap").state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> change the test persistent setting and break it");
setSettingValue.accept("new value 2");
@ -255,7 +247,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> start node");
internalCluster().startNode();
Client client = client();
createIndex("test-idx");
logger.info("--> add custom persistent metadata");
updateClusterState(currentState -> {
@ -273,9 +264,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
createRepository("test-repo", "fs", tempDir);
createFullSnapshot("test-repo", "test-snap");
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute().actionGet()
.getSnapshots().get(0).state(),
equalTo(SnapshotState.SUCCESS));
assertThat(getSnapshot("test-repo", "test-snap").state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> change custom persistent metadata");
updateClusterState(currentState -> {
@ -299,20 +288,20 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
});
logger.info("--> delete repository");
assertAcked(client.admin().cluster().prepareDeleteRepository("test-repo"));
assertAcked(clusterAdmin().prepareDeleteRepository("test-repo"));
createRepository("test-repo-2", "fs", tempDir);
logger.info("--> restore snapshot");
client.admin().cluster().prepareRestoreSnapshot("test-repo-2", "test-snap").setRestoreGlobalState(true).setIndices("-*")
clusterAdmin().prepareRestoreSnapshot("test-repo-2", "test-snap").setRestoreGlobalState(true).setIndices("-*")
.setWaitForCompletion(true).execute().actionGet();
logger.info("--> make sure old repository wasn't restored");
assertRequestBuilderThrows(client.admin().cluster().prepareGetRepositories("test-repo"), RepositoryMissingException.class);
assertThat(client.admin().cluster().prepareGetRepositories("test-repo-2").get().repositories().size(), equalTo(1));
assertRequestBuilderThrows(clusterAdmin().prepareGetRepositories("test-repo"), RepositoryMissingException.class);
assertThat(clusterAdmin().prepareGetRepositories("test-repo-2").get().repositories().size(), equalTo(1));
logger.info("--> check that custom persistent metadata was restored");
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
ClusterState clusterState = clusterAdmin().prepareState().get().getState();
logger.info("Cluster state: {}", clusterState);
Metadata metadata = clusterState.getMetadata();
assertThat(((SnapshottableMetadata) metadata.custom(SnapshottableMetadata.TYPE)).getData(), equalTo("before_snapshot_s"));
@ -327,7 +316,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
ensureYellow();
logger.info("--> check that gateway-persistent custom metadata survived full cluster restart");
clusterState = client().admin().cluster().prepareState().get().getState();
clusterState = clusterAdmin().prepareState().get().getState();
logger.info("Cluster state: {}", clusterState);
metadata = clusterState.getMetadata();
assertThat(metadata.custom(SnapshottableMetadata.TYPE), nullValue());
@ -344,38 +333,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
equalTo("before_snapshot_s_gw_noapi"));
}
private void updateClusterState(final ClusterStateUpdater updater) throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class);
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
return updater.execute(currentState);
}
@Override
public void onFailure(String source, @Nullable Exception e) {
countDownLatch.countDown();
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
countDownLatch.countDown();
}
});
countDownLatch.await();
}
private interface ClusterStateUpdater {
ClusterState execute(ClusterState currentState) throws Exception;
}
public void testSnapshotDuringNodeShutdown() throws Exception {
logger.info("--> start 2 nodes");
Client client = client();
assertAcked(prepareCreate("test-idx", 2, indexSettingsNoReplicas(2)));
ensureGreen();
indexRandomDocs("test-idx", 100);
final Path repoPath = randomRepoPath();
createRepository("test-repo", "mock",
@ -387,7 +348,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
String blockedNode = blockNodeWithIndex("test-repo", "test-idx");
logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(false)
.setIndices("test-idx")
.get();
@ -408,13 +369,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
public void testSnapshotWithStuckNode() throws Exception {
logger.info("--> start 2 nodes");
ArrayList<String> nodes = new ArrayList<>();
nodes.add(internalCluster().startNode());
nodes.add(internalCluster().startNode());
Client client = client();
List<String> nodes = internalCluster().startNodes(2);
assertAcked(prepareCreate("test-idx", 2, indexSettingsNoReplicas(2)));
ensureGreen();
indexRandomDocs("test-idx", 100);
Path repo = randomRepoPath();
@ -429,7 +388,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertFileCount(repo, 0);
logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(false)
.setIndices("test-idx")
.get();
@ -460,7 +419,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
.execute().actionGet());
logger.info("--> Go through a loop of creating and deleting a snapshot to trigger repository cleanup");
client().admin().cluster().prepareCleanupRepository("test-repo").get();
clusterAdmin().prepareCleanupRepository("test-repo").get();
// Expect two files to remain in the repository:
// (1) index-(N+1)
@ -479,11 +438,12 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> create an index that will have some unallocated shards");
assertAcked(prepareCreate("test-idx-some", 2, indexSettingsNoReplicas(6)));
ensureGreen();
indexRandomDocs("test-idx-some", 100);
logger.info("--> shutdown one of the nodes");
internalCluster().stopRandomDataNode();
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("<2")
assertThat(clusterAdmin().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("<2")
.execute().actionGet().isTimedOut(),
equalTo(false));
@ -492,47 +452,39 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
ensureGreen("test-idx-all");
logger.info("--> create an index that will be closed");
assertAcked(prepareCreate("test-idx-closed", 1, Settings.builder().put("number_of_shards", 4)
.put("number_of_replicas", 0)));
logger.info("--> indexing some data into test-idx-all");
for (int i = 0; i < 100; i++) {
index("test-idx-all", "doc", Integer.toString(i), "foo", "bar" + i);
index("test-idx-closed", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh("test-idx-closed", "test-idx-all"); // don't refresh test-idx-some it will take 30 sec until it times out...
assertThat(client().prepareSearch("test-idx-all").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
assertThat(client().prepareSearch("test-idx-closed").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
assertAcked(prepareCreate("test-idx-closed", 1, indexSettingsNoReplicas(4)));
indexRandomDocs("test-idx-all", 100);
indexRandomDocs("test-idx-closed", 100);
assertAcked(client().admin().indices().prepareClose("test-idx-closed"));
logger.info("--> create an index that will have no allocated shards");
assertAcked(prepareCreate("test-idx-none", 1, Settings.builder().put("number_of_shards", 6)
.put("index.routing.allocation.include.tag", "nowhere")
.put("number_of_replicas", 0)).setWaitForActiveShards(ActiveShardCount.NONE).get());
assertTrue(client().admin().indices().prepareExists("test-idx-none").get().isExists());
assertAcked(prepareCreate("test-idx-none", 1, indexSettingsNoReplicas(6)
.put("index.routing.allocation.include.tag", "nowhere")).setWaitForActiveShards(ActiveShardCount.NONE).get());
assertTrue(indexExists("test-idx-none"));
createRepository("test-repo", "fs");
logger.info("--> start snapshot with default settings without a closed index - should fail");
final SnapshotException sne = expectThrows(SnapshotException.class,
() -> client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
() -> clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-1")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
.setWaitForCompletion(true).execute().actionGet());
assertThat(sne.getMessage(), containsString("Indices don't have primary shards"));
if (randomBoolean()) {
logger.info("checking snapshot completion using status");
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-2")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
.setWaitForCompletion(false).setPartial(true).execute().actionGet();
assertBusy(() -> {
SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo")
SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus("test-repo")
.setSnapshots("test-snap-2").get();
List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
assertEquals(snapshotStatuses.size(), 1);
logger.trace("current snapshot status [{}]", snapshotStatuses.get(0));
assertTrue(snapshotStatuses.get(0).getState().completed());
}, 1, TimeUnit.MINUTES);
SnapshotsStatusResponse snapshotsStatusResponse = client().admin().cluster().prepareSnapshotStatus("test-repo")
SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus("test-repo")
.setSnapshots("test-snap-2").get();
List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
assertThat(snapshotStatuses.size(), equalTo(1));
@ -545,17 +497,14 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
// There is slight delay between snapshot being marked as completed in the cluster state and on the file system
// After it was marked as completed in the cluster state - we need to check if it's completed on the file system as well
assertBusy(() -> {
GetSnapshotsResponse response = client().admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots("test-snap-2").get();
assertThat(response.getSnapshots().size(), equalTo(1));
SnapshotInfo snapshotInfo = response.getSnapshots().get(0);
SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap-2");
assertTrue(snapshotInfo.state().completed());
assertEquals(SnapshotState.PARTIAL, snapshotInfo.state());
}, 1, TimeUnit.MINUTES);
} else {
logger.info("checking snapshot completion using wait_for_completion flag");
final CreateSnapshotResponse createSnapshotResponse =
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2")
clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-2")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
.setWaitForCompletion(true).setPartial(true).execute().actionGet();
logger.info("State: [{}], Reason: [{}]",
@ -563,20 +512,19 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(22));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), lessThan(16));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(10));
assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-2").execute().actionGet()
.getSnapshots().get(0).state(),
assertThat(getSnapshot("test-repo", "test-snap-2").state(),
equalTo(SnapshotState.PARTIAL));
}
assertAcked(client().admin().indices().prepareClose("test-idx-all"));
logger.info("--> restore incomplete snapshot - should fail");
assertFutureThrows(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false)
assertFutureThrows(clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false)
.setWaitForCompletion(true).execute(),
SnapshotRestoreException.class);
logger.info("--> restore snapshot for the index that was snapshotted completely");
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2")
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2")
.setRestoreGlobalState(false).setIndices("test-idx-all").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue());
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6));
@ -586,7 +534,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> restore snapshot for the partial index");
cluster().wipeIndices("test-idx-some");
restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2")
restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2")
.setRestoreGlobalState(false).setIndices("test-idx-some").setPartial(true).setWaitForCompletion(true).get();
assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue());
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6));
@ -596,7 +544,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> restore snapshot for the index that didn't have any shards snapshotted successfully");
cluster().wipeIndices("test-idx-none");
restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2")
restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2")
.setRestoreGlobalState(false).setIndices("test-idx-none").setPartial(true).setWaitForCompletion(true).get();
assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue());
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(6));
@ -605,7 +553,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertThat(getCountForIndex("test-idx-some"), allOf(greaterThan(0L), lessThan(100L)));
logger.info("--> restore snapshot for the closed index that was snapshotted completely");
restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2")
restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-2")
.setRestoreGlobalState(false).setIndices("test-idx-closed").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo(), notNullValue());
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), equalTo(4));
@ -629,10 +577,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> create an index that will have some unallocated shards");
assertAcked(prepareCreate("test-idx", 2, indexSettingsNoReplicas(numberOfShards)));
ensureGreen();
indexRandomDocs("test-idx", 100);
logger.info("--> start snapshot");
assertThat(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setIndices("test-idx")
assertThat(clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-1").setIndices("test-idx")
.setWaitForCompletion(true).get().getSnapshotInfo().state(),
equalTo(SnapshotState.SUCCESS));
@ -647,12 +596,12 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
}
});
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("2")
assertThat(clusterAdmin().prepareHealth().setWaitForEvents(Priority.LANGUID).setTimeout("1m").setWaitForNodes("2")
.execute().actionGet().isTimedOut(),
equalTo(false));
logger.info("--> restore index snapshot");
assertThat(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setRestoreGlobalState(false)
assertThat(clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap-1").setRestoreGlobalState(false)
.setWaitForCompletion(true).get().getRestoreInfo().successfulShards(),
equalTo(6));
@ -679,12 +628,12 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
internalCluster().startNode(nonMasterNode());
// Register mock repositories
for (int i = 0; i < 5; i++) {
client().admin().cluster().preparePutRepository("test-repo" + i)
clusterAdmin().preparePutRepository("test-repo" + i)
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())).setVerify(false).get();
}
logger.info("--> make sure that properly setup repository can be registered on all nodes");
client().admin().cluster().preparePutRepository("test-repo-0")
clusterAdmin().preparePutRepository("test-repo-0")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())).get();
@ -692,9 +641,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
public void testThatSensitiveRepositorySettingsAreNotExposed() throws Exception {
disableRepoConsistencyCheck("This test does not create any data in the repository");
Settings nodeSettings = Settings.EMPTY;
logger.info("--> start two nodes");
internalCluster().startNodes(2, nodeSettings);
internalCluster().startNodes(2);
createRepository("test-repo", "mock", Settings.builder()
.put("location", randomRepoPath())
.put(MockRepository.Plugin.USERNAME_SETTING.getKey(), "notsecretusername")
@ -770,18 +718,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> wait until the snapshot is done");
assertBusy(() -> {
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots("test-snap").get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
assertTrue(snapshotInfo.state().completed());
}, 1, TimeUnit.MINUTES);
assertBusy(() -> assertTrue(getSnapshot("test-repo", "test-snap").state().completed()), 1, TimeUnit.MINUTES);
logger.info("--> verify that snapshot was successful");
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots("test-snap").get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
assertEquals(0, snapshotInfo.failedShards());
@ -798,6 +738,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertAcked(prepareCreate("test-idx", 0, indexSettingsNoReplicas(between(1, 20))));
ensureGreen();
indexRandomDocs("test-idx", randomIntBetween(10, 100));
final int numberOfShards = getNumShards("test-idx").numPrimaries;
@ -816,18 +757,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> wait until the snapshot is done");
assertBusy(() -> {
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots("test-snap").get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
assertTrue(snapshotInfo.state().completed());
}, 1, TimeUnit.MINUTES);
assertBusy(() -> assertTrue(getSnapshot("test-repo", "test-snap").state().completed()), 1, TimeUnit.MINUTES);
logger.info("--> verify that snapshot was partial");
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots("test-snap").get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
assertEquals(SnapshotState.PARTIAL, snapshotInfo.state());
assertNotEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
assertThat(snapshotInfo.failedShards(), greaterThan(0));
@ -846,7 +779,6 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final Client client = client();
final String repo = "test-repo";
final String snapshot = "test-snap";
final String sourceIdx = "test-idx";
@ -859,21 +791,20 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
indexRandomDocs(sourceIdx, randomIntBetween(10, 100));
logger.info("--> shrink the index");
assertAcked(client.admin().indices().prepareUpdateSettings(sourceIdx)
assertAcked(client().admin().indices().prepareUpdateSettings(sourceIdx)
.setSettings(Settings.builder().put("index.blocks.write", true)).get());
assertAcked(client.admin().indices().prepareResizeIndex(sourceIdx, shrunkIdx).get());
assertAcked(client().admin().indices().prepareResizeIndex(sourceIdx, shrunkIdx).get());
logger.info("--> snapshot the shrunk index");
CreateSnapshotResponse createResponse = client.admin().cluster()
assertSuccessful(clusterAdmin()
.prepareCreateSnapshot(repo, snapshot)
.setWaitForCompletion(true).setIndices(shrunkIdx).get();
assertEquals(SnapshotState.SUCCESS, createResponse.getSnapshotInfo().state());
.setWaitForCompletion(true).setIndices(shrunkIdx).execute());
logger.info("--> delete index and stop the data node");
assertAcked(client.admin().indices().prepareDelete(sourceIdx).get());
assertAcked(client.admin().indices().prepareDelete(shrunkIdx).get());
assertAcked(client().admin().indices().prepareDelete(sourceIdx).get());
assertAcked(client().admin().indices().prepareDelete(shrunkIdx).get());
internalCluster().stopRandomDataNode();
client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("1");
clusterAdmin().prepareHealth().setTimeout("30s").setWaitForNodes("1");
logger.info("--> start a new data node");
final Settings dataSettings = Settings.builder()
@ -881,10 +812,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) // to get a new node id
.build();
internalCluster().startDataOnlyNode(dataSettings);
client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForNodes("2");
clusterAdmin().prepareHealth().setTimeout("30s").setWaitForNodes("2");
logger.info("--> restore the shrunk index and ensure all shards are allocated");
RestoreSnapshotResponse restoreResponse = client().admin().cluster()
RestoreSnapshotResponse restoreResponse = clusterAdmin()
.prepareRestoreSnapshot(repo, snapshot).setWaitForCompletion(true)
.setIndices(shrunkIdx).get();
assertEquals(restoreResponse.getRestoreInfo().totalShards(),
@ -894,13 +825,12 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
public void testSnapshotWithDateMath() {
final String repo = "repo";
final AdminClient admin = client().admin();
final IndexNameExpressionResolver nameExpressionResolver = new IndexNameExpressionResolver();
final String snapshotName = "<snapshot-{now/d}>";
logger.info("--> creating repository");
assertAcked(admin.cluster().preparePutRepository(repo).setType("fs")
assertAcked(clusterAdmin().preparePutRepository(repo).setType("fs")
.setSettings(Settings.builder().put("location", randomRepoPath())
.put("compress", randomBoolean())));
@ -910,7 +840,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
// snapshot could be taken before or after a day rollover
final String expression2 = nameExpressionResolver.resolveDateMathExpression(snapshotName);
SnapshotsStatusResponse response = admin.cluster().prepareSnapshotStatus(repo)
SnapshotsStatusResponse response = clusterAdmin().prepareSnapshotStatus(repo)
.setSnapshots(Sets.newHashSet(expression1, expression2).toArray(Strings.EMPTY_ARRAY))
.setIgnoreUnavailable(true)
.get();
@ -919,8 +849,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertThat(snapshots.get(0).getState().completed(), equalTo(true));
}
public void testSnapshotTotalAndIncrementalSizes() throws IOException {
Client client = client();
public void testSnapshotTotalAndIncrementalSizes() throws Exception {
final String indexName = "test-blocks-1";
final String repositoryName = "repo-" + indexName;
final String snapshot0 = "snapshot-0";
@ -930,14 +859,14 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
int docs = between(10, 100);
for (int i = 0; i < docs; i++) {
client.prepareIndex(indexName, "type").setSource("test", "init").execute().actionGet();
client().prepareIndex(indexName, "type").setSource("test", "init").execute().actionGet();
}
final Path repoPath = randomRepoPath();
createRepository(repositoryName, "fs", repoPath);
createFullSnapshot(repositoryName, snapshot0);
SnapshotsStatusResponse response = client.admin().cluster().prepareSnapshotStatus(repositoryName)
SnapshotsStatusResponse response = clusterAdmin().prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot0)
.get();
@ -965,7 +894,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
// add few docs - less than initially
docs = between(1, 5);
for (int i = 0; i < docs; i++) {
client.prepareIndex(indexName, "type").setSource("test", "test" + i).execute().actionGet();
client().prepareIndex(indexName, "type").setSource("test", "test" + i).execute().actionGet();
}
// create another snapshot
@ -973,11 +902,9 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
createFullSnapshot(repositoryName, snapshot1);
// drop 1st one to avoid miscalculation as snapshot reuses some files of prev snapshot
assertAcked(client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0).get());
assertAcked(startDeleteSnapshot(repositoryName, snapshot0).get());
response = client.admin().cluster().prepareSnapshotStatus(repositoryName)
.setSnapshots(snapshot1)
.get();
response = clusterAdmin().prepareSnapshotStatus(repositoryName).setSnapshots(snapshot1).get();
final List<Path> snapshot1Files = scanSnapshotFolder(repoPath);
final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);
@ -1024,12 +951,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
final Path repoPath = randomRepoPath();
createRepository(repositoryName, "fs", repoPath);
logger.info("--> create a snapshot");
client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0)
.setIncludeGlobalState(true)
.setWaitForCompletion(true)
.get();
createFullSnapshot(repositoryName, snapshot0);
final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index
@ -1056,13 +978,12 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
for (int i = 0; i < docs; i++) {
client().prepareIndex(indexName, "_doc").setSource("new_field", "test" + i).execute().actionGet();
}
assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2).setWaitForCompletion(true).get().status(),
equalTo(RestStatus.OK));
createFullSnapshot(repositoryName, snapshot2);
final List<Path> snapshot2IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot2IndexMetaFiles, hasSize(2)); // should have created one new metadata blob
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get());
assertAcked(clusterAdmin().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get());
final List<Path> snapshot3IndexMetaFiles = findRepoMetaBlobs(repoPath);
assertThat(snapshot3IndexMetaFiles, hasSize(1)); // should have deleted the metadata blob referenced by the first two snapshots
}
@ -1078,6 +999,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertAcked(prepareCreate("test-idx", 0, indexSettingsNoReplicas(5)));
ensureGreen();
indexRandomDocs("test-idx", randomIntBetween(50, 100));
final String dataNode = blockNodeWithIndex("test-repo", "test-idx");
logger.info("--> snapshot");
ServiceDisruptionScheme disruption = new BusyMasterServiceDisruption(random(), Priority.HIGH);
@ -1090,14 +1012,14 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> wait for shard snapshots to show as failed");
assertBusy(() -> assertThat(
client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
clusterAdmin().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
.get(0).getShardsStats().getFailedShards(), greaterThanOrEqualTo(1)), 60L, TimeUnit.SECONDS);
unblockNode("test-repo", dataNode);
disruption.stopDisrupting();
// check that snapshot completes
assertBusy(() -> {
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
GetSnapshotsResponse snapshotsStatusResponse = clusterAdmin()
.prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
@ -1126,7 +1048,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> wait for shard snapshot of first primary to show as failed");
assertBusy(() -> assertThat(
client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
clusterAdmin().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots()
.get(0).getShardsStats().getFailedShards(), is(1)), 60L, TimeUnit.SECONDS);
logger.info("--> restarting second data node, which should cause the primary shard on it to be failed");
@ -1134,7 +1056,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
// check that snapshot completes with both failed shards being accounted for in the snapshot result
assertBusy(() -> {
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
GetSnapshotsResponse snapshotsStatusResponse = clusterAdmin()
.prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
@ -1174,7 +1096,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
final String snapshotName = "snapshot-retention-leases";
logger.debug("--> create snapshot {}:{}", repoName, snapshotName);
CreateSnapshotResponse createResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
CreateSnapshotResponse createResponse = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).setIndices(indexName).get();
assertThat(createResponse.getSnapshotInfo().successfulShards(), equalTo(shardCount));
assertThat(createResponse.getSnapshotInfo().failedShards(), equalTo(0));
@ -1195,7 +1117,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
assertAcked(client().admin().indices().prepareClose(indexName));
logger.debug("--> restore index {} from snapshot", indexName);
RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
RestoreSnapshotResponse restoreResponse = clusterAdmin().prepareRestoreSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).get();
assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(shardCount));
assertThat(restoreResponse.getRestoreInfo().failedShards(), equalTo(0));
@ -1222,24 +1144,9 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
createRepository(repoName, "mock");
blockAllDataNodes(repoName);
final String snapshotName = "test-snap";
final ActionFuture<CreateSnapshotResponse> snapshotResponse =
client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute();
final ActionFuture<CreateSnapshotResponse> snapshotResponse = startFullSnapshot(repoName, snapshotName);
waitForBlock(dataNodeName, repoName, TimeValue.timeValueSeconds(30L));
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, otherDataNode);
final PlainActionFuture<Void> abortVisibleFuture = PlainActionFuture.newFuture();
clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null && snapshotsInProgress.entries().stream()
.anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED)) {
abortVisibleFuture.onResponse(null);
clusterService.removeListener(this);
}
}
});
final AtomicBoolean blocked = new AtomicBoolean(true);
final TransportService transportService = internalCluster().getInstance(TransportService.class, otherDataNode);
@ -1254,10 +1161,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
});
logger.info("--> abort snapshot");
final ActionFuture<AcknowledgedResponse> deleteResponse =
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute();
final ActionFuture<AcknowledgedResponse> deleteResponse = startDeleteSnapshot(repoName, snapshotName);
abortVisibleFuture.get(30L, TimeUnit.SECONDS);
awaitClusterState(otherDataNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY)
.entries().stream().anyMatch(entry -> entry.state() == SnapshotsInProgress.State.ABORTED));
assertFalse("delete should not be able to finish until data node is unblocked", deleteResponse.isDone());
blocked.set(false);
@ -1274,8 +1181,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
createIndex("some-index");
stopNode(dataNode);
ensureStableCluster(1);
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
.setPartial(true).setWaitForCompletion(true).get();
final CreateSnapshotResponse createSnapshotResponse = startFullSnapshot(repoName, "test-snap", true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.PARTIAL));
}
@ -1342,11 +1248,11 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> wait for relocations to start");
assertBusy(() -> assertThat(
client().admin().cluster().prepareHealth(indexName).execute().actionGet().getRelocatingShards(), greaterThan(0)),
clusterAdmin().prepareHealth(indexName).execute().actionGet().getRelocatingShards(), greaterThan(0)),
1L, TimeUnit.MINUTES);
logger.info("--> snapshot");
client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
clusterAdmin().prepareCreateSnapshot(repoName, "test-snap")
.setWaitForCompletion(false).setPartial(true).setIndices(indexName).get();
assertAcked(client().admin().indices().prepareDelete(indexName));

View File

@ -20,7 +20,6 @@
package org.elasticsearch.snapshots;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@ -80,6 +79,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
@ -109,8 +109,6 @@ import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
// The tests in here do a lot of state updates and other writes to disk and are slowed down too much by WindowsFS
@LuceneTestCase.SuppressFileSystems(value = "WindowsFS")
public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
@Override
@ -121,7 +119,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
public void testBasicWorkFlow() throws Exception {
Client client = client();
createRepository("test-repo", "fs");
createIndexWithRandomDocs("test-idx-1", 100);
createIndexWithRandomDocs("test-idx-2", 100);
@ -138,7 +135,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
if (!indicesToFlush.isEmpty()) {
String[] indices = indicesToFlush.toArray(new String[indicesToFlush.size()]);
logger.info("--> starting asynchronous flush for indices {}", Arrays.toString(indices));
flushResponseFuture = client.admin().indices().prepareFlush(indices).execute();
flushResponseFuture = client().admin().indices().prepareFlush(indices).execute();
}
}
@ -158,36 +155,36 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
final boolean snapshotClosed = randomBoolean();
if (snapshotClosed) {
assertAcked(client.admin().indices().prepareClose(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
assertAcked(client().admin().indices().prepareClose(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
}
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndicesOptions(IndicesOptions.lenientExpand()).setIndices(indicesToSnapshot).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
List<SnapshotInfo> snapshotInfos = client.admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots(randomFrom("test-snap", "_all", "*", "*-snap", "test*")).get().getSnapshots();
List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots("test-repo")
.setSnapshots(randomFrom("test-snap", "_all", "*", "*-snap", "test*")).get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
SnapshotInfo snapshotInfo = snapshotInfos.get(0);
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));
if (snapshotClosed) {
assertAcked(client.admin().indices().prepareOpen(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
assertAcked(client().admin().indices().prepareOpen(indicesToSnapshot).setWaitForActiveShards(ActiveShardCount.ALL).get());
}
logger.info("--> delete some data");
for (int i = 0; i < 50; i++) {
client.prepareDelete("test-idx-1", "_doc", Integer.toString(i)).get();
client().prepareDelete("test-idx-1", "_doc", Integer.toString(i)).get();
}
for (int i = 50; i < 100; i++) {
client.prepareDelete("test-idx-2", "_doc", Integer.toString(i)).get();
client().prepareDelete("test-idx-2", "_doc", Integer.toString(i)).get();
}
for (int i = 0; i < 100; i += 2) {
client.prepareDelete("test-idx-3", "_doc", Integer.toString(i)).get();
client().prepareDelete("test-idx-3", "_doc", Integer.toString(i)).get();
}
assertAllSuccessful(refresh());
assertDocCount("test-idx-1", 50L);
@ -195,10 +192,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertDocCount("test-idx-3", 50L);
logger.info("--> close indices");
client.admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
client().admin().indices().prepareClose("test-idx-1", "test-idx-2").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
@ -207,7 +204,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertDocCount("test-idx-2", 100L);
assertDocCount("test-idx-3", 50L);
assertNull(client.admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
assertNull(client().admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey()));
for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) {
@ -221,17 +218,17 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> delete indices");
cluster().wipeIndices("test-idx-1", "test-idx-2");
logger.info("--> restore one index after deletion");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true)
restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true)
.setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertDocCount("test-idx-1", 100);
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
ClusterState clusterState = clusterAdmin().prepareState().get().getState();
assertThat(clusterState.getMetadata().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetadata().hasIndex("test-idx-2"), equalTo(false));
assertNull(client.admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
assertNull(client().admin().indices().prepareGetSettings("test-idx-1").get().getSetting("test-idx-1",
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING.getKey()));
for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) {
@ -257,7 +254,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
String typeName = "actions";
String expectedValue = "expected";
Client client = client();
// Write a document
String docId = Integer.toString(randomInt());
index(indexName, typeName, docId, "value", expectedValue);
@ -265,7 +261,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
createRepository(repoName, "fs", absolutePath);
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setIndices(indexName)
.get();
@ -274,19 +270,17 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setRenamePattern(indexName)
.setRenameReplacement(restoredIndexName)
.get();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
assertThat(client().prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
}
public void testFreshIndexUUID() {
Client client = client();
createRepository("test-repo", "fs");
createIndex("test");
@ -295,7 +289,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertTrue(originalIndexUUID, originalIndexUUID != null);
assertFalse(originalIndexUUID, originalIndexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE));
ensureGreen();
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices("test").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
@ -312,10 +306,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertFalse(newIndexUUID, newIndexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE));
assertFalse(newIndexUUID, newIndexUUID.equals(originalIndexUUID));
logger.info("--> close index");
client.admin().indices().prepareClose("test").get();
client().admin().indices().prepareClose("test").get();
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
@ -326,7 +320,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
newIndexUUID.equals(newAfterRestoreIndexUUID));
logger.info("--> restore indices with different names");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
.setRenamePattern("(.+)").setRenameReplacement("$1-copy").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
@ -340,21 +334,18 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
createRepository("test-repo", "fs");
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).get();
CreateSnapshotResponse createSnapshotResponse = startFullSnapshot("test-repo", "test-snap").get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), equalTo(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(0));
assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get()
.getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(getSnapshot("test-repo", "test-snap").state(), equalTo(SnapshotState.SUCCESS));
}
public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException {
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
Client client = client();
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
assertAcked(clusterAdmin().preparePutRepository("test-repo")
.setType("mock").setSettings(
Settings.builder()
.put("location", randomRepoPath())
@ -366,7 +357,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> snapshot");
try {
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices("test-idx").get();
if (createSnapshotResponse.getSnapshotInfo().totalShards() == createSnapshotResponse.getSnapshotInfo().successfulShards()) {
// If we are here, that means we didn't have any failures, let's check it
@ -379,10 +370,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(shardFailure.nodeId(), notNullValue());
assertThat(shardFailure.index(), equalTo("test-idx"));
}
GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("test-repo")
.addSnapshots("test-snap").get();
assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(1));
SnapshotInfo snapshotInfo = getSnapshotsResponse.getSnapshots().get(0);
SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
if (snapshotInfo.state() == SnapshotState.SUCCESS) {
assertThat(snapshotInfo.shardFailures().size(), greaterThan(0));
assertThat(snapshotInfo.totalShards(), greaterThan(snapshotInfo.successfulShards()));
@ -403,7 +391,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public void testDataFileFailureDuringSnapshot() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");
Client client = client();
createRepository("test-repo", "mock",
Settings.builder().put("location", randomRepoPath())
.put("random", randomAlphaOfLength(10)).put("random_data_file_io_exception_rate", 0.3));
@ -411,7 +398,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
createIndexWithRandomDocs("test-idx", 100);
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices("test-idx").get();
if (createSnapshotResponse.getSnapshotInfo().totalShards() == createSnapshotResponse.getSnapshotInfo().successfulShards()) {
logger.info("--> no failures");
@ -425,16 +412,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(shardFailure.nodeId(), notNullValue());
assertThat(shardFailure.index(), equalTo("test-idx"));
}
GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("test-repo")
.addSnapshots("test-snap").get();
assertThat(getSnapshotsResponse.getSnapshots().size(), equalTo(1));
SnapshotInfo snapshotInfo = getSnapshotsResponse.getSnapshots().get(0);
SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
assertThat(snapshotInfo.state(), equalTo(SnapshotState.PARTIAL));
assertThat(snapshotInfo.shardFailures().size(), greaterThan(0));
assertThat(snapshotInfo.totalShards(), greaterThan(snapshotInfo.successfulShards()));
// Verify that snapshot status also contains the same failures
SnapshotsStatusResponse snapshotsStatusResponse = client.admin().cluster().prepareSnapshotStatus("test-repo")
SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus("test-repo")
.addSnapshots("test-snap").get();
assertThat(snapshotsStatusResponse.getSnapshots().size(), equalTo(1));
SnapshotStatus snapshotStatus = snapshotsStatusResponse.getSnapshots().get(0);
@ -469,6 +453,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
ensureGreen();
final NumShards numShards = getNumShards("test-idx");
indexRandomDocs("test-idx", 100);
logger.info("--> snapshot");
@ -591,7 +576,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.setSettings(Settings.builder()
.putNull("index.routing.allocation.include._name")
.build()));
assertAcked(client().admin().cluster().prepareReroute().setRetryFailed(true));
assertAcked(clusterAdmin().prepareReroute().setRetryFailed(true));
};
unrestorableUseCase(indexName, Settings.EMPTY, Settings.EMPTY, restoreIndexSettings,
@ -617,7 +602,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
// create a snapshot
final NumShards numShards = getNumShards(indexName);
CreateSnapshotResponse snapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
CreateSnapshotResponse snapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true)
.setIndices(indexName)
.get();
@ -630,7 +615,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertAcked(client().admin().indices().prepareDelete(indexName));
// update the test repository
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
assertAcked(clusterAdmin().preparePutRepository("test-repo")
.setType("mock")
.setSettings(Settings.builder()
.put("location", repositoryLocation)
@ -638,7 +623,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.build()));
// attempt to restore the snapshot with the given settings
RestoreSnapshotResponse restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
RestoreSnapshotResponse restoreResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
.setIndices(indexName)
.setIndexSettings(restoreIndexSettings)
.setWaitForCompletion(true)
@ -648,7 +633,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries));
assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(0));
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().setCustoms(true).setRoutingTable(true).get();
ClusterStateResponse clusterStateResponse = clusterAdmin().prepareState().setCustoms(true).setRoutingTable(true).get();
// check that there is no restore in progress
RestoreInProgress restoreInProgress = clusterStateResponse.getState().custom(RestoreInProgress.TYPE);
@ -678,7 +663,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
// delete the index and restore again
assertAcked(client().admin().indices().prepareDelete(indexName));
restoreResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
restoreResponse = clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).get();
assertThat(restoreResponse.getRestoreInfo().totalShards(), equalTo(numShards.numPrimaries));
assertThat(restoreResponse.getRestoreInfo().successfulShards(), equalTo(numShards.numPrimaries));
@ -693,6 +678,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Path repositoryLocation = randomRepoPath();
Client client = client();
createRepository("test-repo", "fs", repositoryLocation);
createIndexWithRandomDocs("test-idx", 100);
logger.info("--> snapshot");
@ -749,7 +735,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> snapshot");
final SnapshotException sne = expectThrows(SnapshotException.class,
() -> client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
() -> clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setWaitForCompletion(true).setIndices("test-idx").get());
assertThat(sne.getMessage(), containsString("Indices don't have primary shards"));
assertThat(getRepositoryData("test-repo"), is(RepositoryData.EMPTY));
@ -812,8 +798,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertDocCount("test-idx", 10L * numberOfSnapshots);
logger.info("--> delete the last snapshot");
client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get();
startDeleteSnapshot("test-repo", lastSnapshot).get();
logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repo, numberOfFiles[0]);
}
@ -864,16 +849,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> unblocking blocked node");
unblockNode("test-repo", blockedNode);
logger.info("--> waiting for completion");
SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
logger.info("Number of failed shards [{}]",
waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)).shardFailures().size());
logger.info("--> done");
List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots("test-snap").get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0));
final SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.shardFailures(), empty());
logger.info("--> delete index");
cluster().wipeIndices("test-idx");
@ -938,16 +920,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> unblocking blocked node");
unblockNode("test-repo", blockedNode);
logger.info("--> waiting for completion");
SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
logger.info("Number of failed shards [{}]", snapshotInfo.shardFailures().size());
logger.info("Number of failed shards [{}]",
waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)).shardFailures().size());
logger.info("--> done");
List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo")
.setSnapshots("test-snap").get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfos.get(0).shardFailures().size(), equalTo(0));
final SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.shardFailures().size(), equalTo(0));
logger.info("--> delete index");
cluster().wipeIndices("test-idx");
@ -966,6 +945,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Client client = client();
Path repositoryLocation = randomRepoPath();
createRepository("test-repo", "fs", repositoryLocation);
createIndexWithRandomDocs("test-idx", 100);
logger.info("--> snapshot");
@ -975,8 +955,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(),
equalTo(SnapshotState.SUCCESS));
assertThat(getSnapshot("test-repo", "test-snap").state(), equalTo(SnapshotState.SUCCESS));
logger.info("--> delete index");
cluster().wipeIndices("test-idx");
@ -1189,7 +1168,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> wait for relocations to start");
assertBusy(() -> assertThat(
client().admin().cluster().prepareHealth("test-idx").execute().actionGet().getRelocatingShards(), greaterThan(0)),
clusterAdmin().prepareHealth("test-idx").execute().actionGet().getRelocatingShards(), greaterThan(0)),
1L, TimeUnit.MINUTES);
logger.info("--> snapshot");
@ -1212,6 +1191,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).build();
assertAcked(prepareCreate("test").setSettings(indexSettings));
ensureGreen();
indexRandomDocs("test", randomIntBetween(10, 100));
assertNoFailures(client().admin().indices().prepareForceMerge("test").setFlush(true).setMaxNumSegments(1).get());
@ -1220,8 +1200,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponseFirst.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test").get().getSnapshots().get(0).state(),
equalTo(SnapshotState.SUCCESS));
assertThat(getSnapshot("test-repo", "test").state(), equalTo(SnapshotState.SUCCESS));
{
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo")
.setSnapshots("test").get().getSnapshots().get(0);
@ -1236,8 +1215,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponseSecond.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-1").get().getSnapshots().get(0).state(),
equalTo(SnapshotState.SUCCESS));
assertThat(getSnapshot("test-repo", "test-1").state(), equalTo(SnapshotState.SUCCESS));
{
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo")
.setSnapshots("test-1").get().getSnapshots().get(0);
@ -1253,8 +1231,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponseThird.getSnapshotInfo().totalShards()));
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-2").get().getSnapshots().get(0).state(),
equalTo(SnapshotState.SUCCESS));
assertThat(getSnapshot("test-repo", "test-2").state(), equalTo(SnapshotState.SUCCESS));
{
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo")
.setSnapshots("test-2").get().getSnapshots().get(0);
@ -1279,13 +1256,12 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.put("location", randomRepoPath()).put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_data", true));
createIndexWithRandomDocs("test-idx-1", 100);
createIndexWithRandomDocs("test-idx-2", 100);
createIndexWithRandomDocs("test-idx-3", 100);
logger.info("--> snapshot");
ActionFuture<CreateSnapshotResponse> future = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
ActionFuture<CreateSnapshotResponse> future = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap")
.setIndices("test-idx-*").setWaitForCompletion(true).setPartial(false).execute();
logger.info("--> wait for block to kick in");
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
@ -1324,6 +1300,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Client client = client();
createRepository("test-repo", "mock");
createIndexWithRandomDocs("test-idx-1", 100);
createIndexWithRandomDocs("test-idx-2", 100);
@ -1405,7 +1382,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> try deleting the snapshot while the restore is in progress (should throw an error)");
ConcurrentSnapshotExecutionException e = expectThrows(ConcurrentSnapshotExecutionException.class, () ->
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).get());
clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).get());
assertEquals(repoName, e.getRepositoryName());
assertEquals(snapshotName, e.getSnapshotName());
assertThat(e.getMessage(), containsString("cannot delete snapshot during a restore"));
@ -1421,10 +1398,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
private void waitForIndex(final String index, TimeValue timeout) throws Exception {
assertBusy(
() -> {
boolean exists = client().admin().indices().prepareExists(index).execute().actionGet().isExists();
assertTrue("Expected index [" + index + "] to exist", exists);
},
() -> assertTrue("Expected index [" + index + "] to exist", indexExists(index)),
timeout.millis(),
TimeUnit.MILLISECONDS);
}
@ -1516,19 +1490,19 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
outChan.truncate(randomInt(10));
}
List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots();
List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots(repoName).get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo(snapshotName));
SnapshotsStatusResponse snapshotStatusResponse =
client().admin().cluster().prepareSnapshotStatus(repoName).setSnapshots(snapshotName).get();
clusterAdmin().prepareSnapshotStatus(repoName).setSnapshots(snapshotName).get();
assertThat(snapshotStatusResponse.getSnapshots(), hasSize(1));
assertThat(snapshotStatusResponse.getSnapshots().get(0).getSnapshot().getSnapshotId().getName(), equalTo(snapshotName));
assertAcked(client().admin().indices().prepareDelete("test-idx-1", "test-idx-2"));
SnapshotException ex = expectThrows(SnapshotException.class, () -> client().admin().cluster()
SnapshotException ex = expectThrows(SnapshotException.class, () -> clusterAdmin()
.prepareRestoreSnapshot(repoName, snapshotName)
.setRestoreGlobalState(true)
.setWaitForCompletion(true)
@ -1537,7 +1511,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(ex.getSnapshotName(), equalTo(snapshotName));
assertThat(ex.getMessage(), containsString("failed to read global metadata"));
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.get();
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
@ -1594,7 +1568,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
outChan.truncate(randomInt(10));
}
List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots("test-repo").get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
assertThat(snapshotInfos.get(0).state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfos.get(0).snapshotId().getName(), equalTo("test-snap"));
@ -1603,7 +1577,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Predicate<String> isRestorableIndex = index -> corruptedIndex.getName().equals(index) == false;
client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
clusterAdmin().prepareRestoreSnapshot("test-repo", "test-snap")
.setIndices(nbDocsPerIndex.keySet().stream().filter(isRestorableIndex).toArray(String[]::new))
.setRestoreGlobalState(randomBoolean())
.setWaitForCompletion(true)
@ -1616,7 +1590,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
}
assertAcked(client().admin().cluster().prepareDeleteSnapshot("test-repo", snapshotInfo.snapshotId().getName()).get());
assertAcked(startDeleteSnapshot("test-repo", snapshotInfo.snapshotId().getName()).get());
}
public void testCannotCreateSnapshotsWithSameName() throws Exception {
@ -1660,8 +1634,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(e.getMessage(), containsString("snapshot with the same name already exists"));
}
logger.info("--> delete the first snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName).get();
startDeleteSnapshot(repositoryName, snapshotName).get();
logger.info("--> try creating a snapshot with the same name, now it should work because the first one was deleted");
createSnapshotResponse = client.admin()
@ -1698,7 +1671,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
String blockedNode = blockNodeWithIndex(repo, index);
logger.info("--> snapshot");
client().admin().cluster().prepareCreateSnapshot(repo, snapshot)
clusterAdmin().prepareCreateSnapshot(repo, snapshot)
.setWaitForCompletion(false)
.execute();
@ -1774,9 +1747,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
.setIndices("test-idx")
.get();
assertEquals(0, createSnapshotResponse.getSnapshotInfo().failedShards());
GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("test-repo-2")
.addSnapshots("test-snap-2").get();
assertEquals(SnapshotState.SUCCESS, getSnapshotsResponse.getSnapshots().get(0).state());
assertEquals(SnapshotState.SUCCESS, getSnapshot("test-repo-2", "test-snap-2").state());
}
public void testGetSnapshotsFromIndexBlobOnly() throws Exception {
@ -1828,7 +1799,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
logger.info("--> verify _all returns snapshot info");
GetSnapshotsResponse response = client().admin().cluster()
GetSnapshotsResponse response = clusterAdmin()
.prepareGetSnapshots("test-repo")
.setSnapshots("_all")
.setVerbose(false)
@ -1837,7 +1808,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
verifySnapshotInfo(response, indicesPerSnapshot);
logger.info("--> verify wildcard returns snapshot info");
response = client().admin().cluster()
response = clusterAdmin()
.prepareGetSnapshots("test-repo")
.setSnapshots("test-snap-*")
.setVerbose(false)
@ -1847,7 +1818,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> verify individual requests return snapshot info");
for (int i = 0; i < numSnapshots; i++) {
response = client().admin().cluster()
response = clusterAdmin()
.prepareGetSnapshots("test-repo")
.setSnapshots("test-snap-" + i)
.setVerbose(false)
@ -1918,7 +1889,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L));
}
public void testSnapshotDifferentIndicesBySameName() throws InterruptedException {
public void testSnapshotDifferentIndicesBySameName() throws InterruptedException, ExecutionException {
String indexName = "testindex";
String repoName = "test-repo";
Path absolutePath = randomRepoPath().toAbsolutePath();
@ -1952,11 +1923,11 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(snapshot2.successfulShards(), is(newShardCount));
logger.info("--> restoring snapshot 1");
client().admin().cluster().prepareRestoreSnapshot(repoName, "snap-1").setIndices(indexName).setRenamePattern(indexName)
clusterAdmin().prepareRestoreSnapshot(repoName, "snap-1").setIndices(indexName).setRenamePattern(indexName)
.setRenameReplacement("restored-1").setWaitForCompletion(true).get();
logger.info("--> restoring snapshot 2");
client().admin().cluster().prepareRestoreSnapshot(repoName, "snap-2").setIndices(indexName).setRenamePattern(indexName)
clusterAdmin().prepareRestoreSnapshot(repoName, "snap-2").setIndices(indexName).setRenamePattern(indexName)
.setRenameReplacement("restored-2").setWaitForCompletion(true).get();
logger.info("--> verify doc counts");
@ -1975,10 +1946,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
snapshotToRestore = "snap-1";
expectedCount = docCount;
}
logger.info("--> deleting snapshot [{}]", snapshotToDelete);
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToDelete).get());
assertAcked(startDeleteSnapshot(repoName, snapshotToDelete).get());
logger.info("--> restoring snapshot [{}]", snapshotToRestore);
client().admin().cluster().prepareRestoreSnapshot(repoName, snapshotToRestore).setIndices(indexName).setRenamePattern(indexName)
clusterAdmin().prepareRestoreSnapshot(repoName, snapshotToRestore).setIndices(indexName).setRenamePattern(indexName)
.setRenameReplacement("restored-3").setWaitForCompletion(true).get();
logger.info("--> verify doc counts");
@ -2000,7 +1970,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
refresh();
logger.info("--> snapshot {}", i);
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-" + i)
CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot("test-repo", "test-snap-" + i)
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
@ -2008,8 +1978,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
}
logger.info("--> deleting all snapshots");
client().admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap-*", "*").get();
final GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots("test-repo").get();
clusterAdmin().prepareDeleteSnapshot("test-repo", "test-snap-*", "*").get();
final GetSnapshotsResponse getSnapshotsResponse = clusterAdmin().prepareGetSnapshots("test-repo").get();
assertThat(getSnapshotsResponse.getSnapshots(), empty());
}
@ -2053,7 +2023,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
// Verify that hidden indices get restored with a wildcard restore
{
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster()
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin()
.prepareRestoreSnapshot(repoName, snapName)
.setWaitForCompletion(true)
.setIndices("*")
@ -2071,7 +2041,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
// Verify that exclusions work on hidden indices
{
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster()
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin()
.prepareRestoreSnapshot(repoName, snapName)
.setWaitForCompletion(true)
.setIndices("*", "-.*")
@ -2089,7 +2059,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
// Verify that hidden indices can be restored with a non-star pattern
{
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster()
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin()
.prepareRestoreSnapshot(repoName, snapName)
.setWaitForCompletion(true)
.setIndices("hid*")
@ -2107,7 +2077,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
// Verify that hidden indices can be restored by fully specified name
{
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster()
RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin()
.prepareRestoreSnapshot(repoName, snapName)
.setWaitForCompletion(true)
.setIndices(dottedHiddenIndex)

View File

@ -19,7 +19,6 @@
package org.elasticsearch.snapshots;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.plugins.Plugin;
@ -66,8 +65,7 @@ public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase {
.get();
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
final SnapshotId snapshotId = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap")
.get().getSnapshots().get(0).snapshotId();
final SnapshotId snapshotId = getSnapshot("test-repo", "test-snap").snapshotId();
logger.info("--> start disrupting cluster");
final NetworkDisruption networkDisruption = isolateMasterDisruption(NetworkDisruption.NetworkDelay.random(random()));
@ -92,10 +90,7 @@ public class SnapshotShardsServiceIT extends AbstractSnapshotIntegTestCase {
internalCluster().clearDisruptionScheme(true);
assertBusy(() -> {
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
.prepareGetSnapshots("test-repo")
.setSnapshots("test-snap").get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap");
logger.info("Snapshot status [{}], successfulShards [{}]", snapshotInfo.state(), snapshotInfo.successfulShards());
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.successfulShards(), equalTo(shards));

View File

@ -65,8 +65,6 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
}
public void testStatusApiConsistency() {
Client client = client();
createRepository("test-repo", "fs");
createIndex("test-idx-1", "test-idx-2", "test-idx-3");
@ -82,13 +80,13 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
createFullSnapshot("test-repo", "test-snap");
List<SnapshotInfo> snapshotInfos = client.admin().cluster().prepareGetSnapshots("test-repo").get().getSnapshots();
List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots("test-repo").get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
SnapshotInfo snapshotInfo = snapshotInfos.get(0);
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));
final List<SnapshotStatus> snapshotStatus = client.admin().cluster().snapshotsStatus(
final List<SnapshotStatus> snapshotStatus = clusterAdmin().snapshotsStatus(
new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots();
assertThat(snapshotStatus.size(), equalTo(1));
final SnapshotStatus snStatus = snapshotStatus.get(0);
@ -97,8 +95,6 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
}
public void testStatusAPICallInProgressSnapshot() throws Exception {
Client client = client();
createRepository("test-repo", "mock", Settings.builder().put("location", randomRepoPath()).put("block_on_data", true));
createIndex("test-idx-1");
@ -111,21 +107,13 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
refresh();
logger.info("--> snapshot");
ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture =
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();
ActionFuture<CreateSnapshotResponse> createSnapshotResponseActionFuture = startFullSnapshot("test-repo", "test-snap");
logger.info("--> wait for data nodes to get blocked");
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
assertBusy(() -> {
try {
assertEquals(SnapshotsInProgress.State.STARTED, client.admin().cluster().snapshotsStatus(
new SnapshotsStatusRequest("test-repo", new String[]{"test-snap"})).actionGet().getSnapshots().get(0)
.getState());
} catch (SnapshotMissingException sme) {
throw new AssertionError(sme);
}
}, 1L, TimeUnit.MINUTES);
awaitNumberOfSnapshotsInProgress(1);
assertEquals(SnapshotsInProgress.State.STARTED, client().admin().cluster().prepareSnapshotStatus("test-repo")
.setSnapshots("test-snap").get().getSnapshots().get(0).getState());
logger.info("--> unblock all data nodes");
unblockAllDataNodes("test-repo");
@ -174,15 +162,12 @@ public class SnapshotStatusApisIT extends AbstractSnapshotIntegTestCase {
.prepareSnapshotStatus("test-repo").setSnapshots("test-snap").execute().actionGet());
}
public void testGetSnapshotsWithoutIndices() {
public void testGetSnapshotsWithoutIndices() throws Exception {
createRepository("test-repo", "fs");
logger.info("--> snapshot");
final SnapshotInfo snapshotInfo =
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIndices().setWaitForCompletion(true).get().getSnapshotInfo();
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
final SnapshotInfo snapshotInfo = assertSuccessful(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIndices().setWaitForCompletion(true).execute());
assertThat(snapshotInfo.totalShards(), is(0));
logger.info("--> verify that snapshot without index shows up in non-verbose listing");

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@ -82,7 +83,6 @@ import java.util.function.Predicate;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@ -90,6 +90,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
private static final String OLD_VERSION_SNAPSHOT_PREFIX = "old-version-snapshot-";
// Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
// threads so that blocking some threads on one repository doesn't block other repositories from doing work
protected static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
.put("thread_pool.snapshot.core", 5).put("thread_pool.snapshot.max", 5).build();
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
@ -123,11 +128,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
@After
public void assertRepoConsistency() {
if (skipRepoConsistencyCheckReason == null) {
client().admin().cluster().prepareGetRepositories().get().repositories().forEach(repositoryMetadata -> {
clusterAdmin().prepareGetRepositories().get().repositories().forEach(repositoryMetadata -> {
final String name = repositoryMetadata.name();
if (repositoryMetadata.settings().getAsBoolean("readonly", false) == false) {
client().admin().cluster().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
client().admin().cluster().prepareCleanupRepository(name).get();
clusterAdmin().prepareDeleteSnapshot(name, OLD_VERSION_SNAPSHOT_PREFIX + "*").get();
clusterAdmin().prepareCleanupRepository(name).get();
}
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
});
@ -203,12 +208,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
public SnapshotInfo waitForCompletion(String repository, String snapshotName, TimeValue timeout) throws InterruptedException {
long start = System.currentTimeMillis();
while (System.currentTimeMillis() - start < timeout.millis()) {
List<SnapshotInfo> snapshotInfos = client().admin().cluster().prepareGetSnapshots(repository).setSnapshots(snapshotName)
.get().getSnapshots();
assertThat(snapshotInfos.size(), equalTo(1));
if (snapshotInfos.get(0).state().completed()) {
final SnapshotInfo snapshotInfo = getSnapshot(repository, snapshotName);
if (snapshotInfo.state().completed()) {
// Make sure that snapshot clean up operations are finished
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
ClusterStateResponse stateResponse = clusterAdmin().prepareState().get();
boolean found = false;
for (SnapshotsInProgress.Entry entry :
stateResponse.getState().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
@ -219,7 +222,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
}
if (found == false) {
return snapshotInfos.get(0);
return snapshotInfo;
}
}
Thread.sleep(100);
@ -308,7 +311,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
protected void createRepository(String repoName, String type, Settings.Builder settings) {
logger.info("--> creating repository [{}] [{}]", repoName, type);
assertAcked(client().admin().cluster().preparePutRepository(repoName)
assertAcked(clusterAdmin().preparePutRepository(repoName)
.setType(type).setSettings(settings));
}
@ -350,7 +353,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
protected String initWithSnapshotVersion(String repoName, Path repoPath, Version version) throws IOException {
assertThat("This hack only works on an empty repository", getRepositoryData(repoName).getSnapshotIds(), empty());
final String oldVersionSnapshot = OLD_VERSION_SNAPSHOT_PREFIX + version.id;
final CreateSnapshotResponse createSnapshotResponse = client().admin().cluster()
final CreateSnapshotResponse createSnapshotResponse = clusterAdmin()
.prepareCreateSnapshot(repoName, oldVersionSnapshot).setIndices("does-not-exist-for-sure-*")
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().totalShards(), is(0));
@ -373,7 +376,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
protected SnapshotInfo createFullSnapshot(String repoName, String snapshotName) {
logger.info("--> creating full snapshot [{}] in [{}]", snapshotName, repoName);
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
.setIncludeGlobalState(true)
.setWaitForCompletion(true)
.get();
@ -417,7 +420,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
* @param metadata snapshot metadata to write (as returned by {@link SnapshotInfo#userMetadata()})
*/
protected void addBwCFailedSnapshot(String repoName, String snapshotName, Map<String, Object> metadata) throws Exception {
final ClusterState state = client().admin().cluster().prepareState().get().getState();
final ClusterState state = clusterAdmin().prepareState().get().getState();
final RepositoriesMetadata repositoriesMetadata = state.metadata().custom(RepositoriesMetadata.TYPE);
assertNotNull(repositoriesMetadata);
final RepositoryMetadata initialRepoMetadata = repositoriesMetadata.repository(repoName);
@ -447,11 +450,6 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
awaitClusterState(internalCluster().getMasterName(), statePredicate);
}
protected ActionFuture<AcknowledgedResponse> startDeleteSnapshot(String repoName, String snapshotName) {
logger.info("--> deleting snapshot [{}] from repo [{}]", snapshotName, repoName);
return clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).execute();
}
protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
@ -478,12 +476,76 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
}
protected ActionFuture<CreateSnapshotResponse> startFullSnapshotBlockedOnDataNode(String snapshotName, String repoName,
String dataNode) throws InterruptedException {
blockDataNode(repoName, dataNode);
final ActionFuture<CreateSnapshotResponse> fut = startFullSnapshot(repoName, snapshotName);
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
return fut;
}
protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName) {
return startFullSnapshot(repoName, snapshotName, false);
}
protected ActionFuture<CreateSnapshotResponse> startFullSnapshot(String repoName, String snapshotName, boolean partial) {
logger.info("--> creating full snapshot [{}] to repo [{}]", snapshotName, repoName);
return clusterAdmin().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
.setPartial(partial).execute();
}
protected void awaitNumberOfSnapshotsInProgress(int count) throws Exception {
logger.info("--> wait for [{}] snapshots to show up in the cluster state", count);
awaitClusterState(state ->
state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() == count);
}
protected static SnapshotInfo assertSuccessful(ActionFuture<CreateSnapshotResponse> future) throws Exception {
final SnapshotInfo snapshotInfo = future.get().getSnapshotInfo();
assertThat(snapshotInfo.state(), is(SnapshotState.SUCCESS));
return snapshotInfo;
}
private static final Settings SINGLE_SHARD_NO_REPLICA = indexSettingsNoReplicas(1).build();
protected void createIndexWithContent(String indexName) {
createIndexWithContent(indexName, SINGLE_SHARD_NO_REPLICA);
}
protected void createIndexWithContent(String indexName, Settings indexSettings) {
logger.info("--> creating index [{}]", indexName);
createIndex(indexName, indexSettings);
ensureGreen(indexName);
index(indexName, "_doc", "some_id", "foo", "bar");
}
protected ActionFuture<AcknowledgedResponse> startDeleteSnapshot(String repoName, String snapshotName) {
logger.info("--> deleting snapshot [{}] from repo [{}]", snapshotName, repoName);
return clusterAdmin().prepareDeleteSnapshot(repoName, snapshotName).execute();
}
protected void updateClusterState(final Function<ClusterState, ClusterState> updater) throws Exception {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
final ClusterService clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
clusterService.submitStateUpdateTask("test", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return updater.apply(currentState);
}
@Override
public void onFailure(String source, Exception e) {
future.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
future.onResponse(null);
}
});
future.get();
}
protected SnapshotInfo getSnapshot(String repository, String snapshot) {
final List<SnapshotInfo> snapshotInfos = clusterAdmin().prepareGetSnapshots(repository).setSnapshots(snapshot)
.get().getSnapshots();

View File

@ -10,7 +10,6 @@ import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
@ -115,10 +114,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
GetSnapshotsResponse snapshot = client.admin().cluster().prepareGetSnapshots(REPO).setSnapshots(SNAPSHOT).get();
java.util.List<SnapshotInfo> snap = snapshot.getSnapshots();
assertEquals(1, snap.size());
assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), snap.get(0).indices());
assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());
assertTrue(
client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "ds" }))
@ -161,10 +157,7 @@ public class DataStreamsSnapshotsIT extends AbstractSnapshotIntegTestCase {
RestStatus status = createSnapshotResponse.getSnapshotInfo().status();
assertEquals(RestStatus.OK, status);
GetSnapshotsResponse snapshot = client.admin().cluster().prepareGetSnapshots(REPO).setSnapshots(SNAPSHOT).get();
java.util.List<SnapshotInfo> snap = snapshot.getSnapshots();
assertEquals(1, snap.size());
assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), snap.get(0).indices());
assertEquals(Collections.singletonList(DS_BACKING_INDEX_NAME), getSnapshot(REPO, SNAPSHOT).indices());
assertAcked(client.execute(DeleteDataStreamAction.INSTANCE, new DeleteDataStreamAction.Request(new String[] { "*" })).get());
assertAcked(client.admin().indices().prepareDelete("*").setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN));