diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 87895ca7fe4..8249e2600ad 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -103,19 +103,6 @@ public class LocalCheckpointTracker { } } - /** - * Resets the checkpoint to the specified value. - * - * @param checkpoint the local checkpoint to reset this tracker to - */ - public synchronized void resetCheckpoint(final long checkpoint) { - // TODO: remove this method as after we restore the local history on promotion. - assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO; - assert checkpoint <= this.checkpoint; - processedSeqNo.clear(); - this.checkpoint = checkpoint; - } - /** * The current checkpoint which can be advanced by {@link #markSeqNoAsCompleted(long)}. * diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 15b10218d0f..0bc05046418 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5463,7 +5463,7 @@ public class InternalEngineTests extends EngineTestCase { final List docs; try (InternalEngine engine = createEngine( config(softDeletesEnabled, store, translogPath, newMergePolicy(), null, null, globalCheckpoint::get))) { - List ops = generateReplicaHistory(between(1, 100), randomBoolean()); + List ops = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); applyOperations(engine, ops); globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint())); engine.syncTranslog(); diff --git a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java index 80f3285dfb6..f179cd840c6 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/LuceneChangesSnapshotTests.java @@ -22,7 +22,6 @@ package org.elasticsearch.index.engine; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.translog.SnapshotMatchers; @@ -32,7 +31,6 @@ import org.junit.Before; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -150,35 +148,35 @@ public class LuceneChangesSnapshotTests extends EngineTestCase { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34667") - public void testDedupByPrimaryTerm() throws Exception { - Map latestOperations = new HashMap<>(); - List terms = Arrays.asList(between(1, 1000), between(1000, 2000)); + /** + * If an operation above the local checkpoint is delivered multiple times, an engine will add multiple copies of that operation + * into Lucene (only the first copy is non-stale; others are stale and soft-deleted). Moreover, a nested document is indexed into + * Lucene as multiple documents (only the root document has both seq_no and term, non-root docs only have seq_no). This test verifies + * that {@link LuceneChangesSnapshot} returns exactly one operation per seq_no, and skip non-root nested documents or stale copies. + */ + public void testSkipStaleOrNonRootOfNestedDocuments() throws Exception { + Map seqNoToTerm = new HashMap<>(); + List operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean()); int totalOps = 0; - for (long term : terms) { - final List ops = generateSingleDocHistory(true, - randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE), term, 2, 20, "1"); - primaryTerm.set(Math.max(primaryTerm.get(), term)); - engine.rollTranslogGeneration(); - for (Engine.Operation op : ops) { - // We need to simulate a rollback here as only ops after local checkpoint get into the engine - if (op.seqNo() <= engine.getLocalCheckpointTracker().getCheckpoint()) { - engine.getLocalCheckpointTracker().resetCheckpoint(randomLongBetween(-1, op.seqNo() - 1)); - engine.rollTranslogGeneration(); - } + for (Engine.Operation op : operations) { + // Engine skips deletes or indexes below the local checkpoint + if (engine.getLocalCheckpoint() < op.seqNo() || op instanceof Engine.NoOp) { + seqNoToTerm.put(op.seqNo(), op.primaryTerm()); if (op instanceof Engine.Index) { - engine.index((Engine.Index) op); - } else if (op instanceof Engine.Delete) { - engine.delete((Engine.Delete) op); + totalOps += ((Engine.Index) op).docs().size(); + } else { + totalOps++; } - latestOperations.put(op.seqNo(), op.primaryTerm()); - if (rarely()) { - engine.refresh("test"); - } - if (rarely()) { - engine.flush(); - } - totalOps++; + } + applyOperation(engine, op); + if (rarely()) { + engine.refresh("test"); + } + if (rarely()) { + engine.rollTranslogGeneration(); + } + if (rarely()) { + engine.flush(); } } long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); @@ -188,9 +186,9 @@ public class LuceneChangesSnapshotTests extends EngineTestCase { searcher = null; Translog.Operation op; while ((op = snapshot.next()) != null) { - assertThat(op.toString(), op.primaryTerm(), equalTo(latestOperations.get(op.seqNo()))); + assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo()))); } - assertThat(snapshot.skippedOperations(), equalTo(totalOps - latestOperations.size())); + assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size())); } finally { IOUtils.close(searcher); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java index 789a60ec55d..44b3794ea6d 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java @@ -19,13 +19,10 @@ package org.elasticsearch.index.seqno; -import com.carrotsearch.hppc.LongObjectHashMap; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.ESTestCase; -import org.hamcrest.BaseMatcher; -import org.hamcrest.Description; import org.junit.Before; import java.util.ArrayList; @@ -266,35 +263,6 @@ public class LocalCheckpointTrackerTests extends ESTestCase { thread.join(); } - public void testResetCheckpoint() { - final int operations = 1024 - scaledRandomIntBetween(0, 1024); - int maxSeqNo = Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED); - for (int i = 0; i < operations; i++) { - if (!rarely()) { - tracker.markSeqNoAsCompleted(i); - maxSeqNo = i; - } - } - - final int localCheckpoint = - randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(tracker.getCheckpoint())); - tracker.resetCheckpoint(localCheckpoint); - assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint)); - assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo)); - assertThat(tracker.processedSeqNo, new BaseMatcher>() { - @Override - public boolean matches(Object item) { - return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty()); - } - - @Override - public void describeTo(Description description) { - description.appendText("empty"); - } - }); - assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1))); - } - public void testContains() { final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100); final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 513d76e2a31..2753df6fdca 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -49,7 +49,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; -import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Strings; @@ -316,18 +316,17 @@ public abstract class EngineTestCase extends ESTestCase { mappingUpdate); } - public static CheckedFunction nestedParsedDocFactory() throws Exception { + public static CheckedBiFunction nestedParsedDocFactory() throws Exception { final MapperService mapperService = createMapperService("type"); final String nestedMapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type") .startObject("properties").startObject("nested_field").field("type", "nested").endObject().endObject() .endObject().endObject()); final DocumentMapper nestedMapper = mapperService.documentMapperParser().parse("type", new CompressedXContent(nestedMapping)); - return docId -> { + return (docId, nestedFieldValues) -> { final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value"); - final int nestedValues = between(0, 3); - if (nestedValues > 0) { + if (nestedFieldValues > 0) { XContentBuilder nestedField = source.startObject("nested_field"); - for (int i = 0; i < nestedValues; i++) { + for (int i = 0; i < nestedFieldValues; i++) { nestedField.field("field-" + i, "value-" + i); } source.endObject(); @@ -705,22 +704,36 @@ public abstract class EngineTestCase extends ESTestCase { return ops; } - public List generateReplicaHistory(int numOps, boolean allowGapInSeqNo) { + public List generateHistoryOnReplica(int numOps, boolean allowGapInSeqNo, boolean allowDuplicate, + boolean includeNestedDocs) throws Exception { long seqNo = 0; - List operations = new ArrayList<>(numOps); + final int maxIdValue = randomInt(numOps * 2); + final List operations = new ArrayList<>(numOps); + CheckedBiFunction nestedParsedDocFactory = nestedParsedDocFactory(); for (int i = 0; i < numOps; i++) { - String id = Integer.toString(between(1, 100)); - final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null); - if (randomBoolean()) { - operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(), - i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), - -1, true)); - } else if (randomBoolean()) { - operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(), - i, null, Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis())); - } else { - operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, - threadPool.relativeTimeInMillis(), "test-" + i)); + final String id = Integer.toString(randomInt(maxIdValue)); + final Engine.Operation.TYPE opType = randomFrom(Engine.Operation.TYPE.values()); + final boolean isNestedDoc = includeNestedDocs && opType == Engine.Operation.TYPE.INDEX && randomBoolean(); + final int nestedValues = between(0, 3); + final long startTime = threadPool.relativeTimeInMillis(); + final int copies = allowDuplicate && rarely() ? between(2, 4) : 1; + for (int copy = 0; copy < copies; copy++) { + final ParsedDocument doc = isNestedDoc ? nestedParsedDocFactory.apply(id, nestedValues) : createParsedDoc(id, null); + switch (opType) { + case INDEX: + operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, seqNo, primaryTerm.get(), + i, null, Engine.Operation.Origin.REPLICA, startTime, -1, true)); + break; + case DELETE: + operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), seqNo, primaryTerm.get(), + i, null, Engine.Operation.Origin.REPLICA, startTime)); + break; + case NO_OP: + operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), Engine.Operation.Origin.REPLICA, startTime, "test-" + i)); + break; + default: + throw new IllegalStateException("Unknown operation type [" + opType + "]"); + } } seqNo++; if (allowGapInSeqNo && rarely()) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 0719329ece4..b23f2be0ab0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -14,7 +14,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.CheckedBiConsumer; -import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -567,12 +567,12 @@ public class FollowingEngineTests extends ESTestCase { .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); - final CheckedFunction nestedDocFactory = EngineTestCase.nestedParsedDocFactory(); + final CheckedBiFunction nestedDocFunc = EngineTestCase.nestedParsedDocFactory(); int numOps = between(10, 100); List operations = new ArrayList<>(numOps); for (int i = 0; i < numOps; i++) { String docId = Integer.toString(between(1, 100)); - ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFactory.apply(docId); + ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFunc.apply(docId, randomInt(3)); if (randomBoolean()) { operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));