Consolidates the logic for cleaning up snapshots on master election (#24894)
In #24605, logic was implemented to ensure that completed snapshots are properly removed from the cluster state when the master node changes. This commit removes redundant logic that also attempted to clean up completed snapshots on master election but covered only a limited case already handled by #24605. It also adds a test to ensure that completed snapshots are cleaned up at the right time when a master election happens before a snapshot is finalized, and adds a check to handle the case where the old master and the new master both attempt to finalize the snapshot and write the same blob to the repository simultaneously.
parent aa5b11687d
commit 3cb307462d
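The defensive piece of this change is the new catch of FileAlreadyExistsException in BlobStoreRepository.finalizeSnapshot (see the first diff below). As a minimal, self-contained illustration of the failure mode it guards against — this is not Elasticsearch code; the class and method names are hypothetical and a plain filesystem stands in for the blob store — the sketch below models an old and a new master racing to write the same snapshot blob: the loser fails with FileAlreadyExistsException instead of silently overwriting, so it can assume the snapshot has already been saved.

    import java.io.IOException;
    import java.nio.file.FileAlreadyExistsException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class DoubleFinalizeSketch {

        // Writes the snapshot blob with CREATE_NEW so that a second writer fails
        // with FileAlreadyExistsException instead of silently overwriting the blob.
        static void writeSnapshotBlob(Path repo, String uuid) throws IOException {
            Path blob = repo.resolve("snap-" + uuid + ".dat");
            Files.write(blob, "snapshot-info".getBytes(),
                    StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        }

        public static void main(String[] args) throws IOException {
            Path repo = Files.createTempDirectory("repo");
            writeSnapshotBlob(repo, "abc123");         // old master finalizes the snapshot
            try {
                writeSnapshotBlob(repo, "abc123");     // new master retries the same snapshot
            } catch (FileAlreadyExistsException e) {
                // mirrors the new catch block: assume the snapshot has already been saved
                System.out.println("blob already exists; snapshot was already finalized");
            }
        }
    }

In the commit itself the exception is wrapped in a RepositoryException whose message tells the caller to assume the snapshot has already been saved.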
BlobStoreRepository.java

@@ -106,6 +106,7 @@ import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.NoSuchFileException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -463,21 +464,24 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                                      final int totalShards,
                                      final List<SnapshotShardFailure> shardFailures,
                                      final long repositoryStateId) {

-        SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
-            indices.stream().map(IndexId::getName).collect(Collectors.toList()),
-            startTime, failure, System.currentTimeMillis(), totalShards, shardFailures);
         try {
+            SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
+                indices.stream().map(IndexId::getName).collect(Collectors.toList()),
+                startTime,
+                failure,
+                System.currentTimeMillis(),
+                totalShards,
+                shardFailures);
             snapshotFormat.write(blobStoreSnapshot, snapshotsBlobContainer, snapshotId.getUUID());
             final RepositoryData repositoryData = getRepositoryData();
             writeIndexGen(repositoryData.addSnapshot(snapshotId, blobStoreSnapshot.state(), indices), repositoryStateId);
+            return blobStoreSnapshot;
+        } catch (FileAlreadyExistsException ex) {
+            // if another master was elected and took over finalizing the snapshot, it is possible
+            // that both nodes try to finalize the snapshot and write to the same blobs, so we just
+            // log a warning here and carry on
+            throw new RepositoryException(metadata.name(), "Blob already exists while " +
+                "finalizing snapshot, assume the snapshot has already been saved", ex);
         } catch (IOException ex) {
             throw new RepositoryException(metadata.name(), "failed to update snapshot in repository", ex);
         }
-        return blobStoreSnapshot;
     }

     @Override
SnapshotsService.java

@@ -745,9 +745,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
                             logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot());
                         }
                     }, updatedSnapshot.getRepositoryStateId(), false);
-                } else if (snapshot.state() == State.SUCCESS && newMaster) {
-                    // Finalize the snapshot
-                    endSnapshot(snapshot);
                 }
             }
             if (changed) {
AbstractSnapshotIntegTestCase.java

@@ -128,6 +128,13 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         return null;
     }

+    public static String blockMasterFromFinalizingSnapshot(final String repositoryName) {
+        final String masterName = internalCluster().getMasterName();
+        ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
+            .repository(repositoryName)).setBlockOnWriteIndexFile(true);
+        return masterName;
+    }
+
     public static String blockNodeWithIndex(final String repositoryName, final String indexName) {
         for(String node : internalCluster().nodesInclude(indexName)) {
             ((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repositoryName))
DedicatedClusterSnapshotRestoreIT.java

@@ -36,6 +36,8 @@ import org.elasticsearch.client.node.NodeClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.NamedDiff;
+import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -777,6 +779,69 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         assertEquals(0, snapshotInfo.failedShards());
     }

+    public void testMasterShutdownDuringFailedSnapshot() throws Exception {
+        logger.info("--> starting two master nodes and two data nodes");
+        internalCluster().startMasterOnlyNodes(2);
+        internalCluster().startDataOnlyNodes(2);
+
+        logger.info("--> creating repository");
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+            .setType("mock").setSettings(Settings.builder()
+                .put("location", randomRepoPath())
+                .put("compress", randomBoolean())
+                .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
+
+        assertAcked(prepareCreate("test-idx", 0, Settings.builder()
+            .put("number_of_shards", 6).put("number_of_replicas", 0)));
+        ensureGreen();
+
+        logger.info("--> indexing some data");
+        final int numdocs = randomIntBetween(50, 100);
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex("test-idx", "type1",
+                Integer.toString(i)).setSource("field1", "bar " + i);
+        }
+        indexRandom(true, builders);
+        flushAndRefresh();
+
+        logger.info("--> stopping random data node, which should cause shards to go missing");
+        internalCluster().stopRandomDataNode();
+        assertBusy(() ->
+            assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()),
+            30, TimeUnit.SECONDS);
+
+        final String masterNode = blockMasterFromFinalizingSnapshot("test-repo");
+
+        logger.info("--> snapshot");
+        client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+            .setWaitForCompletion(false).setIndices("test-idx").get();
+
+        logger.info("--> waiting for block to kick in on " + masterNode);
+        waitForBlock(masterNode, "test-repo", TimeValue.timeValueSeconds(60));
+
+        logger.info("--> stopping master node");
+        internalCluster().stopCurrentMasterNode();
+
+        logger.info("--> wait until the snapshot is done");
+        assertBusy(() -> {
+            GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
+                .prepareGetSnapshots("test-repo").setSnapshots("test-snap").setIgnoreUnavailable(true).get();
+            assertEquals(1, snapshotsStatusResponse.getSnapshots().size());
+            SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+            assertTrue(snapshotInfo.state().completed());
+            ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
+            SnapshotsInProgress snapshotsInProgress = clusterState.custom(SnapshotsInProgress.TYPE);
+            assertEquals(0, snapshotsInProgress.entries().size());
+        }, 30, TimeUnit.SECONDS);
+
+        logger.info("--> verify that snapshot failed");
+        GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster()
+            .prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
+        SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+        assertEquals(SnapshotState.FAILED, snapshotInfo.state());
+    }
+
     /**
      * Tests that a shrunken index (created via the shrink APIs) and subsequently snapshotted
      * can be restored when the node the shrunken index was created on is no longer part of
@@ -844,7 +909,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
             restoreResponse.getRestoreInfo().successfulShards());
         ensureYellow();
     }


     public static class SnapshottableMetadata extends TestCustomMetaData {
         public static final String TYPE = "test_snapshottable";
MockRepository.java

@@ -100,6 +100,10 @@ public class MockRepository extends FsRepository {

     private volatile boolean blockOnDataFiles;

+    /** Allows blocking on writing the index-N blob; this is a way to enforce blocking the
+     *  finalization of a snapshot, while permitting other IO operations to proceed unblocked. */
+    private volatile boolean blockOnWriteIndexFile;
+
     private volatile boolean atomicMove;

     private volatile boolean blocked = false;
@@ -163,6 +167,7 @@ public class MockRepository extends FsRepository {
         blockOnDataFiles = false;
         blockOnControlFiles = false;
         blockOnInitialization = false;
+        blockOnWriteIndexFile = false;
         this.notifyAll();
     }

@@ -170,6 +175,10 @@ public class MockRepository extends FsRepository {
         blockOnDataFiles = blocked;
     }

+    public void setBlockOnWriteIndexFile(boolean blocked) {
+        blockOnWriteIndexFile = blocked;
+    }
+
     public boolean blocked() {
         return blocked;
     }

@@ -178,7 +187,7 @@ public class MockRepository extends FsRepository {
         logger.debug("Blocking execution");
         boolean wasBlocked = false;
         try {
-            while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization) {
+            while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile) {
                 blocked = true;
                 this.wait();
                 wasBlocked = true;
@@ -249,36 +258,30 @@ public class MockRepository extends FsRepository {
                         throw new IOException("Random IOException");
                     }
                 } else if (blockOnDataFiles) {
-                    logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
-                    if (blockExecution() && waitAfterUnblock > 0) {
-                        try {
-                            // Delay operation after unblocking
-                            // So, we can start node shutdown while this operation is still running.
-                            Thread.sleep(waitAfterUnblock);
-                        } catch (InterruptedException ex) {
-                            //
-                        }
-                    }
+                    blockExecutionAndMaybeWait(blobName);
                 }
             } else {
                 if (shouldFail(blobName, randomControlIOExceptionRate) && (incrementAndGetFailureCount() < maximumNumberOfFailures)) {
                     logger.info("throwing random IOException for file [{}] at path [{}]", blobName, path());
                     throw new IOException("Random IOException");
                 } else if (blockOnControlFiles) {
-                    logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
-                    if (blockExecution() && waitAfterUnblock > 0) {
-                        try {
-                            // Delay operation after unblocking
-                            // So, we can start node shutdown while this operation is still running.
-                            Thread.sleep(waitAfterUnblock);
-                        } catch (InterruptedException ex) {
-                            //
-                        }
-                    }
+                    blockExecutionAndMaybeWait(blobName);
                 }
             }
         }

+        private void blockExecutionAndMaybeWait(final String blobName) {
+            logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
+            if (blockExecution() && waitAfterUnblock > 0) {
+                try {
+                    // Delay operation after unblocking
+                    // So, we can start node shutdown while this operation is still running.
+                    Thread.sleep(waitAfterUnblock);
+                } catch (InterruptedException ex) {
+                    //
+                }
+            }
+        }
+
         MockBlobContainer(BlobContainer delegate) {
             super(delegate);
@@ -315,6 +318,9 @@ public class MockRepository extends FsRepository {

         @Override
         public void move(String sourceBlob, String targetBlob) throws IOException {
+            if (blockOnWriteIndexFile && targetBlob.startsWith("index-")) {
+                blockExecutionAndMaybeWait(targetBlob);
+            }
             if (atomicMove) {
                 // atomic move since this inherits from FsBlobContainer which provides atomic moves
                 maybeIOExceptionOrBlock(targetBlob);