diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 7883faccb1c..b929a70898d 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -637,7 +637,9 @@ public class InternalEngine extends Engine { assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" + " index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo; } else if (origin == Operation.Origin.PRIMARY) { - assert assertOriginPrimarySequenceNumber(seqNo); + // sequence number should not be set when operation origin is primary + assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO + : "primary operations must never have an assigned sequence number but was [" + seqNo + "]"; } else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) { // sequence number should be set when operation origin is not primary assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin; @@ -645,13 +647,6 @@ public class InternalEngine extends Engine { return true; } - protected boolean assertOriginPrimarySequenceNumber(final long seqNo) { - // sequence number should not be set when operation origin is primary - assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO - : "primary operations must never have an assigned sequence number but was [" + seqNo + "]"; - return true; - } - private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) { if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) || origin == Operation.Origin.PRIMARY) { @@ -672,7 +667,7 @@ public class InternalEngine extends Engine { * @param operation the operation * @return the sequence number */ - protected long doGenerateSeqNoForOperation(final Operation operation) { + long doGenerateSeqNoForOperation(final Operation operation) { return seqNoService.generateSeqNo(); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e196c6b4d0b..ea07cabd020 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -105,7 +105,6 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbersService; @@ -1228,66 +1227,10 @@ public class InternalEngineTests extends EngineTestCase { assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } - protected List generateSingleDocHistory(boolean forReplica, VersionType versionType, - boolean partialOldPrimary, long primaryTerm, - int minOpCount, int maxOpCount) { - final int numOfOps = randomIntBetween(minOpCount, maxOpCount); - final List ops = new ArrayList<>(); - final Term id = newUid("1"); - final int startWithSeqNo; - if (partialOldPrimary) { - startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1); - } else { - startWithSeqNo = 0; - } - final String valuePrefix = forReplica ? "r_" : "p_"; - final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); - for (int i = 0; i < numOfOps; i++) { - final Engine.Operation op; - final long version; - switch (versionType) { - case INTERNAL: - version = forReplica ? i : Versions.MATCH_ANY; - break; - case EXTERNAL: - version = i; - break; - case EXTERNAL_GTE: - version = randomBoolean() ? Math.max(i - 1, 0) : i; - break; - case FORCE: - version = randomNonNegativeLong(); - break; - default: - throw new UnsupportedOperationException("unknown version type: " + versionType); - } - if (randomBoolean()) { - op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null), - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, - forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, - version, - forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, - forReplica ? REPLICA : PRIMARY, - System.currentTimeMillis(), -1, false - ); - } else { - op = new Engine.Delete("test", "1", id, - forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, - forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, - version, - forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, - forReplica ? REPLICA : PRIMARY, - System.currentTimeMillis()); - } - ops.add(op); - } - return ops; - } - public void testOutOfOrderDocsOnReplica() throws IOException { final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20); - assertOpsOnReplica(ops, replicaEngine, true); + assertOpsOnReplica(ops, replicaEngine, true, logger); } public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException { @@ -1304,72 +1247,7 @@ public class InternalEngineTests extends EngineTestCase { InternalEngine replicaEngine = createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) { final List ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), true, 2, 2, 20); - assertOpsOnReplica(ops, replicaEngine, true); - } - } - - private void assertOpsOnReplica(List ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException { - final Engine.Operation lastOp = ops.get(ops.size() - 1); - final String lastFieldValue; - if (lastOp instanceof Engine.Index) { - Engine.Index index = (Engine.Index) lastOp; - lastFieldValue = index.docs().get(0).get("value"); - } else { - // delete - lastFieldValue = null; - } - if (shuffleOps) { - int firstOpWithSeqNo = 0; - while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { - firstOpWithSeqNo++; - } - // shuffle ops but make sure legacy ops are first - shuffle(ops.subList(0, firstOpWithSeqNo), random()); - shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); - } - boolean firstOp = true; - for (Engine.Operation op : ops) { - logger.info("performing [{}], v [{}], seq# [{}], term [{}]", - op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); - if (op instanceof Engine.Index) { - Engine.IndexResult result = replicaEngine.index((Engine.Index) op); - // replicas don't really care to about creation status of documents - // this allows to ignore the case where a document was found in the live version maps in - // a delete state and return false for the created flag in favor of code simplicity - // as deleted or not. This check is just signal regression so a decision can be made if it's - // intentional - assertThat(result.isCreated(), equalTo(firstOp)); - assertThat(result.getVersion(), equalTo(op.version())); - assertThat(result.hasFailure(), equalTo(false)); - - } else { - Engine.DeleteResult result = replicaEngine.delete((Engine.Delete) op); - // Replicas don't really care to about found status of documents - // this allows to ignore the case where a document was found in the live version maps in - // a delete state and return true for the found flag in favor of code simplicity - // his check is just signal regression so a decision can be made if it's - // intentional - assertThat(result.isFound(), equalTo(firstOp == false)); - assertThat(result.getVersion(), equalTo(op.version())); - assertThat(result.hasFailure(), equalTo(false)); - } - if (randomBoolean()) { - engine.refresh("test"); - } - if (randomBoolean()) { - engine.flush(); - engine.refresh("test"); - } - firstOp = false; - } - - assertVisibleCount(replicaEngine, lastFieldValue == null ? 0 : 1); - if (lastFieldValue != null) { - try (Searcher searcher = replicaEngine.acquireSearcher("test")) { - final TotalHitCountCollector collector = new TotalHitCountCollector(); - searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); - assertThat(collector.getTotalHits(), equalTo(1)); - } + assertOpsOnReplica(ops, replicaEngine, true, logger); } } @@ -1626,7 +1504,7 @@ public class InternalEngineTests extends EngineTestCase { final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete; final long finalReplicaVersion = lastReplicaOp.version(); final long finalReplicaSeqNo = lastReplicaOp.seqNo(); - assertOpsOnReplica(replicaOps, replicaEngine, true); + assertOpsOnReplica(replicaOps, replicaEngine, true, logger); final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine); final long currentSeqNo = getSequenceID(replicaEngine, new Engine.Get(false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1(); @@ -2289,21 +2167,6 @@ public class InternalEngineTests extends EngineTestCase { assertVisibleCount(engine, numDocs, false); } - private static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException { - assertVisibleCount(engine, numDocs, true); - } - - private static void assertVisibleCount(InternalEngine engine, int numDocs, boolean refresh) throws IOException { - if (refresh) { - engine.refresh("test"); - } - try (Searcher searcher = engine.acquireSearcher("test")) { - final TotalHitCountCollector collector = new TotalHitCountCollector(); - searcher.searcher().search(new MatchAllDocsQuery(), collector); - assertThat(collector.getTotalHits(), equalTo(numDocs)); - } - } - public void testTranslogCleanUpPostCommitCrash() throws Exception { IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(), defaultSettings.getScopedSettings()); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 5c2ef977b16..8438d90a133 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.engine; +import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.document.Field; @@ -31,8 +32,11 @@ import org.apache.lucene.index.LiveIndexWriterConfig; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; @@ -45,6 +49,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; @@ -79,6 +84,7 @@ import org.junit.Before; import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -86,7 +92,11 @@ import java.util.function.BiFunction; import java.util.function.ToLongBiFunction; import static java.util.Collections.emptyList; +import static java.util.Collections.shuffle; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; +import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; +import static org.hamcrest.Matchers.equalTo; public abstract class EngineTestCase extends ESTestCase { @@ -415,7 +425,7 @@ public abstract class EngineTestCase extends ESTestCase { return new BytesArray(string.getBytes(Charset.defaultCharset())); } - protected Term newUid(String id) { + protected static Term newUid(String id) { return new Term("_id", Uid.encodeId(id)); } @@ -438,4 +448,148 @@ public abstract class EngineTestCase extends ESTestCase { IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry); } + protected static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException { + assertVisibleCount(engine, numDocs, true); + } + + protected static void assertVisibleCount(InternalEngine engine, int numDocs, boolean refresh) throws IOException { + if (refresh) { + engine.refresh("test"); + } + try (Engine.Searcher searcher = engine.acquireSearcher("test")) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.searcher().search(new MatchAllDocsQuery(), collector); + assertThat(collector.getTotalHits(), equalTo(numDocs)); + } + } + + public static List generateSingleDocHistory( + final boolean forReplica, + final VersionType versionType, + final boolean partialOldPrimary, + final long primaryTerm, + final int minOpCount, + final int maxOpCount) { + final int numOfOps = randomIntBetween(minOpCount, maxOpCount); + final List ops = new ArrayList<>(); + final Term id = newUid("1"); + final int startWithSeqNo; + if (partialOldPrimary) { + startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1); + } else { + startWithSeqNo = 0; + } + final String valuePrefix = forReplica ? "r_" : "p_"; + final boolean incrementTermWhenIntroducingSeqNo = randomBoolean(); + for (int i = 0; i < numOfOps; i++) { + final Engine.Operation op; + final long version; + switch (versionType) { + case INTERNAL: + version = forReplica ? i : Versions.MATCH_ANY; + break; + case EXTERNAL: + version = i; + break; + case EXTERNAL_GTE: + version = randomBoolean() ? Math.max(i - 1, 0) : i; + break; + case FORCE: + version = randomNonNegativeLong(); + break; + default: + throw new UnsupportedOperationException("unknown version type: " + versionType); + } + if (randomBoolean()) { + op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null), + forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, + version, + forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, + forReplica ? REPLICA : PRIMARY, + System.currentTimeMillis(), -1, false + ); + } else { + op = new Engine.Delete("test", "1", id, + forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO, + forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm, + version, + forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType, + forReplica ? REPLICA : PRIMARY, + System.currentTimeMillis()); + } + ops.add(op); + } + return ops; + } + + public static void assertOpsOnReplica( + final List ops, + final InternalEngine replicaEngine, + boolean shuffleOps, + final Logger logger) throws IOException { + final Engine.Operation lastOp = ops.get(ops.size() - 1); + final String lastFieldValue; + if (lastOp instanceof Engine.Index) { + Engine.Index index = (Engine.Index) lastOp; + lastFieldValue = index.docs().get(0).get("value"); + } else { + // delete + lastFieldValue = null; + } + if (shuffleOps) { + int firstOpWithSeqNo = 0; + while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) { + firstOpWithSeqNo++; + } + // shuffle ops but make sure legacy ops are first + shuffle(ops.subList(0, firstOpWithSeqNo), random()); + shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random()); + } + boolean firstOp = true; + for (Engine.Operation op : ops) { + logger.info("performing [{}], v [{}], seq# [{}], term [{}]", + op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm()); + if (op instanceof Engine.Index) { + Engine.IndexResult result = replicaEngine.index((Engine.Index) op); + // replicas don't really care to about creation status of documents + // this allows to ignore the case where a document was found in the live version maps in + // a delete state and return false for the created flag in favor of code simplicity + // as deleted or not. This check is just signal regression so a decision can be made if it's + // intentional + assertThat(result.isCreated(), equalTo(firstOp)); + assertThat(result.getVersion(), equalTo(op.version())); + assertThat(result.hasFailure(), equalTo(false)); + + } else { + Engine.DeleteResult result = replicaEngine.delete((Engine.Delete) op); + // Replicas don't really care to about found status of documents + // this allows to ignore the case where a document was found in the live version maps in + // a delete state and return true for the found flag in favor of code simplicity + // his check is just signal regression so a decision can be made if it's + // intentional + assertThat(result.isFound(), equalTo(firstOp == false)); + assertThat(result.getVersion(), equalTo(op.version())); + assertThat(result.hasFailure(), equalTo(false)); + } + if (randomBoolean()) { + replicaEngine.refresh("test"); + } + if (randomBoolean()) { + replicaEngine.flush(); + replicaEngine.refresh("test"); + } + firstOp = false; + } + + assertVisibleCount(replicaEngine, lastFieldValue == null ? 0 : 1); + if (lastFieldValue != null) { + try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) { + final TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector); + assertThat(collector.getTotalHits(), equalTo(1)); + } + } + } + }