Fix Memory Leak From Master Failover During Snapshot (#58511) (#58560)

If we failed over while the data nodes were doing their work
we would never resolve the listener and leak it.
This change fails all listeners if master fails over.
This commit is contained in:
Armin Braun 2020-06-25 20:43:08 +02:00 committed by GitHub
parent 38be2812b1
commit 468e559ff7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 1 deletions

View File

@ -54,6 +54,7 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFutureThrows;
@ -65,7 +66,7 @@ import static org.hamcrest.Matchers.is;
* Tests snapshot operations during disruptions. * Tests snapshot operations during disruptions.
*/ */
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0) @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0)
public class SnapshotDisruptionIT extends ESIntegTestCase { public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> nodePlugins() { protected Collection<Class<? extends Plugin>> nodePlugins() {
@ -306,6 +307,48 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
client(masterNode).admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2").setWaitForCompletion(true).get(); client(masterNode).admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2").setWaitForCompletion(true).get();
final SnapshotInfo successfulSnapshotInfo = successfulSnapshot.getSnapshotInfo(); final SnapshotInfo successfulSnapshotInfo = successfulSnapshot.getSnapshotInfo();
assertThat(successfulSnapshotInfo.state(), is(SnapshotState.SUCCESS)); assertThat(successfulSnapshotInfo.state(), is(SnapshotState.SUCCESS));
logger.info("--> making sure snapshot delete works out cleanly");
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "snapshot-2").get());
}
public void testMasterFailOverDuringShardSnapshots() throws Exception {
internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);
final String repoName = "test-repo";
createRepository(repoName, "mock", randomRepoPath());
final String indexName = "index-one";
createIndex(indexName);
client().prepareIndex(indexName, "_doc").setSource("foo", "bar").get();
blockDataNode(repoName, dataNode);
logger.info("--> create snapshot via master node client");
final ActionFuture<CreateSnapshotResponse> snapshotResponse = internalCluster().masterClient().admin().cluster()
.prepareCreateSnapshot(repoName, "test-snap").setWaitForCompletion(true).execute();
waitForBlock(dataNode, repoName, TimeValue.timeValueSeconds(30L));
final String masterNode = internalCluster().getMasterName();
final NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode),
Arrays.stream(internalCluster().getNodeNames()).filter(name -> masterNode.equals(name) == false)
.collect(Collectors.toSet())),
new NetworkDisruption.NetworkDisconnect());
internalCluster().setDisruptionScheme(networkDisruption);
networkDisruption.startDisrupting();
ensureStableCluster(3, dataNode);
unblockNode(repoName, dataNode);
networkDisruption.stopDisrupting();
assertAllSnapshotsCompleted();
logger.info("--> make sure isolated master responds to snapshot request");
final SnapshotException sne =
expectThrows(SnapshotException.class, () -> snapshotResponse.actionGet(TimeValue.timeValueSeconds(30L)));
assertThat(sne.getMessage(), endsWith("no longer master"));
} }
private void assertAllSnapshotsCompleted() throws Exception { private void assertAllSnapshotsCompleted() throws Exception {

View File

@ -618,6 +618,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (newMaster) { if (newMaster) {
finalizeSnapshotDeletionFromPreviousMaster(event.state()); finalizeSnapshotDeletionFromPreviousMaster(event.state());
} }
} else if (snapshotCompletionListeners.isEmpty() == false) {
// We have snapshot listeners but are not the master any more. Fail all waiting listeners except for those that already
// have their snapshots finalizing (those that are already finalizing will fail on their own from to update the cluster
// state).
for (Snapshot snapshot : new HashSet<>(snapshotCompletionListeners.keySet())) {
if (endingSnapshots.add(snapshot)) {
failSnapshotCompletionListeners(snapshot, new SnapshotException(snapshot, "no longer master"));
}
}
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn("Failed to update snapshot state ", e); logger.warn("Failed to update snapshot state ", e);
@ -1562,4 +1571,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
protected void doClose() { protected void doClose() {
clusterService.removeApplier(this); clusterService.removeApplier(this);
} }
/**
* Assert that no in-memory state for any running snapshot operation exists in this instance.
*/
public boolean assertAllListenersResolved() {
synchronized (endingSnapshots) {
final DiscoveryNode localNode = clusterService.localNode();
assert endingSnapshots.isEmpty() : "Found leaked ending snapshots " + endingSnapshots
+ " on [" + localNode + "]";
assert snapshotCompletionListeners.isEmpty() : "Found leaked snapshot completion listeners " + snapshotCompletionListeners
+ " on [" + localNode + "]";
}
return true;
}
} }

View File

@ -87,6 +87,15 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
} }
@After
public void verifyNoLeakedListeners() throws Exception {
assertBusy(() -> {
for (SnapshotsService snapshotsService : internalCluster().getInstances(SnapshotsService.class)) {
assertTrue(snapshotsService.assertAllListenersResolved());
}
}, 30L, TimeUnit.SECONDS);
}
private String skipRepoConsistencyCheckReason; private String skipRepoConsistencyCheckReason;
@After @After
@ -226,6 +235,11 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
return null; return null;
} }
public static void blockDataNode(String repository, String nodeName) {
((MockRepository) internalCluster().getInstance(RepositoriesService.class, nodeName)
.repository(repository)).blockOnDataFiles(true);
}
public static void blockAllDataNodes(String repository) { public static void blockAllDataNodes(String repository) {
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true); ((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true);