Add reopen method in PerThreadPKLookup (#13596)

Co-authored-by: Adrien Grand <jpountz@gmail.com>
This commit is contained in:
zhouhui 2024-10-10 16:30:24 +08:00 committed by Adrien Grand
parent 250bb03aac
commit 9a3467d352
3 changed files with 152 additions and 16 deletions

View File

@ -343,7 +343,7 @@ public class TestIDVersionPostingsFormat extends LuceneTestCase {
/** Returns docID if found, else -1. */ /** Returns docID if found, else -1. */
public int lookup(BytesRef id, long version) throws IOException { public int lookup(BytesRef id, long version) throws IOException {
for (int seg = 0; seg < numSegs; seg++) { for (int seg = 0; seg < numEnums; seg++) {
if (((IDVersionSegmentTermsEnum) termsEnums[seg]).seekExact(id, version)) { if (((IDVersionSegmentTermsEnum) termsEnums[seg]).seekExact(id, version)) {
if (VERBOSE) { if (VERBOSE) {
System.out.println(" found in seg=" + termsEnums[seg]); System.out.println(" found in seg=" + termsEnums[seg]);

View File

@ -18,8 +18,13 @@ package org.apache.lucene.tests.index;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexReader.CacheHelper;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum; import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Terms; import org.apache.lucene.index.Terms;
@ -35,17 +40,29 @@ import org.apache.lucene.util.BytesRef;
*/ */
public class PerThreadPKLookup { public class PerThreadPKLookup {
private final String idFieldName;
protected final TermsEnum[] termsEnums; protected final TermsEnum[] termsEnums;
protected final PostingsEnum[] postingsEnums; protected final PostingsEnum[] postingsEnums;
protected final Bits[] liveDocs; protected final Bits[] liveDocs;
protected final int[] docBases; protected final int[] docBases;
protected final int numSegs; protected final int numEnums;
protected final boolean hasDeletions; protected final boolean hasDeletions;
private final Map<IndexReader.CacheKey, Integer> enumIndexes;
public PerThreadPKLookup(IndexReader r, String idFieldName) throws IOException { public PerThreadPKLookup(IndexReader reader, String idFieldName) throws IOException {
this(reader, idFieldName, Collections.emptyMap(), null, null);
}
List<LeafReaderContext> leaves = new ArrayList<>(r.leaves()); private PerThreadPKLookup(
IndexReader reader,
String idFieldName,
Map<IndexReader.CacheKey, Integer> prevEnumIndexes,
TermsEnum[] reusableTermsEnums,
PostingsEnum[] reusablePostingsEnums)
throws IOException {
this.idFieldName = idFieldName;
List<LeafReaderContext> leaves = new ArrayList<>(reader.leaves());
// Larger segments are more likely to have the id, so we sort largest to smallest by numDocs: // Larger segments are more likely to have the id, so we sort largest to smallest by numDocs:
leaves.sort((c1, c2) -> c2.reader().numDocs() - c1.reader().numDocs()); leaves.sort((c1, c2) -> c2.reader().numDocs() - c1.reader().numDocs());
@ -53,26 +70,50 @@ public class PerThreadPKLookup {
postingsEnums = new PostingsEnum[leaves.size()]; postingsEnums = new PostingsEnum[leaves.size()];
liveDocs = new Bits[leaves.size()]; liveDocs = new Bits[leaves.size()];
docBases = new int[leaves.size()]; docBases = new int[leaves.size()];
int numSegs = 0; enumIndexes = new HashMap<>();
int numEnums = 0;
boolean hasDeletions = false; boolean hasDeletions = false;
for (int i = 0; i < leaves.size(); i++) { for (int i = 0; i < leaves.size(); i++) {
Terms terms = leaves.get(i).reader().terms(idFieldName); LeafReaderContext context = leaves.get(i);
LeafReader leafReader = context.reader();
CacheHelper cacheHelper = leafReader.getCoreCacheHelper();
IndexReader.CacheKey cacheKey = cacheHelper == null ? null : cacheHelper.getKey();
if (cacheKey != null && prevEnumIndexes.containsKey(cacheKey)) {
// Reuse termsEnum, postingsEnum.
int seg = prevEnumIndexes.get(cacheKey);
termsEnums[numEnums] = reusableTermsEnums[seg];
postingsEnums[numEnums] = reusablePostingsEnums[seg];
} else {
// New or empty segment.
Terms terms = leafReader.terms(idFieldName);
if (terms != null) { if (terms != null) {
termsEnums[numSegs] = terms.iterator(); termsEnums[numEnums] = terms.iterator();
assert termsEnums[numSegs] != null; assert termsEnums[numEnums] != null;
docBases[numSegs] = leaves.get(i).docBase;
liveDocs[numSegs] = leaves.get(i).reader().getLiveDocs();
hasDeletions |= leaves.get(i).reader().hasDeletions();
numSegs++;
} }
} }
this.numSegs = numSegs;
if (termsEnums[numEnums] != null) {
if (cacheKey != null) {
enumIndexes.put(cacheKey, numEnums);
}
docBases[numEnums] = context.docBase;
liveDocs[numEnums] = leafReader.getLiveDocs();
hasDeletions |= leafReader.hasDeletions();
numEnums++;
}
}
this.numEnums = numEnums;
this.hasDeletions = hasDeletions; this.hasDeletions = hasDeletions;
} }
/** Returns docID if found, else -1. */ /** Returns docID if found, else -1. */
public int lookup(BytesRef id) throws IOException { public int lookup(BytesRef id) throws IOException {
for (int seg = 0; seg < numSegs; seg++) { for (int seg = 0; seg < numEnums; seg++) {
if (termsEnums[seg].seekExact(id)) { if (termsEnums[seg].seekExact(id)) {
postingsEnums[seg] = termsEnums[seg].postings(postingsEnums[seg], 0); postingsEnums[seg] = termsEnums[seg].postings(postingsEnums[seg], 0);
int docID = -1; int docID = -1;
@ -88,5 +129,12 @@ public class PerThreadPKLookup {
return -1; return -1;
} }
// TODO: add reopen method to carry over re-used enums...? /** Reuse previous PerThreadPKLookup's termsEnum and postingsEnum. */
public PerThreadPKLookup reopen(IndexReader reader) throws IOException {
if (reader == null) {
return null;
}
return new PerThreadPKLookup(
reader, this.idFieldName, this.enumIndexes, this.termsEnums, this.postingsEnums);
}
} }

View File

@ -31,6 +31,94 @@ import org.apache.lucene.tests.util.LuceneTestCase;
public class TestPerThreadPKLookup extends LuceneTestCase { public class TestPerThreadPKLookup extends LuceneTestCase {
public void testReopen() throws Exception {
Directory dir = newDirectory();
IndexWriter writer =
new IndexWriter(
dir,
new IndexWriterConfig(new MockAnalyzer(random()))
.setMergePolicy(NoMergePolicy.INSTANCE));
Document doc;
doc = new Document();
doc.add(new KeywordField("PK", "1", Field.Store.NO));
writer.addDocument(doc);
doc = new Document();
doc.add(new KeywordField("PK", "2", Field.Store.NO));
writer.addDocument(doc);
writer.flush();
// Terms in PK is null.
doc = new Document();
doc.add(new KeywordField("PK2", "3", Field.Store.NO));
writer.addDocument(doc);
doc = new Document();
doc.add(new KeywordField("PK2", "4", Field.Store.NO));
writer.addDocument(doc);
writer.flush();
DirectoryReader reader1 = DirectoryReader.open(writer);
PerThreadPKLookup pkLookup1 = new PerThreadPKLookup(reader1, "PK");
doc = new Document();
doc.add(new KeywordField("PK", "5", Field.Store.NO));
writer.addDocument(doc);
doc = new Document();
doc.add(new KeywordField("PK", "6", Field.Store.NO));
writer.addDocument(doc);
// Update liveDocs.
writer.deleteDocuments(new Term("PK", "1"));
writer.flush();
// Terms in PK is null.
doc = new Document();
doc.add(new KeywordField("PK2", "7", Field.Store.NO));
writer.addDocument(doc);
doc = new Document();
doc.add(new KeywordField("PK2", "8", Field.Store.NO));
writer.addDocument(doc);
writer.flush();
assertEquals(0, pkLookup1.lookup(newBytesRef("1")));
assertEquals(1, pkLookup1.lookup(newBytesRef("2")));
assertEquals(-1, pkLookup1.lookup(newBytesRef("5")));
assertEquals(-1, pkLookup1.lookup(newBytesRef("8")));
DirectoryReader reader2 = DirectoryReader.openIfChanged(reader1);
PerThreadPKLookup pkLookup2 = pkLookup1.reopen(reader2);
assertEquals(-1, pkLookup2.lookup(newBytesRef("1")));
assertEquals(1, pkLookup2.lookup(newBytesRef("2")));
assertEquals(4, pkLookup2.lookup(newBytesRef("5")));
assertEquals(-1, pkLookup2.lookup(newBytesRef("8")));
doc = new Document();
doc.add(new KeywordField("PK", "9", Field.Store.NO));
writer.addDocument(doc);
doc = new Document();
doc.add(new KeywordField("PK", "10", Field.Store.NO));
writer.addDocument(doc);
writer.flush();
assertEquals(-1, pkLookup2.lookup(newBytesRef("9")));
DirectoryReader reader3 = DirectoryReader.openIfChanged(reader2);
PerThreadPKLookup pkLookup3 = pkLookup2.reopen(reader3);
assertEquals(8, pkLookup3.lookup(newBytesRef("9")));
DirectoryReader reader4 = DirectoryReader.openIfChanged(reader3);
assertNull(pkLookup3.reopen(reader4));
writer.close();
reader1.close();
reader2.close();
reader3.close();
dir.close();
}
public void testPKLookupWithUpdate() throws Exception { public void testPKLookupWithUpdate() throws Exception {
Directory dir = newDirectory(); Directory dir = newDirectory();
IndexWriter writer = IndexWriter writer =