diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index 355873db329..ea5b563821e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -54,6 +54,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows; @@ -65,7 +66,7 @@ import static org.hamcrest.Matchers.is; * Tests snapshot operations during disruptions. */ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) -public class SnapshotDisruptionIT extends ESIntegTestCase { +public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { @Override protected Collection> nodePlugins() { @@ -306,6 +307,48 @@ public class SnapshotDisruptionIT extends ESIntegTestCase { client(masterNode).admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2").setWaitForCompletion(true).get(); final SnapshotInfo successfulSnapshotInfo = successfulSnapshot.getSnapshotInfo(); assertThat(successfulSnapshotInfo.state(), is(SnapshotState.SUCCESS)); + + logger.info("--> making sure snapshot delete works out cleanly"); + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "snapshot-2").get()); + } + + public void testMasterFailOverDuringShardSnapshots() throws Exception { + internalCluster().startMasterOnlyNodes(3); + final String dataNode = internalCluster().startDataOnlyNode(); + ensureStableCluster(4); + final String repoName = "test-repo"; + createRepository(repoName, "mock", randomRepoPath()); + + final String indexName = "index-one"; + createIndex(indexName); + client().prepareIndex(indexName, "_doc").setSource("foo", "bar").get(); + + blockDataNode(repoName, dataNode); + + logger.info("--> create snapshot via master node client"); + final ActionFuture snapshotResponse = internalCluster().masterClient().admin().cluster() + .prepareCreateSnapshot(repoName, "test-snap").setWaitForCompletion(true).execute(); + + waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L)); + + final String masterNode = internalCluster().getMasterName(); + final NetworkDisruption networkDisruption = new NetworkDisruption( + new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode), + Arrays.stream(internalCluster().getNodeNames()).filter(name -> masterNode.equals(name) == false) + .collect(Collectors.toSet())), + new NetworkDisruption.NetworkDisconnect()); + internalCluster().setDisruptionScheme(networkDisruption); + networkDisruption.startDisrupting(); + ensureStableCluster(3, dataNode); + unblockNode(repoName, dataNode); + + networkDisruption.stopDisrupting(); + assertAllSnapshotsCompleted(); + + logger.info("--> make sure isolated master responds to snapshot request"); + final SnapshotException sne = + expectThrows(SnapshotException.class, () -> snapshotResponse.actionGet(TimeValue.timeValueSeconds(30L))); + assertThat(sne.getMessage(), endsWith("no longer master")); } private void assertAllSnapshotsCompleted() throws Exception { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f4ffae3e00e..bfae3986b7b 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -618,6 +618,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (newMaster) { finalizeSnapshotDeletionFromPreviousMaster(event.state()); } + } else if (snapshotCompletionListeners.isEmpty() == false) { + // We have snapshot listeners but are not the master any more. Fail all waiting listeners except for those that already + // have their snapshots finalizing (those that are already finalizing will fail on their own from to update the cluster + // state). + for (Snapshot snapshot : new HashSet<>(snapshotCompletionListeners.keySet())) { + if (endingSnapshots.add(snapshot)) { + failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, "no longer master")); + } + } } } catch (Exception e) { logger.warn("Failed to update snapshot state ", e); @@ -1562,4 +1571,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus protected void doClose() { clusterService.removeApplier(this); } + + /** + * Assert that no in-memory state for any running snapshot operation exists in this instance. + */ + public boolean assertAllListenersResolved() { + synchronized (endingSnapshots) { + final DiscoveryNode localNode = clusterService.localNode(); + assert endingSnapshots.isEmpty() : "Found leaked ending snapshots " + endingSnapshots + + " on [" + localNode + "]"; + assert snapshotCompletionListeners.isEmpty() : "Found leaked snapshot completion listeners " + snapshotCompletionListeners + + " on [" + localNode + "]"; + } + return true; + } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 1df493468bb..9f6a4448853 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -87,6 +87,15 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); } + @After + public void verifyNoLeakedListeners() throws Exception { + assertBusy(() -> { + for (SnapshotsService snapshotsService : internalCluster().getInstances(SnapshotsService.class)) { + assertTrue(snapshotsService.assertAllListenersResolved()); + } + }, 30L, TimeUnit.SECONDS); + } + private String skipRepoConsistencyCheckReason; @After @@ -226,6 +235,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { return null; } + public static void blockDataNode(String repository, String nodeName) { + ((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName) + .repository(repository)).blockOnDataFiles(true); + } + public static void blockAllDataNodes(String repository) { for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { ((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true);