Fully encapsulate LocalCheckpointTracker inside of the engine (#31213)

* Fully encapsulate LocalCheckpointTracker inside of the engine

This makes the Engine interface not expose the `LocalCheckpointTracker`, instead
exposing the pieces needed (like retrieving the local checkpoint) as individual
methods.
This commit is contained in:
Lee Hinman 2018-06-08 17:19:41 -06:00 committed by GitHub
parent cdb486ae70
commit bdb0fb2555
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 130 additions and 82 deletions

View File

@ -62,7 +62,7 @@ import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
@ -635,11 +635,28 @@ public abstract class Engine implements Closeable {
}
/**
* The sequence number service for this engine.
*
* @return the sequence number service
* @return the local checkpoint for this Engine
*/
public abstract LocalCheckpointTracker getLocalCheckpointTracker();
public abstract long getLocalCheckpoint();
/**
* Waits for all operations up to the provided sequence number to complete.
*
* @param seqNo the sequence number that the checkpoint must advance to before this method returns
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException;
/**
* Reset the local checkpoint in the tracker to the given local checkpoint
* @param localCheckpoint the new checkpoint to be set
*/
public abstract void resetLocalCheckpoint(long localCheckpoint);
/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
*/
public abstract SeqNoStats getSeqNoStats(long globalCheckpoint);
/**
* Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint)

View File

@ -65,6 +65,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
@ -2185,10 +2186,31 @@ public class InternalEngine extends Engine {
return mergeScheduler.stats();
}
public final LocalCheckpointTracker getLocalCheckpointTracker() {
// Used only for testing! Package private to prevent anyone else from using it
LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}
@Override
public long getLocalCheckpoint() {
return localCheckpointTracker.getCheckpoint();
}
@Override
public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}
@Override
public void resetLocalCheckpoint(long localCheckpoint) {
localCheckpointTracker.resetCheckpoint(localCheckpoint);
}
@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
}
/**
* Returns the number of times a version was looked up either from the index.
* Note this is only available if assertions are enabled

View File

@ -405,7 +405,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting;
if (newRouting.primary() && currentRouting.isRelocationTarget() == false) {
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
}
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
@ -479,8 +479,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/
engine.rollTranslogGeneration();
engine.fillSeqNoGaps(newPrimaryTerm);
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(),
getEngine().getLocalCheckpointTracker().getCheckpoint());
replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint());
primaryReplicaSyncer.accept(this, new ActionListener<ResyncTask>() {
@Override
public void onResponse(ResyncTask resyncTask) {
@ -506,7 +505,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
},
e -> failShard("exception during primary term transition", e));
replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint());
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
primaryTerm = newPrimaryTerm;
}
}
@ -873,7 +872,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
@Nullable
public SeqNoStats seqNoStats() {
Engine engine = getEngineOrNull();
return engine == null ? null : engine.getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
return engine == null ? null : engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint());
}
public IndexingStats indexingStats(String... types) {
@ -1707,7 +1706,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @throws InterruptedException if the thread was interrupted while blocking on the condition
*/
public void waitForOpsToComplete(final long seqNo) throws InterruptedException {
getEngine().getLocalCheckpointTracker().waitForOpsToComplete(seqNo);
getEngine().waitForOpsToComplete(seqNo);
}
/**
@ -1740,7 +1739,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @return the local checkpoint
*/
public long getLocalCheckpoint() {
return getEngine().getLocalCheckpointTracker().getCheckpoint();
return getEngine().getLocalCheckpoint();
}
/**
@ -1781,7 +1780,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return;
}
// only sync if there are not operations in flight
final SeqNoStats stats = getEngine().getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint());
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) {
final ObjectLongMap<String> globalCheckpoints = getInSyncGlobalCheckpoints();
final String allocationId = routingEntry().allocationId().getId();
@ -1818,7 +1817,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
verifyReplicationTarget();
final long localCheckpoint = getEngine().getLocalCheckpointTracker().getCheckpoint();
final long localCheckpoint = getLocalCheckpoint();
if (globalCheckpoint > localCheckpoint) {
/*
* This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global
@ -1847,8 +1846,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
verifyPrimary();
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
getEngine().getLocalCheckpointTracker().getCheckpoint() ==
primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex
}
@ -2234,7 +2232,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
operationPrimaryTerm,
getLocalCheckpoint(),
localCheckpoint);
getEngine().getLocalCheckpointTracker().resetCheckpoint(localCheckpoint);
getEngine().resetLocalCheckpoint(localCheckpoint);
getEngine().rollTranslogGeneration();
});
globalCheckpointUpdated = true;

View File

@ -62,7 +62,7 @@ final class LocalShardSnapshot implements Closeable {
}
long maxSeqNo() {
return shard.getEngine().getLocalCheckpointTracker().getMaxSeqNo();
return shard.getEngine().getSeqNoStats(-1).getMaxSeqNo();
}
long maxUnsafeAutoIdTimestamp() {

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
@ -350,7 +351,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1));
}
final IndexShard oldPrimaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId);
IndexShardTestCase.getEngine(oldPrimaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno.
EngineTestCase.generateNewSeqNo(IndexShardTestCase.getEngine(oldPrimaryShard)); // Make gap in seqno.
long moreDocs = scaledRandomIntBetween(1, 10);
for (int i = 0; i < moreDocs; i++) {
IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i));

View File

@ -225,7 +225,7 @@ public class InternalEngineTests extends EngineTestCase {
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
operation = randomBoolean() ?
appendOnlyPrimary(doc, false, 1)
: appendOnlyReplica(doc, false, 1, engine.getLocalCheckpointTracker().generateSeqNo());
: appendOnlyReplica(doc, false, 1, generateNewSeqNo(engine));
engine.index(operation);
assertTrue("safe access should be required", engine.isSafeAccessRequired());
assertEquals(1, engine.getVersionMapSize()); // now we add this to the map
@ -1018,7 +1018,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.index(indexForDoc(doc));
boolean inSync = randomBoolean();
if (inSync) {
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
globalCheckpoint.set(engine.getLocalCheckpoint());
}
engine.flush();
@ -1036,7 +1036,7 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 1L));
assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
globalCheckpoint.set(engine.getLocalCheckpoint());
engine.flush(true, true);
assertThat(engine.getTranslog().currentFileGeneration(), equalTo(5L));
assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(5L));
@ -2058,12 +2058,12 @@ public class InternalEngineTests extends EngineTestCase {
final Engine.DeleteResult result = initialEngine.delete(delete);
if (result.getResultType() == Engine.Result.Type.SUCCESS) {
assertThat(result.getSeqNo(), equalTo(primarySeqNo + 1));
assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo + 1));
assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo + 1));
indexedIds.remove(id);
primarySeqNo++;
} else {
assertThat(result.getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo));
}
} else {
// index a document
@ -2076,12 +2076,12 @@ public class InternalEngineTests extends EngineTestCase {
final Engine.IndexResult result = initialEngine.index(index);
if (result.getResultType() == Engine.Result.Type.SUCCESS) {
assertThat(result.getSeqNo(), equalTo(primarySeqNo + 1));
assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo + 1));
assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo + 1));
indexedIds.add(id);
primarySeqNo++;
} else {
assertThat(result.getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO));
assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo));
}
}
@ -2090,7 +2090,7 @@ public class InternalEngineTests extends EngineTestCase {
replicaLocalCheckpoint = randomIntBetween(Math.toIntExact(replicaLocalCheckpoint), Math.toIntExact(primarySeqNo));
}
gcpTracker.updateLocalCheckpoint(primary.allocationId().getId(),
initialEngine.getLocalCheckpointTracker().getCheckpoint());
initialEngine.getLocalCheckpoint());
gcpTracker.updateLocalCheckpoint(replica.allocationId().getId(), replicaLocalCheckpoint);
if (rarely()) {
@ -2103,8 +2103,8 @@ public class InternalEngineTests extends EngineTestCase {
logger.info("localcheckpoint {}, global {}", replicaLocalCheckpoint, primarySeqNo);
globalCheckpoint = gcpTracker.getGlobalCheckpoint();
assertEquals(primarySeqNo, initialEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(primarySeqNo, initialEngine.getLocalCheckpointTracker().getCheckpoint());
assertEquals(primarySeqNo, initialEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(primarySeqNo, initialEngine.getLocalCheckpoint());
assertThat(globalCheckpoint, equalTo(replicaLocalCheckpoint));
assertThat(
@ -2126,7 +2126,7 @@ public class InternalEngineTests extends EngineTestCase {
try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())){
recoveringEngine.recoverFromTranslog();
assertEquals(primarySeqNo, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertThat(
Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)),
equalTo(primarySeqNo));
@ -2139,9 +2139,9 @@ public class InternalEngineTests extends EngineTestCase {
// that the committed max seq no is equivalent to what the current primary seq no is, as all data
// we have assigned sequence numbers to should be in the commit
equalTo(primarySeqNo));
assertThat(recoveringEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo(primarySeqNo));
assertThat(recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(recoveringEngine.getLocalCheckpointTracker().generateSeqNo(), equalTo(primarySeqNo + 1));
assertThat(recoveringEngine.getLocalCheckpoint(), equalTo(primarySeqNo));
assertThat(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo));
assertThat(generateNewSeqNo(recoveringEngine), equalTo(primarySeqNo + 1));
}
}
@ -2444,7 +2444,7 @@ public class InternalEngineTests extends EngineTestCase {
try (InternalEngine engine = createEngine(config)) {
engine.index(firstIndexRequest);
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
globalCheckpoint.set(engine.getLocalCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
@ -2607,7 +2607,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.recoverFromTranslog();
final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
engine.index(indexForDoc(doc1));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
globalCheckpoint.set(engine.getLocalCheckpoint());
throwErrorOnCommit.set(true);
FlushFailedEngineException e = expectThrows(FlushFailedEngineException.class, engine::flush);
assertThat(e.getCause().getMessage(), equalTo("power's out"));
@ -2665,7 +2665,7 @@ public class InternalEngineTests extends EngineTestCase {
}
public void testTranslogReplay() throws IOException {
final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpointTracker().getCheckpoint();
final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpoint();
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
@ -3600,7 +3600,7 @@ public class InternalEngineTests extends EngineTestCase {
final AtomicBoolean stall,
final AtomicLong expectedLocalCheckpoint) {
return (engine, operation) -> {
final long seqNo = engine.getLocalCheckpointTracker().generateSeqNo();
final long seqNo = generateNewSeqNo(engine);
final CountDownLatch latch = latchReference.get();
if (stall.get()) {
try {
@ -3652,8 +3652,8 @@ public class InternalEngineTests extends EngineTestCase {
}
}
assertThat(initialEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo(expectedLocalCheckpoint.get()));
assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo((long) (docs - 1)));
assertThat(initialEngine.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint.get()));
assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo((long) (docs - 1)));
initialEngine.flush(true, true);
latchReference.get().countDown();
@ -3667,7 +3667,7 @@ public class InternalEngineTests extends EngineTestCase {
try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
recoveringEngine.recoverFromTranslog();
recoveringEngine.fillSeqNoGaps(2);
assertThat(recoveringEngine.getLocalCheckpointTracker().getCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
}
}
@ -3746,7 +3746,7 @@ public class InternalEngineTests extends EngineTestCase {
expectedLocalCheckpoint = numberOfOperations - 1;
}
assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), equalTo(expectedLocalCheckpoint));
assertThat(engine.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint));
try (Engine.GetResult result = engine.get(new Engine.Get(true, false, "type", "2", uid), searcherFactory)) {
assertThat(result.exists(), equalTo(exists));
}
@ -3776,11 +3776,11 @@ public class InternalEngineTests extends EngineTestCase {
final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
final String reason = randomAlphaOfLength(16);
noOpEngine.noOp(new Engine.NoOp(maxSeqNo + 1, primaryTerm.get(), LOCAL_TRANSLOG_RECOVERY, System.nanoTime(), reason));
assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1)));
assertThat(noOpEngine.getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1)));
assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(gapsFilled));
noOpEngine.noOp(
new Engine.NoOp(maxSeqNo + 2, primaryTerm.get(), randomFrom(PRIMARY, REPLICA, PEER_RECOVERY), System.nanoTime(), reason));
assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 2)));
assertThat(noOpEngine.getLocalCheckpoint(), equalTo((long) (maxSeqNo + 2)));
assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(gapsFilled + 1));
// skip to the op that we added to the translog
Translog.Operation op;
@ -3933,17 +3933,17 @@ public class InternalEngineTests extends EngineTestCase {
actualEngine.rollTranslogGeneration();
}
}
final long currentLocalCheckpoint = actualEngine.getLocalCheckpointTracker().getCheckpoint();
final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint();
final long resetLocalCheckpoint =
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint));
actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint);
actualEngine.resetLocalCheckpoint(resetLocalCheckpoint);
completedSeqNos.clear();
actualEngine.restoreLocalCheckpointFromTranslog();
final Set<Long> intersection = new HashSet<>(expectedCompletedSeqNos);
intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet()));
assertThat(completedSeqNos, equalTo(intersection));
assertThat(actualEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo(currentLocalCheckpoint));
assertThat(actualEngine.getLocalCheckpointTracker().generateSeqNo(), equalTo((long) operations));
assertThat(actualEngine.getLocalCheckpoint(), equalTo(currentLocalCheckpoint));
assertThat(generateNewSeqNo(actualEngine), equalTo((long) operations));
} finally {
IOUtils.close(actualEngine);
}
@ -3967,7 +3967,7 @@ public class InternalEngineTests extends EngineTestCase {
replicaEngine.index(replicaIndexForDoc(doc, 1, indexResult.getSeqNo(), false));
}
}
checkpointOnReplica = replicaEngine.getLocalCheckpointTracker().getCheckpoint();
checkpointOnReplica = replicaEngine.getLocalCheckpoint();
} finally {
IOUtils.close(replicaEngine);
}
@ -3977,16 +3977,16 @@ public class InternalEngineTests extends EngineTestCase {
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Engine recoveringEngine = null;
try {
assertEquals(docs - 1, engine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(docs - 1, engine.getLocalCheckpointTracker().getCheckpoint());
assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint());
assertEquals(docs - 1, engine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(docs - 1, engine.getLocalCheckpoint());
assertEquals(maxSeqIDOnReplica, replicaEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint());
trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations());
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));
// now snapshot the tlog and ensure the primary term is updated
@ -4001,10 +4001,10 @@ public class InternalEngineTests extends EngineTestCase {
}
}
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
if ((flushed = randomBoolean())) {
globalCheckpoint.set(recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
globalCheckpoint.set(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
recoveringEngine.getTranslog().sync();
recoveringEngine.flush(true, true);
}
@ -4021,11 +4021,11 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
}
recoveringEngine.recoverFromTranslog();
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
assertEquals(0, recoveringEngine.fillSeqNoGaps(3));
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
} finally {
IOUtils.close(recoveringEngine);
}
@ -4208,7 +4208,7 @@ public class InternalEngineTests extends EngineTestCase {
// Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog
// (this value is visible to the deletion policy) and an in memory global checkpoint in the SequenceNumbersService.
if (rarely()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), getLocalCheckpointTracker().getCheckpoint()));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), getLocalCheckpoint()));
}
super.commitIndexWriter(writer, translog, syncId);
}
@ -4220,7 +4220,7 @@ public class InternalEngineTests extends EngineTestCase {
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null)));
if (frequently()) {
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint()));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.getTranslog().sync();
}
if (frequently()) {
@ -4354,11 +4354,11 @@ public class InternalEngineTests extends EngineTestCase {
engine.flush(false, randomBoolean());
List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
// Global checkpoint advanced but not enough - all commits are kept.
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint() - 1));
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint() - 1));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits));
// Global checkpoint advanced enough - only the last commit is kept.
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE));
globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpoint(), Long.MAX_VALUE));
engine.syncTranslog();
assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1)));
}
@ -4382,7 +4382,7 @@ public class InternalEngineTests extends EngineTestCase {
for (int i = 0; i < numSnapshots; i++) {
snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit.
}
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
globalCheckpoint.set(engine.getLocalCheckpoint());
engine.syncTranslog();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
for (int i = 0; i < numSnapshots - 1; i++) {
@ -4432,13 +4432,13 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0));
// If the new index commit still points to the same translog generation as the current index commit,
// we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes.
engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here
generateNewSeqNo(engine); // create a gap here
for (int id = 0; id < numDocs; id++) {
if (randomBoolean()) {
translog.rollGeneration();
}
final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null);
engine.index(replicaIndexForDoc(doc, 2L, engine.getLocalCheckpointTracker().generateSeqNo(), false));
engine.index(replicaIndexForDoc(doc, 2L, generateNewSeqNo(engine), false));
if (engine.shouldPeriodicallyFlush()) {
engine.flush();
assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo)));
@ -4459,7 +4459,7 @@ public class InternalEngineTests extends EngineTestCase {
engine.onSettingsChanged();
final int numOps = scaledRandomIntBetween(100, 10_000);
for (int i = 0; i < numOps; i++) {
final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint();
final long localCheckPoint = engine.getLocalCheckpoint();
final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5);
final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null);
engine.index(replicaIndexForDoc(doc, 1L, seqno, false));
@ -4546,9 +4546,9 @@ public class InternalEngineTests extends EngineTestCase {
}
final long deleteBatch = between(10, 20);
final long gapSeqNo = randomLongBetween(
engine.getLocalCheckpointTracker().getMaxSeqNo() + 1, engine.getLocalCheckpointTracker().getMaxSeqNo() + deleteBatch);
engine.getSeqNoStats(-1).getMaxSeqNo() + 1, engine.getSeqNoStats(-1).getMaxSeqNo() + deleteBatch);
for (int i = 0; i < deleteBatch; i++) {
final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
final long seqno = generateNewSeqNo(engine);
if (seqno != gapSeqNo) {
if (randomBoolean()) {
clock.incrementAndGet();
@ -4595,7 +4595,7 @@ public class InternalEngineTests extends EngineTestCase {
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null);
if (randomBoolean()) {
engine.index(appendOnlyReplica(doc, randomBoolean(), 1, engine.getLocalCheckpointTracker().generateSeqNo()));
engine.index(appendOnlyReplica(doc, randomBoolean(), 1, generateNewSeqNo(engine)));
} else {
engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong()));
}
@ -4612,7 +4612,7 @@ public class InternalEngineTests extends EngineTestCase {
for (int i = 0; i < numOps; i++) {
ParsedDocument parsedDocument = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null);
if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations
final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
final long seqno = generateNewSeqNo(engine);
final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean());
if (randomBoolean()) {
engine.index(doc);
@ -4631,7 +4631,7 @@ public class InternalEngineTests extends EngineTestCase {
}
appendOnlyIndexer.join(120_000);
assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
globalCheckpoint.set(engine.getLocalCheckpoint());
engine.syncTranslog();
engine.flush();
}
@ -4643,15 +4643,14 @@ public class InternalEngineTests extends EngineTestCase {
public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception {
long lookupTimes = 0L;
final LocalCheckpointTracker localCheckpointTracker = engine.getLocalCheckpointTracker();
final int initDocs = between(0, 10);
for (int i = 0; i < initDocs; i++) {
index(engine, i);
lookupTimes++;
}
// doc1 is delayed and arrived after a non-append-only op.
final long seqNoAppendOnly1 = localCheckpointTracker.generateSeqNo();
final long seqnoNormalOp = localCheckpointTracker.generateSeqNo();
final long seqNoAppendOnly1 = generateNewSeqNo(engine);
final long seqnoNormalOp = generateNewSeqNo(engine);
if (randomBoolean()) {
engine.index(replicaIndexForDoc(
testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false));
@ -4670,7 +4669,7 @@ public class InternalEngineTests extends EngineTestCase {
// optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map.
engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, testDocumentWithTextField(), SOURCE, null),
false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo()));
false, randomNonNegativeLong(), generateNewSeqNo(engine)));
assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
}

View File

@ -75,6 +75,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.Segment;
@ -847,7 +848,7 @@ public class IndexShardTests extends IndexShardTestCase {
recoverReplica(replicaShard, primaryShard);
final int maxSeqNo = randomIntBetween(0, 128);
for (int i = 0; i <= maxSeqNo; i++) {
primaryShard.getEngine().getLocalCheckpointTracker().generateSeqNo();
EngineTestCase.generateNewSeqNo(primaryShard.getEngine());
}
final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo;

View File

@ -77,6 +77,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
@ -3334,7 +3335,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
final Index index = resolveIndex(indexName);
final IndexShard primary = internalCluster().getInstance(IndicesService.class, dataNode).getShardOrNull(new ShardId(index, 0));
// create a gap in the sequence numbers
getEngineFromShard(primary).getLocalCheckpointTracker().generateSeqNo();
EngineTestCase.generateNewSeqNo(getEngineFromShard(primary));
for (int i = 5; i < 10; i++) {
index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i);

View File

@ -386,6 +386,15 @@ public abstract class EngineTestCase extends ESTestCase {
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException;
}
/**
* Generate a new sequence number and return it. Only works on InternalEngines
*/
public static long generateNewSeqNo(final Engine engine) {
assert engine instanceof InternalEngine : "expected InternalEngine, got: " + engine.getClass();
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.getLocalCheckpointTracker().generateSeqNo();
}
public static InternalEngine createInternalEngine(
@Nullable final IndexWriterFactory indexWriterFactory,
@Nullable final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier,

View File

@ -570,7 +570,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false);
}
shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(),
shard.getEngine().getLocalCheckpointTracker().getCheckpoint());
shard.getLocalCheckpoint());
} else {
result = shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0,
VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);