Translog recovery can repeatedly fail if we run out of disk

If we run out of disk while recoverying the transaction log
we repeatedly fail since we expect the latest tranlog to be uncommitted.
This change adds 2 safety levels:

 * uncommitted checkpoints are first written to a temp file and then atomically
   renamed into a committed (recovered) checkpoint
 * if the latest uncommitted checkpoints generation is already recovered it has to be
   identical, if not the recovery fails

This allows to fail in between recovering the latest uncommitted checkpoint and moving
the checkpoint generation to N+1 which can for instance happen in a situation where
we can run out of disk. If we run out of disk while recovering the uncommitted checkpoint
either the temp file writing or the atomic rename will fail such that we never have a
half written or corrupted recovered checkpoint.

Close #14695
This commit is contained in:
Simon Willnauer 2015-11-11 15:05:13 +01:00 committed by Adrien Grand
parent 3cc1d272ff
commit 1bdf29e263
3 changed files with 143 additions and 5 deletions

View File

@ -90,4 +90,24 @@ class Checkpoint {
} }
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Checkpoint that = (Checkpoint) o;
if (offset != that.offset) return false;
if (numOps != that.numOps) return false;
return generation == that.generation;
}
@Override
public int hashCode() {
int result = Long.hashCode(offset);
result = 31 * result + numOps;
result = 31 * result + Long.hashCode(generation);
return result;
}
} }

View File

@ -186,11 +186,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
/** recover all translog files found on disk */ /** recover all translog files found on disk */
private ArrayList<ImmutableTranslogReader> recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException { private final ArrayList<ImmutableTranslogReader> recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException {
boolean success = false; boolean success = false;
ArrayList<ImmutableTranslogReader> foundTranslogs = new ArrayList<>(); ArrayList<ImmutableTranslogReader> foundTranslogs = new ArrayList<>();
final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work
try (ReleasableLock lock = writeLock.acquire()) { try (ReleasableLock lock = writeLock.acquire()) {
logger.debug("open uncommitted translog checkpoint {}", checkpoint); logger.debug("open uncommitted translog checkpoint {}", checkpoint);
final String checkpointTranslogFile = getFilename(checkpoint.generation); final String checkpointTranslogFile = getFilename(checkpoint.generation);
for (long i = translogGeneration.translogFileGeneration; i < checkpoint.generation; i++) { for (long i = translogGeneration.translogFileGeneration; i < checkpoint.generation; i++) {
@ -204,14 +204,30 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
} }
foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint)); foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), commitCheckpoint); if (Files.exists(commitCheckpoint)) {
IOUtils.fsync(commitCheckpoint, false); Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint);
if (checkpoint.equals(checkpointFromDisk) == false) {
throw new IllegalStateException("Checkpoint file " + commitCheckpoint.getFileName() + " already exists but has corrupted content expected: " + checkpoint + " but got: " + checkpointFromDisk);
}
} else {
// we first copy this into the temp-file and then fsync it followed by an atomic move into the target file
// that way if we hit a disk-full here we are still in an consistent state.
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), tempFile, StandardCopyOption.REPLACE_EXISTING);
IOUtils.fsync(tempFile, false);
Files.move(tempFile, commitCheckpoint, StandardCopyOption.ATOMIC_MOVE);
// we only fsync the directory the tempFile was already fsynced
IOUtils.fsync(commitCheckpoint.getParent(), true); IOUtils.fsync(commitCheckpoint.getParent(), true);
}
success = true; success = true;
} finally { } finally {
if (success == false) { if (success == false) {
IOUtils.closeWhileHandlingException(foundTranslogs); IOUtils.closeWhileHandlingException(foundTranslogs);
} }
try {
Files.delete(tempFile);
} catch (IOException ex) {
logger.warn("failed to delete temp file {}", ex, tempFile);
}
} }
return foundTranslogs; return foundTranslogs;
} }

View File

@ -1040,7 +1040,109 @@ public class TranslogTests extends ESTestCase {
} }
} }
} }
}
public void testRecoveryUncommittedFileExists() throws IOException {
List<Translog.Location> locations = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100);
final int prepareOp = randomIntBetween(0, translogOperations-1);
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.prepareCommit();
assertEquals("expected this to be the first commit", 1l, translogGeneration.translogFileGeneration);
assertNotNull(translogGeneration.translogUUID);
}
}
if (sync) {
translog.sync();
}
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
// translog here as well.
TranslogConfig config = translog.getConfig();
config.setTranslogGeneration(translogGeneration);
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
try (Translog translog = new Translog(config)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
if (randomBoolean()) { // recover twice
try (Translog translog = new Translog(config)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
}
}
public void testRecoveryUncommittedCorryptedCheckpoint() throws IOException {
List<Translog.Location> locations = new ArrayList<>();
int translogOperations = 100;
final int prepareOp = 44;
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
if (op == prepareOp) {
translogGeneration = translog.getGeneration();
translog.prepareCommit();
assertEquals("expected this to be the first commit", 1l, translogGeneration.translogFileGeneration);
assertNotNull(translogGeneration.translogUUID);
}
}
translog.sync();
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
// translog here as well.
TranslogConfig config = translog.getConfig();
config.setTranslogGeneration(translogGeneration);
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Checkpoint corrupted = new Checkpoint(0,0,0);
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
try (Translog translog = new Translog(config)) {
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
}
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
try (Translog translog = new Translog(config)) {
assertNotNull(translogGeneration);
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
assertFalse(translog.syncNeeded());
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
int upTo = sync ? translogOperations : prepareOp;
for (int i = 0; i < upTo; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
} }
public void testSnapshotFromStreamInput() throws IOException { public void testSnapshotFromStreamInput() throws IOException {