Compute RAM usage ByteBuffersDataOutput on the fly. (#1919)

This helps remove the assumption that all blocks have the same size.
This commit is contained in:
Adrien Grand 2020-09-28 15:08:08 +02:00 committed by GitHub
parent 32041c8d9b
commit c3f97fbdc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 7 deletions

View File

@ -117,6 +117,9 @@ public final class ByteBuffersDataOutput extends DataOutput implements Accountab
*/ */
private final ArrayDeque<ByteBuffer> blocks = new ArrayDeque<>(); private final ArrayDeque<ByteBuffer> blocks = new ArrayDeque<>();
/** Cumulative RAM usage across all blocks. */
private long ramBytesUsed;
/** /**
* The current-or-next write block. * The current-or-next write block.
*/ */
@ -400,13 +403,8 @@ public final class ByteBuffersDataOutput extends DataOutput implements Accountab
public long ramBytesUsed() { public long ramBytesUsed() {
// Return a rough estimation for allocated blocks. Note that we do not make // Return a rough estimation for allocated blocks. Note that we do not make
// any special distinction for direct memory buffers. // any special distinction for direct memory buffers.
ByteBuffer first = blocks.peek(); assert ramBytesUsed == blocks.stream().mapToLong(ByteBuffer::capacity).sum() + blocks.size() * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
if (first == null) { return ramBytesUsed;
return 0L;
} else {
// All blocks have the same capacity.
return (first.capacity() + RamUsageEstimator.NUM_BYTES_OBJECT_REF) * blocks.size();
}
} }
/** /**
@ -422,6 +420,7 @@ public final class ByteBuffersDataOutput extends DataOutput implements Accountab
blocks.forEach(blockReuse); blocks.forEach(blockReuse);
} }
blocks.clear(); blocks.clear();
ramBytesUsed = 0;
currentBlock = EMPTY; currentBlock = EMPTY;
} }
@ -455,6 +454,7 @@ public final class ByteBuffersDataOutput extends DataOutput implements Accountab
currentBlock = blockAllocate.apply(requiredBlockSize); currentBlock = blockAllocate.apply(requiredBlockSize);
assert currentBlock.capacity() == requiredBlockSize; assert currentBlock.capacity() == requiredBlockSize;
blocks.add(currentBlock); blocks.add(currentBlock);
ramBytesUsed += RamUsageEstimator.NUM_BYTES_OBJECT_REF + currentBlock.capacity();
} }
private void rewriteToBlockSize(int targetBlockBits) { private void rewriteToBlockSize(int targetBlockBits) {
@ -476,6 +476,7 @@ public final class ByteBuffersDataOutput extends DataOutput implements Accountab
assert blocks.isEmpty(); assert blocks.isEmpty();
this.blockBits = targetBlockBits; this.blockBits = targetBlockBits;
blocks.addAll(cloned.blocks); blocks.addAll(cloned.blocks);
ramBytesUsed = cloned.ramBytesUsed;
} }
private static int computeBlockSizeBitsFor(long bytes) { private static int computeBlockSizeBitsFor(long bytes) {

View File

@ -20,10 +20,12 @@ import static org.junit.Assert.*;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.RamUsageEstimator;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -159,4 +161,44 @@ public final class TestByteBuffersDataOutput extends BaseDataOutputTestCase<Byte
assertTrue(bb.hasArray()); // heap-based by default, so array should be there. assertTrue(bb.hasArray()); // heap-based by default, so array should be there.
} }
} }
@Test
public void testRamBytesUsed() {
ByteBuffersDataOutput out = new ByteBuffersDataOutput();
// Empty output requires no RAM
assertEquals(0, out.ramBytesUsed());
// Non-empty buffer requires RAM
out.writeInt(4);
assertEquals(out.ramBytesUsed(), computeRamBytesUsed(out));
// Make sure this keeps working with multiple backing buffers
while (out.toBufferList().size() < 2) {
out.writeLong(42);
}
assertEquals(out.ramBytesUsed(), computeRamBytesUsed(out));
// Make sure this keeps working when increasing the block size
int currentBlockCapacity = out.toBufferList().get(0).capacity();
do {
out.writeLong(42);
} while (out.toBufferList().get(0).capacity() == currentBlockCapacity);
assertEquals(out.ramBytesUsed(), computeRamBytesUsed(out));
// Back to zero after a clear
out.reset();
assertEquals(0, out.ramBytesUsed());
// And back to non-empty
out.writeInt(4);
assertEquals(out.ramBytesUsed(), computeRamBytesUsed(out));
}
private static long computeRamBytesUsed(ByteBuffersDataOutput out) {
if (out.size() == 0) {
return 0;
}
List<ByteBuffer> buffers = out.toBufferList();
return buffers.stream().mapToLong(ByteBuffer::capacity).sum() + buffers.size() * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
}
} }