add tests to ensure translog ID is baked into commit on all open modes

This commit is contained in:
Simon Willnauer 2016-03-31 15:09:11 +02:00
parent 0a277227c3
commit f7cbc384f7
4 changed files with 76 additions and 11 deletions

View File

@ -227,10 +227,10 @@ public class InternalEngine extends Engine {
} }
private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException { private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer) throws IOException {
final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer);
final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
translogConfig.setTranslogGeneration(null);
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer);
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! // We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
if (generation == null) { if (generation == null) {
throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist"); throw new IllegalStateException("no translog generation present in commit data but translog is expected to exist");
@ -241,7 +241,10 @@ public class InternalEngine extends Engine {
} }
} }
final Translog translog = new Translog(translogConfig); final Translog translog = new Translog(translogConfig);
final Translog.TranslogGeneration generation = translogConfig.getTranslogGeneration();
if (generation == null || generation.translogUUID == null) { if (generation == null || generation.translogUUID == null) {
assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be "
+ EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
if (generation == null) { if (generation == null) {
logger.debug("no translog ID present in the current generation - creating one"); logger.debug("no translog ID present in the current generation - creating one");
} else if (generation.translogUUID == null) { } else if (generation.translogUUID == null) {
@ -249,7 +252,8 @@ public class InternalEngine extends Engine {
} }
boolean success = false; boolean success = false;
try { try {
commitIndexWriter(writer, translog); commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG
? writer.getCommitData().get(SYNC_COMMIT_ID) : null);
success = true; success = true;
} finally { } finally {
if (success == false) { if (success == false) {
@ -661,7 +665,7 @@ public class InternalEngine extends Engine {
try { try {
translog.prepareCommit(); translog.prepareCommit();
logger.trace("starting commit for flush; commitTranslog=true"); logger.trace("starting commit for flush; commitTranslog=true");
commitIndexWriter(indexWriter, translog); commitIndexWriter(indexWriter, translog, null);
logger.trace("finished commit for flush"); logger.trace("finished commit for flush");
// we need to refresh in order to clear older version values // we need to refresh in order to clear older version values
refresh("version_table_flush"); refresh("version_table_flush");
@ -1129,10 +1133,6 @@ public class InternalEngine extends Engine {
} }
} }
private void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException {
commitIndexWriter(writer, translog, null);
}
public void onSettingsChanged() { public void onSettingsChanged() {
mergeScheduler.refreshConfig(); mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:

View File

@ -1343,4 +1343,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME)); return Checkpoint.read(location.resolve(CHECKPOINT_FILE_NAME));
} }
/**
* Returns the translog uuid used to associate a lucene index with a translog.
*/
public String getTranslogUUID() {
return translogUUID;
}
} }

View File

@ -100,7 +100,7 @@ public final class TranslogConfig {
* file referenced by this generation. The translog creation will fail if this generation can't be opened. * file referenced by this generation. The translog creation will fail if this generation can't be opened.
*/ */
public TranslogGeneration getTranslogGeneration() { public TranslogGeneration getTranslogGeneration() {
return translogGeneration; return translogGeneration; // TODO make this a ctor argument on the Translog - this mutable state is aweful
} }
/** /**

View File

@ -923,7 +923,7 @@ public class InternalEngineTests extends ESTestCase {
if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG && randomBoolean()) { if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG && randomBoolean()) {
engine.recoverFromTranslog(); engine.recoverFromTranslog();
} }
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); assertEquals(engine.config().getOpenMode().toString(), engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
} }
public void testSycnedFlushVanishesOnReplay() throws IOException { public void testSycnedFlushVanishesOnReplay() throws IOException {
@ -1676,7 +1676,6 @@ public class InternalEngineTests extends ESTestCase {
assertThat(topDocs.totalHits, equalTo(numDocs)); assertThat(topDocs.totalHits, equalTo(numDocs));
} }
engine.close(); engine.close();
boolean recoveredButFailed = false;
final MockDirectoryWrapper directory = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class); final MockDirectoryWrapper directory = DirectoryUtils.getLeaf(store.directory(), MockDirectoryWrapper.class);
if (directory != null) { if (directory != null) {
// since we rollback the IW we are writing the same segment files again after starting IW but MDW prevents // since we rollback the IW we are writing the same segment files again after starting IW but MDW prevents
@ -2047,4 +2046,63 @@ public class InternalEngineTests extends ESTestCase {
logger.info("exception caught: ", throwable.get()); logger.info("exception caught: ", throwable.get());
assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(throwable.get())); assertTrue("expected an Exception that signals shard is not available", TransportActions.isShardNotAvailableException(throwable.get()));
} }
public void testCurrentTranslogIDisCommitted() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy());
// create
{
ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
engine.index(firstIndexRequest);
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
// open and recover tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
// open index with new tlog
{
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
}
}
// open and recover tlog with empty tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG))) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
}
}
} }