diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 3f8f0ab54df..00fc4556d05 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -163,6 +163,21 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC try { if (translogGeneration != null) { final Checkpoint checkpoint = readCheckpoint(); + final Path nextTranslogFile = location.resolve(getFilename(checkpoint.generation + 1)); + final Path currentCheckpointFile = location.resolve(getCommitCheckpointFileName(checkpoint.generation)); + // this is special handling for error condition when we create a new writer but we fail to bake + // the newly written file (generation+1) into the checkpoint. This is still a valid state + // we just need to cleanup before we continue + // we hit this before and then blindly deleted the new generation even though we managed to bake it in and then hit this: + // https://discuss.elastic.co/t/cannot-recover-index-because-of-missing-tanslog-files/38336 as an example + // + // For this to happen we must have already copied the translog.ckp file into translog-gen.ckp so we first check if that file exists + // if not we don't even try to clean it up and wait until we fail creating it + assert Files.exists(nextTranslogFile) == false || Files.size(nextTranslogFile) <= TranslogWriter.getHeaderLength(translogUUID); + if (Files.exists(currentCheckpointFile) // current checkpoint is already copied + && Files.deleteIfExists(nextTranslogFile)) { // delete it and log a warning + logger.warn("Deleted invalid next generation before opening writer {} this is like caused by some previously tragic exception ", nextTranslogFile.getFileName()); + } this.recoveredTranslogs = recoverFromFiles(translogGeneration, checkpoint); if (recoveredTranslogs.isEmpty()) { throw new IllegalStateException("at least one reader must be recovered"); diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java index 026aac4515e..867b94a8505 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java @@ -69,9 +69,17 @@ public class TranslogWriter extends TranslogReader { totalOffset = lastSyncedOffset; } + static int getHeaderLength(String translogUUID) { + return getHeaderLength(new BytesRef(translogUUID).length); + } + + private static int getHeaderLength(int uuidLength) { + return CodecUtil.headerLength(TRANSLOG_CODEC) + uuidLength + RamUsageEstimator.NUM_BYTES_INT; + } + public static TranslogWriter create(ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback onClose, ChannelFactory channelFactory, ByteSizeValue bufferSize) throws IOException { final BytesRef ref = new BytesRef(translogUUID); - final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT; + final int headerLength = getHeaderLength(ref.length); final FileChannel channel = channelFactory.open(file); try { // This OutputStreamDataOutput is intentionally not closed because @@ -80,17 +88,14 @@ public class TranslogWriter extends TranslogReader { CodecUtil.writeHeader(out, TRANSLOG_CODEC, VERSION); out.writeInt(ref.length); out.writeBytes(ref.bytes, ref.offset, ref.length); - channel.force(false); + channel.force(true); writeCheckpoint(headerLength, 0, file.getParent(), fileGeneration, StandardOpenOption.WRITE); final TranslogWriter writer = new TranslogWriter(shardId, fileGeneration, new ChannelReference(file, fileGeneration, channel, onClose), bufferSize); return writer; } catch (Throwable throwable){ + // if we fail to bake the file-generation into the checkpoint we stick with the file and once we recover and that + // file exists we remove it. We only apply this logic to the checkpoint.generation+1 any other file with a higher generation is an error condition IOUtils.closeWhileHandlingException(channel); - try { - Files.delete(file); // remove the file as well - } catch (IOException ex) { - throwable.addSuppressed(ex); - } throw throwable; } } diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index 1da2b7bf3c8..b31aafa432d 100644 --- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -55,6 +55,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; +import java.nio.file.FileAlreadyExistsException; import java.nio.file.Files; import java.nio.file.InvalidPathException; import java.nio.file.Path; @@ -136,8 +137,8 @@ public class TranslogTests extends ESTestCase { private TranslogConfig getTranslogConfig(Path path) { Settings build = Settings.settingsBuilder() - .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .build(); + .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) + .build(); ByteSizeValue bufferSize = randomBoolean() ? TranslogConfig.DEFAULT_BUFFER_SIZE : new ByteSizeValue(10 + randomInt(128 * 1024), ByteSizeUnit.BYTES); return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), BigArrays.NON_RECYCLING_INSTANCE, bufferSize); } @@ -335,9 +336,9 @@ public class TranslogTests extends ESTestCase { assertEquals(6, copy.estimatedNumberOfOperations()); assertEquals(431, copy.getTranslogSizeInBytes()); assertEquals("\"translog\"{\n" + - " \"operations\" : 6,\n" + - " \"size_in_bytes\" : 431\n" + - "}", copy.toString().trim()); + " \"operations\" : 6,\n" + + " \"size_in_bytes\" : 431\n" + + "}", copy.toString().trim()); try { new TranslogStats(1, -1); @@ -634,7 +635,9 @@ public class TranslogTests extends ESTestCase { assertFileIsPresent(translog, 1); } - /** Tests that concurrent readers and writes maintain view and snapshot semantics */ + /** + * Tests that concurrent readers and writes maintain view and snapshot semantics + */ public void testConcurrentWriteViewsAndSnapshot() throws Throwable { final Thread[] writers = new Thread[randomIntBetween(1, 10)]; final Thread[] readers = new Thread[randomIntBetween(1, 10)]; @@ -833,7 +836,7 @@ public class TranslogTests extends ESTestCase { int count = 0; for (int op = 0; op < translogOperations; op++) { locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(++count).getBytes(Charset.forName("UTF-8"))))); - if (rarely() && translogOperations > op+1) { + if (rarely() && translogOperations > op + 1) { translog.commit(); } } @@ -912,7 +915,7 @@ public class TranslogTests extends ESTestCase { final TranslogReader reader = randomBoolean() ? writer : translog.openReader(writer.path(), Checkpoint.read(translog.location().resolve(Translog.CHECKPOINT_FILE_NAME))); for (int i = 0; i < numOps; i++) { ByteBuffer buffer = ByteBuffer.allocate(4); - reader.readBytes(buffer, reader.getFirstOperationOffset() + 4*i); + reader.readBytes(buffer, reader.getFirstOperationOffset() + 4 * i); buffer.flip(); final int value = buffer.getInt(); assertEquals(i, value); @@ -951,9 +954,9 @@ public class TranslogTests extends ESTestCase { for (int op = 0; op < translogOperations; op++) { locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); final boolean commit = commitOften ? frequently() : rarely(); - if (commit && op < translogOperations-1) { + if (commit && op < translogOperations - 1) { translog.commit(); - minUncommittedOp = op+1; + minUncommittedOp = op + 1; translogGeneration = translog.getGeneration(); } } @@ -987,7 +990,7 @@ public class TranslogTests extends ESTestCase { public void testRecoveryUncommitted() throws IOException { List locations = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100); - final int prepareOp = randomIntBetween(0, translogOperations-1); + final int prepareOp = randomIntBetween(0, translogOperations - 1); Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { @@ -1040,7 +1043,7 @@ public class TranslogTests extends ESTestCase { public void testRecoveryUncommittedFileExists() throws IOException { List locations = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100); - final int prepareOp = randomIntBetween(0, translogOperations-1); + final int prepareOp = randomIntBetween(0, translogOperations - 1); Translog.TranslogGeneration translogGeneration = null; final boolean sync = randomBoolean(); for (int op = 0; op < translogOperations; op++) { @@ -1094,7 +1097,7 @@ public class TranslogTests extends ESTestCase { } } - public void testRecoveryUncommittedCorryptedCheckpoint() throws IOException { + public void testRecoveryUncommittedCorruptedCheckpoint() throws IOException { List locations = new ArrayList<>(); int translogOperations = 100; final int prepareOp = 44; @@ -1116,10 +1119,10 @@ public class TranslogTests extends ESTestCase { config.setTranslogGeneration(translogGeneration); Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); Checkpoint read = Checkpoint.read(ckp); - Checkpoint corrupted = new Checkpoint(0,0,0); + Checkpoint corrupted = new Checkpoint(0, 0, 0); Checkpoint.write(config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), corrupted, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); try (Translog translog = new Translog(config)) { - fail("corrupted"); + fail("corrupted"); } catch (IllegalStateException ex) { assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}"); } @@ -1157,7 +1160,7 @@ public class TranslogTests extends ESTestCase { List locations = new ArrayList<>(); List locations2 = new ArrayList<>(); int translogOperations = randomIntBetween(10, 100); - try(Translog translog2 = create(createTempDir())) { + try (Translog translog2 = create(createTempDir())) { for (int op = 0; op < translogOperations; op++) { locations.add(translog.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); locations2.add(translog2.add(new Translog.Index("test", "" + op, Integer.toString(op).getBytes(Charset.forName("UTF-8"))))); @@ -1196,7 +1199,7 @@ public class TranslogTests extends ESTestCase { Translog.TranslogGeneration translogGeneration = translog.getGeneration(); translog.close(); - config.setTranslogGeneration(new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()),translogGeneration.translogFileGeneration)); + config.setTranslogGeneration(new Translog.TranslogGeneration(randomRealisticUnicodeOfCodepointLengthBetween(1, translogGeneration.translogUUID.length()), translogGeneration.translogFileGeneration)); try { new Translog(config); fail("translog doesn't belong to this UUID"); @@ -1283,12 +1286,12 @@ public class TranslogTests extends ESTestCase { case CREATE: case INDEX: op = new Translog.Index("test", threadId + "_" + opCount, - randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); + randomUnicodeOfLengthBetween(1, 20 * 1024).getBytes("UTF-8")); break; case DELETE: op = new Translog.Delete(new Term("_uid", threadId + "_" + opCount), - 1 + randomInt(100000), - randomFrom(VersionType.values())); + 1 + randomInt(100000), + randomFrom(VersionType.values())); break; default: throw new ElasticsearchException("not supported op type"); @@ -1307,7 +1310,8 @@ public class TranslogTests extends ESTestCase { return translog.add(op); } - protected void afterAdd() throws IOException {} + protected void afterAdd() throws IOException { + } } public void testFailFlush() throws IOException { @@ -1319,7 +1323,7 @@ public class TranslogTests extends ESTestCase { List locations = new ArrayList<>(); int opsSynced = 0; boolean failed = false; - while(failed == false) { + while (failed == false) { try { locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8"))))); translog.sync(); @@ -1331,7 +1335,7 @@ public class TranslogTests extends ESTestCase { failed = true; assertFalse(translog.isOpen()); assertEquals("__FAKE__ no space left on device", ex.getMessage()); - } + } fail.set(randomBoolean()); } fail.set(false); @@ -1370,7 +1374,7 @@ public class TranslogTests extends ESTestCase { assertFalse(translog.isOpen()); translog.close(); // we are closed config.setTranslogGeneration(translogGeneration); - try (Translog tlog = new Translog(config)){ + try (Translog tlog = new Translog(config)) { assertEquals("lastCommitted must be 1 less than current", translogGeneration.translogFileGeneration + 1, tlog.currentFileGeneration()); assertFalse(tlog.syncNeeded()); @@ -1393,7 +1397,7 @@ public class TranslogTests extends ESTestCase { for (int opsAdded = 0; opsAdded < numOps; opsAdded++) { locations.add(translog.add(new Translog.Index("test", "" + opsAdded, lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))))); try (Translog.Snapshot snapshot = translog.newSnapshot()) { - assertEquals(opsAdded+1, snapshot.estimatedTotalOperations()); + assertEquals(opsAdded + 1, snapshot.estimatedTotalOperations()); for (int i = 0; i < opsAdded; i++) { assertEquals("expected operation" + i + " to be in the current translog but wasn't", translog.currentFileGeneration(), locations.get(i).generation); Translog.Operation next = snapshot.next(); @@ -1407,7 +1411,7 @@ public class TranslogTests extends ESTestCase { Path tempDir = createTempDir(); final AtomicBoolean fail = new AtomicBoolean(); TranslogConfig config = getTranslogConfig(tempDir); - assumeFalse("this won't work if we sync on any op",config.isSyncOnEachOperation()); + assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation()); Translog translog = getFailableTranslog(fail, config, false, true); LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8")))); @@ -1427,7 +1431,7 @@ public class TranslogTests extends ESTestCase { assertTrue(ex.getCause() instanceof UnknownException); } assertFalse(translog.isOpen()); - assertTrue(translog.getTragicException() instanceof UnknownException); + assertTrue(translog.getTragicException() instanceof UnknownException); } public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException { @@ -1520,6 +1524,7 @@ public class TranslogTests extends ESTestCase { } } } + private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException { return getFailableTranslog(fail, config, randomBoolean(), false); } @@ -1613,4 +1618,97 @@ public class TranslogTests extends ESTestCase { // all is well } } + + public void testRecoverWithUnbackedNextGen() throws IOException { + translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); + translog.close(); + TranslogConfig config = translog.getConfig(); + + Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); + Checkpoint read = Checkpoint.read(ckp); + Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); + Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); + config.setTranslogGeneration(translogGeneration); + try (Translog tlog = new Translog(config)) { + assertNotNull(translogGeneration); + assertFalse(tlog.syncNeeded()); + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + for (int i = 0; i < 1; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); + } + } + tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + } + try (Translog tlog = new Translog(config)) { + assertNotNull(translogGeneration); + assertFalse(tlog.syncNeeded()); + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + for (int i = 0; i < 2; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); + } + } + } + } + + public void testRecoverWithUnbackedNextGenInIllegalState() throws IOException { + translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); + translog.close(); + TranslogConfig config = translog.getConfig(); + Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); + Checkpoint read = Checkpoint.read(ckp); + // don't copy the new file + Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); + config.setTranslogGeneration(translogGeneration); + + try { + Translog tlog = new Translog(config); + fail("file already exists?"); + } catch (TranslogException ex) { + // all is well + assertEquals(ex.getMessage(), "failed to create new translog file"); + assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class); + } + } + public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException { + translog.add(new Translog.Index("test", "" + 0, Integer.toString(0).getBytes(Charset.forName("UTF-8")))); + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); + translog.close(); + TranslogConfig config = translog.getConfig(); + + Path ckp = config.getTranslogPath().resolve(Translog.CHECKPOINT_FILE_NAME); + Checkpoint read = Checkpoint.read(ckp); + Files.copy(ckp, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation))); + Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 1) + ".tlog")); + // we add N+1 and N+2 to ensure we only delete the N+1 file and never jump ahead and wipe without the right condition + Files.createFile(config.getTranslogPath().resolve("translog-" + (read.generation + 2) + ".tlog")); + config.setTranslogGeneration(translogGeneration); + try (Translog tlog = new Translog(config)) { + assertNotNull(translogGeneration); + assertFalse(tlog.syncNeeded()); + try (Translog.Snapshot snapshot = tlog.newSnapshot()) { + for (int i = 0; i < 1; i++) { + Translog.Operation next = snapshot.next(); + assertNotNull("operation " + i + " must be non-null", next); + assertEquals("payload missmatch", i, Integer.parseInt(next.getSource().source.toUtf8())); + } + } + tlog.add(new Translog.Index("test", "" + 1, Integer.toString(1).getBytes(Charset.forName("UTF-8")))); + } + + try { + Translog tlog = new Translog(config); + fail("file already exists?"); + } catch (TranslogException ex) { + // all is well + assertEquals(ex.getMessage(), "failed to create new translog file"); + assertEquals(ex.getCause().getClass(), FileAlreadyExistsException.class); + } + } + }