diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a361a53450c..90e9616ef26 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -154,23 +154,29 @@ public class InternalEngine extends Engine { throttle = new IndexThrottle(); this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig); try { - writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG); - final SeqNoStats seqNoStats = loadSeqNoStats(engineConfig, writer); - if (logger.isTraceEnabled()) { - logger.trace( - "recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]", - seqNoStats.getMaxSeqNo(), - seqNoStats.getLocalCheckpoint(), - seqNoStats.getGlobalCheckpoint()); + final SeqNoStats seqNoStats; + switch (openMode) { + case OPEN_INDEX_AND_TRANSLOG: + writer = createWriter(false); + seqNoStats = loadSeqNoStatsFromLuceneAndTranslog(engineConfig.getTranslogConfig(), writer); + break; + case OPEN_INDEX_CREATE_TRANSLOG: + writer = createWriter(false); + seqNoStats = loadSeqNoStatsLucene(SequenceNumbersService.UNASSIGNED_SEQ_NO, writer); + break; + case CREATE_INDEX_AND_TRANSLOG: + writer = createWriter(true); + seqNoStats = new SeqNoStats( + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.NO_OPS_PERFORMED, + SequenceNumbersService.UNASSIGNED_SEQ_NO); + break; + default: + throw new IllegalArgumentException(openMode.toString()); } - seqNoService = - new SequenceNumbersService( - shardId, - engineConfig.getIndexSettings(), - seqNoStats.getMaxSeqNo(), - seqNoStats.getLocalCheckpoint(), - seqNoStats.getGlobalCheckpoint()); + logger.trace("recovered [{}]", seqNoStats); indexWriter = writer; + seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint); assert translog.getGeneration() != null; } catch (IOException | TranslogCorruptedException e) { @@ -209,6 +215,18 @@ public class InternalEngine extends Engine { logger.trace("created new InternalEngine"); } + private static SequenceNumbersService sequenceNumberService( + final ShardId shardId, + final IndexSettings indexSettings, + final SeqNoStats seqNoStats) { + return new SequenceNumbersService( + shardId, + indexSettings, + seqNoStats.getMaxSeqNo(), + seqNoStats.getLocalCheckpoint(), + seqNoStats.getGlobalCheckpoint()); + } + @Override public InternalEngine recoverFromTranslog() throws IOException { flushLock.lock(); @@ -326,18 +344,33 @@ public class InternalEngine extends Engine { } /** - * Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the Translog + * Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the translog * checkpoint (global checkpoint). * - * @param engineConfig the engine configuration (for the open mode and the translog path) - * @param writer the index writer (for the Lucene commit point) + * @param translogConfig the translog config (for the global checkpoint) + * @param indexWriter the index writer (for the Lucene commit point) * @return the sequence number stats * @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint */ - private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final IndexWriter writer) throws IOException { + private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog( + final TranslogConfig translogConfig, + final IndexWriter indexWriter) throws IOException { + long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()); + return loadSeqNoStatsLucene(globalCheckpoint, indexWriter); + } + + /** + * Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and uses the + * specified global checkpoint. + * + * @param globalCheckpoint the global checkpoint to use + * @param indexWriter the index writer (for the Lucene commit point) + * @return the sequence number stats + */ + private static SeqNoStats loadSeqNoStatsLucene(final long globalCheckpoint, final IndexWriter indexWriter) { long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED; - for (Map.Entry entry : writer.getLiveCommitData()) { + for (Map.Entry entry : indexWriter.getLiveCommitData()) { final String key = entry.getKey(); if (key.equals(LOCAL_CHECKPOINT_KEY)) { assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED; @@ -348,13 +381,6 @@ public class InternalEngine extends Engine { } } - final long globalCheckpoint; - if (engineConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); - } else { - globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO; - } - return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); }