diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 512153b909a..45f7801831e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -756,9 +756,10 @@ public class InternalEngine extends Engine { logger.trace("starting commit for flush; commitTranslog=true"); commitIndexWriter(indexWriter, translog); logger.trace("finished commit for flush"); - translog.commit(); // we need to refresh in order to clear older version values refresh("version_table_flush"); + // after refresh documents can be retrieved from the index so we can now commit the translog + translog.commit(); } catch (Throwable e) { throw new FlushFailedEngineException(shardId, e); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 0c154daf022..9f012efd47e 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -94,6 +94,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; @@ -521,6 +522,36 @@ public class InternalEngineTests extends ESTestCase { IOUtils.close(store, engine); } + @Test + /* */ + public void testConcurrentGetAndFlush() throws Exception { + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); + engine.create(new Engine.Create(newUid("1"), doc)); + final AtomicReference latestGetResult = new AtomicReference<>(); + final AtomicBoolean flushFinished = new AtomicBoolean(false); + Thread getThread = new Thread() { + @Override + public void run() { + while (flushFinished.get() == false) { + Engine.GetResult previousGetResult = latestGetResult.get(); + if (previousGetResult != null) { + previousGetResult.release(); + } + latestGetResult.set(engine.get(new Engine.Get(true, newUid("1")))); + if (latestGetResult.get().exists() == false) { + break; + } + } + } + }; + getThread.start(); + engine.flush(); + flushFinished.set(true); + getThread.join(); + assertTrue(latestGetResult.get().exists()); + latestGetResult.get().release(); + } + @Test public void testSimpleOperations() throws Exception { Engine.Searcher searchResult = engine.acquireSearcher("test");