Additional engine refactoring

A previous refactoring exposed some protected methods. However, there is
not currently a need to be exposing these methods so publicly so we pull
one of them back to being package-private and we remove the other one.

Relates #27324
This commit is contained in:
Jason Tedor 2017-11-09 15:17:11 -05:00 committed by GitHub
parent 3b6d0ce11f
commit ddd3ed4f22
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 162 additions and 150 deletions

View File

@ -637,7 +637,9 @@ public class InternalEngine extends Engine {
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" +
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo;
} else if (origin == Operation.Origin.PRIMARY) {
assert assertOriginPrimarySequenceNumber(seqNo);
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
} else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) {
// sequence number should be set when operation origin is not primary
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin;
@ -645,13 +647,6 @@ public class InternalEngine extends Engine {
return true;
}
protected boolean assertOriginPrimarySequenceNumber(final long seqNo) {
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
return true;
}
private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) ||
origin == Operation.Origin.PRIMARY) {
@ -672,7 +667,7 @@ public class InternalEngine extends Engine {
* @param operation the operation
* @return the sequence number
*/
protected long doGenerateSeqNoForOperation(final Operation operation) {
long doGenerateSeqNoForOperation(final Operation operation) {
return seqNoService.generateSeqNo();
}

View File

@ -105,7 +105,6 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
@ -1228,66 +1227,10 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class));
}
protected List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType,
boolean partialOldPrimary, long primaryTerm,
int minOpCount, int maxOpCount) {
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
final List<Engine.Operation> ops = new ArrayList<>();
final Term id = newUid("1");
final int startWithSeqNo;
if (partialOldPrimary) {
startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1);
} else {
startWithSeqNo = 0;
}
final String valuePrefix = forReplica ? "r_" : "p_";
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
for (int i = 0; i < numOfOps; i++) {
final Engine.Operation op;
final long version;
switch (versionType) {
case INTERNAL:
version = forReplica ? i : Versions.MATCH_ANY;
break;
case EXTERNAL:
version = i;
break;
case EXTERNAL_GTE:
version = randomBoolean() ? Math.max(i - 1, 0) : i;
break;
case FORCE:
version = randomNonNegativeLong();
break;
default:
throw new UnsupportedOperationException("unknown version type: " + versionType);
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis(), -1, false
);
} else {
op = new Engine.Delete("test", "1", id,
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis());
}
ops.add(op);
}
return ops;
}
public void testOutOfOrderDocsOnReplica() throws IOException {
final List<Engine.Operation> ops = generateSingleDocHistory(true,
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE), false, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, true);
assertOpsOnReplica(ops, replicaEngine, true, logger);
}
public void testOutOfOrderDocsOnReplicaOldPrimary() throws IOException {
@ -1304,72 +1247,7 @@ public class InternalEngineTests extends EngineTestCase {
InternalEngine replicaEngine =
createEngine(oldSettings, oldReplicaStore, createTempDir("translog-old-replica"), newMergePolicy())) {
final List<Engine.Operation> ops = generateSingleDocHistory(true, randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), true, 2, 2, 20);
assertOpsOnReplica(ops, replicaEngine, true);
}
}
private void assertOpsOnReplica(List<Engine.Operation> ops, InternalEngine replicaEngine, boolean shuffleOps) throws IOException {
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Engine.Index index = (Engine.Index) lastOp;
lastFieldValue = index.docs().get(0).get("value");
} else {
// delete
lastFieldValue = null;
}
if (shuffleOps) {
int firstOpWithSeqNo = 0;
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
firstOpWithSeqNo++;
}
// shuffle ops but make sure legacy ops are first
shuffle(ops.subList(0, firstOpWithSeqNo), random());
shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
}
boolean firstOp = true;
for (Engine.Operation op : ops) {
logger.info("performing [{}], v [{}], seq# [{}], term [{}]",
op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm());
if (op instanceof Engine.Index) {
Engine.IndexResult result = replicaEngine.index((Engine.Index) op);
// replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
// as deleted or not. This check is just signal regression so a decision can be made if it's
// intentional
assertThat(result.isCreated(), equalTo(firstOp));
assertThat(result.getVersion(), equalTo(op.version()));
assertThat(result.hasFailure(), equalTo(false));
} else {
Engine.DeleteResult result = replicaEngine.delete((Engine.Delete) op);
// Replicas don't really care to about found status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
// his check is just signal regression so a decision can be made if it's
// intentional
assertThat(result.isFound(), equalTo(firstOp == false));
assertThat(result.getVersion(), equalTo(op.version()));
assertThat(result.hasFailure(), equalTo(false));
}
if (randomBoolean()) {
engine.refresh("test");
}
if (randomBoolean()) {
engine.flush();
engine.refresh("test");
}
firstOp = false;
}
assertVisibleCount(replicaEngine, lastFieldValue == null ? 0 : 1);
if (lastFieldValue != null) {
try (Searcher searcher = replicaEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
assertThat(collector.getTotalHits(), equalTo(1));
}
assertOpsOnReplica(ops, replicaEngine, true, logger);
}
}
@ -1626,7 +1504,7 @@ public class InternalEngineTests extends EngineTestCase {
final boolean deletedOnReplica = lastReplicaOp instanceof Engine.Delete;
final long finalReplicaVersion = lastReplicaOp.version();
final long finalReplicaSeqNo = lastReplicaOp.seqNo();
assertOpsOnReplica(replicaOps, replicaEngine, true);
assertOpsOnReplica(replicaOps, replicaEngine, true, logger);
final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
final long currentSeqNo = getSequenceID(replicaEngine,
new Engine.Get(false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1();
@ -2289,21 +2167,6 @@ public class InternalEngineTests extends EngineTestCase {
assertVisibleCount(engine, numDocs, false);
}
private static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException {
assertVisibleCount(engine, numDocs, true);
}
private static void assertVisibleCount(InternalEngine engine, int numDocs, boolean refresh) throws IOException {
if (refresh) {
engine.refresh("test");
}
try (Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new MatchAllDocsQuery(), collector);
assertThat(collector.getTotalHits(), equalTo(numDocs));
}
}
public void testTranslogCleanUpPostCommitCrash() throws Exception {
IndexSettings indexSettings = new IndexSettings(defaultSettings.getIndexMetaData(), defaultSettings.getNodeSettings(),
defaultSettings.getScopedSettings());

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.engine;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
@ -31,8 +32,11 @@ import org.apache.lucene.index.LiveIndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
@ -45,6 +49,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
@ -79,6 +84,7 @@ import org.junit.Before;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -86,7 +92,11 @@ import java.util.function.BiFunction;
import java.util.function.ToLongBiFunction;
import static java.util.Collections.emptyList;
import static java.util.Collections.shuffle;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
public abstract class EngineTestCase extends ESTestCase {
@ -415,7 +425,7 @@ public abstract class EngineTestCase extends ESTestCase {
return new BytesArray(string.getBytes(Charset.defaultCharset()));
}
protected Term newUid(String id) {
protected static Term newUid(String id) {
return new Term("_id", Uid.encodeId(id));
}
@ -438,4 +448,148 @@ public abstract class EngineTestCase extends ESTestCase {
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
}
protected static void assertVisibleCount(InternalEngine engine, int numDocs) throws IOException {
assertVisibleCount(engine, numDocs, true);
}
protected static void assertVisibleCount(InternalEngine engine, int numDocs, boolean refresh) throws IOException {
if (refresh) {
engine.refresh("test");
}
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new MatchAllDocsQuery(), collector);
assertThat(collector.getTotalHits(), equalTo(numDocs));
}
}
public static List<Engine.Operation> generateSingleDocHistory(
final boolean forReplica,
final VersionType versionType,
final boolean partialOldPrimary,
final long primaryTerm,
final int minOpCount,
final int maxOpCount) {
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
final List<Engine.Operation> ops = new ArrayList<>();
final Term id = newUid("1");
final int startWithSeqNo;
if (partialOldPrimary) {
startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1);
} else {
startWithSeqNo = 0;
}
final String valuePrefix = forReplica ? "r_" : "p_";
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
for (int i = 0; i < numOfOps; i++) {
final Engine.Operation op;
final long version;
switch (versionType) {
case INTERNAL:
version = forReplica ? i : Versions.MATCH_ANY;
break;
case EXTERNAL:
version = i;
break;
case EXTERNAL_GTE:
version = randomBoolean() ? Math.max(i - 1, 0) : i;
break;
case FORCE:
version = randomNonNegativeLong();
break;
default:
throw new UnsupportedOperationException("unknown version type: " + versionType);
}
if (randomBoolean()) {
op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null),
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis(), -1, false
);
} else {
op = new Engine.Delete("test", "1", id,
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
version,
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
forReplica ? REPLICA : PRIMARY,
System.currentTimeMillis());
}
ops.add(op);
}
return ops;
}
public static void assertOpsOnReplica(
final List<Engine.Operation> ops,
final InternalEngine replicaEngine,
boolean shuffleOps,
final Logger logger) throws IOException {
final Engine.Operation lastOp = ops.get(ops.size() - 1);
final String lastFieldValue;
if (lastOp instanceof Engine.Index) {
Engine.Index index = (Engine.Index) lastOp;
lastFieldValue = index.docs().get(0).get("value");
} else {
// delete
lastFieldValue = null;
}
if (shuffleOps) {
int firstOpWithSeqNo = 0;
while (firstOpWithSeqNo < ops.size() && ops.get(firstOpWithSeqNo).seqNo() < 0) {
firstOpWithSeqNo++;
}
// shuffle ops but make sure legacy ops are first
shuffle(ops.subList(0, firstOpWithSeqNo), random());
shuffle(ops.subList(firstOpWithSeqNo, ops.size()), random());
}
boolean firstOp = true;
for (Engine.Operation op : ops) {
logger.info("performing [{}], v [{}], seq# [{}], term [{}]",
op.operationType().name().charAt(0), op.version(), op.seqNo(), op.primaryTerm());
if (op instanceof Engine.Index) {
Engine.IndexResult result = replicaEngine.index((Engine.Index) op);
// replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
// as deleted or not. This check is just signal regression so a decision can be made if it's
// intentional
assertThat(result.isCreated(), equalTo(firstOp));
assertThat(result.getVersion(), equalTo(op.version()));
assertThat(result.hasFailure(), equalTo(false));
} else {
Engine.DeleteResult result = replicaEngine.delete((Engine.Delete) op);
// Replicas don't really care to about found status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return true for the found flag in favor of code simplicity
// his check is just signal regression so a decision can be made if it's
// intentional
assertThat(result.isFound(), equalTo(firstOp == false));
assertThat(result.getVersion(), equalTo(op.version()));
assertThat(result.hasFailure(), equalTo(false));
}
if (randomBoolean()) {
replicaEngine.refresh("test");
}
if (randomBoolean()) {
replicaEngine.flush();
replicaEngine.refresh("test");
}
firstOp = false;
}
assertVisibleCount(replicaEngine, lastFieldValue == null ? 0 : 1);
if (lastFieldValue != null) {
try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.searcher().search(new TermQuery(new Term("value", lastFieldValue)), collector);
assertThat(collector.getTotalHits(), equalTo(1));
}
}
}
}