Fix InternalEngineTests#assertOpsOnPrimary (#37746)
The assertion `assertOpsOnPrimary` does not store seq_no and primary term of successful deletes to the `lastOpSeqNo` and `lastOpTerm`. This leads to failures of the subsequence CAS deletes or indexes with seq_no and term. Moreover, this assertion trips a translog assertion because it bumps the primary term of some operations but not the primary term of the engine. Relates #36467 Closes #37684
This commit is contained in:
parent
a81931bb2a
commit
a6abb28abf
|
@ -1792,10 +1792,13 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
});
|
||||
refreshThread.start();
|
||||
latch.await();
|
||||
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
|
||||
running.set(false);
|
||||
refreshThread.join();
|
||||
try {
|
||||
latch.await();
|
||||
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
|
||||
} finally {
|
||||
running.set(false);
|
||||
refreshThread.join();
|
||||
}
|
||||
}
|
||||
|
||||
private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion, boolean docDeleted, InternalEngine engine)
|
||||
|
@ -1805,7 +1808,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
long lastOpVersion = currentOpVersion;
|
||||
long lastOpSeqNo = UNASSIGNED_SEQ_NO;
|
||||
long lastOpTerm = UNASSIGNED_PRIMARY_TERM;
|
||||
final AtomicLong currentTerm = new AtomicLong(1);
|
||||
PrimaryTermSupplier currentTerm = (PrimaryTermSupplier) engine.engineConfig.getPrimaryTermSupplier();
|
||||
BiFunction<Long, Engine.Index, Engine.Index> indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(),
|
||||
UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(),
|
||||
index.getAutoGeneratedIdTimestamp(), index.isRetry(), UNASSIGNED_SEQ_NO, 0);
|
||||
|
@ -1818,6 +1821,12 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
TriFunction<Long, Long, Engine.Delete, Engine.Delete> delWithSeq = (seqNo, term, delete) -> new Engine.Delete(delete.type(),
|
||||
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
|
||||
delete.startTime(), seqNo, term);
|
||||
Function<Engine.Index, Engine.Index> indexWithCurrentTerm = index -> new Engine.Index(index.uid(),
|
||||
index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), index.version(), index.versionType(), index.origin(),
|
||||
index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());
|
||||
Function<Engine.Delete, Engine.Delete> deleteWithCurrentTerm = delete -> new Engine.Delete(delete.type(),
|
||||
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
|
||||
delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm());
|
||||
for (Engine.Operation op : ops) {
|
||||
final boolean versionConflict = rarely();
|
||||
final boolean versionedOp = versionConflict || randomBoolean();
|
||||
|
@ -1829,7 +1838,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
lastOpSeqNo;
|
||||
final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm;
|
||||
if (rarely()) {
|
||||
currentTerm.incrementAndGet();
|
||||
currentTerm.set(currentTerm.get() + 1L);
|
||||
engine.rollTranslogGeneration();
|
||||
}
|
||||
final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion;
|
||||
logger.info("performing [{}]{}{}",
|
||||
|
@ -1860,7 +1870,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
result = engine.index(indexWithVersion.apply(correctVersion, index));
|
||||
}
|
||||
} else {
|
||||
result = engine.index(index);
|
||||
result = engine.index(indexWithCurrentTerm.apply(index));
|
||||
}
|
||||
assertThat(result.isCreated(), equalTo(docDeleted));
|
||||
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
|
||||
|
@ -1894,7 +1904,7 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
} else if (versionedOp) {
|
||||
result = engine.delete(delWithVersion.apply(correctVersion, delete));
|
||||
} else {
|
||||
result = engine.delete(delete);
|
||||
result = engine.delete(deleteWithCurrentTerm.apply(delete));
|
||||
}
|
||||
assertThat(result.isFound(), equalTo(docDeleted == false));
|
||||
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
|
||||
|
@ -1902,8 +1912,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
assertThat(result.getFailure(), nullValue());
|
||||
docDeleted = true;
|
||||
lastOpVersion = result.getVersion();
|
||||
lastOpSeqNo = UNASSIGNED_SEQ_NO;
|
||||
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
|
||||
lastOpSeqNo = result.getSeqNo();
|
||||
lastOpTerm = result.getTerm();
|
||||
opsPerformed++;
|
||||
}
|
||||
}
|
||||
|
@ -1931,6 +1941,8 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
engine.clearDeletedTombstones();
|
||||
if (docDeleted) {
|
||||
lastOpVersion = Versions.NOT_FOUND;
|
||||
lastOpSeqNo = UNASSIGNED_SEQ_NO;
|
||||
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -149,7 +149,7 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
protected Path primaryTranslogDir;
|
||||
protected Path replicaTranslogDir;
|
||||
// A default primary term is used by engine instances created in this test.
|
||||
protected AtomicLong primaryTerm = new AtomicLong();
|
||||
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L);
|
||||
|
||||
protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
|
||||
assertVisibleCount(engine, numDocs, true);
|
||||
|
@ -682,7 +682,7 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
breakerService,
|
||||
globalCheckpointSupplier,
|
||||
retentionLeasesSupplier,
|
||||
primaryTerm::get,
|
||||
primaryTerm,
|
||||
tombstoneDocSupplier());
|
||||
}
|
||||
|
||||
|
@ -1081,4 +1081,25 @@ public abstract class EngineTestCase extends ESTestCase {
|
|||
InternalEngine internalEngine = (InternalEngine) engine;
|
||||
return internalEngine.getTranslog();
|
||||
}
|
||||
|
||||
public static final class PrimaryTermSupplier implements LongSupplier {
|
||||
private final AtomicLong term;
|
||||
|
||||
PrimaryTermSupplier(long initialTerm) {
|
||||
this.term = new AtomicLong(initialTerm);
|
||||
}
|
||||
|
||||
public long get() {
|
||||
return term.get();
|
||||
}
|
||||
|
||||
public void set(long newTerm) {
|
||||
this.term.set(newTerm);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAsLong() {
|
||||
return get();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue