Fix Snapshot Completion Listener Lost on Master Failover (#54286) (#54330)

* Fix Snapshot Completion Listener Lost on Master Failover

If master fails over before (or we run into any other exception) when removing
the snapshot from the CS we must still resolve all the completion listeners for
the snapshot.
This commit is contained in:
Armin Braun 2020-03-27 14:11:13 +01:00 committed by GitHub
parent 8126ad0ab1
commit 14b5daad7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 123 additions and 12 deletions

View File

@ -1104,7 +1104,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// will try ending this snapshot again
logger.debug(() -> new ParameterizedMessage(
"[{}] failed to update cluster state during snapshot finalization", snapshot), e);
endingSnapshots.remove(snapshot);
failSnapshotCompletionListeners(snapshot,
new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e));
} else {
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
removeSnapshotFromClusterState(snapshot, null, e);
@ -1158,7 +1159,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to remove snapshot metadata", snapshot), e);
endingSnapshots.remove(snapshot);
failSnapshotCompletionListeners(
snapshot, new SnapshotException(snapshot, "Failed to remove snapshot from cluster state", e));
if (listener != null) {
listener.onFailure(e);
}
@ -1166,7 +1168,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
public void onNoLongerMaster(String source) {
endingSnapshots.remove(snapshot);
failSnapshotCompletionListeners(
snapshot, ExceptionsHelper.useOrSuppress(failure, new SnapshotException(snapshot, "no longer master")));
if (listener != null) {
listener.onNoLongerMaster();
}
@ -1174,19 +1177,19 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
if (snapshotInfo == null) {
ActionListener.onFailure(completionListeners, failure);
} else {
if (snapshotInfo == null) {
failSnapshotCompletionListeners(snapshot, failure);
} else {
final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
ActionListener.onResponse(completionListeners, snapshotInfo);
} catch (Exception e) {
logger.warn("Failed to notify listeners", e);
}
} catch (Exception e) {
logger.warn("Failed to notify listeners", e);
}
endingSnapshots.remove(snapshot);
}
endingSnapshots.remove(snapshot);
if (listener != null) {
listener.onResponse(snapshotInfo);
}
@ -1194,6 +1197,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
});
}
private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
ActionListener.onFailure(completionListeners, e);
} catch (Exception ex) {
logger.warn("Failed to notify listeners", ex);
}
}
endingSnapshots.remove(snapshot);
}
/**
* Deletes a snapshot from the repository, looking up the {@link Snapshot} reference before deleting.
* If the snapshot is still running cancels the snapshot first and then deletes it from the repository.

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.discovery;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
@ -27,11 +28,14 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
import org.elasticsearch.snapshots.SnapshotState;
@ -49,6 +53,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.is;
/**
* Tests snapshot operations during disruptions.
@ -156,6 +163,95 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
assertAllSnapshotsCompleted();
}
public void testDisruptionAfterFinalization() throws Exception {
final String idxName = "test";
final List<String> allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3);
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);
createRandomIndex(idxName);
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
final String masterNode1 = internalCluster().getMasterName();
Set<String> otherNodes = new HashSet<>(allMasterEligibleNodes);
otherNodes.remove(masterNode1);
otherNodes.add(dataNode);
NetworkDisruption networkDisruption =
new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes),
new NetworkDisruption.NetworkUnresponsive());
internalCluster().setDisruptionScheme(networkDisruption);
ClusterService clusterService = internalCluster().clusterService(masterNode1);
CountDownLatch disruptionStarted = new CountDownLatch(1);
clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshots != null && snapshots.entries().size() > 0) {
final SnapshotsInProgress.Entry snapshotEntry = snapshots.entries().get(0);
if (snapshotEntry.state() == SnapshotsInProgress.State.SUCCESS) {
final RepositoriesMetaData repoMeta =
event.state().metaData().custom(RepositoriesMetaData.TYPE);
final RepositoryMetaData metaData = repoMeta.repository("test-repo");
if (metaData.generation() == metaData.pendingGeneration()
&& metaData.generation() > snapshotEntry.repositoryStateId()) {
logger.info("--> starting disruption");
networkDisruption.startDisrupting();
clusterService.removeListener(this);
disruptionStarted.countDown();
}
}
}
}
});
final String snapshot = "test-snap";
logger.info("--> starting snapshot");
ActionFuture<CreateSnapshotResponse> future = client(masterNode1).admin().cluster()
.prepareCreateSnapshot("test-repo", snapshot).setWaitForCompletion(true)
.setIndices(idxName).execute();
logger.info("--> waiting for disruption to start");
assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));
assertAllSnapshotsCompleted();
logger.info("--> verify that snapshot was successful or no longer exist");
assertBusy(() -> {
try {
assertSnapshotExists("test-repo", snapshot);
} catch (SnapshotMissingException exception) {
logger.info("--> done verifying, snapshot doesn't exist");
}
}, 1, TimeUnit.MINUTES);
logger.info("--> stopping disrupting");
networkDisruption.stopDisrupting();
ensureStableCluster(4, masterNode1);
logger.info("--> done");
try {
future.get();
fail("Should have failed because the node disconnected from the cluster during snapshot finalization");
} catch (Exception ex) {
final SnapshotException sne = (SnapshotException) ExceptionsHelper.unwrap(ex, SnapshotException.class);
assertNotNull(sne);
assertThat(
sne.getMessage(), either(endsWith(" Failed to remove snapshot from cluster state")).or(endsWith(" no longer master")));
assertThat(sne.getSnapshotName(), is(snapshot));
}
assertAllSnapshotsCompleted();
}
private void assertAllSnapshotsCompleted() throws Exception {
logger.info("--> wait until the snapshot is done");
assertBusy(() -> {