LUCENE-9935: Enable bulk merge for stored fields with index sort (#134)

This commit enables bulk-merges (i.e., raw chunk copying) for stored
fields when index sort is enabled.
This commit is contained in:
Nhat Nguyen 2021-05-12 21:00:18 -04:00 committed by GitHub
parent ad43841daf
commit 9a17d67658
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 336 additions and 178 deletions

View File

@ -169,6 +169,12 @@ Example:
gradlew -p lucene/core coverage
open lucene/core/build/reports/jacoco/test/html/index.html
If you want to use test filtering to just check a particular test, specify
the "test" task explicitly before "coverage":
gradlew -p lucene/core test --tests TestDemo coverage
open lucene/core/build/reports/jacoco/test/html/index.html
External data sets
------------------

View File

@ -228,6 +228,8 @@ Improvements
* LUCENE-9929: Add NorwegianNormalizationFilter, which does the same as ScandinavianNormalizationFilter except
it does not fold oo->ø and ao->å. (janhoy, Robert Muir, Adrien Grand)
* LUCENE-9935: Enable bulk merge for stored fields with index sort. (Robert Muir, Adrien Grand, Nhat Nguyen)
Bug fixes

View File

@ -438,7 +438,7 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea
private void doReset(int docID) throws IOException {
docBase = fieldsStream.readVInt();
final int token = fieldsStream.readVInt();
chunkDocs = token >>> 1;
chunkDocs = token >>> 2;
if (contains(docID) == false || docBase + chunkDocs > numDocs) {
throw new CorruptIndexException(
"Corrupted: docID="
@ -610,6 +610,18 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsRea
return state.document(docID);
}
/**
 * Tells whether {@code docID} is contained in the block state that is currently loaded.
 *
 * @param docID the document ID to look up in the current block state
 * @return the result of {@code state.contains(docID)}
 * @throws IllegalStateException if this reader is not a merge instance, or if its version is not
 *     {@code VERSION_CURRENT}
 */
boolean isLoaded(int docID) {
  // Happy path first: only merge instances on the current version may be queried.
  if (merging && version == VERSION_CURRENT) {
    return state.contains(docID);
  }
  // The merge-instance violation is reported in preference to the version violation.
  final String reason =
      merging
          ? "isLoaded should only ever get called when the reader is on the current version"
          : "isLoaded should only ever get called on a merge instance";
  throw new IllegalStateException(reason);
}
@Override
public void visitDocument(int docID, StoredFieldVisitor visitor) throws IOException {

View File

@ -44,7 +44,6 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.packed.PackedInts;
@ -190,7 +189,7 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWri
endOffsets[numBufferedDocs] = Math.toIntExact(bufferedDocs.size());
++numBufferedDocs;
if (triggerFlush()) {
flush();
flush(false);
}
}
@ -203,13 +202,18 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWri
}
private void writeHeader(
int docBase, int numBufferedDocs, int[] numStoredFields, int[] lengths, boolean sliced)
int docBase,
int numBufferedDocs,
int[] numStoredFields,
int[] lengths,
boolean sliced,
boolean dirtyChunk)
throws IOException {
final int slicedBit = sliced ? 1 : 0;
final int dirtyBit = dirtyChunk ? 2 : 0;
// save docBase and numBufferedDocs
fieldsStream.writeVInt(docBase);
fieldsStream.writeVInt((numBufferedDocs) << 1 | slicedBit);
fieldsStream.writeVInt((numBufferedDocs << 2) | dirtyBit | slicedBit);
// save numStoredFields
saveInts(numStoredFields, numBufferedDocs, fieldsStream);
@ -224,8 +228,13 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWri
numBufferedDocs >= maxDocsPerChunk;
}
private void flush() throws IOException {
private void flush(boolean force) throws IOException {
assert triggerFlush() != force;
numChunks++;
if (force) {
numDirtyChunks++; // incomplete: we had to force this flush
numDirtyDocs += numBufferedDocs;
}
indexWriter.writeIndex(numBufferedDocs, fieldsStream.getFilePointer());
// transform end offsets into lengths
@ -235,7 +244,8 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWri
assert lengths[i] >= 0;
}
final boolean sliced = bufferedDocs.size() >= 2 * chunkSize;
writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced);
final boolean dirtyChunk = force;
writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced, dirtyChunk);
// compress stored fields to fieldsStream.
//
@ -470,9 +480,7 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWri
@Override
public void finish(FieldInfos fis, int numDocs) throws IOException {
if (numBufferedDocs > 0) {
numDirtyChunks++; // incomplete: we had to force this flush
numDirtyDocs += numBufferedDocs;
flush();
flush(true);
} else {
assert bufferedDocs.size() == 0;
}
@ -507,176 +515,143 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWri
BULK_MERGE_ENABLED = v;
}
@Override
public int merge(MergeState mergeState) throws IOException {
int docCount = 0;
int numReaders = mergeState.maxDocs.length;
MatchingReaders matching = new MatchingReaders(mergeState);
if (mergeState.needsIndexSort) {
/**
* If all readers are compressed and they have the same fieldinfos then we can merge the
* serialized document directly.
*/
List<CompressingStoredFieldsMergeSub> subs = new ArrayList<>();
for (int i = 0; i < mergeState.storedFieldsReaders.length; i++) {
if (matching.matchingReaders[i]
&& mergeState.storedFieldsReaders[i] instanceof Lucene90CompressingStoredFieldsReader) {
Lucene90CompressingStoredFieldsReader storedFieldsReader =
(Lucene90CompressingStoredFieldsReader) mergeState.storedFieldsReaders[i];
storedFieldsReader.checkIntegrity();
subs.add(
new CompressingStoredFieldsMergeSub(
storedFieldsReader, mergeState.docMaps[i], mergeState.maxDocs[i]));
} else {
return super.merge(mergeState);
}
}
final DocIDMerger<CompressingStoredFieldsMergeSub> docIDMerger = DocIDMerger.of(subs, true);
while (true) {
CompressingStoredFieldsMergeSub sub = docIDMerger.next();
if (sub == null) {
break;
}
assert sub.mappedDocID == docCount;
SerializedDocument doc = sub.reader.document(sub.docID);
private void copyOneDoc(Lucene90CompressingStoredFieldsReader reader, int docID)
throws IOException {
assert reader.getVersion() == VERSION_CURRENT;
SerializedDocument doc = reader.document(docID);
startDocument();
bufferedDocs.copyBytes(doc.in, doc.length);
numStoredFieldsInDoc = doc.numStoredFields;
finishDocument();
++docCount;
}
finish(mergeState.mergeFieldInfos, docCount);
return docCount;
}
for (int readerIndex = 0; readerIndex < numReaders; readerIndex++) {
MergeVisitor visitor = new MergeVisitor(mergeState, readerIndex);
Lucene90CompressingStoredFieldsReader matchingFieldsReader = null;
if (matching.matchingReaders[readerIndex]) {
final StoredFieldsReader fieldsReader = mergeState.storedFieldsReaders[readerIndex];
// we can only bulk-copy if the matching reader is also a CompressingStoredFieldsReader
if (fieldsReader != null && fieldsReader instanceof Lucene90CompressingStoredFieldsReader) {
matchingFieldsReader = (Lucene90CompressingStoredFieldsReader) fieldsReader;
}
}
private void copyChunks(
final MergeState mergeState,
final CompressingStoredFieldsMergeSub sub,
final int fromDocID,
final int toDocID)
throws IOException {
final Lucene90CompressingStoredFieldsReader reader =
(Lucene90CompressingStoredFieldsReader) mergeState.storedFieldsReaders[sub.readerIndex];
assert reader.getVersion() == VERSION_CURRENT;
assert reader.getChunkSize() == chunkSize;
assert reader.getCompressionMode() == compressionMode;
assert !tooDirty(reader);
assert mergeState.liveDocs[sub.readerIndex] == null;
final int maxDoc = mergeState.maxDocs[readerIndex];
final Bits liveDocs = mergeState.liveDocs[readerIndex];
int docID = fromDocID;
final FieldsIndex index = reader.getIndexReader();
// if its some other format, or an older version of this format, or safety switch:
if (matchingFieldsReader == null
|| matchingFieldsReader.getVersion() != VERSION_CURRENT
|| BULK_MERGE_ENABLED == false) {
// naive merge...
StoredFieldsReader storedFieldsReader = mergeState.storedFieldsReaders[readerIndex];
if (storedFieldsReader != null) {
storedFieldsReader.checkIntegrity();
// copy docs that belong to the previous chunk
while (docID < toDocID && reader.isLoaded(docID)) {
copyOneDoc(reader, docID++);
}
for (int docID = 0; docID < maxDoc; docID++) {
if (liveDocs != null && liveDocs.get(docID) == false) {
continue;
if (docID >= toDocID) {
return;
}
startDocument();
storedFieldsReader.visitDocument(docID, visitor);
finishDocument();
++docCount;
}
} else if (matchingFieldsReader.getCompressionMode() == compressionMode
&& matchingFieldsReader.getChunkSize() == chunkSize
&& liveDocs == null
&& !tooDirty(matchingFieldsReader)) {
// optimized merge, raw byte copy
// its not worth fine-graining this if there are deletions.
// if the format is older, its always handled by the naive merge case above
assert matchingFieldsReader.getVersion() == VERSION_CURRENT;
matchingFieldsReader.checkIntegrity();
// flush any pending chunks
// copy chunks
long fromPointer = index.getStartPointer(docID);
final long toPointer =
toDocID == sub.maxDoc ? reader.getMaxPointer() : index.getStartPointer(toDocID);
if (fromPointer < toPointer) {
if (numBufferedDocs > 0) {
numDirtyChunks++; // incomplete: we had to force this flush
numDirtyDocs += numBufferedDocs;
flush();
flush(true);
}
// iterate over each chunk. we use the stored fields index to find chunk boundaries,
// read the docstart + doccount from the chunk header (we write a new header, since doc
// numbers will change),
// and just copy the bytes directly.
IndexInput rawDocs = matchingFieldsReader.getFieldsStream();
FieldsIndex index = matchingFieldsReader.getIndexReader();
rawDocs.seek(index.getStartPointer(0));
int docID = 0;
while (docID < maxDoc) {
// read header
int base = rawDocs.readVInt();
final IndexInput rawDocs = reader.getFieldsStream();
rawDocs.seek(fromPointer);
do {
final int base = rawDocs.readVInt();
final int code = rawDocs.readVInt();
final int bufferedDocs = code >>> 2;
if (base != docID) {
throw new CorruptIndexException(
"invalid state: base=" + base + ", docID=" + docID, rawDocs);
}
int code = rawDocs.readVInt();
// write a new index entry and new header for this chunk.
int bufferedDocs = code >>> 1;
indexWriter.writeIndex(bufferedDocs, fieldsStream.getFilePointer());
fieldsStream.writeVInt(docBase); // rebase
fieldsStream.writeVInt(code);
docID += bufferedDocs;
docBase += bufferedDocs;
docCount += bufferedDocs;
if (docID > maxDoc) {
if (docID > toDocID) {
throw new CorruptIndexException(
"invalid state: base=" + base + ", count=" + bufferedDocs + ", maxDoc=" + maxDoc,
"invalid state: base=" + base + ", count=" + bufferedDocs + ", toDocID=" + toDocID,
rawDocs);
}
// copy bytes until the next chunk boundary (or end of chunk data).
// using the stored fields index for this isn't the most efficient, but fast enough
// and is a source of redundancy for detecting bad things.
final long end;
if (docID == maxDoc) {
end = matchingFieldsReader.getMaxPointer();
final long endChunkPointer;
if (docID == sub.maxDoc) {
endChunkPointer = reader.getMaxPointer();
} else {
end = index.getStartPointer(docID);
endChunkPointer = index.getStartPointer(docID);
}
fieldsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
fieldsStream.copyBytes(rawDocs, endChunkPointer - rawDocs.getFilePointer());
++numChunks;
final boolean dirtyChunk = (code & 2) != 0;
if (dirtyChunk) {
assert bufferedDocs < maxDocsPerChunk;
++numDirtyChunks;
numDirtyDocs += bufferedDocs;
}
fromPointer = endChunkPointer;
} while (fromPointer < toPointer);
}
if (rawDocs.getFilePointer() != matchingFieldsReader.getMaxPointer()) {
throw new CorruptIndexException(
"invalid state: pos="
+ rawDocs.getFilePointer()
+ ", max="
+ matchingFieldsReader.getMaxPointer(),
rawDocs);
// copy leftover docs that don't form a complete chunk
assert reader.isLoaded(docID) == false;
while (docID < toDocID) {
copyOneDoc(reader, docID++);
}
}
// since we bulk merged all chunks, we inherit any dirty ones from this segment.
numChunks += matchingFieldsReader.getNumChunks();
numDirtyChunks += matchingFieldsReader.getNumDirtyChunks();
numDirtyDocs += matchingFieldsReader.getNumDirtyDocs();
} else {
// optimized merge, we copy serialized (but decompressed) bytes directly
// even on simple docs (1 stored field), it seems to help by about 20%
// if the format is older, its always handled by the naive merge case above
assert matchingFieldsReader.getVersion() == VERSION_CURRENT;
matchingFieldsReader.checkIntegrity();
for (int docID = 0; docID < maxDoc; docID++) {
if (liveDocs != null && liveDocs.get(docID) == false) {
continue;
@Override
public int merge(MergeState mergeState) throws IOException {
final MatchingReaders matchingReaders = new MatchingReaders(mergeState);
final MergeVisitor[] visitors = new MergeVisitor[mergeState.storedFieldsReaders.length];
final List<CompressingStoredFieldsMergeSub> subs =
new ArrayList<>(mergeState.storedFieldsReaders.length);
for (int i = 0; i < mergeState.storedFieldsReaders.length; i++) {
final StoredFieldsReader reader = mergeState.storedFieldsReaders[i];
reader.checkIntegrity();
MergeStrategy mergeStrategy = getMergeStrategy(mergeState, matchingReaders, i);
if (mergeStrategy == MergeStrategy.VISITOR) {
visitors[i] = new MergeVisitor(mergeState, i);
}
SerializedDocument doc = matchingFieldsReader.document(docID);
subs.add(new CompressingStoredFieldsMergeSub(mergeState, mergeStrategy, i));
}
int docCount = 0;
final DocIDMerger<CompressingStoredFieldsMergeSub> docIDMerger =
DocIDMerger.of(subs, mergeState.needsIndexSort);
CompressingStoredFieldsMergeSub sub = docIDMerger.next();
while (sub != null) {
assert sub.mappedDocID == docCount : sub.mappedDocID + " != " + docCount;
final StoredFieldsReader reader = mergeState.storedFieldsReaders[sub.readerIndex];
if (sub.mergeStrategy == MergeStrategy.BULK) {
final int fromDocID = sub.docID;
int toDocID = fromDocID;
final CompressingStoredFieldsMergeSub current = sub;
while ((sub = docIDMerger.next()) == current) {
++toDocID;
assert sub.docID == toDocID;
}
++toDocID; // exclusive bound
copyChunks(mergeState, current, fromDocID, toDocID);
docCount += (toDocID - fromDocID);
} else if (sub.mergeStrategy == MergeStrategy.DOC) {
copyOneDoc((Lucene90CompressingStoredFieldsReader) reader, sub.docID);
++docCount;
sub = docIDMerger.next();
} else if (sub.mergeStrategy == MergeStrategy.VISITOR) {
assert visitors[sub.readerIndex] != null;
startDocument();
bufferedDocs.copyBytes(doc.in, doc.length);
numStoredFieldsInDoc = doc.numStoredFields;
reader.visitDocument(sub.docID, visitors[sub.readerIndex]);
finishDocument();
++docCount;
}
sub = docIDMerger.next();
} else {
throw new AssertionError("Unknown merge strategy [" + sub.mergeStrategy + "]");
}
}
finish(mergeState.mergeFieldInfos, docCount);
@ -698,16 +673,51 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWri
&& candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
}
/**
 * How the stored fields of one incoming reader are copied during a merge. The strategy for a
 * reader is selected by getMergeStrategy.
 */
private enum MergeStrategy {
/**
 * Copy chunk by chunk in a compressed format. Selected only when bulk merging is enabled, the
 * reader uses the same compression mode and chunk size as this writer, has no live-docs bitset
 * (i.e. no deletions) and is not too dirty.
 */
BULK,
/**
 * Copy document by document in a decompressed format. Selected for a current-version compressing
 * reader with matching fields that does not satisfy the conditions for BULK.
 */
DOC,
/**
 * Copy field by field of decompressed documents. Selected when the reader's fields do not match,
 * the reader is not a Lucene90CompressingStoredFieldsReader, or it is not on VERSION_CURRENT.
 */
VISITOR
}
/**
 * Picks the copy strategy for the reader at {@code readerIndex}.
 *
 * <p>Readers whose fields do not match, that are not {@link
 * Lucene90CompressingStoredFieldsReader}s, or that are not on {@code VERSION_CURRENT} are merged
 * with {@link MergeStrategy#VISITOR}. The remaining readers are merged with {@link
 * MergeStrategy#BULK} when bulk merging is enabled, compression mode and chunk size are the same
 * as this writer's, there is no live-docs bitset and the reader is not too dirty; otherwise with
 * {@link MergeStrategy#DOC}.
 *
 * @param mergeState the state of the merge being executed
 * @param matchingReaders flags telling which readers have matching fields
 * @param readerIndex index of the reader in {@code mergeState}
 * @return the strategy to use for that reader
 */
private MergeStrategy getMergeStrategy(
    MergeState mergeState, MatchingReaders matchingReaders, int readerIndex) {
  final StoredFieldsReader fieldsReader = mergeState.storedFieldsReaders[readerIndex];

  if (matchingReaders.matchingReaders[readerIndex] == false) {
    return MergeStrategy.VISITOR;
  }
  // instanceof is also false for null, so a missing reader ends up here as well
  if ((fieldsReader instanceof Lucene90CompressingStoredFieldsReader) == false) {
    return MergeStrategy.VISITOR;
  }
  final Lucene90CompressingStoredFieldsReader compressingReader =
      (Lucene90CompressingStoredFieldsReader) fieldsReader;
  if (compressingReader.getVersion() != VERSION_CURRENT) {
    return MergeStrategy.VISITOR;
  }

  final boolean canCopyRawChunks =
      BULK_MERGE_ENABLED
          && compressingReader.getCompressionMode() == compressionMode
          && compressingReader.getChunkSize() == chunkSize
          // its not worth fine-graining this if there are deletions.
          && mergeState.liveDocs[readerIndex] == null
          && !tooDirty(compressingReader);
  return canCopyRawChunks ? MergeStrategy.BULK : MergeStrategy.DOC;
}
private static class CompressingStoredFieldsMergeSub extends DocIDMerger.Sub {
private final Lucene90CompressingStoredFieldsReader reader;
private final int readerIndex;
private final int maxDoc;
private final MergeStrategy mergeStrategy;
int docID = -1;
CompressingStoredFieldsMergeSub(
Lucene90CompressingStoredFieldsReader reader, MergeState.DocMap docMap, int maxDoc) {
super(docMap);
this.maxDoc = maxDoc;
this.reader = reader;
MergeState mergeState, MergeStrategy mergeStrategy, int readerIndex) {
super(mergeState.docMaps[readerIndex]);
this.readerIndex = readerIndex;
this.mergeStrategy = mergeStrategy;
this.maxDoc = mergeState.maxDocs[readerIndex];
}
@Override

View File

@ -20,6 +20,7 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@ -30,6 +31,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.StoredFieldsFormat;
@ -45,6 +48,8 @@ import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
@ -839,4 +844,127 @@ public abstract class BaseStoredFieldsFormatTestCase extends BaseIndexFileFormat
IOUtils.close(iw, ir, everything);
IOUtils.close(dirs);
}
/**
 * Randomized round-trip test of stored fields on an index that has an index sort configured.
 *
 * <p>Documents with a random subset of stored fields and one or two numeric sort fields are
 * added either directly or through {@code addIndexes} from a foreign reader, and are randomly
 * deleted or updated. Stored fields of random live documents are verified periodically, as well
 * as before and after a final force-merge.
 */
public void testRandomStoredFieldsWithIndexSort() throws Exception {
  final SortField[] sortFields;
  if (random().nextBoolean()) {
    sortFields =
        new SortField[] {
          new SortField("sort-1", SortField.Type.LONG),
          new SortField("sort-2", SortField.Type.INT)
        };
  } else {
    sortFields = new SortField[] {new SortField("sort-1", SortField.Type.LONG)};
  }
  List<String> storedFields = new ArrayList<>();
  int numFields = TestUtil.nextInt(random(), 1, 10);
  for (int i = 0; i < numFields; i++) {
    storedFields.add("f-" + i);
  }
  FieldType storeType = new FieldType(TextField.TYPE_STORED);
  storeType.setStored(true);
  // Builds a document for the given id with a random subset of the stored fields and a value for
  // every sort field.
  Function<String, Document> documentFactory =
      id -> {
        Document doc = new Document();
        doc.add(new StringField("id", id, random().nextBoolean() ? Store.YES : Store.NO));
        // occasionally change the order in which fields are added
        if (random().nextInt(100) <= 5) {
          Collections.shuffle(storedFields, random());
        }
        for (String fieldName : storedFields) {
          if (random().nextBoolean()) {
            String s = TestUtil.randomUnicodeString(random(), 100);
            doc.add(newField(fieldName, s, storeType));
          }
        }
        for (SortField sortField : sortFields) {
          doc.add(
              new NumericDocValuesField(
                  sortField.getField(), TestUtil.nextInt(random(), 0, 10000)));
        }
        return doc;
      };

  // id -> the latest version of the document that is expected for that id
  Map<String, Document> docs = new HashMap<>();
  int numDocs = atLeast(100);
  for (int i = 0; i < numDocs; i++) {
    String id = Integer.toString(i);
    docs.put(id, documentFactory.apply(id));
  }

  Directory dir = newDirectory();
  IndexWriterConfig iwc = newIndexWriterConfig();
  iwc.setMaxBufferedDocs(TestUtil.nextInt(random(), 5, 20));
  iwc.setIndexSort(new Sort(sortFields));
  RandomIndexWriter iw = new RandomIndexWriter(random(), dir, iwc);
  // ids of the documents that are currently expected to be live in the index
  List<String> addedIds = new ArrayList<>();
  Runnable verifyStoreFields =
      () -> {
        if (addedIds.isEmpty()) {
          // Nothing to verify; random().nextInt(0) would throw IllegalArgumentException.
          return;
        }
        try (DirectoryReader reader = maybeWrapWithMergingReader(iw.getReader())) {
          IndexSearcher searcher = new IndexSearcher(reader);
          int iters = TestUtil.nextInt(random(), 1, 10);
          for (int i = 0; i < iters; i++) {
            String testID = addedIds.get(random().nextInt(addedIds.size()));
            if (VERBOSE) {
              System.out.println("TEST: test id=" + testID);
            }
            TopDocs hits = searcher.search(new TermQuery(new Term("id", testID)), 1);
            assertEquals(1, hits.totalHits.value);
            // only fields that were flagged as stored are expected to come back
            List<IndexableField> expectedFields =
                docs.get(testID).getFields().stream()
                    .filter(f -> f.fieldType().stored())
                    .collect(Collectors.toList());
            Document actualDoc = reader.document(hits.scoreDocs[0].doc);
            assertEquals(expectedFields.size(), actualDoc.getFields().size());
            for (IndexableField expectedField : expectedFields) {
              IndexableField[] actualFields = actualDoc.getFields(expectedField.name());
              assertEquals(1, actualFields.length);
              assertEquals(expectedField.stringValue(), actualFields[0].stringValue());
            }
          }
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      };

  final List<String> ids = new ArrayList<>(docs.keySet());
  Collections.shuffle(ids, random());
  for (String id : ids) {
    if (random().nextInt(100) < 5) {
      // add via foreign reader
      IndexWriterConfig otherIwc = newIndexWriterConfig();
      otherIwc.setIndexSort(new Sort(sortFields));
      try (Directory otherDir = newDirectory();
          RandomIndexWriter otherIw = new RandomIndexWriter(random(), otherDir, otherIwc)) {
        otherIw.addDocument(docs.get(id));
        try (DirectoryReader otherReader = otherIw.getReader()) {
          TestUtil.addIndexesSlowly(iw.w, otherReader);
        }
      }
    } else {
      // add normally
      iw.addDocument(docs.get(id));
    }
    addedIds.add(id);
    if (random().nextInt(100) < 5) {
      // Pick without removing: only a real deletion takes the id out of the live set, so that
      // updated documents keep being verified against their new content.
      String deletingId = addedIds.get(random().nextInt(addedIds.size()));
      if (random().nextBoolean()) {
        iw.deleteDocuments(new TermQuery(new Term("id", deletingId)));
        addedIds.remove(deletingId);
      } else {
        final Document newDoc = documentFactory.apply(deletingId);
        docs.put(deletingId, newDoc);
        iw.updateDocument(new Term("id", deletingId), newDoc);
      }
    }
    if (random().nextInt(100) < 5) {
      verifyStoreFields.run();
    }
    if (random().nextInt(100) < 2) {
      iw.forceMerge(TestUtil.nextInt(random(), 1, 3));
    }
  }
  verifyStoreFields.run();
  iw.forceMerge(TestUtil.nextInt(random(), 1, 3));
  verifyStoreFields.run();
  IOUtils.close(iw, dir);
}
}