diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 63f15d93466..b1f923222e9 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -227,10 +227,10 @@ public class InternalEngine extends Engine { } private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException { - final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer); final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - + translogConfig.setTranslogGeneration(null); if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer); // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! if (generation == null) { throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist"); @@ -241,7 +241,10 @@ public class InternalEngine extends Engine { } } final Translog translog = new Translog(translogConfig); + final Translog.TranslogGeneration generation = translogConfig.getTranslogGeneration(); if (generation == null || generation.translogUUID == null) { + assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " + + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; if (generation == null) { logger.debug("no translog ID present in the current generation - creating one"); } else if (generation.translogUUID == null) { @@ -249,7 +252,8 @@ public class InternalEngine extends Engine { } boolean success = false; try { - commitIndexWriter(writer, translog); + commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG + ? writer.getCommitData().get(SYNC_COMMIT_ID) : null); success = true; } finally { if (success == false) { @@ -661,7 +665,7 @@ public class InternalEngine extends Engine { try { translog.prepareCommit(); logger.trace("starting commit for flush; commitTranslog=true"); - commitIndexWriter(indexWriter, translog); + commitIndexWriter(indexWriter, translog, null); logger.trace("finished commit for flush"); // we need to refresh in order to clear older version values refresh("version_table_flush"); @@ -1129,10 +1133,6 @@ public class InternalEngine extends Engine { } } - private void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException { - commitIndexWriter(writer, translog, null); - } - public void onSettingsChanged() { mergeScheduler.refreshConfig(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 31b8db03141..a0fda01d286 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -1343,4 +1343,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)); } + /** + * Returns the translog uuid used to associate a lucene index with a translog. + */ + public String getTranslogUUID() { + return translogUUID; + } + } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java index a4ee7961fce..5f6e1912661 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogConfig.java @@ -100,7 +100,7 @@ public final class TranslogConfig { * file referenced by this generation. The translog creation will fail if this generation can't be opened. */ public TranslogGeneration getTranslogGeneration() { - return translogGeneration; + return translogGeneration; // TODO make this a ctor argument on the Translog - this mutable state is aweful } /** diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 21cef11f416..59c64111961 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -923,7 +923,7 @@ public class InternalEngineTests extends ESTestCase { if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG && randomBoolean()) { engine.recoverFromTranslog(); } - assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); + assertEquals(engine.config().getOpenMode().toString(), engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } public void testSycnedFlushVanishesOnReplay() throws IOException { @@ -1676,7 +1676,6 @@ public class InternalEngineTests extends ESTestCase { assertThat(topDocs.totalHits, equalTo(numDocs)); } engine.close(); - boolean recoveredButFailed = false; final MockDirectoryWrapper directory = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class); if (directory != null) { // since we rollback the IW we are writing the same segment files again after starting IW but MDW prevents @@ -2047,4 +2046,63 @@ public class InternalEngineTests extends ESTestCase { logger.info("exception caught: ", throwable.get()); assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(throwable.get())); } + + public void testCurrentTranslogIDisCommitted() throws IOException { + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy()); + + // create + { + ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime()); + + try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){ + engine.index(firstIndexRequest); + + expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + // open and recover tlog + { + for (int i = 0; i < 2; i++) { + try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + } + // open index with new tlog + { + try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) { + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); + } + } + + // open and recover tlog with empty tlog + { + for (int i = 0; i < 2; i++) { + try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) { + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + } + } + } }