From 12b93e72f0b25f0b96e4276541d0827f03d3a883 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 6 Jan 2016 13:10:21 +0100 Subject: [PATCH 1/2] Never delete translog-N.tlog file when creation fails Today we delete the translog-N.tlog file if any subsequent operation fails, but we might actually be in a good state if, for instance, the creation of the writer fails after we successfully baked the new translog generation into the checkpoint. In this situation we used to delete the translog-N.tlog file and failed on the next recovery of the translog with a NoSuchFileException | FileNotFoundException just like in https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 This commit changes the behavior and cleans up that limbo state on recovery: if we already have a generation+1 file written but not baked into the checkpoint, we remove that file, but only if the previous ckp file has already been renamed; otherwise we know we can't be in this state. --- .../index/translog/Translog.java | 15 ++ .../index/translog/TranslogWriter.java | 19 ++- .../index/translog/TranslogTests.java | 152 ++++++++++++++---- 3 files changed, 152 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 3f8f0ab54df..00fc4556d05 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -163,6 +163,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC try { if (translogGeneration != null) { final Checkpoint checkpoint = readCheckpoint(); + final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1)); + final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); + // this is special handling for error condition when we create a new writer but we 
fail to bake + // the newly written file (generation+1) into the checkpoint. This is still a valid state + // we just need to cleanup before we continue + // we hit this before and then blindly deleted the new generation even though we managed to bake it in and then hit this: + // https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 as an example + // + // For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists + // if not we don't even try to clean it up and wait until we fail creating it + assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID); + if (Files.exists(currentCheckpointFile) // current checkpoint is already copied + && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning + logger.warn("Deleted invalid next generation before opening writer {} this is like caused by some previously tragic exception ", nextTranslogFile.getFileName()); + } this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint); if (recoveredTranslogs.isEmpty()) { throw new IllegalStateException("at least one reader must be recovered"); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 026aac4515e..867b94a8505 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -69,9 +69,17 @@ public class TranslogWriter extends TranslogReader { totalOffset = lastSyncedOffset; } + static int getHeaderLength(String translogUUID) { + return getHeaderLength(new BytesRef(translogUUID).length); + } + + private static int getHeaderLength(int uuidLength) { + return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + RamUsageEstimator.NUM_BYTES_INT; + } + public static 
TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException { final BytesRef ref = new BytesRef(translogUUID); - final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT; + final int headerLength = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); try { // This OutputStreamDataOutput is intentionally not closed because @@ -80,17 +88,14 @@ public class TranslogWriter extends TranslogReader { CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION); out.writeInt(ref.length); out.writeBytes(ref.bytes, ref.offset, ref.length); - channel.force(false); + channel.force(true); writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE); final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize); return writer; } catch (Throwable throwable){ + // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that + // file exists we remove it. 
We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition IOUtils.closeWhileHandlingException(channel); - try { - Files.delete(file); // remove the file as well - } catch (IOException ex) { - throwable.addSuppressed(ex); - } throw throwable; } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 1da2b7bf3c8..b31aafa432d 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -55,6 +55,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.InvalidPathException; import java.nio.file.Path; @@ -136,8 +137,8 @@ public class TranslogTests extends ESTestCase { private TranslogConfig getTranslogConfig(Path path) { Settings build = Settings.settingsBuilder() - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .build(); + .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) + .build(); ByteSizeValue bufferSize = randomBoolean() ? 
TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize); } @@ -335,9 +336,9 @@ public class TranslogTests extends ESTestCase { assertEquals(6, copy.estimatedNumberOfOperations()); assertEquals(431, copy.getTranslogSizeInBytes()); assertEquals("\"translog\"{\n" + - " \"operations\" : 6,\n" + - " \"size_in_bytes\" : 431\n" + - "}", copy.toString().trim()); + " \"operations\" : 6,\n" + + " \"size_in_bytes\" : 431\n" + + "}", copy.toString().trim()); try { new TranslogStats(1, -1); @@ -634,7 +635,9 @@ public class TranslogTests extends ESTestCase { assertFileIsPresent(translog, 1); } - /** Tests that concurrent readers and writes maintain view and snapshot semantics */ + /** + * Tests that concurrent readers and writes maintain view and snapshot semantics + */ public void testConcurrentWriteViewsAndSnapshot() throws Throwable { final Thread[] writers = new Thread[randomIntBetween(1, 10)]; final Thread[] readers = new Thread[randomIntBetween(1, 10)]; @@ -833,7 +836,7 @@ public class TranslogTests extends ESTestCase { int count = 0; for (int op = 0; op < translogOperations; op++) { locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); - if (rarely() && translogOperations > op+1) { + if (rarely() && translogOperations > op + 1) { translog.commit(); } } @@ -912,7 +915,7 @@ public class TranslogTests extends ESTestCase { final TranslogReader reader = randomBoolean() ? 
writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME))); for (int i = 0; i < numOps; i++) { ByteBuffer buffer = ByteBuffer.allocate(4); - reader.readBytes(buffer, reader.getFirstOperationOffset() + 4*i); + reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); buffer.flip(); final int value = buffer.getInt(); assertEquals(i, value); @@ -951,9 +954,9 @@ public class TranslogTests extends ESTestCase { for (int op = 0; op < translogOperations; op++) { locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); final boolean commit = commitOften ? frequently() : rarely(); - if (commit && op < translogOperations-1) { + if (commit && op < translogOperations - 1) { translog.commit(); - minUncommittedOp = op+1; + minUncommittedOp = op + 1; translogGeneration = translog.getGeneration(); } } @@ -987,7 +990,7 @@ public class TranslogTests extends ESTestCase { public void testRecoveryUncommitted() throws IOException { List locations = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100); - final int prepareOp = randomIntBetween(0, translogOperations-1); + final int prepareOp = randomIntBetween(0, translogOperations - 1); Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { @@ -1040,7 +1043,7 @@ public class TranslogTests extends ESTestCase { public void testRecoveryUncommittedFileExists() throws IOException { List locations = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100); - final int prepareOp = randomIntBetween(0, translogOperations-1); + final int prepareOp = randomIntBetween(0, translogOperations - 1); Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { @@ -1094,7 +1097,7 @@ public class TranslogTests extends 
ESTestCase { } } - public void testRecoveryUncommittedCorryptedCheckpoint() throws IOException { + public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { List locations = new ArrayList<>(); int translogOperations = 100; final int prepareOp = 44; @@ -1116,10 +1119,10 @@ public class TranslogTests extends ESTestCase { config.setTranslogGeneration(translogGeneration); Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); Checkpoint read = Checkpoint.read(ckp); - Checkpoint corrupted = new Checkpoint(0,0,0); + Checkpoint corrupted = new Checkpoint(0, 0, 0); Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); try (Translog translog = new Translog(config)) { - fail("corrupted"); + fail("corrupted"); } catch (IllegalStateException ex) { assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}"); } @@ -1157,7 +1160,7 @@ public class TranslogTests extends ESTestCase { List locations = new ArrayList<>(); List locations2 = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100); - try(Translog translog2 = create(createTempDir())) { + try (Translog translog2 = create(createTempDir())) { for (int op = 0; op < translogOperations; op++) { locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); locations2.add(translog2.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); @@ -1196,7 +1199,7 @@ public class TranslogTests extends ESTestCase { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); translog.close(); - config.setTranslogGeneration(new 
Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()),translogGeneration.translogFileGeneration)); + config.setTranslogGeneration(new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration)); try { new Translog(config); fail("translog doesn't belong to this UUID"); @@ -1283,12 +1286,12 @@ public class TranslogTests extends ESTestCase { case CREATE: case INDEX: op = new Translog.Index("test", threadId + "_" + opCount, - randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); + randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); break; case DELETE: op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount), - 1 + randomInt(100000), - randomFrom(VersionType.values())); + 1 + randomInt(100000), + randomFrom(VersionType.values())); break; default: throw new ElasticsearchException("not supported op type"); @@ -1307,7 +1310,8 @@ public class TranslogTests extends ESTestCase { return translog.add(op); } - protected void afterAdd() throws IOException {} + protected void afterAdd() throws IOException { + } } public void testFailFlush() throws IOException { @@ -1319,7 +1323,7 @@ public class TranslogTests extends ESTestCase { List locations = new ArrayList<>(); int opsSynced = 0; boolean failed = false; - while(failed == false) { + while (failed == false) { try { locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); translog.sync(); @@ -1331,7 +1335,7 @@ public class TranslogTests extends ESTestCase { failed = true; assertFalse(translog.isOpen()); assertEquals("__FAKE__ no space left on device", ex.getMessage()); - } + } fail.set(randomBoolean()); } fail.set(false); @@ -1370,7 +1374,7 @@ public class TranslogTests extends ESTestCase { assertFalse(translog.isOpen()); translog.close(); // we are 
closed config.setTranslogGeneration(translogGeneration); - try (Translog tlog = new Translog(config)){ + try (Translog tlog = new Translog(config)) { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); assertFalse(tlog.syncNeeded()); @@ -1393,7 +1397,7 @@ public class TranslogTests extends ESTestCase { for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))))); try (Translog.Snapshot snapshot = translog.newSnapshot()) { - assertEquals(opsAdded+1, snapshot.estimatedTotalOperations()); + assertEquals(opsAdded + 1, snapshot.estimatedTotalOperations()); for (int i = 0; i < opsAdded; i++) { assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation); Translog.Operation next = snapshot.next(); @@ -1407,7 +1411,7 @@ public class TranslogTests extends ESTestCase { Path tempDir = createTempDir(); final AtomicBoolean fail = new AtomicBoolean(); TranslogConfig config = getTranslogConfig(tempDir); - assumeFalse("this won't work if we sync on any op",config.isSyncOnEachOperation()); + assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation()); Translog translog = getFailableTranslog(fail, config, false, true); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); @@ -1427,7 +1431,7 @@ public class TranslogTests extends ESTestCase { assertTrue(ex.getCause() instanceof UnknownException); } assertFalse(translog.isOpen()); - assertTrue(translog.getTragicException() instanceof UnknownException); + assertTrue(translog.getTragicException() instanceof UnknownException); } 
public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException { @@ -1520,6 +1524,7 @@ public class TranslogTests extends ESTestCase { } } } + private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException { return getFailableTranslog(fail, config, randomBoolean(), false); } @@ -1613,4 +1618,97 @@ public class TranslogTests extends ESTestCase { // all is well } } + + public void testRecoverWithUnbackedNextGen() throws IOException { + translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); + translog.close(); + TranslogConfig config = translog.getConfig(); + + Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); + Checkpoint read = Checkpoint.read(ckp); + Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); + Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); + config.setTranslogGeneration(translogGeneration); + try (Translog tlog = new Translog(config)) { + assertNotNull(translogGeneration); + assertFalse(tlog.syncNeeded()); + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + for (int i = 0; i < 1; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); + } + } + tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + } + try (Translog tlog = new Translog(config)) { + assertNotNull(translogGeneration); + assertFalse(tlog.syncNeeded()); + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + for (int i = 0; i < 2; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + 
assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); + } + } + } + } + + public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { + translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); + translog.close(); + TranslogConfig config = translog.getConfig(); + Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); + Checkpoint read = Checkpoint.read(ckp); + // don't copy the new file + Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); + config.setTranslogGeneration(translogGeneration); + + try { + Translog tlog = new Translog(config); + fail("file already exists?"); + } catch (TranslogException ex) { + // all is well + assertEquals(ex.getMessage(), "failed to create new translog file"); + assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class); + } + } + public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { + translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); + translog.close(); + TranslogConfig config = translog.getConfig(); + + Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); + Checkpoint read = Checkpoint.read(ckp); + Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); + Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); + // we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition + Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog")); + config.setTranslogGeneration(translogGeneration); + try (Translog tlog = new 
Translog(config)) { + assertNotNull(translogGeneration); + assertFalse(tlog.syncNeeded()); + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + for (int i = 0; i < 1; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); + } + } + tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + } + + try { + Translog tlog = new Translog(config); + fail("file already exists?"); + } catch (TranslogException ex) { + // all is well + assertEquals(ex.getMessage(), "failed to create new translog file"); + assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class); + } + } + } From 5c833750d73ee7bed6c44496479ff2094d2058e9 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 6 Jan 2016 14:19:58 +0100 Subject: [PATCH 2/2] apply feedback from @bleskes --- .../main/java/org/elasticsearch/index/translog/Translog.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 00fc4556d05..0d2fba6ea57 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -173,10 +173,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC // // For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists // if not we don't even try to clean it up and wait until we fail creating it - assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID); + assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID) : 
"unexpected translog file: [" + nextTranslogFile + "]"; if (Files.exists(currentCheckpointFile) // current checkpoint is already copied && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning - logger.warn("Deleted invalid next generation before opening writer {} this is like caused by some previously tragic exception ", nextTranslogFile.getFileName()); + logger.warn("deleted previously created, but not yet committed, next generation [{}]. This can happen due to a tragic exception when creating a new generation", nextTranslogFile.getFileName()); } this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint); if (recoveredTranslogs.isEmpty()) {