From 117f0f3a445ea8f43cb479c42759314e9a9dbd2f Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Fri, 3 Nov 2017 19:36:08 -0400 Subject: [PATCH] Fix snapshot getting stuck in INIT state (#27214) If the master disconnects from the cluster after initiating snapshot, but just before the snapshot switches from INIT to STARTED state, the snapshot can get indefinitely stuck in the INIT state. This error is specific to v5.x+ and was triggered by keeping the master node that stepped down in the node list, the cleanup logic in snapshot/restore assumed that if master steps down it is always removed from the the node list. This commit changes the logic to trigger cleanup even if no nodes left the cluster. Closes #27180 --- .../common/blobstore/fs/FsBlobContainer.java | 4 +- .../snapshots/SnapshotsService.java | 38 +++- .../discovery/SnapshotDisruptionIT.java | 173 ++++++++++++++++++ 3 files changed, 209 insertions(+), 6 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java diff --git a/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index 757cce7d837..1e384109aeb 100644 --- a/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/core/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -140,7 +140,9 @@ public class FsBlobContainer extends AbstractBlobContainer { Path targetPath = path.resolve(target); // If the target file exists then Files.move() behaviour is implementation specific // the existing file might be replaced or this method fails by throwing an IOException. - assert !Files.exists(targetPath); + if (Files.exists(targetPath)) { + throw new FileAlreadyExistsException("blob [" + targetPath + "] already exists, cannot overwrite"); + } Files.move(sourcePath, targetPath, StandardCopyOption.ATOMIC_MOVE); IOUtils.fsync(path, true); } diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 037db4d5caf..0804e69e46e 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -425,6 +425,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e)); } + @Override + public void onNoLongerMaster(String source) { + // We are not longer a master - we shouldn't try to do any cleanup + // The new master will take care of it + logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId()); + userCreateSnapshotListener.onFailure( + new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization")); + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { // The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted @@ -473,6 +482,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus cleanupAfterError(e); } + public void onNoLongerMaster(String source) { + userCreateSnapshotListener.onFailure(e); + } + private void cleanupAfterError(Exception exception) { if(snapshotCreated) { try { @@ -628,7 +641,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public void applyClusterState(ClusterChangedEvent event) { try { if (event.localNodeMaster()) { - if (event.nodesRemoved()) { + // We don't remove old master when master flips anymore. So, we need to check for change in master + if (event.nodesRemoved() || event.previousState().nodes().isLocalNodeElectedMaster() == false) { processSnapshotsOnRemovedNodes(event); } if (event.routingTableChanged()) { @@ -981,7 +995,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param listener listener to notify when snapshot information is removed from the cluster state */ private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, final Exception failure, - @Nullable ActionListener listener) { + @Nullable CleanupAfterErrorListener listener) { clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { @Override @@ -1013,6 +1027,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } } + @Override + public void onNoLongerMaster(String source) { + if (listener != null) { + listener.onNoLongerMaster(source); + } + } + @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { for (SnapshotCompletionListener listener : snapshotCompletionListeners) { @@ -1183,9 +1204,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (completedSnapshot.equals(snapshot)) { logger.debug("deleted snapshot completed - deleting files"); removeListener(this); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> - deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(), - listener, true) + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { + try { + deleteSnapshot(completedSnapshot.getRepository(), completedSnapshot.getSnapshotId().getName(), + listener, true); + + } catch (Exception ex) { + logger.warn((Supplier) () -> + new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex); + } + } ); } } diff --git a/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java new file mode 100644 index 00000000000..3458cca0cf7 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -0,0 +1,173 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.discovery; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.snapshots.SnapshotInfo; +import org.elasticsearch.snapshots.SnapshotMissingException; +import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.disruption.NetworkDisruption; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.instanceOf; + +/** + * Tests snapshot operations during disruptions. + */ +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, transportClientRatio = 0, autoMinMasterNodes = false) +@TestLogging("org.elasticsearch.snapshot:TRACE") +public class SnapshotDisruptionIT extends AbstractDisruptionTestCase { + + public void testDisruptionOnSnapshotInitialization() throws Exception { + final Settings settings = Settings.builder() + .put(DEFAULT_SETTINGS) + .put(DiscoverySettings.COMMIT_TIMEOUT_SETTING.getKey(), "30s") // wait till cluster state is committed + .build(); + final String idxName = "test"; + configureCluster(settings, 4, null, 2); + final List allMasterEligibleNodes = internalCluster().startMasterOnlyNodes(3); + final String dataNode = internalCluster().startDataOnlyNode(); + ensureStableCluster(4); + + createRandomIndex(idxName); + + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + // Writing incompatible snapshot can cause this test to fail due to a race condition in repo initialization + // by the current master and the former master. It is not causing any issues in real life scenario, but + // might make this test to fail. We are going to complete initialization of the snapshot to prevent this failures. + logger.info("--> initializing the repository"); + assertEquals(SnapshotState.SUCCESS, client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1") + .setWaitForCompletion(true).setIncludeGlobalState(true).setIndices().get().getSnapshotInfo().state()); + + final String masterNode1 = internalCluster().getMasterName(); + Set otherNodes = new HashSet<>(); + otherNodes.addAll(allMasterEligibleNodes); + otherNodes.remove(masterNode1); + otherNodes.add(dataNode); + + NetworkDisruption networkDisruption = + new NetworkDisruption(new NetworkDisruption.TwoPartitions(Collections.singleton(masterNode1), otherNodes), + new NetworkDisruption.NetworkUnresponsive()); + internalCluster().setDisruptionScheme(networkDisruption); + + ClusterService clusterService = internalCluster().clusterService(masterNode1); + CountDownLatch disruptionStarted = new CountDownLatch(1); + clusterService.addListener(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + SnapshotsInProgress snapshots = event.state().custom(SnapshotsInProgress.TYPE); + if (snapshots != null && snapshots.entries().size() > 0) { + if (snapshots.entries().get(0).state() == SnapshotsInProgress.State.INIT) { + // The snapshot started, we can start disruption so the INIT state will arrive to another master node + logger.info("--> starting disruption"); + networkDisruption.startDisrupting(); + clusterService.removeListener(this); + disruptionStarted.countDown(); + } + } + } + }); + + logger.info("--> starting snapshot"); + ActionFuture future = client(masterNode1).admin().cluster() + .prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(false).setIndices(idxName).execute(); + + logger.info("--> waiting for disruption to start"); + assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES)); + + logger.info("--> wait until the snapshot is done"); + assertBusy(() -> { + SnapshotsInProgress snapshots = dataNodeClient().admin().cluster().prepareState().setLocal(true).get().getState() + .custom(SnapshotsInProgress.TYPE); + if (snapshots != null && snapshots.entries().size() > 0) { + logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state()); + fail("Snapshot is still running"); + } else { + logger.info("Snapshot is no longer in the cluster state"); + } + }, 1, TimeUnit.MINUTES); + + logger.info("--> verify that snapshot was successful or no longer exist"); + assertBusy(() -> { + try { + GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots("test-repo") + .setSnapshots("test-snap-2").get(); + SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards()); + assertEquals(0, snapshotInfo.failedShards()); + logger.info("--> done verifying"); + } catch (SnapshotMissingException exception) { + logger.info("--> snapshot doesn't exist"); + } + }, 1, TimeUnit.MINUTES); + + logger.info("--> stopping disrupting"); + networkDisruption.stopDisrupting(); + ensureStableCluster(4, masterNode1); + logger.info("--> done"); + + try { + future.get(); + } catch (Exception ex) { + logger.info("--> got exception from hanged master", ex); + Throwable cause = ex.getCause(); + assertThat(cause, instanceOf(MasterNotDiscoveredException.class)); + cause = cause.getCause(); + assertThat(cause, instanceOf(Discovery.FailedToCommitClusterStateException.class)); + } + } + + private void createRandomIndex(String idxName) throws ExecutionException, InterruptedException { + assertAcked(prepareCreate(idxName, 0, Settings.builder().put("number_of_shards", between(1, 20)) + .put("number_of_replicas", 0))); + logger.info("--> indexing some data"); + final int numdocs = randomIntBetween(10, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex(idxName, "type1", Integer.toString(i)).setSource("field1", "bar " + i); + } + indexRandom(true, builders); + } +}