LUCENE-4527: CompressingStoreFieldsFormat: encode numStoredFields more efficiently.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1406704 13f79535-47bb-0310-9956-ffa450edef68
Adrien Grand 2012-11-07 16:24:24 +00:00
parent f8be77c39b
commit 980915f6ca
2 changed files with 162 additions and 94 deletions
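
This change replaces the old chunk header (a single bitsPerValue VInt followed by packed per-document lengths) and the per-document numStoredFields VInt that used to be written into the data to be compressed. The new chunk header carries two blocks, one for numStoredFields and one for the per-document lengths, each encoded the same way: a single VInt for a one-document chunk, "0 + value" when every document shares the same value, and otherwise bitsRequired followed by packed ints. A condensed, self-contained sketch of the writer side (the class name ChunkHeaderSketch is illustrative; saveInts and writeChunkHeader mirror the methods added to CompressingStoredFieldsWriter in the diff below):

import java.io.IOException;

import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.packed.PackedInts;

final class ChunkHeaderSketch {

  // Encode `length` non-negative ints: a single VInt for a one-value array,
  // "0 + value" when all values are equal, otherwise bitsRequired + packed ints.
  static void saveInts(int[] values, int length, DataOutput out) throws IOException {
    if (length == 1) {
      out.writeVInt(values[0]);
      return;
    }
    boolean allEqual = true;
    for (int i = 1; i < length; ++i) {
      if (values[i] != values[0]) {
        allEqual = false;
        break;
      }
    }
    if (allEqual) {
      out.writeVInt(0);            // 0 bits per value marks the "all equal" case
      out.writeVInt(values[0]);
    } else {
      long max = 0;
      for (int i = 0; i < length; ++i) {
        max |= values[i];
      }
      final int bitsRequired = PackedInts.bitsRequired(max);
      out.writeVInt(bitsRequired);
      final PackedInts.Writer w = PackedInts.getWriterNoHeader(
          out, PackedInts.Format.PACKED, length, bitsRequired, 1);
      for (int i = 0; i < length; ++i) {
        w.add(values[i]);
      }
      w.finish();
    }
  }

  // Chunk header layout: docBase, chunkDocs, then the numStoredFields block,
  // then the per-document length block.
  static void writeChunkHeader(DataOutput out, int docBase, int chunkDocs,
                               int[] numStoredFields, int[] lengths) throws IOException {
    out.writeVInt(docBase);
    out.writeVInt(chunkDocs);
    saveInts(numStoredFields, chunkDocs, out);
    saveInts(lengths, chunkDocs, out);
  }
}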

CompressingStoredFieldsReader.java

@@ -19,7 +19,7 @@ package org.apache.lucene.codecs.compressing;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.BYTE_ARR;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.CODEC_NAME_DAT;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.*;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.CODEC_NAME_IDX;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.HEADER_LENGTH_DAT;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.HEADER_LENGTH_IDX;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.NUMERIC_DOUBLE;
@@ -27,6 +27,7 @@ import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.NUMERIC_INT;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.NUMERIC_LONG;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.STRING;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.TYPE_BITS;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.TYPE_MASK;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_CURRENT;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_START;
@@ -34,6 +35,7 @@ import static org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsWriter.FIELD
import static org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsWriter.FIELDS_INDEX_EXTENSION;
import java.io.IOException;
import java.util.Arrays;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
@@ -187,31 +189,63 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
final int docBase = fieldsStream.readVInt();
final int chunkDocs = fieldsStream.readVInt();
final int bitsPerValue = fieldsStream.readVInt();
if (docID < docBase
|| docID >= docBase + chunkDocs
|| docBase + chunkDocs > numDocs
|| bitsPerValue > 31) {
|| docBase + chunkDocs > numDocs) {
throw new CorruptIndexException("Corrupted: docID=" + docID
+ ", docBase=" + docBase + ", chunkDocs=" + chunkDocs
+ ", numDocs=" + numDocs + ", bitsPerValue=" + bitsPerValue);
+ ", numDocs=" + numDocs);
}
final int numStoredFields, offset, length;
if (chunkDocs == 1) {
numStoredFields = fieldsStream.readVInt();
offset = 0;
length = fieldsStream.readVInt();
} else {
final int bitsPerStoredFields = fieldsStream.readVInt();
if (bitsPerStoredFields == 0) {
numStoredFields = fieldsStream.readVInt();
} else if (bitsPerStoredFields > 31) {
throw new CorruptIndexException("bitsPerStoredFields=" + bitsPerStoredFields);
} else {
final long filePointer = fieldsStream.getFilePointer();
final PackedInts.ReaderIterator lengths = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerValue, 1);
int offset = 0;
for (int i = docBase; i < docID; ++i) {
offset += lengths.next();
final PackedInts.Reader reader = PackedInts.getDirectReaderNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerStoredFields);
numStoredFields = (int) (reader.get(docID - docBase));
fieldsStream.seek(filePointer + PackedInts.Format.PACKED.byteCount(packedIntsVersion, chunkDocs, bitsPerStoredFields));
}
final int bitsPerLength = fieldsStream.readVInt();
if (bitsPerLength == 0) {
length = fieldsStream.readVInt();
offset = (docID - docBase) * length;
} else if (bitsPerLength > 31) {
throw new CorruptIndexException("bitsPerLength=" + bitsPerLength);
} else {
final long filePointer = fieldsStream.getFilePointer();
final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
int off = 0;
for (int i = 0; i < docID - docBase; ++i) {
off += it.next();
}
offset = off;
length = (int) it.next();
fieldsStream.seek(filePointer + PackedInts.Format.PACKED.byteCount(packedIntsVersion, chunkDocs, bitsPerLength));
}
}
if ((length == 0) != (numStoredFields == 0)) {
throw new CorruptIndexException("length=" + length + ", numStoredFields=" + numStoredFields);
}
if (numStoredFields == 0) {
// nothing to do
return;
}
final int length = (int) lengths.next();
// skip the last values
fieldsStream.seek(filePointer + PackedInts.Format.PACKED.byteCount(packedIntsVersion, chunkDocs, bitsPerValue));
decompressor.decompress(fieldsStream, offset, length, bytes);
final ByteArrayDataInput documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
final int numFields = documentInput.readVInt();
for (int fieldIDX = 0; fieldIDX < numFields; fieldIDX++) {
for (int fieldIDX = 0; fieldIDX < numStoredFields; fieldIDX++) {
final long infoAndBits = documentInput.readVLong();
final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);
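
The per-document decode path above (CompressingStoredFieldsReader.visitDocument) recovers numStoredFields for one document without materializing the whole chunk: when bitsPerStoredFields is 0 every document shares one count; otherwise the count is looked up by index in a packed block and the stream is advanced past that block with Format.PACKED.byteCount so the lengths block can be read next. A sketch of just that step, with a helper class and method name of my own choosing:

import java.io.IOException;

import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.packed.PackedInts;

final class NumStoredFieldsSketch {

  // Returns numStoredFields for docID inside a chunk of chunkDocs documents
  // starting at docBase, leaving `in` positioned right after the packed block.
  static int read(IndexInput in, int docID, int docBase, int chunkDocs,
                  int packedIntsVersion) throws IOException {
    final int bitsPerStoredFields = in.readVInt();
    if (bitsPerStoredFields == 0) {
      return in.readVInt();                 // every document has the same count
    } else if (bitsPerStoredFields > 31) {
      throw new CorruptIndexException("bitsPerStoredFields=" + bitsPerStoredFields);
    }
    final long filePointer = in.getFilePointer();
    final PackedInts.Reader reader = PackedInts.getDirectReaderNoHeader(
        in, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerStoredFields);
    final int numStoredFields = (int) reader.get(docID - docBase);
    // skip the remaining packed values so the lengths block comes next
    in.seek(filePointer + PackedInts.Format.PACKED.byteCount(
        packedIntsVersion, chunkDocs, bitsPerStoredFields));
    return numStoredFields;
  }
}
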
@@ -222,14 +256,17 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
switch(visitor.needsField(fieldInfo)) {
case YES:
readField(documentInput, visitor, fieldInfo, bits);
assert documentInput.getPosition() <= bytes.offset + bytes.length : documentInput.getPosition() + " " + bytes.offset + bytes.length;
break;
case NO:
skipField(documentInput, bits);
assert documentInput.getPosition() <= bytes.offset + bytes.length : documentInput.getPosition() + " " + bytes.offset + bytes.length;
break;
case STOP:
return;
}
}
assert documentInput.getPosition() == bytes.offset + bytes.length : documentInput.getPosition() + " " + bytes.offset + " " + bytes.length;
}
@Override
@@ -253,28 +290,14 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
BytesRef bytes;
int docBase;
int chunkDocs;
int[] numStoredFields;
int[] lengths;
private ChunkIterator() {
this.docBase = -1;
bytes = new BytesRef();
lengths = new int[0];
}
private int readHeader() throws IOException {
final int docBase = fieldsStream.readVInt();
final int chunkDocs = fieldsStream.readVInt();
final int bitsPerValue = fieldsStream.readVInt();
if (docBase < this.docBase + this.chunkDocs
|| docBase + chunkDocs > numDocs
|| bitsPerValue > 31) {
throw new CorruptIndexException("Corrupted: current docBase=" + this.docBase
+ ", current numDocs=" + this.chunkDocs + ", new docBase=" + docBase
+ ", new numDocs=" + chunkDocs + ", bitsPerValue=" + bitsPerValue);
}
this.docBase = docBase;
this.chunkDocs = chunkDocs;
return bitsPerValue;
numStoredFields = new int[1];
lengths = new int[1];
}
/**
@@ -293,26 +316,52 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
*/
void next(int doc) throws IOException {
assert doc >= docBase + chunkDocs : doc + " " + docBase + " " + chunkDocs;
// try next chunk
int bitsPerValue = readHeader();
if (docBase + chunkDocs <= doc) {
// doc is not in the next chunk, use seek to skip to the next document chunk
fieldsStream.seek(indexReader.getStartPointer(doc));
bitsPerValue = readHeader();
final int docBase = fieldsStream.readVInt();
final int chunkDocs = fieldsStream.readVInt();
if (docBase < this.docBase + this.chunkDocs
|| docBase + chunkDocs > numDocs) {
throw new CorruptIndexException("Corrupted: current docBase=" + this.docBase
+ ", current numDocs=" + this.chunkDocs + ", new docBase=" + docBase
+ ", new numDocs=" + chunkDocs);
}
if (doc < docBase
|| doc >= docBase + chunkDocs) {
throw new CorruptIndexException("Corrupted: docID=" + doc
+ ", docBase=" + docBase + ", chunkDocs=" + chunkDocs);
this.docBase = docBase;
this.chunkDocs = chunkDocs;
if (chunkDocs > numStoredFields.length) {
final int newLength = ArrayUtil.oversize(chunkDocs, 4);
numStoredFields = new int[newLength];
lengths = new int[newLength];
}
// decode lengths
if (lengths.length < chunkDocs) {
lengths = new int[ArrayUtil.oversize(chunkDocs, 4)];
}
final PackedInts.ReaderIterator iterator = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerValue, 0);
if (chunkDocs == 1) {
numStoredFields[0] = fieldsStream.readVInt();
lengths[0] = fieldsStream.readVInt();
} else {
final int bitsPerStoredFields = fieldsStream.readVInt();
if (bitsPerStoredFields == 0) {
Arrays.fill(numStoredFields, 0, chunkDocs, fieldsStream.readVInt());
} else if (bitsPerStoredFields > 31) {
throw new CorruptIndexException("bitsPerStoredFields=" + bitsPerStoredFields);
} else {
final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerStoredFields, 1);
for (int i = 0; i < chunkDocs; ++i) {
lengths[i] = (int) iterator.next();
numStoredFields[i] = (int) it.next();
}
}
final int bitsPerLength = fieldsStream.readVInt();
if (bitsPerLength == 0) {
Arrays.fill(lengths, 0, chunkDocs, fieldsStream.readVInt());
} else if (bitsPerLength > 31) {
throw new CorruptIndexException("bitsPerLength=" + bitsPerLength);
} else {
final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
for (int i = 0; i < chunkDocs; ++i) {
lengths[i] = (int) it.next();
}
}
}
}
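
The ChunkIterator used when merging decodes the whole chunk instead of a single document, so it fills both arrays in one pass with a ReaderIterator. Roughly (class and method names below are mine; the chunkDocs == 1 special case matches the writer's single-VInt form):

import java.io.IOException;
import java.util.Arrays;

import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.packed.PackedInts;

final class ChunkArraysSketch {

  // Fill dest[0..chunkDocs) from one block of the chunk header.
  private static void readBlock(IndexInput in, int chunkDocs, int packedIntsVersion,
                                int[] dest) throws IOException {
    final int bitsPerValue = in.readVInt();
    if (bitsPerValue == 0) {
      Arrays.fill(dest, 0, chunkDocs, in.readVInt());   // all documents equal
    } else if (bitsPerValue > 31) {
      throw new CorruptIndexException("bitsPerValue=" + bitsPerValue);
    } else {
      final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(
          in, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerValue, 1);
      for (int i = 0; i < chunkDocs; ++i) {
        dest[i] = (int) it.next();
      }
    }
  }

  // Decode numStoredFields[] and lengths[] for a whole chunk.
  static void readChunkArrays(IndexInput in, int chunkDocs, int packedIntsVersion,
                              int[] numStoredFields, int[] lengths) throws IOException {
    if (chunkDocs == 1) {
      numStoredFields[0] = in.readVInt();
      lengths[0] = in.readVInt();
    } else {
      readBlock(in, chunkDocs, packedIntsVersion, numStoredFields);
      readBlock(in, chunkDocs, packedIntsVersion, lengths);
    }
  }
}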

CompressingStoredFieldsWriter.java

@@ -21,6 +21,7 @@ import static org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsWriter.FIELD
import static org.apache.lucene.codecs.lucene40.Lucene40StoredFieldsWriter.FIELDS_INDEX_EXTENSION;
import java.io.IOException;
import java.util.Arrays;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
@@ -36,6 +37,7 @@ import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.StorableField;
import org.apache.lucene.index.StoredDocument;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
@@ -74,6 +76,7 @@ final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
private final int chunkSize;
private final GrowableByteArrayDataOutput bufferedDocs;
private int[] numStoredFields; // number of stored fields
private int[] endOffsets; // end offsets in bufferedDocs
private int docBase; // doc ID at the beginning of the chunk
private int numBufferedDocs; // docBase + numBufferedDocs == current doc ID
@@ -88,6 +91,7 @@ final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
this.chunkSize = chunkSize;
this.docBase = 0;
this.bufferedDocs = new GrowableByteArrayDataOutput(chunkSize);
this.numStoredFields = new int[16];
this.endOffsets = new int[16];
this.numBufferedDocs = 0;
@@ -128,50 +132,72 @@ final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
private void endWithPreviousDocument() throws IOException {
if (numBufferedDocs > 0) {
assert bufferedDocs.length > 0;
if (numBufferedDocs == endOffsets.length) {
endOffsets = ArrayUtil.grow(endOffsets);
}
endOffsets[numBufferedDocs - 1] = bufferedDocs.length;
}
}
private void addRawDocument(byte[] buf, int off, int len) throws IOException {
endWithPreviousDocument();
if (bufferedDocs.length >= chunkSize) {
flush();
}
bufferedDocs.writeBytes(buf, off, len);
++numBufferedDocs;
}
@Override
public void startDocument(int numStoredFields) throws IOException {
endWithPreviousDocument();
if (bufferedDocs.length >= chunkSize) {
if (triggerFlush()) {
flush();
}
bufferedDocs.writeVInt(numStoredFields);
if (numBufferedDocs == this.numStoredFields.length) {
final int newLength = ArrayUtil.oversize(numBufferedDocs + 1, 4);
this.numStoredFields = Arrays.copyOf(this.numStoredFields, newLength);
endOffsets = Arrays.copyOf(endOffsets, newLength);
}
this.numStoredFields[numBufferedDocs] = numStoredFields;
++numBufferedDocs;
}
private void writeHeader(int docBase, int numBufferedDocs, int[] lengths) throws IOException {
private static void saveInts(int[] values, int length, DataOutput out) throws IOException {
assert length > 0;
if (length == 1) {
out.writeVInt(values[0]);
} else {
boolean allEqual = true;
for (int i = 1; i < length; ++i) {
if (values[i] != values[0]) {
allEqual = false;
break;
}
}
if (allEqual) {
out.writeVInt(0);
out.writeVInt(values[0]);
} else {
long max = 0;
for (int i = 0; i < length; ++i) {
max |= values[i];
}
final int bitsRequired = PackedInts.bitsRequired(max);
out.writeVInt(bitsRequired);
final PackedInts.Writer w = PackedInts.getWriterNoHeader(out, PackedInts.Format.PACKED, length, bitsRequired, 1);
for (int i = 0; i < length; ++i) {
w.add(values[i]);
}
w.finish();
}
}
}
private void writeHeader(int docBase, int numBufferedDocs, int[] numStoredFields, int[] lengths) throws IOException {
// save docBase and numBufferedDocs
fieldsStream.writeVInt(docBase);
fieldsStream.writeVInt(numBufferedDocs);
// save lengths
final int bitsRequired = bitsRequired(lengths, numBufferedDocs);
assert bitsRequired <= 31;
fieldsStream.writeVInt(bitsRequired);
// save numStoredFields
saveInts(numStoredFields, numBufferedDocs, fieldsStream);
final PackedInts.Writer writer = PackedInts.getWriterNoHeader(fieldsStream, PackedInts.Format.PACKED, numBufferedDocs, bitsRequired, 1);
for (int i = 0; i < numBufferedDocs; ++i) {
assert lengths[i] > 0;
writer.add(lengths[i]);
// save lengths
saveInts(lengths, numBufferedDocs, fieldsStream);
}
assert writer.ord() + 1 == numBufferedDocs;
writer.finish();
private boolean triggerFlush() {
return bufferedDocs.length >= chunkSize || // chunks of at least chunkSize bytes
numBufferedDocs >= chunkSize; // can be necessary if most docs are empty
}
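
To see why this is cheaper than writing one VInt per document into the data to be compressed: with the saveInts scheme above, 128 documents that all store 3 fields cost two VInt bytes instead of 128, and 128 varying lengths that fit in 10 bits cost one VInt plus 160 bytes of packed data. A rough size check, reusing the hypothetical ChunkHeaderSketch from the sketch near the top of this commit:

import java.util.Arrays;

import org.apache.lucene.store.ByteArrayDataOutput;

public class SaveIntsSizeDemo {
  public static void main(String[] args) throws Exception {
    final byte[] buf = new byte[1 << 16];

    // 128 documents, each with 3 stored fields: "all equal" form, 2 bytes total.
    final int[] numStoredFields = new int[128];
    Arrays.fill(numStoredFields, 3);
    ByteArrayDataOutput out = new ByteArrayDataOutput(buf);
    ChunkHeaderSketch.saveInts(numStoredFields, numStoredFields.length, out);
    System.out.println("all-equal counts: " + out.getPosition() + " bytes");

    // 128 varying lengths below 1024: 1 byte for bitsRequired=10,
    // then 128 * 10 bits = 160 bytes of packed data.
    final int[] lengths = new int[128];
    for (int i = 0; i < lengths.length; ++i) {
      lengths[i] = (i * 37) % 1024;
    }
    out = new ByteArrayDataOutput(buf);
    ChunkHeaderSketch.saveInts(lengths, lengths.length, out);
    System.out.println("varying lengths: " + out.getPosition() + " bytes");
  }
}
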
private void flush() throws IOException {
@@ -181,9 +207,9 @@ final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
final int[] lengths = endOffsets;
for (int i = numBufferedDocs - 1; i > 0; --i) {
lengths[i] = endOffsets[i] - endOffsets[i - 1];
assert lengths[i] > 0;
assert lengths[i] >= 0;
}
writeHeader(docBase, numBufferedDocs, lengths);
writeHeader(docBase, numBufferedDocs, numStoredFields, lengths);
// compress stored fields to fieldsStream
compressor.compress(bufferedDocs.bytes, 0, bufferedDocs.length, fieldsStream);
@@ -194,14 +220,6 @@ final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
bufferedDocs.length = 0;
}
private static int bitsRequired(int[] data, int length) {
int or = data[0];
for (int i = 1; i < length; ++i) {
or |= data[i];
}
return PackedInts.bitsRequired(or);
}
@Override
public void writeField(FieldInfo info, StorableField field)
throws IOException {
@@ -327,7 +345,7 @@ final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
}
if (compressionMode == matchingFieldsReader.getCompressionMode() // same compression mode
&& (numBufferedDocs == 0 || bufferedDocs.length >= chunkSize) // starting a new chunk
&& (numBufferedDocs == 0 || triggerFlush()) // starting a new chunk
&& startOffsets[it.chunkDocs - 1] < chunkSize // chunk is small enough
&& startOffsets[it.chunkDocs - 1] + it.lengths[it.chunkDocs - 1] >= chunkSize // chunk is large enough
&& nextDeletedDoc(it.docBase, liveDocs, it.docBase + it.chunkDocs) == it.docBase + it.chunkDocs) { // no deletion in the chunk
@@ -335,11 +353,11 @@ final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
// no need to decompress, just copy data
endWithPreviousDocument();
if (bufferedDocs.length >= chunkSize) {
if (triggerFlush()) {
flush();
}
indexWriter.writeIndex(it.chunkDocs, fieldsStream.getFilePointer());
writeHeader(this.docBase, it.chunkDocs, it.lengths);
writeHeader(this.docBase, it.chunkDocs, it.numStoredFields, it.lengths);
it.copyCompressedData(fieldsStream);
this.docBase += it.chunkDocs;
docID = nextLiveDoc(it.docBase + it.chunkDocs, liveDocs, maxDoc);
@@ -354,7 +372,8 @@ final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
// copy non-deleted docs
for (; docID < it.docBase + it.chunkDocs; docID = nextLiveDoc(docID + 1, liveDocs, maxDoc)) {
final int diff = docID - it.docBase;
addRawDocument(it.bytes.bytes, it.bytes.offset + startOffsets[diff], it.lengths[diff]);
startDocument(it.numStoredFields[diff]);
bufferedDocs.writeBytes(it.bytes.bytes, it.bytes.offset + startOffsets[diff], it.lengths[diff]);
++docCount;
mergeState.checkAbort.work(300);
}
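
An end-to-end check that the two sketches agree (the hypothetical ChunkHeaderSketch and ChunkArraysSketch classes from above; a RAMDirectory stands in for the real stored-fields stream). Note that an empty document, with zero stored fields and length 0, is legal now, which is why flush() asserts lengths[i] >= 0 instead of > 0.

import java.util.Arrays;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.packed.PackedInts;

public class ChunkHeaderRoundTrip {
  public static void main(String[] args) throws Exception {
    final int[] numStoredFields = { 3, 3, 0, 7 };   // third document is empty
    final int[] lengths = { 120, 95, 0, 310 };

    final Directory dir = new RAMDirectory();
    IndexOutput out = dir.createOutput("chunk", IOContext.DEFAULT);
    ChunkHeaderSketch.writeChunkHeader(out, 42, 4, numStoredFields, lengths);
    out.close();

    IndexInput in = dir.openInput("chunk", IOContext.DEFAULT);
    System.out.println("docBase=" + in.readVInt() + ", chunkDocs=" + in.readVInt());
    final int[] counts = new int[4];
    final int[] lens = new int[4];
    ChunkArraysSketch.readChunkArrays(in, 4, PackedInts.VERSION_CURRENT, counts, lens);
    System.out.println(Arrays.toString(counts) + " " + Arrays.toString(lens));
    in.close();
    dir.close();
  }
}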