We shouldn't throw `RepositoryException` when the repository wasn't concurrently modified in an unexpected fashion (i.e. on the blob/file level). When we know that the repository generation tracked in the master's memory has already moved past the generation the operation assumed, we should throw the concurrent snapshot exception instead. This change makes concurrent snapshot create and delete always throw the same exception, prevents unnecessary listings when the generation is known to be off, and prevents future test failures in SLM tests that assume the concurrent snapshot exception is always thrown here. Without this change, the newly added test randomly fails the `instanceOf` assertion by running into a `RepositoryException`.
This commit is contained in:
parent
abd4a70b10
commit
9c00648314
|
@ -91,6 +91,8 @@ import org.elasticsearch.repositories.RepositoryException;
|
|||
import org.elasticsearch.repositories.RepositoryVerificationException;
|
||||
import org.elasticsearch.snapshots.SnapshotCreationException;
|
||||
import org.elasticsearch.repositories.ShardGenerations;
|
||||
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotException;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
|
@ -371,6 +373,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
if (isReadOnly()) {
|
||||
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
|
||||
} else {
|
||||
final long latestKnownGen = latestKnownRepoGen.get();
|
||||
if (latestKnownGen > repositoryStateId) {
|
||||
listener.onFailure(new ConcurrentSnapshotExecutionException(
|
||||
new Snapshot(metadata.name(), snapshotId), "Another concurrent operation moved repo generation to [ " + latestKnownGen
|
||||
+ "] but this delete assumed generation [" + repositoryStateId + "]"));
|
||||
return;
|
||||
}
|
||||
try {
|
||||
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
|
||||
final RepositoryData repositoryData = safeRepositoryData(repositoryStateId, rootBlobs);
|
||||
|
|
|
@ -203,6 +203,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
|
|||
import static org.hamcrest.Matchers.either;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
|
@ -417,6 +418,74 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
assertEquals(0, snapshotInfo.failedShards());
|
||||
}
|
||||
|
||||
public void testConcurrentSnapshotCreateAndDeleteOther() {
|
||||
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
|
||||
|
||||
String repoName = "repo";
|
||||
String snapshotName = "snapshot";
|
||||
final String index = "test";
|
||||
final int shards = randomIntBetween(1, 10);
|
||||
|
||||
TestClusterNodes.TestClusterNode masterNode =
|
||||
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
|
||||
|
||||
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
|
||||
|
||||
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
|
||||
createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
|
||||
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener));
|
||||
|
||||
final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();
|
||||
|
||||
continueOrDie(createSnapshotResponseStepListener,
|
||||
createSnapshotResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2")
|
||||
.execute(createOtherSnapshotResponseStepListener));
|
||||
|
||||
final StepListener<Boolean> deleteSnapshotStepListener = new StepListener<>();
|
||||
|
||||
continueOrDie(createOtherSnapshotResponseStepListener,
|
||||
createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot(
|
||||
new DeleteSnapshotRequest(repoName, snapshotName), ActionListener.wrap(
|
||||
resp -> deleteSnapshotStepListener.onResponse(true),
|
||||
e -> {
|
||||
assertThat(e, instanceOf(ConcurrentSnapshotExecutionException.class));
|
||||
deleteSnapshotStepListener.onResponse(false);
|
||||
})));
|
||||
|
||||
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
|
||||
|
||||
continueOrDie(deleteSnapshotStepListener, deleted -> {
|
||||
if (deleted) {
|
||||
// The delete worked out, creating a third snapshot
|
||||
masterNode.client.admin().cluster()
|
||||
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
|
||||
.execute(createAnotherSnapshotResponseStepListener);
|
||||
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
|
||||
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
|
||||
} else {
|
||||
createAnotherSnapshotResponseStepListener.onResponse(null);
|
||||
}
|
||||
});
|
||||
|
||||
deterministicTaskQueue.runAllRunnableTasks();
|
||||
|
||||
final CreateSnapshotResponse thirdSnapshotResponse = createAnotherSnapshotResponseStepListener.result();
|
||||
|
||||
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
|
||||
final Repository repository = masterNode.repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
|
||||
assertThat(snapshotIds, hasSize(thirdSnapshotResponse == null ? 2 : 3));
|
||||
|
||||
for (SnapshotId snapshotId : snapshotIds) {
|
||||
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
|
||||
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
|
||||
assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
|
||||
assertEquals(shards, snapshotInfo.successfulShards());
|
||||
assertEquals(0, snapshotInfo.failedShards());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simulates concurrent restarts of data and master nodes as well as relocating a primary shard, while starting and subsequently
|
||||
* deleting a snapshot.
|
||||
|
|
Loading…
Reference in New Issue