Engine: refresh before translog commit
When we commit the translog, documents that were in it before cannot be retrieved from it anymore via get and have to be retrieved from the index instead. But they will only be visible if between index and get a refresh is called. Therfore we have to call first refresh and then translog.commit() because otherwise there is a small gap in which we cannot read from the translog anymore but also not from the index. closes #13379
This commit is contained in:
parent
e40409dd7f
commit
0ce66b4d70
|
@ -756,9 +756,10 @@ public class InternalEngine extends Engine {
|
|||
logger.trace("starting commit for flush; commitTranslog=true");
|
||||
commitIndexWriter(indexWriter, translog);
|
||||
logger.trace("finished commit for flush");
|
||||
translog.commit();
|
||||
// we need to refresh in order to clear older version values
|
||||
refresh("version_table_flush");
|
||||
// after refresh documents can be retrieved from the index so we can now commit the translog
|
||||
translog.commit();
|
||||
} catch (Throwable e) {
|
||||
throw new FlushFailedEngineException(shardId, e);
|
||||
}
|
||||
|
|
|
@ -94,6 +94,7 @@ import java.nio.file.Files;
|
|||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -521,6 +522,36 @@ public class InternalEngineTests extends ESTestCase {
|
|||
IOUtils.close(store, engine);
|
||||
}
|
||||
|
||||
@Test
|
||||
/* */
|
||||
public void testConcurrentGetAndFlush() throws Exception {
|
||||
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
|
||||
engine.create(new Engine.Create(newUid("1"), doc));
|
||||
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
|
||||
final AtomicBoolean flushFinished = new AtomicBoolean(false);
|
||||
Thread getThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (flushFinished.get() == false) {
|
||||
Engine.GetResult previousGetResult = latestGetResult.get();
|
||||
if (previousGetResult != null) {
|
||||
previousGetResult.release();
|
||||
}
|
||||
latestGetResult.set(engine.get(new Engine.Get(true, newUid("1"))));
|
||||
if (latestGetResult.get().exists() == false) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
getThread.start();
|
||||
engine.flush();
|
||||
flushFinished.set(true);
|
||||
getThread.join();
|
||||
assertTrue(latestGetResult.get().exists());
|
||||
latestGetResult.get().release();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleOperations() throws Exception {
|
||||
Engine.Searcher searchResult = engine.acquireSearcher("test");
|
||||
|
|
Loading…
Reference in New Issue