From 54fb21e862c2041cb907517ed993c8ece898cb26 Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Thu, 10 Jun 2021 11:03:17 -0400
Subject: [PATCH] LUCENE-9935: Enable bulk-merge for term vectors with index sort (#140)

This change enables bulk-merge for term vectors with index sort. The
algorithm used here is similar to the one that is used to merge stored
fields.

Relates #134
---
 .../Lucene90CompressingTermVectorsReader.java |  29 +-
 .../Lucene90CompressingTermVectorsWriter.java | 284 +++++++++++-------
 .../index/BaseTermVectorsFormatTestCase.java  | 107 +++++--
 3 files changed, 275 insertions(+), 145 deletions(-)

diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsReader.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsReader.java
index ec8823d70de..802eb1c2a8c 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsReader.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsReader.java
@@ -91,6 +91,7 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReade
   private final long numDirtyChunks; // number of incomplete compressed blocks written
   private final long numDirtyDocs; // cumulative number of docs in incomplete chunks
   private final long maxPointer; // end of the data section
+  private BlockState blockState = new BlockState(-1, -1, 0);
 
   // used by clone
   private Lucene90CompressingTermVectorsReader(Lucene90CompressingTermVectorsReader reader) {
@@ -310,25 +311,45 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReade
     return new ByteBuffersDataInput(Collections.singletonList(ByteBuffer.wrap(bytes)));
   }
 
+  /** Checks if a given docID was loaded in the current block state. */
+  boolean isLoaded(int docID) {
+    return blockState.docBase <= docID && docID < blockState.docBase + blockState.chunkDocs;
+  }
+
+  private static class BlockState {
+    final long startPointer;
+    final int docBase;
+    final int chunkDocs;
+
+    BlockState(long startPointer, int docBase, int chunkDocs) {
+      this.startPointer = startPointer;
+      this.docBase = docBase;
+      this.chunkDocs = chunkDocs;
+    }
+  }
+
   @Override
   public Fields get(int doc) throws IOException {
     ensureOpen();
 
     // seek to the right place
-    {
-      final long startPointer = indexReader.getStartPointer(doc);
-      vectorsStream.seek(startPointer);
+    final long startPointer;
+    if (isLoaded(doc)) {
+      startPointer = blockState.startPointer; // avoid searching the start pointer
+    } else {
+      startPointer = indexReader.getStartPointer(doc);
     }
 
     // decode
     // - docBase: first doc ID of the chunk
     // - chunkDocs: number of docs of the chunk
     final int docBase = vectorsStream.readVInt();
-    final int chunkDocs = vectorsStream.readVInt();
+    final int chunkDocs = vectorsStream.readVInt() >>> 1;
     if (doc < docBase || doc >= docBase + chunkDocs || docBase + chunkDocs > numDocs) {
       throw new CorruptIndexException(
           "docBase=" + docBase + ",chunkDocs=" + chunkDocs + ",doc=" + doc, vectorsStream);
     }
+    this.blockState = new BlockState(startPointer, docBase, chunkDocs);
 
     final int skip; // number of fields to skip
     final int numFields; // number of fields of the document we're looking for
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsWriter.java
index ed54ce736a3..638b9e77b6f 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsWriter.java
@@ -16,8 +16,11 @@
  */
 package org.apache.lucene.codecs.lucene90.compressing;
 
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
@@ -32,6 +35,7 @@ import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.compressing.Compressor;
 import org.apache.lucene.codecs.compressing.MatchingReaders;
 import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.DocIDMerger;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.Fields;
@@ -46,7 +50,6 @@ import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.StringHelper;
@@ -325,7 +328,7 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
     payloadBytes.reset();
     ++numDocs;
     if (triggerFlush()) {
-      flush();
+      flush(false);
     }
     curDoc = null;
   }
@@ -379,17 +382,22 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
     return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk;
   }
 
-  private void flush() throws IOException {
-    numChunks++;
+  private void flush(boolean force) throws IOException {
+    assert force != triggerFlush();
     final int chunkDocs = pendingDocs.size();
     assert chunkDocs > 0 : chunkDocs;
-
+    numChunks++;
+    if (force) {
+      numDirtyChunks++; // incomplete: we had to force this flush
+      numDirtyDocs += pendingDocs.size();
+    }
     // write the index file
     indexWriter.writeIndex(chunkDocs, vectorsStream.getFilePointer());
 
     final int docBase = numDocs - chunkDocs;
     vectorsStream.writeVInt(docBase);
-    vectorsStream.writeVInt(chunkDocs);
+    final int dirtyBit = force ? 1 : 0;
+    vectorsStream.writeVInt((chunkDocs << 1) | dirtyBit);
 
     // total number of fields of the chunk
     final int totalFields = flushNumFields(chunkDocs);
@@ -715,9 +723,7 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
   @Override
   public void finish(FieldInfos fis, int numDocs) throws IOException {
     if (!pendingDocs.isEmpty()) {
-      numDirtyChunks++; // incomplete: we had to force this flush
-      numDirtyDocs += pendingDocs.size();
-      flush();
+      flush(true);
     }
     if (numDocs != this.numDocs) {
       throw new RuntimeException(
@@ -806,127 +812,131 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
     BULK_MERGE_ENABLED = v;
   }
 
-  @Override
-  public int merge(MergeState mergeState) throws IOException {
-    if (mergeState.needsIndexSort) {
-      // TODO: can we gain back some optos even if index is sorted? E.g. if sort results in large
-      // chunks of contiguous docs from one sub
-      // being copied over...?
-      return super.merge(mergeState);
+  private void copyChunks(
+      final MergeState mergeState,
+      final CompressingTermVectorsSub sub,
+      final int fromDocID,
+      final int toDocID)
+      throws IOException {
+    final Lucene90CompressingTermVectorsReader reader =
+        (Lucene90CompressingTermVectorsReader) mergeState.termVectorsReaders[sub.readerIndex];
+    assert reader.getVersion() == VERSION_CURRENT;
+    assert reader.getChunkSize() == chunkSize;
+    assert reader.getCompressionMode() == compressionMode;
+    assert !tooDirty(reader);
+    assert mergeState.liveDocs[sub.readerIndex] == null;
+
+    int docID = fromDocID;
+    final FieldsIndex index = reader.getIndexReader();
+
+    // copy docs that belong to the previous chunk
+    while (docID < toDocID && reader.isLoaded(docID)) {
+      addAllDocVectors(reader.get(docID++), mergeState);
     }
-    int docCount = 0;
-    int numReaders = mergeState.maxDocs.length;
-    MatchingReaders matching = new MatchingReaders(mergeState);
-
-    for (int readerIndex = 0; readerIndex < numReaders; readerIndex++) {
-      Lucene90CompressingTermVectorsReader matchingVectorsReader = null;
-      final TermVectorsReader vectorsReader = mergeState.termVectorsReaders[readerIndex];
-      if (matching.matchingReaders[readerIndex]) {
-        // we can only bulk-copy if the matching reader is also a CompressingTermVectorsReader
-        if (vectorsReader != null
-            && vectorsReader instanceof Lucene90CompressingTermVectorsReader) {
-          matchingVectorsReader = (Lucene90CompressingTermVectorsReader) vectorsReader;
-        }
+    if (docID >= toDocID) {
+      return;
+    }
+    // copy chunks
+    long fromPointer = index.getStartPointer(docID);
+    final long toPointer =
+        toDocID == sub.maxDoc ? reader.getMaxPointer() : index.getStartPointer(toDocID);
+    if (fromPointer < toPointer) {
+      // flush any pending chunks
+      if (!pendingDocs.isEmpty()) {
+        flush(true);
       }
-
-      final int maxDoc = mergeState.maxDocs[readerIndex];
-      final Bits liveDocs = mergeState.liveDocs[readerIndex];
-
-      if (matchingVectorsReader != null
-          && matchingVectorsReader.getCompressionMode() == compressionMode
-          && matchingVectorsReader.getChunkSize() == chunkSize
-          && matchingVectorsReader.getVersion() == VERSION_CURRENT
-          && matchingVectorsReader.getPackedIntsVersion() == PackedInts.VERSION_CURRENT
-          && BULK_MERGE_ENABLED
-          && liveDocs == null
-          && !tooDirty(matchingVectorsReader)) {
-        // optimized merge, raw byte copy
-        // its not worth fine-graining this if there are deletions.
-
-        matchingVectorsReader.checkIntegrity();
-
-        // flush any pending chunks
-        if (!pendingDocs.isEmpty()) {
-          numDirtyChunks++; // incomplete: we had to force this flush
-          numDirtyDocs += pendingDocs.size();
-          flush();
-        }
-
+      final IndexInput rawDocs = reader.getVectorsStream();
+      rawDocs.seek(fromPointer);
+      do {
         // iterate over each chunk. we use the vectors index to find chunk boundaries,
         // read the docstart + doccount from the chunk header (we write a new header, since doc
         // numbers will change),
         // and just copy the bytes directly.
-        IndexInput rawDocs = matchingVectorsReader.getVectorsStream();
-        FieldsIndex index = matchingVectorsReader.getIndexReader();
-        rawDocs.seek(index.getStartPointer(0));
-        int docID = 0;
-        while (docID < maxDoc) {
-          // read header
-          int base = rawDocs.readVInt();
-          if (base != docID) {
-            throw new CorruptIndexException(
-                "invalid state: base=" + base + ", docID=" + docID, rawDocs);
-          }
-          int bufferedDocs = rawDocs.readVInt();
-
-          // write a new index entry and new header for this chunk.
-          indexWriter.writeIndex(bufferedDocs, vectorsStream.getFilePointer());
-          vectorsStream.writeVInt(docCount); // rebase
-          vectorsStream.writeVInt(bufferedDocs);
-          docID += bufferedDocs;
-          docCount += bufferedDocs;
-          numDocs += bufferedDocs;
-
-          if (docID > maxDoc) {
-            throw new CorruptIndexException(
-                "invalid state: base=" + base + ", count=" + bufferedDocs + ", maxDoc=" + maxDoc,
-                rawDocs);
-          }
-
-          // copy bytes until the next chunk boundary (or end of chunk data).
-          // using the stored fields index for this isn't the most efficient, but fast enough
-          // and is a source of redundancy for detecting bad things.
-          final long end;
-          if (docID == maxDoc) {
-            end = matchingVectorsReader.getMaxPointer();
-          } else {
-            end = index.getStartPointer(docID);
-          }
-          vectorsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
+        // read header
+        final int base = rawDocs.readVInt();
+        if (base != docID) {
+          throw new CorruptIndexException(
+              "invalid state: base=" + base + ", docID=" + docID, rawDocs);
         }
-        if (rawDocs.getFilePointer() != matchingVectorsReader.getMaxPointer()) {
+        final int code = rawDocs.readVInt();
+        final int bufferedDocs = code >>> 1;
+
+        // write a new index entry and new header for this chunk.
+        indexWriter.writeIndex(bufferedDocs, vectorsStream.getFilePointer());
+        vectorsStream.writeVInt(numDocs); // rebase
+        vectorsStream.writeVInt(code);
+        docID += bufferedDocs;
+        numDocs += bufferedDocs;
+        if (docID > toDocID) {
           throw new CorruptIndexException(
-              "invalid state: pos="
-                  + rawDocs.getFilePointer()
-                  + ", max="
-                  + matchingVectorsReader.getMaxPointer(),
+              "invalid state: base=" + base + ", count=" + bufferedDocs + ", toDocID=" + toDocID,
               rawDocs);
         }
-        // since we bulk merged all chunks, we inherit any dirty ones from this segment.
-        numChunks += matchingVectorsReader.getNumChunks();
-        numDirtyChunks += matchingVectorsReader.getNumDirtyChunks();
-        numDirtyDocs += matchingVectorsReader.getNumDirtyDocs();
+
+        // copy bytes until the next chunk boundary (or end of chunk data).
+        // using the stored fields index for this isn't the most efficient, but fast enough
+        // and is a source of redundancy for detecting bad things.
+        final long end;
+        if (docID == sub.maxDoc) {
+          end = reader.getMaxPointer();
+        } else {
+          end = index.getStartPointer(docID);
+        }
+        vectorsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
+        ++numChunks;
+        boolean dirtyChunk = (code & 1) != 0;
+        if (dirtyChunk) {
+          numDirtyChunks++;
+          numDirtyDocs += bufferedDocs;
+        }
+        fromPointer = end;
+      } while (fromPointer < toPointer);
+    }
+    // copy leftover docs that don't form a complete chunk
+    assert reader.isLoaded(docID) == false;
+    while (docID < toDocID) {
+      addAllDocVectors(reader.get(docID++), mergeState);
+    }
+  }
+
+  @Override
+  public int merge(MergeState mergeState) throws IOException {
+    final int numReaders = mergeState.termVectorsReaders.length;
+    final MatchingReaders matchingReaders = new MatchingReaders(mergeState);
+    final List<CompressingTermVectorsSub> subs = new ArrayList<>(numReaders);
+    for (int i = 0; i < numReaders; i++) {
+      final TermVectorsReader reader = mergeState.termVectorsReaders[i];
+      if (reader != null) {
+        reader.checkIntegrity();
+      }
+      final boolean bulkMerge = canPerformBulkMerge(mergeState, matchingReaders, i);
+      subs.add(new CompressingTermVectorsSub(mergeState, bulkMerge, i));
+    }
+    int docCount = 0;
+    final DocIDMerger<CompressingTermVectorsSub> docIDMerger =
+        DocIDMerger.of(subs, mergeState.needsIndexSort);
+    CompressingTermVectorsSub sub = docIDMerger.next();
+    while (sub != null) {
+      assert sub.mappedDocID == docCount : sub.mappedDocID + " != " + docCount;
+      if (sub.canPerformBulkMerge) {
+        final int fromDocID = sub.docID;
+        int toDocID = fromDocID;
+        final CompressingTermVectorsSub current = sub;
+        while ((sub = docIDMerger.next()) == current) {
+          ++toDocID;
+          assert sub.docID == toDocID;
+        }
+        ++toDocID; // exclusive bound
+        copyChunks(mergeState, current, fromDocID, toDocID);
+        docCount += toDocID - fromDocID;
       } else {
-        // naive merge...
-        if (vectorsReader != null) {
-          vectorsReader.checkIntegrity();
-        }
-        for (int i = 0; i < maxDoc; i++) {
-          if (liveDocs != null && liveDocs.get(i) == false) {
-            continue;
-          }
-          Fields vectors;
-          if (vectorsReader == null) {
-            vectors = null;
-          } else {
-            vectors = vectorsReader.get(i);
-          }
-          addAllDocVectors(vectors, mergeState);
-          ++docCount;
-        }
+        final TermVectorsReader reader = mergeState.termVectorsReaders[sub.readerIndex];
+        final Fields vectors = reader != null ? reader.get(sub.docID) : null;
+        addAllDocVectors(vectors, mergeState);
+        ++docCount;
+        sub = docIDMerger.next();
       }
     }
     finish(mergeState.mergeFieldInfos, docCount);
@@ -948,6 +958,48 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
         && candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
   }
 
+  private boolean canPerformBulkMerge(
+      MergeState mergeState, MatchingReaders matchingReaders, int readerIndex) {
+    if (mergeState.termVectorsReaders[readerIndex]
+        instanceof Lucene90CompressingTermVectorsReader) {
+      final Lucene90CompressingTermVectorsReader reader =
+          (Lucene90CompressingTermVectorsReader) mergeState.termVectorsReaders[readerIndex];
+      return BULK_MERGE_ENABLED
+          && matchingReaders.matchingReaders[readerIndex]
+          && reader.getCompressionMode() == compressionMode
+          && reader.getChunkSize() == chunkSize
+          && reader.getVersion() == VERSION_CURRENT
+          && reader.getPackedIntsVersion() == PackedInts.VERSION_CURRENT
+          && mergeState.liveDocs[readerIndex] == null
+          && !tooDirty(reader);
+    }
+    return false;
+  }
+
+  private static class CompressingTermVectorsSub extends DocIDMerger.Sub {
+    final int maxDoc;
+    final int readerIndex;
+    final boolean canPerformBulkMerge;
+    int docID = -1;
+
+    CompressingTermVectorsSub(MergeState mergeState, boolean canPerformBulkMerge, int readerIndex) {
+      super(mergeState.docMaps[readerIndex]);
+      this.maxDoc = mergeState.maxDocs[readerIndex];
+      this.readerIndex = readerIndex;
+      this.canPerformBulkMerge = canPerformBulkMerge;
+    }
+
+    @Override
+    public int nextDoc() {
+      docID++;
+      if (docID == maxDoc) {
+        return NO_MORE_DOCS;
+      } else {
+        return docID;
+      }
+    }
+  }
+
   @Override
   public long ramBytesUsed() {
     return positionsBuf.length
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/BaseTermVectorsFormatTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/index/BaseTermVectorsFormatTestCase.java
index 0d0604e4531..12090ce2411 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/BaseTermVectorsFormatTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/BaseTermVectorsFormatTestCase.java
@@ -25,12 +25,14 @@ import static org.apache.lucene.index.PostingsEnum.POSITIONS;
 
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -49,16 +51,20 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.FieldType;
+import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.TermsEnum.SeekStatus;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.AttributeImpl;
 import org.apache.lucene.util.AttributeReflector;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.TestUtil;
 
 /**
@@ -667,45 +673,96 @@ public abstract class BaseTermVectorsFormatTestCase extends BaseIndexFileFormatT
     dir.close();
   }
 
-  public void testMerge() throws IOException {
+  private void doTestMerge(Sort indexSort, boolean allowDeletes) throws IOException {
     final RandomDocumentFactory docFactory = new RandomDocumentFactory(5, 20);
     final int numDocs = atLeast(100);
-    final int numDeletes = random().nextInt(numDocs);
-    final Set<Integer> deletes = new HashSet<>();
-    while (deletes.size() < numDeletes) {
-      deletes.add(random().nextInt(numDocs));
-    }
     for (Options options : validOptions()) {
-      final RandomDocument[] docs = new RandomDocument[numDocs];
+      Map<String, RandomDocument> docs = new HashMap<>();
       for (int i = 0; i < numDocs; ++i) {
-        docs[i] = docFactory.newDocument(TestUtil.nextInt(random(), 1, 3), atLeast(10), options);
+        docs.put(
+            Integer.toString(i),
+            docFactory.newDocument(TestUtil.nextInt(random(), 1, 3), atLeast(10), options));
       }
       final Directory dir = newDirectory();
-      final RandomIndexWriter writer = new RandomIndexWriter(random(), dir);
-      for (int i = 0; i < numDocs; ++i) {
-        writer.addDocument(addId(docs[i].toDocument(), "" + i));
+      final IndexWriterConfig iwc = newIndexWriterConfig();
+      if (indexSort != null) {
+        iwc.setIndexSort(indexSort);
+      }
+      final RandomIndexWriter writer = new RandomIndexWriter(random(), dir, iwc);
+      List<String> liveDocIDs = new ArrayList<>();
+      List<String> ids = new ArrayList<>(docs.keySet());
+      Collections.shuffle(ids, random());
+      Runnable verifyTermVectors =
+          () -> {
+            try (DirectoryReader reader = maybeWrapWithMergingReader(writer.getReader())) {
+              for (String id : liveDocIDs) {
+                final int docID = docID(reader, id);
+                assertEquals(docs.get(id), reader.getTermVectors(docID));
+              }
+            } catch (IOException e) {
+              throw new UncheckedIOException(e);
+            }
+          };
+      for (String id : ids) {
+        final Document doc = addId(docs.get(id).toDocument(), id);
+        if (indexSort != null) {
+          for (SortField sortField : indexSort.getSort()) {
+            doc.add(
+                new NumericDocValuesField(
+                    sortField.getField(), TestUtil.nextInt(random(), 0, 1024)));
+          }
+        }
+        if (random().nextInt(100) < 5) {
+          // add via foreign writer
+          IndexWriterConfig otherIwc = newIndexWriterConfig();
+          if (indexSort != null) {
+            otherIwc.setIndexSort(indexSort);
+          }
+          try (Directory otherDir = newDirectory();
+              RandomIndexWriter otherIw = new RandomIndexWriter(random(), otherDir, otherIwc)) {
+            otherIw.addDocument(doc);
+            try (DirectoryReader otherReader = otherIw.getReader()) {
+              TestUtil.addIndexesSlowly(writer.w, otherReader);
+            }
+          }
+        } else {
+          writer.addDocument(doc);
+        }
+        liveDocIDs.add(id);
+        if (allowDeletes && random().nextInt(100) < 20) {
+          final String deleteId = liveDocIDs.remove(random().nextInt(liveDocIDs.size()));
+          writer.deleteDocuments(new Term("id", deleteId));
+        }
         if (rarely()) {
           writer.commit();
+          verifyTermVectors.run();
+        }
+        if (rarely()) {
+          writer.forceMerge(1);
+          verifyTermVectors.run();
         }
       }
-      for (int delete : deletes) {
-        writer.deleteDocuments(new Term("id", "" + delete));
-      }
-      // merge with deletes
+      verifyTermVectors.run();
       writer.forceMerge(1);
-      final IndexReader reader = writer.getReader();
-      for (int i = 0; i < numDocs; ++i) {
-        if (!deletes.contains(i)) {
-          final int docID = docID(reader, "" + i);
-          assertEquals(docs[i], reader.getTermVectors(docID));
-        }
-      }
-      reader.close();
-      writer.close();
-      dir.close();
+      verifyTermVectors.run();
+      IOUtils.close(writer, dir);
     }
   }
 
+  public void testMergeWithIndexSort() throws IOException {
+    SortField[] sortFields = new SortField[TestUtil.nextInt(random(), 1, 2)];
+    for (int i = 0; i < sortFields.length; i++) {
+      sortFields[i] = new SortField("sort_field_" + i, SortField.Type.LONG);
+    }
+    doTestMerge(new Sort(sortFields), false);
+    doTestMerge(new Sort(sortFields), true);
+  }
+
+  public void testMergeWithoutIndexSort() throws IOException {
+    doTestMerge(null, false);
+    doTestMerge(null, true);
+  }
+
   // run random tests from different threads to make sure the per-thread clones
   // don't share mutable data
   public void testClone() throws IOException, InterruptedException {
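
Note on the chunk header used above: flush() packs the chunk's document count and a dirty
flag into a single VInt, (chunkDocs << 1) | dirtyBit on the write side, while the reader and
the bulk-merge path recover them with readVInt() >>> 1 and (code & 1). The standalone Java
sketch below only illustrates that bit layout; it is not part of the patch, does not use any
Lucene classes, and the class and method names are made up for the example.

public final class ChunkHeaderCodeSketch {

  /** Packs chunkDocs and the dirty flag the way the writer does before writeVInt. */
  static int encode(int chunkDocs, boolean dirtyChunk) {
    return (chunkDocs << 1) | (dirtyChunk ? 1 : 0);
  }

  /** Recovers the document count, mirroring readVInt() >>> 1 on the read side. */
  static int chunkDocs(int code) {
    return code >>> 1;
  }

  /** True if the chunk was force-flushed (incomplete), mirroring (code & 1) != 0. */
  static boolean isDirty(int code) {
    return (code & 1) != 0;
  }

  public static void main(String[] args) {
    int code = encode(128, true);
    System.out.println(chunkDocs(code)); // prints 128
    System.out.println(isDirty(code)); // prints true
  }
}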