diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsReader.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsReader.java
index 802eb1c2a8c..ec8823d70de 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsReader.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsReader.java
@@ -91,7 +91,6 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReade
   private final long numDirtyChunks; // number of incomplete compressed blocks written
   private final long numDirtyDocs; // cumulative number of docs in incomplete chunks
   private final long maxPointer; // end of the data section
-  private BlockState blockState = new BlockState(-1, -1, 0);
 
   // used by clone
   private Lucene90CompressingTermVectorsReader(Lucene90CompressingTermVectorsReader reader) {
@@ -311,45 +310,25 @@ public final class Lucene90CompressingTermVectorsReade
     return new ByteBuffersDataInput(Collections.singletonList(ByteBuffer.wrap(bytes)));
   }
 
-  /** Checks if a given docID was loaded in the current block state. */
-  boolean isLoaded(int docID) {
-    return blockState.docBase <= docID && docID < blockState.docBase + blockState.chunkDocs;
-  }
-
-  private static class BlockState {
-    final long startPointer;
-    final int docBase;
-    final int chunkDocs;
-
-    BlockState(long startPointer, int docBase, int chunkDocs) {
-      this.startPointer = startPointer;
-      this.docBase = docBase;
-      this.chunkDocs = chunkDocs;
-    }
-  }
-
   @Override
   public Fields get(int doc) throws IOException {
     ensureOpen();
 
     // seek to the right place
-    final long startPointer;
-    if (isLoaded(doc)) {
-      startPointer = blockState.startPointer; // avoid searching the start pointer
-    } else {
-      startPointer = indexReader.getStartPointer(doc);
+    {
+      final long startPointer = indexReader.getStartPointer(doc);
+      vectorsStream.seek(startPointer);
     }
 
     // decode
     // - docBase: first doc ID of the chunk
     // - chunkDocs: number of docs of the chunk
     final int docBase = vectorsStream.readVInt();
-    final int chunkDocs = vectorsStream.readVInt() >>> 1;
+    final int chunkDocs = vectorsStream.readVInt();
     if (doc < docBase || doc >= docBase + chunkDocs || docBase + chunkDocs > numDocs) {
       throw new CorruptIndexException(
           "docBase=" + docBase + ",chunkDocs=" + chunkDocs + ",doc=" + doc, vectorsStream);
     }
-    this.blockState = new BlockState(startPointer, docBase, chunkDocs);
 
     final int skip; // number of fields to skip
     final int numFields; // number of fields of the document we're looking for
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsWriter.java
index 638b9e77b6f..ed54ce736a3 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene90/compressing/Lucene90CompressingTermVectorsWriter.java
@@ -16,11 +16,8 @@
  */
 package org.apache.lucene.codecs.lucene90.compressing;
 
-import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
-
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Deque;
@@ -35,7 +32,6 @@ import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.compressing.Compressor;
 import org.apache.lucene.codecs.compressing.MatchingReaders;
 import org.apache.lucene.index.CorruptIndexException;
-import org.apache.lucene.index.DocIDMerger;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.Fields;
@@ -50,6 +46,7 @@ import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.StringHelper;
@@ -328,7 +325,7 @@ public final class Lucene90CompressingTermVectorsWrite
     payloadBytes.reset();
     ++numDocs;
     if (triggerFlush()) {
-      flush(false);
+      flush();
     }
     curDoc = null;
   }
@@ -382,22 +379,17 @@ public final class Lucene90CompressingTermVectorsWrite
     return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk;
   }
 
-  private void flush(boolean force) throws IOException {
-    assert force != triggerFlush();
+  private void flush() throws IOException {
+    numChunks++;
     final int chunkDocs = pendingDocs.size();
     assert chunkDocs > 0 : chunkDocs;
-    numChunks++;
-    if (force) {
-      numDirtyChunks++; // incomplete: we had to force this flush
-      numDirtyDocs += pendingDocs.size();
-    }
+
     // write the index file
     indexWriter.writeIndex(chunkDocs, vectorsStream.getFilePointer());
 
     final int docBase = numDocs - chunkDocs;
     vectorsStream.writeVInt(docBase);
-    final int dirtyBit = force ? 1 : 0;
-    vectorsStream.writeVInt((chunkDocs << 1) | dirtyBit);
+    vectorsStream.writeVInt(chunkDocs);
 
     // total number of fields of the chunk
     final int totalFields = flushNumFields(chunkDocs);
@@ -723,7 +715,9 @@ public final class Lucene90CompressingTermVectorsWrite
   @Override
   public void finish(FieldInfos fis, int numDocs) throws IOException {
     if (!pendingDocs.isEmpty()) {
-      flush(true);
+      numDirtyChunks++; // incomplete: we had to force this flush
+      numDirtyDocs += pendingDocs.size();
+      flush();
     }
     if (numDocs != this.numDocs) {
       throw new RuntimeException(
@@ -812,131 +806,127 @@ public final class Lucene90CompressingTermVectorsWrite
     BULK_MERGE_ENABLED = v;
   }
 
-  private void copyChunks(
-      final MergeState mergeState,
-      final CompressingTermVectorsSub sub,
-      final int fromDocID,
-      final int toDocID)
-      throws IOException {
-    final Lucene90CompressingTermVectorsReader reader =
-        (Lucene90CompressingTermVectorsReader) mergeState.termVectorsReaders[sub.readerIndex];
-    assert reader.getVersion() == VERSION_CURRENT;
-    assert reader.getChunkSize() == chunkSize;
-    assert reader.getCompressionMode() == compressionMode;
-    assert !tooDirty(reader);
-    assert mergeState.liveDocs[sub.readerIndex] == null;
-
-    int docID = fromDocID;
-    final FieldsIndex index = reader.getIndexReader();
-
-    // copy docs that belong to the previous chunk
-    while (docID < toDocID && reader.isLoaded(docID)) {
-      addAllDocVectors(reader.get(docID++), mergeState);
+  @Override
+  public int merge(MergeState mergeState) throws IOException {
+    if (mergeState.needsIndexSort) {
+      // TODO: can we gain back some optos even if index is sorted? E.g. if sort results in large
+      // chunks of contiguous docs from one sub
+      // being copied over...?
+      return super.merge(mergeState);
     }
+    int docCount = 0;
+    int numReaders = mergeState.maxDocs.length;
 
-    if (docID >= toDocID) {
-      return;
-    }
-    // copy chunks
-    long fromPointer = index.getStartPointer(docID);
-    final long toPointer =
-        toDocID == sub.maxDoc ? reader.getMaxPointer() : index.getStartPointer(toDocID);
-    if (fromPointer < toPointer) {
-      // flush any pending chunks
-      if (!pendingDocs.isEmpty()) {
-        flush(true);
+    MatchingReaders matching = new MatchingReaders(mergeState);
+
+    for (int readerIndex = 0; readerIndex < numReaders; readerIndex++) {
+      Lucene90CompressingTermVectorsReader matchingVectorsReader = null;
+      final TermVectorsReader vectorsReader = mergeState.termVectorsReaders[readerIndex];
+      if (matching.matchingReaders[readerIndex]) {
+        // we can only bulk-copy if the matching reader is also a CompressingTermVectorsReader
+        if (vectorsReader != null
+            && vectorsReader instanceof Lucene90CompressingTermVectorsReader) {
+          matchingVectorsReader = (Lucene90CompressingTermVectorsReader) vectorsReader;
+        }
       }
-      final IndexInput rawDocs = reader.getVectorsStream();
-      rawDocs.seek(fromPointer);
-      do {
+
+      final int maxDoc = mergeState.maxDocs[readerIndex];
+      final Bits liveDocs = mergeState.liveDocs[readerIndex];
+
+      if (matchingVectorsReader != null
+          && matchingVectorsReader.getCompressionMode() == compressionMode
+          && matchingVectorsReader.getChunkSize() == chunkSize
+          && matchingVectorsReader.getVersion() == VERSION_CURRENT
+          && matchingVectorsReader.getPackedIntsVersion() == PackedInts.VERSION_CURRENT
+          && BULK_MERGE_ENABLED
+          && liveDocs == null
+          && !tooDirty(matchingVectorsReader)) {
+        // optimized merge, raw byte copy
+        // its not worth fine-graining this if there are deletions.
+
+        matchingVectorsReader.checkIntegrity();
+
+        // flush any pending chunks
+        if (!pendingDocs.isEmpty()) {
+          numDirtyChunks++; // incomplete: we had to force this flush
+          numDirtyDocs += pendingDocs.size();
+          flush();
+        }
+
         // iterate over each chunk. we use the vectors index to find chunk boundaries,
         // read the docstart + doccount from the chunk header (we write a new header, since doc
         // numbers will change),
         // and just copy the bytes directly.
-        // read header
-        final int base = rawDocs.readVInt();
-        if (base != docID) {
-          throw new CorruptIndexException(
-              "invalid state: base=" + base + ", docID=" + docID, rawDocs);
+        IndexInput rawDocs = matchingVectorsReader.getVectorsStream();
+        FieldsIndex index = matchingVectorsReader.getIndexReader();
+        rawDocs.seek(index.getStartPointer(0));
+        int docID = 0;
+        while (docID < maxDoc) {
+          // read header
+          int base = rawDocs.readVInt();
+          if (base != docID) {
+            throw new CorruptIndexException(
+                "invalid state: base=" + base + ", docID=" + docID, rawDocs);
+          }
+          int bufferedDocs = rawDocs.readVInt();
+
+          // write a new index entry and new header for this chunk.
+          indexWriter.writeIndex(bufferedDocs, vectorsStream.getFilePointer());
+          vectorsStream.writeVInt(docCount); // rebase
+          vectorsStream.writeVInt(bufferedDocs);
+          docID += bufferedDocs;
+          docCount += bufferedDocs;
+          numDocs += bufferedDocs;
+
+          if (docID > maxDoc) {
+            throw new CorruptIndexException(
+                "invalid state: base=" + base + ", count=" + bufferedDocs + ", maxDoc=" + maxDoc,
+                rawDocs);
+          }
+
+          // copy bytes until the next chunk boundary (or end of chunk data).
+          // using the stored fields index for this isn't the most efficient, but fast enough
+          // and is a source of redundancy for detecting bad things.
+          final long end;
+          if (docID == maxDoc) {
+            end = matchingVectorsReader.getMaxPointer();
+          } else {
+            end = index.getStartPointer(docID);
+          }
+          vectorsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
         }
 
-        final int code = rawDocs.readVInt();
-        final int bufferedDocs = code >>> 1;
-
-        // write a new index entry and new header for this chunk.
-        indexWriter.writeIndex(bufferedDocs, vectorsStream.getFilePointer());
-        vectorsStream.writeVInt(numDocs); // rebase
-        vectorsStream.writeVInt(code);
-        docID += bufferedDocs;
-        numDocs += bufferedDocs;
-        if (docID > toDocID) {
+        if (rawDocs.getFilePointer() != matchingVectorsReader.getMaxPointer()) {
           throw new CorruptIndexException(
-              "invalid state: base=" + base + ", count=" + bufferedDocs + ", toDocID=" + toDocID,
+              "invalid state: pos="
+                  + rawDocs.getFilePointer()
+                  + ", max="
+                  + matchingVectorsReader.getMaxPointer(),
              rawDocs);
         }
 
-        // copy bytes until the next chunk boundary (or end of chunk data).
-        // using the stored fields index for this isn't the most efficient, but fast enough
-        // and is a source of redundancy for detecting bad things.
-        final long end;
-        if (docID == sub.maxDoc) {
-          end = reader.getMaxPointer();
-        } else {
-          end = index.getStartPointer(docID);
-        }
-        vectorsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
-        ++numChunks;
-        boolean dirtyChunk = (code & 1) != 0;
-        if (dirtyChunk) {
-          numDirtyChunks++;
-          numDirtyDocs += bufferedDocs;
-        }
-        fromPointer = end;
-      } while (fromPointer < toPointer);
-    }
-    // copy leftover docs that don't form a complete chunk
-    assert reader.isLoaded(docID) == false;
-    while (docID < toDocID) {
-      addAllDocVectors(reader.get(docID++), mergeState);
-    }
-  }
-
-  @Override
-  public int merge(MergeState mergeState) throws IOException {
-    final int numReaders = mergeState.termVectorsReaders.length;
-    final MatchingReaders matchingReaders = new MatchingReaders(mergeState);
-    final List<CompressingTermVectorsSub> subs = new ArrayList<>(numReaders);
-    for (int i = 0; i < numReaders; i++) {
-      final TermVectorsReader reader = mergeState.termVectorsReaders[i];
-      if (reader != null) {
-        reader.checkIntegrity();
-      }
-      final boolean bulkMerge = canPerformBulkMerge(mergeState, matchingReaders, i);
-      subs.add(new CompressingTermVectorsSub(mergeState, bulkMerge, i));
-    }
-    int docCount = 0;
-    final DocIDMerger<CompressingTermVectorsSub> docIDMerger =
-        DocIDMerger.of(subs, mergeState.needsIndexSort);
-    CompressingTermVectorsSub sub = docIDMerger.next();
-    while (sub != null) {
-      assert sub.mappedDocID == docCount : sub.mappedDocID + " != " + docCount;
-      if (sub.canPerformBulkMerge) {
-        final int fromDocID = sub.docID;
-        int toDocID = fromDocID;
-        final CompressingTermVectorsSub current = sub;
-        while ((sub = docIDMerger.next()) == current) {
-          ++toDocID;
-          assert sub.docID == toDocID;
-        }
-        ++toDocID; // exclusive bound
-        copyChunks(mergeState, current, fromDocID, toDocID);
-        docCount += toDocID - fromDocID;
+        // since we bulk merged all chunks, we inherit any dirty ones from this segment.
+        numChunks += matchingVectorsReader.getNumChunks();
+        numDirtyChunks += matchingVectorsReader.getNumDirtyChunks();
+        numDirtyDocs += matchingVectorsReader.getNumDirtyDocs();
       } else {
-        final TermVectorsReader reader = mergeState.termVectorsReaders[sub.readerIndex];
-        final Fields vectors = reader != null ? reader.get(sub.docID) : null;
-        addAllDocVectors(vectors, mergeState);
-        ++docCount;
-        sub = docIDMerger.next();
+        // naive merge...
+        if (vectorsReader != null) {
+          vectorsReader.checkIntegrity();
+        }
+        for (int i = 0; i < maxDoc; i++) {
+          if (liveDocs != null && liveDocs.get(i) == false) {
+            continue;
+          }
+          Fields vectors;
+          if (vectorsReader == null) {
+            vectors = null;
+          } else {
+            vectors = vectorsReader.get(i);
+          }
+          addAllDocVectors(vectors, mergeState);
+          ++docCount;
+        }
       }
     }
     finish(mergeState.mergeFieldInfos, docCount);
@@ -958,48 +948,6 @@ public final class Lucene90CompressingTermVectorsWrite
         && candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
   }
 
-  private boolean canPerformBulkMerge(
-      MergeState mergeState, MatchingReaders matchingReaders, int readerIndex) {
-    if (mergeState.termVectorsReaders[readerIndex]
-        instanceof Lucene90CompressingTermVectorsReader) {
-      final Lucene90CompressingTermVectorsReader reader =
-          (Lucene90CompressingTermVectorsReader) mergeState.termVectorsReaders[readerIndex];
-      return BULK_MERGE_ENABLED
-          && matchingReaders.matchingReaders[readerIndex]
-          && reader.getCompressionMode() == compressionMode
-          && reader.getChunkSize() == chunkSize
-          && reader.getVersion() == VERSION_CURRENT
-          && reader.getPackedIntsVersion() == PackedInts.VERSION_CURRENT
-          && mergeState.liveDocs[readerIndex] == null
-          && !tooDirty(reader);
-    }
-    return false;
-  }
-
-  private static class CompressingTermVectorsSub extends DocIDMerger.Sub {
-    final int maxDoc;
-    final int readerIndex;
-    final boolean canPerformBulkMerge;
-    int docID = -1;
-
-    CompressingTermVectorsSub(MergeState mergeState, boolean canPerformBulkMerge, int readerIndex) {
-      super(mergeState.docMaps[readerIndex]);
-      this.maxDoc = mergeState.maxDocs[readerIndex];
-      this.readerIndex = readerIndex;
-      this.canPerformBulkMerge = canPerformBulkMerge;
-    }
-
-    @Override
-    public int nextDoc() {
-      docID++;
-      if (docID == maxDoc) {
-        return NO_MORE_DOCS;
-      } else {
-        return docID;
-      }
-    }
-  }
-
   @Override
   public long ramBytesUsed() {
     return positionsBuf.length
diff --git a/lucene/test-framework/src/java/org/apache/lucene/index/BaseTermVectorsFormatTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/index/BaseTermVectorsFormatTestCase.java
index 12090ce2411..0d0604e4531 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/index/BaseTermVectorsFormatTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/index/BaseTermVectorsFormatTestCase.java
@@ -25,14 +25,12 @@ import static org.apache.lucene.index.PostingsEnum.POSITIONS;
 
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -51,20 +49,16 @@ import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field.Store;
 import org.apache.lucene.document.FieldType;
-import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.document.TextField;
 import org.apache.lucene.index.TermsEnum.SeekStatus;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Sort;
-import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.AttributeImpl;
 import org.apache.lucene.util.AttributeReflector;
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.TestUtil;
 
 /**
@@ -673,96 +667,45 @@ public abstract class BaseTermVectorsFormatTestCase extends BaseIndexFileFormatT
     dir.close();
   }
 
-  private void doTestMerge(Sort indexSort, boolean allowDeletes) throws IOException {
+  public void testMerge() throws IOException {
     final RandomDocumentFactory docFactory = new RandomDocumentFactory(5, 20);
     final int numDocs = atLeast(100);
+    final int numDeletes = random().nextInt(numDocs);
+    final Set<Integer> deletes = new HashSet<>();
+    while (deletes.size() < numDeletes) {
+      deletes.add(random().nextInt(numDocs));
+    }
     for (Options options : validOptions()) {
-      Map<String, RandomDocument> docs = new HashMap<>();
+      final RandomDocument[] docs = new RandomDocument[numDocs];
       for (int i = 0; i < numDocs; ++i) {
-        docs.put(
-            Integer.toString(i),
-            docFactory.newDocument(TestUtil.nextInt(random(), 1, 3), atLeast(10), options));
+        docs[i] = docFactory.newDocument(TestUtil.nextInt(random(), 1, 3), atLeast(10), options);
       }
       final Directory dir = newDirectory();
-      final IndexWriterConfig iwc = newIndexWriterConfig();
-      if (indexSort != null) {
-        iwc.setIndexSort(indexSort);
-      }
-      final RandomIndexWriter writer = new RandomIndexWriter(random(), dir, iwc);
-      List<String> liveDocIDs = new ArrayList<>();
-      List<String> ids = new ArrayList<>(docs.keySet());
-      Collections.shuffle(ids, random());
-      Runnable verifyTermVectors =
-          () -> {
-            try (DirectoryReader reader = maybeWrapWithMergingReader(writer.getReader())) {
-              for (String id : liveDocIDs) {
-                final int docID = docID(reader, id);
-                assertEquals(docs.get(id), reader.getTermVectors(docID));
-              }
-            } catch (IOException e) {
-              throw new UncheckedIOException(e);
-            }
-          };
-      for (String id : ids) {
-        final Document doc = addId(docs.get(id).toDocument(), id);
-        if (indexSort != null) {
-          for (SortField sortField : indexSort.getSort()) {
-            doc.add(
-                new NumericDocValuesField(
-                    sortField.getField(), TestUtil.nextInt(random(), 0, 1024)));
-          }
-        }
-        if (random().nextInt(100) < 5) {
-          // add via foreign writer
-          IndexWriterConfig otherIwc = newIndexWriterConfig();
-          if (indexSort != null) {
-            otherIwc.setIndexSort(indexSort);
-          }
-          try (Directory otherDir = newDirectory();
-              RandomIndexWriter otherIw = new RandomIndexWriter(random(), otherDir, otherIwc)) {
-            otherIw.addDocument(doc);
-            try (DirectoryReader otherReader = otherIw.getReader()) {
-              TestUtil.addIndexesSlowly(writer.w, otherReader);
-            }
-          }
-        } else {
-          writer.addDocument(doc);
-        }
-        liveDocIDs.add(id);
-        if (allowDeletes && random().nextInt(100) < 20) {
-          final String deleteId = liveDocIDs.remove(random().nextInt(liveDocIDs.size()));
-          writer.deleteDocuments(new Term("id", deleteId));
-        }
+      final RandomIndexWriter writer = new RandomIndexWriter(random(), dir);
+      for (int i = 0; i < numDocs; ++i) {
+        writer.addDocument(addId(docs[i].toDocument(), "" + i));
         if (rarely()) {
           writer.commit();
-          verifyTermVectors.run();
-        }
-        if (rarely()) {
-          writer.forceMerge(1);
-          verifyTermVectors.run();
         }
       }
-      verifyTermVectors.run();
+      for (int delete : deletes) {
+        writer.deleteDocuments(new Term("id", "" + delete));
+      }
+      // merge with deletes
       writer.forceMerge(1);
-      verifyTermVectors.run();
-      IOUtils.close(writer, dir);
+      final IndexReader reader = writer.getReader();
+      for (int i = 0; i < numDocs; ++i) {
+        if (!deletes.contains(i)) {
+          final int docID = docID(reader, "" + i);
+          assertEquals(docs[i], reader.getTermVectors(docID));
+        }
+      }
+      reader.close();
+      writer.close();
+      dir.close();
     }
   }
 
-  public void testMergeWithIndexSort() throws IOException {
-    SortField[] sortFields = new SortField[TestUtil.nextInt(random(), 1, 2)];
-    for (int i = 0; i < sortFields.length; i++) {
-      sortFields[i] = new SortField("sort_field_" + i, SortField.Type.LONG);
-    }
-    doTestMerge(new Sort(sortFields), false);
-    doTestMerge(new Sort(sortFields), true);
-  }
-
-  public void testMergeWithoutIndexSort() throws IOException {
-    doTestMerge(null, false);
-    doTestMerge(null, true);
-  }
-
   // run random tests from different threads to make sure the per-thread clones
   // don't share mutable data
   public void testClone() throws IOException, InterruptedException {
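
Note on the chunk-header encoding removed above: before this revert, the writer packed a dirty-chunk flag into the low bit of the per-chunk doc count (writeVInt((chunkDocs << 1) | dirtyBit)) and the reader recovered the count with readVInt() >>> 1; the restored code writes and reads the plain chunkDocs value and tracks dirty chunks only through the writer's numDirtyChunks/numDirtyDocs counters. The following is a minimal, self-contained sketch of that bit packing for reference; the class and method names are hypothetical and are not part of the patch.

    // Illustration only -- not part of the patch. The names below are hypothetical;
    // the shift/mask logic mirrors the removed writer line
    // `vectorsStream.writeVInt((chunkDocs << 1) | dirtyBit)` and the removed reader
    // expression `vectorsStream.readVInt() >>> 1`.
    public final class ChunkHeaderSketch {

      private ChunkHeaderSketch() {}

      /** Packs a chunk's doc count and its dirty flag into a single non-negative int. */
      static int pack(int chunkDocs, boolean dirty) {
        return (chunkDocs << 1) | (dirty ? 1 : 0);
      }

      /** Recovers the doc count from a packed header value. */
      static int chunkDocs(int packed) {
        return packed >>> 1;
      }

      /** Recovers the dirty flag from a packed header value. */
      static boolean isDirty(int packed) {
        return (packed & 1) != 0;
      }

      public static void main(String[] args) {
        int packed = pack(128, true); // 128 << 1 | 1 = 257
        System.out.println(packed + " -> chunkDocs=" + chunkDocs(packed) + ", dirty=" + isDirty(packed));
      }
    }

For example, pack(128, true) yields 257, from which chunkDocs() recovers 128 and isDirty() recovers true.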