From 7944a0cb2526188295820461f75269aed435041f Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 22 Sep 2018 08:02:57 -0400 Subject: [PATCH] Track max seq_no of updates or deletes on primary (#33842) This PR is the first step to use seq_no to optimize indexing operations. The idea is to track the max seq_no of either update or delete ops on a primary, and transfer this information to replicas, and replicas use it to optimize indexing plan for index operations (with assigned seq_no). The max_seq_no_of_updates on primary is initialized once when a primary finishes its local recovery or peer recovery in relocation or being promoted. After that, the max_seq_no_of_updates is only advanced internally inside an engine when processing update or delete operations. Relates #33656 --- .../elasticsearch/index/engine/Engine.java | 38 ++++++++++++ .../index/engine/InternalEngine.java | 16 +++++ .../index/engine/ReadOnlyEngine.java | 5 ++ .../elasticsearch/index/shard/IndexShard.java | 52 +++++++++++++++- .../index/translog/Translog.java | 14 +++++ .../index/engine/InternalEngineTests.java | 60 +++++++++++++++++-- .../index/engine/ReadOnlyEngineTests.java | 1 + .../index/shard/IndexShardTests.java | 2 + .../index/shard/RefreshListenersTests.java | 1 + .../index/translog/TranslogTests.java | 28 ++++++++- .../elasticsearch/indices/flush/FlushIT.java | 3 +- .../index/engine/EngineTestCase.java | 1 + .../index/shard/IndexShardTestCase.java | 2 + .../index/engine/FollowingEngineTests.java | 1 + 14 files changed, 217 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 56373f25b52..f50f88456a5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -97,6 +97,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -136,6 +137,16 @@ public abstract class Engine implements Closeable { */ protected volatile long lastWriteNanos = System.nanoTime(); + /* + * This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine. + * An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. + * This marker is started uninitialized (-2), and the optimization using seq_no will be disabled if this marker is uninitialized. + * The value of this marker never goes backwards, and is updated/changed differently on primary and replica: + * 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete. + * 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes). + */ + private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + protected Engine(EngineConfig engineConfig) { Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine"); @@ -1781,4 +1792,31 @@ public abstract class Engine implements Closeable { public interface TranslogRecoveryRunner { int run(Engine engine, Translog.Snapshot snapshot) throws IOException; } + + /** + * Returns the maximum sequence number of either update or delete operations have been processed in this engine + * or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered + * as an update operation if it overwrites the existing documents in Lucene index with the same document id. + * + * @see #initializeMaxSeqNoOfUpdatesOrDeletes() + * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long) + */ + public final long getMaxSeqNoOfUpdatesOrDeletes() { + return maxSeqNoOfUpdatesOrDeletes.get(); + } + + /** + * A primary shard calls this method once to initialize the max_seq_no_of_updates marker using the + * max_seq_no from Lucene index and translog before replaying the local translog in its local recovery. + */ + public abstract void initializeMaxSeqNoOfUpdatesOrDeletes(); + + /** + * A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method + * to advance this marker to at least the given sequence number. + */ + public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { + maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo)); + assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo; + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 1043e514fd7..2f38562b7af 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -385,6 +385,7 @@ public class InternalEngine extends Engine { flushLock.lock(); try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); + assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized"; if (pendingTranslogRecovery.get() == false) { throw new IllegalStateException("Engine has already been recovered"); } @@ -918,6 +919,7 @@ public class InternalEngine extends Engine { protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); + assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay if (canOptimizeAddDocument(index)) { @@ -952,6 +954,10 @@ public class InternalEngine extends Engine { ); } } + final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; + if (toAppend == false) { + advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing); + } return plan; } @@ -1242,6 +1248,7 @@ public class InternalEngine extends Engine { protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); + assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; // resolve operation from external to internal final VersionValue versionValue = resolveDocVersion(delete); assert incrementVersionLookup(); @@ -1263,6 +1270,7 @@ public class InternalEngine extends Engine { currentlyDeleted, generateSeqNoForOperation(delete), delete.versionType().updateVersion(currentVersion, delete.version())); + advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion); } return plan; } @@ -2548,4 +2556,12 @@ public class InternalEngine extends Engine { assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get(); } + + @Override + public void initializeMaxSeqNoOfUpdatesOrDeletes() { + assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO : + "max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]"; + final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()); + advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo); + } } diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index c36c2fc9c29..7848921b67e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -379,4 +379,9 @@ public final class ReadOnlyEngine extends Engine { public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) { } + + @Override + public void initializeMaxSeqNoOfUpdatesOrDeletes() { + advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo()); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index fc5248a5c33..f5f8d70925f 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -511,6 +511,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ engine.rollTranslogGeneration(); engine.fillSeqNoGaps(newPrimaryTerm); + if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { + // TODO: Enable this assertion after we replicate max_seq_no_updates during replication + // assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : + // indexSettings.getIndexVersionCreated(); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + } replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @Override @@ -1321,7 +1327,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl translogRecoveryStats::incrementRecoveredOperations); }; innerOpenEngineAndTranslog(); - getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); + final Engine engine = getEngine(); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); } /** @@ -1947,6 +1955,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex + // If the old primary was on an old version, this primary (was replica before) + // does not have max_of_updates yet. Thus we need to bootstrap it manually. + if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) { + // TODO: Enable this assertion after we replicate max_seq_no_updates during replication + // assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : indexSettings.getIndexVersionCreated(); + getEngine().initializeMaxSeqNoOfUpdatesOrDeletes(); + } } } @@ -2718,6 +2733,41 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { // TODO: add a dedicate recovery stats for the reset translog }); + // TODO: do not use init method here but use advance with the max_seq_no received from the primary + newEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); newEngine.recoverFromTranslog(translogRunner, globalCheckpoint); } + + /** + * Returns the maximum sequence number of either update or delete operations have been processed in this shard + * or the sequence number from {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered + * as an update operation if it overwrites the existing documents in Lucene index with the same document id. + *

+ * The primary captures this value after executes a replication request, then transfers it to a replica before + * executing that replication request on a replica. + */ + public long getMaxSeqNoOfUpdatesOrDeletes() { + return getEngine().getMaxSeqNoOfUpdatesOrDeletes(); + } + + /** + * A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates + * value (piggybacked in a replication request) that it receives from its primary before executing that replication request. + * The receiving value is at least as high as the max_seq_no_of_updates on the primary was when any of the operations of that + * replication request were processed on it. + *

+ * A replica shard also calls this method to bootstrap the max_seq_no_of_updates marker with the value that it received from + * the primary in peer-recovery, before it replays remote translog operations from the primary. The receiving value is at least + * as high as the max_seq_no_of_updates on the primary was when any of these operations were processed on it. + *

+ * These transfers guarantee that every index/delete operation when executing on a replica engine will observe this marker a value + * which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary. + * + * @see #acquireReplicaOperationPermit(long, long, ActionListener, String, Object) + * @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long) + */ + public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { + getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); + assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo; + } } diff --git a/server/src/main/java/org/elasticsearch/index/translog/Translog.java b/server/src/main/java/org/elasticsearch/index/translog/Translog.java index f17acac3789..0b91de81932 100644 --- a/server/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/server/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -60,6 +60,7 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -1825,6 +1826,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC return translogUUID; } + /** + * Returns the max seq_no of translog operations found in this translog. Since this value is calculated based on the current + * existing readers, this value is not necessary to be the max seq_no of all operations have been stored in this translog. + */ + public long getMaxSeqNo() { + try (ReleasableLock ignored = readLock.acquire()) { + ensureOpen(); + final OptionalLong maxSeqNo = Stream.concat(readers.stream(), Stream.of(current)) + .mapToLong(reader -> reader.getCheckpoint().maxSeqNo).max(); + assert maxSeqNo.isPresent() : "must have at least one translog generation"; + return maxSeqNo.getAsLong(); + } + } TranslogWriter getCurrent() { return current; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b2244163530..e77203b83fe 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -662,6 +662,7 @@ public class InternalEngineTests extends EngineTestCase { trimUnsafeCommits(engine.config()); engine = new InternalEngine(engine.config()); assertTrue(engine.isRecovering()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); assertThat(counter.get(), equalTo(2)); @@ -679,6 +680,7 @@ public class InternalEngineTests extends EngineTestCase { engine = new InternalEngine(engine.config()); expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); assertTrue(engine.isRecovering()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertFalse(engine.isRecovering()); doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); @@ -708,7 +710,8 @@ public class InternalEngineTests extends EngineTestCase { IOUtils.close(engine); } trimUnsafeCommits(engine.config()); - try (Engine recoveringEngine = new InternalEngine(engine.config())){ + try (Engine recoveringEngine = new InternalEngine(engine.config())) { + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); @@ -745,6 +748,7 @@ public class InternalEngineTests extends EngineTestCase { } }; assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertTrue(committed.get()); } finally { @@ -779,6 +783,7 @@ public class InternalEngineTests extends EngineTestCase { initialEngine.close(); trimUnsafeCommits(initialEngine.config()); recoveringEngine = new InternalEngine(initialEngine.config()); + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs); @@ -812,6 +817,7 @@ public class InternalEngineTests extends EngineTestCase { } trimUnsafeCommits(config); try (InternalEngine engine = new InternalEngine(config)) { + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); @@ -819,6 +825,7 @@ public class InternalEngineTests extends EngineTestCase { trimUnsafeCommits(config); try (InternalEngine engine = new InternalEngine(config)) { long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, upToSeqNo); assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo)); @@ -1203,6 +1210,7 @@ public class InternalEngineTests extends EngineTestCase { } trimUnsafeCommits(config); engine = new InternalEngine(config); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -1222,6 +1230,7 @@ public class InternalEngineTests extends EngineTestCase { engine.close(); trimUnsafeCommits(config); engine = new InternalEngine(config); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); } @@ -2197,7 +2206,8 @@ public class InternalEngineTests extends EngineTestCase { } trimUnsafeCommits(initialEngine.engineConfig); - try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())){ + try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) { + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); @@ -2541,6 +2551,7 @@ public class InternalEngineTests extends EngineTestCase { assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); } assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); @@ -2558,6 +2569,7 @@ public class InternalEngineTests extends EngineTestCase { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(2, engine.getTranslog().currentFileGeneration()); assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); @@ -2572,6 +2584,7 @@ public class InternalEngineTests extends EngineTestCase { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); @@ -2678,6 +2691,7 @@ public class InternalEngineTests extends EngineTestCase { } } }) { + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc1)); @@ -2689,6 +2703,7 @@ public class InternalEngineTests extends EngineTestCase { try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier))) { + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, 1); final long committedGen = Long.valueOf( @@ -2756,6 +2771,7 @@ public class InternalEngineTests extends EngineTestCase { engine.close(); trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier)); engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, numDocs, false); @@ -3462,8 +3478,10 @@ public class InternalEngineTests extends EngineTestCase { engine.index(appendOnlyPrimary(doc, true, timestamp1)); assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } - try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) { + try (Store store = createStore(newFSDirectory(storeDir)); + InternalEngine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), @@ -3749,6 +3767,7 @@ public class InternalEngineTests extends EngineTestCase { } trimUnsafeCommits(initialEngine.config()); try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); recoveringEngine.fillSeqNoGaps(2); assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); @@ -3860,6 +3879,7 @@ public class InternalEngineTests extends EngineTestCase { throw new UnsupportedOperationException(); } }; + noOpEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get()); final String reason = "filling gaps"; @@ -4087,6 +4107,7 @@ public class InternalEngineTests extends EngineTestCase { } trimUnsafeCommits(engineConfig); try (InternalEngine engine = new InternalEngine(engineConfig)) { + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, globalCheckpoint.get()); engine.restoreLocalHistoryFromTranslog(translogHandler); assertThat(getDocIds(engine, true), equalTo(prevDocs)); @@ -4134,6 +4155,7 @@ public class InternalEngineTests extends EngineTestCase { trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get)); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations()); + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint()); @@ -4170,6 +4192,7 @@ public class InternalEngineTests extends EngineTestCase { if (flushed) { assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); } + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); @@ -4352,7 +4375,7 @@ public class InternalEngineTests extends EngineTestCase { final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, () -> globalCheckpoint.get()); - try (Engine engine = new InternalEngine(engineConfig) { + try (InternalEngine engine = new InternalEngine(engineConfig) { @Override protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { // Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog @@ -4363,6 +4386,7 @@ public class InternalEngineTests extends EngineTestCase { super.commitIndexWriter(writer, translog, syncId); } }) { + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { @@ -5032,6 +5056,34 @@ public class InternalEngineTests extends EngineTestCase { expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test")); } + public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception { + engine.close(); + Set liveDocIds = new HashSet<>(); + engine = new InternalEngine(engine.config()); + assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L)); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); + int numOps = between(1, 500); + for (int i = 0; i < numOps; i++) { + long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes(); + ParsedDocument doc = createParsedDoc(Integer.toString(between(1, 100)), null); + if (randomBoolean()) { + Engine.IndexResult result = engine.index(indexForDoc(doc)); + if (liveDocIds.add(doc.id()) == false) { + assertThat("update operations on primary must advance max_seq_no_of_updates", + engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo()))); + } else { + assertThat("append operations should not advance max_seq_no_of_updates", + engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(currentMaxSeqNoOfUpdates)); + } + } else { + Engine.DeleteResult result = engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get())); + liveDocIds.remove(doc.id()); + assertThat("delete operations on primary must advance max_seq_no_of_updates", + engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, result.getSeqNo()))); + } + } + } + static void trimUnsafeCommits(EngineConfig config) throws IOException { final Store store = config.getStore(); final TranslogConfig translogConfig = config.getTranslogConfig(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index 4080dd33d53..90469d71944 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -95,6 +95,7 @@ public class ReadOnlyEngineTests extends EngineTestCase { // Close and reopen the main engine InternalEngineTests.trimUnsafeCommits(config); try (InternalEngine recoveringEngine = new InternalEngine(config)) { + recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 96557a00746..e8715f9e8ec 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -941,6 +941,7 @@ public class IndexShardTests extends IndexShardTestCase { assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback)); + assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo)); closeShard(indexShard, false); } @@ -1696,6 +1697,7 @@ public class IndexShardTests extends IndexShardTestCase { assertThat(newShard.getLocalCheckpoint(), equalTo(totalOps - 1L)); assertThat(newShard.getReplicationTracker().getTrackedLocalCheckpointForShard(newShard.routingEntry().allocationId().getId()) .getLocalCheckpoint(), equalTo(totalOps - 1L)); + assertThat(newShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(totalOps - 1L)); assertDocCount(newShard, totalOps); assertThat(newShard.getHistoryUUID(), equalTo(historyUUID)); closeShards(newShard); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 2492ab4cd8a..cbc08b19e8a 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -127,6 +127,7 @@ public class RefreshListenersTests extends ESTestCase { new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm, EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); + engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index a0e0c481e5f..45bf7a700aa 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -184,7 +184,7 @@ public class TranslogTests extends ESTestCase { markCurrentGenAsCommitted(translog); } - private void commit(Translog translog, long genToRetain, long genToCommit) throws IOException { + private long commit(Translog translog, long genToRetain, long genToCommit) throws IOException { final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy(); deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit); deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain); @@ -192,6 +192,7 @@ public class TranslogTests extends ESTestCase { translog.trimUnreferencedReaders(); assertThat(minGenRequired, equalTo(translog.getMinFileGeneration())); assertFilePresences(translog); + return minGenRequired; } @Override @@ -3054,6 +3055,31 @@ public class TranslogTests extends ESTestCase { misbehavingTranslog.callCloseOnTragicEvent(); } + public void testMaxSeqNo() throws Exception { + Map maxSeqNoPerGeneration = new HashMap<>(); + for (int iterations = between(1, 10), i = 0; i < iterations; i++) { + long startSeqNo = randomLongBetween(0, Integer.MAX_VALUE); + List seqNos = LongStream.range(startSeqNo, startSeqNo + randomInt(100)).boxed().collect(Collectors.toList()); + Randomness.shuffle(seqNos); + for (long seqNo : seqNos) { + if (frequently()) { + translog.add(new Translog.Index("test", "id", seqNo, primaryTerm.get(), new byte[]{1})); + maxSeqNoPerGeneration.compute(translog.currentFileGeneration(), + (key, existing) -> existing == null ? seqNo : Math.max(existing, seqNo)); + } + } + translog.rollGeneration(); + } + translog.sync(); + assertThat(translog.getMaxSeqNo(), + equalTo(maxSeqNoPerGeneration.isEmpty() ? SequenceNumbers.NO_OPS_PERFORMED : Collections.max(maxSeqNoPerGeneration.values()))); + long minRetainedGen = commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration()); + long expectedMaxSeqNo = maxSeqNoPerGeneration.entrySet().stream() + .filter(e -> e.getKey() >= minRetainedGen).mapToLong(e -> e.getValue()) + .max().orElse(SequenceNumbers.NO_OPS_PERFORMED); + assertThat(translog.getMaxSeqNo(), equalTo(expectedMaxSeqNo)); + } + static class SortedSnapshot implements Translog.Snapshot { private final Translog.Snapshot snapshot; private List operations = null; diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java index 4483ce0d606..ea23ae6308e 100644 --- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java +++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java @@ -239,7 +239,8 @@ public class FlushIT extends ESIntegTestCase { private void indexDoc(Engine engine, String id) throws IOException { final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null); - final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), 1L, doc)); + final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc, + engine.getLocalCheckpoint() + 1, 1L, 1L, null, Engine.Operation.Origin.REPLICA, randomLong(), -1L, false)); assertThat(indexResult.getFailure(), nullValue()); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 86f7bd903cc..69a9f51ab69 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -477,6 +477,7 @@ public abstract class EngineTestCase extends ESTestCase { } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); + internalEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return internalEngine; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 540b68ee409..370c29740b1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -94,6 +94,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; /** @@ -447,6 +448,7 @@ public abstract class IndexShardTestCase extends ESTestCase { IndexShard shard = shardFunction.apply(primary); if (primary) { recoverShardFromStore(shard); + assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(shard.seqNoStats().getMaxSeqNo())); } else { recoveryEmptyReplica(shard, true); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index b3e2d12227b..c9a4de8f03c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -271,6 +271,7 @@ public class FollowingEngineTests extends ESTestCase { store.associateIndexWithNewTranslog(translogUuid); FollowingEngine followingEngine = new FollowingEngine(config); TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); + followingEngine.initializeMaxSeqNoOfUpdatesOrDeletes(); followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return followingEngine; }