Track max seq_no of updates or deletes on primary (#33842)

This PR is the first step towards using seq_no to optimize indexing operations.
The idea is to track the max seq_no of either update or delete ops on a
primary and transfer this information to replicas, which then use it to
optimize the indexing plan for index operations (with an assigned seq_no).

The max_seq_no_of_updates on a primary is initialized once, when the primary
finishes its local recovery, finishes its peer recovery in relocation, or is
promoted. After that, the max_seq_no_of_updates is only advanced internally
inside an engine when processing update or delete operations.

Relates #33656
This commit is contained in:
Nhat Nguyen 2018-09-22 08:02:57 -04:00 committed by GitHub
parent 477391d751
commit 7944a0cb25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 217 additions and 7 deletions

View File

@ -97,6 +97,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -136,6 +137,16 @@ public abstract class Engine implements Closeable {
*/
protected volatile long lastWriteNanos = System.nanoTime();
/*
* This marker tracks the max seq_no of either update operations or delete operations that have been processed in this engine.
* An index request is considered an update if it overwrites existing documents with the same docId in the Lucene index.
* This marker starts uninitialized (-2), and the optimization using seq_no is disabled while this marker is uninitialized.
* The value of this marker never goes backwards, and it is updated differently on a primary and on a replica:
* 1. A primary initializes this marker once using the max_seq_no from its history, then advances it when processing an update or delete.
* 2. A replica never advances this marker by itself but only inherits it from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes).
*/
private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
protected Engine(EngineConfig engineConfig) {
Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine");
@ -1781,4 +1792,31 @@ public abstract class Engine implements Closeable {
public interface TranslogRecoveryRunner {
int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
}
/**
* Returns the maximum sequence number of either update or delete operations that have been processed in this engine,
* or the sequence number passed to {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
* an update operation if it overwrites existing documents in the Lucene index with the same document id.
*
* @return the current value of the max_seq_no_of_updates marker
* @see #initializeMaxSeqNoOfUpdatesOrDeletes()
* @see #advanceMaxSeqNoOfUpdatesOrDeletes(long)
*/
public final long getMaxSeqNoOfUpdatesOrDeletes() {
return maxSeqNoOfUpdatesOrDeletes.get();
}
/**
* Initializes the max_seq_no_of_updates marker. A primary shard calls this method once, using the
* max_seq_no from the Lucene index and the translog, before replaying the local translog in its local recovery.
*/
public abstract void initializeMaxSeqNoOfUpdatesOrDeletes();
/**
 * Raises the max_seq_no_of_updates marker so that it is at least {@code seqNo}; the marker keeps its
 * current value when that value is already at or above {@code seqNo}. A replica shard calls this method
 * with the max_seq_no_of_updates it receives from its primary shard.
 *
 * @param seqNo the lower bound to apply to the marker
 */
public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
    // Taking the max with the previous value means the marker can only move forwards.
    maxSeqNoOfUpdatesOrDeletes.accumulateAndGet(seqNo, Math::max);
    assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo;
}
}

View File

@ -385,6 +385,7 @@ public class InternalEngine extends Engine {
flushLock.lock();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized";
if (pendingTranslogRecovery.get() == false) {
throw new IllegalStateException("Engine has already been recovered");
}
@ -918,6 +919,7 @@ public class InternalEngine extends Engine {
protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
@ -952,6 +954,10 @@ public class InternalEngine extends Engine {
);
}
}
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoForIndexing);
}
return plan;
}
@ -1242,6 +1248,7 @@ public class InternalEngine extends Engine {
protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete);
assert incrementVersionLookup();
@ -1263,6 +1270,7 @@ public class InternalEngine extends Engine {
currentlyDeleted,
generateSeqNoForOperation(delete),
delete.versionType().updateVersion(currentVersion, delete.version()));
advanceMaxSeqNoOfUpdatesOrDeletes(plan.seqNoOfDeletion);
}
return plan;
}
@ -2548,4 +2556,12 @@ public class InternalEngine extends Engine {
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
}
/**
 * Seeds the max_seq_no_of_updates marker with the larger of the max seq_no known to the local
 * checkpoint tracker and the max seq_no found in the translog. The marker must still be
 * uninitialized when this method is called.
 */
@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
    assert getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO :
        "max_seq_no_of_updates is already initialized [" + getMaxSeqNoOfUpdatesOrDeletes() + "]";
    final long fromCheckpointTracker = localCheckpointTracker.getMaxSeqNo();
    final long fromTranslog = translog.getMaxSeqNo();
    advanceMaxSeqNoOfUpdatesOrDeletes(SequenceNumbers.max(fromCheckpointTracker, fromTranslog));
}
}

View File

@ -379,4 +379,9 @@ public final class ReadOnlyEngine extends Engine {
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {
}
@Override
public void initializeMaxSeqNoOfUpdatesOrDeletes() {
// Advance the marker to the max seq_no reported by this engine's seqNoStats.
advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo());
}
}

View File

@ -511,6 +511,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// TODO: Enable this assertion after we replicate max_seq_no_updates during replication
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) :
// indexSettings.getIndexVersionCreated();
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
}
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
@ -1321,7 +1327,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
translogRecoveryStats::incrementRecoveredOperations);
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
final Engine engine = getEngine();
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE);
}
/**
@ -1947,6 +1955,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
// If the old primary was on an old version, this primary (was replica before)
// does not have max_of_updates yet. Thus we need to bootstrap it manually.
if (getMaxSeqNoOfUpdatesOrDeletes() == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// TODO: Enable this assertion after we replicate max_seq_no_updates during replication
// assert indexSettings.getIndexVersionCreated().before(Version.V_7_0_0_alpha1) : indexSettings.getIndexVersionCreated();
getEngine().initializeMaxSeqNoOfUpdatesOrDeletes();
}
}
}
@ -2718,6 +2733,41 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
// TODO: add a dedicate recovery stats for the reset translog
});
// TODO: do not use init method here but use advance with the max_seq_no received from the primary
newEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
newEngine.recoverFromTranslog(translogRunner, globalCheckpoint);
}
/**
* Returns the maximum sequence number of either update or delete operations that have been processed in this shard,
* or the sequence number passed to {@link #advanceMaxSeqNoOfUpdatesOrDeletes(long)}. An index request is considered
* an update operation if it overwrites existing documents in the Lucene index with the same document id.
* <p>
* The primary captures this value after executing a replication request, then transfers it to a replica before
* that replication request is executed on the replica.
*
* @return the max_seq_no_of_updates marker of the current engine
*/
public long getMaxSeqNoOfUpdatesOrDeletes() {
return getEngine().getMaxSeqNoOfUpdatesOrDeletes();
}
/**
* A replica calls this method to advance the max_seq_no_of_updates marker of its engine to at least the max_seq_no_of_updates
* value (piggybacked in a replication request) that it receives from its primary before executing that replication request.
* The received value is at least as high as the max_seq_no_of_updates on the primary was when any of the operations of that
* replication request were processed on it.
* <p>
* A replica shard also calls this method to bootstrap the max_seq_no_of_updates marker with the value that it received from
* the primary in peer-recovery, before it replays remote translog operations from the primary. The received value is at least
* as high as the max_seq_no_of_updates on the primary was when any of these operations were processed on it.
* <p>
* These transfers guarantee that every index/delete operation, when executing on a replica engine, will observe a value of this
* marker which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @param seqNo the sequence number that the marker of the current engine must reach at least
* @see #acquireReplicaOperationPermit(long, long, ActionListener, String, Object)
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
// Re-read the marker through the engine to check that it is not below the requested value.
assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo;
}
}

View File

@ -60,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -1825,6 +1826,19 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
return translogUUID;
}
/**
* Returns the max seq_no of the translog operations found in this translog. Since this value is calculated from the
* currently existing readers, it is not necessarily the max seq_no of all operations that have been stored in this translog.
*
* @return the highest {@code maxSeqNo} recorded in the checkpoints of the existing readers and the current writer
*/
public long getMaxSeqNo() {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
// Consider every existing reader plus the current writer, and take the largest maxSeqNo from their checkpoints.
final OptionalLong maxSeqNo = Stream.concat(readers.stream(), Stream.of(current))
.mapToLong(reader -> reader.getCheckpoint().maxSeqNo).max();
// The stream always contains at least `current`, so a maximum must be present.
assert maxSeqNo.isPresent() : "must have at least one translog generation";
return maxSeqNo.getAsLong();
}
}
TranslogWriter getCurrent() {
return current;

View File

@ -662,6 +662,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(engine.config());
engine = new InternalEngine(engine.config());
assertTrue(engine.isRecovering());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
assertThat(counter.get(), equalTo(2));
@ -679,6 +680,7 @@ public class InternalEngineTests extends EngineTestCase {
engine = new InternalEngine(engine.config());
expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
assertTrue(engine.isRecovering());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertFalse(engine.isRecovering());
doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
@ -708,7 +710,8 @@ public class InternalEngineTests extends EngineTestCase {
IOUtils.close(engine);
}
trimUnsafeCommits(engine.config());
try (Engine recoveringEngine = new InternalEngine(engine.config())){
try (Engine recoveringEngine = new InternalEngine(engine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
@ -745,6 +748,7 @@ public class InternalEngineTests extends EngineTestCase {
}
};
assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertTrue(committed.get());
} finally {
@ -779,6 +783,7 @@ public class InternalEngineTests extends EngineTestCase {
initialEngine.close();
trimUnsafeCommits(initialEngine.config());
recoveringEngine = new InternalEngine(initialEngine.config());
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
@ -812,6 +817,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
@ -819,6 +825,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, upToSeqNo);
assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo));
assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo));
@ -1203,6 +1210,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
}
@ -1222,6 +1230,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
trimUnsafeCommits(config);
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
}
@ -2197,7 +2206,8 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(initialEngine.engineConfig);
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())){
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
@ -2541,6 +2551,7 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
@ -2558,6 +2569,7 @@ public class InternalEngineTests extends EngineTestCase {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
@ -2572,6 +2584,7 @@ public class InternalEngineTests extends EngineTestCase {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
@ -2678,6 +2691,7 @@ public class InternalEngineTests extends EngineTestCase {
}
}
}) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc1));
@ -2689,6 +2703,7 @@ public class InternalEngineTests extends EngineTestCase {
try (InternalEngine engine =
new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
globalCheckpointSupplier))) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertVisibleCount(engine, 1);
final long committedGen = Long.valueOf(
@ -2756,6 +2771,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.close();
trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier));
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertVisibleCount(engine, numDocs, false);
@ -3462,8 +3478,10 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(appendOnlyPrimary(doc, true, timestamp1));
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) {
try (Store store = createStore(newFSDirectory(storeDir));
InternalEngine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
@ -3749,6 +3767,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(initialEngine.config());
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
@ -3860,6 +3879,7 @@ public class InternalEngineTests extends EngineTestCase {
throw new UnsupportedOperationException();
}
};
noOpEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
final String reason = "filling gaps";
@ -4087,6 +4107,7 @@ public class InternalEngineTests extends EngineTestCase {
}
trimUnsafeCommits(engineConfig);
try (InternalEngine engine = new InternalEngine(engineConfig)) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, globalCheckpoint.get());
engine.restoreLocalHistoryFromTranslog(translogHandler);
assertThat(getDocIds(engine, true), equalTo(prevDocs));
@ -4134,6 +4155,7 @@ public class InternalEngineTests extends EngineTestCase {
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
@ -4170,6 +4192,7 @@ public class InternalEngineTests extends EngineTestCase {
if (flushed) {
assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
@ -4352,7 +4375,7 @@ public class InternalEngineTests extends EngineTestCase {
final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null,
() -> globalCheckpoint.get());
try (Engine engine = new InternalEngine(engineConfig) {
try (InternalEngine engine = new InternalEngine(engineConfig) {
@Override
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
// Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog
@ -4363,6 +4386,7 @@ public class InternalEngineTests extends EngineTestCase {
super.commitIndexWriter(writer, translog, syncId);
}
}) {
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
int numDocs = scaledRandomIntBetween(10, 100);
for (int docId = 0; docId < numDocs; docId++) {
@ -5032,6 +5056,34 @@ public class InternalEngineTests extends EngineTestCase {
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test"));
}
/**
 * Runs a random mix of index and delete operations against a primary engine and checks, after each one,
 * that updates and deletes advance max_seq_no_of_updates while appends of new documents leave it unchanged.
 */
public void testTrackMaxSeqNoOfUpdatesOrDeletesOnPrimary() throws Exception {
    engine.close();
    Set<String> liveDocIds = new HashSet<>();
    engine = new InternalEngine(engine.config());
    // a freshly created engine starts with an uninitialized marker
    assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L));
    engine.initializeMaxSeqNoOfUpdatesOrDeletes();
    final int totalOps = between(1, 500);
    for (int op = 0; op < totalOps; op++) {
        final long markerBefore = engine.getMaxSeqNoOfUpdatesOrDeletes();
        final ParsedDocument doc = createParsedDoc(Integer.toString(between(1, 100)), null);
        if (randomBoolean() == false) {
            final Engine.DeleteResult deleted =
                engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
            liveDocIds.remove(doc.id());
            assertThat("delete operations on primary must advance max_seq_no_of_updates",
                engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(markerBefore, deleted.getSeqNo())));
            continue;
        }
        final Engine.IndexResult indexed = engine.index(indexForDoc(doc));
        // the id was not live before this operation, so the index request is an append
        final boolean isAppend = liveDocIds.add(doc.id());
        if (isAppend) {
            assertThat("append operations should not advance max_seq_no_of_updates",
                engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(markerBefore));
        } else {
            assertThat("update operations on primary must advance max_seq_no_of_updates",
                engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(markerBefore, indexed.getSeqNo())));
        }
    }
}
static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();

View File

@ -95,6 +95,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
// Close and reopen the main engine
InternalEngineTests.trimUnsafeCommits(config);
try (InternalEngine recoveringEngine = new InternalEngine(config)) {
recoveringEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
// the locked down engine should still point to the previous commit
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));

View File

@ -941,6 +941,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));
assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback));
assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo));
closeShard(indexShard, false);
}
@ -1696,6 +1697,7 @@ public class IndexShardTests extends IndexShardTestCase {
assertThat(newShard.getLocalCheckpoint(), equalTo(totalOps - 1L));
assertThat(newShard.getReplicationTracker().getTrackedLocalCheckpointForShard(newShard.routingEntry().allocationId().getId())
.getLocalCheckpoint(), equalTo(totalOps - 1L));
assertThat(newShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(totalOps - 1L));
assertDocCount(newShard, totalOps);
assertThat(newShard.getHistoryUUID(), equalTo(historyUUID));
closeShards(newShard);

View File

@ -127,6 +127,7 @@ public class RefreshListenersTests extends ESTestCase {
new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm,
EngineTestCase.tombstoneDocSupplier());
engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
}

View File

@ -184,7 +184,7 @@ public class TranslogTests extends ESTestCase {
markCurrentGenAsCommitted(translog);
}
private void commit(Translog translog, long genToRetain, long genToCommit) throws IOException {
private long commit(Translog translog, long genToRetain, long genToCommit) throws IOException {
final TranslogDeletionPolicy deletionPolicy = translog.getDeletionPolicy();
deletionPolicy.setTranslogGenerationOfLastCommit(genToCommit);
deletionPolicy.setMinTranslogGenerationForRecovery(genToRetain);
@ -192,6 +192,7 @@ public class TranslogTests extends ESTestCase {
translog.trimUnreferencedReaders();
assertThat(minGenRequired, equalTo(translog.getMinFileGeneration()));
assertFilePresences(translog);
return minGenRequired;
}
@Override
@ -3054,6 +3055,31 @@ public class TranslogTests extends ESTestCase {
misbehavingTranslog.callCloseOnTragicEvent();
}
/**
 * Adds shuffled operations across several translog generations and checks that {@code getMaxSeqNo()}
 * matches the highest seq_no added, both before and after unreferenced generations are trimmed by a commit.
 */
public void testMaxSeqNo() throws Exception {
    final Map<Long, Long> maxSeqNoPerGeneration = new HashMap<>();
    final int iterations = between(1, 10);
    for (int round = 0; round < iterations; round++) {
        final long startSeqNo = randomLongBetween(0, Integer.MAX_VALUE);
        final List<Long> seqNos = LongStream.range(startSeqNo, startSeqNo + randomInt(100))
            .boxed()
            .collect(Collectors.toList());
        Randomness.shuffle(seqNos);
        for (long seqNo : seqNos) {
            if (frequently() == false) {
                continue;
            }
            translog.add(new Translog.Index("test", "id", seqNo, primaryTerm.get(), new byte[]{1}));
            // remember the highest seq_no written into the generation that is current after the add
            maxSeqNoPerGeneration.merge(translog.currentFileGeneration(), seqNo, Long::max);
        }
        translog.rollGeneration();
    }
    translog.sync();
    final long maxBeforeTrim = maxSeqNoPerGeneration.isEmpty()
        ? SequenceNumbers.NO_OPS_PERFORMED
        : Collections.max(maxSeqNoPerGeneration.values());
    assertThat(translog.getMaxSeqNo(), equalTo(maxBeforeTrim));
    final long minRetainedGen =
        commit(translog, randomLongBetween(1, translog.currentFileGeneration()), translog.currentFileGeneration());
    // only generations at or above the minimum retained one are expected to count after the commit
    final long maxAfterTrim = maxSeqNoPerGeneration.entrySet().stream()
        .filter(entry -> entry.getKey() >= minRetainedGen)
        .mapToLong(Map.Entry::getValue)
        .max()
        .orElse(SequenceNumbers.NO_OPS_PERFORMED);
    assertThat(translog.getMaxSeqNo(), equalTo(maxAfterTrim));
}
static class SortedSnapshot implements Translog.Snapshot {
private final Translog.Snapshot snapshot;
private List<Translog.Operation> operations = null;

View File

@ -239,7 +239,8 @@ public class FlushIT extends ESIntegTestCase {
private void indexDoc(Engine engine, String id) throws IOException {
final ParsedDocument doc = InternalEngineTests.createParsedDoc(id, null);
final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), 1L, doc));
final Engine.IndexResult indexResult = engine.index(new Engine.Index(new Term("_id", Uid.encodeId(doc.id())), doc,
engine.getLocalCheckpoint() + 1, 1L, 1L, null, Engine.Operation.Origin.REPLICA, randomLong(), -1L, false));
assertThat(indexResult.getFailure(), nullValue());
}

View File

@ -477,6 +477,7 @@ public abstract class EngineTestCase extends ESTestCase {
}
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
internalEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
return internalEngine;
}

View File

@ -94,6 +94,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
/**
@ -447,6 +448,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
IndexShard shard = shardFunction.apply(primary);
if (primary) {
recoverShardFromStore(shard);
assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(shard.seqNoStats().getMaxSeqNo()));
} else {
recoveryEmptyReplica(shard, true);
}

View File

@ -271,6 +271,7 @@ public class FollowingEngineTests extends ESTestCase {
store.associateIndexWithNewTranslog(translogUuid);
FollowingEngine followingEngine = new FollowingEngine(config);
TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
followingEngine.initializeMaxSeqNoOfUpdatesOrDeletes();
followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
return followingEngine;
}