Do not use soft-deletes to resolve indexing strategy (#43336)
This PR reverts #35230. Previously, we reply on soft-deletes to fill the mismatch between the version map and the Lucene index. This is no longer needed after #43202 where we rebuild the version map when opening an engine. Moreover, PrunePostingsMergePolicy can prune _id of soft-deleted documents out of order; thus the lookup result including soft-deletes sometimes does not return the latest version (although it's okay as we only use a valid result in an engine). With this change, we use only live documents in Lucene to resolve the indexing strategy. This is perfectly safe since we keep all deleted documents after the local checkpoint in the version map. Closes #42979
This commit is contained in:
parent
a4c45b5d70
commit
b5c8b32cab
|
@ -102,38 +102,20 @@ final class PerThreadIDVersionAndSeqNoLookup {
|
|||
throws IOException {
|
||||
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
|
||||
"context's reader is not the same as the reader class was initialized on.";
|
||||
int docID = getDocID(id, context.reader().getLiveDocs());
|
||||
int docID = getDocID(id, context);
|
||||
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
|
||||
if (versions == null) {
|
||||
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
|
||||
}
|
||||
if (versions.advanceExact(docID) == false) {
|
||||
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
|
||||
}
|
||||
final long seqNo;
|
||||
final long term;
|
||||
if (loadSeqNo) {
|
||||
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
|
||||
// remove the null check in 7.0 once we can't read indices with no seq#
|
||||
if (seqNos != null && seqNos.advanceExact(docID)) {
|
||||
seqNo = seqNos.longValue();
|
||||
} else {
|
||||
seqNo = UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
|
||||
if (terms != null && terms.advanceExact(docID)) {
|
||||
term = terms.longValue();
|
||||
} else {
|
||||
term = UNASSIGNED_PRIMARY_TERM;
|
||||
}
|
||||
|
||||
seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
|
||||
term = readNumericDocValues(context.reader(), SeqNoFieldMapper.PRIMARY_TERM_NAME, docID);
|
||||
} else {
|
||||
seqNo = UNASSIGNED_SEQ_NO;
|
||||
term = UNASSIGNED_PRIMARY_TERM;
|
||||
}
|
||||
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
|
||||
final long version = readNumericDocValues(context.reader(), VersionFieldMapper.NAME, docID);
|
||||
return new DocIdAndVersion(docID, version, seqNo, term, context.reader(), context.docBase);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
@ -143,9 +125,10 @@ final class PerThreadIDVersionAndSeqNoLookup {
|
|||
* returns the internal lucene doc id for the given id bytes.
|
||||
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
|
||||
* */
|
||||
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
|
||||
private int getDocID(BytesRef id, LeafReaderContext context) throws IOException {
|
||||
// termsEnum can possibly be null here if this leaf contains only no-ops.
|
||||
if (termsEnum != null && termsEnum.seekExact(id)) {
|
||||
final Bits liveDocs = context.reader().getLiveDocs();
|
||||
int docID = DocIdSetIterator.NO_MORE_DOCS;
|
||||
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
|
||||
docsEnum = termsEnum.postings(docsEnum, 0);
|
||||
|
@ -161,41 +144,23 @@ final class PerThreadIDVersionAndSeqNoLookup {
|
|||
}
|
||||
}
|
||||
|
||||
private static long readNumericDocValues(LeafReader reader, String field, int docId) throws IOException {
|
||||
final NumericDocValues dv = reader.getNumericDocValues(field);
|
||||
if (dv == null || dv.advanceExact(docId) == false) {
|
||||
assert false : "document [" + docId + "] does not have docValues for [" + field + "]";
|
||||
throw new IllegalStateException("document [" + docId + "] does not have docValues for [" + field + "]");
|
||||
}
|
||||
return dv.longValue();
|
||||
}
|
||||
|
||||
/** Return null if id is not found. */
|
||||
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
|
||||
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
|
||||
"context's reader is not the same as the reader class was initialized on.";
|
||||
// termsEnum can possibly be null here if this leaf contains only no-ops.
|
||||
if (termsEnum != null && termsEnum.seekExact(id)) {
|
||||
docsEnum = termsEnum.postings(docsEnum, 0);
|
||||
final Bits liveDocs = context.reader().getLiveDocs();
|
||||
DocIdAndSeqNo result = null;
|
||||
int docID = docsEnum.nextDoc();
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
|
||||
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
|
||||
final long seqNo;
|
||||
// remove the null check in 7.0 once we can't read indices with no seq#
|
||||
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
|
||||
seqNo = seqNoDV.longValue();
|
||||
} else {
|
||||
seqNo = UNASSIGNED_SEQ_NO;
|
||||
}
|
||||
final boolean isLive = (liveDocs == null || liveDocs.get(docID));
|
||||
if (isLive) {
|
||||
// The live document must always be the latest copy, thus we can early terminate here.
|
||||
// If a nested docs is live, we return the first doc which doesn't have term (only the last doc has term).
|
||||
// This should not be an issue since we no longer use primary term as tier breaker when comparing operations.
|
||||
assert result == null || result.seqNo <= seqNo :
|
||||
"the live doc does not have the highest seq_no; live_seq_no=" + seqNo + " < deleted_seq_no=" + result.seqNo;
|
||||
return new DocIdAndSeqNo(docID, seqNo, context, isLive);
|
||||
}
|
||||
if (result == null || result.seqNo < seqNo) {
|
||||
result = new DocIdAndSeqNo(docID, seqNo, context, isLive);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
final int docID = getDocID(id, context);
|
||||
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
final long seqNo = readNumericDocValues(context.reader(), SeqNoFieldMapper.NAME, docID);
|
||||
return new DocIdAndSeqNo(docID, seqNo, context);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -114,13 +114,11 @@ public final class VersionsAndSeqNoResolver {
|
|||
public final int docId;
|
||||
public final long seqNo;
|
||||
public final LeafReaderContext context;
|
||||
public final boolean isLive;
|
||||
|
||||
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context, boolean isLive) {
|
||||
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
|
||||
this.docId = docId;
|
||||
this.seqNo = seqNo;
|
||||
this.context = context;
|
||||
this.isLive = isLive;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -149,32 +147,21 @@ public final class VersionsAndSeqNoResolver {
|
|||
|
||||
/**
|
||||
* Loads the internal docId and sequence number of the latest copy for a given uid from the provided reader.
|
||||
* The flag {@link DocIdAndSeqNo#isLive} indicates whether the returned document is live or (soft)deleted.
|
||||
* This returns {@code null} if no such document matching the given term uid.
|
||||
* The result is either null or the live and latest version of the given uid.
|
||||
*/
|
||||
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
|
||||
final PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
|
||||
final List<LeafReaderContext> leaves = reader.leaves();
|
||||
DocIdAndSeqNo latest = null;
|
||||
// iterate backwards to optimize for the frequently updated documents
|
||||
// which are likely to be in the last segments
|
||||
for (int i = leaves.size() - 1; i >= 0; i--) {
|
||||
final LeafReaderContext leaf = leaves.get(i);
|
||||
final PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
|
||||
final DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
|
||||
if (result == null) {
|
||||
continue;
|
||||
}
|
||||
if (result.isLive) {
|
||||
// The live document must always be the latest copy, thus we can early terminate here.
|
||||
assert latest == null || latest.seqNo <= result.seqNo :
|
||||
"the live doc does not have the highest seq_no; live_seq_no=" + result.seqNo + " < deleted_seq_no=" + latest.seqNo;
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
if (latest == null || latest.seqNo < result.seqNo) {
|
||||
latest = result;
|
||||
}
|
||||
}
|
||||
return latest;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -709,11 +709,7 @@ public class InternalEngine extends Engine {
|
|||
if (docAndSeqNo == null) {
|
||||
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
|
||||
} else if (op.seqNo() > docAndSeqNo.seqNo) {
|
||||
if (docAndSeqNo.isLive) {
|
||||
status = OpVsLuceneDocStatus.OP_NEWER;
|
||||
} else {
|
||||
status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
|
||||
}
|
||||
status = OpVsLuceneDocStatus.OP_NEWER;
|
||||
} else if (op.seqNo() == docAndSeqNo.seqNo) {
|
||||
assert localCheckpointTracker.contains(op.seqNo()) || softDeleteEnabled == false :
|
||||
"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.lucene.util.BytesRef;
|
|||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
|
||||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
|
@ -52,6 +53,8 @@ public class VersionLookupTests extends ESTestCase {
|
|||
Document doc = new Document();
|
||||
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
|
||||
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
|
||||
writer.addDocument(doc);
|
||||
writer.addDocument(new Document());
|
||||
DirectoryReader reader = DirectoryReader.open(writer);
|
||||
|
@ -86,6 +89,8 @@ public class VersionLookupTests extends ESTestCase {
|
|||
Document doc = new Document();
|
||||
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
|
||||
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
|
||||
writer.addDocument(doc);
|
||||
writer.addDocument(doc);
|
||||
writer.addDocument(new Document());
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.VersionFieldMapper;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
@ -69,6 +70,8 @@ public class VersionsTests extends ESTestCase {
|
|||
Document doc = new Document();
|
||||
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
|
||||
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 1));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
|
||||
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(1L));
|
||||
|
@ -78,6 +81,8 @@ public class VersionsTests extends ESTestCase {
|
|||
Field version = new NumericDocValuesField(VersionFieldMapper.NAME, 2);
|
||||
doc.add(uid);
|
||||
doc.add(version);
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
|
||||
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
|
||||
directoryReader = reopen(directoryReader);
|
||||
assertThat(loadDocIdAndVersion(directoryReader, new Term(IdFieldMapper.NAME, "1"), randomBoolean()).version, equalTo(2L));
|
||||
|
@ -87,6 +92,8 @@ public class VersionsTests extends ESTestCase {
|
|||
version.setLongValue(3);
|
||||
doc.add(uid);
|
||||
doc.add(version);
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
|
||||
writer.updateDocument(new Term(IdFieldMapper.NAME, "1"), doc);
|
||||
|
||||
directoryReader = reopen(directoryReader);
|
||||
|
@ -116,6 +123,8 @@ public class VersionsTests extends ESTestCase {
|
|||
doc.add(new Field(IdFieldMapper.NAME, "1", IdFieldMapper.Defaults.FIELD_TYPE));
|
||||
NumericDocValuesField version = new NumericDocValuesField(VersionFieldMapper.NAME, 5L);
|
||||
doc.add(version);
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
|
||||
docs.add(doc);
|
||||
|
||||
writer.updateDocuments(new Term(IdFieldMapper.NAME, "1"), docs);
|
||||
|
@ -146,6 +155,8 @@ public class VersionsTests extends ESTestCase {
|
|||
Document doc = new Document();
|
||||
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
|
||||
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
|
||||
writer.addDocument(doc);
|
||||
DirectoryReader reader = DirectoryReader.open(writer);
|
||||
// should increase cache size by 1
|
||||
|
@ -171,6 +182,8 @@ public class VersionsTests extends ESTestCase {
|
|||
Document doc = new Document();
|
||||
doc.add(new Field(IdFieldMapper.NAME, "6", IdFieldMapper.Defaults.FIELD_TYPE));
|
||||
doc.add(new NumericDocValuesField(VersionFieldMapper.NAME, 87));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.NAME, randomNonNegativeLong()));
|
||||
doc.add(new NumericDocValuesField(SeqNoFieldMapper.PRIMARY_TERM_NAME, randomLongBetween(1, Long.MAX_VALUE)));
|
||||
writer.addDocument(doc);
|
||||
DirectoryReader reader = DirectoryReader.open(writer);
|
||||
assertEquals(87, loadDocIdAndVersion(reader, new Term(IdFieldMapper.NAME, "6"), randomBoolean()).version);
|
||||
|
|
|
@ -4026,7 +4026,6 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
searchResult.close();
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42979")
|
||||
public void testLookupSeqNoByIdInLucene() throws Exception {
|
||||
int numOps = between(10, 100);
|
||||
long seqNo = 0;
|
||||
|
@ -4061,20 +4060,23 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null))) {
|
||||
CheckedRunnable<IOException> lookupAndCheck = () -> {
|
||||
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
|
||||
for (String id : latestOps.keySet()) {
|
||||
String msg = "latestOps=" + latestOps + " op=" + id;
|
||||
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
|
||||
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
|
||||
assertThat(msg, docIdAndSeqNo.isLive,
|
||||
equalTo(latestOps.get(id).operationType() == Engine.Operation.TYPE.INDEX));
|
||||
}
|
||||
assertThat(VersionsAndSeqNoResolver.loadDocIdAndVersion(
|
||||
searcher.reader(), newUid("any-" + between(1, 10)), randomBoolean()), nullValue());
|
||||
Map<String, Long> liveOps = latestOps.entrySet().stream()
|
||||
.filter(e -> e.getValue().operationType() == Engine.Operation.TYPE.INDEX)
|
||||
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().seqNo()));
|
||||
assertThat(getDocIds(engine, true).stream().collect(Collectors.toMap(e -> e.getId(), e -> e.getSeqNo())),
|
||||
equalTo(liveOps));
|
||||
for (String id : latestOps.keySet()) {
|
||||
String msg = "latestOps=" + latestOps + " op=" + id;
|
||||
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(id));
|
||||
if (liveOps.containsKey(id) == false) {
|
||||
assertNull(msg, docIdAndSeqNo);
|
||||
} else {
|
||||
assertNotNull(msg, docIdAndSeqNo);
|
||||
assertThat(msg, docIdAndSeqNo.seqNo, equalTo(latestOps.get(id).seqNo()));
|
||||
}
|
||||
}
|
||||
String notFoundId = randomValueOtherThanMany(liveOps::containsKey, () -> Long.toString(randomNonNegativeLong()));
|
||||
assertNull(VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.reader(), newUid(notFoundId)));
|
||||
}
|
||||
};
|
||||
for (Engine.Operation op : operations) {
|
||||
|
|
Loading…
Reference in New Issue