Copy checkpoint atomically when rolling generation (#35407)
Today when rolling a translog generation we copy the checkpoint from `translog.ckp` to `translog-nnnn.ckp` using a simple `Files.copy()` followed by appropriate `fsync()` calls. The copy operation is not atomic, so if we crash at the wrong moment we can leave an incomplete checkpoint file on disk. In practice the checkpoint is so small that it's either empty or fully written. However, we do not correctly handle the case where it's empty when the node restarts. In contrast, in `recoverFromFiles()` we _do_ copy the checkpoint atomically. This commit extracts the atomic copy operation from `recoverFromFiles()` and re-uses it in `rollGeneration()`.
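For context, the copy-to-temp-then-atomic-rename idiom that the extracted helper relies on looks roughly like the standalone sketch below. The class and method names here (AtomicCheckpointCopy, atomicCopy, fsync, fsyncDirectory) are illustrative stand-ins, not the Elasticsearch/Lucene IOUtils API; the actual change is in the diff that follows.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public final class AtomicCheckpointCopy {

    /**
     * Copies source to target such that a crash can never leave a partially
     * written target: the bytes go to a temp file in the target's directory
     * first, the temp file is fsynced, and only then is it renamed into place.
     */
    static void atomicCopy(Path source, Path target) throws IOException {
        // The temp file must live on the same filesystem as the target,
        // otherwise the ATOMIC_MOVE below cannot be performed atomically.
        final Path tempFile = Files.createTempFile(target.getParent(), "checkpoint-", ".tmp");
        boolean moved = false;
        try {
            Files.copy(source, tempFile, StandardCopyOption.REPLACE_EXISTING);
            fsync(tempFile);                                   // make the temp file's contents durable
            Files.move(tempFile, target, StandardCopyOption.ATOMIC_MOVE);
            moved = true;
            fsyncDirectory(target.getParent());                // make the rename itself durable
        } finally {
            if (moved == false) {
                Files.deleteIfExists(tempFile);                // on failure, clean up; target was never touched
            }
        }
    }

    // Stand-in for IOUtils.fsync(path, false): flush a regular file's contents to disk.
    private static void fsync(Path file) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            channel.force(true);
        }
    }

    // Stand-in for IOUtils.fsync(path, true): flush the directory entry to disk.
    // Opening a directory for READ and forcing it works on Linux; some platforms may reject it.
    private static void fsyncDirectory(Path dir) throws IOException {
        try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
            channel.force(true);
        }
    }
}

The rename is the commit point: a reader observes either the previous complete file or the new complete file, never a half-written one, and the directory fsync ensures the rename survives a crash.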
parent be69a774df
commit d01436de3c
Translog.java

@@ -213,9 +213,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     private ArrayList<TranslogReader> recoverFromFiles(Checkpoint checkpoint) throws IOException {
         boolean success = false;
         ArrayList<TranslogReader> foundTranslogs = new ArrayList<>();
-        // a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work
-        final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX);
-        boolean tempFileRenamed = false;
         try (ReleasableLock lock = writeLock.acquire()) {
             logger.debug("open uncommitted translog checkpoint {}", checkpoint);
 
@@ -263,20 +260,32 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                         " already exists but has corrupted content expected: " + checkpoint + " but got: " + checkpointFromDisk);
                 }
             } else {
-                // we first copy this into the temp-file and then fsync it followed by an atomic move into the target file
-                // that way if we hit a disk-full here we are still in an consistent state.
-                Files.copy(location.resolve(CHECKPOINT_FILE_NAME), tempFile, StandardCopyOption.REPLACE_EXISTING);
-                IOUtils.fsync(tempFile, false);
-                Files.move(tempFile, commitCheckpoint, StandardCopyOption.ATOMIC_MOVE);
-                tempFileRenamed = true;
-                // we only fsync the directory the tempFile was already fsynced
-                IOUtils.fsync(commitCheckpoint.getParent(), true);
+                copyCheckpointTo(commitCheckpoint);
             }
             success = true;
         } finally {
             if (success == false) {
                 IOUtils.closeWhileHandlingException(foundTranslogs);
             }
+        }
+        return foundTranslogs;
+    }
+
+    private void copyCheckpointTo(Path targetPath) throws IOException {
+        // a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work
+        final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, CHECKPOINT_SUFFIX);
+        boolean tempFileRenamed = false;
+
+        try {
+            // we first copy this into the temp-file and then fsync it followed by an atomic move into the target file
+            // that way if we hit a disk-full here we are still in an consistent state.
+            Files.copy(location.resolve(CHECKPOINT_FILE_NAME), tempFile, StandardCopyOption.REPLACE_EXISTING);
+            IOUtils.fsync(tempFile, false);
+            Files.move(tempFile, targetPath, StandardCopyOption.ATOMIC_MOVE);
+            tempFileRenamed = true;
+            // we only fsync the directory the tempFile was already fsynced
+            IOUtils.fsync(targetPath.getParent(), true);
+        } finally {
             if (tempFileRenamed == false) {
                 try {
                     Files.delete(tempFile);
@@ -285,7 +294,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                 }
             }
         }
-        return foundTranslogs;
     }
 
     TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
@@ -1643,13 +1651,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             try {
                 final TranslogReader reader = current.closeIntoReader();
                 readers.add(reader);
-                final Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME);
-                assert Checkpoint.read(checkpoint).generation == current.getGeneration();
-                final Path generationCheckpoint =
-                        location.resolve(getCommitCheckpointFileName(current.getGeneration()));
-                Files.copy(checkpoint, generationCheckpoint);
-                IOUtils.fsync(generationCheckpoint, false);
-                IOUtils.fsync(generationCheckpoint.getParent(), true);
+                assert Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)).generation == current.getGeneration();
+                copyCheckpointTo(location.resolve(getCommitCheckpointFileName(current.getGeneration())));
                 // create a new translog file; this will sync it and update the checkpoint data;
                 current = createWriter(current.getGeneration() + 1);
                 logger.trace("current translog set to [{}]", current.getGeneration());
TranslogTests.java

@@ -28,6 +28,7 @@ import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.mockfile.FilterFileChannel;
+import org.apache.lucene.mockfile.FilterFileSystemProvider;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.ByteArrayDataOutput;
 import org.apache.lucene.store.MockDirectoryWrapper;
@@ -81,6 +82,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.Charset;
+import java.nio.file.CopyOption;
 import java.nio.file.FileAlreadyExistsException;
 import java.nio.file.Files;
 import java.nio.file.InvalidPathException;
@@ -3197,4 +3199,36 @@ public class TranslogTests extends ESTestCase {
             snapshot.close();
         }
     }
+
+    public void testCrashDuringCheckpointCopy() throws IOException {
+        final Path path = createTempDir();
+        final AtomicBoolean failOnCopy = new AtomicBoolean();
+        final String expectedExceptionMessage = "simulated failure during copy";
+        final FilterFileSystemProvider filterFileSystemProvider
+            = new FilterFileSystemProvider(path.getFileSystem().provider().getScheme(), path.getFileSystem()) {
+
+            @Override
+            public void copy(Path source, Path target, CopyOption... options) throws IOException {
+                if (failOnCopy.get() && source.toString().endsWith(Translog.CHECKPOINT_SUFFIX)) {
+                    deleteIfExists(target);
+                    Files.createFile(target);
+                    throw new IOException(expectedExceptionMessage);
+                } else {
+                    super.copy(source, target, options);
+                }
+            }
+        };
+
+        try (Translog brokenTranslog = create(filterFileSystemProvider.getPath(path.toUri()))) {
+            failOnCopy.set(true);
+            assertThat(expectThrows(IOException.class, brokenTranslog::rollGeneration).getMessage(), equalTo(expectedExceptionMessage));
+            assertFalse(brokenTranslog.isOpen());
+
+            try (Translog recoveredTranslog = new Translog(getTranslogConfig(path), brokenTranslog.getTranslogUUID(),
+                brokenTranslog.getDeletionPolicy(), () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get)) {
+                recoveredTranslog.rollGeneration();
+                assertFilePresences(recoveredTranslog);
+            }
+        }
+    }
 }