LUCENE-9827: Update backward codec in Lucene 9.0 (#147)

We need to update the reading logic of the backward codec in Lucene 9
for LUCENE-9827 and LUCENE-9935, as those changes have been backported to Lucene 8.

Relates apache/lucene-solr#2495
Relates apache/lucene-solr#2494
Nhat Nguyen authored 2021-05-20 08:49:43 -04:00, committed by GitHub
parent f919672647
commit a12260eb95
8 changed files with 65 additions and 524 deletions
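
The core on-disk change this backward codec now has to read is the new stored-fields chunk header introduced with VERSION_NUM_CHUNKS: the header token used to reserve only its lowest bit for the "sliced" flag, while the new layout reserves two low bits (sliced and dirty) and shifts the doc count left by two. Below is a minimal standalone sketch of the two layouts; the class and helper names are made up for illustration and are not Lucene API.

public final class ChunkHeaderTokenSketch {
  // Pre-VERSION_NUM_CHUNKS layout: bit 0 = sliced flag.
  static int encodeOld(int numBufferedDocs, boolean sliced) {
    return (numBufferedDocs << 1) | (sliced ? 1 : 0);
  }

  // VERSION_NUM_CHUNKS layout: bit 0 = sliced flag, bit 1 = dirty-chunk flag.
  static int encodeNew(int numBufferedDocs, boolean sliced, boolean dirty) {
    return (numBufferedDocs << 2) | (dirty ? 2 : 0) | (sliced ? 1 : 0);
  }

  // The reader picks the shift based on the segment's format version, mirroring
  // "token >>> 2 : token >>> 1" in Lucene50CompressingStoredFieldsReader.doReset.
  static int chunkDocs(int token, boolean hasNumChunks) {
    return hasNumChunks ? token >>> 2 : token >>> 1;
  }

  public static void main(String[] args) {
    int token = encodeNew(100, false, true);
    System.out.println(chunkDocs(token, true)); // prints 100
    System.out.println((token & 2) != 0);       // prints true: the chunk is dirty
  }
}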


@@ -154,6 +154,6 @@ public final class Lucene50TermVectorsFormat extends Lucene50CompressingTermVect
/** Sole constructor. */
public Lucene50TermVectorsFormat() {
super("Lucene50TermVectorsData", "", CompressionMode.FAST, 1 << 12, 10);
super("Lucene50TermVectorsData", "", CompressionMode.FAST, 1 << 12, 128, 10);
}
}


@@ -75,8 +75,13 @@ public final class Lucene50CompressingStoredFieldsReader extends StoredFieldsRea
static final int VERSION_OFFHEAP_INDEX = 2;
/** Version where all metadata were moved to the meta file. */
static final int VERSION_META = 3;
/**
* Version where numChunks is explicitly recorded in meta file and a dirty chunk bit is recorded
* in each chunk
*/
static final int VERSION_NUM_CHUNKS = 4;
static final int VERSION_CURRENT = VERSION_META;
static final int VERSION_CURRENT = VERSION_NUM_CHUNKS;
static final int META_VERSION_START = 0;
// for compression of timestamps
@@ -99,8 +104,6 @@ public final class Lucene50CompressingStoredFieldsReader extends StoredFieldsRea
private final int numDocs;
private final boolean merging;
private final BlockState state;
private final long numDirtyChunks; // number of incomplete compressed blocks written
private final long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
private boolean closed;
// used by clone
@@ -116,8 +119,6 @@ public final class Lucene50CompressingStoredFieldsReader extends StoredFieldsRea
this.compressionMode = reader.compressionMode;
this.decompressor = reader.decompressor.clone();
this.numDocs = reader.numDocs;
this.numDirtyChunks = reader.numDirtyChunks;
this.numDirtyDocs = reader.numDirtyDocs;
this.merging = merging;
this.state = new BlockState();
this.closed = false;
@@ -231,14 +232,14 @@ public final class Lucene50CompressingStoredFieldsReader extends StoredFieldsRea
this.maxPointer = maxPointer;
this.indexReader = indexReader;
if (version >= VERSION_NUM_CHUNKS) {
// discard num_chunks
metaIn.readVLong();
}
if (version >= VERSION_META) {
numDirtyChunks = metaIn.readVLong();
numDirtyDocs = metaIn.readVLong();
} else {
// Old versions of this format did not record numDirtyDocs. Since bulk
// merges are disabled on version increments anyway, we make no effort
// to get valid values of numDirtyChunks and numDirtyDocs.
numDirtyChunks = numDirtyDocs = -1;
// consume dirty chunks/docs stats we wrote
metaIn.readVLong();
metaIn.readVLong();
}
if (metaIn != null) {
@@ -482,7 +483,7 @@ public final class Lucene50CompressingStoredFieldsReader extends StoredFieldsRea
private void doReset(int docID) throws IOException {
docBase = fieldsStream.readVInt();
final int token = fieldsStream.readVInt();
chunkDocs = token >>> 1;
chunkDocs = version >= VERSION_NUM_CHUNKS ? token >>> 2 : token >>> 1;
if (contains(docID) == false || docBase + chunkDocs > numDocs) {
throw new CorruptIndexException(
"Corrupted: docID="
@@ -740,56 +741,6 @@ public final class Lucene50CompressingStoredFieldsReader extends StoredFieldsRea
return new Lucene50CompressingStoredFieldsReader(this, true);
}
int getVersion() {
return version;
}
CompressionMode getCompressionMode() {
return compressionMode;
}
FieldsIndex getIndexReader() {
return indexReader;
}
long getMaxPointer() {
return maxPointer;
}
IndexInput getFieldsStream() {
return fieldsStream;
}
int getChunkSize() {
return chunkSize;
}
long getNumDirtyDocs() {
if (version != VERSION_CURRENT) {
throw new IllegalStateException(
"getNumDirtyDocs should only ever get called when the reader is on the current version");
}
assert numDirtyDocs >= 0;
return numDirtyDocs;
}
long getNumDirtyChunks() {
if (version != VERSION_CURRENT) {
throw new IllegalStateException(
"getNumDirtyChunks should only ever get called when the reader is on the current version");
}
assert numDirtyChunks >= 0;
return numDirtyChunks;
}
int getNumDocs() {
return numDocs;
}
int getPackedIntsVersion() {
return packedIntsVersion;
}
@Override
public void checkIntegrity() throws IOException {
indexReader.checkIntegrity();
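
Since the dirty-chunk getters are gone from this read-only reader, the constructor only has to consume whatever statistics the meta file contains for the segment's format version: numChunks exists from version 4 on, the dirty counters from version 3 on, and older segments record neither. A simplified sketch of that version-gated read path, written against Lucene's DataInput; the class and method names here are illustrative only.

import java.io.IOException;
import org.apache.lucene.store.DataInput;

final class MetaStatsSketch {
  static final int VERSION_META = 3;       // dirty-chunk stats added
  static final int VERSION_NUM_CHUNKS = 4; // numChunks added

  // Consume (and discard) the chunk statistics present for this format version.
  static void skipChunkStats(DataInput metaIn, int version) throws IOException {
    if (version >= VERSION_NUM_CHUNKS) {
      metaIn.readVLong(); // numChunks
    }
    if (version >= VERSION_META) {
      metaIn.readVLong(); // numDirtyChunks
      metaIn.readVLong(); // numDirtyDocs
    }
  }
}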


@@ -46,6 +46,8 @@ public class Lucene50CompressingTermVectorsFormat extends TermVectorsFormat {
protected final int chunkSize;
/** block size */
protected final int blockSize;
/** max docs per chunk */
protected final int maxDocsPerChunk;
/**
* Create a new {@link Lucene50CompressingTermVectorsFormat}.
@@ -68,6 +70,7 @@ public class Lucene50CompressingTermVectorsFormat extends TermVectorsFormat {
* @param segmentSuffix a suffix to append to files created by this format
* @param compressionMode the {@link CompressionMode} to use
* @param chunkSize the minimum number of bytes of a single chunk of stored documents
* @param maxDocsPerChunk the maximum number of documents in a single chunk
* @param blockSize the number of chunks to store in an index block.
* @see CompressionMode
*/
@@ -76,6 +79,7 @@ public class Lucene50CompressingTermVectorsFormat extends TermVectorsFormat {
String segmentSuffix,
CompressionMode compressionMode,
int chunkSize,
int maxDocsPerChunk,
int blockSize) {
this.formatName = formatName;
this.segmentSuffix = segmentSuffix;
@@ -84,6 +88,7 @@ public class Lucene50CompressingTermVectorsFormat extends TermVectorsFormat {
throw new IllegalArgumentException("chunkSize must be >= 1");
}
this.chunkSize = chunkSize;
this.maxDocsPerChunk = maxDocsPerChunk;
if (blockSize < 1) {
throw new IllegalArgumentException("blockSize must be >= 1");
}
@@ -111,6 +116,8 @@ public class Lucene50CompressingTermVectorsFormat extends TermVectorsFormat {
+ compressionMode
+ ", chunkSize="
+ chunkSize
+ ", maxDocsPerChunk="
+ maxDocsPerChunk
+ ", blockSize="
+ blockSize
+ ")";


@@ -57,9 +57,6 @@ import org.apache.lucene.util.packed.PackedInts;
*/
public final class Lucene50CompressingTermVectorsReader extends TermVectorsReader {
// hard limit on the maximum number of documents per chunk
static final int MAX_DOCUMENTS_PER_CHUNK = 128;
static final String VECTORS_EXTENSION = "tvd";
static final String VECTORS_INDEX_EXTENSION = "tvx";
static final String VECTORS_META_EXTENSION = "tvm";
@@ -69,8 +66,10 @@ public final class Lucene50CompressingTermVectorsReader extends TermVectorsReade
static final int VERSION_OFFHEAP_INDEX = 2;
/** Version where all metadata were moved to the meta file. */
static final int VERSION_META = 3;
/** Version where numChunks is explicitly recorded in meta file */
static final int VERSION_NUM_CHUNKS = 4;
static final int VERSION_CURRENT = VERSION_META;
static final int VERSION_CURRENT = VERSION_NUM_CHUNKS;
static final int META_VERSION_START = 0;
static final int PACKED_BLOCK_SIZE = 64;
@@ -91,8 +90,6 @@ public final class Lucene50CompressingTermVectorsReader extends TermVectorsReade
private final int numDocs;
private boolean closed;
private final BlockPackedReaderIterator reader;
private final long numDirtyChunks; // number of incomplete compressed blocks written
private final long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
private final long maxPointer; // end of the data section
// used by clone
@@ -108,8 +105,6 @@ public final class Lucene50CompressingTermVectorsReader extends TermVectorsReade
this.reader =
new BlockPackedReaderIterator(vectorsStream, packedIntsVersion, PACKED_BLOCK_SIZE, 0);
this.version = reader.version;
this.numDirtyChunks = reader.numDirtyChunks;
this.numDirtyDocs = reader.numDirtyDocs;
this.maxPointer = reader.maxPointer;
this.closed = false;
}
@@ -225,14 +220,14 @@ public final class Lucene50CompressingTermVectorsReader extends TermVectorsReade
this.indexReader = indexReader;
this.maxPointer = maxPointer;
if (version >= VERSION_NUM_CHUNKS) {
// consume num_chunks
metaIn.readVLong();
}
if (version >= VERSION_META) {
numDirtyChunks = metaIn.readVLong();
numDirtyDocs = metaIn.readVLong();
} else {
// Old versions of this format did not record numDirtyDocs. Since bulk
// merges are disabled on version increments anyway, we make no effort
// to get valid values of numDirtyChunks and numDirtyDocs.
numDirtyChunks = numDirtyDocs = -1;
// consume dirty chunks/docs stats we wrote
metaIn.readVLong();
metaIn.readVLong();
}
decompressor = compressionMode.newDecompressor();
@@ -259,56 +254,6 @@ public final class Lucene50CompressingTermVectorsReader extends TermVectorsReade
}
}
CompressionMode getCompressionMode() {
return compressionMode;
}
int getChunkSize() {
return chunkSize;
}
int getPackedIntsVersion() {
return packedIntsVersion;
}
int getVersion() {
return version;
}
FieldsIndex getIndexReader() {
return indexReader;
}
IndexInput getVectorsStream() {
return vectorsStream;
}
long getMaxPointer() {
return maxPointer;
}
long getNumDirtyDocs() {
if (version != VERSION_CURRENT) {
throw new IllegalStateException(
"getNumDirtyDocs should only ever get called when the reader is on the current version");
}
assert numDirtyDocs >= 0;
return numDirtyDocs;
}
long getNumDirtyChunks() {
if (version != VERSION_CURRENT) {
throw new IllegalStateException(
"getNumDirtyChunks should only ever get called when the reader is on the current version");
}
assert numDirtyChunks >= 0;
return numDirtyChunks;
}
int getNumDocs() {
return numDocs;
}
/** @throws AlreadyClosedException if this TermVectorsReader is closed */
private void ensureOpen() throws AlreadyClosedException {
if (closed) {


@@ -24,6 +24,6 @@ public final class Lucene50RWTermVectorsFormat extends Lucene50RWCompressingTerm
/** Sole constructor. */
public Lucene50RWTermVectorsFormat() {
super("Lucene50TermVectorsData", "", CompressionMode.FAST, 1 << 12, 10);
super("Lucene50TermVectorsData", "", CompressionMode.FAST, 1 << 12, 128, 10);
}
}


@@ -28,35 +28,25 @@ import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50Com
import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingStoredFieldsReader.STRING;
import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingStoredFieldsReader.TYPE_BITS;
import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingStoredFieldsReader.VERSION_CURRENT;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.compressing.CompressionMode;
import org.apache.lucene.codecs.compressing.Compressor;
import org.apache.lucene.codecs.compressing.MatchingReaders;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DocIDMerger;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.packed.PackedInts;
@@ -73,7 +63,6 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
private IndexOutput metaStream, fieldsStream;
private Compressor compressor;
private final CompressionMode compressionMode;
private final int chunkSize;
private final int maxDocsPerChunk;
@@ -83,6 +72,7 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
private int docBase; // doc ID at the beginning of the chunk
private int numBufferedDocs; // docBase + numBufferedDocs == current doc ID
private long numChunks; // number of written chunks
private long numDirtyChunks; // number of incomplete compressed blocks written
private long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
@@ -100,7 +90,6 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
throws IOException {
assert directory != null;
this.segment = si.name;
this.compressionMode = compressionMode;
this.compressor = compressionMode.newCompressor();
this.chunkSize = chunkSize;
this.maxDocsPerChunk = maxDocsPerChunk;
@@ -183,7 +172,7 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
endOffsets[numBufferedDocs] = Math.toIntExact(bufferedDocs.size());
++numBufferedDocs;
if (triggerFlush()) {
flush();
flush(false);
}
}
@@ -220,13 +209,19 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
}
private void writeHeader(
int docBase, int numBufferedDocs, int[] numStoredFields, int[] lengths, boolean sliced)
int docBase,
int numBufferedDocs,
int[] numStoredFields,
int[] lengths,
boolean sliced,
boolean dirtyChunk)
throws IOException {
final int slicedBit = sliced ? 1 : 0;
final int dirtyBit = dirtyChunk ? 2 : 0;
// save docBase and numBufferedDocs
fieldsStream.writeVInt(docBase);
fieldsStream.writeVInt((numBufferedDocs) << 1 | slicedBit);
fieldsStream.writeVInt((numBufferedDocs << 2) | dirtyBit | slicedBit);
// save numStoredFields
saveInts(numStoredFields, numBufferedDocs, fieldsStream);
@@ -241,7 +236,13 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
numBufferedDocs >= maxDocsPerChunk;
}
private void flush() throws IOException {
private void flush(boolean force) throws IOException {
assert triggerFlush() != force;
numChunks++;
if (force) {
numDirtyChunks++; // incomplete: we had to force this flush
numDirtyDocs += numBufferedDocs;
}
indexWriter.writeIndex(numBufferedDocs, fieldsStream.getFilePointer());
// transform end offsets into lengths
@@ -251,7 +252,7 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
assert lengths[i] >= 0;
}
final boolean sliced = bufferedDocs.size() >= 2 * chunkSize;
writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced);
writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced, force);
// compress stored fields to fieldsStream.
//
@@ -481,12 +482,7 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
@Override
public void finish(FieldInfos fis, int numDocs) throws IOException {
if (numBufferedDocs > 0) {
numDirtyChunks++; // incomplete: we had to force this flush
final long expectedChunkDocs =
Math.min(
maxDocsPerChunk, (long) ((double) chunkSize / bufferedDocs.size() * numBufferedDocs));
numDirtyDocs += expectedChunkDocs - numBufferedDocs;
flush();
flush(true);
} else {
assert bufferedDocs.size() == 0;
}
@@ -495,6 +491,7 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
"Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
}
indexWriter.finish(numDocs, fieldsStream.getFilePointer(), metaStream);
metaStream.writeVLong(numChunks);
metaStream.writeVLong(numDirtyChunks);
metaStream.writeVLong(numDirtyDocs);
CodecUtil.writeFooter(metaStream);
@@ -520,220 +517,6 @@ public final class Lucene50CompressingStoredFieldsWriter extends StoredFieldsWri
BULK_MERGE_ENABLED = v;
}
@Override
public int merge(MergeState mergeState) throws IOException {
int docCount = 0;
int numReaders = mergeState.maxDocs.length;
MatchingReaders matching = new MatchingReaders(mergeState);
if (mergeState.needsIndexSort) {
/**
* If all readers are compressed and they have the same fieldinfos then we can merge the
* serialized document directly.
*/
List<CompressingStoredFieldsMergeSub> subs = new ArrayList<>();
for (int i = 0; i < mergeState.storedFieldsReaders.length; i++) {
if (matching.matchingReaders[i]
&& mergeState.storedFieldsReaders[i] instanceof Lucene50CompressingStoredFieldsReader) {
Lucene50CompressingStoredFieldsReader storedFieldsReader =
(Lucene50CompressingStoredFieldsReader) mergeState.storedFieldsReaders[i];
storedFieldsReader.checkIntegrity();
subs.add(
new CompressingStoredFieldsMergeSub(
storedFieldsReader, mergeState.docMaps[i], mergeState.maxDocs[i]));
} else {
return super.merge(mergeState);
}
}
final DocIDMerger<CompressingStoredFieldsMergeSub> docIDMerger = DocIDMerger.of(subs, true);
while (true) {
CompressingStoredFieldsMergeSub sub = docIDMerger.next();
if (sub == null) {
break;
}
assert sub.mappedDocID == docCount;
Lucene50CompressingStoredFieldsReader.SerializedDocument doc =
sub.reader.document(sub.docID);
startDocument();
bufferedDocs.copyBytes(doc.in, doc.length);
numStoredFieldsInDoc = doc.numStoredFields;
finishDocument();
++docCount;
}
finish(mergeState.mergeFieldInfos, docCount);
return docCount;
}
for (int readerIndex = 0; readerIndex < numReaders; readerIndex++) {
MergeVisitor visitor = new MergeVisitor(mergeState, readerIndex);
Lucene50CompressingStoredFieldsReader matchingFieldsReader = null;
if (matching.matchingReaders[readerIndex]) {
final StoredFieldsReader fieldsReader = mergeState.storedFieldsReaders[readerIndex];
// we can only bulk-copy if the matching reader is also a CompressingStoredFieldsReader
if (fieldsReader != null && fieldsReader instanceof Lucene50CompressingStoredFieldsReader) {
matchingFieldsReader = (Lucene50CompressingStoredFieldsReader) fieldsReader;
}
}
final int maxDoc = mergeState.maxDocs[readerIndex];
final Bits liveDocs = mergeState.liveDocs[readerIndex];
// if its some other format, or an older version of this format, or safety switch:
if (matchingFieldsReader == null
|| matchingFieldsReader.getVersion() != VERSION_CURRENT
|| BULK_MERGE_ENABLED == false) {
// naive merge...
StoredFieldsReader storedFieldsReader = mergeState.storedFieldsReaders[readerIndex];
if (storedFieldsReader != null) {
storedFieldsReader.checkIntegrity();
}
for (int docID = 0; docID < maxDoc; docID++) {
if (liveDocs != null && liveDocs.get(docID) == false) {
continue;
}
startDocument();
storedFieldsReader.visitDocument(docID, visitor);
finishDocument();
++docCount;
}
} else if (matchingFieldsReader.getCompressionMode() == compressionMode
&& matchingFieldsReader.getChunkSize() == chunkSize
&& matchingFieldsReader.getPackedIntsVersion() == PackedInts.VERSION_CURRENT
&& liveDocs == null
&& !tooDirty(matchingFieldsReader)) {
// optimized merge, raw byte copy
// its not worth fine-graining this if there are deletions.
// if the format is older, its always handled by the naive merge case above
assert matchingFieldsReader.getVersion() == VERSION_CURRENT;
matchingFieldsReader.checkIntegrity();
// flush any pending chunks
if (numBufferedDocs > 0) {
flush();
numDirtyChunks++; // incomplete: we had to force this flush
}
// iterate over each chunk. we use the stored fields index to find chunk boundaries,
// read the docstart + doccount from the chunk header (we write a new header, since doc
// numbers will change),
// and just copy the bytes directly.
IndexInput rawDocs = matchingFieldsReader.getFieldsStream();
FieldsIndex index = matchingFieldsReader.getIndexReader();
rawDocs.seek(index.getStartPointer(0));
int docID = 0;
while (docID < maxDoc) {
// read header
int base = rawDocs.readVInt();
if (base != docID) {
throw new CorruptIndexException(
"invalid state: base=" + base + ", docID=" + docID, rawDocs);
}
int code = rawDocs.readVInt();
// write a new index entry and new header for this chunk.
int bufferedDocs = code >>> 1;
indexWriter.writeIndex(bufferedDocs, fieldsStream.getFilePointer());
fieldsStream.writeVInt(docBase); // rebase
fieldsStream.writeVInt(code);
docID += bufferedDocs;
docBase += bufferedDocs;
docCount += bufferedDocs;
if (docID > maxDoc) {
throw new CorruptIndexException(
"invalid state: base=" + base + ", count=" + bufferedDocs + ", maxDoc=" + maxDoc,
rawDocs);
}
// copy bytes until the next chunk boundary (or end of chunk data).
// using the stored fields index for this isn't the most efficient, but fast enough
// and is a source of redundancy for detecting bad things.
final long end;
if (docID == maxDoc) {
end = matchingFieldsReader.getMaxPointer();
} else {
end = index.getStartPointer(docID);
}
fieldsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
}
if (rawDocs.getFilePointer() != matchingFieldsReader.getMaxPointer()) {
throw new CorruptIndexException(
"invalid state: pos="
+ rawDocs.getFilePointer()
+ ", max="
+ matchingFieldsReader.getMaxPointer(),
rawDocs);
}
// since we bulk merged all chunks, we inherit any dirty ones from this segment.
numDirtyChunks += matchingFieldsReader.getNumDirtyChunks();
numDirtyDocs += matchingFieldsReader.getNumDirtyDocs();
} else {
// optimized merge, we copy serialized (but decompressed) bytes directly
// even on simple docs (1 stored field), it seems to help by about 20%
// if the format is older, its always handled by the naive merge case above
assert matchingFieldsReader.getVersion() == VERSION_CURRENT;
matchingFieldsReader.checkIntegrity();
for (int docID = 0; docID < maxDoc; docID++) {
if (liveDocs != null && liveDocs.get(docID) == false) {
continue;
}
Lucene50CompressingStoredFieldsReader.SerializedDocument doc =
matchingFieldsReader.document(docID);
startDocument();
bufferedDocs.copyBytes(doc.in, doc.length);
numStoredFieldsInDoc = doc.numStoredFields;
finishDocument();
++docCount;
}
}
}
finish(mergeState.mergeFieldInfos, docCount);
return docCount;
}
/**
* Returns true if we should recompress this reader, even though we could bulk merge compressed
* data
*
* <p>The last chunk written for a segment is typically incomplete, so without recompressing, in
* some worst-case situations (e.g. frequent reopen with tiny flushes), over time the compression
* ratio can degrade. This is a safety switch.
*/
boolean tooDirty(Lucene50CompressingStoredFieldsReader candidate) {
// more than 1% dirty, or more than hard limit of 1024 dirty chunks
return candidate.getNumDirtyChunks() > 1024
|| candidate.getNumDirtyDocs() * 100 > candidate.getNumDocs();
}
private static class CompressingStoredFieldsMergeSub extends DocIDMerger.Sub {
private final Lucene50CompressingStoredFieldsReader reader;
private final int maxDoc;
int docID = -1;
CompressingStoredFieldsMergeSub(
Lucene50CompressingStoredFieldsReader reader, MergeState.DocMap docMap, int maxDoc) {
super(docMap);
this.maxDoc = maxDoc;
this.reader = reader;
}
@Override
public int nextDoc() {
docID++;
if (docID == maxDoc) {
return NO_MORE_DOCS;
} else {
return docID;
}
}
}
@Override
public long ramBytesUsed() {
return bufferedDocs.ramBytesUsed()
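
On the writer side of this backward codec, dirty accounting is now exact: a forced flush marks one dirty chunk and counts the docs actually buffered in it, replacing the old estimate derived from chunkSize. A standalone sketch of that accounting; the counters mirror the fields in the diff, everything else is simplified and the class name is made up.

final class DirtyChunkAccountingSketch {
  long numChunks;       // total chunks written
  long numDirtyChunks;  // chunks flushed before they were full
  long numDirtyDocs;    // docs contained in those incomplete chunks
  int numBufferedDocs;

  void flush(boolean force) {
    numChunks++;
    if (force) {
      numDirtyChunks++;                // incomplete: we had to force this flush
      numDirtyDocs += numBufferedDocs; // the docs actually sitting in the chunk
    }
    // ... compress and write the chunk here ...
    numBufferedDocs = 0;
  }

  void finish() {
    if (numBufferedDocs > 0) {
      flush(true); // whatever is left over is by definition an incomplete chunk
    }
    // finish() then writes numChunks, numDirtyChunks and numDirtyDocs to the meta stream.
  }
}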


@@ -17,7 +17,6 @@
package org.apache.lucene.backward_codecs.lucene50.compressing;
import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.FLAGS_BITS;
import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.MAX_DOCUMENTS_PER_CHUNK;
import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.OFFSETS;
import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.PACKED_BLOCK_SIZE;
import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingTermVectorsReader.PAYLOADS;
@@ -39,27 +38,20 @@ import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.TermVectorsReader;
import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.codecs.compressing.CompressionMode;
import org.apache.lucene.codecs.compressing.Compressor;
import org.apache.lucene.codecs.compressing.MatchingReaders;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.StringHelper;
@@ -77,12 +69,12 @@ public final class Lucene50CompressingTermVectorsWriter extends TermVectorsWrite
private FieldsIndexWriter indexWriter;
private IndexOutput metaStream, vectorsStream;
private final CompressionMode compressionMode;
private final Compressor compressor;
private final int chunkSize;
private long numChunks; // number of chunks
private long numDirtyChunks; // number of incomplete compressed blocks written
private long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
private long numDirtyDocs; // cumulative number of docs in incomplete chunks
/** a pending doc */
private class DocData {
@@ -218,6 +210,7 @@ public final class Lucene50CompressingTermVectorsWriter extends TermVectorsWrite
private final ByteBuffersDataOutput termSuffixes; // buffered term suffixes
private final ByteBuffersDataOutput payloadBytes; // buffered term payloads
private final BlockPackedWriter writer;
private final int maxDocsPerChunk; // hard limit on number of docs per chunk
/** Sole constructor. */
Lucene50CompressingTermVectorsWriter(
@@ -228,13 +221,14 @@ public final class Lucene50CompressingTermVectorsWriter extends TermVectorsWrite
String formatName,
CompressionMode compressionMode,
int chunkSize,
int maxDocsPerChunk,
int blockShift)
throws IOException {
assert directory != null;
this.segment = si.name;
this.compressionMode = compressionMode;
this.compressor = compressionMode.newCompressor();
this.chunkSize = chunkSize;
this.maxDocsPerChunk = maxDocsPerChunk;
numDocs = 0;
pendingDocs = new ArrayDeque<>();
@@ -370,10 +364,11 @@ public final class Lucene50CompressingTermVectorsWriter extends TermVectorsWrite
}
private boolean triggerFlush() {
return termSuffixes.size() >= chunkSize || pendingDocs.size() >= MAX_DOCUMENTS_PER_CHUNK;
return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk;
}
private void flush() throws IOException {
numChunks++;
final int chunkDocs = pendingDocs.size();
assert chunkDocs > 0 : chunkDocs;
@@ -709,11 +704,7 @@ public final class Lucene50CompressingTermVectorsWriter extends TermVectorsWrite
public void finish(FieldInfos fis, int numDocs) throws IOException {
if (!pendingDocs.isEmpty()) {
numDirtyChunks++; // incomplete: we had to force this flush
final long expectedChunkDocs =
Math.min(
MAX_DOCUMENTS_PER_CHUNK,
(long) ((double) chunkSize / termSuffixes.size() * pendingDocs.size()));
numDirtyDocs += expectedChunkDocs - pendingDocs.size();
numDirtyDocs += pendingDocs.size();
flush();
}
if (numDocs != this.numDocs) {
@@ -721,6 +712,7 @@ public final class Lucene50CompressingTermVectorsWriter extends TermVectorsWrite
"Wrote " + this.numDocs + " docs, finish called with numDocs=" + numDocs);
}
indexWriter.finish(numDocs, vectorsStream.getFilePointer(), metaStream);
metaStream.writeVLong(numChunks);
metaStream.writeVLong(numDirtyChunks);
metaStream.writeVLong(numDirtyDocs);
CodecUtil.writeFooter(metaStream);
@@ -802,145 +794,6 @@ public final class Lucene50CompressingTermVectorsWriter extends TermVectorsWrite
BULK_MERGE_ENABLED = v;
}
@Override
public int merge(MergeState mergeState) throws IOException {
if (mergeState.needsIndexSort) {
// TODO: can we gain back some optos even if index is sorted? E.g. if sort results in large
// chunks of contiguous docs from one sub
// being copied over...?
return super.merge(mergeState);
}
int docCount = 0;
int numReaders = mergeState.maxDocs.length;
MatchingReaders matching = new MatchingReaders(mergeState);
for (int readerIndex = 0; readerIndex < numReaders; readerIndex++) {
Lucene50CompressingTermVectorsReader matchingVectorsReader = null;
final TermVectorsReader vectorsReader = mergeState.termVectorsReaders[readerIndex];
if (matching.matchingReaders[readerIndex]) {
// we can only bulk-copy if the matching reader is also a CompressingTermVectorsReader
if (vectorsReader != null
&& vectorsReader instanceof Lucene50CompressingTermVectorsReader) {
matchingVectorsReader = (Lucene50CompressingTermVectorsReader) vectorsReader;
}
}
final int maxDoc = mergeState.maxDocs[readerIndex];
final Bits liveDocs = mergeState.liveDocs[readerIndex];
if (matchingVectorsReader != null
&& matchingVectorsReader.getCompressionMode() == compressionMode
&& matchingVectorsReader.getChunkSize() == chunkSize
&& matchingVectorsReader.getVersion() == VERSION_CURRENT
&& matchingVectorsReader.getPackedIntsVersion() == PackedInts.VERSION_CURRENT
&& BULK_MERGE_ENABLED
&& liveDocs == null
&& !tooDirty(matchingVectorsReader)) {
// optimized merge, raw byte copy
// its not worth fine-graining this if there are deletions.
matchingVectorsReader.checkIntegrity();
// flush any pending chunks
if (!pendingDocs.isEmpty()) {
flush();
numDirtyChunks++; // incomplete: we had to force this flush
}
// iterate over each chunk. we use the vectors index to find chunk boundaries,
// read the docstart + doccount from the chunk header (we write a new header, since doc
// numbers will change),
// and just copy the bytes directly.
IndexInput rawDocs = matchingVectorsReader.getVectorsStream();
FieldsIndex index = matchingVectorsReader.getIndexReader();
rawDocs.seek(index.getStartPointer(0));
int docID = 0;
while (docID < maxDoc) {
// read header
int base = rawDocs.readVInt();
if (base != docID) {
throw new CorruptIndexException(
"invalid state: base=" + base + ", docID=" + docID, rawDocs);
}
int bufferedDocs = rawDocs.readVInt();
// write a new index entry and new header for this chunk.
indexWriter.writeIndex(bufferedDocs, vectorsStream.getFilePointer());
vectorsStream.writeVInt(docCount); // rebase
vectorsStream.writeVInt(bufferedDocs);
docID += bufferedDocs;
docCount += bufferedDocs;
numDocs += bufferedDocs;
if (docID > maxDoc) {
throw new CorruptIndexException(
"invalid state: base=" + base + ", count=" + bufferedDocs + ", maxDoc=" + maxDoc,
rawDocs);
}
// copy bytes until the next chunk boundary (or end of chunk data).
// using the stored fields index for this isn't the most efficient, but fast enough
// and is a source of redundancy for detecting bad things.
final long end;
if (docID == maxDoc) {
end = matchingVectorsReader.getMaxPointer();
} else {
end = index.getStartPointer(docID);
}
vectorsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
}
if (rawDocs.getFilePointer() != matchingVectorsReader.getMaxPointer()) {
throw new CorruptIndexException(
"invalid state: pos="
+ rawDocs.getFilePointer()
+ ", max="
+ matchingVectorsReader.getMaxPointer(),
rawDocs);
}
// since we bulk merged all chunks, we inherit any dirty ones from this segment.
numDirtyChunks += matchingVectorsReader.getNumDirtyChunks();
numDirtyDocs += matchingVectorsReader.getNumDirtyDocs();
} else {
// naive merge...
if (vectorsReader != null) {
vectorsReader.checkIntegrity();
}
for (int i = 0; i < maxDoc; i++) {
if (liveDocs != null && liveDocs.get(i) == false) {
continue;
}
Fields vectors;
if (vectorsReader == null) {
vectors = null;
} else {
vectors = vectorsReader.get(i);
}
addAllDocVectors(vectors, mergeState);
++docCount;
}
}
}
finish(mergeState.mergeFieldInfos, docCount);
return docCount;
}
/**
* Returns true if we should recompress this reader, even though we could bulk merge compressed
* data
*
* <p>The last chunk written for a segment is typically incomplete, so without recompressing, in
* some worst-case situations (e.g. frequent reopen with tiny flushes), over time the compression
* ratio can degrade. This is a safety switch.
*/
boolean tooDirty(Lucene50CompressingTermVectorsReader candidate) {
// more than 1% dirty, or more than hard limit of 1024 dirty chunks
return candidate.getNumDirtyChunks() > 1024
|| candidate.getNumDirtyDocs() * 100 > candidate.getNumDocs();
}
@Override
public long ramBytesUsed() {
return positionsBuf.length
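
This commit also drops the specialized merge() path and its tooDirty() safety check from both backward writers, so merging falls back to the default implementation in the superclass. For reference, the heuristic the removed bulk-copy path relied on is a simple ratio check; here it is restated as a standalone method with the reader argument replaced by plain numbers (the real code took a Lucene50CompressingTermVectorsReader or Lucene50CompressingStoredFieldsReader).

final class TooDirtySketch {
  // Recompress instead of bulk-copying when a segment has more than 1024 dirty
  // chunks or more than 1% of its docs sitting in incomplete chunks.
  static boolean tooDirty(long numDirtyChunks, long numDirtyDocs, int numDocs) {
    return numDirtyChunks > 1024 || numDirtyDocs * 100 > numDocs;
  }
}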


@@ -32,8 +32,9 @@ public class Lucene50RWCompressingTermVectorsFormat extends Lucene50CompressingT
String segmentSuffix,
CompressionMode compressionMode,
int chunkSize,
int maxDocsPerChunk,
int blockSize) {
super(formatName, segmentSuffix, compressionMode, chunkSize, blockSize);
super(formatName, segmentSuffix, compressionMode, chunkSize, maxDocsPerChunk, blockSize);
}
@Override
@@ -47,6 +48,7 @@ public class Lucene50RWCompressingTermVectorsFormat extends Lucene50CompressingT
formatName,
compressionMode,
chunkSize,
maxDocsPerChunk,
blockSize);
}
}