diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 796d7eb0c60..edf1925fdd7 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -1792,10 +1792,13 @@ public class InternalEngineTests extends EngineTestCase { } }); refreshThread.start(); - latch.await(); - assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); - running.set(false); - refreshThread.join(); + try { + latch.await(); + assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine); + } finally { + running.set(false); + refreshThread.join(); + } } private int assertOpsOnPrimary(List ops, long currentOpVersion, boolean docDeleted, InternalEngine engine) @@ -1805,7 +1808,7 @@ public class InternalEngineTests extends EngineTestCase { long lastOpVersion = currentOpVersion; long lastOpSeqNo = UNASSIGNED_SEQ_NO; long lastOpTerm = UNASSIGNED_PRIMARY_TERM; - final AtomicLong currentTerm = new AtomicLong(1); + PrimaryTermSupplier currentTerm = (PrimaryTermSupplier) engine.engineConfig.getPrimaryTermSupplier(); BiFunction indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), UNASSIGNED_SEQ_NO, 0); @@ -1818,6 +1821,12 @@ public class InternalEngineTests extends EngineTestCase { TriFunction delWithSeq = (seqNo, term, delete) -> new Engine.Delete(delete.type(), delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(), delete.startTime(), seqNo, term); + Function indexWithCurrentTerm = index -> new Engine.Index(index.uid(), + index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), index.version(), index.versionType(), index.origin(), + index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm()); + Function deleteWithCurrentTerm = delete -> new Engine.Delete(delete.type(), + delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(), + delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm()); for (Engine.Operation op : ops) { final boolean versionConflict = rarely(); final boolean versionedOp = versionConflict || randomBoolean(); @@ -1829,7 +1838,8 @@ public class InternalEngineTests extends EngineTestCase { lastOpSeqNo; final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm; if (rarely()) { - currentTerm.incrementAndGet(); + currentTerm.set(currentTerm.get() + 1L); + engine.rollTranslogGeneration(); } final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion; logger.info("performing [{}]{}{}", @@ -1860,7 +1870,7 @@ public class InternalEngineTests extends EngineTestCase { result = engine.index(indexWithVersion.apply(correctVersion, index)); } } else { - result = engine.index(index); + result = engine.index(indexWithCurrentTerm.apply(index)); } assertThat(result.isCreated(), equalTo(docDeleted)); assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1))); @@ -1894,7 +1904,7 @@ public class InternalEngineTests extends EngineTestCase { } else if (versionedOp) { result = engine.delete(delWithVersion.apply(correctVersion, delete)); } else { - result = engine.delete(delete); + result = engine.delete(deleteWithCurrentTerm.apply(delete)); } assertThat(result.isFound(), equalTo(docDeleted == false)); assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1))); @@ -1902,8 +1912,8 @@ public class InternalEngineTests extends EngineTestCase { assertThat(result.getFailure(), nullValue()); docDeleted = true; lastOpVersion = result.getVersion(); - lastOpSeqNo = UNASSIGNED_SEQ_NO; - lastOpTerm = UNASSIGNED_PRIMARY_TERM; + lastOpSeqNo = result.getSeqNo(); + lastOpTerm = result.getTerm(); opsPerformed++; } } @@ -1931,6 +1941,8 @@ public class InternalEngineTests extends EngineTestCase { engine.clearDeletedTombstones(); if (docDeleted) { lastOpVersion = Versions.NOT_FOUND; + lastOpSeqNo = UNASSIGNED_SEQ_NO; + lastOpTerm = UNASSIGNED_PRIMARY_TERM; } } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 8b463f33b90..1e3dbef92c3 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -149,7 +149,7 @@ public abstract class EngineTestCase extends ESTestCase { protected Path primaryTranslogDir; protected Path replicaTranslogDir; // A default primary term is used by engine instances created in this test. - protected AtomicLong primaryTerm = new AtomicLong(); + protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L); protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException { assertVisibleCount(engine, numDocs, true); @@ -682,7 +682,7 @@ public abstract class EngineTestCase extends ESTestCase { breakerService, globalCheckpointSupplier, retentionLeasesSupplier, - primaryTerm::get, + primaryTerm, tombstoneDocSupplier()); } @@ -1081,4 +1081,25 @@ public abstract class EngineTestCase extends ESTestCase { InternalEngine internalEngine = (InternalEngine) engine; return internalEngine.getTranslog(); } + + public static final class PrimaryTermSupplier implements LongSupplier { + private final AtomicLong term; + + PrimaryTermSupplier(long initialTerm) { + this.term = new AtomicLong(initialTerm); + } + + public long get() { + return term.get(); + } + + public void set(long newTerm) { + this.term.set(newTerm); + } + + @Override + public long getAsLong() { + return get(); + } + } }