Merge pull request #13414 from brwe/commit-refresh-order

Engine: refresh before translog commit
This commit is contained in:
Britta Weber 2015-09-09 12:06:26 +02:00
commit fa9696fb8c
2 changed files with 33 additions and 1 deletions

View File

@ -756,9 +756,10 @@ public class InternalEngine extends Engine {
logger.trace("starting commit for flush; commitTranslog=true");
commitIndexWriter(indexWriter, translog);
logger.trace("finished commit for flush");
translog.commit();
// we need to refresh in order to clear older version values
refresh("version_table_flush");
// after refresh documents can be retrieved from the index so we can now commit the translog
translog.commit();
} catch (Throwable e) {
throw new FlushFailedEngineException(shardId, e);
}

View File

@ -94,6 +94,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
@ -521,6 +522,36 @@ public class InternalEngineTests extends ESTestCase {
IOUtils.close(store, engine);
}
@Test
/* */
public void testConcurrentGetAndFlush() throws Exception {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.create(new Engine.Create(newUid("1"), doc));
final AtomicReference<Engine.GetResult> latestGetResult = new AtomicReference<>();
final AtomicBoolean flushFinished = new AtomicBoolean(false);
Thread getThread = new Thread() {
@Override
public void run() {
while (flushFinished.get() == false) {
Engine.GetResult previousGetResult = latestGetResult.get();
if (previousGetResult != null) {
previousGetResult.release();
}
latestGetResult.set(engine.get(new Engine.Get(true, newUid("1"))));
if (latestGetResult.get().exists() == false) {
break;
}
}
}
};
getThread.start();
engine.flush();
flushFinished.set(true);
getThread.join();
assertTrue(latestGetResult.get().exists());
latestGetResult.get().release();
}
@Test
public void testSimpleOperations() throws Exception {
Engine.Searcher searchResult = engine.acquireSearcher("test");