diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2b5f29d192a..d1e3e99f6e1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -68,6 +68,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -1075,42 +1076,47 @@ public abstract class EngineTestCase extends ESTestCase { || (engine instanceof InternalEngine) == false) { return; } - final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo(); - final List translogOps = new ArrayList<>(); - try (Translog.Snapshot snapshot = EngineTestCase.getTranslog(engine).newSnapshot()) { - Translog.Operation op; - while ((op = snapshot.next()) != null) { - assertThat("translog operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo)); - translogOps.add(op); - } - } - final Map luceneOps = readAllOperationsInLucene(engine, mapper).stream() - .collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); - for (Translog.Operation op : luceneOps.values()) { - assertThat("lucene operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo)); - } - final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint(); - final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations(); - final long seqNoForRecovery; - try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { - seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; - } - final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps); - for (Translog.Operation translogOp : translogOps) { - final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo()); - if (luceneOp == null) { - if (minSeqNoToRetain <= translogOp.seqNo()) { - fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " + - "retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]"); - } else { - continue; + // hold writeLock during this check so indexing can't happen + try (ReleasableLock ignored = engine.writeLock.acquire()) { + final List translogOps = new ArrayList<>(); + try (Translog.Snapshot snapshot = EngineTestCase.getTranslog(engine).newSnapshot()) { + Translog.Operation op; + while ((op = snapshot.next()) != null) { + translogOps.add(op); } } - assertThat(luceneOp, notNullValue()); - assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm())); - assertThat(luceneOp.opType(), equalTo(translogOp.opType())); - if (luceneOp.opType() == Translog.Operation.Type.INDEX) { - assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source)); + final Map luceneOps = readAllOperationsInLucene(engine, mapper).stream() + .collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); + final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo(); + for (Translog.Operation op : translogOps) { + assertThat("translog operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo)); + } + for (Translog.Operation op : luceneOps.values()) { + assertThat("lucene operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo)); + } + final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint(); + final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations(); + final long seqNoForRecovery; + try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) { + seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1; + } + final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps); + for (Translog.Operation translogOp : translogOps) { + final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo()); + if (luceneOp == null) { + if (minSeqNoToRetain <= translogOp.seqNo()) { + fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " + + "retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]"); + } else { + continue; + } + } + assertThat(luceneOp, notNullValue()); + assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm())); + assertThat(luceneOp.opType(), equalTo(translogOp.opType())); + if (luceneOp.opType() == Translog.Operation.Type.INDEX) { + assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source)); + } } } }