Do not hold writeLock while verifying Lucene/translog
We should not hold Engine#writeLock while executing assertConsistentHistoryBetweenTranslogAndLuceneIndex for this check might acquire Engine#readLock. Relates #45461
This commit is contained in:
parent
24514275c7
commit
4fcf7bbd07
|
@ -68,7 +68,6 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.common.util.set.Sets;
|
import org.elasticsearch.common.util.set.Sets;
|
||||||
import org.elasticsearch.common.util.concurrent.ReleasableLock;
|
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
|
@ -1076,47 +1075,44 @@ public abstract class EngineTestCase extends ESTestCase {
|
||||||
|| (engine instanceof InternalEngine) == false) {
|
|| (engine instanceof InternalEngine) == false) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// hold writeLock during this check so indexing can't happen
|
final List<Translog.Operation> translogOps = new ArrayList<>();
|
||||||
try (ReleasableLock ignored = engine.writeLock.acquire()) {
|
try (Translog.Snapshot snapshot = EngineTestCase.getTranslog(engine).newSnapshot()) {
|
||||||
final List<Translog.Operation> translogOps = new ArrayList<>();
|
Translog.Operation op;
|
||||||
try (Translog.Snapshot snapshot = EngineTestCase.getTranslog(engine).newSnapshot()) {
|
while ((op = snapshot.next()) != null) {
|
||||||
Translog.Operation op;
|
translogOps.add(op);
|
||||||
while ((op = snapshot.next()) != null) {
|
}
|
||||||
translogOps.add(op);
|
}
|
||||||
|
final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper).stream()
|
||||||
|
.collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
|
||||||
|
final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();
|
||||||
|
for (Translog.Operation op : translogOps) {
|
||||||
|
assertThat("translog operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
|
||||||
|
}
|
||||||
|
for (Translog.Operation op : luceneOps.values()) {
|
||||||
|
assertThat("lucene operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
|
||||||
|
}
|
||||||
|
final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint();
|
||||||
|
final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
|
||||||
|
final long seqNoForRecovery;
|
||||||
|
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
|
||||||
|
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
|
||||||
|
}
|
||||||
|
final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
|
||||||
|
for (Translog.Operation translogOp : translogOps) {
|
||||||
|
final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo());
|
||||||
|
if (luceneOp == null) {
|
||||||
|
if (minSeqNoToRetain <= translogOp.seqNo()) {
|
||||||
|
fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " +
|
||||||
|
"retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]");
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper).stream()
|
assertThat(luceneOp, notNullValue());
|
||||||
.collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
|
assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm()));
|
||||||
final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();
|
assertThat(luceneOp.opType(), equalTo(translogOp.opType()));
|
||||||
for (Translog.Operation op : translogOps) {
|
if (luceneOp.opType() == Translog.Operation.Type.INDEX) {
|
||||||
assertThat("translog operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
|
assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
|
||||||
}
|
|
||||||
for (Translog.Operation op : luceneOps.values()) {
|
|
||||||
assertThat("lucene operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
|
|
||||||
}
|
|
||||||
final long globalCheckpoint = EngineTestCase.getTranslog(engine).getLastSyncedGlobalCheckpoint();
|
|
||||||
final long retainedOps = engine.config().getIndexSettings().getSoftDeleteRetentionOperations();
|
|
||||||
final long seqNoForRecovery;
|
|
||||||
try (Engine.IndexCommitRef safeCommit = engine.acquireSafeIndexCommit()) {
|
|
||||||
seqNoForRecovery = Long.parseLong(safeCommit.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1;
|
|
||||||
}
|
|
||||||
final long minSeqNoToRetain = Math.min(seqNoForRecovery, globalCheckpoint + 1 - retainedOps);
|
|
||||||
for (Translog.Operation translogOp : translogOps) {
|
|
||||||
final Translog.Operation luceneOp = luceneOps.get(translogOp.seqNo());
|
|
||||||
if (luceneOp == null) {
|
|
||||||
if (minSeqNoToRetain <= translogOp.seqNo()) {
|
|
||||||
fail("Operation not found seq# [" + translogOp.seqNo() + "], global checkpoint [" + globalCheckpoint + "], " +
|
|
||||||
"retention policy [" + retainedOps + "], maxSeqNo [" + maxSeqNo + "], translog op [" + translogOp + "]");
|
|
||||||
} else {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertThat(luceneOp, notNullValue());
|
|
||||||
assertThat(luceneOp.toString(), luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm()));
|
|
||||||
assertThat(luceneOp.opType(), equalTo(translogOp.opType()));
|
|
||||||
if (luceneOp.opType() == Translog.Operation.Type.INDEX) {
|
|
||||||
assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue