LUCENE-9935: Enable bulk-merge for term vectors with index sort (#140)

This change enables bulk-merge for term vectors with index sort. The
algorithm used here is similar to the one that is used to merge stored
fields.

Relates #134
This commit is contained in:
Nhat Nguyen 2021-06-10 11:03:17 -04:00
parent 3bedc0871e
commit 50607e0fb9
3 changed files with 276 additions and 145 deletions

View File

@ -91,6 +91,7 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReade
private final long numDirtyChunks; // number of incomplete compressed blocks written
private final long numDirtyDocs; // cumulative number of docs in incomplete chunks
private final long maxPointer; // end of the data section
private BlockState blockState = new BlockState(-1, -1, 0);
// used by clone
private Lucene90CompressingTermVectorsReader(Lucene90CompressingTermVectorsReader reader) {
@ -310,25 +311,46 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReade
return new ByteBuffersDataInput(Collections.singletonList(ByteBuffer.wrap(bytes)));
}
/** Checks if a given docID was loaded in the current block state. */
boolean isLoaded(int docID) {
return blockState.docBase <= docID && docID < blockState.docBase + blockState.chunkDocs;
}
private static class BlockState {
final long startPointer;
final int docBase;
final int chunkDocs;
BlockState(long startPointer, int docBase, int chunkDocs) {
this.startPointer = startPointer;
this.docBase = docBase;
this.chunkDocs = chunkDocs;
}
}
@Override
public Fields get(int doc) throws IOException {
ensureOpen();
// seek to the right place
{
final long startPointer = indexReader.getStartPointer(doc);
vectorsStream.seek(startPointer);
final long startPointer;
if (isLoaded(doc)) {
startPointer = blockState.startPointer; // avoid searching the start pointer
} else {
startPointer = indexReader.getStartPointer(doc);
}
vectorsStream.seek(startPointer);
// decode
// - docBase: first doc ID of the chunk
// - chunkDocs: number of docs of the chunk
final int docBase = vectorsStream.readVInt();
final int chunkDocs = vectorsStream.readVInt();
final int chunkDocs = vectorsStream.readVInt() >>> 1;
if (doc < docBase || doc >= docBase + chunkDocs || docBase + chunkDocs > numDocs) {
throw new CorruptIndexException(
"docBase=" + docBase + ",chunkDocs=" + chunkDocs + ",doc=" + doc, vectorsStream);
}
this.blockState = new BlockState(startPointer, docBase, chunkDocs);
final int skip; // number of fields to skip
final int numFields; // number of fields of the document we're looking for

View File

@ -16,8 +16,11 @@
*/
package org.apache.lucene.codecs.lucene90.compressing;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
@ -32,6 +35,7 @@ import org.apache.lucene.codecs.compressing.CompressionMode;
import org.apache.lucene.codecs.compressing.Compressor;
import org.apache.lucene.codecs.compressing.MatchingReaders;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DocIDMerger;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
@ -46,7 +50,6 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.StringHelper;
@ -325,7 +328,7 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
payloadBytes.reset();
++numDocs;
if (triggerFlush()) {
flush();
flush(false);
}
curDoc = null;
}
@ -379,17 +382,22 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk;
}
private void flush() throws IOException {
numChunks++;
private void flush(boolean force) throws IOException {
assert force != triggerFlush();
final int chunkDocs = pendingDocs.size();
assert chunkDocs > 0 : chunkDocs;
numChunks++;
if (force) {
numDirtyChunks++; // incomplete: we had to force this flush
numDirtyDocs += pendingDocs.size();
}
// write the index file
indexWriter.writeIndex(chunkDocs, vectorsStream.getFilePointer());
final int docBase = numDocs - chunkDocs;
vectorsStream.writeVInt(docBase);
vectorsStream.writeVInt(chunkDocs);
final int dirtyBit = force ? 1 : 0;
vectorsStream.writeVInt((chunkDocs << 1) | dirtyBit);
// total number of fields of the chunk
final int totalFields = flushNumFields(chunkDocs);
@ -715,9 +723,7 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
@Override
public void finish(FieldInfos fis, int numDocs) throws IOException {
if (!pendingDocs.isEmpty()) {
numDirtyChunks++; // incomplete: we had to force this flush
numDirtyDocs += pendingDocs.size();
flush();
flush(true);
}
if (numDocs != this.numDocs) {
throw new RuntimeException(
@ -806,127 +812,131 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
BULK_MERGE_ENABLED = v;
}
@Override
public int merge(MergeState mergeState) throws IOException {
if (mergeState.needsIndexSort) {
// TODO: can we gain back some optos even if index is sorted? E.g. if sort results in large
// chunks of contiguous docs from one sub
// being copied over...?
return super.merge(mergeState);
private void copyChunks(
final MergeState mergeState,
final CompressingTermVectorsSub sub,
final int fromDocID,
final int toDocID)
throws IOException {
final Lucene90CompressingTermVectorsReader reader =
(Lucene90CompressingTermVectorsReader) mergeState.termVectorsReaders[sub.readerIndex];
assert reader.getVersion() == VERSION_CURRENT;
assert reader.getChunkSize() == chunkSize;
assert reader.getCompressionMode() == compressionMode;
assert !tooDirty(reader);
assert mergeState.liveDocs[sub.readerIndex] == null;
int docID = fromDocID;
final FieldsIndex index = reader.getIndexReader();
// copy docs that belong to the previous chunk
while (docID < toDocID && reader.isLoaded(docID)) {
addAllDocVectors(reader.get(docID++), mergeState);
}
int docCount = 0;
int numReaders = mergeState.maxDocs.length;
MatchingReaders matching = new MatchingReaders(mergeState);
for (int readerIndex = 0; readerIndex < numReaders; readerIndex++) {
Lucene90CompressingTermVectorsReader matchingVectorsReader = null;
final TermVectorsReader vectorsReader = mergeState.termVectorsReaders[readerIndex];
if (matching.matchingReaders[readerIndex]) {
// we can only bulk-copy if the matching reader is also a CompressingTermVectorsReader
if (vectorsReader != null
&& vectorsReader instanceof Lucene90CompressingTermVectorsReader) {
matchingVectorsReader = (Lucene90CompressingTermVectorsReader) vectorsReader;
}
if (docID >= toDocID) {
return;
}
// copy chunks
long fromPointer = index.getStartPointer(docID);
final long toPointer =
toDocID == sub.maxDoc ? reader.getMaxPointer() : index.getStartPointer(toDocID);
if (fromPointer < toPointer) {
// flush any pending chunks
if (!pendingDocs.isEmpty()) {
flush(true);
}
final int maxDoc = mergeState.maxDocs[readerIndex];
final Bits liveDocs = mergeState.liveDocs[readerIndex];
if (matchingVectorsReader != null
&& matchingVectorsReader.getCompressionMode() == compressionMode
&& matchingVectorsReader.getChunkSize() == chunkSize
&& matchingVectorsReader.getVersion() == VERSION_CURRENT
&& matchingVectorsReader.getPackedIntsVersion() == PackedInts.VERSION_CURRENT
&& BULK_MERGE_ENABLED
&& liveDocs == null
&& !tooDirty(matchingVectorsReader)) {
// optimized merge, raw byte copy
// its not worth fine-graining this if there are deletions.
matchingVectorsReader.checkIntegrity();
// flush any pending chunks
if (!pendingDocs.isEmpty()) {
numDirtyChunks++; // incomplete: we had to force this flush
numDirtyDocs += pendingDocs.size();
flush();
}
final IndexInput rawDocs = reader.getVectorsStream();
rawDocs.seek(fromPointer);
do {
// iterate over each chunk. we use the vectors index to find chunk boundaries,
// read the docstart + doccount from the chunk header (we write a new header, since doc
// numbers will change),
// and just copy the bytes directly.
IndexInput rawDocs = matchingVectorsReader.getVectorsStream();
FieldsIndex index = matchingVectorsReader.getIndexReader();
rawDocs.seek(index.getStartPointer(0));
int docID = 0;
while (docID < maxDoc) {
// read header
int base = rawDocs.readVInt();
if (base != docID) {
throw new CorruptIndexException(
"invalid state: base=" + base + ", docID=" + docID, rawDocs);
}
int bufferedDocs = rawDocs.readVInt();
// write a new index entry and new header for this chunk.
indexWriter.writeIndex(bufferedDocs, vectorsStream.getFilePointer());
vectorsStream.writeVInt(docCount); // rebase
vectorsStream.writeVInt(bufferedDocs);
docID += bufferedDocs;
docCount += bufferedDocs;
numDocs += bufferedDocs;
if (docID > maxDoc) {
throw new CorruptIndexException(
"invalid state: base=" + base + ", count=" + bufferedDocs + ", maxDoc=" + maxDoc,
rawDocs);
}
// copy bytes until the next chunk boundary (or end of chunk data).
// using the stored fields index for this isn't the most efficient, but fast enough
// and is a source of redundancy for detecting bad things.
final long end;
if (docID == maxDoc) {
end = matchingVectorsReader.getMaxPointer();
} else {
end = index.getStartPointer(docID);
}
vectorsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
// read header
final int base = rawDocs.readVInt();
if (base != docID) {
throw new CorruptIndexException(
"invalid state: base=" + base + ", docID=" + docID, rawDocs);
}
if (rawDocs.getFilePointer() != matchingVectorsReader.getMaxPointer()) {
final int code = rawDocs.readVInt();
final int bufferedDocs = code >>> 1;
// write a new index entry and new header for this chunk.
indexWriter.writeIndex(bufferedDocs, vectorsStream.getFilePointer());
vectorsStream.writeVInt(numDocs); // rebase
vectorsStream.writeVInt(code);
docID += bufferedDocs;
numDocs += bufferedDocs;
if (docID > toDocID) {
throw new CorruptIndexException(
"invalid state: pos="
+ rawDocs.getFilePointer()
+ ", max="
+ matchingVectorsReader.getMaxPointer(),
"invalid state: base=" + base + ", count=" + bufferedDocs + ", toDocID=" + toDocID,
rawDocs);
}
// since we bulk merged all chunks, we inherit any dirty ones from this segment.
numChunks += matchingVectorsReader.getNumChunks();
numDirtyChunks += matchingVectorsReader.getNumDirtyChunks();
numDirtyDocs += matchingVectorsReader.getNumDirtyDocs();
// copy bytes until the next chunk boundary (or end of chunk data).
// using the stored fields index for this isn't the most efficient, but fast enough
// and is a source of redundancy for detecting bad things.
final long end;
if (docID == sub.maxDoc) {
end = reader.getMaxPointer();
} else {
end = index.getStartPointer(docID);
}
vectorsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
++numChunks;
boolean dirtyChunk = (code & 1) != 0;
if (dirtyChunk) {
numDirtyChunks++;
numDirtyDocs += bufferedDocs;
}
fromPointer = end;
} while (fromPointer < toPointer);
}
// copy leftover docs that don't form a complete chunk
assert reader.isLoaded(docID) == false;
while (docID < toDocID) {
addAllDocVectors(reader.get(docID++), mergeState);
}
}
@Override
public int merge(MergeState mergeState) throws IOException {
final int numReaders = mergeState.termVectorsReaders.length;
final MatchingReaders matchingReaders = new MatchingReaders(mergeState);
final List<CompressingTermVectorsSub> subs = new ArrayList<>(numReaders);
for (int i = 0; i < numReaders; i++) {
final TermVectorsReader reader = mergeState.termVectorsReaders[i];
if (reader != null) {
reader.checkIntegrity();
}
final boolean bulkMerge = canPerformBulkMerge(mergeState, matchingReaders, i);
subs.add(new CompressingTermVectorsSub(mergeState, bulkMerge, i));
}
int docCount = 0;
final DocIDMerger<CompressingTermVectorsSub> docIDMerger =
DocIDMerger.of(subs, mergeState.needsIndexSort);
CompressingTermVectorsSub sub = docIDMerger.next();
while (sub != null) {
assert sub.mappedDocID == docCount : sub.mappedDocID + " != " + docCount;
if (sub.canPerformBulkMerge) {
final int fromDocID = sub.docID;
int toDocID = fromDocID;
final CompressingTermVectorsSub current = sub;
while ((sub = docIDMerger.next()) == current) {
++toDocID;
assert sub.docID == toDocID;
}
++toDocID; // exclusive bound
copyChunks(mergeState, current, fromDocID, toDocID);
docCount += toDocID - fromDocID;
} else {
// naive merge...
if (vectorsReader != null) {
vectorsReader.checkIntegrity();
}
for (int i = 0; i < maxDoc; i++) {
if (liveDocs != null && liveDocs.get(i) == false) {
continue;
}
Fields vectors;
if (vectorsReader == null) {
vectors = null;
} else {
vectors = vectorsReader.get(i);
}
addAllDocVectors(vectors, mergeState);
++docCount;
}
final TermVectorsReader reader = mergeState.termVectorsReaders[sub.readerIndex];
final Fields vectors = reader != null ? reader.get(sub.docID) : null;
addAllDocVectors(vectors, mergeState);
++docCount;
sub = docIDMerger.next();
}
}
finish(mergeState.mergeFieldInfos, docCount);
@ -948,6 +958,48 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
&& candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
}
private boolean canPerformBulkMerge(
MergeState mergeState, MatchingReaders matchingReaders, int readerIndex) {
if (mergeState.termVectorsReaders[readerIndex]
instanceof Lucene90CompressingTermVectorsReader) {
final Lucene90CompressingTermVectorsReader reader =
(Lucene90CompressingTermVectorsReader) mergeState.termVectorsReaders[readerIndex];
return BULK_MERGE_ENABLED
&& matchingReaders.matchingReaders[readerIndex]
&& reader.getCompressionMode() == compressionMode
&& reader.getChunkSize() == chunkSize
&& reader.getVersion() == VERSION_CURRENT
&& reader.getPackedIntsVersion() == PackedInts.VERSION_CURRENT
&& mergeState.liveDocs[readerIndex] == null
&& !tooDirty(reader);
}
return false;
}
private static class CompressingTermVectorsSub extends DocIDMerger.Sub {
final int maxDoc;
final int readerIndex;
final boolean canPerformBulkMerge;
int docID = -1;
CompressingTermVectorsSub(MergeState mergeState, boolean canPerformBulkMerge, int readerIndex) {
super(mergeState.docMaps[readerIndex]);
this.maxDoc = mergeState.maxDocs[readerIndex];
this.readerIndex = readerIndex;
this.canPerformBulkMerge = canPerformBulkMerge;
}
@Override
public int nextDoc() {
docID++;
if (docID == maxDoc) {
return NO_MORE_DOCS;
} else {
return docID;
}
}
}
@Override
public long ramBytesUsed() {
return positionsBuf.length

View File

@ -25,12 +25,14 @@ import static org.apache.lucene.index.PostingsEnum.POSITIONS;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
@ -49,16 +51,20 @@ import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.TermsEnum.SeekStatus;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.AttributeImpl;
import org.apache.lucene.util.AttributeReflector;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
/**
@ -667,45 +673,96 @@ public abstract class BaseTermVectorsFormatTestCase extends BaseIndexFileFormatT
dir.close();
}
public void testMerge() throws IOException {
private void doTestMerge(Sort indexSort, boolean allowDeletes) throws IOException {
final RandomDocumentFactory docFactory = new RandomDocumentFactory(5, 20);
final int numDocs = atLeast(100);
final int numDeletes = random().nextInt(numDocs);
final Set<Integer> deletes = new HashSet<>();
while (deletes.size() < numDeletes) {
deletes.add(random().nextInt(numDocs));
}
for (Options options : validOptions()) {
final RandomDocument[] docs = new RandomDocument[numDocs];
Map<String, RandomDocument> docs = new HashMap<>();
for (int i = 0; i < numDocs; ++i) {
docs[i] = docFactory.newDocument(TestUtil.nextInt(random(), 1, 3), atLeast(10), options);
docs.put(
Integer.toString(i),
docFactory.newDocument(TestUtil.nextInt(random(), 1, 3), atLeast(10), options));
}
final Directory dir = newDirectory();
final RandomIndexWriter writer = new RandomIndexWriter(random(), dir);
for (int i = 0; i < numDocs; ++i) {
writer.addDocument(addId(docs[i].toDocument(), "" + i));
final IndexWriterConfig iwc = newIndexWriterConfig();
if (indexSort != null) {
iwc.setIndexSort(indexSort);
}
final RandomIndexWriter writer = new RandomIndexWriter(random(), dir, iwc);
List<String> liveDocIDs = new ArrayList<>();
List<String> ids = new ArrayList<>(docs.keySet());
Collections.shuffle(ids, random());
Runnable verifyTermVectors =
() -> {
try (DirectoryReader reader = maybeWrapWithMergingReader(writer.getReader())) {
for (String id : liveDocIDs) {
final int docID = docID(reader, id);
assertEquals(docs.get(id), reader.getTermVectors(docID));
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
};
for (String id : ids) {
final Document doc = addId(docs.get(id).toDocument(), id);
if (indexSort != null) {
for (SortField sortField : indexSort.getSort()) {
doc.add(
new NumericDocValuesField(
sortField.getField(), TestUtil.nextInt(random(), 0, 1024)));
}
}
if (random().nextInt(100) < 5) {
// add via foreign writer
IndexWriterConfig otherIwc = newIndexWriterConfig();
if (indexSort != null) {
otherIwc.setIndexSort(indexSort);
}
try (Directory otherDir = newDirectory();
RandomIndexWriter otherIw = new RandomIndexWriter(random(), otherDir, otherIwc)) {
otherIw.addDocument(doc);
try (DirectoryReader otherReader = otherIw.getReader()) {
TestUtil.addIndexesSlowly(writer.w, otherReader);
}
}
} else {
writer.addDocument(doc);
}
liveDocIDs.add(id);
if (allowDeletes && random().nextInt(100) < 20) {
final String deleteId = liveDocIDs.remove(random().nextInt(liveDocIDs.size()));
writer.deleteDocuments(new Term("id", deleteId));
}
if (rarely()) {
writer.commit();
verifyTermVectors.run();
}
if (rarely()) {
writer.forceMerge(1);
verifyTermVectors.run();
}
}
for (int delete : deletes) {
writer.deleteDocuments(new Term("id", "" + delete));
}
// merge with deletes
verifyTermVectors.run();
writer.forceMerge(1);
final IndexReader reader = writer.getReader();
for (int i = 0; i < numDocs; ++i) {
if (!deletes.contains(i)) {
final int docID = docID(reader, "" + i);
assertEquals(docs[i], reader.getTermVectors(docID));
}
}
reader.close();
writer.close();
dir.close();
verifyTermVectors.run();
IOUtils.close(writer, dir);
}
}
public void testMergeWithIndexSort() throws IOException {
SortField[] sortFields = new SortField[TestUtil.nextInt(random(), 1, 2)];
for (int i = 0; i < sortFields.length; i++) {
sortFields[i] = new SortField("sort_field_" + i, SortField.Type.LONG);
}
doTestMerge(new Sort(sortFields), false);
doTestMerge(new Sort(sortFields), true);
}
public void testMergeWithoutIndexSort() throws IOException {
doTestMerge(null, false);
doTestMerge(null, true);
}
// run random tests from different threads to make sure the per-thread clones
// don't share mutable data
public void testClone() throws IOException, InterruptedException {