LUCENE-4554: CompressingStoredFieldsFormat: don't write the original length at the beginning of the chunk with LZ4 compressors

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1408476 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Adrien Grand 2012-11-12 22:17:13 +00:00
parent 6794693f30
commit f1fda6880b
5 changed files with 150 additions and 138 deletions

View File

@@ -197,11 +197,12 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
+ ", numDocs=" + numDocs); + ", numDocs=" + numDocs);
} }
final int numStoredFields, offset, length; final int numStoredFields, offset, length, totalLength;
if (chunkDocs == 1) { if (chunkDocs == 1) {
numStoredFields = fieldsStream.readVInt(); numStoredFields = fieldsStream.readVInt();
offset = 0; offset = 0;
length = fieldsStream.readVInt(); length = fieldsStream.readVInt();
totalLength = length;
} else { } else {
final int bitsPerStoredFields = fieldsStream.readVInt(); final int bitsPerStoredFields = fieldsStream.readVInt();
if (bitsPerStoredFields == 0) { if (bitsPerStoredFields == 0) {
@@ -219,10 +220,10 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
if (bitsPerLength == 0) { if (bitsPerLength == 0) {
length = fieldsStream.readVInt(); length = fieldsStream.readVInt();
offset = (docID - docBase) * length; offset = (docID - docBase) * length;
totalLength = chunkDocs * length;
} else if (bitsPerStoredFields > 31) { } else if (bitsPerStoredFields > 31) {
throw new CorruptIndexException("bitsPerLength=" + bitsPerLength); throw new CorruptIndexException("bitsPerLength=" + bitsPerLength);
} else { } else {
final long filePointer = fieldsStream.getFilePointer();
final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1); final PackedInts.ReaderIterator it = PackedInts.getReaderIteratorNoHeader(fieldsStream, PackedInts.Format.PACKED, packedIntsVersion, chunkDocs, bitsPerLength, 1);
int off = 0; int off = 0;
for (int i = 0; i < docID - docBase; ++i) { for (int i = 0; i < docID - docBase; ++i) {
@@ -230,7 +231,11 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
} }
offset = off; offset = off;
length = (int) it.next(); length = (int) it.next();
fieldsStream.seek(filePointer + PackedInts.Format.PACKED.byteCount(packedIntsVersion, chunkDocs, bitsPerLength)); off += length;
for (int i = docID - docBase + 1; i < chunkDocs; ++i) {
off += it.next();
}
totalLength = off;
} }
} }
@@ -242,13 +247,14 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
return; return;
} }
decompressor.decompress(fieldsStream, offset, length, bytes); decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
assert bytes.length == length;
final ByteArrayDataInput documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length); final ByteArrayDataInput documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
for (int fieldIDX = 0; fieldIDX < numStoredFields; fieldIDX++) { for (int fieldIDX = 0; fieldIDX < numStoredFields; fieldIDX++) {
final long infoAndBits = documentInput.readVLong(); final long infoAndBits = documentInput.readVLong();
final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS); final int fieldNumber = (int) (infoAndBits >>> TYPE_BITS);
FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber); final FieldInfo fieldInfo = fieldInfos.fieldInfo(fieldNumber);
final int bits = (int) (infoAndBits & TYPE_MASK); final int bits = (int) (infoAndBits & TYPE_MASK);
assert bits <= NUMERIC_DOUBLE: "bits=" + Integer.toHexString(bits); assert bits <= NUMERIC_DOUBLE: "bits=" + Integer.toHexString(bits);
@@ -365,12 +371,23 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
} }
} }
// total length of the chunk
private int totalLength() {
int totalLength = 0;
for (int i = 0; i < chunkDocs; ++i) {
totalLength += lengths[i];
}
return totalLength;
}
/** /**
* Decompress the chunk. * Decompress the chunk.
*/ */
void decompress() throws IOException { void decompress() throws IOException {
// decompress data // decompress data
decompressor.decompress(fieldsStream, bytes); final int totalLength = totalLength();
decompressor.decompress(fieldsStream, totalLength, 0, totalLength, bytes);
assert bytes.length == totalLength;
if (bytes.length != chunkSize()) { if (bytes.length != chunkSize()) {
throw new CorruptIndexException("Corrupted: expected chunk size = " + chunkSize() + ", got " + bytes.length); throw new CorruptIndexException("Corrupted: expected chunk size = " + chunkSize() + ", got " + bytes.length);
} }
@@ -380,7 +397,8 @@ final class CompressingStoredFieldsReader extends StoredFieldsReader {
* Copy compressed data. * Copy compressed data.
*/ */
void copyCompressedData(DataOutput out) throws IOException { void copyCompressedData(DataOutput out) throws IOException {
decompressor.copyCompressedData(fieldsStream, out); final int totalLength = totalLength();
decompressor.copyCompressedData(fieldsStream, totalLength, out);
} }
} }

View File

@@ -22,6 +22,7 @@ import java.util.zip.DataFormatException;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import java.util.zip.Inflater; import java.util.zip.Inflater;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.DataInput; import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput; import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
@@ -94,6 +95,7 @@ public enum CompressionMode {
}; };
/** Get a {@link CompressionMode} according to its id. */
public static CompressionMode byId(int id) { public static CompressionMode byId(int id) {
for (CompressionMode mode : CompressionMode.values()) { for (CompressionMode mode : CompressionMode.values()) {
if (mode.getId() == id) { if (mode.getId() == id) {
@@ -132,85 +134,25 @@ public enum CompressionMode {
private static final Decompressor LZ4_DECOMPRESSOR = new Decompressor() { private static final Decompressor LZ4_DECOMPRESSOR = new Decompressor() {
@Override @Override
public void decompress(DataInput in, BytesRef bytes) throws IOException { public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException {
final int decompressedLen = in.readVInt(); assert offset + length <= originalLength;
if (bytes.bytes.length < decompressedLen + 8) { // add 7 padding bytes, this is not necessary but can help decompression run faster
bytes.bytes = ArrayUtil.grow(bytes.bytes, decompressedLen + 8); if (bytes.bytes.length < originalLength + 7) {
bytes.bytes = new byte[ArrayUtil.oversize(originalLength + 7, 1)];
} }
LZ4.decompress(in, decompressedLen, bytes); final int decompressedLength = LZ4.decompress(in, offset + length, bytes.bytes, 0);
if (bytes.length != decompressedLen) { if (decompressedLength > originalLength) {
throw new IOException("Corrupted"); throw new CorruptIndexException("Corrupted: lengths mismatch: " + decompressedLength + " > " + originalLength);
} }
bytes.offset = offset;
bytes.length = length;
} }
@Override @Override
public void decompress(DataInput in, int offset, int length, BytesRef bytes) throws IOException { public void copyCompressedData(DataInput in, int originalLength, DataOutput out) throws IOException {
final int decompressedLen = in.readVInt(); final int copied = LZ4.copyCompressedData(in, originalLength, out);
if (offset > decompressedLen) { if (copied != originalLength) {
bytes.length = 0; throw new CorruptIndexException("Currupted compressed stream: expected " + originalLength + " bytes, but got at least" + copied);
return;
}
if (bytes.bytes.length < decompressedLen) {
bytes.bytes = ArrayUtil.grow(bytes.bytes, decompressedLen);
}
LZ4.decompress(in, offset + length, bytes);
bytes.offset = offset;
if (offset + length >= decompressedLen) {
if (bytes.length != decompressedLen) {
throw new IOException("Corrupted");
}
bytes.length = decompressedLen - offset;
} else {
bytes.length = length;
}
}
public void copyCompressedData(DataInput in, DataOutput out) throws IOException {
final int decompressedLen = in.readVInt();
out.writeVInt(decompressedLen);
if (decompressedLen == 0) {
out.writeByte((byte) 0); // the token
return;
}
int n = 0;
while (n < decompressedLen) {
// literals
final byte token = in.readByte();
out.writeByte(token);
int literalLen = (token & 0xFF) >>> 4;
if (literalLen == 0x0F) {
byte len;
while ((len = in.readByte()) == (byte) 0xFF) {
literalLen += 0xFF;
out.writeByte(len);
}
literalLen += len & 0xFF;
out.writeByte(len);
}
out.copyBytes(in, literalLen);
n += literalLen;
if (n >= decompressedLen) {
break;
}
// matchs
out.copyBytes(in, 2); // match dec
int matchLen = token & 0x0F;
if (matchLen == 0x0F) {
byte len;
while ((len = in.readByte()) == (byte) 0xFF) {
matchLen += 0xFF;
out.writeByte(len);
}
matchLen += len & 0xFF;
out.writeByte(len);
}
matchLen += LZ4.MIN_MATCH;
n += matchLen;
}
if (n != decompressedLen) {
throw new IOException("Currupted compressed stream: expected " + decompressedLen + " bytes, but got at least" + n);
} }
} }
@@ -226,7 +168,6 @@ public enum CompressionMode {
@Override @Override
public void compress(byte[] bytes, int off, int len, DataOutput out) public void compress(byte[] bytes, int off, int len, DataOutput out)
throws IOException { throws IOException {
out.writeVInt(len);
LZ4.compress(bytes, off, len, out); LZ4.compress(bytes, off, len, out);
} }
@@ -237,7 +178,6 @@ public enum CompressionMode {
@Override @Override
public void compress(byte[] bytes, int off, int len, DataOutput out) public void compress(byte[] bytes, int off, int len, DataOutput out)
throws IOException { throws IOException {
out.writeVInt(len);
LZ4.compressHC(bytes, off, len, out); LZ4.compressHC(bytes, off, len, out);
} }
@@ -254,21 +194,22 @@ public enum CompressionMode {
} }
@Override @Override
public void decompress(DataInput in, BytesRef bytes) throws IOException { public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException {
bytes.offset = bytes.length = 0; assert offset + length <= originalLength;
if (length == 0) {
bytes.length = 0;
return;
}
final int compressedLength = in.readVInt(); final int compressedLength = in.readVInt();
if (compressedLength > compressed.length) { if (compressedLength > compressed.length) {
compressed = ArrayUtil.grow(compressed, compressedLength); compressed = new byte[ArrayUtil.oversize(compressedLength, 1)];
} }
in.readBytes(compressed, 0, compressedLength); in.readBytes(compressed, 0, compressedLength);
decompressor.reset(); decompressor.reset();
decompressor.setInput(compressed, 0, compressedLength); decompressor.setInput(compressed, 0, compressedLength);
if (decompressor.needsInput()) {
return;
}
bytes.offset = bytes.length = 0;
while (true) { while (true) {
final int count; final int count;
try { try {
@@ -284,10 +225,15 @@ public enum CompressionMode {
bytes.bytes = ArrayUtil.grow(bytes.bytes); bytes.bytes = ArrayUtil.grow(bytes.bytes);
} }
} }
if (bytes.length != originalLength) {
throw new CorruptIndexException("Lengths mismatch: " + bytes.length + " != " + originalLength);
}
bytes.offset = offset;
bytes.length = length;
} }
@Override @Override
public void copyCompressedData(DataInput in, DataOutput out) throws IOException { public void copyCompressedData(DataInput in, int originalLength, DataOutput out) throws IOException {
final int compressedLength = in.readVInt(); final int compressedLength = in.readVInt();
out.writeVInt(compressedLength); out.writeVInt(compressedLength);
out.copyBytes(in, compressedLength); out.copyBytes(in, compressedLength);

View File

@@ -29,26 +29,24 @@ import org.apache.lucene.util.BytesRef;
abstract class Decompressor implements Cloneable { abstract class Decompressor implements Cloneable {
/** /**
* Decompress bytes. This method is free to resize <code>bytes</code> in case * Decompress bytes that were stored between offsets <code>offset</code> and
* it is too small to hold all the decompressed data. * <code>offset+length</code> in the original stream from the compressed
* stream <code>in</code> to <code>bytes</code>. After returning, the length
* of <code>bytes</code> (<code>bytes.length</code>) must be equal to
* <code>length</code>. Implementations of this method are free to resize
* <code>bytes</code> depending on their needs.
*
* @param in the input that stores the compressed stream
* @param originalLength the length of the original data (before compression)
* @param offset bytes before this offset do not need to be decompressed
* @param length bytes after <code>offset+length</code> do not need to be decompressed
* @param bytes a {@link BytesRef} where to store the decompressed data
*/ */
public abstract void decompress(DataInput in, BytesRef bytes) throws IOException; public abstract void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException;
/** /** Copy a compressed stream whose original length is
* Method to use if you are only interested into <code>length</code> * <code>originalLength</code> from <code>in</code> to <code>out</code>. */
* decompressed bytes starting at offset <code>offset</code>. Some compression public abstract void copyCompressedData(DataInput in, int originalLength, DataOutput out) throws IOException;
* codecs might have optimizations for this special case.
*/
public void decompress(DataInput in, int offset, int length, BytesRef bytes) throws IOException {
decompress(in, bytes);
if (bytes.length < offset + length) {
throw new IndexOutOfBoundsException((offset + length) + " > " + bytes.length);
}
bytes.offset += offset;
bytes.length = length;
}
public abstract void copyCompressedData(DataInput in, DataOutput out) throws IOException;
@Override @Override
public abstract Decompressor clone(); public abstract Decompressor clone();

View File

@@ -22,7 +22,6 @@ import java.util.Arrays;
import org.apache.lucene.store.DataInput; import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput; import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PackedInts;
/** /**
@@ -78,17 +77,15 @@ class LZ4 {
} }
/** /**
* Decompress at least <code>decompressedLen</code> bytes into <code>destBytes</code>. * Decompress at least <code>decompressedLen</code> bytes into
* Please note that <code>destBytes</code> must be large enough to be able to hold * <code>dest[dOff:]</code>. Please note that <code>dest</code> must be large
* <b>all</b> decompressed data plus 8 bytes (meaning that you need to know the total * enough to be able to hold <b>all</b> decompressed data (meaning that you
* decompressed length). * need to know the total decompressed length).
*/ */
public static void decompress(DataInput compressed, int decompressedLen, BytesRef destBytes) throws IOException { public static int decompress(DataInput compressed, int decompressedLen, byte[] dest, int dOff) throws IOException {
final byte[] dest = destBytes.bytes;
final int destEnd = dest.length; final int destEnd = dest.length;
int dOff = 0;
while (dOff < decompressedLen) { do {
// literals // literals
final int token = compressed.readByte() & 0xFF; final int token = compressed.readByte() & 0xFF;
int literalLen = token >>> 4; int literalLen = token >>> 4;
@@ -135,9 +132,9 @@ class LZ4 {
System.arraycopy(dest, dOff - matchDec, dest, dOff, fastLen); System.arraycopy(dest, dOff - matchDec, dest, dOff, fastLen);
dOff += matchLen; dOff += matchLen;
} }
} } while (dOff < decompressedLen);
destBytes.offset = 0;
destBytes.length = dOff; return dOff;
} }
private static void encodeLen(int l, DataOutput out) throws IOException { private static void encodeLen(int l, DataOutput out) throws IOException {
@@ -186,8 +183,7 @@ class LZ4 {
/** /**
* Compress <code>bytes[off:off+len]</code> into <code>out</code> using * Compress <code>bytes[off:off+len]</code> into <code>out</code> using
* 2<sup>hashLog</sup> bytes of memory. Higher values of <code>hashLog</code> * at most 16KB of memory.
* improve the compression ratio.
*/ */
public static void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException { public static void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
@@ -357,6 +353,13 @@ class LZ4 {
} }
/**
* Compress <code>bytes[off:off+len]</code> into <code>out</code>. Compared to
* {@link LZ4#compress(byte[], int, int, DataOutput)}, this method is slower,
* uses more memory (~ 256KB), but should provide better compression ratios
* (especially on large inputs) because it chooses the best match among up to
* 256 candidates and then performs trade-offs to fix overlapping matches.
*/
public static void compressHC(byte[] src, int srcOff, int srcLen, DataOutput out) throws IOException { public static void compressHC(byte[] src, int srcOff, int srcLen, DataOutput out) throws IOException {
final int srcEnd = srcOff + srcLen; final int srcEnd = srcOff + srcLen;
@@ -503,4 +506,51 @@ class LZ4 {
encodeLastLiterals(src, anchor, srcEnd - anchor, out); encodeLastLiterals(src, anchor, srcEnd - anchor, out);
} }
/** Copy bytes from <code>in</code> to <code>out</code> where
* <code>in</code> is a LZ4-encoded stream. This method copies enough bytes
* so that <code>out</code> can be used later on to restore the first
* <code>length</code> bytes of the stream. This method always reads at
* least one byte from <code>in</code> so make sure not to call this method
* if <code>in</code> reached the end of the stream, even if
* <code>length=0</code>. */
public static int copyCompressedData(DataInput in, int length, DataOutput out) throws IOException {
int n = 0;
do {
// literals
final byte token = in.readByte();
out.writeByte(token);
int literalLen = (token & 0xFF) >>> 4;
if (literalLen == 0x0F) {
byte len;
while ((len = in.readByte()) == (byte) 0xFF) {
literalLen += 0xFF;
out.writeByte(len);
}
literalLen += len & 0xFF;
out.writeByte(len);
}
out.copyBytes(in, literalLen);
n += literalLen;
if (n >= length) {
break;
}
// matchs
out.copyBytes(in, 2); // match dec
int matchLen = token & 0x0F;
if (matchLen == 0x0F) {
byte len;
while ((len = in.readByte()) == (byte) 0xFF) {
matchLen += 0xFF;
out.writeByte(len);
}
matchLen += len & 0xFF;
out.writeByte(len);
}
matchLen += MIN_MATCH;
n += matchLen;
} while (n < length);
return n;
}
} }

View File

@@ -62,38 +62,38 @@ public abstract class AbstractTestCompressionMode extends LuceneTestCase {
return Arrays.copyOf(compressed, compressedLen); return Arrays.copyOf(compressed, compressedLen);
} }
byte[] decompress(byte[] compressed) throws IOException { byte[] decompress(byte[] compressed, int originalLength) throws IOException {
Decompressor decompressor = mode.newDecompressor(); Decompressor decompressor = mode.newDecompressor();
return decompress(decompressor, compressed); return decompress(decompressor, compressed, originalLength);
} }
static byte[] decompress(Decompressor decompressor, byte[] compressed) throws IOException { static byte[] decompress(Decompressor decompressor, byte[] compressed, int originalLength) throws IOException {
final BytesRef bytes = new BytesRef(); final BytesRef bytes = new BytesRef();
decompressor.decompress(new ByteArrayDataInput(compressed), bytes); decompressor.decompress(new ByteArrayDataInput(compressed), originalLength, 0, originalLength, bytes);
return Arrays.copyOfRange(bytes.bytes, bytes.offset, bytes.offset + bytes.length); return Arrays.copyOfRange(bytes.bytes, bytes.offset, bytes.offset + bytes.length);
} }
byte[] decompress(byte[] compressed, int offset, int length) throws IOException { byte[] decompress(byte[] compressed, int originalLength, int offset, int length) throws IOException {
Decompressor decompressor = mode.newDecompressor(); Decompressor decompressor = mode.newDecompressor();
final BytesRef bytes = new BytesRef(); final BytesRef bytes = new BytesRef();
decompressor.decompress(new ByteArrayDataInput(compressed), offset, length, bytes); decompressor.decompress(new ByteArrayDataInput(compressed), originalLength, offset, length, bytes);
return Arrays.copyOfRange(bytes.bytes, bytes.offset, bytes.offset + bytes.length); return Arrays.copyOfRange(bytes.bytes, bytes.offset, bytes.offset + bytes.length);
} }
static byte[] copyCompressedData(Decompressor decompressor, byte[] compressed) throws IOException { static byte[] copyCompressedData(Decompressor decompressor, byte[] compressed, int originalLength) throws IOException {
GrowableByteArrayDataOutput out = new GrowableByteArrayDataOutput(compressed.length); GrowableByteArrayDataOutput out = new GrowableByteArrayDataOutput(compressed.length);
decompressor.copyCompressedData(new ByteArrayDataInput(compressed), out); decompressor.copyCompressedData(new ByteArrayDataInput(compressed), originalLength, out);
return Arrays.copyOf(out.bytes, out.length); return Arrays.copyOf(out.bytes, out.length);
} }
byte[] copyCompressedData(byte[] compressed) throws IOException { byte[] copyCompressedData(byte[] compressed, int originalLength) throws IOException {
return copyCompressedData(mode.newDecompressor(), compressed); return copyCompressedData(mode.newDecompressor(), compressed, originalLength);
} }
public void testDecompress() throws IOException { public void testDecompress() throws IOException {
final byte[] decompressed = randomArray(); final byte[] decompressed = randomArray();
final byte[] compressed = compress(decompressed); final byte[] compressed = compress(decompressed);
final byte[] restored = decompress(compressed); final byte[] restored = decompress(compressed, decompressed.length);
assertArrayEquals(decompressed, restored); assertArrayEquals(decompressed, restored);
} }
@@ -109,7 +109,7 @@ public abstract class AbstractTestCompressionMode extends LuceneTestCase {
offset = random().nextInt(decompressed.length); offset = random().nextInt(decompressed.length);
length = random().nextInt(decompressed.length - offset); length = random().nextInt(decompressed.length - offset);
} }
final byte[] restored = decompress(compressed, offset, length); final byte[] restored = decompress(compressed, decompressed.length, offset, length);
assertArrayEquals(Arrays.copyOfRange(decompressed, offset, offset + length), restored); assertArrayEquals(Arrays.copyOfRange(decompressed, offset, offset + length), restored);
} }
} }
@@ -117,14 +117,14 @@ public abstract class AbstractTestCompressionMode extends LuceneTestCase {
public void testCopyCompressedData() throws IOException { public void testCopyCompressedData() throws IOException {
final byte[] decompressed = randomArray(); final byte[] decompressed = randomArray();
final byte[] compressed = compress(decompressed); final byte[] compressed = compress(decompressed);
assertArrayEquals(compressed, copyCompressedData(compressed)); assertArrayEquals(compressed, copyCompressedData(compressed, decompressed.length));
} }
public void test(byte[] decompressed) throws IOException { public void test(byte[] decompressed) throws IOException {
final byte[] compressed = compress(decompressed); final byte[] compressed = compress(decompressed);
final byte[] restored = decompress(compressed); final byte[] restored = decompress(compressed, decompressed.length);
assertEquals(decompressed.length, restored.length); assertEquals(decompressed.length, restored.length);
assertArrayEquals(compressed, copyCompressedData(compressed)); assertArrayEquals(compressed, copyCompressedData(compressed, decompressed.length));
} }
public void testEmptySequence() throws IOException { public void testEmptySequence() throws IOException {