LUCENE-9996: Reduce RAM usage of DWPT for a single document. (#184)

With this change, doc-value terms dictionaries share a single `ByteBlockPool`
across all fields, while points, binary doc values, and doc-value ordinals use
slightly smaller page sizes.
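As a rough sketch of the sharing pattern this introduces in IndexingChain (the class name SharedDocValuesPoolSketch and the field names are invented for illustration; the Lucene APIs are the ones the diff below uses), two per-field BytesRefHash term dictionaries can be backed by one RAM-tracked ByteBlockPool instead of allocating a 32 KB pool each:

import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.BytesRefHash.DirectBytesStartArray;
import org.apache.lucene.util.Counter;

public class SharedDocValuesPoolSketch {
  public static void main(String[] args) {
    Counter bytesUsed = Counter.newCounter();
    // One pool, shared by every doc-values field, instead of one 32 KB pool per field.
    ByteBlockPool sharedPool =
        new ByteBlockPool(new ByteBlockPool.DirectTrackingAllocator(bytesUsed));

    BytesRefHash field1Terms =
        new BytesRefHash(
            sharedPool,
            BytesRefHash.DEFAULT_CAPACITY,
            new DirectBytesStartArray(BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
    BytesRefHash field2Terms =
        new BytesRefHash(
            sharedPool,
            BytesRefHash.DEFAULT_CAPACITY,
            new DirectBytesStartArray(BytesRefHash.DEFAULT_CAPACITY, bytesUsed));

    // Terms from different fields interleave in the shared pool; each hash keeps
    // its own byte starts, so lookups stay per-field.
    field1Terms.add(new BytesRef("lucene"));
    field2Terms.add(new BytesRef("search"));
    System.out.println("tracked RAM: " + bytesUsed.get() + " bytes");
  }
}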
Adrien Grand 2021-06-18 09:17:50 +02:00 committed by GitHub
parent 065026b74e
commit 1365156fcd
12 changed files with 121 additions and 36 deletions


@@ -377,7 +377,9 @@ Improvements
Optimizations
---------------------
(No changes)
* LUCENE-9996: Improved memory efficiency of IndexWriter's RAM buffer, in
particular in the case of many fields and many indexing threads.
(Adrien Grand)
Bug Fixes
---------------------
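For context, this entry is about IndexWriter's in-memory buffer, which accumulates documents until it reaches the configured size. The snippet below is a hedged illustration using only public APIs (the directory choice, field names, and the 64 MB figure are arbitrary), exercising the many-fields case whose per-field overhead this issue reduces:

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class RamBufferSketch {
  public static void main(String[] args) throws Exception {
    try (Directory dir = new ByteBuffersDirectory();
        IndexWriter writer =
            new IndexWriter(
                dir,
                new IndexWriterConfig()
                    // Buffered documents are flushed to a segment once ~64 MB is used.
                    .setRAMBufferSizeMB(64))) {
      Document doc = new Document();
      // Many distinct fields: the case where per-field buffering overhead dominates.
      for (int i = 0; i < 1000; i++) {
        doc.add(new StringField("field" + i, "value", Store.NO));
      }
      writer.addDocument(doc);
      System.out.println("buffered RAM: " + writer.ramBytesUsed() + " bytes");
    }
  }
}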


@@ -38,8 +38,8 @@ class BinaryDocValuesWriter extends DocValuesWriter<BinaryDocValues> {
/** Maximum length for a binary field. */
private static final int MAX_LENGTH = ArrayUtil.MAX_ARRAY_LENGTH;
// 32 KB block sizes for PagedBytes storage:
private static final int BLOCK_BITS = 15;
// 4 kB block sizes for PagedBytes storage:
private static final int BLOCK_BITS = 12;
private final PagedBytes bytes;
private final DataOutput bytesOut;
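A quick sanity check on the block-size arithmetic behind the constants above (nothing assumed beyond the diff): a PagedBytes block holds 1 << BLOCK_BITS bytes.

public class BlockSizeArithmetic {
  public static void main(String[] args) {
    // Old setting: 1 << 15 = 32768 bytes, i.e. 32 KB per block.
    System.out.println(1 << 15);
    // New setting: 1 << 12 = 4096 bytes, i.e. 4 kB per block,
    // so a sparse binary doc-values field wastes far less buffered RAM.
    System.out.println(1 << 12);
  }
}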


@@ -64,6 +64,8 @@ final class IndexingChain implements Accountable {
// Writes postings and term vectors:
final TermsHash termsHash;
// Shared pool for doc-value terms
final ByteBlockPool docValuesBytePool;
// Writes stored fields
final StoredFieldsConsumer storedFieldsConsumer;
final TermVectorsConsumer termVectorsWriter;
@@ -127,6 +129,7 @@ final class IndexingChain implements Accountable {
termsHash =
new FreqProxTermsWriter(
intBlockAllocator, byteBlockAllocator, bytesUsed, termVectorsWriter);
docValuesBytePool = new ByteBlockPool(byteBlockAllocator);
}
private void onAbortingException(Throwable th) {
@@ -696,19 +699,19 @@ final class IndexingChain implements Accountable {
pf.docValuesWriter = new BinaryDocValuesWriter(fi, bytesUsed);
break;
case SORTED:
pf.docValuesWriter = new SortedDocValuesWriter(fi, bytesUsed);
pf.docValuesWriter = new SortedDocValuesWriter(fi, bytesUsed, docValuesBytePool);
break;
case SORTED_NUMERIC:
pf.docValuesWriter = new SortedNumericDocValuesWriter(fi, bytesUsed);
break;
case SORTED_SET:
pf.docValuesWriter = new SortedSetDocValuesWriter(fi, bytesUsed);
pf.docValuesWriter = new SortedSetDocValuesWriter(fi, bytesUsed, docValuesBytePool);
break;
default:
throw new AssertionError("unrecognized DocValues.Type: " + dvType);
}
if (fi.getPointDimensionCount() != 0) {
pf.pointValuesWriter = new PointValuesWriter(byteBlockAllocator, bytesUsed, fi);
pf.pointValuesWriter = new PointValuesWriter(bytesUsed, fi);
}
if (fi.getVectorDimension() != 0) {
pf.vectorValuesWriter = new VectorValuesWriter(fi, bytesUsed);


@@ -20,15 +20,17 @@ import java.io.IOException;
import org.apache.lucene.codecs.MutablePointValues;
import org.apache.lucene.codecs.PointsReader;
import org.apache.lucene.codecs.PointsWriter;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.ByteBlockPool;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.PagedBytes;
/** Buffers up pending byte[][] value(s) per doc, then flushes when segment flushes. */
class PointValuesWriter {
private final FieldInfo fieldInfo;
private final ByteBlockPool bytes;
private final PagedBytes bytes;
private final DataOutput bytesOut;
private final Counter iwBytesUsed;
private int[] docIDs;
private int numPoints;
@@ -36,17 +38,18 @@ class PointValuesWriter {
private int lastDocID = -1;
private final int packedBytesLength;
PointValuesWriter(ByteBlockPool.Allocator allocator, Counter bytesUsed, FieldInfo fieldInfo) {
PointValuesWriter(Counter bytesUsed, FieldInfo fieldInfo) {
this.fieldInfo = fieldInfo;
this.iwBytesUsed = bytesUsed;
this.bytes = new ByteBlockPool(allocator);
this.bytes = new PagedBytes(12);
bytesOut = bytes.getDataOutput();
docIDs = new int[16];
iwBytesUsed.addAndGet(16 * Integer.BYTES);
packedBytesLength = fieldInfo.getPointDimensionCount() * fieldInfo.getPointNumBytes();
}
// TODO: if exactly the same value is added to exactly the same doc, should we dedup?
public void addPackedValue(int docID, BytesRef value) {
public void addPackedValue(int docID, BytesRef value) throws IOException {
if (value == null) {
throw new IllegalArgumentException(
"field=" + fieldInfo.name + ": point value must not be null");
@@ -65,7 +68,9 @@ class PointValuesWriter {
docIDs = ArrayUtil.grow(docIDs, numPoints + 1);
iwBytesUsed.addAndGet((docIDs.length - numPoints) * Integer.BYTES);
}
bytes.append(value);
final long bytesRamBytesUsedBefore = bytes.ramBytesUsed();
bytesOut.writeBytes(value.bytes, value.offset, value.length);
iwBytesUsed.addAndGet(bytes.ramBytesUsed() - bytesRamBytesUsedBefore);
docIDs[numPoints] = docID;
if (docID != lastDocID) {
numDocs++;
@@ -86,6 +91,7 @@ class PointValuesWriter {
public void flush(SegmentWriteState state, Sorter.DocMap sortMap, PointsWriter writer)
throws IOException {
final PagedBytes.Reader bytesReader = bytes.freeze(false);
PointValues points =
new MutablePointValues() {
final int[] ords = new int[numPoints];
@@ -164,14 +170,13 @@ class PointValuesWriter {
@Override
public void getValue(int i, BytesRef packedValue) {
final long offset = (long) packedBytesLength * ords[i];
packedValue.length = packedBytesLength;
bytes.setRawBytesRef(packedValue, offset);
bytesReader.fillSlice(packedValue, offset, packedBytesLength);
}
@Override
public byte getByteAt(int i, int k) {
final long offset = (long) packedBytesLength * ords[i] + k;
return bytes.readByte(offset);
return bytesReader.getByte(offset);
}
@Override
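To make the fragments above easier to follow, here is a minimal, self-contained sketch of the buffering pattern PointValuesWriter switches to (the class name and sample bytes are illustrative; the PagedBytes calls are the ones used in the diff): values are appended through a DataOutput, RAM is reported to IndexWriter as a delta of ramBytesUsed(), and at flush time the pool is frozen into a Reader for random access via fillSlice and getByte.

import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PagedBytes;

public class PagedBytesBufferSketch {
  public static void main(String[] args) throws Exception {
    PagedBytes bytes = new PagedBytes(12); // 4 kB blocks, matching the new writer
    DataOutput out = bytes.getDataOutput();

    byte[] packedValue = new byte[] {1, 2, 3, 4}; // e.g. one 4-byte point dimension
    long before = bytes.ramBytesUsed();
    out.writeBytes(packedValue, 0, packedValue.length);
    long delta = bytes.ramBytesUsed() - before; // the amount reported to iwBytesUsed

    PagedBytes.Reader reader = bytes.freeze(false); // read-only view used at flush
    BytesRef scratch = new BytesRef();
    reader.fillSlice(scratch, 0, packedValue.length); // replaces ByteBlockPool.setRawBytesRef
    byte third = reader.getByte(2);                   // replaces ByteBlockPool.readByte
    System.out.println(delta + " buffered bytes; scratch=" + scratch + " byte[2]=" + third);
  }
}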


@@ -48,12 +48,12 @@ class SortedDocValuesWriter extends DocValuesWriter<SortedDocValues> {
private int[] finalSortedValues;
private int[] finalOrdMap;
public SortedDocValuesWriter(FieldInfo fieldInfo, Counter iwBytesUsed) {
public SortedDocValuesWriter(FieldInfo fieldInfo, Counter iwBytesUsed, ByteBlockPool pool) {
this.fieldInfo = fieldInfo;
this.iwBytesUsed = iwBytesUsed;
hash =
new BytesRefHash(
new ByteBlockPool(new ByteBlockPool.DirectTrackingAllocator(iwBytesUsed)),
pool,
BytesRefHash.DEFAULT_CAPACITY,
new DirectBytesStartArray(BytesRefHash.DEFAULT_CAPACITY, iwBytesUsed));
pending = PackedLongValues.deltaPackedBuilder(PackedInts.COMPACT);


@@ -55,12 +55,12 @@ class SortedSetDocValuesWriter extends DocValuesWriter<SortedSetDocValues> {
private int[] finalSortedValues;
private int[] finalOrdMap;
SortedSetDocValuesWriter(FieldInfo fieldInfo, Counter iwBytesUsed) {
SortedSetDocValuesWriter(FieldInfo fieldInfo, Counter iwBytesUsed, ByteBlockPool pool) {
this.fieldInfo = fieldInfo;
this.iwBytesUsed = iwBytesUsed;
hash =
new BytesRefHash(
new ByteBlockPool(new ByteBlockPool.DirectTrackingAllocator(iwBytesUsed)),
pool,
BytesRefHash.DEFAULT_CAPACITY,
new DirectBytesStartArray(BytesRefHash.DEFAULT_CAPACITY, iwBytesUsed));
pending = PackedLongValues.packedBuilder(PackedInts.COMPACT);


@@ -377,14 +377,6 @@ public final class ByteBlockPool implements Accountable {
}
}
/** Read a single byte at the given {@code offset}. */
public byte readByte(long offset) {
int bufferIndex = (int) (offset >> BYTE_BLOCK_SHIFT);
int pos = (int) (offset & BYTE_BLOCK_MASK);
byte[] buffer = buffers[bufferIndex];
return buffer[pos];
}
@Override
public long ramBytesUsed() {
long size = BASE_RAM_BYTES;


@@ -100,6 +100,17 @@ public final class PagedBytes implements Accountable {
}
}
/**
* Get the byte at the given offset.
*
* @lucene.internal
*/
public byte getByte(long o) {
final int index = (int) (o >> blockBits);
final int offset = (int) (o & blockMask);
return blocks[index][offset];
}
/**
* Reads length as 1 or 2 byte vInt prefix, starting at <i>start</i>.
*


@@ -29,7 +29,7 @@ public class PackedLongValues extends LongValues implements Accountable {
private static final long BASE_RAM_BYTES_USED =
RamUsageEstimator.shallowSizeOfInstance(PackedLongValues.class);
static final int DEFAULT_PAGE_SIZE = 1024;
static final int DEFAULT_PAGE_SIZE = 256;
static final int MIN_PAGE_SIZE = 64;
// More than 1M doesn't really makes sense with these appending buffers
// since their goal is to try to have small numbers of bits per value
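For orientation (a sketch, not part of the change): the doc-values writers shown earlier obtain these buffers through the convenience factories, which use DEFAULT_PAGE_SIZE, now 256 values per page; an explicit page size can also be passed, illustrated here with MIN_PAGE_SIZE. The comparison is only meant to show where the constant takes effect.

import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;

public class PackedPageSizeSketch {
  public static void main(String[] args) {
    // Convenience factory: pages of DEFAULT_PAGE_SIZE (256) values.
    PackedLongValues.Builder defaults = PackedLongValues.deltaPackedBuilder(PackedInts.COMPACT);
    // Explicit page size: here MIN_PAGE_SIZE (64) values per page.
    PackedLongValues.Builder small = PackedLongValues.packedBuilder(64, PackedInts.COMPACT);
    for (long i = 0; i < 1000; i++) {
      defaults.add(i);
      small.add(i);
    }
    System.out.println(defaults.build().ramBytesUsed() + " vs " + small.build().ramBytesUsed());
  }
}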


@@ -17,14 +17,30 @@
package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.analysis.*;
import java.util.function.Function;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.analysis.MockTokenizer;
import org.apache.lucene.analysis.TokenFilter;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
import org.apache.lucene.analysis.tokenattributes.PositionIncrementAttribute;
import org.apache.lucene.document.BinaryDocValuesField;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.document.VectorField;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.AttributeSource;
@@ -32,6 +48,8 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
public class TestDocumentWriter extends LuceneTestCase {
private Directory dir;
@@ -307,4 +325,63 @@ public class TestDocumentWriter extends LuceneTestCase {
fi.fieldInfo("f2").getIndexOptions());
reader.close();
}
/** Make sure that every new field doesn't increment memory usage by more than 16kB */
private void doTestRAMUsage(Function<String, IndexableField> fieldSupplier) throws IOException {
try (Directory dir = newDirectory();
IndexWriter w =
new IndexWriter(
dir,
newIndexWriterConfig()
.setMaxBufferedDocs(10)
.setRAMBufferSizeMB(IndexWriterConfig.DISABLE_AUTO_FLUSH))) {
Document doc = new Document();
final int numFields = 100;
for (int i = 0; i < numFields; ++i) {
doc.add(fieldSupplier.apply("f" + i));
}
w.addDocument(doc);
assertTrue(w.hasChangesInRam());
MatcherAssert.assertThat(w.ramBytesUsed(), Matchers.lessThan(numFields * 16384L));
}
}
public void testRAMUsageStored() throws IOException {
doTestRAMUsage(field -> new StoredField(field, new BytesRef("Lucene")));
}
public void testRAMUsageIndexed() throws IOException {
doTestRAMUsage(field -> new StringField(field, new BytesRef("Lucene"), Store.NO));
}
public void testRAMUsagePoint() throws IOException {
doTestRAMUsage(field -> new IntPoint(field, 42));
}
public void testRAMUsageNumericDocValue() throws IOException {
doTestRAMUsage(field -> new NumericDocValuesField(field, 42));
}
public void testRAMUsageSortedDocValue() throws IOException {
doTestRAMUsage(field -> new SortedDocValuesField(field, new BytesRef("Lucene")));
}
public void testRAMUsageBinaryDocValue() throws IOException {
doTestRAMUsage(field -> new BinaryDocValuesField(field, new BytesRef("Lucene")));
}
public void testRAMUsageSortedNumericDocValue() throws IOException {
doTestRAMUsage(field -> new SortedNumericDocValuesField(field, 42));
}
public void testRAMUsageSortedSetDocValue() throws IOException {
doTestRAMUsage(field -> new SortedSetDocValuesField(field, new BytesRef("Lucene")));
}
public void testRAMUsageVector() throws IOException {
doTestRAMUsage(
field ->
new VectorField(
field, new float[] {1, 2, 3, 4}, VectorValues.SimilarityFunction.EUCLIDEAN));
}
}


@@ -45,18 +45,12 @@ public class TestByteBlockPool extends LuceneTestCase {
for (BytesRef expected : list) {
ref.grow(expected.length);
ref.setLength(expected.length);
switch (random().nextInt(3)) {
switch (random().nextInt(2)) {
case 0:
// copy bytes
pool.readBytes(position, ref.bytes(), 0, ref.length());
break;
case 1:
// copy bytes one by one
for (int i = 0; i < ref.length(); ++i) {
ref.setByteAt(i, pool.readByte(position + i));
}
break;
case 2:
BytesRef scratch = new BytesRef();
scratch.length = ref.length();
pool.setRawBytesRef(scratch, position);


@@ -87,6 +87,7 @@ public class TestPagedBytes extends LuceneTestCase {
final BytesRef slice = new BytesRef();
for (int iter2 = 0; iter2 < 100; iter2++) {
final int pos = random.nextInt(numBytes - 1);
assertEquals(answer[pos], reader.getByte(pos));
final int len = random.nextInt(Math.min(blockSize + 1, numBytes - pos));
reader.fillSlice(slice, pos, len);
for (int byteUpto = 0; byteUpto < len; byteUpto++) {