Fix for hanging aborted snapshot during node shutdown

If a node is shutdown while a snapshot that runs on this node is aborted, it might cause the snapshot process to hang.

Closes #5958
This commit is contained in:
Igor Motov 2014-04-28 21:19:28 -04:00
parent fdbefa0cd1
commit dfdc183ba6
2 changed files with 58 additions and 4 deletions

View File

@ -537,7 +537,7 @@ public class SnapshotsService extends AbstractComponent implements ClusterStateL
for (final SnapshotMetaData.Entry snapshot : snapshots.entries()) {
SnapshotMetaData.Entry updatedSnapshot = snapshot;
boolean snapshotChanged = false;
if (snapshot.state() == State.STARTED) {
if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) {
ImmutableMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableMap.builder();
for (ImmutableMap.Entry<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards().entrySet()) {
ShardSnapshotStatus shardStatus = shardEntry.getValue();

View File

@ -20,8 +20,10 @@
package org.elasticsearch.snapshots;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
@ -86,9 +88,6 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
@Test
public void snapshotDuringNodeShutdownTest() throws Exception {
logger.info("--> start 2 nodes");
ArrayList<String> nodes = newArrayList();
nodes.add(cluster().startNode());
nodes.add(cluster().startNode());
Client client = client();
assertAcked(prepareCreate("test-idx", 2, settingsBuilder().put("number_of_shards", 2).put("number_of_replicas", 0).put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)));
@ -132,6 +131,61 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
logger.info("--> done");
}
@Test
public void snapshotWithStuckNodeTest() throws Exception {
logger.info("--> start 2 nodes");
ArrayList<String> nodes = newArrayList();
nodes.add(cluster().startNode());
nodes.add(cluster().startNode());
Client client = client();
assertAcked(prepareCreate("test-idx", 2, settingsBuilder().put("number_of_shards", 2).put("number_of_replicas", 0).put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)));
ensureGreen();
logger.info("--> indexing some data");
for (int i = 0; i < 100; i++) {
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
}
refresh();
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
logger.info("--> creating repository");
PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType(MockRepositoryModule.class.getCanonicalName()).setSettings(
ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.TEST))
.put("random", randomAsciiOfLength(10))
.put("wait_after_unblock", 200)
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
// Pick one node and block it
String blockedNode = blockNodeWithIndex("test-idx");
// Remove it from the list of available nodes
nodes.remove(blockedNode);
logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
logger.info("--> waiting for block to kick in");
waitForBlock(blockedNode, "test-repo", TimeValue.timeValueSeconds(60));
logger.info("--> execution was blocked on node [{}], aborting snapshot", blockedNode);
ListenableActionFuture<DeleteSnapshotResponse> deleteSnapshotResponseFuture = cluster().client(nodes.get(0)).admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute();
// Make sure that abort makes some progress
Thread.sleep(100);
unblockNode(blockedNode);
logger.info("--> stopping node", blockedNode);
stopNode(blockedNode);
DeleteSnapshotResponse deleteSnapshotResponse = deleteSnapshotResponseFuture.get();
assertThat(deleteSnapshotResponse.isAcknowledged(), equalTo(true));
logger.info("--> making sure that snapshot no longer exists");
assertThrows(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").execute(), SnapshotMissingException.class);
logger.info("--> done");
}
@Test
@TestLogging("snapshots:TRACE")
public void restoreIndexWithMissingShards() throws Exception {