Get max_seq_no after snapshot translog and Lucene (#45461)

We should capture max_seq_no after snapshotting translog and Lucene;
otherwise, that max_seq_no can be smaller some operation in translog or
Lucene. With this change, we also hold the Engine#writeLock during this
check so that no indexing can happen.

Closes #45454
This commit is contained in:
Nhat Nguyen 2019-08-13 12:44:36 -04:00
parent adf8e20021
commit 24514275c7

View File

@ -68,6 +68,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
@ -1075,17 +1076,21 @@ public abstract class EngineTestCase extends ESTestCase {
|| (engine instanceof InternalEngine) == false) { || (engine instanceof InternalEngine) == false) {
return; return;
} }
final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo(); // hold writeLock during this check so indexing can't happen
try (ReleasableLock ignored = engine.writeLock.acquire()) {
final List<Translog.Operation> translogOps = new ArrayList<>(); final List<Translog.Operation> translogOps = new ArrayList<>();
try (Translog.Snapshot snapshot = EngineTestCase.getTranslog(engine).newSnapshot()) { try (Translog.Snapshot snapshot = EngineTestCase.getTranslog(engine).newSnapshot()) {
Translog.Operation op; Translog.Operation op;
while ((op = snapshot.next()) != null) { while ((op = snapshot.next()) != null) {
assertThat("translog operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
translogOps.add(op); translogOps.add(op);
} }
} }
final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper).stream() final Map<Long, Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper).stream()
.collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity())); .collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
final long maxSeqNo = ((InternalEngine) engine).getLocalCheckpointTracker().getMaxSeqNo();
for (Translog.Operation op : translogOps) {
assertThat("translog operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
}
for (Translog.Operation op : luceneOps.values()) { for (Translog.Operation op : luceneOps.values()) {
assertThat("lucene operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo)); assertThat("lucene operation [" + op + "] > max_seq_no[" + maxSeqNo + "]", op.seqNo(), lessThanOrEqualTo(maxSeqNo));
} }
@ -1114,6 +1119,7 @@ public abstract class EngineTestCase extends ESTestCase {
} }
} }
} }
}
/** /**
* Asserts that the max_seq_no stored in the commit's user_data is never smaller than seq_no of any document in the commit. * Asserts that the max_seq_no stored in the commit's user_data is never smaller than seq_no of any document in the commit.