Translog recovery can repeatedly fail if we run out of disk
If we run out of disk while recoverying the transaction log we repeatedly fail since we expect the latest tranlog to be uncommitted. This change adds 2 safety levels: * uncommitted checkpoints are first written to a temp file and then atomically renamed into a committed (recovered) checkpoint * if the latest uncommitted checkpoints generation is already recovered it has to be identical, if not the recovery fails This allows to fail in between recovering the latest uncommitted checkpoint and moving the checkpoint generation to N+1 which can for instance happen in a situation where we can run out of disk. If we run out of disk while recovering the uncommitted checkpoint either the temp file writing or the atomic rename will fail such that we never have a half written or corrupted recovered checkpoint. Close #14695
This commit is contained in:
parent
3cc1d272ff
commit
1bdf29e263
|
@ -90,4 +90,24 @@ class Checkpoint {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o) return true;
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
|
||||||
|
Checkpoint that = (Checkpoint) o;
|
||||||
|
|
||||||
|
if (offset != that.offset) return false;
|
||||||
|
if (numOps != that.numOps) return false;
|
||||||
|
return generation == that.generation;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int result = Long.hashCode(offset);
|
||||||
|
result = 31 * result + numOps;
|
||||||
|
result = 31 * result + Long.hashCode(generation);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,11 +186,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
}
|
}
|
||||||
|
|
||||||
/** recover all translog files found on disk */
|
/** recover all translog files found on disk */
|
||||||
private ArrayList<ImmutableTranslogReader> recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException {
|
private final ArrayList<ImmutableTranslogReader> recoverFromFiles(TranslogGeneration translogGeneration, Checkpoint checkpoint) throws IOException {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
ArrayList<ImmutableTranslogReader> foundTranslogs = new ArrayList<>();
|
ArrayList<ImmutableTranslogReader> foundTranslogs = new ArrayList<>();
|
||||||
|
final Path tempFile = Files.createTempFile(location, TRANSLOG_FILE_PREFIX, TRANSLOG_FILE_SUFFIX); // a temp file to copy checkpoint to - note it must be in on the same FS otherwise atomic move won't work
|
||||||
try (ReleasableLock lock = writeLock.acquire()) {
|
try (ReleasableLock lock = writeLock.acquire()) {
|
||||||
|
|
||||||
logger.debug("open uncommitted translog checkpoint {}", checkpoint);
|
logger.debug("open uncommitted translog checkpoint {}", checkpoint);
|
||||||
final String checkpointTranslogFile = getFilename(checkpoint.generation);
|
final String checkpointTranslogFile = getFilename(checkpoint.generation);
|
||||||
for (long i = translogGeneration.translogFileGeneration; i < checkpoint.generation; i++) {
|
for (long i = translogGeneration.translogFileGeneration; i < checkpoint.generation; i++) {
|
||||||
|
@ -204,14 +204,30 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
}
|
}
|
||||||
foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
|
foundTranslogs.add(openReader(location.resolve(checkpointTranslogFile), checkpoint));
|
||||||
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
|
Path commitCheckpoint = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
|
||||||
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), commitCheckpoint);
|
if (Files.exists(commitCheckpoint)) {
|
||||||
IOUtils.fsync(commitCheckpoint, false);
|
Checkpoint checkpointFromDisk = Checkpoint.read(commitCheckpoint);
|
||||||
IOUtils.fsync(commitCheckpoint.getParent(), true);
|
if (checkpoint.equals(checkpointFromDisk) == false) {
|
||||||
|
throw new IllegalStateException("Checkpoint file " + commitCheckpoint.getFileName() + " already exists but has corrupted content expected: " + checkpoint + " but got: " + checkpointFromDisk);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// we first copy this into the temp-file and then fsync it followed by an atomic move into the target file
|
||||||
|
// that way if we hit a disk-full here we are still in an consistent state.
|
||||||
|
Files.copy(location.resolve(CHECKPOINT_FILE_NAME), tempFile, StandardCopyOption.REPLACE_EXISTING);
|
||||||
|
IOUtils.fsync(tempFile, false);
|
||||||
|
Files.move(tempFile, commitCheckpoint, StandardCopyOption.ATOMIC_MOVE);
|
||||||
|
// we only fsync the directory the tempFile was already fsynced
|
||||||
|
IOUtils.fsync(commitCheckpoint.getParent(), true);
|
||||||
|
}
|
||||||
success = true;
|
success = true;
|
||||||
} finally {
|
} finally {
|
||||||
if (success == false) {
|
if (success == false) {
|
||||||
IOUtils.closeWhileHandlingException(foundTranslogs);
|
IOUtils.closeWhileHandlingException(foundTranslogs);
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
|
Files.delete(tempFile);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
logger.warn("failed to delete temp file {}", ex, tempFile);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return foundTranslogs;
|
return foundTranslogs;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1040,7 +1040,109 @@ public class TranslogTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testRecoveryUncommittedFileExists() throws IOException {
|
||||||
|
List<Translog.Location> locations = new ArrayList<>();
|
||||||
|
int translogOperations = randomIntBetween(10, 100);
|
||||||
|
final int prepareOp = randomIntBetween(0, translogOperations-1);
|
||||||
|
Translog.TranslogGeneration translogGeneration = null;
|
||||||
|
final boolean sync = randomBoolean();
|
||||||
|
for (int op = 0; op < translogOperations; op++) {
|
||||||
|
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
||||||
|
if (op == prepareOp) {
|
||||||
|
translogGeneration = translog.getGeneration();
|
||||||
|
translog.prepareCommit();
|
||||||
|
assertEquals("expected this to be the first commit", 1l, translogGeneration.translogFileGeneration);
|
||||||
|
assertNotNull(translogGeneration.translogUUID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (sync) {
|
||||||
|
translog.sync();
|
||||||
|
}
|
||||||
|
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
|
||||||
|
// translog here as well.
|
||||||
|
TranslogConfig config = translog.getConfig();
|
||||||
|
config.setTranslogGeneration(translogGeneration);
|
||||||
|
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
||||||
|
Checkpoint read = Checkpoint.read(ckp);
|
||||||
|
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
|
||||||
|
|
||||||
|
try (Translog translog = new Translog(config)) {
|
||||||
|
assertNotNull(translogGeneration);
|
||||||
|
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
||||||
|
assertFalse(translog.syncNeeded());
|
||||||
|
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
||||||
|
int upTo = sync ? translogOperations : prepareOp;
|
||||||
|
for (int i = 0; i < upTo; i++) {
|
||||||
|
Translog.Operation next = snapshot.next();
|
||||||
|
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
|
||||||
|
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (randomBoolean()) { // recover twice
|
||||||
|
try (Translog translog = new Translog(config)) {
|
||||||
|
assertNotNull(translogGeneration);
|
||||||
|
assertEquals("lastCommitted must be 3 less than current - we never finished the commit and run recovery twice", translogGeneration.translogFileGeneration + 3, translog.currentFileGeneration());
|
||||||
|
assertFalse(translog.syncNeeded());
|
||||||
|
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
||||||
|
int upTo = sync ? translogOperations : prepareOp;
|
||||||
|
for (int i = 0; i < upTo; i++) {
|
||||||
|
Translog.Operation next = snapshot.next();
|
||||||
|
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
|
||||||
|
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRecoveryUncommittedCorryptedCheckpoint() throws IOException {
|
||||||
|
List<Translog.Location> locations = new ArrayList<>();
|
||||||
|
int translogOperations = 100;
|
||||||
|
final int prepareOp = 44;
|
||||||
|
Translog.TranslogGeneration translogGeneration = null;
|
||||||
|
final boolean sync = randomBoolean();
|
||||||
|
for (int op = 0; op < translogOperations; op++) {
|
||||||
|
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
|
||||||
|
if (op == prepareOp) {
|
||||||
|
translogGeneration = translog.getGeneration();
|
||||||
|
translog.prepareCommit();
|
||||||
|
assertEquals("expected this to be the first commit", 1l, translogGeneration.translogFileGeneration);
|
||||||
|
assertNotNull(translogGeneration.translogUUID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
translog.sync();
|
||||||
|
// we intentionally don't close the tlog that is in the prepareCommit stage since we try to recovery the uncommitted
|
||||||
|
// translog here as well.
|
||||||
|
TranslogConfig config = translog.getConfig();
|
||||||
|
config.setTranslogGeneration(translogGeneration);
|
||||||
|
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
|
||||||
|
Checkpoint read = Checkpoint.read(ckp);
|
||||||
|
Checkpoint corrupted = new Checkpoint(0,0,0);
|
||||||
|
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
|
||||||
|
try (Translog translog = new Translog(config)) {
|
||||||
|
fail("corrupted");
|
||||||
|
} catch (IllegalStateException ex) {
|
||||||
|
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
|
||||||
|
}
|
||||||
|
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
|
||||||
|
try (Translog translog = new Translog(config)) {
|
||||||
|
assertNotNull(translogGeneration);
|
||||||
|
assertEquals("lastCommitted must be 2 less than current - we never finished the commit", translogGeneration.translogFileGeneration + 2, translog.currentFileGeneration());
|
||||||
|
assertFalse(translog.syncNeeded());
|
||||||
|
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
|
||||||
|
int upTo = sync ? translogOperations : prepareOp;
|
||||||
|
for (int i = 0; i < upTo; i++) {
|
||||||
|
Translog.Operation next = snapshot.next();
|
||||||
|
assertNotNull("operation " + i + " must be non-null synced: " + sync, next);
|
||||||
|
assertEquals("payload missmatch, synced: " + sync, i, Integer.parseInt(next.getSource().source.toUtf8()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSnapshotFromStreamInput() throws IOException {
|
public void testSnapshotFromStreamInput() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue