From 67ed6e18118cccdb9e0f1b56370daa608f5dce03 Mon Sep 17 00:00:00 2001
From: Michael McCandless
Date: Thu, 4 Mar 2010 16:46:18 +0000
Subject: [PATCH] LUCENE-2283: use shared byte[] pool to buffer pending stored
 fields & term vectors during indexing; fixes excessive memory usage for mixed
 tiny & big docs with many threads

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@919060 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                        |  6 ++
 .../apache/lucene/index/DocumentsWriter.java       | 95 ++++++++++++++++---
 .../lucene/index/StoredFieldsWriter.java           |  9 +-
 .../lucene/index/TermVectorsTermsWriter.java       |  9 +-
 src/java/org/apache/lucene/store/RAMFile.java      | 27 +++---
 .../apache/lucene/store/RAMOutputStream.java       | 15 ++-
 .../apache/lucene/store/TestHugeRamFile.java       |  2 +-
 7 files changed, 119 insertions(+), 44 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a93eb415903..a99aa2c5bea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -101,6 +101,12 @@ Bug fixes
 * LUCENE-2273: FieldCacheImpl.getCacheEntries() used WeakHashMap
   incorrectly and lead to ConcurrentModificationException.
   (Uwe Schindler, Robert Muir)
+
+* LUCENE-2283: Use shared memory pool for term vector and stored
+  fields buffers. This memory will be reclaimed if needed according to
+  the configured RAM Buffer Size for the IndexWriter. This also fixes
+  potentially excessive memory usage when many threads are indexing a
+  mix of small and large documents. (Tim Smith via Mike McCandless)
 
 New features
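To make the new allocation scheme concrete before diving into the code changes: the sketch below is a minimal, self-contained model of what this patch does inside DocumentsWriter. The SharedBlockPool and PooledBuffer names are hypothetical stand-ins for the real ByteBlockAllocator and PerDocBuffer; only RAMFile and its protected hooks are actual Lucene API touched by this patch.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.lucene.store.RAMFile;

    // Illustrative stand-in for DocumentsWriter.ByteBlockAllocator: a shared
    // pool of fixed-size byte blocks.
    final class SharedBlockPool {
      static final int BLOCK_SIZE = 1024; // same as PER_DOC_BLOCK_SIZE below
      private final List<byte[]> free = new ArrayList<byte[]>();

      // Reuse a recycled block if one is available, else allocate a new one.
      synchronized byte[] getBlock() {
        final int size = free.size();
        return size == 0 ? new byte[BLOCK_SIZE] : free.remove(size - 1);
      }

      // Returned blocks stay allocated and are handed out again later.
      synchronized void recycleBlocks(byte[][] blocks, int start, int end) {
        for (int i = start; i < end; i++) {
          free.add(blocks[i]);
        }
      }
    }

    // Illustrative stand-in for DocumentsWriter.PerDocBuffer: a RAMFile whose
    // backing buffers come from, and go back to, the shared pool.
    class PooledBuffer extends RAMFile {
      private final SharedBlockPool pool;

      PooledBuffer(SharedBlockPool pool) {
        this.pool = pool;
      }

      @Override
      protected byte[] newBuffer(int size) {
        assert size == SharedBlockPool.BLOCK_SIZE;
        return pool.getBlock();
      }

      // Hand every block back to the pool, as PerDocBuffer.recycle() does.
      synchronized void recycle() {
        final int blockCount = numBuffers();
        if (blockCount > 0) {
          setLength(0);
          pool.recycleBlocks(buffers.toArray(new byte[blockCount][]), 0, blockCount);
          buffers.clear();
          sizeInBytes = 0;
        }
      }
    }

The point of the design is that blocks are never freed on the hot path: a recycled block goes back on a free list shared by all threads, so a burst of large documents no longer pins a large buffer per thread.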
diff --git a/src/java/org/apache/lucene/index/DocumentsWriter.java b/src/java/org/apache/lucene/index/DocumentsWriter.java
index c80e7194a9a..3c1a6ca28a8 100644
--- a/src/java/org/apache/lucene/index/DocumentsWriter.java
+++ b/src/java/org/apache/lucene/index/DocumentsWriter.java
@@ -37,6 +37,7 @@ import org.apache.lucene.search.Similarity;
 import org.apache.lucene.search.Weight;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMFile;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.Constants;
 import org.apache.lucene.util.ThreadInterruptedException;
@@ -172,6 +173,46 @@ final class DocumentsWriter {
       this.next = next;
     }
   }
+
+  /**
+   * Create and return a new PerDocBuffer.
+   */
+  PerDocBuffer newPerDocBuffer() {
+    return new PerDocBuffer();
+  }
+
+  /**
+   * RAMFile buffer for DocWriters.
+   */
+  class PerDocBuffer extends RAMFile {
+
+    /**
+     * Allocate bytes used from shared pool.
+     */
+    protected byte[] newBuffer(int size) {
+      assert size == PER_DOC_BLOCK_SIZE;
+      return perDocAllocator.getByteBlock(false);
+    }
+
+    /**
+     * Recycle the bytes used.
+     */
+    synchronized void recycle() {
+      if (buffers.size() > 0) {
+        setLength(0);
+
+        // Recycle the blocks
+        final int blockCount = buffers.size();
+
+        final byte[][] blocks = buffers.toArray(new byte[blockCount][]);
+        perDocAllocator.recycleByteBlocks(blocks, 0, blockCount);
+        buffers.clear();
+        sizeInBytes = 0;
+
+        assert numBuffers() == 0;
+      }
+    }
+  }
 
   /**
    * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
@@ -1200,6 +1241,11 @@ final class DocumentsWriter {
   final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
 
   private class ByteBlockAllocator extends ByteBlockPool.Allocator {
+    final int blockSize;
+
+    ByteBlockAllocator(int blockSize) {
+      this.blockSize = blockSize;
+    }
 
     ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
@@ -1216,12 +1262,12 @@ final class DocumentsWriter {
         // things that don't track allocations (term
         // vectors) and things that do (freq/prox
         // postings).
-        numBytesAlloc += BYTE_BLOCK_SIZE;
-        b = new byte[BYTE_BLOCK_SIZE];
+        numBytesAlloc += blockSize;
+        b = new byte[blockSize];
       } else
         b = freeByteBlocks.remove(size-1);
       if (trackAllocations)
-        numBytesUsed += BYTE_BLOCK_SIZE;
+        numBytesUsed += blockSize;
       assert numBytesUsed <= numBytesAlloc;
       return b;
     }
@@ -1282,7 +1328,12 @@ final class DocumentsWriter {
         freeIntBlocks.add(blocks[i]);
     }
 
-  ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator();
+  ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
+
+  final static int PER_DOC_BLOCK_SIZE = 1024;
+
+  final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
+
 
   /* Initial chunk size of the shared char[] blocks used to
      store term text */
@@ -1322,10 +1373,12 @@ final class DocumentsWriter {
     return nf.format(v/1024./1024.);
   }
 
-  /* We have three pools of RAM: Postings, byte blocks
-   * (holds freq/prox posting data) and char blocks (holds
-   * characters in the term).  Different docs require
-   * varying amount of storage from these three classes.
+  /* We have four pools of RAM: Postings, byte blocks
+   * (holds freq/prox posting data), char blocks (holds
+   * characters in the term) and per-doc buffers (stored fields/term vectors).
+   * Different docs require varying amounts of storage from
+   * these four classes.
+   *
    * For example, docs with many unique single-occurrence
    * short terms will use up the Postings RAM and hardly any
    * of the other two.  Whereas docs with very large terms
@@ -1349,6 +1402,7 @@ final class DocumentsWriter {
                   " deletesMB=" + toMB(deletesRAMUsed) +
                   " vs trigger=" + toMB(freeTrigger) +
                   " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
+                  " perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
                   " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE));
 
     final long startBytesAlloc = numBytesAlloc + deletesRAMUsed;
@@ -1364,7 +1418,11 @@ final class DocumentsWriter {
 
     while(numBytesAlloc+deletesRAMUsed > freeLevel) {
       synchronized(this) {
-        if (0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == freeIntBlocks.size() && !any) {
+        if (0 == perDocAllocator.freeByteBlocks.size()
+            && 0 == byteBlockAllocator.freeByteBlocks.size()
+            && 0 == freeCharBlocks.size()
+            && 0 == freeIntBlocks.size()
+            && !any) {
          // Nothing else to free -- must flush now.
           bufferIsFull = numBytesUsed+deletesRAMUsed > flushTrigger;
           if (infoStream != null) {
@@ -1377,23 +1435,34 @@ final class DocumentsWriter {
           break;
         }
 
-        if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) {
+        if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
           byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
           numBytesAlloc -= BYTE_BLOCK_SIZE;
         }
 
-        if ((1 == iter % 4) && freeCharBlocks.size() > 0) {
+        if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
           freeCharBlocks.remove(freeCharBlocks.size()-1);
           numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
         }
 
-        if ((2 == iter % 4) && freeIntBlocks.size() > 0) {
+        if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
           freeIntBlocks.remove(freeIntBlocks.size()-1);
           numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
         }
+
+        if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
+          // Remove upwards of 32 blocks (each block is 1K)
+          for (int i = 0; i < 32; ++i) {
+            perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
+            numBytesAlloc -= PER_DOC_BLOCK_SIZE;
+            if (perDocAllocator.freeByteBlocks.size() == 0) {
+              break;
+            }
+          }
+        }
       }
 
-      if ((3 == iter % 4) && any)
+      if ((4 == iter % 5) && any)
         // Ask consumer to free any recycled state
         any = consumer.freeRAM();
diff --git a/src/java/org/apache/lucene/index/StoredFieldsWriter.java b/src/java/org/apache/lucene/index/StoredFieldsWriter.java
index 5b15d437cca..a599917aed4 100644
--- a/src/java/org/apache/lucene/index/StoredFieldsWriter.java
+++ b/src/java/org/apache/lucene/index/StoredFieldsWriter.java
@@ -166,14 +166,13 @@ final class StoredFieldsWriter {
   }
 
   class PerDoc extends DocumentsWriter.DocWriter {
-
-    // TODO: use something more memory efficient; for small
-    // docs the 1024 buffer size of RAMOutputStream wastes alot
-    RAMOutputStream fdt = new RAMOutputStream();
+    final DocumentsWriter.PerDocBuffer buffer = docWriter.newPerDocBuffer();
+    RAMOutputStream fdt = new RAMOutputStream(buffer);
     int numStoredFields;
 
     void reset() {
       fdt.reset();
+      buffer.recycle();
       numStoredFields = 0;
     }
 
@@ -185,7 +184,7 @@ final class StoredFieldsWriter {
 
     @Override
     public long sizeInBytes() {
-      return fdt.sizeInBytes();
+      return buffer.getSizeInBytes();
     }
 
     @Override
diff --git a/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java b/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
index 24c122a4249..fa05d3eef73 100644
--- a/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
+++ b/src/java/org/apache/lucene/index/TermVectorsTermsWriter.java
@@ -248,9 +248,9 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
 
   class PerDoc extends DocumentsWriter.DocWriter {
 
-    // TODO: use something more memory efficient; for small
-    // docs the 1024 buffer size of RAMOutputStream wastes alot
-    RAMOutputStream perDocTvf = new RAMOutputStream();
+    final DocumentsWriter.PerDocBuffer buffer = docWriter.newPerDocBuffer();
+    RAMOutputStream perDocTvf = new RAMOutputStream(buffer);
+
     int numVectorFields;
 
     int[] fieldNumbers = new int[1];
@@ -258,6 +258,7 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
 
     void reset() {
       perDocTvf.reset();
+      buffer.recycle();
      numVectorFields = 0;
     }
 
@@ -281,7 +282,7 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
 
     @Override
     public long sizeInBytes() {
-      return perDocTvf.sizeInBytes();
+      return buffer.getSizeInBytes();
     }
 
     @Override
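Both PerDoc classes above now follow the same cycle: stream a document's pending bytes into a pooled RAMFile through RAMOutputStream, report sizeInBytes() from the shared buffer, and recycle the blocks on reset(). A hypothetical driver for that cycle, reusing the SharedBlockPool/PooledBuffer sketch from earlier (the printed sizes assume the 1 KB PER_DOC_BLOCK_SIZE; RAMOutputStream(RAMFile) and RAMFile.getSizeInBytes() are the members this patch makes public):

    import java.io.IOException;
    import org.apache.lucene.store.RAMOutputStream;

    class PerDocCycle {
      public static void main(String[] args) throws IOException {
        SharedBlockPool pool = new SharedBlockPool();      // shared across all pending docs
        PooledBuffer buffer = new PooledBuffer(pool);
        RAMOutputStream out = new RAMOutputStream(buffer); // public constructor as of this patch

        // Buffer one document's pending bytes; 1 KB blocks are pulled from
        // the pool on demand as the stream grows.
        byte[] payload = new byte[3000];
        out.writeBytes(payload, payload.length);
        out.flush();
        System.out.println("pending: " + buffer.getSizeInBytes()); // 3072 (three 1 KB blocks)

        // Once the doc is flushed, detach the stream and return every block.
        out.reset();
        buffer.recycle();
        System.out.println("after recycle: " + buffer.getSizeInBytes()); // 0
      }
    }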
diff --git a/src/java/org/apache/lucene/store/RAMFile.java b/src/java/org/apache/lucene/store/RAMFile.java
index 7c5ab6bb3d3..36306725cc1 100644
--- a/src/java/org/apache/lucene/store/RAMFile.java
+++ b/src/java/org/apache/lucene/store/RAMFile.java
@@ -20,44 +20,45 @@ package org.apache.lucene.store;
 import java.util.ArrayList;
 import java.io.Serializable;
 
-class RAMFile implements Serializable {
+/** @lucene.internal */
+public class RAMFile implements Serializable {
 
   private static final long serialVersionUID = 1l;
 
-  private ArrayList<byte[]> buffers = new ArrayList<byte[]>();
+  protected ArrayList<byte[]> buffers = new ArrayList<byte[]>();
   long length;
   RAMDirectory directory;
-  long sizeInBytes;
+  protected long sizeInBytes;
 
   // This is publicly modifiable via Directory.touchFile(), so direct access not supported
   private long lastModified = System.currentTimeMillis();
 
   // File used as buffer, in no RAMDirectory
-  RAMFile() {}
+  protected RAMFile() {}
 
   RAMFile(RAMDirectory directory) {
     this.directory = directory;
   }
 
   // For non-stream access from thread that might be concurrent with writing
-  synchronized long getLength() {
+  public synchronized long getLength() {
     return length;
   }
 
-  synchronized void setLength(long length) {
+  protected synchronized void setLength(long length) {
     this.length = length;
   }
 
   // For non-stream access from thread that might be concurrent with writing
-  synchronized long getLastModified() {
+  public synchronized long getLastModified() {
     return lastModified;
   }
 
-  synchronized void setLastModified(long lastModified) {
+  protected synchronized void setLastModified(long lastModified) {
     this.lastModified = lastModified;
   }
 
-  final byte[] addBuffer(int size) {
+  protected final byte[] addBuffer(int size) {
     byte[] buffer = newBuffer(size);
     synchronized(this) {
       buffers.add(buffer);
@@ -70,11 +71,11 @@ class RAMFile implements Serializable {
     return buffer;
   }
 
-  final synchronized byte[] getBuffer(int index) {
+  protected final synchronized byte[] getBuffer(int index) {
     return buffers.get(index);
   }
 
-  final synchronized int numBuffers() {
+  protected final synchronized int numBuffers() {
     return buffers.size();
   }
 
@@ -84,11 +85,11 @@ class RAMFile implements Serializable {
    * @param size size of allocated buffer.
    * @return allocated buffer.
    */
-  byte[] newBuffer(int size) {
+  protected byte[] newBuffer(int size) {
     return new byte[size];
   }
 
-  synchronized long getSizeInBytes() {
+  public synchronized long getSizeInBytes() {
     return sizeInBytes;
   }
diff --git a/src/java/org/apache/lucene/store/RAMOutputStream.java b/src/java/org/apache/lucene/store/RAMOutputStream.java
index 801a4028e00..5efdbc5a74a 100644
--- a/src/java/org/apache/lucene/store/RAMOutputStream.java
+++ b/src/java/org/apache/lucene/store/RAMOutputStream.java
@@ -40,7 +40,7 @@ public class RAMOutputStream extends IndexOutput {
     this(new RAMFile());
   }
 
-  RAMOutputStream(RAMFile f) {
+  public RAMOutputStream(RAMFile f) {
     file = f;
 
     // make sure that we switch to the
@@ -66,14 +66,13 @@ public class RAMOutputStream extends IndexOutput {
     }
   }
 
-  /** Resets this to an empty buffer. */
+  /** Resets this to an empty file. */
   public void reset() {
-    try {
-      seek(0);
-    } catch (IOException e) { // should never happen
-      throw new RuntimeException(e.toString());
-    }
-
+    currentBuffer = null;
+    currentBufferIndex = -1;
+    bufferPosition = 0;
+    bufferStart = 0;
+    bufferLength = 0;
     file.setLength(0);
   }
 
diff --git a/src/test/org/apache/lucene/store/TestHugeRamFile.java b/src/test/org/apache/lucene/store/TestHugeRamFile.java
index e3b9a31f10b..3530fdd6518 100755
--- a/src/test/org/apache/lucene/store/TestHugeRamFile.java
+++ b/src/test/org/apache/lucene/store/TestHugeRamFile.java
@@ -33,7 +33,7 @@ public class TestHugeRamFile extends LuceneTestCase {
     private long capacity = 0;
     private HashMap<Integer,byte[]> singleBuffers = new HashMap<Integer,byte[]>();
     @Override
-    byte[] newBuffer(int size) {
+    protected byte[] newBuffer(int size) {
       capacity += size;
       if (capacity <= MAX_VALUE) {
         // below maxint we reuse buffers
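Finally, the balanceRAM() hunk above grows the free-rotation from four slots (iter % 4) to five (iter % 5); the new slot drains per-doc blocks 32 at a time, since at 1 KB apiece a single block frees far less than one byte/char/int block. A stripped-down, hypothetical model of that rotation is sketched below; the block sizes and free lists are illustrative stand-ins, not the real DocumentsWriter fields, and the consumer.freeRAM() slot is only noted in a comment:

    import java.util.Deque;

    class FreeRotationSketch {
      // Illustrative sizes; the real constants live in DocumentsWriter.
      static final int BYTE_BLOCK_SIZE = 32768;
      static final int CHAR_BLOCK_BYTES = 32768;
      static final int INT_BLOCK_BYTES = 32768;
      static final int PER_DOC_BLOCK_SIZE = 1024;

      // Frees blocks round-robin across the pools until the allocation count
      // drops to freeLevel, or nothing is left to free.
      static long freeUntil(long allocated, long freeLevel,
                            Deque<byte[]> byteBlocks, Deque<char[]> charBlocks,
                            Deque<int[]> intBlocks, Deque<byte[]> perDocBlocks) {
        int iter = 0;
        while (allocated > freeLevel) {
          if (byteBlocks.isEmpty() && charBlocks.isEmpty()
              && intBlocks.isEmpty() && perDocBlocks.isEmpty()) {
            break; // nothing else to free; the real code flushes here
          }
          if (0 == iter % 5 && !byteBlocks.isEmpty()) {
            byteBlocks.pollLast();
            allocated -= BYTE_BLOCK_SIZE;
          }
          if (1 == iter % 5 && !charBlocks.isEmpty()) {
            charBlocks.pollLast();
            allocated -= CHAR_BLOCK_BYTES;
          }
          if (2 == iter % 5 && !intBlocks.isEmpty()) {
            intBlocks.pollLast();
            allocated -= INT_BLOCK_BYTES;
          }
          if (3 == iter % 5 && !perDocBlocks.isEmpty()) {
            // Per-doc blocks are tiny, so drop up to 32 per pass.
            for (int i = 0; i < 32 && !perDocBlocks.isEmpty(); i++) {
              perDocBlocks.pollLast();
              allocated -= PER_DOC_BLOCK_SIZE;
            }
          }
          // Slot 4 == iter % 5: ask the consumer to free recycled state.
          iter++;
        }
        return allocated;
      }
    }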