LUCENE-9511: Include StoredFieldsWriter in DWPT accounting (#1839)

The StoredFieldsWriter may consume a significant amount of heap, which can influence
the IndexWriter's decisions about whether writers should be stalled or DWPTs should
be flushed, especially when the memory settings in the IWC are small and flushes are
frequent. This change adds RAM accounting to the StoredFieldsWriter, since it is part
of the DWPT lifecycle and not only present during flush.
Authored by Simon Willnauer on 2020-09-08 18:18:13 +02:00, committed via GitHub
parent a46316e156
commit 98e55f0ea8
18 changed files with 113 additions and 58 deletions
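In short, the change makes DocumentsWriterPerThread and its DocConsumer implement Accountable, and the stored-fields writer's heap usage is folded into the per-thread RAM total that DocumentsWriterFlushControl consults. The following is a minimal, hypothetical sketch of that accounting chain under simplified names (SketchedWriterPerThread and its fields are illustrative, not the actual Lucene classes):

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Counter;

// Illustrative sketch only: shows how per-thread RAM accounting can aggregate
// the indexing chain (which now includes the StoredFieldsWriter) and buffered updates.
class SketchedWriterPerThread implements Accountable {
  private final Counter bytesUsed = Counter.newCounter(); // postings, norms, delete-by-docID buffers, ...
  private final Accountable pendingUpdates;               // buffered deletes/updates
  private final Accountable indexingChain;                // DocConsumer; its total now covers stored fields

  SketchedWriterPerThread(Accountable pendingUpdates, Accountable indexingChain) {
    this.pendingUpdates = pendingUpdates;
    this.indexingChain = indexingChain;
  }

  @Override
  public long ramBytesUsed() {
    // Flush control reads this value to decide whether to stall writers or mark
    // a DWPT as flush pending, so stored-fields buffers are no longer invisible to it.
    return bytesUsed.get() + pendingUpdates.ramBytesUsed() + indexingChain.ramBytesUsed();
  }
}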

lucene/CHANGES.txt

@@ -206,6 +206,10 @@ Improvements
* LUCENE-9501: Improve how Asserting* test classes handle singleton doc values.
* LUCENE-9511: Include StoredFieldsWriter in DWPT accounting to ensure that its
heap consumption is taken into account when IndexWriter stalls or should flush
DWPTs. (Simon Willnauer)
Optimizations
---------------------

SimpleTextStoredFieldsWriter.java

@@ -39,8 +39,6 @@ import org.apache.lucene.util.IOUtils;
*/
public class SimpleTextStoredFieldsWriter extends StoredFieldsWriter {
private int numDocsWritten = 0;
private final Directory directory;
private final String segment;
private IndexOutput out;
final static String FIELDS_EXTENSION = "fld";
@@ -62,8 +60,6 @@ public class SimpleTextStoredFieldsWriter extends StoredFieldsWriter {
private final BytesRefBuilder scratch = new BytesRefBuilder();
public SimpleTextStoredFieldsWriter(Directory directory, String segment, IOContext context) throws IOException {
this.directory = directory;
this.segment = segment;
boolean success = false;
try {
out = directory.createOutput(IndexFileNames.segmentFileName(segment, "", FIELDS_EXTENSION), context);
@@ -181,4 +177,9 @@ public class SimpleTextStoredFieldsWriter extends StoredFieldsWriter {
private void newLine() throws IOException {
SimpleTextUtil.writeNewline(out);
}
@Override
public long ramBytesUsed() {
return Integer.BYTES; // something > 0
}
}

StoredFieldsWriter.java

@@ -33,6 +33,7 @@ import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.IndexableFieldType;
import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
@@ -51,7 +52,7 @@ import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
*
* @lucene.experimental
*/
public abstract class StoredFieldsWriter implements Closeable {
public abstract class StoredFieldsWriter implements Closeable, Accountable {
/** Sole constructor. (For invocation by subclass
* constructors, typically implicit.) */

CompressingStoredFieldsWriter.java

@@ -677,7 +677,7 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
private final int maxDoc;
int docID = -1;
public CompressingStoredFieldsMergeSub(CompressingStoredFieldsReader reader, MergeState.DocMap docMap, int maxDoc) {
CompressingStoredFieldsMergeSub(CompressingStoredFieldsReader reader, MergeState.DocMap docMap, int maxDoc) {
super(docMap);
this.maxDoc = maxDoc;
this.reader = reader;
@@ -693,4 +693,9 @@ public final class CompressingStoredFieldsWriter extends StoredFieldsWriter {
}
}
}
@Override
public long ramBytesUsed() {
return bufferedDocs.ramBytesUsed() + numStoredFields.length * Integer.BYTES + endOffsets.length * Integer.BYTES;
}
}

DefaultIndexingChain.java

@@ -52,7 +52,7 @@ import org.apache.lucene.util.RamUsageEstimator;
/** Default general purpose indexing chain, which handles
* indexing all types of fields. */
final class DefaultIndexingChain extends DocConsumer {
final Counter bytesUsed;
final Counter bytesUsed = Counter.newCounter();
final DocumentsWriterPerThread docWriter;
final FieldInfos.Builder fieldInfos;
@@ -74,21 +74,20 @@ final class DefaultIndexingChain extends DocConsumer {
private PerField[] fields = new PerField[1];
private final InfoStream infoStream;
public DefaultIndexingChain(DocumentsWriterPerThread docWriter) {
DefaultIndexingChain(DocumentsWriterPerThread docWriter) {
this.docWriter = docWriter;
this.fieldInfos = docWriter.getFieldInfosBuilder();
this.bytesUsed = docWriter.bytesUsed;
this.infoStream = docWriter.getIndexWriterConfig().getInfoStream();
final TermsHash termVectorsWriter;
if (docWriter.getSegmentInfo().getIndexSort() == null) {
storedFieldsConsumer = new StoredFieldsConsumer(docWriter);
storedFieldsConsumer = new StoredFieldsConsumer(docWriter.codec, docWriter.directory, docWriter.getSegmentInfo());
termVectorsWriter = new TermVectorsConsumer(docWriter);
} else {
storedFieldsConsumer = new SortingStoredFieldsConsumer(docWriter);
storedFieldsConsumer = new SortingStoredFieldsConsumer(docWriter.codec, docWriter.directory, docWriter.getSegmentInfo());
termVectorsWriter = new SortingTermVectorsConsumer(docWriter);
}
termsHash = new FreqProxTermsWriter(docWriter, termVectorsWriter);
termsHash = new FreqProxTermsWriter(docWriter, bytesUsed, termVectorsWriter);
}
private LeafReader getDocValuesLeafReader() {
@@ -302,7 +301,6 @@ final class DefaultIndexingChain extends DocConsumer {
/** Writes all buffered doc values (called from {@link #flush}). */
private void writeDocValues(SegmentWriteState state, Sorter.DocMap sortMap) throws IOException {
int maxDoc = state.segmentInfo.maxDoc();
DocValuesConsumer dvConsumer = null;
boolean success = false;
try {
@@ -582,7 +580,7 @@ final class DefaultIndexingChain extends DocConsumer {
fp.fieldInfo.setPointDimensions(pointDimensionCount, pointIndexDimensionCount, dimensionNumBytes);
if (fp.pointValuesWriter == null) {
fp.pointValuesWriter = new PointValuesWriter(docWriter, fp.fieldInfo);
fp.pointValuesWriter = new PointValuesWriter(docWriter.byteBlockAllocator, bytesUsed, fp.fieldInfo);
}
fp.pointValuesWriter.addPackedValue(docID, field.binaryValue());
}
@@ -774,6 +772,11 @@
info.setIndexOptions(indexOptions);
}
@Override
public long ramBytesUsed() {
return bytesUsed.get() + storedFieldsConsumer.ramBytesUsed();
}
/** NOTE: not static: accesses at least docState, termsHash. */
private final class PerField implements Comparable<PerField> {

DocConsumer.java

@@ -20,8 +20,9 @@ package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Accountable;
abstract class DocConsumer {
abstract class DocConsumer implements Accountable {
abstract void processDocument(int docId, Iterable<? extends IndexableField> document) throws IOException;
abstract Sorter.DocMap flush(final SegmentWriteState state) throws IOException;
abstract void abort() throws IOException;

DocumentsWriterFlushControl.java

@@ -180,7 +180,7 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
} else {
flushPolicy.onInsert(this, perThread);
}
if (!perThread.isFlushPending() && perThread.bytesUsed() > hardMaxBytesPerDWPT) {
if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) {
// Safety check to prevent a single DWPT exceeding its RAM limit. This
// is super important since we can not address more than 2048 MB per DWPT
setFlushPending(perThread);
@@ -674,7 +674,7 @@ final class DocumentsWriterFlushControl implements Accountable, Closeable {
int count = 0;
for (DocumentsWriterPerThread next : perThreadPool) {
if (next.isFlushPending() == false && next.getNumDocsInRAM() > 0) {
final long nextRam = next.bytesUsed();
final long nextRam = next.ramBytesUsed();
if (infoStream.isEnabled("FP")) {
infoStream.message("FP", "thread state has " + nextRam + " bytes; docInRAM=" + next.getNumDocsInRAM());
}

DocumentsWriterPerThread.java

@@ -18,8 +18,10 @@ package org.apache.lucene.index;
import java.io.IOException;
import java.text.NumberFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
@@ -33,6 +35,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.TrackingDirectoryWrapper;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.ByteBlockPool.Allocator;
@@ -48,7 +51,7 @@ import org.apache.lucene.util.Version;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_MASK;
import static org.apache.lucene.util.ByteBlockPool.BYTE_BLOCK_SIZE;
final class DocumentsWriterPerThread {
final class DocumentsWriterPerThread implements Accountable {
LiveIndexWriterConfig getIndexWriterConfig() {
return indexWriterConfig;
@@ -132,7 +135,7 @@ final class DocumentsWriterPerThread {
final Codec codec;
final TrackingDirectoryWrapper directory;
private final DocConsumer consumer;
final Counter bytesUsed;
private final Counter bytesUsed;
// Updates for our still-in-RAM (to be flushed next) segment
private final BufferedUpdates pendingUpdates;
@@ -292,7 +295,7 @@
for (int docId = from; docId < to; docId++) {
deleteDocIDs[numDeletedDocIds++] = docId;
}
bytesUsed.addAndGet((deleteDocIDs.length - size) * Integer.SIZE);
bytesUsed.addAndGet((deleteDocIDs.length - size) * Integer.BYTES);
// NOTE: we do not trigger flush here. This is
// potentially a RAM leak, if you have an app that tries
// to add docs but every single doc always hits a
@@ -338,8 +341,8 @@
assert deleteSlice.isEmpty() : "all deletes must be applied in prepareFlush";
segmentInfo.setMaxDoc(numDocsInRAM);
final SegmentWriteState flushState = new SegmentWriteState(infoStream, directory, segmentInfo, fieldInfos.finish(),
pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, bytesUsed())));
final double startMBUsed = bytesUsed() / 1024. / 1024.;
pendingUpdates, new IOContext(new FlushInfo(numDocsInRAM, ramBytesUsed())));
final double startMBUsed = ramBytesUsed() / 1024. / 1024.;
// Apply delete-by-docID now (delete-byDocID only
// happens when an exception is hit processing that
@@ -351,7 +354,7 @@
flushState.liveDocs.clear(deleteDocIDs[i]);
}
flushState.delCountOnFlush = numDeletedDocIds;
bytesUsed.addAndGet(-(deleteDocIDs.length * Integer.SIZE));
bytesUsed.addAndGet(-(deleteDocIDs.length * Integer.BYTES));
deleteDocIDs = null;
}
@@ -545,12 +548,18 @@
return segmentInfo;
}
long bytesUsed() {
return bytesUsed.get() + pendingUpdates.ramBytesUsed();
@Override
public long ramBytesUsed() {
return bytesUsed.get() + pendingUpdates.ramBytesUsed() + consumer.ramBytesUsed();
}
@Override
public Collection<Accountable> getChildResources() {
return List.of(pendingUpdates, consumer);
}
/* Initial chunks size of the shared byte[] blocks used to
store postings data */
final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
/* if you increase this, you must fix field cache impl for
@@ -613,13 +622,13 @@
}
/**
* Commits the current {@link #bytesUsed()} and stores it's value for later reuse.
* Commits the current {@link #ramBytesUsed()} and stores it's value for later reuse.
* The last committed bytes used can be retrieved via {@link #getLastCommittedBytesUsed()}
* @return the delta between the current {@link #bytesUsed()} and the current {@link #getLastCommittedBytesUsed()}
* @return the delta between the current {@link #ramBytesUsed()} and the current {@link #getLastCommittedBytesUsed()}
*/
long commitLastBytesUsed() {
assert isHeldByCurrentThread();
long delta = bytesUsed() - lastCommittedBytesUsed;
long delta = ramBytesUsed() - lastCommittedBytesUsed;
lastCommittedBytesUsed += delta;
return delta;
}
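The Javadoc above describes delta-based accounting: each DWPT reports only the change in its RAM usage since the last commit, so the global counters maintained by flush control never double-count bytes. A minimal, hypothetical sketch of that pattern (RamDeltaTracker is an illustrative name, not a Lucene class):

import java.util.function.LongSupplier;

// Illustrative only: mirrors the delta pattern of commitLastBytesUsed() shown above.
class RamDeltaTracker {
  private long lastCommittedBytesUsed;
  private final LongSupplier currentBytesUsed;

  RamDeltaTracker(LongSupplier currentBytesUsed) {
    this.currentBytesUsed = currentBytesUsed;
  }

  // Returns the change since the previous call and remembers the new total,
  // so repeated calls never report the same bytes twice.
  long commitLastBytesUsed() {
    long delta = currentBytesUsed.getAsLong() - lastCommittedBytesUsed;
    lastCommittedBytesUsed += delta;
    return delta;
  }
}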

FreqProxTermsWriter.java

@@ -32,6 +32,7 @@ import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TimSorter;
@@ -39,8 +40,8 @@ import org.apache.lucene.util.automaton.CompiledAutomaton;
final class FreqProxTermsWriter extends TermsHash {
public FreqProxTermsWriter(DocumentsWriterPerThread docWriter, TermsHash termVectors) {
super(docWriter, true, termVectors);
FreqProxTermsWriter(DocumentsWriterPerThread docWriter, Counter bytesUsed, TermsHash termVectors) {
super(docWriter, bytesUsed, termVectors);
}
private void applyDeletes(SegmentWriteState state, Fields fields) throws IOException {

PointValuesWriter.java

@@ -37,10 +37,10 @@ class PointValuesWriter {
private int lastDocID = -1;
private final int packedBytesLength;
public PointValuesWriter(DocumentsWriterPerThread docWriter, FieldInfo fieldInfo) {
PointValuesWriter(ByteBlockPool.Allocator allocator, Counter bytesUsed, FieldInfo fieldInfo) {
this.fieldInfo = fieldInfo;
this.iwBytesUsed = docWriter.bytesUsed;
this.bytes = new ByteBlockPool(docWriter.byteBlockAllocator);
this.iwBytesUsed = bytesUsed;
this.bytes = new ByteBlockPool(allocator);
docIDs = new int[16];
iwBytesUsed.addAndGet(16 * Integer.BYTES);
packedBytesLength = fieldInfo.getPointDimensionCount() * fieldInfo.getPointNumBytes();

SortingStoredFieldsConsumer.java

@@ -24,9 +24,11 @@ import java.util.Objects;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
@@ -34,16 +36,15 @@ import org.apache.lucene.util.IOUtils;
final class SortingStoredFieldsConsumer extends StoredFieldsConsumer {
TrackingTmpOutputDirectoryWrapper tmpDirectory;
SortingStoredFieldsConsumer(DocumentsWriterPerThread docWriter) {
super(docWriter);
SortingStoredFieldsConsumer(Codec codec, Directory directory, SegmentInfo info) {
super(codec, directory, info);
}
@Override
protected void initStoredFieldsWriter() throws IOException {
if (writer == null) {
this.tmpDirectory = new TrackingTmpOutputDirectoryWrapper(docWriter.directory);
this.writer = docWriter.codec.storedFieldsFormat().fieldsWriter(tmpDirectory, docWriter.getSegmentInfo(),
IOContext.DEFAULT);
this.tmpDirectory = new TrackingTmpOutputDirectoryWrapper(directory);
this.writer = codec.storedFieldsFormat().fieldsWriter(tmpDirectory, info, IOContext.DEFAULT);
}
}
@@ -57,10 +58,10 @@ final class SortingStoredFieldsConsumer extends StoredFieldsConsumer {
}
return;
}
StoredFieldsReader reader = docWriter.codec.storedFieldsFormat()
StoredFieldsReader reader = codec.storedFieldsFormat()
.fieldsReader(tmpDirectory, state.segmentInfo, state.fieldInfos, IOContext.DEFAULT);
StoredFieldsReader mergeReader = reader.getMergeInstance();
StoredFieldsWriter sortWriter = docWriter.codec.storedFieldsFormat()
StoredFieldsWriter sortWriter = codec.storedFieldsFormat()
.fieldsWriter(state.directory, state.segmentInfo, IOContext.DEFAULT);
try {
reader.checkIntegrity();

SortingTermVectorsConsumer.java

@@ -33,7 +33,7 @@ import org.apache.lucene.util.IOUtils;
final class SortingTermVectorsConsumer extends TermVectorsConsumer {
TrackingTmpOutputDirectoryWrapper tmpDirectory;
public SortingTermVectorsConsumer(DocumentsWriterPerThread docWriter) {
SortingTermVectorsConsumer(DocumentsWriterPerThread docWriter) {
super(docWriter);
}
@@ -71,7 +71,7 @@ final class SortingTermVectorsConsumer extends TermVectorsConsumer {
@Override
void initTermVectorsWriter() throws IOException {
if (writer == null) {
IOContext context = new IOContext(new FlushInfo(docWriter.getNumDocsInRAM(), docWriter.bytesUsed()));
IOContext context = new IOContext(new FlushInfo(docWriter.getNumDocsInRAM(), docWriter.ramBytesUsed()));
tmpDirectory = new TrackingTmpOutputDirectoryWrapper(docWriter.directory);
writer = docWriter.codec.termVectorsFormat().vectorsWriter(tmpDirectory, docWriter.getSegmentInfo(), context);
lastDocID = 0;

StoredFieldsConsumer.java

@@ -19,25 +19,29 @@ package org.apache.lucene.index;
import java.io.IOException;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.IOUtils;
class StoredFieldsConsumer {
final DocumentsWriterPerThread docWriter;
final Codec codec;
final Directory directory;
final SegmentInfo info;
StoredFieldsWriter writer;
int lastDoc;
private int lastDoc;
StoredFieldsConsumer(DocumentsWriterPerThread docWriter) {
this.docWriter = docWriter;
StoredFieldsConsumer(Codec codec, Directory directory, SegmentInfo info) {
this.codec = codec;
this.directory = directory;
this.info = info;
this.lastDoc = -1;
}
protected void initStoredFieldsWriter() throws IOException {
if (writer == null) {
this.writer =
docWriter.codec.storedFieldsFormat().fieldsWriter(docWriter.directory, docWriter.getSegmentInfo(),
IOContext.DEFAULT);
if (writer == null) { // TODO can we allocate this in the ctor? we call start document for every doc anyway
this.writer = codec.storedFieldsFormat().fieldsWriter(directory, info, IOContext.DEFAULT);
}
}
@@ -82,4 +86,8 @@ class StoredFieldsConsumer {
writer = null;
}
}
long ramBytesUsed() {
return writer == null ? 0 : writer.ramBytesUsed();
}
}

TermVectorsConsumer.java

@@ -27,6 +27,7 @@ import org.apache.lucene.store.FlushInfo;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
@@ -49,7 +50,7 @@ class TermVectorsConsumer extends TermsHash {
private TermVectorsConsumerPerField[] perFields = new TermVectorsConsumerPerField[1];
TermVectorsConsumer(DocumentsWriterPerThread docWriter) {
super(docWriter, false, null);
super(docWriter, Counter.newCounter(), null);
this.docWriter = docWriter;
}
@@ -84,7 +85,7 @@
void initTermVectorsWriter() throws IOException {
if (writer == null) {
IOContext context = new IOContext(new FlushInfo(docWriter.getNumDocsInRAM(), docWriter.bytesUsed()));
IOContext context = new IOContext(new FlushInfo(docWriter.getNumDocsInRAM(), docWriter.ramBytesUsed()));
writer = docWriter.codec.termVectorsFormat().vectorsWriter(docWriter.directory, docWriter.getSegmentInfo(), context);
lastDocID = 0;
}

TermsHash.java

@@ -40,12 +40,10 @@ abstract class TermsHash {
final ByteBlockPool bytePool;
ByteBlockPool termBytePool;
final Counter bytesUsed;
final boolean trackAllocations;
TermsHash(final DocumentsWriterPerThread docWriter, boolean trackAllocations, TermsHash nextTermsHash) {
this.trackAllocations = trackAllocations;
TermsHash(final DocumentsWriterPerThread docWriter, Counter bytesUsed, TermsHash nextTermsHash) {
this.nextTermsHash = nextTermsHash;
this.bytesUsed = trackAllocations ? docWriter.bytesUsed : Counter.newCounter();
this.bytesUsed = bytesUsed;
intPool = new IntBlockPool(docWriter.intBlockAllocator);
bytePool = new ByteBlockPool(docWriter.byteBlockAllocator);

TestFlushByRamOrCountsPolicy.java

@@ -267,7 +267,7 @@ public class TestFlushByRamOrCountsPolicy extends LuceneTestCase {
long bytesUsed = 0;
while (allActiveWriter.hasNext()) {
DocumentsWriterPerThread next = allActiveWriter.next();
bytesUsed += next.bytesUsed();
bytesUsed += next.ramBytesUsed();
}
assertEquals(bytesUsed, flushControl.activeBytes());
}

AssertingStoredFieldsFormat.java

@@ -161,5 +161,15 @@ public class AssertingStoredFieldsFormat extends StoredFieldsFormat {
in.close();
in.close(); // close again
}
@Override
public long ramBytesUsed() {
return in.ramBytesUsed();
}
@Override
public Collection<Accountable> getChildResources() {
return in.getChildResources();
}
}
}

CrankyStoredFieldsFormat.java

@@ -17,6 +17,7 @@
package org.apache.lucene.codecs.cranky;
import java.io.IOException;
import java.util.Collection;
import java.util.Random;
import org.apache.lucene.codecs.StoredFieldsFormat;
@@ -29,6 +30,7 @@ import org.apache.lucene.index.MergeState;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable;
class CrankyStoredFieldsFormat extends StoredFieldsFormat {
final StoredFieldsFormat delegate;
@@ -111,5 +113,15 @@ class CrankyStoredFieldsFormat extends StoredFieldsFormat {
}
delegate.writeField(info, field);
}
@Override
public long ramBytesUsed() {
return delegate.ramBytesUsed();
}
@Override
public Collection<Accountable> getChildResources() {
return delegate.getChildResources();
}
}
}