LUCENE-3108: Enable memory tracking for ByteBlockPool allocations in DocValues writer impls.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/branches/docvalues@1124825 13f79535-47bb-0310-9956-ffa450edef68
Simon Willnauer 2011-05-19 14:22:54 +00:00
parent 43e40e8844
commit 6dc4879fec
15 changed files with 112 additions and 90 deletions
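For readers skimming the diff: the core of this change is the new ByteBlockPool.DirectTrackingAllocator (see the ByteBlockPool hunk below), which charges every byte block it hands out against a shared AtomicLong and credits it back on recycling, so the DocValues writers' pool usage shows up in the indexing RAM accounting. The following is a minimal, self-contained sketch of that accounting scheme, not part of the commit; the class name, the simplified allocator (which does not extend the real Allocator base class), and the main method are illustrative only. BYTE_BLOCK_SIZE mirrors ByteBlockPool.BYTE_BLOCK_SIZE (1 << 15).

import java.util.concurrent.atomic.AtomicLong;

public class TrackingAllocatorSketch {

    // 32 KB, mirroring ByteBlockPool.BYTE_BLOCK_SIZE (1 << 15)
    static final int BYTE_BLOCK_SIZE = 1 << 15;

    // Simplified stand-in for ByteBlockPool.DirectTrackingAllocator
    static class DirectTrackingAllocator {
        private final int blockSize;
        private final AtomicLong bytesUsed;

        DirectTrackingAllocator(int blockSize, AtomicLong bytesUsed) {
            this.blockSize = blockSize;
            this.bytesUsed = bytesUsed;
        }

        byte[] getByteBlock() {
            bytesUsed.addAndGet(blockSize);           // charge the freshly allocated block
            return new byte[blockSize];
        }

        void recycleByteBlocks(byte[][] blocks, int start, int end) {
            bytesUsed.addAndGet(-((long) (end - start)) * blockSize); // release the charge
            for (int i = start; i < end; i++) {
                blocks[i] = null;                     // drop references so the blocks can be GC'd
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong bytesUsed = new AtomicLong(0);
        DirectTrackingAllocator allocator =
            new DirectTrackingAllocator(BYTE_BLOCK_SIZE, bytesUsed);

        byte[][] blocks = { allocator.getByteBlock(), allocator.getByteBlock() };
        System.out.println("after alloc:   " + bytesUsed.get());  // 65536

        allocator.recycleByteBlocks(blocks, 0, blocks.length);
        System.out.println("after recycle: " + bytesUsed.get());  // 0
    }
}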


@@ -303,7 +303,7 @@ public abstract class AbstractField implements Fieldable {
}
public boolean hasDocValues() {
return docValues != null;
return docValues != null && docValues.type() != null;
}
public ValueType docValuesType() {


@@ -32,6 +32,7 @@ import org.apache.lucene.search.SimilarityProvider;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BitVector;
import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
import org.apache.lucene.util.RamUsageEstimator;
public class DocumentsWriterPerThread {
@@ -169,6 +170,7 @@ public class DocumentsWriterPerThread {
DocumentsWriterDeleteQueue deleteQueue;
DeleteSlice deleteSlice;
private final NumberFormat nf = NumberFormat.getInstance();
final Allocator byteBlockAllocator;
public DocumentsWriterPerThread(Directory directory, DocumentsWriter parent,
@@ -181,9 +183,9 @@ public class DocumentsWriterPerThread {
this.docState = new DocState(this);
this.docState.similarityProvider = parent.indexWriter.getConfig()
.getSimilarityProvider();
consumer = indexingChain.getChain(this);
bytesUsed = new AtomicLong(0);
byteBlockAllocator = new DirectTrackingAllocator(bytesUsed);
consumer = indexingChain.getChain(this);
pendingDeletes = new BufferedDeletes(false);
initialize();
}
@@ -462,32 +464,6 @@ public class DocumentsWriterPerThread {
bytesUsed.addAndGet(-(length *(INT_BLOCK_SIZE*RamUsageEstimator.NUM_BYTES_INT)));
}
final Allocator byteBlockAllocator = new DirectTrackingAllocator();
private class DirectTrackingAllocator extends Allocator {
public DirectTrackingAllocator() {
this(BYTE_BLOCK_SIZE);
}
public DirectTrackingAllocator(int blockSize) {
super(blockSize);
}
public byte[] getByteBlock() {
bytesUsed.addAndGet(blockSize);
return new byte[blockSize];
}
@Override
public void recycleByteBlocks(byte[][] blocks, int start, int end) {
bytesUsed.addAndGet(-((end-start)* blockSize));
for (int i = start; i < end; i++) {
blocks[i] = null;
}
}
};
PerDocWriteState newPerDocWriteState(int codecId) {
assert segment != null;
return new PerDocWriteState(infoStream, directory, segment, fieldInfos, bytesUsed, codecId);


@@ -151,9 +151,19 @@ public class MultiPerDocValues extends PerDocValues {
}
public void close() throws IOException {
PerDocValues[] perDocValues = this.subs;
final PerDocValues[] perDocValues = this.subs;
IOException ex = null;
for (PerDocValues values : perDocValues) {
try {
values.close();
} catch (IOException e) {
if (ex == null) {
ex = e;
}
}
}
if (ex != null) {
throw ex;
}
}


@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.lucene.index.codecs.Codec;
@@ -243,8 +244,7 @@ final class PerFieldCodecWrapper extends Codec {
}
private final class PerDocProducers extends PerDocValues {
private final Set<String> fields = new TreeSet<String>();
private final Map<String, PerDocValues> codecs = new HashMap<String, PerDocValues>();
private final TreeMap<String, PerDocValues> codecs = new TreeMap<String, PerDocValues>();
public PerDocProducers(Directory dir, FieldInfos fieldInfos, SegmentInfo si,
int readBufferSize, int indexDivisor) throws IOException {
@@ -253,7 +253,6 @@ final class PerFieldCodecWrapper extends Codec {
try {
for (FieldInfo fi : fieldInfos) {
if (fi.hasDocValues()) {
fields.add(fi.name);
assert fi.getCodecId() != FieldInfo.UNASSIGNED_CODEC_ID;
Codec codec = segmentCodecs.codecs[fi.getCodecId()];
if (!producers.containsKey(codec)) {
@@ -280,9 +279,10 @@ final class PerFieldCodecWrapper extends Codec {
}
}
}
@Override
public Collection<String> fields() {
return fields;
return codecs.keySet();
}
@Override
public DocValues docValues(String field) throws IOException {
@@ -302,11 +302,11 @@ final class PerFieldCodecWrapper extends Codec {
if (next != null) {
next.close();
}
} catch (IOException ioe) {
} catch (Exception ioe) {
// keep first IOException we hit but keep
// closing the rest
if (err == null) {
err = ioe;
err = new IOException(ioe);
}
}
}


@@ -575,13 +575,15 @@ final class SegmentMerger {
mergeState.multiDeletedDocs = new MultiBits(perDocBits, perDocBitsStarts);
final PerDocConsumer docsConsumer = codec
.docsConsumer(new PerDocWriteState(segmentWriteState));
MultiPerDocValues multiPerDocValues = null;
try {
docsConsumer.merge(
mergeState,
new MultiPerDocValues(perDocProducers
multiPerDocValues = new MultiPerDocValues(perDocProducers
.toArray(PerDocValues.EMPTY_ARRAY), perDocSlices
.toArray(ReaderUtil.Slice.EMPTY_ARRAY)));
.toArray(ReaderUtil.Slice.EMPTY_ARRAY));
docsConsumer.merge(mergeState, multiPerDocValues);
} finally {
if (multiPerDocValues != null)
multiPerDocValues.close();
docsConsumer.close();
}
}


@@ -131,7 +131,7 @@ public class DefaultDocValuesProducer extends PerDocValues {
}
public void close() throws IOException {
Collection<DocValues> values = docValues.values();
final Collection<DocValues> values = docValues.values();
IOException ex = null;
for (DocValues docValues : values) {
try {


@@ -341,43 +341,29 @@ public final class Bytes {
// TODO: open up this API?!
static abstract class BytesWriterBase extends Writer {
private final Directory dir;
private final String id;
protected IndexOutput idxOut;
protected IndexOutput datOut;
protected BytesRef bytesRef;
private final String codecName;
private final int version;
protected final ByteBlockPool pool;
protected BytesWriterBase(Directory dir, String id, String codecName,
int version, boolean initIndex, boolean initData, ByteBlockPool pool,
int version, boolean initIndex, ByteBlockPool pool,
AtomicLong bytesUsed) throws IOException {
super(bytesUsed);
this.dir = dir;
this.id = id;
this.codecName = codecName;
this.version = version;
this.pool = pool;
if (initData) {
initDataOut();
}
if (initIndex) {
initIndexOut();
}
}
private void initDataOut() throws IOException {
datOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
DATA_EXTENSION));
CodecUtil.writeHeader(datOut, codecName, version);
}
private void initIndexOut() throws IOException {
if (initIndex) {
idxOut = dir.createOutput(IndexFileNames.segmentFileName(id, "",
INDEX_EXTENSION));
CodecUtil.writeHeader(idxOut, codecName, version);
} else {
idxOut = null;
}
}
/**
@@ -390,7 +376,6 @@ public final class Bytes {
@Override
public void finish(int docCount) throws IOException {
try {
if (datOut != null)
datOut.close();
} finally {
try {
@@ -483,9 +468,7 @@ public final class Bytes {
super.close();
} finally {
try {
if (datIn != null) {
datIn.close();
}
} finally {
if (idxIn != null) {
idxIn.close();


@@ -33,7 +33,7 @@ import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.ByteBlockPool.DirectAllocator;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
import org.apache.lucene.util.BytesRefHash.TrackingDirectBytesStartArray;
import org.apache.lucene.util.packed.PackedInts;
@@ -54,16 +54,15 @@ class FixedDerefBytesImpl {
private final BytesRefHash hash = new BytesRefHash(pool,
BytesRefHash.DEFAULT_CAPACITY, new TrackingDirectBytesStartArray(
BytesRefHash.DEFAULT_CAPACITY, bytesUsed));
public Writer(Directory dir, String id, AtomicLong bytesUsed)
throws IOException {
this(dir, id, new DirectAllocator(ByteBlockPool.BYTE_BLOCK_SIZE),
this(dir, id, new DirectTrackingAllocator(ByteBlockPool.BYTE_BLOCK_SIZE, bytesUsed),
bytesUsed);
}
public Writer(Directory dir, String id, Allocator allocator,
AtomicLong bytesUsed) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true, true,
super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
new ByteBlockPool(allocator), bytesUsed);
docToID = new int[1];
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT); // TODO BytesRefHash
@@ -249,9 +248,12 @@ class FixedDerefBytesImpl {
}
public void close() throws IOException {
try {
datIn.close();
} finally {
idx.close();
}
}
protected void fill(long address, BytesRef ref) throws IOException {
datIn.seek(fp + ((address - 1) * size));


@@ -36,7 +36,7 @@ import org.apache.lucene.util.CodecUtil;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.ByteBlockPool.DirectAllocator;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
import org.apache.lucene.util.BytesRefHash.TrackingDirectBytesStartArray;
import org.apache.lucene.util.packed.PackedInts;
@@ -63,13 +63,13 @@ class FixedSortedBytesImpl {
public Writer(Directory dir, String id, Comparator<BytesRef> comp,
AtomicLong bytesUsed) throws IOException {
this(dir, id, comp, new DirectAllocator(ByteBlockPool.BYTE_BLOCK_SIZE),
this(dir, id, comp, new DirectTrackingAllocator(ByteBlockPool.BYTE_BLOCK_SIZE, bytesUsed),
bytesUsed);
}
public Writer(Directory dir, String id, Comparator<BytesRef> comp,
Allocator allocator, AtomicLong bytesUsed) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true, true,
super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
new ByteBlockPool(allocator), bytesUsed);
docToEntry = new int[1];
// docToEntry[0] = -1;


@@ -46,7 +46,7 @@ class FixedStraightBytesImpl {
private byte[] oneRecord;
protected Writer(Directory dir, String id) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, false, true, null, null);
super(dir, id, CODEC_NAME, VERSION_CURRENT, false, null, null);
}
// TODO - impl bulk copy here!
@@ -87,7 +87,13 @@
}
fill(state.docBase);
// TODO should we add a transfer to API to each reader?
datOut.copyBytes(reader.cloneData(), size * maxDocs);
final IndexInput cloneData = reader.cloneData();
try {
datOut.copyBytes(cloneData, size * maxDocs);
} finally {
cloneData.close();
}
lastDocID += maxDocs - 1;
} else
super.merge(state);
@@ -116,7 +122,7 @@
}
public long ramBytesUsed() {
return 0;
return oneRecord == null ? 0 : oneRecord.length;
}
}


@@ -36,7 +36,7 @@ import org.apache.lucene.util.CodecUtil;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.ByteBlockPool.DirectAllocator;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
import org.apache.lucene.util.BytesRefHash.TrackingDirectBytesStartArray;
import org.apache.lucene.util.packed.PackedInts;
@@ -117,13 +117,13 @@ class VarDerefBytesImpl {
public Writer(Directory dir, String id, AtomicLong bytesUsed)
throws IOException {
this(dir, id, new DirectAllocator(ByteBlockPool.BYTE_BLOCK_SIZE),
this(dir, id, new DirectTrackingAllocator(ByteBlockPool.BYTE_BLOCK_SIZE, bytesUsed),
bytesUsed);
}
public Writer(Directory dir, String id, Allocator allocator,
AtomicLong bytesUsed) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true, true,
super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
new ByteBlockPool(allocator), bytesUsed);
docToAddress = new int[1];
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);


@@ -35,7 +35,7 @@ import org.apache.lucene.util.BytesRefHash;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.ByteBlockPool.Allocator;
import org.apache.lucene.util.ByteBlockPool.DirectAllocator;
import org.apache.lucene.util.ByteBlockPool.DirectTrackingAllocator;
import org.apache.lucene.util.BytesRefHash.TrackingDirectBytesStartArray;
import org.apache.lucene.util.packed.PackedInts;
@@ -62,13 +62,13 @@ class VarSortedBytesImpl {
public Writer(Directory dir, String id, Comparator<BytesRef> comp,
AtomicLong bytesUsed) throws IOException {
this(dir, id, comp, new DirectAllocator(ByteBlockPool.BYTE_BLOCK_SIZE),
this(dir, id, comp, new DirectTrackingAllocator(ByteBlockPool.BYTE_BLOCK_SIZE, bytesUsed),
bytesUsed);
}
public Writer(Directory dir, String id, Comparator<BytesRef> comp,
Allocator allocator, AtomicLong bytesUsed) throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true, true,
super(dir, id, CODEC_NAME, VERSION_CURRENT, true,
new ByteBlockPool(allocator), bytesUsed);
this.comp = comp;
docToEntry = new int[1];


@@ -51,7 +51,7 @@ class VarStraightBytesImpl {
public Writer(Directory dir, String id, AtomicLong bytesUsed)
throws IOException {
super(dir, id, CODEC_NAME, VERSION_CURRENT, true, true, null, bytesUsed);
super(dir, id, CODEC_NAME, VERSION_CURRENT, true, null, bytesUsed);
docToAddress = new long[1];
bytesUsed.addAndGet(RamUsageEstimator.NUM_BYTES_INT);
}


@@ -18,6 +18,8 @@ package org.apache.lucene.util;
*/
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.lucene.util.RamUsageEstimator.NUM_BYTES_OBJECT_REF;
/**
@@ -79,6 +81,33 @@
}
public static class DirectTrackingAllocator extends Allocator {
private final AtomicLong bytesUsed;
public DirectTrackingAllocator(AtomicLong bytesUsed) {
this(BYTE_BLOCK_SIZE, bytesUsed);
}
public DirectTrackingAllocator(int blockSize, AtomicLong bytesUsed) {
super(blockSize);
this.bytesUsed = bytesUsed;
}
public byte[] getByteBlock() {
bytesUsed.addAndGet(blockSize);
return new byte[blockSize];
}
@Override
public void recycleByteBlocks(byte[][] blocks, int start, int end) {
bytesUsed.addAndGet(-((end-start)* blockSize));
for (int i = start; i < end; i++) {
blocks[i] = null;
}
}
};
public byte[][] buffers = new byte[10][];
int bufferUpto = -1; // Which buffer we are upto
@@ -93,6 +122,20 @@
this.allocator = allocator;
}
public void dropBuffersAndReset() {
if (bufferUpto != -1) {
// Recycle all buffers, including the first one
allocator.recycleByteBlocks(buffers, 0, 1+bufferUpto);
// Drop all buffer references so the blocks can be reclaimed
bufferUpto = -1;
byteUpto = BYTE_BLOCK_SIZE;
byteOffset = -BYTE_BLOCK_SIZE;
buffers = new byte[10][];
buffer = null;
}
}
public void reset() {
if (bufferUpto != -1) {
// We allocated at least one buffer


@@ -228,7 +228,7 @@ public final class BytesRefHash {
lastCount = count;
count = 0;
if (resetPool) {
pool.reset();
pool.dropBuffersAndReset();
}
bytesStart = bytesStartArray.clear();
if (lastCount != -1 && shrink(lastCount)) {