LUCENE-9529: Track dirtiness of stored fields via a number of docs, not chunks. (#1882)

The problem with tracking dirtiness via the number of chunks is that
larger chunks make stored fields readers more likely to be considered
dirty, so this change works around it by tracking the number of docs instead.
Adrien Grand 2020-09-17 18:59:08 +02:00 committed by GitHub
parent e0a64908d8
commit 33f7280078
6 changed files with 60 additions and 37 deletions
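For illustration (this snippet is not part of the patch; the class name and numbers are made up, and only the two heuristic formulas mirror tooDirty() before and after the change): with large chunks, a single forced flush used to trip the old 1%-of-chunks threshold even when almost no data was missing, while counting missing docs keeps the estimate proportional to the actual shortfall.

    // Standalone illustration: a segment with 10 chunks, one of which was
    // force-flushed 16 docs short of its expected size, for 5104 docs total.
    public class DirtinessHeuristicExample {
      public static void main(String[] args) {
        long numChunks = 10, numDirtyChunks = 1;
        long numDocs = 5104, numDirtyDocs = 16;

        // old heuristic: more than 1024 dirty chunks, or more than 1% dirty chunks
        boolean oldTooDirty = numDirtyChunks > 1024 || numDirtyChunks * 100 > numChunks;
        // new heuristic: more than 1024 dirty chunks, or more than 1% dirty docs
        boolean newTooDirty = numDirtyChunks > 1024 || numDirtyDocs * 100 > numDocs;

        // prints "old=true new=false": the old check would force a re-compressing
        // merge, the new one still allows a cheap bulk copy of this segment
        System.out.println("old=" + oldTooDirty + " new=" + newTooDirty);
      }
    }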

CompressingStoredFieldsReader.java

@@ -90,8 +90,8 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
   private final int numDocs;
   private final boolean merging;
   private final BlockState state;
-  private final long numChunks; // number of compressed blocks written
   private final long numDirtyChunks; // number of incomplete compressed blocks written
+  private final long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
   private boolean closed;
   // used by clone
@@ -106,8 +106,8 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
     this.compressionMode = reader.compressionMode;
     this.decompressor = reader.decompressor.clone();
     this.numDocs = reader.numDocs;
-    this.numChunks = reader.numChunks;
     this.numDirtyChunks = reader.numDirtyChunks;
+    this.numDirtyDocs = reader.numDirtyDocs;
     this.merging = merging;
     this.state = new BlockState();
     this.closed = false;
@@ -187,15 +187,13 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
     this.indexReader = indexReader;
     if (version >= VERSION_META) {
-      numChunks = metaIn.readVLong();
       numDirtyChunks = metaIn.readVLong();
+      numDirtyDocs = metaIn.readVLong();
     } else {
-      fieldsStream.seek(maxPointer);
-      numChunks = fieldsStream.readVLong();
-      numDirtyChunks = fieldsStream.readVLong();
-    }
-    if (numDirtyChunks > numChunks) {
-      throw new CorruptIndexException("invalid chunk counts: dirty=" + numDirtyChunks + ", total=" + numChunks, fieldsStream);
+      // Old versions of this format did not record numDirtyDocs. Since bulk
+      // merges are disabled on version increments anyway, we make no effort
+      // to get valid values of numDirtyChunks and numDirtyDocs.
+      numDirtyChunks = numDirtyDocs = -1;
     }
     if (metaIn != null) {
@@ -690,14 +688,26 @@ public final class CompressingStoredFieldsReader extends StoredFieldsReader {
     return chunkSize;
   }
-  long getNumChunks() {
-    return numChunks;
+  long getNumDirtyDocs() {
+    if (version != VERSION_CURRENT) {
+      throw new IllegalStateException("getNumDirtyDocs should only ever get called when the reader is on the current version");
+    }
+    assert numDirtyDocs >= 0;
+    return numDirtyDocs;
   }
   long getNumDirtyChunks() {
+    if (version != VERSION_CURRENT) {
+      throw new IllegalStateException("getNumDirtyChunks should only ever get called when the reader is on the current version");
+    }
+    assert numDirtyChunks >= 0;
     return numDirtyChunks;
   }
   int getNumDocs() {
     return numDocs;
   }
   int getPackedIntsVersion() {
     return packedIntsVersion;
   }

CompressingStoredFieldsWriter.java

@@ -95,8 +95,8 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
   private int docBase; // doc ID at the beginning of the chunk
   private int numBufferedDocs; // docBase + numBufferedDocs == current doc ID
-  private long numChunks; // number of compressed blocks written
   private long numDirtyChunks; // number of incomplete compressed blocks written
+  private long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
   /** Sole constructor. */
   CompressingStoredFieldsWriter(Directory directory, SegmentInfo si, String segmentSuffix, IOContext context,
@@ -252,7 +252,6 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
     docBase += numBufferedDocs;
     numBufferedDocs = 0;
     bufferedDocs.reset();
-    numChunks++;
   }
   @Override
@@ -468,8 +467,10 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
   @Override
   public void finish(FieldInfos fis, int numDocs) throws IOException {
     if (numBufferedDocs > 0) {
-      flush();
       numDirtyChunks++; // incomplete: we had to force this flush
+      final long expectedChunkDocs = Math.min(maxDocsPerChunk, (long) ((double) chunkSize / bufferedDocs.size() * numBufferedDocs));
+      numDirtyDocs += expectedChunkDocs - numBufferedDocs;
+      flush();
     } else {
       assert bufferedDocs.size() == 0;
     }
@@ -477,8 +478,8 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
       throw new RuntimeException("Wrote " + docBase + " docs, finish called with numDocs=" + numDocs);
     }
     indexWriter.finish(numDocs, fieldsStream.getFilePointer(), metaStream);
-    metaStream.writeVLong(numChunks);
     metaStream.writeVLong(numDirtyChunks);
+    metaStream.writeVLong(numDirtyDocs);
     CodecUtil.writeFooter(metaStream);
     CodecUtil.writeFooter(fieldsStream);
     assert bufferedDocs.size() == 0;
@@ -632,8 +633,8 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
         }
         // since we bulk merged all chunks, we inherit any dirty ones from this segment.
-        numChunks += matchingFieldsReader.getNumChunks();
         numDirtyChunks += matchingFieldsReader.getNumDirtyChunks();
+        numDirtyDocs += matchingFieldsReader.getNumDirtyDocs();
       } else {
         // optimized merge, we copy serialized (but decompressed) bytes directly
         // even on simple docs (1 stored field), it seems to help by about 20%
@@ -669,7 +670,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
   boolean tooDirty(CompressingStoredFieldsReader candidate) {
     // more than 1% dirty, or more than hard limit of 1024 dirty chunks
     return candidate.getNumDirtyChunks() > 1024 ||
-           candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
+           candidate.getNumDirtyDocs() * 100 > candidate.getNumDocs();
   }
   private static class CompressingStoredFieldsMergeSub extends DocIDMerger.Sub {

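The expectedChunkDocs expression added to finish() above extrapolates how many docs a complete chunk would have held from the average size of the docs that were buffered when the flush was forced; the shortfall is what gets added to numDirtyDocs. A minimal standalone sketch with assumed values (the class name and numbers are invented; only the formula mirrors CompressingStoredFieldsWriter#finish in the patch):

    // Standalone sketch of the dirty-doc accounting done on a forced flush.
    public class DirtyDocsEstimate {
      public static void main(String[] args) {
        int chunkSize = 16384;      // assumed target chunk size in bytes
        int maxDocsPerChunk = 128;  // assumed cap on docs per chunk
        long bufferedBytes = 4096;  // bytes buffered when the flush was forced
        int numBufferedDocs = 10;   // docs buffered at that point

        // docs a complete chunk would have held at the current average doc size
        long expectedChunkDocs = Math.min(maxDocsPerChunk,
            (long) ((double) chunkSize / bufferedBytes * numBufferedDocs));

        long missingDocs = expectedChunkDocs - numBufferedDocs;
        // 16384 / 4096 * 10 = 40 expected docs, so 30 docs are counted as dirty
        System.out.println(expectedChunkDocs + " expected, " + missingDocs + " added to numDirtyDocs");
      }
    }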
CompressingTermVectorsReader.java

@@ -85,8 +85,8 @@ public final class CompressingTermVectorsReader extends TermVectorsReader implem
   private final int numDocs;
   private boolean closed;
   private final BlockPackedReaderIterator reader;
-  private final long numChunks; // number of compressed blocks written
   private final long numDirtyChunks; // number of incomplete compressed blocks written
+  private final long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
   private final long maxPointer; // end of the data section
   // used by clone
@@ -101,8 +101,8 @@ public final class CompressingTermVectorsReader extends TermVectorsReader implem
     this.numDocs = reader.numDocs;
     this.reader = new BlockPackedReaderIterator(vectorsStream, packedIntsVersion, PACKED_BLOCK_SIZE, 0);
     this.version = reader.version;
-    this.numChunks = reader.numChunks;
     this.numDirtyChunks = reader.numDirtyChunks;
+    this.numDirtyDocs = reader.numDirtyDocs;
     this.maxPointer = reader.maxPointer;
     this.closed = false;
   }
@@ -178,15 +178,13 @@ public final class CompressingTermVectorsReader extends TermVectorsReader implem
     this.maxPointer = maxPointer;
     if (version >= VERSION_META) {
-      numChunks = metaIn.readVLong();
       numDirtyChunks = metaIn.readVLong();
+      numDirtyDocs = metaIn.readVLong();
     } else {
-      vectorsStream.seek(maxPointer);
-      numChunks = vectorsStream.readVLong();
-      numDirtyChunks = vectorsStream.readVLong();
-    }
-    if (numDirtyChunks > numChunks) {
-      throw new CorruptIndexException("invalid chunk counts: dirty=" + numDirtyChunks + ", total=" + numChunks, vectorsStream);
+      // Old versions of this format did not record numDirtyDocs. Since bulk
+      // merges are disabled on version increments anyway, we make no effort
+      // to get valid values of numDirtyChunks and numDirtyDocs.
+      numDirtyChunks = numDirtyDocs = -1;
     }
     decompressor = compressionMode.newDecompressor();
@@ -240,14 +238,26 @@ public final class CompressingTermVectorsReader extends TermVectorsReader implem
     return maxPointer;
   }
-  long getNumChunks() {
-    return numChunks;
+  long getNumDirtyDocs() {
+    if (version != VERSION_CURRENT) {
+      throw new IllegalStateException("getNumDirtyDocs should only ever get called when the reader is on the current version");
+    }
+    assert numDirtyDocs >= 0;
+    return numDirtyDocs;
   }
   long getNumDirtyChunks() {
+    if (version != VERSION_CURRENT) {
+      throw new IllegalStateException("getNumDirtyChunks should only ever get called when the reader is on the current version");
+    }
+    assert numDirtyChunks >= 0;
     return numDirtyChunks;
   }
   int getNumDocs() {
     return numDocs;
   }
   /**
    * @throws AlreadyClosedException if this TermVectorsReader is closed
    */

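The same pattern as the stored fields reader applies here: pre-VERSION_META segments carry no dirty-doc count, so the counters are parked at -1 and the getters refuse to answer for non-current versions. A rough standalone sketch (not the actual Lucene merge code; the class, method, and constant are invented) of why that sentinel is never observed in practice: the merging writer only consults the counters after confirming the candidate reader is on the current version, and otherwise falls back to re-compressing the documents.

    // Hypothetical sketch of the merge-side decision that consumes the new getters.
    public final class BulkCopyDecision {
      static final int VERSION_CURRENT = 2; // assumed value, not Lucene's constant

      static boolean canBulkCopy(int readerVersion, long numDirtyChunks, long numDirtyDocs, long numDocs) {
        if (readerVersion != VERSION_CURRENT) {
          return false; // old segments report -1 counters and are re-compressed instead
        }
        // same shape as tooDirty(): hard cap on dirty chunks, 1% cap on dirty docs
        boolean tooDirty = numDirtyChunks > 1024 || numDirtyDocs * 100 > numDocs;
        return !tooDirty;
      }
    }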
CompressingTermVectorsWriter.java

@@ -87,9 +87,9 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
   private final CompressionMode compressionMode;
   private final Compressor compressor;
   private final int chunkSize;
-  private long numChunks; // number of compressed blocks written
   private long numDirtyChunks; // number of incomplete compressed blocks written
+  private long numDirtyDocs; // cumulative number of missing docs in incomplete chunks
   /** a pending doc */
   private class DocData {
@@ -376,7 +376,6 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
     curDoc = null;
     curField = null;
     termSuffixes.reset();
-    numChunks++;
   }
   private int flushNumFields(int chunkDocs) throws IOException {
@@ -650,15 +649,17 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
   @Override
   public void finish(FieldInfos fis, int numDocs) throws IOException {
     if (!pendingDocs.isEmpty()) {
-      flush();
       numDirtyChunks++; // incomplete: we had to force this flush
+      final long expectedChunkDocs = Math.min(MAX_DOCUMENTS_PER_CHUNK, (long) ((double) chunkSize / termSuffixes.size() * pendingDocs.size()));
+      numDirtyDocs += expectedChunkDocs - pendingDocs.size();
+      flush();
     }
     if (numDocs != this.numDocs) {
       throw new RuntimeException("Wrote " + this.numDocs + " docs, finish called with numDocs=" + numDocs);
     }
     indexWriter.finish(numDocs, vectorsStream.getFilePointer(), metaStream);
-    metaStream.writeVLong(numChunks);
     metaStream.writeVLong(numDirtyChunks);
+    metaStream.writeVLong(numDirtyDocs);
     CodecUtil.writeFooter(metaStream);
     CodecUtil.writeFooter(vectorsStream);
   }
@@ -822,8 +823,8 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
         }
         // since we bulk merged all chunks, we inherit any dirty ones from this segment.
-        numChunks += matchingVectorsReader.getNumChunks();
         numDirtyChunks += matchingVectorsReader.getNumDirtyChunks();
+        numDirtyDocs += matchingVectorsReader.getNumDirtyDocs();
       } else {
         // naive merge...
         if (vectorsReader != null) {
@@ -858,7 +859,7 @@ public final class CompressingTermVectorsWriter extends TermVectorsWriter {
   boolean tooDirty(CompressingTermVectorsReader candidate) {
     // more than 1% dirty, or more than hard limit of 1024 dirty chunks
     return candidate.getNumDirtyChunks() > 1024 ||
-           candidate.getNumDirtyChunks() * 100 > candidate.getNumChunks();
+           candidate.getNumDirtyDocs() * 100 > candidate.getNumDocs();
   }
   @Override

TestCompressingStoredFieldsFormat.java

@@ -297,7 +297,8 @@ public class TestCompressingStoredFieldsFormat extends BaseStoredFieldsFormatTes
     for (LeafReaderContext leaf : ir2.leaves()) {
       CodecReader sr = (CodecReader) leaf.reader();
       CompressingStoredFieldsReader reader = (CompressingStoredFieldsReader)sr.getFieldsReader();
-      assertEquals(1, reader.getNumChunks());
+      assertTrue(reader.getNumDirtyDocs() > 0);
+      assertTrue(reader.getNumDirtyDocs() < 100); // can't be gte the number of docs per chunk
       assertEquals(1, reader.getNumDirtyChunks());
     }
   }

TestCompressingTermVectorsFormat.java

@@ -102,7 +102,7 @@ public class TestCompressingTermVectorsFormat extends BaseTermVectorsFormatTestC
     for (LeafReaderContext leaf : ir2.leaves()) {
       CodecReader sr = (CodecReader) leaf.reader();
       CompressingTermVectorsReader reader = (CompressingTermVectorsReader)sr.getTermVectorsReader();
-      assertEquals(1, reader.getNumChunks());
+      assertTrue(reader.getNumDirtyDocs() > 0);
       assertEquals(1, reader.getNumDirtyChunks());
     }
   }