mirror of https://github.com/apache/lucene.git
LUCENE-9827: avoid wasteful recompression for small segments (#28)
Require that the segment has enough dirty documents to create a clean chunk before recompressing during merge: there must be at least maxChunkSize. This prevents wasteful recompression with small flushes (e.g. after every document); we ensure recompression achieves some "permanent" progress.

Expose maxDocsPerChunk as a parameter for term vectors too, matching the stored fields format. This allows for easy testing.

Increment numDirtyDocs for partially optimized merges: if segment N needs recompression, we have to flush any buffered docs before bulk-copying segment N+1. Don't just increment numDirtyChunks; make sure numDirtyDocs is incremented, too. This has no performance impact and is unrelated to the tooDirty() improvements, but it is easier to reason about things with correct statistics in the index.

Further tuning of how dirtiness is measured: for simplicity, just use the percentage of dirty chunks.

Co-authored-by: Adrien Grand <jpountz@gmail.com>
This commit is contained in:
parent d991fefb49
commit be94a667f2
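To make the revised heuristic concrete, here is a minimal, self-contained sketch of the dirtiness check described above. The class and method names are illustrative only (not Lucene's API); only the comparison itself mirrors the tooDirty() change in this diff:

```java
// Illustrative sketch of the revised "tooDirty" heuristic (names are made up;
// only the comparison mirrors the change in this commit).
public class DirtinessSketch {

  // A segment is recompressed on merge only if (1) its dirty docs could fill at
  // least one complete chunk, so recompression makes "permanent" progress, and
  // (2) more than 1% of its chunks are incomplete ("dirty").
  static boolean tooDirty(
      long numDirtyDocs, long numDirtyChunks, long numChunks, int maxDocsPerChunk) {
    return numDirtyDocs > maxDocsPerChunk && numDirtyChunks * 100 > numChunks;
  }

  public static void main(String[] args) {
    // Many tiny flushes, but fewer dirty docs than one full chunk: leave as-is.
    System.out.println(tooDirty(64, 64, 64, 128)); // false
    // Enough dirty docs for a full chunk and >1% dirty chunks: recompress.
    System.out.println(tooDirty(500, 5, 100, 128)); // true
  }
}
```

Compared to the old check (hard cap of 1024 dirty chunks, dirty docs measured against total docs), this keeps a segment built from per-document flushes from being recompressed repeatedly without ever producing clean chunks.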
@@ -154,6 +154,6 @@ public final class Lucene90TermVectorsFormat extends Lucene90CompressingTermVectorsFormat
   /** Sole constructor. */
   public Lucene90TermVectorsFormat() {
-    super("Lucene90TermVectorsData", "", CompressionMode.FAST, 1 << 12, 10);
+    super("Lucene90TermVectorsData", "", CompressionMode.FAST, 1 << 12, 128, 10);
   }
 }

@@ -89,8 +89,9 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsReader
   private final int numDocs;
   private final boolean merging;
   private final BlockState state;
+  private final long numChunks; // number of written blocks
   private final long numDirtyChunks; // number of incomplete compressed blocks written
-  private final long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
+  private final long numDirtyDocs; // cumulative number of docs in incomplete chunks
   private boolean closed;

   // used by clone

@@ -106,6 +107,7 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsReader
     this.compressionMode = reader.compressionMode;
     this.decompressor = reader.decompressor.clone();
     this.numDocs = reader.numDocs;
+    this.numChunks = reader.numChunks;
     this.numDirtyChunks = reader.numDirtyChunks;
     this.numDirtyDocs = reader.numDirtyDocs;
     this.merging = merging;

@@ -177,6 +179,7 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsReader
     this.maxPointer = maxPointer;
     this.indexReader = indexReader;

+    numChunks = metaIn.readVLong();
     numDirtyChunks = metaIn.readVLong();
     numDirtyDocs = metaIn.readVLong();

@@ -718,6 +721,15 @@ public final class Lucene90CompressingStoredFieldsReader extends StoredFieldsReader
     return numDirtyChunks;
   }

+  long getNumChunks() {
+    if (version != VERSION_CURRENT) {
+      throw new IllegalStateException(
+          "getNumChunks should only ever get called when the reader is on the current version");
+    }
+    assert numChunks >= 0;
+    return numChunks;
+  }
+
   int getNumDocs() {
     return numDocs;
   }

@@ -94,6 +94,7 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWriter
   private int docBase; // doc ID at the beginning of the chunk
   private int numBufferedDocs; // docBase + numBufferedDocs == current doc ID

+  private long numChunks;
   private long numDirtyChunks; // number of incomplete compressed blocks written
   private long numDirtyDocs; // cumulative number of missing docs in incomplete chunks

@@ -249,6 +250,7 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWriter
   }

   private void flush() throws IOException {
+    numChunks++;
     indexWriter.writeIndex(numBufferedDocs, fieldsStream.getFilePointer());

     // transform end offsets into lengths

@@ -489,10 +491,7 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWriter
   public void finish(FieldInfos fis, int numDocs) throws IOException {
     if (numBufferedDocs > 0) {
       numDirtyChunks++; // incomplete: we had to force this flush
-      final long expectedChunkDocs =
-          Math.min(
-              maxDocsPerChunk, (long) ((double) chunkSize / bufferedDocs.size() * numBufferedDocs));
-      numDirtyDocs += expectedChunkDocs - numBufferedDocs;
+      numDirtyDocs += numBufferedDocs;
       flush();
     } else {
       assert bufferedDocs.size() == 0;

@@ -502,6 +501,7 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWriter
           "Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
     }
     indexWriter.finish(numDocs, fieldsStream.getFilePointer(), metaStream);
+    metaStream.writeVLong(numChunks);
     metaStream.writeVLong(numDirtyChunks);
     metaStream.writeVLong(numDirtyDocs);
     CodecUtil.writeFooter(metaStream);

@@ -615,8 +615,9 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWriter

     // flush any pending chunks
     if (numBufferedDocs > 0) {
-      flush();
       numDirtyChunks++; // incomplete: we had to force this flush
+      numDirtyDocs += numBufferedDocs;
+      flush();
     }

     // iterate over each chunk. we use the stored fields index to find chunk boundaries,

@@ -709,10 +710,10 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWriter
    * ratio can degrade. This is a safety switch.
    */
   boolean tooDirty(Lucene90CompressingStoredFieldsReader candidate) {
-    // more than 1% dirty, or more than hard limit of 1024 dirty chunks
-    return candidate.getNumDirtyChunks() > 1024
-        || (candidate.getNumDirtyChunks() > 1
-            && candidate.getNumDirtyDocs() * 100 > candidate.getNumDocs());
+    // A segment is considered dirty only if it has enough dirty docs to make a full block
+    // AND more than 1% blocks are dirty.
+    return candidate.getNumDirtyDocs() > maxDocsPerChunk
+        && candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
   }

   private static class CompressingStoredFieldsMergeSub extends DocIDMerger.Sub {

@@ -41,6 +41,7 @@ public class Lucene90CompressingTermVectorsFormat extends TermVectorsFormat {
   private final CompressionMode compressionMode;
   private final int chunkSize;
   private final int blockSize;
+  private final int maxDocsPerChunk;

   /**
    * Create a new {@link Lucene90CompressingTermVectorsFormat}.

@@ -63,6 +64,7 @@ public class Lucene90CompressingTermVectorsFormat extends TermVectorsFormat {
    * @param segmentSuffix a suffix to append to files created by this format
    * @param compressionMode the {@link CompressionMode} to use
    * @param chunkSize the minimum number of bytes of a single chunk of stored documents
+   * @param maxDocsPerChunk the maximum number of documents in a single chunk
    * @param blockSize the number of chunks to store in an index block.
    * @see CompressionMode
    */

@@ -71,6 +73,7 @@ public class Lucene90CompressingTermVectorsFormat extends TermVectorsFormat {
       String segmentSuffix,
       CompressionMode compressionMode,
       int chunkSize,
+      int maxDocsPerChunk,
       int blockSize) {
     this.formatName = formatName;
     this.segmentSuffix = segmentSuffix;

@@ -79,6 +82,7 @@ public class Lucene90CompressingTermVectorsFormat extends TermVectorsFormat {
       throw new IllegalArgumentException("chunkSize must be >= 1");
     }
     this.chunkSize = chunkSize;
+    this.maxDocsPerChunk = maxDocsPerChunk;
     if (blockSize < 1) {
       throw new IllegalArgumentException("blockSize must be >= 1");
     }

@@ -104,6 +108,7 @@ public class Lucene90CompressingTermVectorsFormat extends TermVectorsFormat {
         formatName,
         compressionMode,
         chunkSize,
+        maxDocsPerChunk,
         blockSize);
   }

@@ -114,6 +119,8 @@ public class Lucene90CompressingTermVectorsFormat extends TermVectorsFormat {
         + compressionMode
         + ", chunkSize="
         + chunkSize
+        + ", maxDocsPerChunk="
+        + maxDocsPerChunk
         + ", blockSize="
         + blockSize
         + ")";

@@ -83,8 +83,9 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReader
   private final int numDocs;
   private boolean closed;
   private final BlockPackedReaderIterator reader;
+  private final long numChunks; // number of written blocks
   private final long numDirtyChunks; // number of incomplete compressed blocks written
-  private final long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
+  private final long numDirtyDocs; // cumulative number of docs in incomplete chunks
   private final long maxPointer; // end of the data section

   // used by clone

@@ -100,6 +101,7 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReader
     this.reader =
         new BlockPackedReaderIterator(vectorsStream, packedIntsVersion, PACKED_BLOCK_SIZE, 0);
     this.version = reader.version;
+    this.numChunks = reader.numChunks;
     this.numDirtyChunks = reader.numDirtyChunks;
     this.numDirtyDocs = reader.numDirtyDocs;
     this.maxPointer = reader.maxPointer;

@@ -167,6 +169,7 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReader
     this.indexReader = fieldsIndexReader;
     this.maxPointer = fieldsIndexReader.getMaxPointer();

+    numChunks = metaIn.readVLong();
     numDirtyChunks = metaIn.readVLong();
     numDirtyDocs = metaIn.readVLong();

@@ -238,6 +241,15 @@ public final class Lucene90CompressingTermVectorsReader extends TermVectorsReader
     return numDirtyChunks;
   }

+  long getNumChunks() {
+    if (version != VERSION_CURRENT) {
+      throw new IllegalStateException(
+          "getNumChunks should only ever get called when the reader is on the current version");
+    }
+    assert numChunks >= 0;
+    return numChunks;
+  }
+
   int getNumDocs() {
     return numDocs;
   }

@@ -60,9 +60,6 @@ import org.apache.lucene.util.packed.PackedInts;
  */
 public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter {

-  // hard limit on the maximum number of documents per chunk
-  static final int MAX_DOCUMENTS_PER_CHUNK = 128;
-
   static final String VECTORS_EXTENSION = "tvd";
   static final String VECTORS_INDEX_EXTENSION = "tvx";
   static final String VECTORS_META_EXTENSION = "tvm";

@@ -87,8 +84,9 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter
   private final Compressor compressor;
   private final int chunkSize;

+  private long numChunks; // number of chunks
   private long numDirtyChunks; // number of incomplete compressed blocks written
-  private long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
+  private long numDirtyDocs; // cumulative number of docs in incomplete chunks

   /** a pending doc */
   private class DocData {

@@ -224,6 +222,7 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter
   private final ByteBuffersDataOutput termSuffixes; // buffered term suffixes
   private final ByteBuffersDataOutput payloadBytes; // buffered term payloads
   private final BlockPackedWriter writer;
+  private final int maxDocsPerChunk; // hard limit on number of docs per chunk

   /** Sole constructor. */
   Lucene90CompressingTermVectorsWriter(

@@ -234,6 +233,7 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter
       String formatName,
       CompressionMode compressionMode,
       int chunkSize,
+      int maxDocsPerChunk,
       int blockShift)
       throws IOException {
     assert directory != null;

@@ -241,6 +241,7 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter
     this.compressionMode = compressionMode;
     this.compressor = compressionMode.newCompressor();
     this.chunkSize = chunkSize;
+    this.maxDocsPerChunk = maxDocsPerChunk;

     numDocs = 0;
     pendingDocs = new ArrayDeque<>();

@@ -373,10 +374,11 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter
   }

   private boolean triggerFlush() {
-    return termSuffixes.size() >= chunkSize || pendingDocs.size() >= MAX_DOCUMENTS_PER_CHUNK;
+    return termSuffixes.size() >= chunkSize || pendingDocs.size() >= maxDocsPerChunk;
   }

   private void flush() throws IOException {
+    numChunks++;
     final int chunkDocs = pendingDocs.size();
     assert chunkDocs > 0 : chunkDocs;

@@ -712,11 +714,7 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter
   public void finish(FieldInfos fis, int numDocs) throws IOException {
     if (!pendingDocs.isEmpty()) {
       numDirtyChunks++; // incomplete: we had to force this flush
-      final long expectedChunkDocs =
-          Math.min(
-              MAX_DOCUMENTS_PER_CHUNK,
-              (long) ((double) chunkSize / termSuffixes.size() * pendingDocs.size()));
-      numDirtyDocs += expectedChunkDocs - pendingDocs.size();
+      numDirtyDocs += pendingDocs.size();
       flush();
     }
     if (numDocs != this.numDocs) {

@@ -724,6 +722,7 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter
           "Wrote " + this.numDocs + " docs, finish called with numDocs=" + numDocs);
     }
     indexWriter.finish(numDocs, vectorsStream.getFilePointer(), metaStream);
+    metaStream.writeVLong(numChunks);
     metaStream.writeVLong(numDirtyChunks);
     metaStream.writeVLong(numDirtyDocs);
     CodecUtil.writeFooter(metaStream);

@@ -845,8 +844,9 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter

     // flush any pending chunks
     if (!pendingDocs.isEmpty()) {
-      flush();
       numDirtyChunks++; // incomplete: we had to force this flush
+      numDirtyDocs += pendingDocs.size();
+      flush();
     }

     // iterate over each chunk. we use the vectors index to find chunk boundaries,

@@ -937,10 +937,10 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWriter
    * ratio can degrade. This is a safety switch.
    */
   boolean tooDirty(Lucene90CompressingTermVectorsReader candidate) {
-    // more than 1% dirty, or more than hard limit of 1024 dirty chunks
-    return candidate.getNumDirtyChunks() > 1024
-        || (candidate.getNumDirtyChunks() > 1
-            && candidate.getNumDirtyDocs() * 100 > candidate.getNumDocs());
+    // A segment is considered dirty only if it has enough dirty docs to make a full block
+    // AND more than 1% blocks are dirty.
+    return candidate.getNumDirtyDocs() > maxDocsPerChunk
+        && candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
   }

   @Override

@@ -38,7 +38,7 @@ final class SortingTermVectorsConsumer extends TermVectorsConsumer {

   private static final TermVectorsFormat TEMP_TERM_VECTORS_FORMAT =
       new Lucene90CompressingTermVectorsFormat(
-          "TempTermVectors", "", SortingStoredFieldsConsumer.NO_COMPRESSION, 8 * 1024, 10);
+          "TempTermVectors", "", SortingStoredFieldsConsumer.NO_COMPRESSION, 8 * 1024, 128, 10);
   TrackingTmpOutputDirectoryWrapper tmpDirectory;

   SortingTermVectorsConsumer(

@@ -120,7 +120,7 @@ public abstract class CompressingCodec extends FilterCodec {
             name, segmentSuffix, compressionMode, chunkSize, maxDocsPerChunk, blockShift);
     this.termVectorsFormat =
         new Lucene90CompressingTermVectorsFormat(
-            name, segmentSuffix, compressionMode, chunkSize, blockShift);
+            name, segmentSuffix, compressionMode, chunkSize, maxDocsPerChunk, blockShift);
   }

   /** Creates a compressing codec with an empty segment suffix */

@@ -283,7 +283,7 @@ public class TestCompressingStoredFieldsFormat extends BaseStoredFieldsFormatTestCase

     // we have to enforce certain things like maxDocsPerChunk to cause dirty chunks to be created
     // by this test.
-    iwConf.setCodec(CompressingCodec.randomInstance(random(), 4 * 1024, 100, false, 8));
+    iwConf.setCodec(CompressingCodec.randomInstance(random(), 4 * 1024, 4, false, 8));
     IndexWriter iw = new IndexWriter(dir, iwConf);
     DirectoryReader ir = DirectoryReader.open(iw);
     for (int i = 0; i < 5; i++) {

@@ -84,7 +84,7 @@ public class TestCompressingTermVectorsFormat extends BaseTermVectorsFormatTestCase

     // we have to enforce certain things like maxDocsPerChunk to cause dirty chunks to be created
     // by this test.
-    iwConf.setCodec(CompressingCodec.randomInstance(random(), 4 * 1024, 100, false, 8));
+    iwConf.setCodec(CompressingCodec.randomInstance(random(), 4 * 1024, 4, false, 8));
     IndexWriter iw = new IndexWriter(dir, iwConf);
     DirectoryReader ir = DirectoryReader.open(iw);
     for (int i = 0; i < 5; i++) {