From 0ce66b4d70dce79b83c36f7048d93fc4c3ea61f9 Mon Sep 17 00:00:00 2001 From: Britta Weber Date: Wed, 9 Sep 2015 10:07:10 +0200 Subject: [PATCH] Engine: refresh before translog commit When we commit the translog, documents that were in it before cannot be retrieved from it anymore via get and have to be retrieved from the index instead. But they will only be visible if between index and get a refresh is called. Therfore we have to call first refresh and then translog.commit() because otherwise there is a small gap in which we cannot read from the translog anymore but also not from the index. closes #13379 --- .../index/engine/InternalEngine.java | 3 +- .../index/engine/InternalEngineTests.java | 31 +++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 512153b909a..45f7801831e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -756,9 +756,10 @@ public class InternalEngine extends Engine { logger.trace("starting commit for flush; commitTranslog=true"); commitIndexWriter(indexWriter, translog); logger.trace("finished commit for flush"); - translog.commit(); // we need to refresh in order to clear older version values refresh("version_table_flush"); + // after refresh documents can be retrieved from the index so we can now commit the translog + translog.commit(); } catch (Throwable e) { throw new FlushFailedEngineException(shardId, e); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 0c154daf022..9f012efd47e 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -94,6 +94,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; @@ -521,6 +522,36 @@ public class InternalEngineTests extends ESTestCase { IOUtils.close(store, engine); } + @Test + /* */ + public void testConcurrentGetAndFlush() throws Exception { + ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); + engine.create(new Engine.Create(newUid("1"), doc)); + final AtomicReference latestGetResult = new AtomicReference<>(); + final AtomicBoolean flushFinished = new AtomicBoolean(false); + Thread getThread = new Thread() { + @Override + public void run() { + while (flushFinished.get() == false) { + Engine.GetResult previousGetResult = latestGetResult.get(); + if (previousGetResult != null) { + previousGetResult.release(); + } + latestGetResult.set(engine.get(new Engine.Get(true, newUid("1")))); + if (latestGetResult.get().exists() == false) { + break; + } + } + } + }; + getThread.start(); + engine.flush(); + flushFinished.set(true); + getThread.join(); + assertTrue(latestGetResult.get().exists()); + latestGetResult.get().release(); + } + @Test public void testSimpleOperations() throws Exception { Engine.Searcher searchResult = engine.acquireSearcher("test");