Abort snapshots on a node that leaves the cluster (#21084)

Previously, if a node left the cluster (for example, due to a long GC)
during a snapshot, the master node would mark the snapshot as failed, but
the node itself could continue snapshotting the data on its shards to the
repository. If the node rejoined the cluster, the master could assign it the
replica of a shard for which it had held the primary before it was removed
from the cluster. Initialization of that replica shard would then fail
repeatedly with a ShardLockObtainFailedException until the snapshot thread
finally finished and relinquished its lock on the Store.

This commit resolves the situation by ensuring that when a shard is removed
from a node (such as when the node rejoins the cluster and discovers it no
longer holds the active shard copy), any snapshotting of the removed shard
is aborted. In the scenario above, when the node rejoins the cluster, it sees
in the cluster state that it no longer holds the primary shard, so
IndicesClusterStateService removes the shard, which in turn aborts any
snapshot of that shard.
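
At its core, the fix makes SnapshotShardsService an IndexEventListener so that
closing or removing a shard aborts any snapshot still running on it. The
following is a condensed sketch of the new hook, adapted from the diff further
below (logging and the intermediate local variable are omitted):

    @Override
    public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
        // abort any snapshot that is still working on the shard being closed
        for (Map.Entry<Snapshot, SnapshotShards> entry : shardSnapshots.entrySet()) {
            IndexShardSnapshotStatus status = entry.getValue().shards.get(shardId);
            if (status != null) {
                // sets the (now volatile) aborted flag checked by the snapshot thread,
                // which then bails out and releases its reference to the shard's Store
                status.abort();
            }
        }
    }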

Closes #20876
Author: Ali Beyad
Date: 2016-10-26 10:04:50 -04:00 (committed by GitHub)
Commit: c88452dc80 (parent: aaf3477327)
9 changed files with 146 additions and 57 deletions

File: SnapshotsInProgress.java

@@ -210,12 +210,9 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
public static class ShardSnapshotStatus {
-private State state;
-private String nodeId;
-private String reason;
-private ShardSnapshotStatus() {
-}
+private final State state;
+private final String nodeId;
+private final String reason;
public ShardSnapshotStatus(String nodeId) {
this(nodeId, State.INIT);
@@ -231,6 +228,12 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
this.reason = reason;
}
+public ShardSnapshotStatus(StreamInput in) throws IOException {
+nodeId = in.readOptionalString();
+state = State.fromValue(in.readByte());
+reason = in.readOptionalString();
+}
public State state() {
return state;
}
@@ -243,18 +246,6 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
return reason;
}
-public static ShardSnapshotStatus readShardSnapshotStatus(StreamInput in) throws IOException {
-ShardSnapshotStatus shardSnapshotStatus = new ShardSnapshotStatus();
-shardSnapshotStatus.readFrom(in);
-return shardSnapshotStatus;
-}
-public void readFrom(StreamInput in) throws IOException {
-nodeId = in.readOptionalString();
-state = State.fromValue(in.readByte());
-reason = in.readOptionalString();
-}
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
@@ -282,6 +273,11 @@ public class SnapshotsInProgress extends AbstractDiffable<Custom> implements Cus
result = 31 * result + (reason != null ? reason.hashCode() : 0);
return result;
}
+@Override
+public String toString() {
+return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
+}
}
public enum State {

File: DiscoveryNodes.java

@@ -26,6 +26,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -399,9 +400,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
-for (DiscoveryNode node : this) {
-sb.append(node).append(',');
-}
+sb.append(Strings.collectionToDelimitedString(this, ","));
sb.append("}");
return sb.toString();
}

File: IndexService.java

@@ -411,7 +411,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
} finally {
try {
-store.close();
+if (store != null) {
+store.close();
+} else {
+logger.trace("[{}] store not initialized prior to closing shard, nothing to close", shardId);
+}
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(

File: IndexShardSnapshotStatus.java

@@ -27,7 +27,7 @@ public class IndexShardSnapshotStatus {
/**
* Snapshot stage
*/
-public static enum Stage {
+public enum Stage {
/**
* Snapshot hasn't started yet
*/
@@ -66,7 +66,7 @@ public class IndexShardSnapshotStatus {
private long indexVersion;
-private boolean aborted;
+private volatile boolean aborted;
private String failure;

File: IndicesClusterStateService.java

@@ -69,6 +69,7 @@ import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.RestoreService;
+import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@@ -113,10 +114,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, RestoreService restoreService,
SearchService searchService, SyncedFlushService syncedFlushService,
-PeerRecoverySourceService peerRecoverySourceService) {
+PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
-nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService);
+nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService,
+snapshotShardsService);
}
// for tests
@@ -128,9 +130,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, RestoreService restoreService,
SearchService searchService, SyncedFlushService syncedFlushService,
-PeerRecoverySourceService peerRecoverySourceService) {
+PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService) {
super(settings);
-this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, syncedFlushService);
+this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, syncedFlushService,
+snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;

File: SnapshotShardsService.java

@@ -29,8 +29,10 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@@ -42,11 +44,13 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.SnapshotFailedEngineException;
+import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
+import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus.Stage;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
@@ -80,7 +84,7 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
* This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
* starting and stopping shard level snapshots
*/
-public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener {
+public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot";
@@ -156,11 +160,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);
-if (prev == null) {
-if (curr != null) {
-processIndexShardSnapshots(event);
-}
-} else if (prev.equals(curr) == false) {
+if ((prev == null && curr != null) || (prev != null && prev.equals(curr) == false)) {
processIndexShardSnapshots(event);
}
String masterNodeId = event.state().nodes().getMasterNodeId();
@@ -173,6 +173,18 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
}
}
+@Override
+public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
+// abort any snapshots occurring on the soon-to-be closed shard
+Map<Snapshot, SnapshotShards> snapshotShardsMap = shardSnapshots;
+for (Map.Entry<Snapshot, SnapshotShards> snapshotShards : snapshotShardsMap.entrySet()) {
+Map<ShardId, IndexShardSnapshotStatus> shards = snapshotShards.getValue().shards;
+if (shards.containsKey(shardId)) {
+logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", shardId, snapshotShards.getKey().getSnapshotId());
+shards.get(shardId).abort();
+}
+}
+}
/**
* Returns status of shards that are snapshotted on the node and belong to the given snapshot
@@ -205,6 +217,16 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
final Snapshot snapshot = entry.getKey();
if (snapshotsInProgress != null && snapshotsInProgress.snapshot(snapshot) != null) {
survivors.put(entry.getKey(), entry.getValue());
+} else {
+// abort any running snapshots of shards for the removed entry;
+// this could happen if for some reason the cluster state update for aborting
+// running shards is missed, then the snapshot is removed in a subsequent cluster
+// state update, which is being processed here
+for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().shards.values()) {
+if (snapshotStatus.stage() == Stage.INIT || snapshotStatus.stage() == Stage.STARTED) {
+snapshotStatus.abort();
+}
+}
}
}
@@ -221,7 +243,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
if (entry.state() == SnapshotsInProgress.State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = new HashMap<>();
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot());
-for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
+for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : entry.shards()) {
// Add all new shards to start processing on
if (localNodeId.equals(shard.value.nodeId())) {
if (shard.value.state() == SnapshotsInProgress.State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.key))) {
@@ -249,7 +271,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
// Abort all running shards for this snapshot
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot());
if (snapshotShards != null) {
-for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
+for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : entry.shards()) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key);
if (snapshotStatus != null) {
switch (snapshotStatus.stage()) {
@@ -263,12 +285,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
case DONE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshot(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshot(), shard.key,
-new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
+new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
break;
case FAILURE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshot(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshot(), shard.key,
-new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, snapshotStatus.failure()));
+new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, snapshotStatus.failure()));
break;
default:
throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage());
@@ -309,18 +331,18 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
@Override
public void doRun() {
snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue());
-updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS));
+updateIndexShardSnapshotStatus(entry.getKey(), shardId, new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS));
}
@Override
public void onFailure(Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to create snapshot", shardId, entry.getKey()), e);
-updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e)));
+updateIndexShardSnapshotStatus(entry.getKey(), shardId, new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e)));
}
});
} catch (Exception e) {
-updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e)));
+updateIndexShardSnapshotStatus(entry.getKey(), shardId, new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e)));
}
}
}
@@ -383,23 +405,23 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
if (snapshot.state() == SnapshotsInProgress.State.STARTED || snapshot.state() == SnapshotsInProgress.State.ABORTED) {
Map<ShardId, IndexShardSnapshotStatus> localShards = currentSnapshotShards(snapshot.snapshot());
if (localShards != null) {
-ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> masterShards = snapshot.shards();
+ImmutableOpenMap<ShardId, ShardSnapshotStatus> masterShards = snapshot.shards();
for(Map.Entry<ShardId, IndexShardSnapshotStatus> localShard : localShards.entrySet()) {
ShardId shardId = localShard.getKey();
IndexShardSnapshotStatus localShardStatus = localShard.getValue();
-SnapshotsInProgress.ShardSnapshotStatus masterShard = masterShards.get(shardId);
+ShardSnapshotStatus masterShard = masterShards.get(shardId);
if (masterShard != null && masterShard.state().completed() == false) {
// Master knows about the shard and thinks it has not completed
-if (localShardStatus.stage() == IndexShardSnapshotStatus.Stage.DONE) {
+if (localShardStatus.stage() == Stage.DONE) {
// but we think the shard is done - we need to make new master know that the shard is done
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshot(), shardId);
updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId,
-new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
-} else if (localShard.getValue().stage() == IndexShardSnapshotStatus.Stage.FAILURE) {
+new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
+} else if (localShard.getValue().stage() == Stage.FAILURE) {
// but we think the shard failed - we need to make new master know that the shard failed
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshot(), shardId);
updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId,
-new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, localShardStatus.failure()));
+new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, localShardStatus.failure()));
}
}
@@ -427,7 +449,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest {
private Snapshot snapshot;
private ShardId shardId;
-private SnapshotsInProgress.ShardSnapshotStatus status;
+private ShardSnapshotStatus status;
private volatile boolean processed; // state field, no need to serialize
@@ -435,7 +457,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
}
-public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) {
+public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
this.snapshot = snapshot;
this.shardId = shardId;
this.status = status;
@@ -446,7 +468,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
super.readFrom(in);
snapshot = new Snapshot(in);
shardId = ShardId.readShardId(in);
-status = SnapshotsInProgress.ShardSnapshotStatus.readShardSnapshotStatus(in);
+status = new ShardSnapshotStatus(in);
}
@Override
@@ -465,7 +487,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
return shardId;
}
-public SnapshotsInProgress.ShardSnapshotStatus status() {
+public ShardSnapshotStatus status() {
return status;
}
@@ -486,7 +508,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
/**
* Updates the shard status
*/
-public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) {
+public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
try {
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
@@ -533,7 +555,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
int changedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
-ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
+ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;
for (int i = 0; i < batchSize; i++) {
File: SnapshotsService.java

@@ -793,12 +793,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) {
-// Check if we just became the master
-boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster();
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null) {
return false;
}
+// Check if we just became the master
+boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster();
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
if (newMaster && (snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) {
// We just replaced old master and snapshots in intermediate states needs to be cleaned

File: IndicesClusterStateServiceRandomUpdatesTests.java

@@ -376,7 +376,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice
transportService, null, clusterService);
final ShardStateAction shardStateAction = mock(ShardStateAction.class);
return new IndicesClusterStateService(settings, indicesService, clusterService,
-threadPool, recoveryTargetService, shardStateAction, null, repositoriesService, null, null, null, null);
+threadPool, recoveryTargetService, shardStateAction, null, repositoriesService, null, null, null, null, null);
}
private class RecordingIndicesService extends MockIndicesService {

File: SharedClusterSnapshotRestoreIT.java

@@ -51,6 +51,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -58,8 +59,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore;
+import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
@@ -2490,4 +2493,66 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60));
}
+/**
+* This test ensures that when a shard is removed from a node (perhaps due to the node
+* leaving the cluster, then returning), all snapshotting of that shard is aborted, so
+* all Store references held onto by the snapshot are released.
+*
+* See https://github.com/elastic/elasticsearch/issues/20876
+*/
+public void testSnapshotCanceledOnRemovedShard() throws Exception {
+final int numPrimaries = 1;
+final int numReplicas = 1;
+final int numDocs = 100;
+final String repo = "test-repo";
+final String index = "test-idx";
+final String snapshot = "test-snap";
+assertAcked(prepareCreate(index, 1,
+Settings.builder().put("number_of_shards", numPrimaries).put("number_of_replicas", numReplicas)));
+logger.info("--> indexing some data");
+for (int i = 0; i < numDocs; i++) {
+index(index, "doc", Integer.toString(i), "foo", "bar" + i);
+}
+refresh();
+logger.info("--> creating repository");
+PutRepositoryResponse putRepositoryResponse =
+client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(Settings.builder()
+.put("location", randomRepoPath())
+.put("random", randomAsciiOfLength(10))
+.put("wait_after_unblock", 200)
+).get();
+assertTrue(putRepositoryResponse.isAcknowledged());
+String blockedNode = blockNodeWithIndex(repo, index);
+logger.info("--> snapshot");
+client().admin().cluster().prepareCreateSnapshot(repo, snapshot)
+.setWaitForCompletion(false)
+.execute();
+logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
+waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
+logger.info("--> removing primary shard that is being snapshotted");
+ClusterState clusterState = internalCluster().clusterService(internalCluster().getMasterName()).state();
+IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(index);
+String nodeWithPrimary = clusterState.nodes().get(indexRoutingTable.shard(0).primaryShard().currentNodeId()).getName();
+assertNotNull("should be at least one node with a primary shard", nodeWithPrimary);
+IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeWithPrimary);
+IndexService indexService = indicesService.indexService(resolveIndex(index));
+indexService.removeShard(0, "simulate node removal");
+logger.info("--> unblocking blocked node [{}]", blockedNode);
+unblockNode(repo, blockedNode);
+logger.info("--> ensuring snapshot is aborted and the aborted shard was marked as failed");
+SnapshotInfo snapshotInfo = waitForCompletion(repo, snapshot, TimeValue.timeValueSeconds(10));
+assertEquals(1, snapshotInfo.shardFailures().size());
+assertEquals(0, snapshotInfo.shardFailures().get(0).shardId());
+assertEquals("IndexShardSnapshotFailedException[Aborted]", snapshotInfo.shardFailures().get(0).reason());
+}
}