Clarify global checkpoint recovery
Today when starting a new engine, we read the global checkpoint from the translog only if we are opening an existing translog. This commit clarifies this situation by distinguishing the three cases of engine creation in the constructor leading to clearer code. Relates #21934
This commit is contained in:
parent
0afef53a17
commit
b0e8696143
|
@ -154,23 +154,29 @@ public class InternalEngine extends Engine {
|
||||||
throttle = new IndexThrottle();
|
throttle = new IndexThrottle();
|
||||||
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
|
this.searcherFactory = new SearchFactory(logger, isClosed, engineConfig);
|
||||||
try {
|
try {
|
||||||
writer = createWriter(openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG);
|
final SeqNoStats seqNoStats;
|
||||||
final SeqNoStats seqNoStats = loadSeqNoStats(engineConfig, writer);
|
switch (openMode) {
|
||||||
if (logger.isTraceEnabled()) {
|
case OPEN_INDEX_AND_TRANSLOG:
|
||||||
logger.trace(
|
writer = createWriter(false);
|
||||||
"recovering max sequence number: [{}], local checkpoint: [{}], global checkpoint: [{}]",
|
seqNoStats = loadSeqNoStatsFromLuceneAndTranslog(engineConfig.getTranslogConfig(), writer);
|
||||||
seqNoStats.getMaxSeqNo(),
|
break;
|
||||||
seqNoStats.getLocalCheckpoint(),
|
case OPEN_INDEX_CREATE_TRANSLOG:
|
||||||
seqNoStats.getGlobalCheckpoint());
|
writer = createWriter(false);
|
||||||
|
seqNoStats = loadSeqNoStatsLucene(SequenceNumbersService.UNASSIGNED_SEQ_NO, writer);
|
||||||
|
break;
|
||||||
|
case CREATE_INDEX_AND_TRANSLOG:
|
||||||
|
writer = createWriter(true);
|
||||||
|
seqNoStats = new SeqNoStats(
|
||||||
|
SequenceNumbersService.NO_OPS_PERFORMED,
|
||||||
|
SequenceNumbersService.NO_OPS_PERFORMED,
|
||||||
|
SequenceNumbersService.UNASSIGNED_SEQ_NO);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException(openMode.toString());
|
||||||
}
|
}
|
||||||
seqNoService =
|
logger.trace("recovered [{}]", seqNoStats);
|
||||||
new SequenceNumbersService(
|
|
||||||
shardId,
|
|
||||||
engineConfig.getIndexSettings(),
|
|
||||||
seqNoStats.getMaxSeqNo(),
|
|
||||||
seqNoStats.getLocalCheckpoint(),
|
|
||||||
seqNoStats.getGlobalCheckpoint());
|
|
||||||
indexWriter = writer;
|
indexWriter = writer;
|
||||||
|
seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats);
|
||||||
translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);
|
translog = openTranslog(engineConfig, writer, seqNoService::getGlobalCheckpoint);
|
||||||
assert translog.getGeneration() != null;
|
assert translog.getGeneration() != null;
|
||||||
} catch (IOException | TranslogCorruptedException e) {
|
} catch (IOException | TranslogCorruptedException e) {
|
||||||
|
@ -209,6 +215,18 @@ public class InternalEngine extends Engine {
|
||||||
logger.trace("created new InternalEngine");
|
logger.trace("created new InternalEngine");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static SequenceNumbersService sequenceNumberService(
|
||||||
|
final ShardId shardId,
|
||||||
|
final IndexSettings indexSettings,
|
||||||
|
final SeqNoStats seqNoStats) {
|
||||||
|
return new SequenceNumbersService(
|
||||||
|
shardId,
|
||||||
|
indexSettings,
|
||||||
|
seqNoStats.getMaxSeqNo(),
|
||||||
|
seqNoStats.getLocalCheckpoint(),
|
||||||
|
seqNoStats.getGlobalCheckpoint());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public InternalEngine recoverFromTranslog() throws IOException {
|
public InternalEngine recoverFromTranslog() throws IOException {
|
||||||
flushLock.lock();
|
flushLock.lock();
|
||||||
|
@ -326,18 +344,33 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the Translog
|
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and the translog
|
||||||
* checkpoint (global checkpoint).
|
* checkpoint (global checkpoint).
|
||||||
*
|
*
|
||||||
* @param engineConfig the engine configuration (for the open mode and the translog path)
|
* @param translogConfig the translog config (for the global checkpoint)
|
||||||
* @param writer the index writer (for the Lucene commit point)
|
* @param indexWriter the index writer (for the Lucene commit point)
|
||||||
* @return the sequence number stats
|
* @return the sequence number stats
|
||||||
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint
|
* @throws IOException if an I/O exception occurred reading the Lucene commit point or the translog checkpoint
|
||||||
*/
|
*/
|
||||||
private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final IndexWriter writer) throws IOException {
|
private static SeqNoStats loadSeqNoStatsFromLuceneAndTranslog(
|
||||||
|
final TranslogConfig translogConfig,
|
||||||
|
final IndexWriter indexWriter) throws IOException {
|
||||||
|
long globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath());
|
||||||
|
return loadSeqNoStatsLucene(globalCheckpoint, indexWriter);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads the sequence number stats from the Lucene commit point (maximum sequence number and local checkpoint) and uses the
|
||||||
|
* specified global checkpoint.
|
||||||
|
*
|
||||||
|
* @param globalCheckpoint the global checkpoint to use
|
||||||
|
* @param indexWriter the index writer (for the Lucene commit point)
|
||||||
|
* @return the sequence number stats
|
||||||
|
*/
|
||||||
|
private static SeqNoStats loadSeqNoStatsLucene(final long globalCheckpoint, final IndexWriter indexWriter) {
|
||||||
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
|
long maxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED;
|
||||||
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
|
long localCheckpoint = SequenceNumbersService.NO_OPS_PERFORMED;
|
||||||
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
|
for (Map.Entry<String, String> entry : indexWriter.getLiveCommitData()) {
|
||||||
final String key = entry.getKey();
|
final String key = entry.getKey();
|
||||||
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
|
if (key.equals(LOCAL_CHECKPOINT_KEY)) {
|
||||||
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
|
assert localCheckpoint == SequenceNumbersService.NO_OPS_PERFORMED;
|
||||||
|
@ -348,13 +381,6 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final long globalCheckpoint;
|
|
||||||
if (engineConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
|
|
||||||
globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath());
|
|
||||||
} else {
|
|
||||||
globalCheckpoint = SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
|
||||||
}
|
|
||||||
|
|
||||||
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
|
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue