LUCENE-6183: Avoid re-compression on stored fields merge

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1652269 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Muir 2015-01-15 21:09:45 +00:00
parent dc3420f76f
commit 7af820f9b1
6 changed files with 214 additions and 12 deletions

View File

@ -30,7 +30,13 @@ API Changes
implementation returning the empty list. (Robert Muir)
======================= Lucene 5.1.0 =======================
(No Changes)
Optimizations
* LUCENE-6183: Avoid recompressing stored fields when merging
segments without deletions. Lucene50Codec's BEST_COMPRESSION
mode uses a higher deflate level for more compact storage.
(Robert Muir)
======================= Lucene 5.0.0 =======================

View File

@ -36,6 +36,7 @@ import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.TYPE_BITS;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.TYPE_MASK;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_CURRENT;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_CHUNK_STATS;
import static org.apache.lucene.codecs.compressing.CompressingStoredFieldsWriter.VERSION_START;
import java.io.EOFException;
@ -88,6 +89,8 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
private final int numDocs;
private final boolean merging;
private final BlockState state;
private final long numChunks; // number of compressed blocks written
private final long numDirtyChunks; // number of incomplete compressed blocks written
private boolean closed;
// used by clone
@ -102,6 +105,8 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
this.compressionMode = reader.compressionMode;
this.decompressor = reader.decompressor.clone();
this.numDocs = reader.numDocs;
this.numChunks = reader.numChunks;
this.numDirtyChunks = reader.numDirtyChunks;
this.merging = merging;
this.state = new BlockState();
this.closed = false;
@ -145,9 +150,6 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
try {
// Open the data file and read metadata
fieldsStream = d.openInput(fieldsStreamFN, context);
if (maxPointer + CodecUtil.footerLength() != fieldsStream.length()) {
throw new CorruptIndexException("Invalid fieldsStream maxPointer (file truncated?): maxPointer=" + maxPointer + ", length=" + fieldsStream.length(), fieldsStream);
}
final String codecNameDat = formatName + CODEC_SFX_DAT;
final int fieldsVersion = CodecUtil.checkIndexHeader(fieldsStream, codecNameDat, VERSION_START, VERSION_CURRENT, si.getId(), segmentSuffix);
if (version != fieldsVersion) {
@ -161,6 +163,17 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
this.merging = false;
this.state = new BlockState();
if (version >= VERSION_CHUNK_STATS) {
fieldsStream.seek(maxPointer);
numChunks = fieldsStream.readVLong();
numDirtyChunks = fieldsStream.readVLong();
if (numDirtyChunks > numChunks) {
throw new CorruptIndexException("invalid chunk counts: dirty=" + numDirtyChunks + ", total=" + numChunks, fieldsStream);
}
} else {
numChunks = numDirtyChunks = -1;
}
// NOTE: data file is too costly to verify checksum against all the bytes on open,
// but for now we at least verify proper structure of the checksum footer: which looks
// for FOOTER_MAGIC + algorithmID. This is cheap and can detect some forms of corruption
@ -496,8 +509,6 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
final int totalLength = offsets[chunkDocs];
final int numStoredFields = this.numStoredFields[index];
fieldsStream.seek(startPointer);
final DataInput documentInput;
if (length == 0) {
// empty
@ -506,6 +517,7 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
// already decompressed
documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset + offset, length);
} else if (sliced) {
fieldsStream.seek(startPointer);
decompressor.decompress(fieldsStream, chunkSize, offset, Math.min(length, chunkSize - offset), bytes);
documentInput = new DataInput() {
@ -545,6 +557,7 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
};
} else {
fieldsStream.seek(startPointer);
decompressor.decompress(fieldsStream, totalLength, offset, length, bytes);
assert bytes.length == length;
documentInput = new ByteArrayDataInput(bytes.bytes, bytes.offset, bytes.length);
@ -611,10 +624,30 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
return compressionMode;
}
CompressingStoredFieldsIndexReader getIndexReader() {
return indexReader;
}
long getMaxPointer() {
return maxPointer;
}
IndexInput getFieldsStream() {
return fieldsStream;
}
int getChunkSize() {
return chunkSize;
}
long getNumChunks() {
return numChunks;
}
long getNumDirtyChunks() {
return numDirtyChunks;
}
@Override
public long ramBytesUsed() {
return indexReader.ramBytesUsed();

View File

@ -24,6 +24,7 @@ import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.compressing.CompressingStoredFieldsReader.SerializedDocument;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.IndexFileNames;
@ -33,6 +34,7 @@ import org.apache.lucene.index.StorableField;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BitUtil;
@ -68,13 +70,15 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
static final String CODEC_SFX_IDX = "Index";
static final String CODEC_SFX_DAT = "Data";
static final int VERSION_START = 0;
static final int VERSION_CURRENT = VERSION_START;
static final int VERSION_CHUNK_STATS = 1;
static final int VERSION_CURRENT = VERSION_CHUNK_STATS;
private final String segment;
private CompressingStoredFieldsIndexWriter indexWriter;
private IndexOutput fieldsStream;
private final Compressor compressor;
private final CompressionMode compressionMode;
private final int chunkSize;
private final int maxDocsPerChunk;
@ -84,11 +88,15 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
private int docBase; // doc ID at the beginning of the chunk
private int numBufferedDocs; // docBase + numBufferedDocs == current doc ID
private long numChunks; // number of compressed blocks written
private long numDirtyChunks; // number of incomplete compressed blocks written
/** Sole constructor. */
public CompressingStoredFieldsWriter(Directory directory, SegmentInfo si, String segmentSuffix, IOContext context,
String formatName, CompressionMode compressionMode, int chunkSize, int maxDocsPerChunk, int blockSize) throws IOException {
assert directory != null;
this.segment = si.name;
this.compressionMode = compressionMode;
this.compressor = compressionMode.newCompressor();
this.chunkSize = chunkSize;
this.maxDocsPerChunk = maxDocsPerChunk;
@ -234,6 +242,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
docBase += numBufferedDocs;
numBufferedDocs = 0;
bufferedDocs.length = 0;
numChunks++;
}
byte scratchBytes[] = new byte[16];
@ -459,6 +468,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
public void finish(FieldInfos fis, int numDocs) throws IOException {
if (numBufferedDocs > 0) {
flush();
numDirtyChunks++; // incomplete: we had to force this flush
} else {
assert bufferedDocs.length == 0;
}
@ -466,10 +476,25 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
throw new RuntimeException("Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
}
indexWriter.finish(numDocs, fieldsStream.getFilePointer());
fieldsStream.writeVLong(numChunks);
fieldsStream.writeVLong(numDirtyChunks);
CodecUtil.writeFooter(fieldsStream);
assert bufferedDocs.length == 0;
}
// bulk merge is scary: its caused corruption bugs in the past.
// we try to be extra safe with this impl, but add an escape hatch to
// have a workaround for undiscovered bugs.
static final String BULK_MERGE_ENABLED_SYSPROP = CompressingStoredFieldsWriter.class.getName() + ".enableBulkMerge";
static final boolean BULK_MERGE_ENABLED;
static {
boolean v = true;
try {
v = Boolean.parseBoolean(System.getProperty(BULK_MERGE_ENABLED_SYSPROP, "true"));
} catch (SecurityException ignored) {}
BULK_MERGE_ENABLED = v;
}
@Override
public int merge(MergeState mergeState) throws IOException {
int docCount = 0;
@ -491,8 +516,8 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
final int maxDoc = mergeState.maxDocs[readerIndex];
final Bits liveDocs = mergeState.liveDocs[readerIndex];
// if its some other format, or an older version of this format:
if (matchingFieldsReader == null || matchingFieldsReader.getVersion() != VERSION_CURRENT) {
// if its some other format, or an older version of this format, or safety switch:
if (matchingFieldsReader == null || matchingFieldsReader.getVersion() != VERSION_CURRENT || BULK_MERGE_ENABLED == false) {
// naive merge...
StoredFieldsReader storedFieldsReader = mergeState.storedFieldsReaders[readerIndex];
if (storedFieldsReader != null) {
@ -507,10 +532,78 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
finishDocument();
++docCount;
}
} else if (matchingFieldsReader.getCompressionMode() == compressionMode &&
matchingFieldsReader.getChunkSize() == chunkSize &&
liveDocs == null &&
!tooDirty(matchingFieldsReader)) {
// optimized merge, raw byte copy
// its not worth fine-graining this if there are deletions.
// if the format is older, its always handled by the naive merge case above
assert matchingFieldsReader.getVersion() == VERSION_CURRENT;
matchingFieldsReader.checkIntegrity();
// flush any pending chunks
if (numBufferedDocs > 0) {
flush();
numDirtyChunks++; // incomplete: we had to force this flush
}
// iterate over each chunk. we use the stored fields index to find chunk boundaries,
// read the docstart + doccount from the chunk header (we write a new header, since doc numbers will change),
// and just copy the bytes directly.
IndexInput rawDocs = matchingFieldsReader.getFieldsStream();
CompressingStoredFieldsIndexReader index = matchingFieldsReader.getIndexReader();
rawDocs.seek(index.getStartPointer(0));
int docID = 0;
while (docID < maxDoc) {
// read header
int base = rawDocs.readVInt();
if (base != docID) {
throw new CorruptIndexException("invalid state: base=" + base + ", docID=" + docID, rawDocs);
}
int code = rawDocs.readVInt();
// write a new index entry and new header for this chunk.
int bufferedDocs = code >>> 1;
indexWriter.writeIndex(bufferedDocs, fieldsStream.getFilePointer());
fieldsStream.writeVInt(docBase); // rebase
fieldsStream.writeVInt(code);
docID += bufferedDocs;
docBase += bufferedDocs;
docCount += bufferedDocs;
if (docID > maxDoc) {
throw new CorruptIndexException("invalid state: base=" + base + ", count=" + bufferedDocs + ", maxDoc=" + maxDoc, rawDocs);
}
// copy bytes until the next chunk boundary (or end of chunk data).
// using the stored fields index for this isn't the most efficient, but fast enough
// and is a source of redundancy for detecting bad things.
final long end;
if (docID == maxDoc) {
end = matchingFieldsReader.getMaxPointer();
} else {
end = index.getStartPointer(docID);
}
fieldsStream.copyBytes(rawDocs, end - rawDocs.getFilePointer());
}
if (rawDocs.getFilePointer() != matchingFieldsReader.getMaxPointer()) {
throw new CorruptIndexException("invalid state: pos=" + rawDocs.getFilePointer() + ", max=" + matchingFieldsReader.getMaxPointer(), rawDocs);
}
// since we bulk merged all chunks, we inherit any dirty ones from this segment.
numChunks += matchingFieldsReader.getNumChunks();
numDirtyChunks += matchingFieldsReader.getNumDirtyChunks();
} else {
// optimized merge, we copy serialized (but decompressed) bytes directly
// even on simple docs (1 stored field), it seems to help by about 20%
// if the format is older, its always handled by the naive merge case above
assert matchingFieldsReader.getVersion() == VERSION_CURRENT;
matchingFieldsReader.checkIntegrity();
for (int docID = 0; docID < maxDoc; docID++) {
if (liveDocs != null && liveDocs.get(docID) == false) {
continue;
@ -527,4 +620,17 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
finish(mergeState.mergeFieldInfos, docCount);
return docCount;
}
/**
* Returns true if we should recompress this reader, even though we could bulk merge compressed data
* <p>
* The last chunk written for a segment is typically incomplete, so without recompressing,
* in some worst-case situations (e.g. frequent reopen with tiny flushes), over time the
* compression ratio can degrade. This is a safety switch.
*/
boolean tooDirty(CompressingStoredFieldsReader candidate) {
// more than 1% dirty, or more than hard limit of 1024 dirty chunks
return candidate.getNumDirtyChunks() > 1024 ||
candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
}
}

View File

@ -70,8 +70,10 @@ public abstract class CompressionMode {
@Override
public Compressor newCompressor() {
// notes:
// 3 is the highest level that doesn't have lazy match evaluation
return new DeflateCompressor(3);
// 6 is the default, higher than that is just a waste of cpu
return new DeflateCompressor(6);
}
@Override

View File

@ -71,7 +71,7 @@ import org.apache.lucene.util.packed.PackedInts;
* <a href="http://fastcompression.blogspot.fr/2011/05/lz4-explained.html">compression format</a>.</p>
* <p>Here is a more detailed description of the field data file format:</p>
* <ul>
* <li>FieldData (.fdt) --&gt; &lt;Header&gt;, PackedIntsVersion, &lt;Chunk&gt;<sup>ChunkCount</sup></li>
* <li>FieldData (.fdt) --&gt; &lt;Header&gt;, PackedIntsVersion, &lt;Chunk&gt;<sup>ChunkCount</sup>, ChunkCount, DirtyChunkCount, Footer</li>
* <li>Header --&gt; {@link CodecUtil#writeIndexHeader IndexHeader}</li>
* <li>PackedIntsVersion --&gt; {@link PackedInts#VERSION_CURRENT} as a {@link DataOutput#writeVInt VInt}</li>
* <li>ChunkCount is not known in advance and is the number of chunks necessary to store all document of the segment</li>
@ -102,6 +102,9 @@ import org.apache.lucene.util.packed.PackedInts;
* <li>FieldNum --&gt; an ID of the field</li>
* <li>Value --&gt; {@link DataOutput#writeString(String) String} | BinaryValue | Int | Float | Long | Double depending on Type</li>
* <li>BinaryValue --&gt; ValueLength &lt;Byte&gt;<sup>ValueLength</sup></li>
* <li>ChunkCount --&gt; the number of chunks in this file</li>
* <li>DirtyChunkCount --&gt; the number of prematurely flushed chunks in this file</li>
* <li>Footer --&gt; {@link CodecUtil#writeFooter CodecFooter}</li>
* </ul>
* <p>Notes</p>
* <ul>
@ -123,9 +126,10 @@ import org.apache.lucene.util.packed.PackedInts;
* <li><a name="field_index" id="field_index"></a>
* <p>A fields index file (extension <tt>.fdx</tt>).</p>
* <ul>
* <li>FieldsIndex (.fdx) --&gt; &lt;Header&gt;, &lt;ChunkIndex&gt;</li>
* <li>FieldsIndex (.fdx) --&gt; &lt;Header&gt;, &lt;ChunkIndex&gt;, Footer</li>
* <li>Header --&gt; {@link CodecUtil#writeIndexHeader IndexHeader}</li>
* <li>ChunkIndex: See {@link CompressingStoredFieldsIndexWriter}</li>
* <li>Footer --&gt; {@link CodecUtil#writeFooter CodecFooter}</li>
* </ul>
* </li>
* </ol>

View File

@ -27,9 +27,14 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.IntField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.BaseStoredFieldsFormatTestCase;
import org.apache.lucene.index.CodecReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.Directory;
@ -267,4 +272,50 @@ public class TestCompressingStoredFieldsFormat extends BaseStoredFieldsFormatTes
out.reset(buffer);
}
}
/**
* writes some tiny segments with incomplete compressed blocks,
* and ensures merge recompresses them.
*/
public void testChunkCleanup() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig iwConf = newIndexWriterConfig(new MockAnalyzer(random()));
iwConf.setMergePolicy(NoMergePolicy.INSTANCE);
// we have to enforce certain things like maxDocsPerChunk to cause dirty chunks to be created
// by this test.
iwConf.setCodec(CompressingCodec.randomInstance(random(), 4*1024, 100, false, 8));
IndexWriter iw = new IndexWriter(dir, iwConf);
DirectoryReader ir = DirectoryReader.open(iw, true);
for (int i = 0; i < 5; i++) {
Document doc = new Document();
doc.add(new StoredField("text", "not very long at all"));
iw.addDocument(doc);
// force flush
DirectoryReader ir2 = DirectoryReader.openIfChanged(ir);
assertNotNull(ir2);
ir.close();
ir = ir2;
// examine dirty counts:
for (LeafReaderContext leaf : ir2.leaves()) {
CodecReader sr = (CodecReader) leaf.reader();
CompressingStoredFieldsReader reader = (CompressingStoredFieldsReader)sr.getFieldsReader();
assertEquals(1, reader.getNumChunks());
assertEquals(1, reader.getNumDirtyChunks());
}
}
iw.getConfig().setMergePolicy(newLogMergePolicy());
iw.forceMerge(1);
DirectoryReader ir2 = DirectoryReader.openIfChanged(ir);
assertNotNull(ir2);
ir.close();
ir = ir2;
CodecReader sr = getOnlySegmentReader(ir);
CompressingStoredFieldsReader reader = (CompressingStoredFieldsReader)sr.getFieldsReader();
// we could get lucky, and have zero, but typically one.
assertTrue(reader.getNumDirtyChunks() <= 1);
ir.close();
iw.close();
dir.close();
}
}