LUCENE-2283: use shared byte[] pool to buffer pending stored fields & term vectors during indexing; fixes excessive memory usage for mixed tiny & big docs with many threads

git-svn-id: https://svn.apache.org/repos/asf/lucene/java/trunk@919060 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael McCandless 2010-03-04 16:46:18 +00:00
parent 1793470004
commit 67ed6e1811
7 changed files with 119 additions and 44 deletions

View File

@ -101,6 +101,12 @@ Bug fixes
* LUCENE-2273: FieldCacheImpl.getCacheEntries() used WeakHashMap * LUCENE-2273: FieldCacheImpl.getCacheEntries() used WeakHashMap
incorrectly and lead to ConcurrentModificationException. incorrectly and lead to ConcurrentModificationException.
(Uwe Schindler, Robert Muir) (Uwe Schindler, Robert Muir)
* LUCENE-2283: Use shared memory pool for term vector and stored
fields buffers. This memory will be reclaimed if needed according to
the configured RAM Buffer Size for the IndexWriter. This also fixes
potentially excessive memory usage when many threads are indexing a
mix of small and large documents. (Tim Smith via Mike McCandless)
New features New features

View File

@ -37,6 +37,7 @@ import org.apache.lucene.search.Similarity;
import org.apache.lucene.search.Weight; import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMFile;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.apache.lucene.util.ThreadInterruptedException; import org.apache.lucene.util.ThreadInterruptedException;
@ -172,6 +173,46 @@ final class DocumentsWriter {
this.next = next; this.next = next;
} }
} }
/**
* Create and return a new DocWriterBuffer.
*/
PerDocBuffer newPerDocBuffer() {
return new PerDocBuffer();
}
/**
* RAMFile buffer for DocWriters.
*/
class PerDocBuffer extends RAMFile {
/**
* Allocate bytes used from shared pool.
*/
protected byte[] newBuffer(int size) {
assert size == PER_DOC_BLOCK_SIZE;
return perDocAllocator.getByteBlock(false);
}
/**
* Recycle the bytes used.
*/
synchronized void recycle() {
if (buffers.size() > 0) {
setLength(0);
// Recycle the blocks
final int blockCount = buffers.size();
final byte[][] blocks = buffers.toArray( new byte[blockCount][] );
perDocAllocator.recycleByteBlocks(blocks, 0, blockCount);
buffers.clear();
sizeInBytes = 0;
assert numBuffers() == 0;
}
}
}
/** /**
* The IndexingChain must define the {@link #getChain(DocumentsWriter)} method * The IndexingChain must define the {@link #getChain(DocumentsWriter)} method
@ -1200,6 +1241,11 @@ final class DocumentsWriter {
final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK; final static int BYTE_BLOCK_NOT_MASK = ~BYTE_BLOCK_MASK;
private class ByteBlockAllocator extends ByteBlockPool.Allocator { private class ByteBlockAllocator extends ByteBlockPool.Allocator {
final int blockSize;
ByteBlockAllocator(int blockSize) {
this.blockSize = blockSize;
}
ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>(); ArrayList<byte[]> freeByteBlocks = new ArrayList<byte[]>();
@ -1216,12 +1262,12 @@ final class DocumentsWriter {
// things that don't track allocations (term // things that don't track allocations (term
// vectors) and things that do (freq/prox // vectors) and things that do (freq/prox
// postings). // postings).
numBytesAlloc += BYTE_BLOCK_SIZE; numBytesAlloc += blockSize;
b = new byte[BYTE_BLOCK_SIZE]; b = new byte[blockSize];
} else } else
b = freeByteBlocks.remove(size-1); b = freeByteBlocks.remove(size-1);
if (trackAllocations) if (trackAllocations)
numBytesUsed += BYTE_BLOCK_SIZE; numBytesUsed += blockSize;
assert numBytesUsed <= numBytesAlloc; assert numBytesUsed <= numBytesAlloc;
return b; return b;
} }
@ -1282,7 +1328,12 @@ final class DocumentsWriter {
freeIntBlocks.add(blocks[i]); freeIntBlocks.add(blocks[i]);
} }
ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(); ByteBlockAllocator byteBlockAllocator = new ByteBlockAllocator(BYTE_BLOCK_SIZE);
final static int PER_DOC_BLOCK_SIZE = 1024;
final ByteBlockAllocator perDocAllocator = new ByteBlockAllocator(PER_DOC_BLOCK_SIZE);
/* Initial chunk size of the shared char[] blocks used to /* Initial chunk size of the shared char[] blocks used to
store term text */ store term text */
@ -1322,10 +1373,12 @@ final class DocumentsWriter {
return nf.format(v/1024./1024.); return nf.format(v/1024./1024.);
} }
/* We have three pools of RAM: Postings, byte blocks /* We have four pools of RAM: Postings, byte blocks
* (holds freq/prox posting data) and char blocks (holds * (holds freq/prox posting data), char blocks (holds
* characters in the term). Different docs require * characters in the term) and per-doc buffers (stored fields/term vectors).
* varying amount of storage from these three classes. * Different docs require varying amount of storage from
* these four classes.
*
* For example, docs with many unique single-occurrence * For example, docs with many unique single-occurrence
* short terms will use up the Postings RAM and hardly any * short terms will use up the Postings RAM and hardly any
* of the other two. Whereas docs with very large terms * of the other two. Whereas docs with very large terms
@ -1349,6 +1402,7 @@ final class DocumentsWriter {
" deletesMB=" + toMB(deletesRAMUsed) + " deletesMB=" + toMB(deletesRAMUsed) +
" vs trigger=" + toMB(freeTrigger) + " vs trigger=" + toMB(freeTrigger) +
" byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) + " byteBlockFree=" + toMB(byteBlockAllocator.freeByteBlocks.size()*BYTE_BLOCK_SIZE) +
" perDocFree=" + toMB(perDocAllocator.freeByteBlocks.size()*PER_DOC_BLOCK_SIZE) +
" charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE)); " charBlockFree=" + toMB(freeCharBlocks.size()*CHAR_BLOCK_SIZE*CHAR_NUM_BYTE));
final long startBytesAlloc = numBytesAlloc + deletesRAMUsed; final long startBytesAlloc = numBytesAlloc + deletesRAMUsed;
@ -1364,7 +1418,11 @@ final class DocumentsWriter {
while(numBytesAlloc+deletesRAMUsed > freeLevel) { while(numBytesAlloc+deletesRAMUsed > freeLevel) {
synchronized(this) { synchronized(this) {
if (0 == byteBlockAllocator.freeByteBlocks.size() && 0 == freeCharBlocks.size() && 0 == freeIntBlocks.size() && !any) { if (0 == perDocAllocator.freeByteBlocks.size()
&& 0 == byteBlockAllocator.freeByteBlocks.size()
&& 0 == freeCharBlocks.size()
&& 0 == freeIntBlocks.size()
&& !any) {
// Nothing else to free -- must flush now. // Nothing else to free -- must flush now.
bufferIsFull = numBytesUsed+deletesRAMUsed > flushTrigger; bufferIsFull = numBytesUsed+deletesRAMUsed > flushTrigger;
if (infoStream != null) { if (infoStream != null) {
@ -1377,23 +1435,34 @@ final class DocumentsWriter {
break; break;
} }
if ((0 == iter % 4) && byteBlockAllocator.freeByteBlocks.size() > 0) { if ((0 == iter % 5) && byteBlockAllocator.freeByteBlocks.size() > 0) {
byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1); byteBlockAllocator.freeByteBlocks.remove(byteBlockAllocator.freeByteBlocks.size()-1);
numBytesAlloc -= BYTE_BLOCK_SIZE; numBytesAlloc -= BYTE_BLOCK_SIZE;
} }
if ((1 == iter % 4) && freeCharBlocks.size() > 0) { if ((1 == iter % 5) && freeCharBlocks.size() > 0) {
freeCharBlocks.remove(freeCharBlocks.size()-1); freeCharBlocks.remove(freeCharBlocks.size()-1);
numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE; numBytesAlloc -= CHAR_BLOCK_SIZE * CHAR_NUM_BYTE;
} }
if ((2 == iter % 4) && freeIntBlocks.size() > 0) { if ((2 == iter % 5) && freeIntBlocks.size() > 0) {
freeIntBlocks.remove(freeIntBlocks.size()-1); freeIntBlocks.remove(freeIntBlocks.size()-1);
numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE; numBytesAlloc -= INT_BLOCK_SIZE * INT_NUM_BYTE;
} }
if ((3 == iter % 5) && perDocAllocator.freeByteBlocks.size() > 0) {
// Remove upwards of 32 blocks (each block is 1K)
for (int i = 0; i < 32; ++i) {
perDocAllocator.freeByteBlocks.remove(perDocAllocator.freeByteBlocks.size() - 1);
numBytesAlloc -= PER_DOC_BLOCK_SIZE;
if (perDocAllocator.freeByteBlocks.size() == 0) {
break;
}
}
}
} }
if ((3 == iter % 4) && any) if ((4 == iter % 5) && any)
// Ask consumer to free any recycled state // Ask consumer to free any recycled state
any = consumer.freeRAM(); any = consumer.freeRAM();

View File

@ -166,14 +166,13 @@ final class StoredFieldsWriter {
} }
class PerDoc extends DocumentsWriter.DocWriter { class PerDoc extends DocumentsWriter.DocWriter {
final DocumentsWriter.PerDocBuffer buffer = docWriter.newPerDocBuffer();
// TODO: use something more memory efficient; for small RAMOutputStream fdt = new RAMOutputStream(buffer);
// docs the 1024 buffer size of RAMOutputStream wastes alot
RAMOutputStream fdt = new RAMOutputStream();
int numStoredFields; int numStoredFields;
void reset() { void reset() {
fdt.reset(); fdt.reset();
buffer.recycle();
numStoredFields = 0; numStoredFields = 0;
} }
@ -185,7 +184,7 @@ final class StoredFieldsWriter {
@Override @Override
public long sizeInBytes() { public long sizeInBytes() {
return fdt.sizeInBytes(); return buffer.getSizeInBytes();
} }
@Override @Override

View File

@ -248,9 +248,9 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
class PerDoc extends DocumentsWriter.DocWriter { class PerDoc extends DocumentsWriter.DocWriter {
// TODO: use something more memory efficient; for small final DocumentsWriter.PerDocBuffer buffer = docWriter.newPerDocBuffer();
// docs the 1024 buffer size of RAMOutputStream wastes alot RAMOutputStream perDocTvf = new RAMOutputStream(buffer);
RAMOutputStream perDocTvf = new RAMOutputStream();
int numVectorFields; int numVectorFields;
int[] fieldNumbers = new int[1]; int[] fieldNumbers = new int[1];
@ -258,6 +258,7 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
void reset() { void reset() {
perDocTvf.reset(); perDocTvf.reset();
buffer.recycle();
numVectorFields = 0; numVectorFields = 0;
} }
@ -281,7 +282,7 @@ final class TermVectorsTermsWriter extends TermsHashConsumer {
@Override @Override
public long sizeInBytes() { public long sizeInBytes() {
return perDocTvf.sizeInBytes(); return buffer.getSizeInBytes();
} }
@Override @Override

View File

@ -20,44 +20,45 @@ package org.apache.lucene.store;
import java.util.ArrayList; import java.util.ArrayList;
import java.io.Serializable; import java.io.Serializable;
class RAMFile implements Serializable { /** @lucene.internal */
public class RAMFile implements Serializable {
private static final long serialVersionUID = 1l; private static final long serialVersionUID = 1l;
private ArrayList<byte[]> buffers = new ArrayList<byte[]>(); protected ArrayList<byte[]> buffers = new ArrayList<byte[]>();
long length; long length;
RAMDirectory directory; RAMDirectory directory;
long sizeInBytes; protected long sizeInBytes;
// This is publicly modifiable via Directory.touchFile(), so direct access not supported // This is publicly modifiable via Directory.touchFile(), so direct access not supported
private long lastModified = System.currentTimeMillis(); private long lastModified = System.currentTimeMillis();
// File used as buffer, in no RAMDirectory // File used as buffer, in no RAMDirectory
RAMFile() {} protected RAMFile() {}
RAMFile(RAMDirectory directory) { RAMFile(RAMDirectory directory) {
this.directory = directory; this.directory = directory;
} }
// For non-stream access from thread that might be concurrent with writing // For non-stream access from thread that might be concurrent with writing
synchronized long getLength() { public synchronized long getLength() {
return length; return length;
} }
synchronized void setLength(long length) { protected synchronized void setLength(long length) {
this.length = length; this.length = length;
} }
// For non-stream access from thread that might be concurrent with writing // For non-stream access from thread that might be concurrent with writing
synchronized long getLastModified() { public synchronized long getLastModified() {
return lastModified; return lastModified;
} }
synchronized void setLastModified(long lastModified) { protected synchronized void setLastModified(long lastModified) {
this.lastModified = lastModified; this.lastModified = lastModified;
} }
final byte[] addBuffer(int size) { protected final byte[] addBuffer(int size) {
byte[] buffer = newBuffer(size); byte[] buffer = newBuffer(size);
synchronized(this) { synchronized(this) {
buffers.add(buffer); buffers.add(buffer);
@ -70,11 +71,11 @@ class RAMFile implements Serializable {
return buffer; return buffer;
} }
final synchronized byte[] getBuffer(int index) { protected final synchronized byte[] getBuffer(int index) {
return buffers.get(index); return buffers.get(index);
} }
final synchronized int numBuffers() { protected final synchronized int numBuffers() {
return buffers.size(); return buffers.size();
} }
@ -84,11 +85,11 @@ class RAMFile implements Serializable {
* @param size size of allocated buffer. * @param size size of allocated buffer.
* @return allocated buffer. * @return allocated buffer.
*/ */
byte[] newBuffer(int size) { protected byte[] newBuffer(int size) {
return new byte[size]; return new byte[size];
} }
synchronized long getSizeInBytes() { public synchronized long getSizeInBytes() {
return sizeInBytes; return sizeInBytes;
} }

View File

@ -40,7 +40,7 @@ public class RAMOutputStream extends IndexOutput {
this(new RAMFile()); this(new RAMFile());
} }
RAMOutputStream(RAMFile f) { public RAMOutputStream(RAMFile f) {
file = f; file = f;
// make sure that we switch to the // make sure that we switch to the
@ -66,14 +66,13 @@ public class RAMOutputStream extends IndexOutput {
} }
} }
/** Resets this to an empty buffer. */ /** Resets this to an empty file. */
public void reset() { public void reset() {
try { currentBuffer = null;
seek(0); currentBufferIndex = -1;
} catch (IOException e) { // should never happen bufferPosition = 0;
throw new RuntimeException(e.toString()); bufferStart = 0;
} bufferLength = 0;
file.setLength(0); file.setLength(0);
} }

View File

@ -33,7 +33,7 @@ public class TestHugeRamFile extends LuceneTestCase {
private long capacity = 0; private long capacity = 0;
private HashMap<Integer,byte[]> singleBuffers = new HashMap<Integer,byte[]>(); private HashMap<Integer,byte[]> singleBuffers = new HashMap<Integer,byte[]>();
@Override @Override
byte[] newBuffer(int size) { protected byte[] newBuffer(int size) {
capacity += size; capacity += size;
if (capacity <= MAX_VALUE) { if (capacity <= MAX_VALUE) {
// below maxint we reuse buffers // below maxint we reuse buffers