mirror of https://github.com/apache/lucene.git
LUCENE-6115: Add getMergeInstance to CompressingStoredFieldsReader.
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1646413 13f79535-47bb-0310-9956-ffa450edef68
parent e23993550e
commit 84e6519950
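For context, a minimal sketch of how the new method is meant to be consumed (illustrative only, not part of this commit; it assumes the base StoredFieldsReader of this branch already exposes getMergeInstance(), as the override added below implies, and uses the visitDocument(int, StoredFieldVisitor) signature shown in the diff):

import java.io.IOException;

import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.index.StoredFieldVisitor;

// Hypothetical consumer: getMergeInstance() returns a reader that caches block
// state and decompresses whole chunks eagerly, matching the sequential access
// pattern of a merge; clone() remains the lazy variant used for searching.
final class MergeInstanceSketch {
  static void copyAll(StoredFieldsReader fieldsReader, int maxDoc, StoredFieldVisitor visitor) throws IOException {
    StoredFieldsReader mergeReader = fieldsReader.getMergeInstance();
    for (int docID = 0; docID < maxDoc; docID++) {
      mergeReader.visitDocument(docID, visitor); // sequential scan over all docs
    }
  }
}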
@@ -46,6 +46,7 @@ import java.util.Collections;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
@@ -53,7 +54,6 @@ import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.DataInput;
@@ -66,6 +66,7 @@ import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.IntsRef;
import org.apache.lucene.util.packed.PackedInts;

/**
@@ -74,9 +75,6 @@ import org.apache.lucene.util.packed.PackedInts;
 */
public final class CompressingStoredFieldsReader extends StoredFieldsReader {

  // Do not reuse the decompression buffer when there is more than 32kb to decompress
  private static final int BUFFER_REUSE_THRESHOLD = 1 << 15;

  private final int version;
  private final FieldInfos fieldInfos;
  private final CompressingStoredFieldsIndexReader indexReader;
@@ -86,12 +84,13 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
  private final int packedIntsVersion;
  private final CompressionMode compressionMode;
  private final Decompressor decompressor;
  private final BytesRef bytes;
  private final int numDocs;
  private final boolean merging;
  private final BlockState state;
  private boolean closed;

  // used by clone
  private CompressingStoredFieldsReader(CompressingStoredFieldsReader reader) {
  private CompressingStoredFieldsReader(CompressingStoredFieldsReader reader, boolean merging) {
    this.version = reader.version;
    this.fieldInfos = reader.fieldInfos;
    this.fieldsStream = reader.fieldsStream.clone();
@@ -102,7 +101,8 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
    this.compressionMode = reader.compressionMode;
    this.decompressor = reader.decompressor.clone();
    this.numDocs = reader.numDocs;
    this.bytes = new BytesRef(reader.bytes.bytes.length);
    this.merging = merging;
    this.state = new BlockState();
    this.closed = false;
  }

@@ -157,7 +157,8 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
      chunkSize = fieldsStream.readVInt();
      packedIntsVersion = fieldsStream.readVInt();
      decompressor = compressionMode.newDecompressor();
      this.bytes = new BytesRef();
      this.merging = false;
      this.state = new BlockState();

      // NOTE: data file is too costly to verify checksum against all the bytes on open,
      // but for now we at least verify proper structure of the checksum footer: which looks
@@ -324,227 +325,94 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
    return l;
  }

  @Override
  public void visitDocument(int docID, StoredFieldVisitor visitor)
      throws IOException {
    fieldsStream.seek(indexReader.getStartPointer(docID));
  /**
   * A serialized document, you need to decode its input in order to get an actual
   * {@link Document}.
   */
  static class SerializedDocument {

    final int docBase = fieldsStream.readVInt();
    final int chunkDocs = fieldsStream.readVInt();
    if (docID < docBase
        || docID >= docBase + chunkDocs
        || docBase + chunkDocs > numDocs) {
      throw new CorruptIndexException("Corrupted: docID=" + docID
          + ", docBase=" + docBase + ", chunkDocs=" + chunkDocs
          + ", numDocs=" + numDocs, fieldsStream);
    // the serialized data
    final DataInput in;

    // the number of bytes on which the document is encoded
    final int length;

    // the number of stored fields
    final int numStoredFields;

    private SerializedDocument(DataInput in, int length, int numStoredFields) {
      this.in = in;
      this.length = length;
      this.numStoredFields = numStoredFields;
    }

    final int numStoredFields, offset, length, totalLength;
    if (chunkDocs == 1) {
      numStoredFields = fieldsStream.readVInt();
      offset = 0;
      length = fieldsStream.readVInt();
      totalLength = length;
    } else {
      final int bitsPerStoredFields = fieldsStream.readVInt();
      if (bitsPerStoredFields == 0) {
        numStoredFields = fieldsStream.readVInt();
      } else if (bitsPerStoredFields > 31) {
        throw new CorruptIndexException("bitsPerStoredFields=" + bitsPerStoredFields, fieldsStream);
      } else {
        final long filePointer = fieldsStream.getFilePointer();
        final PackedInts.Reader reader = PackedInts.getDirectReaderNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerStoredFields);
        numStoredFields = (int) (reader.get(docID - docBase));
        fieldsStream.seek(filePointer + PackedInts.Format.PACKED.byteCount(packedIntsVersion, chunkDocs, bitsPerStoredFields));
      }

      final int bitsPerLength = fieldsStream.readVInt();
      if (bitsPerLength == 0) {
        length = fieldsStream.readVInt();
        offset = (docID - docBase) * length;
        totalLength = chunkDocs * length;
      } else if (bitsPerStoredFields > 31) {
        throw new CorruptIndexException("bitsPerLength=" + bitsPerLength, fieldsStream);
      } else {
        final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
        int off = 0;
        for (int i = 0; i < docID - docBase; ++i) {
          off += it.next();
        }
        offset = off;
        length = (int) it.next();
        off += length;
        for (int i = docID - docBase + 1; i < chunkDocs; ++i) {
          off += it.next();
        }
        totalLength = off;
      }
    }

    if ((length == 0) != (numStoredFields == 0)) {
      throw new CorruptIndexException("length=" + length + ", numStoredFields=" + numStoredFields, fieldsStream);
    }
    if (numStoredFields == 0) {
      // nothing to do
      return;
    }

    final DataInput documentInput;
    if (totalLength >= 2 * chunkSize) {
      assert chunkSize > 0;
      assert offset < chunkSize;

      decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
      documentInput = new DataInput() {

        int decompressed = bytes.length;

        void fillBuffer() throws IOException {
          assert decompressed <= length;
          if (decompressed == length) {
            throw new EOFException();
          }
          final int toDecompress = Math.min(length - decompressed, chunkSize);
          decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
          decompressed += toDecompress;
        }

        @Override
        public byte readByte() throws IOException {
          if (bytes.length == 0) {
            fillBuffer();
          }
          --bytes.length;
          return bytes.bytes[bytes.offset++];
        }

        @Override
        public void readBytes(byte[] b, int offset, int len) throws IOException {
          while (len > bytes.length) {
            System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
            len -= bytes.length;
            offset += bytes.length;
            fillBuffer();
          }
          System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
          bytes.offset += len;
          bytes.length -= len;
        }

      };
    } else {
      final BytesRef bytes = totalLength <= BUFFER_REUSE_THRESHOLD ? this.bytes : new BytesRef();
      decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
      assert bytes.length == length;
      documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
    }

    for (int fieldIDX = 0; fieldIDX < numStoredFields; fieldIDX++) {
      final long infoAndBits = documentInput.readVLong();
      final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
      final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);

      final int bits = (int) (infoAndBits & TYPE_MASK);
      assert bits <= NUMERIC_DOUBLE: "bits=" + Integer.toHexString(bits);

      switch(visitor.needsField(fieldInfo)) {
        case YES:
          readField(documentInput, visitor, fieldInfo, bits);
          break;
        case NO:
          skipField(documentInput, bits);
          break;
        case STOP:
          return;
      }
    }
  }

  @Override
  public StoredFieldsReader clone() {
    ensureOpen();
    return new CompressingStoredFieldsReader(this);
  }
  /**
   * Keeps state about the current block of documents.
   */
  private class BlockState {

  int getVersion() {
    return version;
  }
    private int docBase, chunkDocs;

  CompressionMode getCompressionMode() {
    return compressionMode;
  }
    // whether the block has been sliced, this happens for large documents
    private boolean sliced;

  int getChunkSize() {
    return chunkSize;
  }
    private int[] offsets = IntsRef.EMPTY_INTS;
    private int[] numStoredFields = IntsRef.EMPTY_INTS;

  ChunkIterator chunkIterator(int startDocID) throws IOException {
    ensureOpen();
    return new ChunkIterator(startDocID);
  }
    // the start pointer at which you can read the compressed documents
    private long startPointer;

  final class ChunkIterator {
    private final BytesRef spare = new BytesRef();
    private final BytesRef bytes = new BytesRef();

    final ChecksumIndexInput fieldsStream;
    final BytesRef spare;
    final BytesRef bytes;
    int docBase;
    int chunkDocs;
    int[] numStoredFields;
    int[] lengths;

    private ChunkIterator(int startDocId) throws IOException {
      this.docBase = -1;
      bytes = new BytesRef();
      spare = new BytesRef();
      numStoredFields = new int[1];
      lengths = new int[1];

      IndexInput in = CompressingStoredFieldsReader.this.fieldsStream;
      in.seek(0);
      fieldsStream = new BufferedChecksumIndexInput(in);
      fieldsStream.seek(indexReader.getStartPointer(startDocId));
    boolean contains(int docID) {
      return docID >= docBase && docID < docBase + chunkDocs;
    }

    /**
     * Return the decompressed size of the chunk
     * Reset this block so that it stores state for the block
     * that contains the given doc id.
     */
    int chunkSize() {
      int sum = 0;
      for (int i = 0; i < chunkDocs; ++i) {
        sum += lengths[i];
    void reset(int docID) throws IOException {
      boolean success = false;
      try {
        doReset(docID);
        success = true;
      } finally {
        if (success == false) {
          // if the read failed, set chunkDocs to 0 so that it does not
          // contain any docs anymore and is not reused. This should help
          // get consistent exceptions when trying to get several
          // documents which are in the same corrupted block since it will
          // force the header to be decoded again
          chunkDocs = 0;
        }
      }
      return sum;
    }

    /**
     * Go to the chunk containing the provided doc ID.
     */
    void next(int doc) throws IOException {
      assert doc >= docBase + chunkDocs : doc + " " + docBase + " " + chunkDocs;
      fieldsStream.seek(indexReader.getStartPointer(doc));

      final int docBase = fieldsStream.readVInt();
      final int chunkDocs = fieldsStream.readVInt();
      if (docBase < this.docBase + this.chunkDocs
    private void doReset(int docID) throws IOException {
      docBase = fieldsStream.readVInt();
      final int token = fieldsStream.readVInt();
      chunkDocs = token >>> 1;
      if (contains(docID) == false
          || docBase + chunkDocs > numDocs) {
        throw new CorruptIndexException("Corrupted: current docBase=" + this.docBase
            + ", current numDocs=" + this.chunkDocs + ", new docBase=" + docBase
            + ", new numDocs=" + chunkDocs, fieldsStream);
        throw new CorruptIndexException("Corrupted: docID=" + docID
            + ", docBase=" + docBase + ", chunkDocs=" + chunkDocs
            + ", numDocs=" + numDocs, fieldsStream);
      }
      this.docBase = docBase;
      this.chunkDocs = chunkDocs;

      if (chunkDocs > numStoredFields.length) {
        final int newLength = ArrayUtil.oversize(chunkDocs, 4);
        numStoredFields = new int[newLength];
        lengths = new int[newLength];
      }
      sliced = (token & 1) != 0;

      offsets = ArrayUtil.grow(offsets, chunkDocs + 1);
      numStoredFields = ArrayUtil.grow(numStoredFields, chunkDocs);

      if (chunkDocs == 1) {
        numStoredFields[0] = fieldsStream.readVInt();
        lengths[0] = fieldsStream.readVInt();
        offsets[1] = fieldsStream.readVInt();
      } else {
        // Number of stored fields per document
        final int bitsPerStoredFields = fieldsStream.readVInt();
        if (bitsPerStoredFields == 0) {
          Arrays.fill(numStoredFields, 0, chunkDocs, fieldsStream.readVInt());
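The chunk header no longer stores a bare document count: doReset() above reads a single token whose low bit is the "sliced" flag and whose remaining bits are chunkDocs. A standalone round trip of that packing (names are illustrative; the writer-side counterpart appears further down in CompressingStoredFieldsWriter.writeHeader):

// Round trip of the header token read in doReset() and written by writeHeader():
//   token = (numBufferedDocs << 1) | slicedBit
final class ChunkTokenSketch {
  static int encode(int numBufferedDocs, boolean sliced) {
    return (numBufferedDocs << 1) | (sliced ? 1 : 0);
  }
  static int chunkDocs(int token) {
    return token >>> 1;          // matches: chunkDocs = token >>> 1
  }
  static boolean sliced(int token) {
    return (token & 1) != 0;     // matches: sliced = (token & 1) != 0
  }
}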
@@ -557,54 +425,195 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
          }
        }

        // The stream encodes the length of each document and we decode
        // it into a list of monotonically increasing offsets
        final int bitsPerLength = fieldsStream.readVInt();
        if (bitsPerLength == 0) {
          Arrays.fill(lengths, 0, chunkDocs, fieldsStream.readVInt());
        } else if (bitsPerLength > 31) {
          final int length = fieldsStream.readVInt();
          for (int i = 0; i < chunkDocs; ++i) {
            offsets[1 + i] = (1 + i) * length;
          }
        } else if (bitsPerStoredFields > 31) {
          throw new CorruptIndexException("bitsPerLength=" + bitsPerLength, fieldsStream);
        } else {
          final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
          for (int i = 0; i < chunkDocs; ++i) {
            lengths[i] = (int) it.next();
            offsets[i + 1] = (int) it.next();
          }
          for (int i = 0; i < chunkDocs; ++i) {
            offsets[i + 1] += offsets[i];
          }
        }
      }
    }

    /**
     * Decompress the chunk.
     */
    void decompress() throws IOException {
      // decompress data
      final int chunkSize = chunkSize();
      if (chunkSize >= 2 * CompressingStoredFieldsReader.this.chunkSize) {
        bytes.offset = bytes.length = 0;
        for (int decompressed = 0; decompressed < chunkSize; ) {
          final int toDecompress = Math.min(chunkSize - decompressed, CompressingStoredFieldsReader.this.chunkSize);
          decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, spare);
          bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + spare.length);
          System.arraycopy(spare.bytes, spare.offset, bytes.bytes, bytes.length, spare.length);
          bytes.length += spare.length;
          decompressed += toDecompress;
      // Additional validation: only the empty document has a serialized length of 0
      for (int i = 0; i < chunkDocs; ++i) {
        final int len = offsets[i + 1] - offsets[i];
        final int storedFields = numStoredFields[i];
        if ((len == 0) != (storedFields == 0)) {
          throw new CorruptIndexException("length=" + len + ", numStoredFields=" + storedFields, fieldsStream);
        }
      }
      } else {
        decompressor.decompress(fieldsStream, chunkSize, 0, chunkSize, bytes);

      }
      if (bytes.length != chunkSize) {
        throw new CorruptIndexException("Corrupted: expected chunk size = " + chunkSize() + ", got " + bytes.length, fieldsStream);

      startPointer = fieldsStream.getFilePointer();

      if (merging) {
        final int totalLength = offsets[chunkDocs];
        // decompress eagerly
        if (sliced) {
          bytes.offset = bytes.length = 0;
          for (int decompressed = 0; decompressed < totalLength; ) {
            final int toDecompress = Math.min(totalLength - decompressed, chunkSize);
            decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, spare);
            bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + spare.length);
            System.arraycopy(spare.bytes, spare.offset, bytes.bytes, bytes.length, spare.length);
            bytes.length += spare.length;
            decompressed += toDecompress;
          }
        } else {
          decompressor.decompress(fieldsStream, totalLength, 0, totalLength, bytes);
        }
        if (bytes.length != totalLength) {
          throw new CorruptIndexException("Corrupted: expected chunk size = " + totalLength + ", got " + bytes.length, fieldsStream);
        }
      }
    }

    /**
     * Check integrity of the data. The iterator is not usable after this method has been called.
     * Get the serialized representation of the given docID. This docID has
     * to be contained in the current block.
     */
    void checkIntegrity() throws IOException {
      fieldsStream.seek(fieldsStream.length() - CodecUtil.footerLength());
      CodecUtil.checkFooter(fieldsStream);
    SerializedDocument document(int docID) throws IOException {
      if (contains(docID) == false) {
        throw new IllegalArgumentException();
      }

      final int index = docID - docBase;
      final int offset = offsets[index];
      final int length = offsets[index+1] - offset;
      final int totalLength = offsets[chunkDocs];
      final int numStoredFields = this.numStoredFields[index];

      fieldsStream.seek(startPointer);

      final DataInput documentInput;
      if (length == 0) {
        // empty
        documentInput = new ByteArrayDataInput();
      } else if (merging) {
        // already decompressed
        documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);
      } else if (sliced) {
        decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
        documentInput = new DataInput() {

          int decompressed = bytes.length;

          void fillBuffer() throws IOException {
            assert decompressed <= length;
            if (decompressed == length) {
              throw new EOFException();
            }
            final int toDecompress = Math.min(length - decompressed, chunkSize);
            decompressor.decompress(fieldsStream, toDecompress, 0, toDecompress, bytes);
            decompressed += toDecompress;
          }

          @Override
          public byte readByte() throws IOException {
            if (bytes.length == 0) {
              fillBuffer();
            }
            --bytes.length;
            return bytes.bytes[bytes.offset++];
          }

          @Override
          public void readBytes(byte[] b, int offset, int len) throws IOException {
            while (len > bytes.length) {
              System.arraycopy(bytes.bytes, bytes.offset, b, offset, bytes.length);
              len -= bytes.length;
              offset += bytes.length;
              fillBuffer();
            }
            System.arraycopy(bytes.bytes, bytes.offset, b, offset, len);
            bytes.offset += len;
            bytes.length -= len;
          }

        };
      } else {
        decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
        assert bytes.length == length;
        documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
      }

      return new SerializedDocument(documentInput, length, numStoredFields);
    }

  }

  SerializedDocument document(int docID) throws IOException {
    if (state.contains(docID) == false) {
      fieldsStream.seek(indexReader.getStartPointer(docID));
      state.reset(docID);
    }
    assert state.contains(docID);
    return state.document(docID);
  }

  @Override
  public void visitDocument(int docID, StoredFieldVisitor visitor)
      throws IOException {

    final SerializedDocument doc = document(docID);

    for (int fieldIDX = 0; fieldIDX < doc.numStoredFields; fieldIDX++) {
      final long infoAndBits = doc.in.readVLong();
      final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
      final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);

      final int bits = (int) (infoAndBits & TYPE_MASK);
      assert bits <= NUMERIC_DOUBLE: "bits=" + Integer.toHexString(bits);

      switch(visitor.needsField(fieldInfo)) {
        case YES:
          readField(doc.in, visitor, fieldInfo, bits);
          break;
        case NO:
          skipField(doc.in, bits);
          break;
        case STOP:
          return;
      }
    }
  }

  @Override
  public StoredFieldsReader clone() {
    ensureOpen();
    return new CompressingStoredFieldsReader(this, false);
  }

  @Override
  public StoredFieldsReader getMergeInstance() {
    ensureOpen();
    return new CompressingStoredFieldsReader(this, true);
  }

  int getVersion() {
    return version;
  }

  CompressionMode getCompressionMode() {
    return compressionMode;
  }

  int getChunkSize() {
    return chunkSize;
  }

  @Override
  public long ramBytesUsed() {
    return indexReader.ramBytesUsed();
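In the new read path, doReset() turns the per-document lengths into monotonically increasing offsets, so that document i of the block is the slice [offsets[i], offsets[i+1]) of the decompressed chunk and offsets[chunkDocs] is the total length handed to document(int). A standalone illustration of that prefix sum, with hypothetical lengths:

// Prefix sum performed at the end of doReset(): lengths -> cumulative offsets.
final class OffsetsSketch {
  public static void main(String[] args) {
    int[] lengths = {5, 0, 12, 7};               // hypothetical per-document byte lengths
    int[] offsets = new int[lengths.length + 1]; // offsets[0] == 0
    for (int i = 0; i < lengths.length; ++i) {
      offsets[i + 1] = offsets[i] + lengths[i];
    }
    // offsets == {0, 5, 5, 17, 24}: document i is the slice [offsets[i], offsets[i+1])
    // and offsets[lengths.length] is the totalLength used by document(int).
    System.out.println(java.util.Arrays.toString(offsets));
  }
}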
@@ -23,9 +23,8 @@ import java.util.Arrays;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.ChunkIterator;
import org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.SerializedDocument;
import org.apache.lucene.document.DocumentStoredFieldVisitor;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexFileNames;
@@ -192,10 +191,12 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
    }
  }

  private void writeHeader(int docBase, int numBufferedDocs, int[] numStoredFields, int[] lengths) throws IOException {
  private void writeHeader(int docBase, int numBufferedDocs, int[] numStoredFields, int[] lengths, boolean sliced) throws IOException {
    final int slicedBit = sliced ? 1 : 0;

    // save docBase and numBufferedDocs
    fieldsStream.writeVInt(docBase);
    fieldsStream.writeVInt(numBufferedDocs);
    fieldsStream.writeVInt((numBufferedDocs) << 1 | slicedBit);

    // save numStoredFields
    saveInts(numStoredFields, numBufferedDocs, fieldsStream);
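A condensed sketch of the chunk header the writer emits after this change (illustrative only; DataOutput.writeVInt is the existing Lucene API, and the trailing bit-packed arrays follow the saveInts encoding shown above):

import java.io.IOException;
import org.apache.lucene.store.DataOutput;

// Illustrative writer-side counterpart of the header read back in doReset():
// docBase, then a single token packing the buffered-doc count with the sliced bit.
final class ChunkHeaderSketch {
  static void writeChunkHeader(DataOutput out, int docBase, int numBufferedDocs, boolean sliced) throws IOException {
    out.writeVInt(docBase);
    out.writeVInt((numBufferedDocs << 1) | (sliced ? 1 : 0));
    // ...followed by the bit-packed numStoredFields and lengths, as in writeHeader().
  }
}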
@@ -218,10 +219,11 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
      lengths[i] = endOffsets[i] - endOffsets[i - 1];
      assert lengths[i] >= 0;
    }
    writeHeader(docBase, numBufferedDocs, numStoredFields, lengths);
    final boolean sliced = bufferedDocs.length >= 2 * chunkSize;
    writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced);

    // compress stored fields to fieldsStream
    if (bufferedDocs.length >= 2 * chunkSize) {
    if (sliced) {
      // big chunk, slice it
      for (int compressed = 0; compressed < bufferedDocs.length; compressed += chunkSize) {
        compressor.compress(bufferedDocs.bytes, compressed, Math.min(chunkSize, bufferedDocs.length - compressed), fieldsStream);
@@ -493,62 +495,35 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
        if (storedFieldsReader != null) {
          storedFieldsReader.checkIntegrity();
        }
        for (int i = nextLiveDoc(0, liveDocs, maxDoc); i < maxDoc; i = nextLiveDoc(i + 1, liveDocs, maxDoc)) {
        for (int docID = 0; docID < maxDoc; docID++) {
          if (liveDocs != null && liveDocs.get(docID) == false) {
            continue;
          }
          DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor();
          storedFieldsReader.visitDocument(i, visitor);
          storedFieldsReader.visitDocument(docID, visitor);
          addDocument(visitor.getDocument(), mergeState.mergeFieldInfos);
          ++docCount;
          mergeState.checkAbort.work(300);
        }
      } else {
        int docID = nextLiveDoc(0, liveDocs, maxDoc);
        if (docID < maxDoc) {
          // not all docs were deleted
          final ChunkIterator it = matchingFieldsReader.chunkIterator(docID);
          int[] startOffsets = new int[0];
          do {
            // go to the next chunk that contains docID
            it.next(docID);
            // transform lengths into offsets
            if (startOffsets.length < it.chunkDocs) {
              startOffsets = new int[ArrayUtil.oversize(it.chunkDocs, 4)];
            }
            for (int i = 1; i < it.chunkDocs; ++i) {
              startOffsets[i] = startOffsets[i - 1] + it.lengths[i - 1];
            }

            // decompress
            it.decompress();
            if (startOffsets[it.chunkDocs - 1] + it.lengths[it.chunkDocs - 1] != it.bytes.length) {
              throw new CorruptIndexException("Corrupted: expected chunk size=" + startOffsets[it.chunkDocs - 1] + it.lengths[it.chunkDocs - 1] + ", got " + it.bytes.length, it.fieldsStream);
            }
            // copy non-deleted docs
            for (; docID < it.docBase + it.chunkDocs; docID = nextLiveDoc(docID + 1, liveDocs, maxDoc)) {
              final int diff = docID - it.docBase;
              startDocument();
              bufferedDocs.writeBytes(it.bytes.bytes, it.bytes.offset + startOffsets[diff], it.lengths[diff]);
              numStoredFieldsInDoc = it.numStoredFields[diff];
              finishDocument();
              ++docCount;
              mergeState.checkAbort.work(300);
            }
          } while (docID < maxDoc);

          it.checkIntegrity();
        // optimized merge, we copy serialized (but decompressed) bytes directly
        // even on simple docs (1 stored field), it seems to help by about 20%
        matchingFieldsReader.checkIntegrity();
        for (int docID = 0; docID < maxDoc; docID++) {
          if (liveDocs != null && liveDocs.get(docID) == false) {
            continue;
          }
          SerializedDocument doc = matchingFieldsReader.document(docID);
          startDocument();
          bufferedDocs.copyBytes(doc.in, doc.length);
          numStoredFieldsInDoc = doc.numStoredFields;
          finishDocument();
          ++docCount;
          mergeState.checkAbort.work(300);
        }
      }
    }
    finish(mergeState.mergeFieldInfos, docCount);
    return docCount;
  }

  private static int nextLiveDoc(int doc, Bits liveDocs, int maxDoc) {
    if (liveDocs == null) {
      return doc;
    }
    while (doc < maxDoc && !liveDocs.get(doc)) {
      ++doc;
    }
    return doc;
  }
}
@@ -56,7 +56,9 @@ public abstract class CompressingCodec extends FilterCodec {
   * suffix
   */
  public static CompressingCodec randomInstance(Random random) {
    return randomInstance(random, RandomInts.randomIntBetween(random, 1, 1 << 15), RandomInts.randomIntBetween(random, 64, 1024), false);
    final int chunkSize = random.nextBoolean() ? RandomInts.randomIntBetween(random, 1, 10) : RandomInts.randomIntBetween(random, 1, 1 << 15);
    final int chunkDocs = random.nextBoolean() ? RandomInts.randomIntBetween(random, 1, 10) : RandomInts.randomIntBetween(random, 64, 1024);
    return randomInstance(random, chunkSize, chunkDocs, false);
  }

  /**
@@ -57,8 +57,10 @@ import org.apache.lucene.store.MockDirectoryWrapper.Throttling;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.TestUtil;

import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;

/**
 * Base class aiming at testing {@link StoredFieldsFormat stored fields formats}.
@@ -579,7 +581,94 @@ public abstract class BaseStoredFieldsFormatTestCase extends BaseIndexFileFormat
    iw.close();
    dir.close();
  }

  /** A dummy filter reader that reverse the order of documents in stored fields. */
  private static class DummyFilterLeafReader extends FilterLeafReader {

    public DummyFilterLeafReader(LeafReader in) {
      super(in);
    }

    @Override
    public void document(int docID, StoredFieldVisitor visitor) throws IOException {
      super.document(maxDoc() - 1 - docID, visitor);
    }

  }

  private static class DummyFilterDirectoryReader extends FilterDirectoryReader {

    public DummyFilterDirectoryReader(DirectoryReader in) {
      super(in, new SubReaderWrapper() {
        @Override
        public LeafReader wrap(LeafReader reader) {
          return new DummyFilterLeafReader(reader);
        }
      });
    }

    @Override
    protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) {
      return new DummyFilterDirectoryReader(in);
    }

  }

  public void testMergeFilterReader() throws IOException {
    Directory dir = newDirectory();
    RandomIndexWriter w = new RandomIndexWriter(random(), dir);
    final int numDocs = atLeast(200);
    final String[] stringValues = new String[10];
    for (int i = 0; i < stringValues.length; ++i) {
      stringValues[i] = RandomStrings.randomRealisticUnicodeOfLength(random(), 10);
    }
    Document[] docs = new Document[numDocs];
    for (int i = 0; i < numDocs; ++i) {
      Document doc = new Document();
      doc.add(new StringField("to_delete", random().nextBoolean() ? "yes" : "no", Store.NO));
      doc.add(new StoredField("id", i));
      doc.add(new StoredField("i", random().nextInt(50)));
      doc.add(new StoredField("l", random().nextLong()));
      doc.add(new StoredField("d", random().nextDouble()));
      doc.add(new StoredField("f", random().nextFloat()));
      doc.add(new StoredField("s", RandomPicks.randomFrom(random(), stringValues)));
      doc.add(new StoredField("b", new BytesRef(RandomPicks.randomFrom(random(), stringValues))));
      docs[i] = doc;
      w.addDocument(doc);
    }
    if (random().nextBoolean()) {
      w.deleteDocuments(new Term("to_delete", "yes"));
    }
    w.commit();
    w.close();

    DirectoryReader reader = new DummyFilterDirectoryReader(DirectoryReader.open(dir));

    Directory dir2 = newDirectory();
    w = new RandomIndexWriter(random(), dir2);
    w.addIndexes(reader);
    reader.close();
    dir.close();

    reader = w.getReader();
    for (int i = 0; i < reader.maxDoc(); ++i) {
      final StoredDocument doc = reader.document(i);
      final int id = doc.getField("id").numericValue().intValue();
      final Document expected = docs[id];
      assertEquals(expected.get("s"), doc.get("s"));
      assertEquals(expected.getField("i").numericValue(), doc.getField("i").numericValue());
      assertEquals(expected.getField("l").numericValue(), doc.getField("l").numericValue());
      assertEquals(expected.getField("d").numericValue(), doc.getField("d").numericValue());
      assertEquals(expected.getField("f").numericValue(), doc.getField("f").numericValue());
      assertEquals(expected.getField("b").binaryValue(), doc.getField("b").binaryValue());
    }

    reader.close();
    w.close();
    TestUtil.checkIndex(dir2);
    dir2.close();
  }

  @Nightly
  public void testBigDocuments() throws IOException {
    // "big" as "much bigger than the chunk size"