Snapshot/Restore: restore with wait_for_completion=true should wait for succesfully restored shards to get started

This commit ensures that restore operation with wait_for_completion=true doesn't return until all successfully restored shards are started. Before it was returning as soon as restore operation was over, which cause some shards to be unavailable immediately after restore completion.

Fixes #8340
This commit is contained in:
Igor Motov 2014-11-18 21:07:32 -05:00
parent e0d6e0a6d3
commit 1aff8631ed
4 changed files with 159 additions and 87 deletions

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.RestoreService.RestoreSnapshotListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -76,25 +75,26 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeOperation
request.indices(), request.indicesOptions(), request.renamePattern(), request.renameReplacement(),
request.settings(), request.masterNodeTimeout(), request.includeGlobalState(), request.partial(), request.includeAliases());
restoreService.restoreSnapshot(restoreRequest, new RestoreSnapshotListener() {
restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreInfo>() {
@Override
public void onResponse(RestoreInfo restoreInfo) {
if (restoreInfo == null) {
if (request.waitForCompletion()) {
restoreService.addListener(new RestoreService.RestoreCompletionListener() {
SnapshotId snapshotId = new SnapshotId(request.repository(), request.snapshot());
if (restoreInfo == null && request.waitForCompletion()) {
restoreService.addListener(new ActionListener<RestoreService.RestoreCompletionResponse>() {
SnapshotId snapshotId = new SnapshotId(request.repository(), request.snapshot());
@Override
public void onRestoreCompletion(SnapshotId snapshotId, RestoreInfo snapshot) {
if (this.snapshotId.equals(snapshotId)) {
listener.onResponse(new RestoreSnapshotResponse(snapshot));
restoreService.removeListener(this);
}
@Override
public void onResponse(RestoreService.RestoreCompletionResponse restoreCompletionResponse) {
if (this.snapshotId.equals(restoreCompletionResponse.getSnapshotId())) {
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
restoreService.removeListener(this);
}
});
} else {
listener.onResponse(new RestoreSnapshotResponse(null));
}
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
} else {
listener.onResponse(new RestoreSnapshotResponse(restoreInfo));
}

View File

@ -25,6 +25,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -33,6 +34,7 @@ import org.elasticsearch.cluster.metadata.RestoreMetaData.ShardRestoreStatus;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -59,7 +61,7 @@ import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX
* <p/>
* Restore operation is performed in several stages.
* <p/>
* First {@link #restoreSnapshot(RestoreRequest, RestoreSnapshotListener)}
* First {@link #restoreSnapshot(RestoreRequest, org.elasticsearch.action.ActionListener))}
* method reads information about snapshot and metadata from repository. In update cluster state task it checks restore
* preconditions, restores global state if needed, creates {@link RestoreMetaData} record with list of shards that needs
* to be restored and adds this shard to the routing table using {@link RoutingTable.Builder#addAsRestore(IndexMetaData, RestoreSource)}
@ -90,7 +92,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
private final MetaDataCreateIndexService createIndexService;
private final CopyOnWriteArrayList<RestoreCompletionListener> listeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<ActionListener<RestoreCompletionResponse>> listeners = new CopyOnWriteArrayList<>();
@Inject
public RestoreService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, TransportService transportService, AllocationService allocationService, MetaDataCreateIndexService createIndexService) {
@ -110,7 +112,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
* @param request restore request
* @param listener restore listener
*/
public void restoreSnapshot(final RestoreRequest request, final RestoreSnapshotListener listener) {
public void restoreSnapshot(final RestoreRequest request, final ActionListener<RestoreInfo> listener) {
try {
// Read snapshot info and metadata from the repository
Repository repository = repositoriesService.repository(request.repository());
@ -233,7 +235,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
}
private void checkAliasNameConflicts(Map<String, String> renamedIndices, Set<String> aliases) {
for(Map.Entry<String, String> renamedIndex: renamedIndices.entrySet()) {
for (Map.Entry<String, String> renamedIndex : renamedIndices.entrySet()) {
if (aliases.contains(renamedIndex.getKey())) {
throw new SnapshotRestoreException(snapshotId, "cannot rename index [" + renamedIndex.getValue() + "] into [" + renamedIndex.getKey() + "] because of conflict with an alias with the same name");
}
@ -345,6 +347,24 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
}
}
public final static class RestoreCompletionResponse {
private final SnapshotId snapshotId;
private final RestoreInfo restoreInfo;
private RestoreCompletionResponse(SnapshotId snapshotId, RestoreInfo restoreInfo) {
this.snapshotId = snapshotId;
this.restoreInfo = restoreInfo;
}
public SnapshotId getSnapshotId() {
return snapshotId;
}
public RestoreInfo getRestoreInfo() {
return restoreInfo;
}
}
/**
* Updates shard restore record in the cluster state.
*
@ -354,6 +374,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
clusterService.submitStateUpdateTask("update snapshot state", new ProcessedClusterStateUpdateTask() {
private RestoreInfo restoreInfo = null;
private HashMap<ShardId, ShardRestoreStatus> shards = null;
@Override
public ClusterState execute(ClusterState currentState) {
@ -362,9 +383,12 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
RestoreMetaData restore = metaData.custom(RestoreMetaData.TYPE);
if (restore != null) {
boolean changed = false;
boolean found = false;
ArrayList<RestoreMetaData.Entry> entries = newArrayList();
for (RestoreMetaData.Entry entry : restore.entries()) {
if (entry.snapshotId().equals(request.snapshotId())) {
assert !found;
found = true;
HashMap<ShardId, ShardRestoreStatus> shards = newHashMap(entry.shards());
logger.trace("[{}] Updating shard [{}] with status [{}]", request.snapshotId(), request.shardId(), request.status().state());
shards.put(request.shardId(), request.status());
@ -373,6 +397,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
} else {
logger.info("restore [{}] is done", request.snapshotId());
restoreInfo = new RestoreInfo(entry.snapshotId().getSnapshot(), entry.indices(), shards.size(), shards.size() - failedShards(shards));
this.shards = shards;
}
changed = true;
} else {
@ -389,20 +414,71 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
}
@Override
public void onFailure(String source, Throwable t) {
public void onFailure(String source, @Nullable Throwable t) {
logger.warn("[{}][{}] failed to update snapshot status to [{}]", t, request.snapshotId(), request.shardId(), request.status());
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (restoreInfo != null) {
for (RestoreCompletionListener listener : listeners) {
try {
listener.onRestoreCompletion(request.snapshotId, restoreInfo);
} catch (Throwable e) {
logger.warn("failed to update snapshot status for [{}]", e, listener);
RoutingTable routingTable = newState.getRoutingTable();
final List<ShardId> waitForStarted = newArrayList();
for (HashMap.Entry<ShardId, ShardRestoreStatus> shard : shards.entrySet()) {
if (shard.getValue().state() == RestoreMetaData.State.SUCCESS ) {
ShardId shardId = shard.getKey();
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
if (shardRouting != null && !shardRouting.active()) {
logger.trace("[{}][{}] waiting for the shard to start", request.snapshotId(), shardId);
waitForStarted.add(shardId);
}
}
}
if (waitForStarted.isEmpty()) {
notifyListeners();
} else {
clusterService.addLast(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.routingTableChanged()) {
RoutingTable routingTable = event.state().getRoutingTable();
for (Iterator<ShardId> iterator = waitForStarted.iterator(); iterator.hasNext();) {
ShardId shardId = iterator.next();
ShardRouting shardRouting = findPrimaryShard(routingTable, shardId);
// Shard disappeared (index deleted) or became active
if (shardRouting == null || shardRouting.active()) {
iterator.remove();
logger.trace("[{}][{}] shard disappeared or started - removing", request.snapshotId(), shardId);
}
}
}
if (waitForStarted.isEmpty()) {
notifyListeners();
clusterService.remove(this);
}
}
});
}
}
}
private ShardRouting findPrimaryShard(RoutingTable routingTable, ShardId shardId) {
IndexRoutingTable indexRoutingTable = routingTable.index(shardId.getIndex());
if (indexRoutingTable != null) {
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId.id());
if (indexShardRoutingTable != null) {
return indexShardRoutingTable.primaryShard();
}
}
return null;
}
private void notifyListeners() {
for (ActionListener<RestoreCompletionResponse> listener : listeners) {
try {
listener.onResponse(new RestoreCompletionResponse(request.snapshotId, restoreInfo));
} catch (Throwable e) {
logger.warn("failed to update snapshot status for [{}]", e, listener);
}
}
}
});
@ -528,7 +604,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
*
* @param listener restore completion listener
*/
public void addListener(RestoreCompletionListener listener) {
public void addListener(ActionListener<RestoreCompletionResponse> listener) {
this.listeners.add(listener);
}
@ -539,7 +615,7 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
*
* @param listener restore completion listener
*/
public void removeListener(RestoreCompletionListener listener) {
public void removeListener(ActionListener<RestoreCompletionResponse> listener) {
this.listeners.remove(listener);
}
@ -745,42 +821,6 @@ public class RestoreService extends AbstractComponent implements ClusterStateLis
}
/**
* This listener is called as soon as restore operation starts in the cluster.
* <p/>
* To receive notifications about when operation ends in the cluster use {@link RestoreCompletionListener}
*/
public static interface RestoreSnapshotListener {
/**
* Called when restore operations successfully starts in the cluster. Not null value of {@code snapshot} parameter
* means that restore operation didn't involve any shards and therefore has already completed.
*
* @param restoreInfo if restore operation finished, contains information about restore operation, null otherwise
*/
void onResponse(RestoreInfo restoreInfo);
/**
* Called when restore operation failed to start
*
* @param t exception that prevented the restore operation to start
*/
void onFailure(Throwable t);
}
/**
* This listener is called every time a snapshot is restored in the cluster
*/
public static interface RestoreCompletionListener {
/**
* Called for every snapshot that is completed in the cluster
*
* @param snapshotId snapshot id
* @param restoreInfo restore completion information
*/
void onRestoreCompletion(SnapshotId snapshotId, RestoreInfo restoreInfo);
}
/**
* Internal class that is used to send notifications about finished shard restore operations to master node
*/

View File

@ -465,8 +465,6 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(6));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
ensureGreen("test-idx-all");
assertThat(client().prepareCount("test-idx-all").get().getCount(), equalTo(100L));
logger.info("--> restore snapshot for the partial index");
@ -478,7 +476,6 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), allOf(greaterThan(0), lessThan(6)));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), greaterThan(0));
ensureGreen("test-idx-some");
assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L)));
logger.info("--> restore snapshot for the index that didn't have any shards snapshotted successfully");
@ -490,7 +487,6 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(0));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(6));
ensureGreen("test-idx-some");
assertThat(client().prepareCount("test-idx-some").get().getCount(), allOf(greaterThan(0L), lessThan(100L)));
}

View File

@ -139,10 +139,11 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
for (int i=0; i<5; i++) {
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-3").get().getCount(), equalTo(50L));
}
// Test restore after index deletion
logger.info("--> delete indices");
@ -150,8 +151,10 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> restore one index after deletion");
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-2").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
for (int i=0; i<5; i++) {
assertThat(client.prepareCount("test-idx-1").get().getCount(), equalTo(100L));
}
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
@ -162,6 +165,51 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
}
}
@Test
public void singleGetAfterRestoreTest() throws Exception {
String indexName = "testindex";
String repoName = "test-restore-snapshot-repo";
String snapshotName = "test-restore-snapshot";
String absolutePath = newTempDir().getAbsolutePath();
logger.info("Path [{}]", absolutePath);
String restoredIndexName = indexName + "-restored";
String typeName = "actions";
String expectedValue = "expected";
Client client = client();
// Write a document
String docId = Integer.toString(randomInt());
index(indexName, typeName, docId, "value", expectedValue);
// TODO: Remove after dynamic mapping flushing is implemented
waitForConcreteMappingsOnAll(indexName, typeName, "value");
logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", absolutePath)
));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setIndices(indexName)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.SUCCESS));
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
.setWaitForCompletion(true)
.setRenamePattern(indexName)
.setRenameReplacement(restoredIndexName)
.get();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
}
@Test
public void restoreWithDifferentMappingsAndSettingsTest() throws Exception {
Client client = client();
@ -200,7 +248,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> restore all indices from the snapshot");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
logger.info("--> assert that old mapping is restored");
ImmutableOpenMap<String, MappingMetaData> mappings = client().admin().cluster().prepareState().get().getState().getMetaData().getIndices().get("test-idx").getMappings();
@ -265,7 +312,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> check that aliases are restored");
assertAliasesExist(client.admin().indices().prepareAliasesExist("alias-123", "alias-1").get());
logger.info("--> update aliases");
assertAcked(client.admin().indices().prepareAliases().removeAlias("test-idx-3", "alias-123"));
assertAcked(client.admin().indices().prepareAliases().addAlias("test-idx-3", "alias-3"));
@ -391,7 +437,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
ensureGreen();
logger.info("--> check that template wasn't restored but index was");
getIndexTemplatesResponse = client().admin().indices().prepareGetTemplates().get();
assertIndexTemplateMissing(getIndexTemplatesResponse, "test-template");
@ -554,7 +599,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> restore index after deletion");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
CountResponse countResponse = client.prepareCount("test-idx").get();
assertThat(countResponse.getCount(), equalTo(100L));
}
@ -620,7 +664,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
ensureGreen();
CountResponse countResponse = client.prepareCount("test-idx").get();
assertThat(countResponse.getCount(), equalTo(100L));
@ -696,7 +739,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", lastSnapshot).setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(10L * numberOfSnapshots));
logger.info("--> delete the last snapshot");
@ -864,7 +906,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
.setRenamePattern("(.+)").setRenameReplacement("$1-copy").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1-copy").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2-copy").get().getCount(), equalTo(100L));
@ -876,7 +917,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
.setRenamePattern("(.+)").setRenameReplacement("$1-copy").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx-1-copy").get().getCount(), equalTo(100L));
assertThat(client.prepareCount("test-idx-2-copy").get().getCount(), equalTo(100L));
@ -1009,7 +1049,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
}
@ -1094,7 +1133,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
}
@ -1138,7 +1176,7 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> restore index after deletion");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("url-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx").execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> list available shapshots");
@ -1193,8 +1231,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> restore index");
RestoreSnapshotResponse restoreSnapshotResponse = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
ensureGreen();
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
long snapshotPause = 0L;