Ensure that max seq # is equal to the global checkpoint when creating ReadOnlyEngines (#37426)
Since version 6.7.0 the Close Index API guarantees that all translog operations have been correctly flushed before the index is closed. If the index is reopened as a Frozen index (which uses a ReadOnlyEngine) we can verify that the maximum sequence number from the last Lucene commit is indeed equal to the last known global checkpoint and refuses to open the read only engine if it's not the case. In this PR the check is only done for indices created on or after 6.7.0 as they are guaranteed to be closed using the new Close Index API. Related #33888
This commit is contained in:
parent
a713183cab
commit
0290547ad7
|
@ -30,6 +30,8 @@ import org.apache.lucene.search.ReferenceManager;
|
|||
import org.apache.lucene.search.SearcherManager;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.Lock;
|
||||
import org.elasticsearch.Assertions;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
|
@ -98,7 +100,25 @@ public class ReadOnlyEngine extends Engine {
|
|||
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
|
||||
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
|
||||
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
|
||||
this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
|
||||
if (seqNoStats == null) {
|
||||
seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos);
|
||||
// During a peer-recovery the global checkpoint is not known and up to date when the engine
|
||||
// is created, so we only check the max seq no / global checkpoint coherency when the global
|
||||
// checkpoint is different from the unassigned sequence number value.
|
||||
// In addition to that we only execute the check if the index the engine belongs to has been
|
||||
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
|
||||
// that guarantee that all operations have been flushed to Lucene.
|
||||
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
|
||||
if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO
|
||||
&& engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) {
|
||||
if (seqNoStats.getMaxSeqNo() != globalCheckpoint) {
|
||||
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint);
|
||||
throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo()
|
||||
+ "] from last commit does not match global checkpoint [" + globalCheckpoint + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
this.seqNoStats = seqNoStats;
|
||||
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
|
||||
reader = open(indexCommit);
|
||||
reader = wrapReader(reader, readerWrapperFunction);
|
||||
|
@ -116,6 +136,12 @@ public class ReadOnlyEngine extends Engine {
|
|||
}
|
||||
}
|
||||
|
||||
protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
|
||||
if (Assertions.ENABLED) {
|
||||
assert false : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]";
|
||||
}
|
||||
}
|
||||
|
||||
protected final DirectoryReader wrapReader(DirectoryReader reader,
|
||||
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
|
||||
reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
|
||||
|
|
|
@ -126,7 +126,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
|
|||
if (rarely()) {
|
||||
engine.flush();
|
||||
}
|
||||
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
|
||||
globalCheckpoint.set(i);
|
||||
}
|
||||
engine.syncTranslog();
|
||||
engine.flushAndClose();
|
||||
|
@ -139,6 +139,40 @@ public class ReadOnlyEngineTests extends EngineTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException {
|
||||
IOUtils.close(engine, store);
|
||||
Engine readOnlyEngine = null;
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
try (Store store = createStore()) {
|
||||
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
|
||||
final int numDocs = scaledRandomIntBetween(10, 100);
|
||||
try (InternalEngine engine = createEngine(config)) {
|
||||
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
|
||||
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
|
||||
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
|
||||
maxSeqNo = engine.getLocalCheckpoint();
|
||||
}
|
||||
globalCheckpoint.set(engine.getLocalCheckpoint() - 1);
|
||||
engine.syncTranslog();
|
||||
engine.flushAndClose();
|
||||
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class,
|
||||
() -> new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity()) {
|
||||
@Override
|
||||
protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
|
||||
// we don't want the assertion to trip in this test
|
||||
}
|
||||
});
|
||||
assertThat(exception.getMessage(), equalTo("Maximum sequence number [" + maxSeqNo
|
||||
+ "] from last commit does not match global checkpoint [" + globalCheckpoint.get() + "]"));
|
||||
} finally {
|
||||
IOUtils.close(readOnlyEngine);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testReadOnly() throws IOException {
|
||||
IOUtils.close(engine, store);
|
||||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
|
|
Loading…
Reference in New Issue