* Fix Infinite Retry Loop in loading RepositoryData We were running into an infinite loop when trying to load corrupted (or otherwise un-loadable) repository data for a repo that uses best effort consistency (e.g. that was just freshly mounted as done in the test) because we kepy resetting to `-1` on `IOException`, listing and finding the broken generation `N` and then interpreted the subsequent reset to `-1` as a concurrent change to the repository.
This commit is contained in:
parent
f6c89b4599
commit
e51b209dd3
|
@ -1065,6 +1065,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
|
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
|
||||||
|
// Keep track of the most recent generation we failed to load so we can break out of the loop if we fail to load the same
|
||||||
|
// generation repeatedly.
|
||||||
|
long lastFailedGeneration = RepositoryData.UNKNOWN_REPO_GEN;
|
||||||
while (true) {
|
while (true) {
|
||||||
final long genToLoad;
|
final long genToLoad;
|
||||||
if (bestEffortConsistency) {
|
if (bestEffortConsistency) {
|
||||||
|
@ -1074,7 +1077,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
try {
|
try {
|
||||||
generation = latestIndexBlobId();
|
generation = latestIndexBlobId();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
|
listener.onFailure(
|
||||||
|
new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe));
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
|
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
|
||||||
if (genToLoad > generation) {
|
if (genToLoad > generation) {
|
||||||
|
@ -1089,7 +1094,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
listener.onResponse(getRepositoryData(genToLoad));
|
listener.onResponse(getRepositoryData(genToLoad));
|
||||||
return;
|
return;
|
||||||
} catch (RepositoryException e) {
|
} catch (RepositoryException e) {
|
||||||
if (genToLoad != latestKnownRepoGen.get()) {
|
// If the generation to load changed concurrently and we didn't just try loading the same generation before we retry
|
||||||
|
if (genToLoad != latestKnownRepoGen.get() && genToLoad != lastFailedGeneration) {
|
||||||
|
lastFailedGeneration = genToLoad;
|
||||||
logger.warn("Failed to load repository data generation [" + genToLoad +
|
logger.warn("Failed to load repository data generation [" + genToLoad +
|
||||||
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
|
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
|
||||||
continue;
|
continue;
|
||||||
|
@ -1099,10 +1106,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
// of N so we mark this repository as corrupted.
|
// of N so we mark this repository as corrupted.
|
||||||
markRepoCorrupted(genToLoad, e,
|
markRepoCorrupted(genToLoad, e,
|
||||||
ActionListener.wrap(v -> listener.onFailure(corruptedStateException(e)), listener::onFailure));
|
ActionListener.wrap(v -> listener.onFailure(corruptedStateException(e)), listener::onFailure));
|
||||||
return;
|
|
||||||
} else {
|
} else {
|
||||||
throw e;
|
listener.onFailure(e);
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -303,6 +303,46 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testMountCorruptedRepositoryData() throws Exception {
|
||||||
|
disableRepoConsistencyCheck("This test intentionally corrupts the repository contents");
|
||||||
|
Client client = client();
|
||||||
|
|
||||||
|
Path repo = randomRepoPath();
|
||||||
|
final String repoName = "test-repo";
|
||||||
|
logger.info("--> creating repository at {}", repo.toAbsolutePath());
|
||||||
|
assertAcked(client.admin().cluster().preparePutRepository(repoName)
|
||||||
|
.setType("fs").setSettings(Settings.builder()
|
||||||
|
.put("location", repo)
|
||||||
|
.put("compress", false)));
|
||||||
|
|
||||||
|
final String snapshot = "test-snap";
|
||||||
|
|
||||||
|
logger.info("--> creating snapshot");
|
||||||
|
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
|
||||||
|
.setWaitForCompletion(true).setIndices("test-idx-*").get();
|
||||||
|
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
|
||||||
|
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||||
|
|
||||||
|
logger.info("--> corrupt index-N blob");
|
||||||
|
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
|
||||||
|
final RepositoryData repositoryData = getRepositoryData(repository);
|
||||||
|
Files.write(repo.resolve("index-" + repositoryData.getGenId()), randomByteArrayOfLength(randomIntBetween(1, 100)));
|
||||||
|
|
||||||
|
logger.info("--> verify loading repository data throws RepositoryException");
|
||||||
|
expectThrows(RepositoryException.class, () -> getRepositoryData(repository));
|
||||||
|
|
||||||
|
logger.info("--> mount repository path in a new repository");
|
||||||
|
final String otherRepoName = "other-repo";
|
||||||
|
assertAcked(client.admin().cluster().preparePutRepository(otherRepoName)
|
||||||
|
.setType("fs").setSettings(Settings.builder()
|
||||||
|
.put("location", repo)
|
||||||
|
.put("compress", false)));
|
||||||
|
final Repository otherRepo = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(otherRepoName);
|
||||||
|
|
||||||
|
logger.info("--> verify loading repository data from newly mounted repository throws RepositoryException");
|
||||||
|
expectThrows(RepositoryException.class, () -> getRepositoryData(otherRepo));
|
||||||
|
}
|
||||||
|
|
||||||
private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
|
private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
|
||||||
logger.info("--> try to delete snapshot");
|
logger.info("--> try to delete snapshot");
|
||||||
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,
|
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,
|
||||||
|
|
Loading…
Reference in New Issue