From 0290547ad7455d9d2330665eb7520b9908e0860b Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 22 Jan 2019 09:22:33 +0100 Subject: [PATCH] Ensure that max seq # is equal to the global checkpoint when creating ReadOnlyEngines (#37426) Since version 6.7.0 the Close Index API guarantees that all translog operations have been correctly flushed before the index is closed. If the index is reopened as a Frozen index (which uses a ReadOnlyEngine) we can verify that the maximum sequence number from the last Lucene commit is indeed equal to the last known global checkpoint and refuse to open the read-only engine if it's not the case. In this PR the check is only done for indices created on or after 6.7.0 as they are guaranteed to be closed using the new Close Index API. Related #33888 --- .../index/engine/ReadOnlyEngine.java | 28 +++++++++++++- .../index/engine/ReadOnlyEngineTests.java | 38 ++++++++++++++++++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index c926a5a4719..5c09708b62c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -30,6 +30,8 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.SearcherManager; import org.apache.lucene.store.Directory; import org.apache.lucene.store.Lock; +import org.elasticsearch.Assertions; +import org.elasticsearch.Version; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.core.internal.io.IOUtils; @@ -98,7 +100,25 @@ public class ReadOnlyEngine extends Engine { indexWriterLock = obtainLock ? 
directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null; this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory); this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats; - this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats; + if (seqNoStats == null) { + seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos); + // During a peer-recovery the global checkpoint is not known and up to date when the engine + // is created, so we only check the max seq no / global checkpoint coherency when the global + // checkpoint is different from the unassigned sequence number value. + // In addition to that we only execute the check if the index the engine belongs to has been + // created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction + // that guarantee that all operations have been flushed to Lucene. + final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong(); + if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO + && engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) { + if (seqNoStats.getMaxSeqNo() != globalCheckpoint) { + assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint); + throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo() + + "] from last commit does not match global checkpoint [" + globalCheckpoint + "]"); + } + } + } + this.seqNoStats = seqNoStats; this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); reader = open(indexCommit); reader = wrapReader(reader, readerWrapperFunction); @@ -116,6 +136,12 @@ public class ReadOnlyEngine extends Engine { } } + protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { + if (Assertions.ENABLED) { + assert false : "max seq. no. 
[" + maxSeqNo + "] does not match [" + globalCheckpoint + "]"; + } + } + protected final DirectoryReader wrapReader(DirectoryReader reader, Function readerWrapperFunction) throws IOException { reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId()); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index e5cdcc1ac73..b345afe9b8f 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -70,7 +70,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { lastDocIds = getDocIds(engine, true); assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo())); - assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); + assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds)); for (int i = 0; i < numDocs; i++) { if (randomBoolean()) { String delId = Integer.toString(i); @@ -126,7 +126,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { if (rarely()) { engine.flush(); } - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); + globalCheckpoint.set(i); } engine.syncTranslog(); engine.flushAndClose(); @@ -139,6 +139,40 @@ public class ReadOnlyEngineTests extends EngineTestCase { } } + public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException { + IOUtils.close(engine, store); + Engine readOnlyEngine = null; + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + final int numDocs = 
scaledRandomIntBetween(10, 100); + try (InternalEngine engine = createEngine(config)) { + long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED; + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA, + System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0)); + maxSeqNo = engine.getLocalCheckpoint(); + } + globalCheckpoint.set(engine.getLocalCheckpoint() - 1); + engine.syncTranslog(); + engine.flushAndClose(); + + IllegalStateException exception = expectThrows(IllegalStateException.class, + () -> new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity()) { + @Override + protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { + // we don't want the assertion to trip in this test + } + }); + assertThat(exception.getMessage(), equalTo("Maximum sequence number [" + maxSeqNo + + "] from last commit does not match global checkpoint [" + globalCheckpoint.get() + "]")); + } finally { + IOUtils.close(readOnlyEngine); + } + } + } + public void testReadOnly() throws IOException { IOUtils.close(engine, store); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);