parent
db14717098
commit
8fefa8a661
|
@ -138,7 +138,6 @@ import java.util.Collections;
|
|||
import java.util.Comparator;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -1422,11 +1421,12 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
// randomly interleave
|
||||
final AtomicLong seqNoGenerator = new AtomicLong();
|
||||
Function<Engine.Operation, Engine.Operation> seqNoUpdater = operation -> {
|
||||
final long newSeqNo = seqNoGenerator.getAndIncrement();
|
||||
BiFunction<Engine.Operation, Long, Engine.Operation> seqNoUpdater = (operation, newSeqNo) -> {
|
||||
if (operation instanceof Engine.Index) {
|
||||
Engine.Index index = (Engine.Index) operation;
|
||||
return new Engine.Index(index.uid(), index.parsedDoc(), newSeqNo, index.primaryTerm(), index.version(),
|
||||
Document doc = testDocumentWithTextField(index.docs().get(0).get("value"));
|
||||
ParsedDocument parsedDocument = testParsedDocument(index.id(), index.routing(), doc, index.source(), null);
|
||||
return new Engine.Index(index.uid(), parsedDocument, newSeqNo, index.primaryTerm(), index.version(),
|
||||
index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry());
|
||||
} else {
|
||||
Engine.Delete delete = (Engine.Delete) operation;
|
||||
|
@ -1439,12 +1439,12 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
Iterator<Engine.Operation> iter2 = opsDoc2.iterator();
|
||||
while (iter1.hasNext() && iter2.hasNext()) {
|
||||
final Engine.Operation next = randomBoolean() ? iter1.next() : iter2.next();
|
||||
allOps.add(seqNoUpdater.apply(next));
|
||||
allOps.add(seqNoUpdater.apply(next, seqNoGenerator.getAndIncrement()));
|
||||
}
|
||||
iter1.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o)));
|
||||
iter2.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o)));
|
||||
iter1.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o, seqNoGenerator.getAndIncrement())));
|
||||
iter2.forEachRemaining(o -> allOps.add(seqNoUpdater.apply(o, seqNoGenerator.getAndIncrement())));
|
||||
// insert some duplicates
|
||||
allOps.addAll(randomSubsetOf(allOps));
|
||||
randomSubsetOf(allOps).forEach(op -> allOps.add(seqNoUpdater.apply(op, op.seqNo())));
|
||||
|
||||
shuffle(allOps, random());
|
||||
concurrentlyApplyOps(allOps, engine);
|
||||
|
@ -4635,13 +4635,13 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
|
||||
public void testLuceneHistoryOnPrimary() throws Exception {
|
||||
final List<Engine.Operation> operations = generateSingleDocHistory(false,
|
||||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300);
|
||||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "1");
|
||||
assertOperationHistoryInLucene(operations);
|
||||
}
|
||||
|
||||
public void testLuceneHistoryOnReplica() throws Exception {
|
||||
final List<Engine.Operation> operations = generateSingleDocHistory(true,
|
||||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), false, 2, 10, 300);
|
||||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL), 2, 10, 300, "2");
|
||||
Randomness.shuffle(operations);
|
||||
assertOperationHistoryInLucene(operations);
|
||||
}
|
||||
|
|
|
@ -579,24 +579,13 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public static List<Engine.Operation> generateSingleDocHistory(
|
||||
final boolean forReplica,
|
||||
final VersionType versionType,
|
||||
final boolean partialOldPrimary,
|
||||
final long primaryTerm,
|
||||
final int minOpCount,
|
||||
final int maxOpCount) {
|
||||
public static List<Engine.Operation> generateSingleDocHistory(boolean forReplica, VersionType versionType,
|
||||
long primaryTerm, int minOpCount, int maxOpCount, String docId) {
|
||||
final int numOfOps = randomIntBetween(minOpCount, maxOpCount);
|
||||
final List<Engine.Operation> ops = new ArrayList<>();
|
||||
final Term id = newUid("1");
|
||||
final int startWithSeqNo;
|
||||
if (partialOldPrimary) {
|
||||
startWithSeqNo = randomBoolean() ? numOfOps - 1 : randomIntBetween(0, numOfOps - 1);
|
||||
} else {
|
||||
startWithSeqNo = 0;
|
||||
}
|
||||
final int seqNoGap = randomBoolean() ? 1 : 2;
|
||||
final String valuePrefix = forReplica ? "r_" : "p_";
|
||||
final Term id = newUid(docId);
|
||||
final int startWithSeqNo = 0;
|
||||
final String valuePrefix = (forReplica ? "r_" : "p_" ) + docId + "_";
|
||||
final boolean incrementTermWhenIntroducingSeqNo = randomBoolean();
|
||||
for (int i = 0; i < numOfOps; i++) {
|
||||
final Engine.Operation op;
|
||||
|
@ -618,22 +607,22 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
throw new UnsupportedOperationException("unknown version type: " + versionType);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
op = new Engine.Index(id, testParsedDocument("1", null, testDocumentWithTextField(valuePrefix + i), B_1, null),
|
||||
forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
|
||||
version,
|
||||
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
|
||||
forReplica ? REPLICA : PRIMARY,
|
||||
System.currentTimeMillis(), -1, false
|
||||
op = new Engine.Index(id, testParsedDocument(docId, null, testDocumentWithTextField(valuePrefix + i), B_1, null),
|
||||
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
|
||||
version,
|
||||
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
|
||||
forReplica ? REPLICA : PRIMARY,
|
||||
System.currentTimeMillis(), -1, false
|
||||
);
|
||||
} else {
|
||||
op = new Engine.Delete("test", "1", id,
|
||||
forReplica && i >= startWithSeqNo ? i * seqNoGap : SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
|
||||
version,
|
||||
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
|
||||
forReplica ? REPLICA : PRIMARY,
|
||||
System.currentTimeMillis());
|
||||
op = new Engine.Delete("test", docId, id,
|
||||
forReplica && i >= startWithSeqNo ? i * 2 : SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
forReplica && i >= startWithSeqNo && incrementTermWhenIntroducingSeqNo ? primaryTerm + 1 : primaryTerm,
|
||||
version,
|
||||
forReplica ? versionType.versionTypeForReplicationAndRecovery() : versionType,
|
||||
forReplica ? REPLICA : PRIMARY,
|
||||
System.currentTimeMillis());
|
||||
}
|
||||
ops.add(op);
|
||||
}
|
||||
|
|
|
@ -128,7 +128,7 @@ public class FollowingEngineTests extends ESTestCase {
|
|||
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
|
||||
final VersionType versionType =
|
||||
randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE, VersionType.FORCE);
|
||||
final List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(true, versionType, false, 2, 2, 20);
|
||||
final List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(true, versionType, 2, 2, 20, "id");
|
||||
EngineTestCase.assertOpsOnReplica(ops, followingEngine, true, logger);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue