diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 5283975be7b..0b6eea26e03 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -62,7 +62,7 @@ import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.merge.MergeStats; -import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -635,11 +635,28 @@ public abstract class Engine implements Closeable { } /** - * The sequence number service for this engine. - * - * @return the sequence number service + * @return the local checkpoint for this Engine */ - public abstract LocalCheckpointTracker getLocalCheckpointTracker(); + public abstract long getLocalCheckpoint(); + + /** + * Waits for all operations up to the provided sequence number to complete. + * + * @param seqNo the sequence number that the checkpoint must advance to before this method returns + * @throws InterruptedException if the thread was interrupted while blocking on the condition + */ + public abstract void waitForOpsToComplete(long seqNo) throws InterruptedException; + + /** + * Reset the local checkpoint in the tracker to the given local checkpoint + * @param localCheckpoint the new checkpoint to be set + */ + public abstract void resetLocalCheckpoint(long localCheckpoint); + + /** + * @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint + */ + public abstract SeqNoStats getSeqNoStats(long globalCheckpoint); /** * Returns the latest global checkpoint value that has been persisted in the underlying storage (i.e. translog's checkpoint) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 88e71608452..53f209ccf63 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -65,6 +65,7 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; import org.elasticsearch.index.seqno.LocalCheckpointTracker; +import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.ShardId; @@ -2185,10 +2186,31 @@ public class InternalEngine extends Engine { return mergeScheduler.stats(); } - public final LocalCheckpointTracker getLocalCheckpointTracker() { + // Used only for testing! Package private to prevent anyone else from using it + LocalCheckpointTracker getLocalCheckpointTracker() { return localCheckpointTracker; } + @Override + public long getLocalCheckpoint() { + return localCheckpointTracker.getCheckpoint(); + } + + @Override + public void waitForOpsToComplete(long seqNo) throws InterruptedException { + localCheckpointTracker.waitForOpsToComplete(seqNo); + } + + @Override + public void resetLocalCheckpoint(long localCheckpoint) { + localCheckpointTracker.resetCheckpoint(localCheckpoint); + } + + @Override + public SeqNoStats getSeqNoStats(long globalCheckpoint) { + return localCheckpointTracker.getStats(globalCheckpoint); + } + /** * Returns the number of times a version was looked up either from the index. * Note this is only available if assertions are enabled diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8583b6b4c9b..34230be14cb 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -405,7 +405,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl assert currentRouting.active() == false : "we are in POST_RECOVERY, but our shard routing is active " + currentRouting; if (newRouting.primary() && currentRouting.isRelocationTarget() == false) { - replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint()); + replicationTracker.activatePrimaryMode(getLocalCheckpoint()); } changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); @@ -479,8 +479,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ engine.rollTranslogGeneration(); engine.fillSeqNoGaps(newPrimaryTerm); - replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), - getEngine().getLocalCheckpointTracker().getCheckpoint()); + replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getLocalCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @Override public void onResponse(ResyncTask resyncTask) { @@ -506,7 +505,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } }, e -> failShard("exception during primary term transition", e)); - replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint()); + replicationTracker.activatePrimaryMode(getLocalCheckpoint()); primaryTerm = newPrimaryTerm; } } @@ -873,7 +872,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl @Nullable public SeqNoStats seqNoStats() { Engine engine = getEngineOrNull(); - return engine == null ? null : engine.getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint()); + return engine == null ? null : engine.getSeqNoStats(replicationTracker.getGlobalCheckpoint()); } public IndexingStats indexingStats(String... types) { @@ -1707,7 +1706,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @throws InterruptedException if the thread was interrupted while blocking on the condition */ public void waitForOpsToComplete(final long seqNo) throws InterruptedException { - getEngine().getLocalCheckpointTracker().waitForOpsToComplete(seqNo); + getEngine().waitForOpsToComplete(seqNo); } /** @@ -1740,7 +1739,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @return the local checkpoint */ public long getLocalCheckpoint() { - return getEngine().getLocalCheckpointTracker().getCheckpoint(); + return getEngine().getLocalCheckpoint(); } /** @@ -1781,7 +1780,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return; } // only sync if there are not operations in flight - final SeqNoStats stats = getEngine().getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint()); + final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint()); if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) { final ObjectLongMap globalCheckpoints = getInSyncGlobalCheckpoints(); final String allocationId = routingEntry().allocationId().getId(); @@ -1818,7 +1817,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) { verifyReplicationTarget(); - final long localCheckpoint = getEngine().getLocalCheckpointTracker().getCheckpoint(); + final long localCheckpoint = getLocalCheckpoint(); if (globalCheckpoint > localCheckpoint) { /* * This can happen during recovery when the shard has started its engine but recovery is not finalized and is receiving global @@ -1847,8 +1846,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl verifyPrimary(); assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting; assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) && - getEngine().getLocalCheckpointTracker().getCheckpoint() == - primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); + getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex } @@ -2234,7 +2232,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl operationPrimaryTerm, getLocalCheckpoint(), localCheckpoint); - getEngine().getLocalCheckpointTracker().resetCheckpoint(localCheckpoint); + getEngine().resetLocalCheckpoint(localCheckpoint); getEngine().rollTranslogGeneration(); }); globalCheckpointUpdated = true; diff --git a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index d7105c0c14d..09391c9bc96 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/server/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -62,7 +62,7 @@ final class LocalShardSnapshot implements Closeable { } long maxSeqNo() { - return shard.getEngine().getLocalCheckpointTracker().getMaxSeqNo(); + return shard.getEngine().getSeqNoStats(-1).getMaxSeqNo(); } long maxUnsafeAutoIdTimestamp() { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 407212936d1..90173455c3b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; @@ -350,7 +351,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase { assertThat(indexResult.getShardInfo().getSuccessful(), equalTo(numberOfReplicas + 1)); } final IndexShard oldPrimaryShard = internalCluster().getInstance(IndicesService.class, oldPrimary).getShardOrNull(shardId); - IndexShardTestCase.getEngine(oldPrimaryShard).getLocalCheckpointTracker().generateSeqNo(); // Make gap in seqno. + EngineTestCase.generateNewSeqNo(IndexShardTestCase.getEngine(oldPrimaryShard)); // Make gap in seqno. long moreDocs = scaledRandomIntBetween(1, 10); for (int i = 0; i < moreDocs; i++) { IndexResponse indexResult = index("test", "doc", Long.toString(numDocs + i)); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 26da424460e..d67148dbff2 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -225,7 +225,7 @@ public class InternalEngineTests extends EngineTestCase { new BytesArray("{}".getBytes(Charset.defaultCharset())), null); operation = randomBoolean() ? appendOnlyPrimary(doc, false, 1) - : appendOnlyReplica(doc, false, 1, engine.getLocalCheckpointTracker().generateSeqNo()); + : appendOnlyReplica(doc, false, 1, generateNewSeqNo(engine)); engine.index(operation); assertTrue("safe access should be required", engine.isSafeAccessRequired()); assertEquals(1, engine.getVersionMapSize()); // now we add this to the map @@ -1018,7 +1018,7 @@ public class InternalEngineTests extends EngineTestCase { engine.index(indexForDoc(doc)); boolean inSync = randomBoolean(); if (inSync) { - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + globalCheckpoint.set(engine.getLocalCheckpoint()); } engine.flush(); @@ -1036,7 +1036,7 @@ public class InternalEngineTests extends EngineTestCase { assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(inSync ? 4L : 1L)); assertThat(engine.getTranslog().getDeletionPolicy().getTranslogGenerationOfLastCommit(), equalTo(4L)); - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + globalCheckpoint.set(engine.getLocalCheckpoint()); engine.flush(true, true); assertThat(engine.getTranslog().currentFileGeneration(), equalTo(5L)); assertThat(engine.getTranslog().getDeletionPolicy().getMinTranslogGenerationForRecovery(), equalTo(5L)); @@ -2058,12 +2058,12 @@ public class InternalEngineTests extends EngineTestCase { final Engine.DeleteResult result = initialEngine.delete(delete); if (result.getResultType() == Engine.Result.Type.SUCCESS) { assertThat(result.getSeqNo(), equalTo(primarySeqNo + 1)); - assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo + 1)); + assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo + 1)); indexedIds.remove(id); primarySeqNo++; } else { assertThat(result.getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); - assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo)); + assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo)); } } else { // index a document @@ -2076,12 +2076,12 @@ public class InternalEngineTests extends EngineTestCase { final Engine.IndexResult result = initialEngine.index(index); if (result.getResultType() == Engine.Result.Type.SUCCESS) { assertThat(result.getSeqNo(), equalTo(primarySeqNo + 1)); - assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo + 1)); + assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo + 1)); indexedIds.add(id); primarySeqNo++; } else { assertThat(result.getSeqNo(), equalTo(SequenceNumbers.UNASSIGNED_SEQ_NO)); - assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo)); + assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo)); } } @@ -2090,7 +2090,7 @@ public class InternalEngineTests extends EngineTestCase { replicaLocalCheckpoint = randomIntBetween(Math.toIntExact(replicaLocalCheckpoint), Math.toIntExact(primarySeqNo)); } gcpTracker.updateLocalCheckpoint(primary.allocationId().getId(), - initialEngine.getLocalCheckpointTracker().getCheckpoint()); + initialEngine.getLocalCheckpoint()); gcpTracker.updateLocalCheckpoint(replica.allocationId().getId(), replicaLocalCheckpoint); if (rarely()) { @@ -2103,8 +2103,8 @@ public class InternalEngineTests extends EngineTestCase { logger.info("localcheckpoint {}, global {}", replicaLocalCheckpoint, primarySeqNo); globalCheckpoint = gcpTracker.getGlobalCheckpoint(); - assertEquals(primarySeqNo, initialEngine.getLocalCheckpointTracker().getMaxSeqNo()); - assertEquals(primarySeqNo, initialEngine.getLocalCheckpointTracker().getCheckpoint()); + assertEquals(primarySeqNo, initialEngine.getSeqNoStats(-1).getMaxSeqNo()); + assertEquals(primarySeqNo, initialEngine.getLocalCheckpoint()); assertThat(globalCheckpoint, equalTo(replicaLocalCheckpoint)); assertThat( @@ -2126,7 +2126,7 @@ public class InternalEngineTests extends EngineTestCase { try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())){ recoveringEngine.recoverFromTranslog(); - assertEquals(primarySeqNo, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); + assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertThat( Long.parseLong(recoveringEngine.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)), equalTo(primarySeqNo)); @@ -2139,9 +2139,9 @@ public class InternalEngineTests extends EngineTestCase { // that the committed max seq no is equivalent to what the current primary seq no is, as all data // we have assigned sequence numbers to should be in the commit equalTo(primarySeqNo)); - assertThat(recoveringEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo(primarySeqNo)); - assertThat(recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(primarySeqNo)); - assertThat(recoveringEngine.getLocalCheckpointTracker().generateSeqNo(), equalTo(primarySeqNo + 1)); + assertThat(recoveringEngine.getLocalCheckpoint(), equalTo(primarySeqNo)); + assertThat(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo(primarySeqNo)); + assertThat(generateNewSeqNo(recoveringEngine), equalTo(primarySeqNo + 1)); } } @@ -2444,7 +2444,7 @@ public class InternalEngineTests extends EngineTestCase { try (InternalEngine engine = createEngine(config)) { engine.index(firstIndexRequest); - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + globalCheckpoint.set(engine.getLocalCheckpoint()); expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); @@ -2607,7 +2607,7 @@ public class InternalEngineTests extends EngineTestCase { engine.recoverFromTranslog(); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); engine.index(indexForDoc(doc1)); - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + globalCheckpoint.set(engine.getLocalCheckpoint()); throwErrorOnCommit.set(true); FlushFailedEngineException e = expectThrows(FlushFailedEngineException.class, engine::flush); assertThat(e.getCause().getMessage(), equalTo("power's out")); @@ -2665,7 +2665,7 @@ public class InternalEngineTests extends EngineTestCase { } public void testTranslogReplay() throws IOException { - final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpointTracker().getCheckpoint(); + final LongSupplier inSyncGlobalCheckpointSupplier = () -> this.engine.getLocalCheckpoint(); final int numDocs = randomIntBetween(1, 10); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); @@ -3600,7 +3600,7 @@ public class InternalEngineTests extends EngineTestCase { final AtomicBoolean stall, final AtomicLong expectedLocalCheckpoint) { return (engine, operation) -> { - final long seqNo = engine.getLocalCheckpointTracker().generateSeqNo(); + final long seqNo = generateNewSeqNo(engine); final CountDownLatch latch = latchReference.get(); if (stall.get()) { try { @@ -3652,8 +3652,8 @@ public class InternalEngineTests extends EngineTestCase { } } - assertThat(initialEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo(expectedLocalCheckpoint.get())); - assertThat(initialEngine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo((long) (docs - 1))); + assertThat(initialEngine.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint.get())); + assertThat(initialEngine.getSeqNoStats(-1).getMaxSeqNo(), equalTo((long) (docs - 1))); initialEngine.flush(true, true); latchReference.get().countDown(); @@ -3667,7 +3667,7 @@ public class InternalEngineTests extends EngineTestCase { try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { recoveringEngine.recoverFromTranslog(); recoveringEngine.fillSeqNoGaps(2); - assertThat(recoveringEngine.getLocalCheckpointTracker().getCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); + assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); } } @@ -3746,7 +3746,7 @@ public class InternalEngineTests extends EngineTestCase { expectedLocalCheckpoint = numberOfOperations - 1; } - assertThat(engine.getLocalCheckpointTracker().getCheckpoint(), equalTo(expectedLocalCheckpoint)); + assertThat(engine.getLocalCheckpoint(), equalTo(expectedLocalCheckpoint)); try (Engine.GetResult result = engine.get(new Engine.Get(true, false, "type", "2", uid), searcherFactory)) { assertThat(result.exists(), equalTo(exists)); } @@ -3776,11 +3776,11 @@ public class InternalEngineTests extends EngineTestCase { final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get()); final String reason = randomAlphaOfLength(16); noOpEngine.noOp(new Engine.NoOp(maxSeqNo + 1, primaryTerm.get(), LOCAL_TRANSLOG_RECOVERY, System.nanoTime(), reason)); - assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 1))); + assertThat(noOpEngine.getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1))); assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(gapsFilled)); noOpEngine.noOp( new Engine.NoOp(maxSeqNo + 2, primaryTerm.get(), randomFrom(PRIMARY, REPLICA, PEER_RECOVERY), System.nanoTime(), reason)); - assertThat(noOpEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo((long) (maxSeqNo + 2))); + assertThat(noOpEngine.getLocalCheckpoint(), equalTo((long) (maxSeqNo + 2))); assertThat(noOpEngine.getTranslog().stats().getUncommittedOperations(), equalTo(gapsFilled + 1)); // skip to the op that we added to the translog Translog.Operation op; @@ -3933,17 +3933,17 @@ public class InternalEngineTests extends EngineTestCase { actualEngine.rollTranslogGeneration(); } } - final long currentLocalCheckpoint = actualEngine.getLocalCheckpointTracker().getCheckpoint(); + final long currentLocalCheckpoint = actualEngine.getLocalCheckpoint(); final long resetLocalCheckpoint = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(currentLocalCheckpoint)); - actualEngine.getLocalCheckpointTracker().resetCheckpoint(resetLocalCheckpoint); + actualEngine.resetLocalCheckpoint(resetLocalCheckpoint); completedSeqNos.clear(); actualEngine.restoreLocalCheckpointFromTranslog(); final Set intersection = new HashSet<>(expectedCompletedSeqNos); intersection.retainAll(LongStream.range(resetLocalCheckpoint + 1, operations).boxed().collect(Collectors.toSet())); assertThat(completedSeqNos, equalTo(intersection)); - assertThat(actualEngine.getLocalCheckpointTracker().getCheckpoint(), equalTo(currentLocalCheckpoint)); - assertThat(actualEngine.getLocalCheckpointTracker().generateSeqNo(), equalTo((long) operations)); + assertThat(actualEngine.getLocalCheckpoint(), equalTo(currentLocalCheckpoint)); + assertThat(generateNewSeqNo(actualEngine), equalTo((long) operations)); } finally { IOUtils.close(actualEngine); } @@ -3967,7 +3967,7 @@ public class InternalEngineTests extends EngineTestCase { replicaEngine.index(replicaIndexForDoc(doc, 1, indexResult.getSeqNo(), false)); } } - checkpointOnReplica = replicaEngine.getLocalCheckpointTracker().getCheckpoint(); + checkpointOnReplica = replicaEngine.getLocalCheckpoint(); } finally { IOUtils.close(replicaEngine); } @@ -3977,16 +3977,16 @@ public class InternalEngineTests extends EngineTestCase { AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); Engine recoveringEngine = null; try { - assertEquals(docs - 1, engine.getLocalCheckpointTracker().getMaxSeqNo()); - assertEquals(docs - 1, engine.getLocalCheckpointTracker().getCheckpoint()); - assertEquals(maxSeqIDOnReplica, replicaEngine.getLocalCheckpointTracker().getMaxSeqNo()); - assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpointTracker().getCheckpoint()); + assertEquals(docs - 1, engine.getSeqNoStats(-1).getMaxSeqNo()); + assertEquals(docs - 1, engine.getLocalCheckpoint()); + assertEquals(maxSeqIDOnReplica, replicaEngine.getSeqNoStats(-1).getMaxSeqNo()); + assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint()); trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get)); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); assertEquals(numDocsOnReplica, recoveringEngine.getTranslog().stats().getUncommittedOperations()); recoveringEngine.recoverFromTranslog(); - assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); - assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); + assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint()); assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2)); // now snapshot the tlog and ensure the primary term is updated @@ -4001,10 +4001,10 @@ public class InternalEngineTests extends EngineTestCase { } } - assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); - assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); if ((flushed = randomBoolean())) { - globalCheckpoint.set(recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); + globalCheckpoint.set(recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); recoveringEngine.getTranslog().sync(); recoveringEngine.flush(true, true); } @@ -4021,11 +4021,11 @@ public class InternalEngineTests extends EngineTestCase { assertThat(recoveringEngine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); } recoveringEngine.recoverFromTranslog(); - assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); - assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); assertEquals(0, recoveringEngine.fillSeqNoGaps(3)); - assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getMaxSeqNo()); - assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpointTracker().getCheckpoint()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); + assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); } finally { IOUtils.close(recoveringEngine); } @@ -4208,7 +4208,7 @@ public class InternalEngineTests extends EngineTestCase { // Advance the global checkpoint during the flush to create a lag between a persisted global checkpoint in the translog // (this value is visible to the deletion policy) and an in memory global checkpoint in the SequenceNumbersService. if (rarely()) { - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), getLocalCheckpointTracker().getCheckpoint())); + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), getLocalCheckpoint())); } super.commitIndexWriter(writer, translog, syncId); } @@ -4220,7 +4220,7 @@ public class InternalEngineTests extends EngineTestCase { document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); engine.index(indexForDoc(testParsedDocument(Integer.toString(docId), null, document, B_1, null))); if (frequently()) { - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint())); + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); engine.getTranslog().sync(); } if (frequently()) { @@ -4354,11 +4354,11 @@ public class InternalEngineTests extends EngineTestCase { engine.flush(false, randomBoolean()); List commits = DirectoryReader.listCommits(store.directory()); // Global checkpoint advanced but not enough - all commits are kept. - globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpointTracker().getCheckpoint() - 1)); + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint() - 1)); engine.syncTranslog(); assertThat(DirectoryReader.listCommits(store.directory()), equalTo(commits)); // Global checkpoint advanced enough - only the last commit is kept. - globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpointTracker().getCheckpoint(), Long.MAX_VALUE)); + globalCheckpoint.set(randomLongBetween(engine.getLocalCheckpoint(), Long.MAX_VALUE)); engine.syncTranslog(); assertThat(DirectoryReader.listCommits(store.directory()), contains(commits.get(commits.size() - 1))); } @@ -4382,7 +4382,7 @@ public class InternalEngineTests extends EngineTestCase { for (int i = 0; i < numSnapshots; i++) { snapshots.add(engine.acquireSafeIndexCommit()); // taking snapshots from the safe commit. } - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + globalCheckpoint.set(engine.getLocalCheckpoint()); engine.syncTranslog(); final List commits = DirectoryReader.listCommits(store.directory()); for (int i = 0; i < numSnapshots - 1; i++) { @@ -4432,13 +4432,13 @@ public class InternalEngineTests extends EngineTestCase { assertThat(engine.getTranslog().stats().getUncommittedOperations(), equalTo(0)); // If the new index commit still points to the same translog generation as the current index commit, // we should not enable the periodically flush condition; otherwise we can get into an infinite loop of flushes. - engine.getLocalCheckpointTracker().generateSeqNo(); // create a gap here + generateNewSeqNo(engine); // create a gap here for (int id = 0; id < numDocs; id++) { if (randomBoolean()) { translog.rollGeneration(); } final ParsedDocument doc = testParsedDocument("new" + id, null, testDocumentWithTextField(), SOURCE, null); - engine.index(replicaIndexForDoc(doc, 2L, engine.getLocalCheckpointTracker().generateSeqNo(), false)); + engine.index(replicaIndexForDoc(doc, 2L, generateNewSeqNo(engine), false)); if (engine.shouldPeriodicallyFlush()) { engine.flush(); assertThat(engine.getLastCommittedSegmentInfos(), not(sameInstance(lastCommitInfo))); @@ -4459,7 +4459,7 @@ public class InternalEngineTests extends EngineTestCase { engine.onSettingsChanged(); final int numOps = scaledRandomIntBetween(100, 10_000); for (int i = 0; i < numOps; i++) { - final long localCheckPoint = engine.getLocalCheckpointTracker().getCheckpoint(); + final long localCheckPoint = engine.getLocalCheckpoint(); final long seqno = randomLongBetween(Math.max(0, localCheckPoint), localCheckPoint + 5); final ParsedDocument doc = testParsedDocument(Long.toString(seqno), null, testDocumentWithTextField(), SOURCE, null); engine.index(replicaIndexForDoc(doc, 1L, seqno, false)); @@ -4546,9 +4546,9 @@ public class InternalEngineTests extends EngineTestCase { } final long deleteBatch = between(10, 20); final long gapSeqNo = randomLongBetween( - engine.getLocalCheckpointTracker().getMaxSeqNo() + 1, engine.getLocalCheckpointTracker().getMaxSeqNo() + deleteBatch); + engine.getSeqNoStats(-1).getMaxSeqNo() + 1, engine.getSeqNoStats(-1).getMaxSeqNo() + deleteBatch); for (int i = 0; i < deleteBatch; i++) { - final long seqno = engine.getLocalCheckpointTracker().generateSeqNo(); + final long seqno = generateNewSeqNo(engine); if (seqno != gapSeqNo) { if (randomBoolean()) { clock.incrementAndGet(); @@ -4595,7 +4595,7 @@ public class InternalEngineTests extends EngineTestCase { for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null); if (randomBoolean()) { - engine.index(appendOnlyReplica(doc, randomBoolean(), 1, engine.getLocalCheckpointTracker().generateSeqNo())); + engine.index(appendOnlyReplica(doc, randomBoolean(), 1, generateNewSeqNo(engine))); } else { engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong())); } @@ -4612,7 +4612,7 @@ public class InternalEngineTests extends EngineTestCase { for (int i = 0; i < numOps; i++) { ParsedDocument parsedDocument = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null); if (randomBoolean()) { // On replica - update max_seqno for non-append-only operations - final long seqno = engine.getLocalCheckpointTracker().generateSeqNo(); + final long seqno = generateNewSeqNo(engine); final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean()); if (randomBoolean()) { engine.index(doc); @@ -4631,7 +4631,7 @@ public class InternalEngineTests extends EngineTestCase { } appendOnlyIndexer.join(120_000); assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly)); - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + globalCheckpoint.set(engine.getLocalCheckpoint()); engine.syncTranslog(); engine.flush(); } @@ -4643,15 +4643,14 @@ public class InternalEngineTests extends EngineTestCase { public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception { long lookupTimes = 0L; - final LocalCheckpointTracker localCheckpointTracker = engine.getLocalCheckpointTracker(); final int initDocs = between(0, 10); for (int i = 0; i < initDocs; i++) { index(engine, i); lookupTimes++; } // doc1 is delayed and arrived after a non-append-only op. - final long seqNoAppendOnly1 = localCheckpointTracker.generateSeqNo(); - final long seqnoNormalOp = localCheckpointTracker.generateSeqNo(); + final long seqNoAppendOnly1 = generateNewSeqNo(engine); + final long seqnoNormalOp = generateNewSeqNo(engine); if (randomBoolean()) { engine.index(replicaIndexForDoc( testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false)); @@ -4670,7 +4669,7 @@ public class InternalEngineTests extends EngineTestCase { // optimize for other append-only 2 (its seqno > max_seqno of non-append-only) - do not look up in version map. engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, testDocumentWithTextField(), SOURCE, null), - false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo())); + false, randomNonNegativeLong(), generateNewSeqNo(engine))); assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes)); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 2f420dcbc4d..31afb5ed42f 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -75,6 +75,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.Segment; @@ -847,7 +848,7 @@ public class IndexShardTests extends IndexShardTestCase { recoverReplica(replicaShard, primaryShard); final int maxSeqNo = randomIntBetween(0, 128); for (int i = 0; i <= maxSeqNo; i++) { - primaryShard.getEngine().getLocalCheckpointTracker().generateSeqNo(); + EngineTestCase.generateNewSeqNo(primaryShard.getEngine()); } final long checkpoint = rarely() ? maxSeqNo - scaledRandomIntBetween(0, maxSeqNo) : maxSeqNo; diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 7e5f9dbb820..06499aa544e 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -77,6 +77,7 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -3334,7 +3335,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas final Index index = resolveIndex(indexName); final IndexShard primary = internalCluster().getInstance(IndicesService.class, dataNode).getShardOrNull(new ShardId(index, 0)); // create a gap in the sequence numbers - getEngineFromShard(primary).getLocalCheckpointTracker().generateSeqNo(); + EngineTestCase.generateNewSeqNo(getEngineFromShard(primary)); for (int i = 5; i < 10; i++) { index(indexName, "_doc", Integer.toString(i), "foo", "bar" + i); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8fff17900b0..0d5e693d62d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -386,6 +386,15 @@ public abstract class EngineTestCase extends ESTestCase { IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException; } + /** + * Generate a new sequence number and return it. Only works on InternalEngines + */ + public static long generateNewSeqNo(final Engine engine) { + assert engine instanceof InternalEngine : "expected InternalEngine, got: " + engine.getClass(); + InternalEngine internalEngine = (InternalEngine) engine; + return internalEngine.getLocalCheckpointTracker().generateSeqNo(); + } + public static InternalEngine createInternalEngine( @Nullable final IndexWriterFactory indexWriterFactory, @Nullable final BiFunction localCheckpointTrackerSupplier, diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 655112a0646..9b21af71370 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -570,7 +570,7 @@ public abstract class IndexShardTestCase extends ESTestCase { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); } shard.updateLocalCheckpointForShard(shard.routingEntry().allocationId().getId(), - shard.getEngine().getLocalCheckpointTracker().getCheckpoint()); + shard.getLocalCheckpoint()); } else { result = shard.applyIndexOperationOnReplica(shard.seqNoStats().getMaxSeqNo() + 1, 0, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, sourceToParse);