Never delete translog-N.tlog file when creation fails

We today delete the translog-N.tlog file if any subsequent operation fails
but we might actually be in a good state if for instance the creation of the writer
failes after we sucessfully baked the new translog generation into the checkpoint. In this situation
we used to delete the translog-N.tlog file and failed on the next recovery of the translog with a
NoSuchFileException | FileNotFoundException just like in https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336

This commit changes the behavior and cleans up that limbo state on recovery if we already have a generation+1 file written but not baked into
the checkpoint we remove that file but only if the previous ckp file has already been renamed otherwise we know we can't be in this state.
This commit is contained in:
Simon Willnauer 2016-01-06 13:10:21 +01:00
parent 8081c782ef
commit 12b93e72f0
3 changed files with 152 additions and 34 deletions

View File

@ -163,6 +163,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
try {
if (translogGeneration != null) {
final Checkpoint checkpoint = readCheckpoint();
final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1));
final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation));
// this is special handling for error condition when we create a new writer but we fail to bake
// the newly written file (generation+1) into the checkpoint. This is still a valid state
// we just need to cleanup before we continue
// we hit this before and then blindly deleted the new generation even though we managed to bake it in and then hit this:
// https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 as an example
//
// For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists
// if not we don't even try to clean it up and wait until we fail creating it
assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID);
if (Files.exists(currentCheckpointFile) // current checkpoint is already copied
&& Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning
logger.warn("Deleted invalid next generation before opening writer {} this is like caused by some previously tragic exception ", nextTranslogFile.getFileName());
}
this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint);
if (recoveredTranslogs.isEmpty()) {
throw new IllegalStateException("at least one reader must be recovered");

View File

@ -69,9 +69,17 @@ public class TranslogWriter extends TranslogReader {
totalOffset = lastSyncedOffset;
}
static int getHeaderLength(String translogUUID) {
return getHeaderLength(new BytesRef(translogUUID).length);
}
private static int getHeaderLength(int uuidLength) {
return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + RamUsageEstimator.NUM_BYTES_INT;
}
public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException {
final BytesRef ref = new BytesRef(translogUUID);
final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT;
final int headerLength = getHeaderLength(ref.length);
final FileChannel channel = channelFactory.open(file);
try {
// This OutputStreamDataOutput is intentionally not closed because
@ -80,17 +88,14 @@ public class TranslogWriter extends TranslogReader {
CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION);
out.writeInt(ref.length);
out.writeBytes(ref.bytes, ref.offset, ref.length);
channel.force(false);
channel.force(true);
writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE);
final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize);
return writer;
} catch (Throwable throwable){
// if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that
// file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition
IOUtils.closeWhileHandlingException(channel);
try {
Files.delete(file); // remove the file as well
} catch (IOException ex) {
throwable.addSuppressed(ex);
}
throw throwable;
}
}

View File

@ -55,6 +55,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.Charset;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
@ -136,8 +137,8 @@ public class TranslogTests extends ESTestCase {
private TranslogConfig getTranslogConfig(Path path) {
Settings build = Settings.settingsBuilder()
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES);
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize);
}
@ -335,9 +336,9 @@ public class TranslogTests extends ESTestCase {
assertEquals(6, copy.estimatedNumberOfOperations());
assertEquals(431, copy.getTranslogSizeInBytes());
assertEquals("\"translog\"{\n" +
" \"operations\" : 6,\n" +
" \"size_in_bytes\" : 431\n" +
"}", copy.toString().trim());
" \"operations\" : 6,\n" +
" \"size_in_bytes\" : 431\n" +
"}", copy.toString().trim());
try {
new TranslogStats(1, -1);
@ -634,7 +635,9 @@ public class TranslogTests extends ESTestCase {
assertFileIsPresent(translog, 1);
}
/** Tests that concurrent readers and writes maintain view and snapshot semantics */
/**
* Tests that concurrent readers and writes maintain view and snapshot semantics
*/
public void testConcurrentWriteViewsAndSnapshot() throws Throwable {
final Thread[] writers = new Thread[randomIntBetween(1, 10)];
final Thread[] readers = new Thread[randomIntBetween(1, 10)];
@ -833,7 +836,7 @@ public class TranslogTests extends ESTestCase {
int count = 0;
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8")))));
if (rarely() && translogOperations > op+1) {
if (rarely() && translogOperations > op + 1) {
translog.commit();
}
}
@ -912,7 +915,7 @@ public class TranslogTests extends ESTestCase {
final TranslogReader reader = randomBoolean() ? writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME)));
for (int i = 0; i < numOps; i++) {
ByteBuffer buffer = ByteBuffer.allocate(4);
reader.readBytes(buffer, reader.getFirstOperationOffset() + 4*i);
reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i);
buffer.flip();
final int value = buffer.getInt();
assertEquals(i, value);
@ -951,9 +954,9 @@ public class TranslogTests extends ESTestCase {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
final boolean commit = commitOften ? frequently() : rarely();
if (commit && op < translogOperations-1) {
if (commit && op < translogOperations - 1) {
translog.commit();
minUncommittedOp = op+1;
minUncommittedOp = op + 1;
translogGeneration = translog.getGeneration();
}
}
@ -987,7 +990,7 @@ public class TranslogTests extends ESTestCase {
public void testRecoveryUncommitted() throws IOException {
List<Translog.Location> locations = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100);
final int prepareOp = randomIntBetween(0, translogOperations-1);
final int prepareOp = randomIntBetween(0, translogOperations - 1);
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
@ -1040,7 +1043,7 @@ public class TranslogTests extends ESTestCase {
public void testRecoveryUncommittedFileExists() throws IOException {
List<Translog.Location> locations = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100);
final int prepareOp = randomIntBetween(0, translogOperations-1);
final int prepareOp = randomIntBetween(0, translogOperations - 1);
Translog.TranslogGeneration translogGeneration = null;
final boolean sync = randomBoolean();
for (int op = 0; op < translogOperations; op++) {
@ -1094,7 +1097,7 @@ public class TranslogTests extends ESTestCase {
}
}
public void testRecoveryUncommittedCorryptedCheckpoint() throws IOException {
public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException {
List<Translog.Location> locations = new ArrayList<>();
int translogOperations = 100;
final int prepareOp = 44;
@ -1116,10 +1119,10 @@ public class TranslogTests extends ESTestCase {
config.setTranslogGeneration(translogGeneration);
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Checkpoint corrupted = new Checkpoint(0,0,0);
Checkpoint corrupted = new Checkpoint(0, 0, 0);
Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
try (Translog translog = new Translog(config)) {
fail("corrupted");
fail("corrupted");
} catch (IllegalStateException ex) {
assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
}
@ -1157,7 +1160,7 @@ public class TranslogTests extends ESTestCase {
List<Translog.Location> locations = new ArrayList<>();
List<Translog.Location> locations2 = new ArrayList<>();
int translogOperations = randomIntBetween(10, 100);
try(Translog translog2 = create(createTempDir())) {
try (Translog translog2 = create(createTempDir())) {
for (int op = 0; op < translogOperations; op++) {
locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
locations2.add(translog2.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8")))));
@ -1196,7 +1199,7 @@ public class TranslogTests extends ESTestCase {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
translog.close();
config.setTranslogGeneration(new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()),translogGeneration.translogFileGeneration));
config.setTranslogGeneration(new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration));
try {
new Translog(config);
fail("translog doesn't belong to this UUID");
@ -1283,12 +1286,12 @@ public class TranslogTests extends ESTestCase {
case CREATE:
case INDEX:
op = new Translog.Index("test", threadId + "_" + opCount,
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8"));
break;
case DELETE:
op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount),
1 + randomInt(100000),
randomFrom(VersionType.values()));
1 + randomInt(100000),
randomFrom(VersionType.values()));
break;
default:
throw new ElasticsearchException("not supported op type");
@ -1307,7 +1310,8 @@ public class TranslogTests extends ESTestCase {
return translog.add(op);
}
protected void afterAdd() throws IOException {}
protected void afterAdd() throws IOException {
}
}
public void testFailFlush() throws IOException {
@ -1319,7 +1323,7 @@ public class TranslogTests extends ESTestCase {
List<Translog.Location> locations = new ArrayList<>();
int opsSynced = 0;
boolean failed = false;
while(failed == false) {
while (failed == false) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
translog.sync();
@ -1331,7 +1335,7 @@ public class TranslogTests extends ESTestCase {
failed = true;
assertFalse(translog.isOpen());
assertEquals("__FAKE__ no space left on device", ex.getMessage());
}
}
fail.set(randomBoolean());
}
fail.set(false);
@ -1370,7 +1374,7 @@ public class TranslogTests extends ESTestCase {
assertFalse(translog.isOpen());
translog.close(); // we are closed
config.setTranslogGeneration(translogGeneration);
try (Translog tlog = new Translog(config)){
try (Translog tlog = new Translog(config)) {
assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration());
assertFalse(tlog.syncNeeded());
@ -1393,7 +1397,7 @@ public class TranslogTests extends ESTestCase {
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))));
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertEquals(opsAdded+1, snapshot.estimatedTotalOperations());
assertEquals(opsAdded + 1, snapshot.estimatedTotalOperations());
for (int i = 0; i < opsAdded; i++) {
assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation);
Translog.Operation next = snapshot.next();
@ -1407,7 +1411,7 @@ public class TranslogTests extends ESTestCase {
Path tempDir = createTempDir();
final AtomicBoolean fail = new AtomicBoolean();
TranslogConfig config = getTranslogConfig(tempDir);
assumeFalse("this won't work if we sync on any op",config.isSyncOnEachOperation());
assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation());
Translog translog = getFailableTranslog(fail, config, false, true);
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
@ -1427,7 +1431,7 @@ public class TranslogTests extends ESTestCase {
assertTrue(ex.getCause() instanceof UnknownException);
}
assertFalse(translog.isOpen());
assertTrue(translog.getTragicException() instanceof UnknownException);
assertTrue(translog.getTragicException() instanceof UnknownException);
}
public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
@ -1520,6 +1524,7 @@ public class TranslogTests extends ESTestCase {
}
}
}
private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
return getFailableTranslog(fail, config, randomBoolean(), false);
}
@ -1613,4 +1618,97 @@ public class TranslogTests extends ESTestCase {
// all is well
}
}
public void testRecoverWithUnbackedNextGen() throws IOException {
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
translog.close();
TranslogConfig config = translog.getConfig();
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
config.setTranslogGeneration(translogGeneration);
try (Translog tlog = new Translog(config)) {
assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded());
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
for (int i = 0; i < 1; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
}
try (Translog tlog = new Translog(config)) {
assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded());
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
for (int i = 0; i < 2; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
}
}
public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException {
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
translog.close();
TranslogConfig config = translog.getConfig();
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
// don't copy the new file
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
config.setTranslogGeneration(translogGeneration);
try {
Translog tlog = new Translog(config);
fail("file already exists?");
} catch (TranslogException ex) {
// all is well
assertEquals(ex.getMessage(), "failed to create new translog file");
assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class);
}
}
public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException {
translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8"))));
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
translog.close();
TranslogConfig config = translog.getConfig();
Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME);
Checkpoint read = Checkpoint.read(ckp);
Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)));
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog"));
// we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition
Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog"));
config.setTranslogGeneration(translogGeneration);
try (Translog tlog = new Translog(config)) {
assertNotNull(translogGeneration);
assertFalse(tlog.syncNeeded());
try (Translog.Snapshot snapshot = tlog.newSnapshot()) {
for (int i = 0; i < 1; i++) {
Translog.Operation next = snapshot.next();
assertNotNull("operation " + i + " must be non-null", next);
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8()));
}
}
tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8"))));
}
try {
Translog tlog = new Translog(config);
fail("file already exists?");
} catch (TranslogException ex) {
// all is well
assertEquals(ex.getMessage(), "failed to create new translog file");
assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class);
}
}
}