Ensures cleanup of temporary index-* generational blobs during snapshotting (#21469)
Ensures pending index-* blobs are deleted when snapshotting. The index-* blobs are generational files that maintain the snapshots in the repository. To write these atomically, we first write a `pending-index-*` blob and then move it to `index-*`; the move also deletes `pending-index-*` in case it is not a file-system-level move (e.g., S3 repositories). For example, to write the 5th generation of the index blob for the repository, we first write the bytes to `pending-index-5` and then move `pending-index-5` to `index-5`.

It is possible that we fail after writing `pending-index-5` but before moving it to `index-5` or deleting `pending-index-5`. In this case, a dangling `pending-index-5` blob is left lying around. Since snapshot #5 would have failed, the next snapshot assumes a generation number of 5, so it tries to write `index-5`, which again means first writing `pending-index-5` before moving the blob to `index-5`. Because `pending-index-5` is left over from the previous failure, the snapshot fails as it cannot overwrite this blob.

This commit solves the problem, first, by adding a UUID to the `pending-index-*` blob names and, second, by strengthening the logic around failure to write the `index-*` generational blob so that pending files are deleted on cleanup.

Closes #21462
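A minimal, hypothetical sketch of the write-then-move pattern described above (the class and method names here are illustrative only and are not part of the Elasticsearch code base; the real logic lives in BlobStoreRepository#writeAtomic, shown in the diff below):

import java.io.IOException;
import java.io.InputStream;

import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesReference;

// Illustrative only: writes generation N of the index blob through a uniquely named
// pending blob and cleans the pending blob up if the final move fails.
class GenerationalIndexWriter {

    void writeGeneration(BlobContainer container, long generation, BytesReference bytes) throws IOException {
        final String indexBlob = "index-" + generation;
        // The random suffix keeps a retry from colliding with a dangling pending blob
        // left behind by an earlier failed attempt.
        final String pendingBlob = "pending-" + indexBlob + "-" + UUIDs.randomBase64UUID();
        try (InputStream in = bytes.streamInput()) {
            container.writeBlob(pendingBlob, in, bytes.length());
            container.move(pendingBlob, indexBlob); // may be copy + delete on stores like S3
        } catch (IOException ex) {
            try {
                container.deleteBlob(pendingBlob); // best-effort cleanup of the temporary blob
            } catch (IOException e) {
                ex.addSuppressed(e);
            }
            throw ex;
        }
    }
}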
Parent: 48bfb142b9
Commit: 3001b636db
@@ -105,8 +105,11 @@ public interface BlobContainer {
     Map<String, BlobMetaData> listBlobsByPrefix(String blobNamePrefix) throws IOException;
 
     /**
-     * Atomically renames the source blob into the target blob. If the source blob does not exist or the
-     * target blob already exists, an exception is thrown.
+     * Renames the source blob into the target blob. If the source blob does not exist or the
+     * target blob already exists, an exception is thrown. Atomicity of the move operation
+     * can only be guaranteed on an implementation-by-implementation basis. The only current
+     * implementation of {@link BlobContainer} for which atomicity can be guaranteed is the
+     * {@link org.elasticsearch.common.blobstore.fs.FsBlobContainer}.
      *
      * @param sourceBlobName
      *        The blob to rename.
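For context on the javadoc change above: the FS-backed container can make this guarantee because local filesystems support atomic renames. A minimal sketch of such a move, assuming a java.nio-based implementation (this is not the actual FsBlobContainer source), might look like:

import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of an atomic rename within a single directory; ATOMIC_MOVE asks the
// filesystem to either complete the rename fully or fail without partial effects.
class FsMoveSketch {

    static void move(Path root, String sourceBlobName, String targetBlobName) throws IOException {
        final Path source = root.resolve(sourceBlobName);
        final Path target = root.resolve(targetBlobName);
        if (Files.exists(target)) {
            throw new FileAlreadyExistsException(target.toString());
        }
        Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
    }
}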
@@ -867,15 +867,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException {
-        final String tempBlobName = "pending-" + blobName;
+        final String tempBlobName = "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
         try (InputStream stream = bytesRef.streamInput()) {
             snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length());
-        }
-        try {
             snapshotsBlobContainer.move(tempBlobName, blobName);
         } catch (IOException ex) {
-            // Move failed - try cleaning up
-            snapshotsBlobContainer.deleteBlob(tempBlobName);
+            // temporary blob creation or move failed - try cleaning up
+            try {
+                snapshotsBlobContainer.deleteBlob(tempBlobName);
+            } catch (IOException e) {
+                ex.addSuppressed(e);
+            }
             throw ex;
         }
     }
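A quick illustration (hypothetical snippet, not part of the commit) of why the random suffix in writeAtomic matters: with the old fixed naming, every attempt at a given generation reuses the same temporary blob name, so a blob left dangling by a failed attempt blocks the retry, while UUID-suffixed names cannot collide:

import org.elasticsearch.common.UUIDs;

class PendingNameExample {

    public static void main(String[] args) {
        final String blobName = "index-5";

        // Old scheme: both attempts produce "pending-index-5", so a dangling blob from a
        // failed first attempt cannot be overwritten by the retry and the snapshot fails.
        final String oldAttempt1 = "pending-" + blobName;
        final String oldAttempt2 = "pending-" + blobName;
        System.out.println(oldAttempt1.equals(oldAttempt2)); // true

        // New scheme: each attempt gets a unique temporary name that never collides with leftovers.
        final String newAttempt1 = "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
        final String newAttempt2 = "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
        System.out.println(newAttempt1.equals(newAttempt2)); // false
    }
}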
@@ -2672,4 +2672,53 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertEquals("IndexShardSnapshotFailedException[Aborted]", snapshotInfo.shardFailures().get(0).reason());
     }
 
+    public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception {
+        logger.info("--> creating repository");
+        final Path repoPath = randomRepoPath();
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("mock").setVerify(false).setSettings(
+            Settings.builder().put("location", repoPath).put("random_control_io_exception_rate", randomIntBetween(5, 20) / 100f)));
+
+        logger.info("--> indexing some data");
+        createIndex("test-idx");
+        ensureGreen();
+        final int numDocs = randomIntBetween(1, 5);
+        for (int i = 0; i < numDocs; i++) {
+            index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
+        }
+        refresh();
+        assertThat(client().prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo((long) numDocs));
+
+        logger.info("--> snapshot with potential I/O failures");
+        try {
+            CreateSnapshotResponse createSnapshotResponse =
+                client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
+                    .setWaitForCompletion(true)
+                    .setIndices("test-idx")
+                    .get();
+            if (createSnapshotResponse.getSnapshotInfo().totalShards() != createSnapshotResponse.getSnapshotInfo().successfulShards()) {
+                assertThat(getFailureCount("test-repo"), greaterThan(0L));
+                assertThat(createSnapshotResponse.getSnapshotInfo().shardFailures().size(), greaterThan(0));
+                for (SnapshotShardFailure shardFailure : createSnapshotResponse.getSnapshotInfo().shardFailures()) {
+                    assertThat(shardFailure.reason(), containsString("Random IOException"));
+                }
+            }
+        } catch (Exception ex) {
+            // sometimes, the snapshot will fail with a top level I/O exception
+            assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException"));
+        }
+
+        logger.info("--> snapshot with no I/O failures");
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo-2").setType("mock").setVerify(false).setSettings(
+            Settings.builder().put("location", repoPath)));
+        CreateSnapshotResponse createSnapshotResponse =
+            client().admin().cluster().prepareCreateSnapshot("test-repo-2", "test-snap-2")
+                .setWaitForCompletion(true)
+                .setIndices("test-idx")
+                .get();
+        assertEquals(0, createSnapshotResponse.getSnapshotInfo().failedShards());
+        GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots("test-repo-2")
+            .addSnapshots("test-snap-2").get();
+        assertEquals(SnapshotState.SUCCESS, getSnapshotsResponse.getSnapshots().get(0).state());
+    }
+
 }
@@ -321,14 +321,20 @@ public class MockRepository extends FsRepository {
 
             @Override
             public void move(String sourceBlob, String targetBlob) throws IOException {
+                // simulate a non-atomic move, since many blob container implementations
+                // will not have an atomic move, and we should be able to handle that
                 maybeIOExceptionOrBlock(targetBlob);
-                super.move(sourceBlob, targetBlob);
+                super.writeBlob(targetBlob, super.readBlob(sourceBlob), 0L);
+                super.deleteBlob(sourceBlob);
             }
 
             @Override
             public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
                 maybeIOExceptionOrBlock(blobName);
                 super.writeBlob(blobName, inputStream, blobSize);
+                // for network based repositories, the blob may have been written but we may still
+                // get an error with the client connection, so an IOException here simulates this
+                maybeIOExceptionOrBlock(blobName);
            }
        }
    }
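The simulated move above mirrors the shape of a non-atomic move on object stores: copy the blob to the target name, then delete the source, leaving a window in which a crash strands the source blob. A minimal sketch of that shape against a generic BlobContainer (an assumed helper, not part of the commit):

import java.io.IOException;
import java.io.InputStream;

import org.elasticsearch.common.blobstore.BlobContainer;

// Sketch of a copy-then-delete "move"; if the delete never runs, the source blob dangles,
// which is exactly the situation the UUID-suffixed pending names make harmless.
class NonAtomicMove {

    static void move(BlobContainer container, String sourceBlob, String targetBlob, long blobSize) throws IOException {
        try (InputStream in = container.readBlob(sourceBlob)) {
            container.writeBlob(targetBlob, in, blobSize);
        }
        container.deleteBlob(sourceBlob);
    }
}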