diff --git a/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 19d5a145c10..5c3fe0b616a 100644 --- a/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -200,7 +200,7 @@ public final class EngineConfig { } /** if true the engine will start even if the translog id in the commit point can not be found */ - public boolean forceNewTranlog() { + public boolean forceNewTranslog() { return forceNewTranslog; } diff --git a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 499a9495333..c68c3a114d1 100644 --- a/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -35,7 +35,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.InfoStream; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.DjbHashFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; @@ -134,7 +133,7 @@ public class InternalEngine extends Engine { try { writer = createWriter(); indexWriter = writer; - translog = openTranslog(engineConfig, writer, skipInitialTranslogRecovery || engineConfig.forceNewTranlog()); + translog = openTranslog(engineConfig, writer, skipInitialTranslogRecovery || engineConfig.forceNewTranslog()); translogGeneration = translog.getGeneration(); assert translogGeneration != null; } catch (IOException | TranslogCorruptedException e) { @@ -151,7 +150,7 @@ public class InternalEngine extends Engine { try { if (skipInitialTranslogRecovery) { // make sure we point at the latest translog from now on.. - commitIndexWriter(writer, translog.getGeneration()); + commitIndexWriter(writer, translog); } else { recoverFromTranslog(engineConfig, translogGeneration); } @@ -174,22 +173,21 @@ public class InternalEngine extends Engine { private Translog openTranslog(EngineConfig engineConfig, IndexWriter writer, boolean createNew) throws IOException { final Translog.TranslogGeneration generation = loadTranslogIdFromCommit(writer); - Translog translog; - TranslogConfig translogConfig = engineConfig.getTranslogConfig(); - if (createNew) { - translog = new Translog(translogConfig); - } else { + final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); + if (createNew == false) { translogConfig.setTranslogGeneration(generation); if (generation != null && generation.translogUUID == null) { // only upgrade on pre-2.0 indices... Translog.upgradeLegacyTranslog(logger, translogConfig); } - translog = new Translog(translogConfig); } - + final Translog translog = new Translog(translogConfig); if (generation == null) { + if (createNew) { + throw new IllegalStateException("no tranlog generation present in commit data but tranlog is expected to exists"); + } logger.debug("no translog ID present in the current generation - creating one"); - commitIndexWriter(writer, translog.getGeneration()); + commitIndexWriter(writer, translog); } return translog; } @@ -244,7 +242,9 @@ public class InternalEngine extends Engine { assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false : "legacy commit contains translog UUID"; return new Translog.TranslogGeneration(null, Long.parseLong(commitUserData.get("translog_id"))); } else if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY)) { - assert commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) : "commit doesn't contain translog UUID"; + if (commitUserData.containsKey(Translog.TRANSLOG_UUID_KEY) == false) { + throw new IllegalStateException("commit doesn't contain translog UUID"); + } final String translogUUID = commitUserData.get(Translog.TRANSLOG_UUID_KEY); final long translogGen = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY)); return new Translog.TranslogGeneration(translogUUID, translogGen); @@ -702,12 +702,10 @@ public class InternalEngine extends Engine { if (commitTranslog) { if (flushNeeded || force) { flushNeeded = false; - final Translog.TranslogGeneration translogGeneration; try { translog.prepareCommit(); - translogGeneration = translog.getGeneration(); logger.trace("starting commit for flush; commitTranslog=true"); - commitIndexWriter(indexWriter, translogGeneration); + commitIndexWriter(indexWriter, translog); logger.trace("finished commit for flush"); translog.commit(); // we need to refresh in order to clear older version values @@ -726,7 +724,7 @@ public class InternalEngine extends Engine { // other flushes use flushLock try { logger.trace("starting commit for flush; commitTranslog=false"); - commitIndexWriter(indexWriter, translog.getGeneration()); + commitIndexWriter(indexWriter, translog); logger.trace("finished commit for flush"); } catch (Throwable e) { throw new FlushFailedEngineException(shardId, e); @@ -1176,12 +1174,13 @@ public class InternalEngine extends Engine { } - private void commitIndexWriter(IndexWriter writer, Translog.TranslogGeneration commit) throws IOException { + private void commitIndexWriter(IndexWriter writer, Translog translog) throws IOException { try { - logger.trace("committing writer with translog id [{}] ", commit.translogFileGeneration); + Translog.TranslogGeneration translogGeneration = translog.getGeneration(); + logger.trace("committing writer with translog id [{}] ", translogGeneration.translogFileGeneration); Map commitData = new HashMap<>(2); - commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(commit.translogFileGeneration)); - commitData.put(Translog.TRANSLOG_UUID_KEY, commit.translogUUID); + commitData.put(Translog.TRANSLOG_GENERATION_KEY, Long.toString(translogGeneration.translogFileGeneration)); + commitData.put(Translog.TRANSLOG_UUID_KEY, translogGeneration.translogUUID); indexWriter.setCommitData(commitData); writer.commit(); } catch (Throwable ex) { diff --git a/src/main/java/org/elasticsearch/index/translog/Translog.java b/src/main/java/org/elasticsearch/index/translog/Translog.java index 9d2d81f97ec..b247189fb5f 100644 --- a/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -380,6 +380,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC if (currentCommittingTranslog != null) { int tops = currentCommittingTranslog.totalOperations(); assert tops != TranslogReader.UNKNOWN_OP_COUNT; + assert tops >= 0; ops += tops; } } @@ -499,7 +500,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC * while receiving future ones as well */ public Translog.View newView() { - // we need to acquire the read lock to make sure new translog is created + // we need to acquire the read lock to make sure no new translog is created // and will be missed by the view we're making try (ReleasableLock lock = readLock.acquire()) { ArrayList translogs = new ArrayList<>(); @@ -571,7 +572,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC } } - private boolean isReferencedGeneration(long generation) { // pkg private for testing + private boolean isReferencedGeneration(long generation) { // used to make decisions if a file can be deleted return generation >= lastCommittedTranslogFileGeneration; } @@ -662,6 +663,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC if (tops == TranslogReader.UNKNOWN_OP_COUNT) { return -1; } + assert tops >= 0; ops += tops; } return ops; @@ -812,7 +814,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC /** * Returns the next operation in the snapshot or null if we reached the end. */ - public Translog.Operation next() throws IOException; + Translog.Operation next() throws IOException; } @@ -1653,8 +1655,8 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC if (currentCommittingTranslog != null) { throw new IllegalStateException("already committing a translog with generation: " + currentCommittingTranslog.getGeneration()); } - final TranslogWriter writer = current; - writer.sync(); + final TranslogWriter oldCurrent = current; + oldCurrent.sync(); currentCommittingTranslog = current.immutableReader(); Path checkpoint = location.resolve(CHECKPOINT_FILE_NAME); assert Checkpoint.read(checkpoint).generation == currentCommittingTranslog.getGeneration(); @@ -1663,16 +1665,15 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC IOUtils.fsync(commitCheckpoint, false); IOUtils.fsync(commitCheckpoint.getParent(), true); // create a new translog file - this will sync it and update the checkpoint data; - final TranslogWriter newFile = createWriter(current.getGeneration() + 1); - current = newFile; + current = createWriter(current.getGeneration() + 1); // notify all outstanding views of the new translog (no views are created now as // we hold a write lock). for (FsView view : outstandingViews) { - view.onNewTranslog(currentCommittingTranslog.clone(), newFile.newReaderFromWriter()); + view.onNewTranslog(currentCommittingTranslog.clone(), current.newReaderFromWriter()); } - IOUtils.close(writer); + IOUtils.close(oldCurrent); logger.trace("current translog set to [{}]", current.getGeneration()); - assert writer.syncNeeded() == false : "old translog writer must not need a sync"; + assert oldCurrent.syncNeeded() == false : "old translog oldCurrent must not need a sync"; } catch (Throwable t) { IOUtils.closeWhileHandlingException(this); // tragic event @@ -1688,7 +1689,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC if (currentCommittingTranslog == null) { prepareCommit(); } - current.sync(); lastCommittedTranslogFileGeneration = current.getGeneration(); // this is important - otherwise old files will not be cleaned up if (recoveredTranslogs.isEmpty() == false) { IOUtils.close(recoveredTranslogs);