LUCENE-9211 Add compression for Binary doc value fields (#1234)

Stores groups of 32 binary doc values in LZ4-compressed blocks.
Author: markharwood, 2020-02-18 14:02:42 +00:00 (committed by GitHub)
parent ccb390d4a6
commit ce2959fe4c
5 changed files with 386 additions and 61 deletions
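
The change summarized above buffers 32 values at a time and writes each group as a small lengths header followed by an LZ4-compressed payload. The sketch below is illustrative only (the class and method names are mine, not part of the commit); it mirrors the encoding that CompressedBinaryBlockWriter.flushData() performs in Lucene80DocValuesConsumer.java further down, reusing Lucene's LZ4 and ByteBuffersDataOutput utilities.

import java.io.IOException;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.util.compress.LZ4;

// Illustrative sketch of one compressed block's on-disk layout (names are hypothetical).
class BinaryBlockLayoutSketch {

  // lengths[i] is the byte length of the i-th value in the block; 'concatenated'
  // holds the raw values back to back, 'totalLength' bytes in all.
  static void writeBlock(int[] lengths, byte[] concatenated, int totalLength,
                         ByteBuffersDataOutput out) throws IOException {
    boolean allLengthsSame = true;
    for (int i = 1; i < lengths.length; i++) {
      if (lengths[i] != lengths[i - 1]) {
        allLengthsSame = false;
        break;
      }
    }
    if (allLengthsSame) {
      // One vInt: the length shifted left, low bit set to mean "all lengths identical"
      out.writeVInt((lengths[0] << 1) | 1);
    } else {
      // First length shifted left with the low bit clear, then the remaining lengths
      out.writeVInt(lengths[0] << 1);
      for (int i = 1; i < lengths.length; i++) {
        out.writeVInt(lengths[i]);
      }
    }
    // The LZ4-compressed payload immediately follows the lengths header
    LZ4.compress(concatenated, 0, totalLength, out, new LZ4.FastCompressionHashTable());
  }
}

On the read side the first vInt is decoded the same way in reverse (low bit tested, value shifted right), which is what BinaryDecoder.decode() in Lucene80DocValuesProducer.java does.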

lucene/CHANGES.txt

@@ -122,6 +122,8 @@ New Features
Improvements
---------------------
* LUCENE-9211: Add compression for Binary doc value fields. (Mark Harwood)
* LUCENE-9149: Increase data dimension limit in BKD. (Nick Knize)
* LUCENE-9102: Add maxQueryLength option to DirectSpellchecker. (Andy Webb via Bruno Roustant)

Lucene80DocValuesConsumer.java

@@ -29,6 +29,7 @@ import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.codecs.DocValuesProducer;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.EmptyDocValuesProducer;
import org.apache.lucene.index.FieldInfo;
@@ -42,6 +43,8 @@ import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.SortedSetSelector;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
@@ -49,6 +52,8 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.MathUtil;
import org.apache.lucene.util.StringHelper;
import org.apache.lucene.util.compress.LZ4;
import org.apache.lucene.util.compress.LZ4.FastCompressionHashTable;
import org.apache.lucene.util.packed.DirectMonotonicWriter;
import org.apache.lucene.util.packed.DirectWriter;
@@ -61,11 +66,13 @@ final class Lucene80DocValuesConsumer extends DocValuesConsumer implements Close
IndexOutput data, meta;
final int maxDoc;
private final SegmentWriteState state;
/** expert: Creates a new writer */
public Lucene80DocValuesConsumer(SegmentWriteState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
boolean success = false;
try {
this.state = state;
String dataName = IndexFileNames.segmentFileName(state.segmentInfo.name, state.segmentSuffix, dataExtension);
data = state.directory.createOutput(dataName, state.context);
CodecUtil.writeIndexHeader(data, dataCodec, Lucene80DocValuesFormat.VERSION_CURRENT, state.segmentInfo.getId(), state.segmentSuffix);
@@ -353,67 +360,191 @@ final class Lucene80DocValuesConsumer extends DocValuesConsumer implements Close
}
}
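// Buffers up to BINARY_DOCS_PER_COMPRESSED_BLOCK (32) values at a time; flushData()
// writes a lengths header plus an LZ4-compressed payload to the data file and records
// each block's size in a temporary "binary_pointers" file, which writeMetaData() later
// turns into an address table via DirectMonotonicWriter.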
class CompressedBinaryBlockWriter implements Closeable {
final FastCompressionHashTable ht = new LZ4.FastCompressionHashTable();
int uncompressedBlockLength = 0;
int maxUncompressedBlockLength = 0;
int numDocsInCurrentBlock = 0;
final int[] docLengths = new int[Lucene80DocValuesFormat.BINARY_DOCS_PER_COMPRESSED_BLOCK];
byte[] block = BytesRef.EMPTY_BYTES;
int totalChunks = 0;
long maxPointer = 0;
final long blockAddressesStart;
private final IndexOutput tempBinaryOffsets;
public CompressedBinaryBlockWriter() throws IOException {
tempBinaryOffsets = state.directory.createTempOutput(state.segmentInfo.name, "binary_pointers", state.context);
boolean success = false;
try {
CodecUtil.writeHeader(tempBinaryOffsets, Lucene80DocValuesFormat.META_CODEC + "FilePointers", Lucene80DocValuesFormat.VERSION_CURRENT);
blockAddressesStart = data.getFilePointer();
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(this); //self-close because constructor caller can't
}
}
}
void addDoc(int doc, BytesRef v) throws IOException {
docLengths[numDocsInCurrentBlock] = v.length;
block = ArrayUtil.grow(block, uncompressedBlockLength + v.length);
System.arraycopy(v.bytes, v.offset, block, uncompressedBlockLength, v.length);
uncompressedBlockLength += v.length;
numDocsInCurrentBlock++;
if (numDocsInCurrentBlock == Lucene80DocValuesFormat.BINARY_DOCS_PER_COMPRESSED_BLOCK) {
flushData();
}
}
private void flushData() throws IOException {
if (numDocsInCurrentBlock > 0) {
// Write offset to this block to temporary offsets file
totalChunks++;
long thisBlockStartPointer = data.getFilePointer();
// Optimisation - check if all lengths are same
boolean allLengthsSame = true;
for (int i = 1; i < Lucene80DocValuesFormat.BINARY_DOCS_PER_COMPRESSED_BLOCK; i++) {
if (docLengths[i] != docLengths[i-1]) {
allLengthsSame = false;
break;
}
}
if (allLengthsSame) {
// Only write one value shifted. Steal a bit to indicate all other lengths are the same
int onlyOneLength = (docLengths[0] <<1) | 1;
data.writeVInt(onlyOneLength);
} else {
for (int i = 0; i < Lucene80DocValuesFormat.BINARY_DOCS_PER_COMPRESSED_BLOCK; i++) {
if (i == 0) {
// Write first value shifted and steal a bit to indicate other lengths are to follow
int multipleLengths = (docLengths[0] <<1);
data.writeVInt(multipleLengths);
} else {
data.writeVInt(docLengths[i]);
}
}
}
maxUncompressedBlockLength = Math.max(maxUncompressedBlockLength, uncompressedBlockLength);
LZ4.compress(block, 0, uncompressedBlockLength, data, ht);
numDocsInCurrentBlock = 0;
// Ensure initialized with zeroes because full array is always written
Arrays.fill(docLengths, 0);
uncompressedBlockLength = 0;
maxPointer = data.getFilePointer();
tempBinaryOffsets.writeVLong(maxPointer - thisBlockStartPointer);
}
}
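// Re-reads the per-block sizes from the temp offsets file, converts them to absolute
// file pointers with DirectMonotonicWriter, and records the chunk count, block shift
// and maximum uncompressed block length in the meta file.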
void writeMetaData() throws IOException {
if (totalChunks == 0) {
return;
}
long startDMW = data.getFilePointer();
meta.writeLong(startDMW);
meta.writeVInt(totalChunks);
meta.writeVInt(Lucene80DocValuesFormat.BINARY_BLOCK_SHIFT);
meta.writeVInt(maxUncompressedBlockLength);
meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);
CodecUtil.writeFooter(tempBinaryOffsets);
IOUtils.close(tempBinaryOffsets);
// Write the compressed block offsets to the meta file by reading them back from the temp file
try (ChecksumIndexInput filePointersIn = state.directory.openChecksumInput(tempBinaryOffsets.getName(), IOContext.READONCE)) {
CodecUtil.checkHeader(filePointersIn, Lucene80DocValuesFormat.META_CODEC + "FilePointers", Lucene80DocValuesFormat.VERSION_CURRENT,
Lucene80DocValuesFormat.VERSION_CURRENT);
Throwable priorE = null;
try {
final DirectMonotonicWriter filePointers = DirectMonotonicWriter.getInstance(meta, data, totalChunks, DIRECT_MONOTONIC_BLOCK_SHIFT);
long fp = blockAddressesStart;
for (int i = 0; i < totalChunks; ++i) {
filePointers.add(fp);
fp += filePointersIn.readVLong();
}
if (maxPointer < fp) {
throw new CorruptIndexException("File pointers don't add up ("+fp+" vs expected "+maxPointer+")", filePointersIn);
}
filePointers.finish();
} catch (Throwable e) {
priorE = e;
} finally {
CodecUtil.checkFooter(filePointersIn, priorE);
}
}
// Write the length of the DMW block in the data
meta.writeLong(data.getFilePointer() - startDMW);
}
@Override
public void close() throws IOException {
if (tempBinaryOffsets != null) {
IOUtils.close(tempBinaryOffsets);
state.directory.deleteFile(tempBinaryOffsets.getName());
}
}
}
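// addBinaryField below now routes every value through CompressedBinaryBlockWriter
// rather than copying the raw bytes straight into the data file.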
@Override
public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException {
meta.writeInt(field.number);
meta.writeByte(Lucene80DocValuesFormat.BINARY);
BinaryDocValues values = valuesProducer.getBinary(field);
long start = data.getFilePointer();
meta.writeLong(start); // dataOffset
int numDocsWithField = 0;
int minLength = Integer.MAX_VALUE;
int maxLength = 0;
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
numDocsWithField++;
BytesRef v = values.binaryValue();
int length = v.length;
data.writeBytes(v.bytes, v.offset, v.length);
minLength = Math.min(length, minLength);
maxLength = Math.max(length, maxLength);
}
assert numDocsWithField <= maxDoc;
meta.writeLong(data.getFilePointer() - start); // dataLength
if (numDocsWithField == 0) {
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithField == maxDoc) {
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else {
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
values = valuesProducer.getBinary(field);
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}
meta.writeInt(numDocsWithField);
meta.writeInt(minLength);
meta.writeInt(maxLength);
if (maxLength > minLength) {
start = data.getFilePointer();
meta.writeLong(start);
meta.writeVInt(DIRECT_MONOTONIC_BLOCK_SHIFT);
final DirectMonotonicWriter writer = DirectMonotonicWriter.getInstance(meta, data, numDocsWithField + 1, DIRECT_MONOTONIC_BLOCK_SHIFT);
long addr = 0;
writer.add(addr);
values = valuesProducer.getBinary(field);
try (CompressedBinaryBlockWriter blockWriter = new CompressedBinaryBlockWriter()){
BinaryDocValues values = valuesProducer.getBinary(field);
long start = data.getFilePointer();
meta.writeLong(start); // dataOffset
int numDocsWithField = 0;
int minLength = Integer.MAX_VALUE;
int maxLength = 0;
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
addr += values.binaryValue().length;
writer.add(addr);
numDocsWithField++;
BytesRef v = values.binaryValue();
blockWriter.addDoc(doc, v);
int length = v.length;
minLength = Math.min(length, minLength);
maxLength = Math.max(length, maxLength);
}
writer.finish();
meta.writeLong(data.getFilePointer() - start);
blockWriter.flushData();
assert numDocsWithField <= maxDoc;
meta.writeLong(data.getFilePointer() - start); // dataLength
if (numDocsWithField == 0) {
meta.writeLong(-2); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else if (numDocsWithField == maxDoc) {
meta.writeLong(-1); // docsWithFieldOffset
meta.writeLong(0L); // docsWithFieldLength
meta.writeShort((short) -1); // jumpTableEntryCount
meta.writeByte((byte) -1); // denseRankPower
} else {
long offset = data.getFilePointer();
meta.writeLong(offset); // docsWithFieldOffset
values = valuesProducer.getBinary(field);
final short jumpTableEntryCount = IndexedDISI.writeBitSet(values, data, IndexedDISI.DEFAULT_DENSE_RANK_POWER);
meta.writeLong(data.getFilePointer() - offset); // docsWithFieldLength
meta.writeShort(jumpTableEntryCount);
meta.writeByte(IndexedDISI.DEFAULT_DENSE_RANK_POWER);
}
meta.writeInt(numDocsWithField);
meta.writeInt(minLength);
meta.writeInt(maxLength);
blockWriter.writeMetaData();
}
}
@Override

Lucene80DocValuesFormat.java

@@ -151,7 +151,8 @@ public final class Lucene80DocValuesFormat extends DocValuesFormat {
static final String META_CODEC = "Lucene80DocValuesMetadata";
static final String META_EXTENSION = "dvm";
static final int VERSION_START = 0;
static final int VERSION_CURRENT = VERSION_START;
static final int VERSION_BIN_COMPRESSED = 1;
static final int VERSION_CURRENT = VERSION_BIN_COMPRESSED;
// indicates docvalues type
static final byte NUMERIC = 0;
@@ -165,6 +166,9 @@ public final class Lucene80DocValuesFormat extends DocValuesFormat {
static final int NUMERIC_BLOCK_SHIFT = 14;
static final int NUMERIC_BLOCK_SIZE = 1 << NUMERIC_BLOCK_SHIFT;
static final int BINARY_BLOCK_SHIFT = 5;
static final int BINARY_DOCS_PER_COMPRESSED_BLOCK = 1 << BINARY_BLOCK_SHIFT;
static final int TERMS_DICT_BLOCK_SHIFT = 4;
static final int TERMS_DICT_BLOCK_SIZE = 1 << TERMS_DICT_BLOCK_SHIFT;
static final int TERMS_DICT_BLOCK_MASK = TERMS_DICT_BLOCK_SIZE - 1;

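The BINARY_BLOCK_SHIFT / BINARY_DOCS_PER_COMPRESSED_BLOCK pair above is what the writer uses to decide when to flush a block and what the reader uses to locate a value's block. A tiny self-contained illustration (names and numbers are mine, not from the commit):

// Illustrative only: how a value ordinal maps to a compressed block and a slot in it.
class BlockMathSketch {
  public static void main(String[] args) {
    final int shift = 5;                  // BINARY_BLOCK_SHIFT
    final int docsPerBlock = 1 << shift;  // BINARY_DOCS_PER_COMPRESSED_BLOCK = 32
    final int ord = 100;                  // hypothetical value ordinal
    System.out.println("block=" + (ord >> shift) + " slot=" + (ord % docsPerBlock));
    // prints: block=3 slot=4
  }
}
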
Lucene80DocValuesProducer.java

@@ -46,6 +46,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LongValues;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.compress.LZ4;
import org.apache.lucene.util.packed.DirectMonotonicReader;
import org.apache.lucene.util.packed.DirectReader;
@@ -59,6 +60,7 @@ final class Lucene80DocValuesProducer extends DocValuesProducer implements Close
private long ramBytesUsed;
private final IndexInput data;
private final int maxDoc;
private int version = -1;
/** expert: instantiates a new reader */
Lucene80DocValuesProducer(SegmentReadState state, String dataCodec, String dataExtension, String metaCodec, String metaExtension) throws IOException {
@@ -66,11 +68,10 @@ final class Lucene80DocValuesProducer extends DocValuesProducer implements Close
this.maxDoc = state.segmentInfo.maxDoc();
ramBytesUsed = RamUsageEstimator.shallowSizeOfInstance(getClass());
int version = -1;
// read in the entries from the metadata file.
try (ChecksumIndexInput in = state.directory.openChecksumInput(metaName, state.context)) {
Throwable priorE = null;
try {
version = CodecUtil.checkIndexHeader(in, metaCodec,
Lucene80DocValuesFormat.VERSION_START,
@@ -182,10 +183,21 @@ final class Lucene80DocValuesProducer extends DocValuesProducer implements Close
entry.numDocsWithField = meta.readInt();
entry.minLength = meta.readInt();
entry.maxLength = meta.readInt();
if (entry.minLength < entry.maxLength) {
if ((version >= Lucene80DocValuesFormat.VERSION_BIN_COMPRESSED && entry.numDocsWithField > 0) || entry.minLength < entry.maxLength) {
entry.addressesOffset = meta.readLong();
// Old count of uncompressed addresses
long numAddresses = entry.numDocsWithField + 1L;
// New count of compressed addresses - the number of compressed blocks
if (version >= Lucene80DocValuesFormat.VERSION_BIN_COMPRESSED) {
entry.numCompressedChunks = meta.readVInt();
entry.docsPerChunkShift = meta.readVInt();
entry.maxUncompressedChunkSize = meta.readVInt();
numAddresses = entry.numCompressedChunks;
}
final int blockShift = meta.readVInt();
entry.addressesMeta = DirectMonotonicReader.loadMeta(meta, entry.numDocsWithField + 1L, blockShift);
entry.addressesMeta = DirectMonotonicReader.loadMeta(meta, numAddresses, blockShift);
ramBytesUsed += entry.addressesMeta.ramBytesUsed();
entry.addressesLength = meta.readLong();
}
@@ -303,6 +315,9 @@ final class Lucene80DocValuesProducer extends DocValuesProducer implements Close
long addressesOffset;
long addressesLength;
DirectMonotonicReader.Meta addressesMeta;
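// The fields below are only populated for segments written with VERSION_BIN_COMPRESSED.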
int numCompressedChunks;
int docsPerChunkShift;
int maxUncompressedChunkSize;
}
private static class TermsDictEntry {
@@ -664,9 +679,9 @@ final class Lucene80DocValuesProducer extends DocValuesProducer implements Close
return disi.advanceExact(target);
}
}
@Override
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
// BWC - old binary format
private BinaryDocValues getUncompressedBinary(FieldInfo field) throws IOException {
BinaryEntry entry = binaries.get(field.name);
if (entry.docsWithFieldOffset == -2) {
return DocValues.emptyBinary();
@@ -742,6 +757,128 @@ final class Lucene80DocValuesProducer extends DocValuesProducer implements Close
};
}
}
}
// Decompresses blocks of binary values to retrieve content
class BinaryDecoder {
private final LongValues addresses;
private final IndexInput compressedData;
// Cache of last uncompressed block
private long lastBlockId = -1;
private final int []uncompressedDocStarts;
private int uncompressedBlockLength = 0;
private final byte[] uncompressedBlock;
private final BytesRef uncompressedBytesRef;
private final int docsPerChunk;
private final int docsPerChunkShift;
public BinaryDecoder(LongValues addresses, IndexInput compressedData, int biggestUncompressedBlockSize, int docsPerChunkShift) {
super();
this.addresses = addresses;
this.compressedData = compressedData;
// pre-allocate a byte array large enough for the biggest uncompressed block needed.
this.uncompressedBlock = new byte[biggestUncompressedBlockSize];
uncompressedBytesRef = new BytesRef(uncompressedBlock);
this.docsPerChunk = 1 << docsPerChunkShift;
this.docsPerChunkShift = docsPerChunkShift;
uncompressedDocStarts = new int[docsPerChunk + 1];
}
BytesRef decode(int docNumber) throws IOException {
int blockId = docNumber >> docsPerChunkShift;
int docInBlockId = docNumber % docsPerChunk;
assert docInBlockId < docsPerChunk;
// already read and uncompressed?
if (blockId != lastBlockId) {
lastBlockId = blockId;
long blockStartOffset = addresses.get(blockId);
compressedData.seek(blockStartOffset);
uncompressedBlockLength = 0;
int onlyLength = -1;
for (int i = 0; i < docsPerChunk; i++) {
if (i == 0) {
// The first length value is special. It is shifted and has a bit to denote if
// all other values are the same length
int lengthPlusSameInd = compressedData.readVInt();
int sameIndicator = lengthPlusSameInd & 1;
int firstValLength = lengthPlusSameInd >>>1;
if (sameIndicator == 1) {
onlyLength = firstValLength;
}
uncompressedBlockLength += firstValLength;
} else {
if (onlyLength == -1) {
// Various lengths are stored - read each from disk
uncompressedBlockLength += compressedData.readVInt();
} else {
// Only one length
uncompressedBlockLength += onlyLength;
}
}
uncompressedDocStarts[i+1] = uncompressedBlockLength;
}
if (uncompressedBlockLength == 0) {
uncompressedBytesRef.offset = 0;
uncompressedBytesRef.length = 0;
return uncompressedBytesRef;
}
assert uncompressedBlockLength <= uncompressedBlock.length;
LZ4.decompress(compressedData, uncompressedBlockLength, uncompressedBlock, 0);
}
uncompressedBytesRef.offset = uncompressedDocStarts[docInBlockId];
uncompressedBytesRef.length = uncompressedDocStarts[docInBlockId +1] - uncompressedBytesRef.offset;
return uncompressedBytesRef;
}
}
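// Indexes written before VERSION_BIN_COMPRESSED take the uncompressed path above;
// newer ones decode through BinaryDecoder.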
@Override
public BinaryDocValues getBinary(FieldInfo field) throws IOException {
if (version < Lucene80DocValuesFormat.VERSION_BIN_COMPRESSED) {
return getUncompressedBinary(field);
}
BinaryEntry entry = binaries.get(field.name);
if (entry.docsWithFieldOffset == -2) {
return DocValues.emptyBinary();
}
if (entry.docsWithFieldOffset == -1) {
// dense
final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);
return new DenseBinaryDocValues(maxDoc) {
BinaryDecoder decoder = new BinaryDecoder(addresses, data.clone(), entry.maxUncompressedChunkSize, entry.docsPerChunkShift);
@Override
public BytesRef binaryValue() throws IOException {
return decoder.decode(doc);
}
};
} else {
// sparse
final IndexedDISI disi = new IndexedDISI(data, entry.docsWithFieldOffset, entry.docsWithFieldLength,
entry.jumpTableEntryCount, entry.denseRankPower, entry.numDocsWithField);
final RandomAccessInput addressesData = this.data.randomAccessSlice(entry.addressesOffset, entry.addressesLength);
final LongValues addresses = DirectMonotonicReader.getInstance(entry.addressesMeta, addressesData);
return new SparseBinaryDocValues(disi) {
BinaryDecoder decoder = new BinaryDecoder(addresses, data.clone(), entry.maxUncompressedChunkSize, entry.docsPerChunkShift);
@Override
public BytesRef binaryValue() throws IOException {
return decoder.decode(disi.index());
}
};
}
}
@Override

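From a caller's point of view nothing changes: both compressed and uncompressed segments are consumed through the standard BinaryDocValues iterator. A minimal read-side sketch (not taken from the commit; the helper class and method names are mine):

import java.io.IOException;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.util.BytesRef;

// The producer's BinaryDecoder sits behind the usual BinaryDocValues API,
// so existing callers are unaffected by the new compression.
class ReadBinaryDocValueSketch {
  static BytesRef readValue(LeafReader reader, String field, int docId) throws IOException {
    BinaryDocValues dv = reader.getBinaryDocValues(field); // null if the field has no binary doc values
    if (dv != null && dv.advanceExact(docId)) {
      return dv.binaryValue(); // decompressed lazily, one 32-document block at a time
    }
    return null;
  }
}
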
BaseDocValuesFormatTestCase.java

@@ -246,6 +246,57 @@ public abstract class BaseDocValuesFormatTestCase extends BaseIndexFileFormatTes
ireader.close();
directory.close();
}
public void testVariouslyCompressibleBinaryValues() throws IOException {
Directory directory = newDirectory();
RandomIndexWriter iwriter = new RandomIndexWriter(random(), directory);
int numDocs = 1 + random().nextInt(100);
HashMap<Integer,BytesRef> writtenValues = new HashMap<>(numDocs);
// Small vocabulary ranges will be highly compressible
int vocabRange = random().nextInt(Byte.MAX_VALUE);
for (int i = 0; i < numDocs; i++) {
Document doc = new Document();
// Generate random-sized byte array with random choice of bytes in vocab range
byte[] value = new byte[500 + random().nextInt(1024)];
for (int j = 0; j < value.length; j++) {
value[j] = (byte) random().nextInt(vocabRange);
}
BytesRef bytesRef = new BytesRef(value);
writtenValues.put(i, bytesRef);
doc.add(newTextField("id", Integer.toString(i), Field.Store.YES));
doc.add(new BinaryDocValuesField("dv1", bytesRef));
iwriter.addDocument(doc);
}
iwriter.forceMerge(1);
iwriter.close();
// Now search the index:
IndexReader ireader = DirectoryReader.open(directory); // read-only=true
IndexSearcher isearcher = new IndexSearcher(ireader);
for (int i = 0; i < numDocs; i++) {
String id = Integer.toString(i);
Query query = new TermQuery(new Term("id", id));
TopDocs hits = isearcher.search(query, 1);
assertEquals(1, hits.totalHits.value);
// Iterate through the results:
int hitDocID = hits.scoreDocs[0].doc;
Document hitDoc = isearcher.doc(hitDocID);
assertEquals(id, hitDoc.get("id"));
assert ireader.leaves().size() == 1;
BinaryDocValues dv = ireader.leaves().get(0).reader().getBinaryDocValues("dv1");
assertEquals(hitDocID, dv.advance(hitDocID));
BytesRef scratch = dv.binaryValue();
assertEquals(writtenValues.get(i), scratch);
}
ireader.close();
directory.close();
}
public void testTwoFieldsMixed() throws IOException {
Directory directory = newDirectory();