Fixes #11371 - Review ArrayByteBufferPool eviction. (#11400)

* Fixes #11371 - Review ArrayByteBufferPool eviction.

* Eviction is now performed on release(), rather than acquire().
* Memory accounting is done on release(), rather than acquire().
This is because we were always exceeding the memory usage on acquire(), by returning a non-pooled buffer.
We only need to account for what is idle in the pool, and that is done more efficiently on release(), and it is leak-resistant (i.e. if the buffer is not returned, the memory is already non accounted for, keeping the pool consistent).
* Released entries now give precedence to Concurrent.Entry, rather than Queued.Entry, so the queued pool is always kept at minimum size.
* Changed eviction algorithm to be simpler: one pass through the buckets excluding the current, trying to remove idle buffers until enough memory is recovered.
If successful, the buffer being released is pooled, otherwise it is also discarded.
* Added detailed statistics to ArrayByteBufferPool.RetainedBuckets.
* Added statisticsEnabled property in Jetty module bytebufferpool.mod.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2024-02-20 11:02:11 +01:00 committed by GitHub
parent 624ee584bd
commit d02406c164
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 628 additions and 241 deletions

View File

@ -23,7 +23,9 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;
@ -32,7 +34,6 @@ import org.eclipse.jetty.io.internal.CompoundPool;
import org.eclipse.jetty.io.internal.QueuedPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentPool;
import org.eclipse.jetty.util.NanoTime;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -54,7 +55,6 @@ import org.slf4j.LoggerFactory;
@ManagedObject
public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
{
private static final Logger LOG = LoggerFactory.getLogger(ArrayByteBufferPool.class);
static final int DEFAULT_FACTOR = 4096;
static final int DEFAULT_MAX_CAPACITY_BY_FACTOR = 16;
@ -64,9 +64,10 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
private final int _maxCapacity;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _currentHeapMemory = new AtomicLong();
private final AtomicLong _currentDirectMemory = new AtomicLong();
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();
private final IntUnaryOperator _bucketIndexFor;
private boolean _statisticsEnabled;
/**
* Creates a new ArrayByteBufferPool with a default configuration.
@ -175,6 +176,17 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
return maxMemory;
}
@ManagedAttribute("Whether statistics are enabled")
public boolean isStatisticsEnabled()
{
return _statisticsEnabled;
}
public void setStatisticsEnabled(boolean enabled)
{
_statisticsEnabled = enabled;
}
@ManagedAttribute("The minimum pooled buffer capacity")
public int getMinCapacity()
{
@ -191,40 +203,131 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
public RetainableByteBuffer acquire(int size, boolean direct)
{
RetainedBucket bucket = bucketFor(size, direct);
// No bucket, return non-pooled.
if (bucket == null)
return newRetainableByteBuffer(size, direct, this::removed);
return newRetainableByteBuffer(size, direct, null);
bucket.recordAcquire();
// Try to acquire a pooled entry.
Pool.Entry<RetainableByteBuffer> entry = bucket.getPool().acquire();
RetainableByteBuffer buffer;
if (entry == null)
if (entry != null)
{
Pool.Entry<RetainableByteBuffer> reservedEntry = bucket.getPool().reserve();
if (reservedEntry != null)
{
buffer = newRetainableByteBuffer(bucket._capacity, direct, retainedBuffer ->
{
BufferUtil.reset(retainedBuffer.getByteBuffer());
if (!reservedEntry.release())
reservedEntry.remove();
});
// A reserved entry may become old and be evicted before it is enabled.
if (reservedEntry.enable(buffer, true))
{
if (direct)
_currentDirectMemory.addAndGet(buffer.capacity());
else
_currentHeapMemory.addAndGet(buffer.capacity());
releaseExcessMemory(direct);
return buffer;
}
}
return newRetainableByteBuffer(size, direct, this::removed);
bucket.recordPooled();
subtractMemory(bucket.getCapacity(), direct);
RetainableByteBuffer buffer = entry.getPooled();
((Buffer)buffer).acquire();
return buffer;
}
buffer = entry.getPooled();
((Buffer)buffer).acquire();
return buffer;
return newRetainableByteBuffer(bucket.getCapacity(), direct, buffer -> reserve(bucket, buffer));
}
private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer)
{
bucket.recordRelease();
boolean direct = buffer.isDirect();
int capacity = bucket.getCapacity();
// Discard the buffer if maxMemory is exceeded.
long excessMemory = addMemoryAndGetExcess(bucket, direct);
if (excessMemory > 0)
{
subtractMemory(capacity, direct);
bucket.recordNonPooled();
return;
}
Pool.Entry<RetainableByteBuffer> entry = bucket.getPool().reserve();
// Cannot reserve, discard the buffer.
if (entry == null)
{
subtractMemory(capacity, direct);
bucket.recordNonPooled();
return;
}
ByteBuffer byteBuffer = buffer.getByteBuffer();
BufferUtil.reset(byteBuffer);
Buffer pooledBuffer = new Buffer(byteBuffer, b -> release(bucket, entry));
if (entry.enable(pooledBuffer, false))
return;
// Discard the buffer if the entry cannot be enabled.
subtractMemory(capacity, direct);
bucket.recordNonPooled();
entry.remove();
}
private void release(RetainedBucket bucket, Pool.Entry<RetainableByteBuffer> entry)
{
bucket.recordRelease();
RetainableByteBuffer buffer = entry.getPooled();
BufferUtil.reset(buffer.getByteBuffer());
boolean direct = buffer.isDirect();
int capacity = bucket.getCapacity();
long excessMemory = addMemoryAndGetExcess(bucket, direct);
if (excessMemory > 0)
{
bucket.recordEvict();
// If we cannot free enough space for the entry, remove it.
if (!evict(excessMemory, bucket, direct))
{
subtractMemory(capacity, direct);
bucket.recordRemove();
entry.remove();
return;
}
}
// We have enough space for this entry, pool it.
if (entry.release())
return;
// Not enough space, discard this buffer.
subtractMemory(capacity, direct);
bucket.recordRemove();
entry.remove();
}
private long addMemoryAndGetExcess(RetainedBucket bucket, boolean direct)
{
long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
if (maxMemory < 0)
return -1;
AtomicLong memory = direct ? _directMemory : _heapMemory;
int capacity = bucket.getCapacity();
long newMemory = memory.addAndGet(capacity);
// Account the excess at most for the bucket capacity.
return Math.min(capacity, newMemory - maxMemory);
}
private boolean evict(long excessMemory, RetainedBucket target, boolean direct)
{
RetainedBucket[] buckets = direct ? _direct : _indirect;
int length = buckets.length;
int index = ThreadLocalRandom.current().nextInt(length);
for (int c = 0; c < length; ++c)
{
RetainedBucket bucket = buckets[index++];
if (index == length)
index = 0;
// Do not evict from the bucket the buffer is released into.
if (bucket == target)
continue;
int evicted = bucket.evict();
subtractMemory(evicted, direct);
excessMemory -= evicted;
if (excessMemory <= 0)
return true;
}
return false;
}
protected ByteBuffer allocate(int capacity)
@ -237,10 +340,6 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
return ByteBuffer.allocateDirect(capacity);
}
protected void removed(RetainableByteBuffer retainedBuffer)
{
}
private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<RetainableByteBuffer> releaser)
{
ByteBuffer buffer = BufferUtil.allocate(capacity, direct);
@ -257,7 +356,7 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
private RetainedBucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
if (capacity < getMinCapacity())
return null;
int idx = _bucketIndexFor.applyAsInt(capacity);
RetainedBucket[] buckets = direct ? _direct : _indirect;
@ -316,10 +415,14 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
private long getMemory(boolean direct)
{
if (direct)
return _currentDirectMemory.get();
else
return _currentHeapMemory.get();
AtomicLong memory = direct ? _directMemory : _heapMemory;
return memory.longValue();
}
private void subtractMemory(int amount, boolean direct)
{
AtomicLong memory = direct ? _directMemory : _heapMemory;
memory.addAndGet(-amount);
}
@ManagedAttribute("The available bytes retained by direct ByteBuffers")
@ -340,7 +443,7 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
long total = 0L;
for (RetainedBucket bucket : buckets)
{
long capacity = bucket._capacity;
long capacity = bucket.getCapacity();
total += bucket.getPool().getIdleCount() * capacity;
}
return total;
@ -349,105 +452,20 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
public void clear()
{
clearArray(_direct, _currentDirectMemory);
clearArray(_indirect, _currentHeapMemory);
clearBuckets(_direct);
_directMemory.set(0);
clearBuckets(_indirect);
_heapMemory.set(0);
}
private void clearArray(RetainedBucket[] poolArray, AtomicLong memoryCounter)
private void clearBuckets(RetainedBucket[] buckets)
{
for (RetainedBucket bucket : poolArray)
for (RetainedBucket bucket : buckets)
{
bucket.getPool().stream().forEach(entry ->
{
if (entry.remove())
{
RetainableByteBuffer pooled = entry.getPooled();
// Calling getPooled can return null if the entry was not yet enabled.
if (pooled != null)
{
memoryCounter.addAndGet(-pooled.capacity());
removed(pooled);
}
}
});
bucket.clear();
}
}
private void releaseExcessMemory(boolean direct)
{
long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
if (maxMemory > 0)
{
long excess = getMemory(direct) - maxMemory;
if (excess > 0)
evict(direct, excess);
}
}
/**
* This eviction mechanism searches for the RetainableByteBuffers that were released the longest time ago.
*
* @param direct true to search in the direct buffers buckets, false to search in the heap buffers buckets.
* @param excess the amount of bytes to evict. At least this much will be removed from the buckets.
*/
private void evict(boolean direct, long excess)
{
if (LOG.isDebugEnabled())
LOG.debug("evicting {} bytes from {} pools", excess, (direct ? "direct" : "heap"));
long now = NanoTime.now();
long totalClearedCapacity = 0L;
RetainedBucket[] buckets = direct ? _direct : _indirect;
while (totalClearedCapacity < excess)
{
// Run through all the buckets to avoid removing
// the buffers only from the first bucket(s).
for (RetainedBucket bucket : buckets)
{
Pool.Entry<RetainableByteBuffer> oldestEntry = findOldestEntry(now, bucket.getPool());
if (oldestEntry == null)
continue;
// Get the pooled buffer now in case we can evict below.
// The buffer may be null if the entry has been reserved but
// not yet enabled, or the entry has been removed by a concurrent
// thread, that may also have nulled-out the pooled buffer.
RetainableByteBuffer buffer = oldestEntry.getPooled();
if (buffer == null)
continue;
// A concurrent thread evicted the same entry.
// A successful Entry.remove() call may null-out the pooled buffer.
if (!oldestEntry.remove())
continue;
// We can evict, clear the buffer capacity.
int clearedCapacity = buffer.capacity();
if (direct)
_currentDirectMemory.addAndGet(-clearedCapacity);
else
_currentHeapMemory.addAndGet(-clearedCapacity);
totalClearedCapacity += clearedCapacity;
removed(buffer);
}
}
if (LOG.isDebugEnabled())
LOG.debug("eviction done, cleared {} bytes from {} pools", totalClearedCapacity, (direct ? "direct" : "heap"));
}
@Override
public String toString()
{
return String.format("%s{min=%d,max=%d,buckets=%d,heap=%d/%d,direct=%d/%d}",
super.toString(),
_minCapacity, _maxCapacity,
_direct.length,
_currentHeapMemory.get(), _maxHeapMemory,
_currentDirectMemory.get(), _maxDirectMemory);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
@ -459,35 +477,25 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
DumpableCollection.fromArray("indirect", _indirect));
}
private Pool.Entry<RetainableByteBuffer> findOldestEntry(long now, Pool<RetainableByteBuffer> bucket)
@Override
public String toString()
{
// This method may be in the hot path, do not use Java streams.
Pool.Entry<RetainableByteBuffer> oldestEntry = null;
RetainableByteBuffer oldestBuffer = null;
long oldestAge = 0;
// TODO: improve Pool APIs to avoid stream().toList().
for (Pool.Entry<RetainableByteBuffer> entry : bucket.stream().toList())
{
Buffer buffer = (Buffer)entry.getPooled();
// A null buffer means the entry is reserved
// but not acquired yet, try the next.
if (buffer != null)
{
long age = NanoTime.elapsed(buffer.getLastNanoTime(), now);
if (oldestBuffer == null || age > oldestAge)
{
oldestEntry = entry;
oldestBuffer = buffer;
oldestAge = age;
}
}
}
return oldestEntry;
return String.format("%s{min=%d,max=%d,buckets=%d,heap=%d/%d,direct=%d/%d}",
super.toString(),
_minCapacity, _maxCapacity,
_direct.length,
getMemory(false), _maxHeapMemory,
getMemory(true), _maxDirectMemory);
}
private static class RetainedBucket
private class RetainedBucket
{
private final LongAdder _acquires = new LongAdder();
private final LongAdder _pooled = new LongAdder();
private final LongAdder _nonPooled = new LongAdder();
private final LongAdder _evicts = new LongAdder();
private final LongAdder _removes = new LongAdder();
private final LongAdder _releases = new LongAdder();
private final Pool<RetainableByteBuffer> _pool;
private final int _capacity;
@ -496,18 +504,87 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
if (poolSize <= ConcurrentPool.OPTIMAL_MAX_SIZE)
_pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, poolSize, e -> 1);
else
_pool = new CompoundPool<>(
_pool = new BucketCompoundPool(
new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, ConcurrentPool.OPTIMAL_MAX_SIZE, e -> 1),
new QueuedPool<>(poolSize - ConcurrentPool.OPTIMAL_MAX_SIZE)
);
_capacity = capacity;
}
public void recordAcquire()
{
if (isStatisticsEnabled())
_acquires.increment();
}
public void recordEvict()
{
if (isStatisticsEnabled())
_evicts.increment();
}
public void recordNonPooled()
{
if (isStatisticsEnabled())
_nonPooled.increment();
}
public void recordPooled()
{
if (isStatisticsEnabled())
_pooled.increment();
}
public void recordRelease()
{
if (isStatisticsEnabled())
_releases.increment();
}
public void recordRemove()
{
if (isStatisticsEnabled())
_removes.increment();
}
private int getCapacity()
{
return _capacity;
}
private Pool<RetainableByteBuffer> getPool()
{
return _pool;
}
private int evict()
{
Pool.Entry<RetainableByteBuffer> entry;
if (_pool instanceof BucketCompoundPool compound)
entry = compound.evict();
else
entry = _pool.acquire();
if (entry == null)
return 0;
recordRemove();
entry.remove();
return getCapacity();
}
public void clear()
{
_acquires.reset();
_pooled.reset();
_nonPooled.reset();
_evicts.reset();
_removes.reset();
_releases.reset();
getPool().stream().forEach(Pool.Entry::remove);
}
@Override
public String toString()
{
@ -520,18 +597,44 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
inUse++;
}
return String.format("%s{capacity=%d,inuse=%d(%d%%)}",
long pooled = _pooled.longValue();
long acquires = _acquires.longValue();
float hitRatio = acquires == 0 ? Float.NaN : pooled * 100F / acquires;
return String.format("%s{capacity=%d,in-use=%d/%d,pooled/acquires=%d/%d(%.3f%%),non-pooled/evicts/removes/releases=%d/%d/%d/%d}",
super.toString(),
_capacity,
getCapacity(),
inUse,
entries > 0 ? (inUse * 100) / entries : 0);
entries,
pooled,
acquires,
hitRatio,
_nonPooled.longValue(),
_evicts.longValue(),
_removes.longValue(),
_releases.longValue()
);
}
private static class BucketCompoundPool extends CompoundPool<RetainableByteBuffer>
{
private BucketCompoundPool(ConcurrentPool<RetainableByteBuffer> concurrentBucket, QueuedPool<RetainableByteBuffer> queuedBucket)
{
super(concurrentBucket, queuedBucket);
}
private Pool.Entry<RetainableByteBuffer> evict()
{
Entry<RetainableByteBuffer> entry = getSecondaryPool().acquire();
if (entry == null)
entry = getPrimaryPool().acquire();
return entry;
}
}
}
private static class Buffer extends AbstractRetainableByteBuffer
{
private final Consumer<RetainableByteBuffer> releaser;
private final AtomicLong lastNanoTime = new AtomicLong(NanoTime.now());
private Buffer(ByteBuffer buffer, Consumer<RetainableByteBuffer> releaser)
{
@ -545,16 +648,11 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable
boolean released = super.release();
if (released)
{
lastNanoTime.setOpaque(NanoTime.now());
releaser.accept(this);
if (releaser != null)
releaser.accept(this);
}
return released;
}
public long getLastNanoTime()
{
return lastNanoTime.getOpaque();
}
}
/**

View File

@ -35,6 +35,16 @@ public class CompoundPool<P> implements Pool<P>
this.secondaryPool = secondaryPool;
}
public Pool<P> getPrimaryPool()
{
return primaryPool;
}
public Pool<P> getSecondaryPool()
{
return secondaryPool;
}
@Override
public Entry<P> reserve()
{
@ -81,4 +91,28 @@ public class CompoundPool<P> implements Pool<P>
{
return Stream.concat(primaryPool.stream(), secondaryPool.stream());
}
@Override
public int getReservedCount()
{
return primaryPool.getReservedCount() + secondaryPool.getReservedCount();
}
@Override
public int getIdleCount()
{
return primaryPool.getIdleCount() + secondaryPool.getIdleCount();
}
@Override
public int getInUseCount()
{
return primaryPool.getInUseCount() + secondaryPool.getInUseCount();
}
@Override
public int getTerminatedCount()
{
return primaryPool.getTerminatedCount() + secondaryPool.getTerminatedCount();
}
}

View File

@ -171,6 +171,30 @@ public class QueuedPool<P> implements Pool<P>
return queue.stream();
}
@Override
public int getReservedCount()
{
return 0;
}
@Override
public int getIdleCount()
{
return size();
}
@Override
public int getInUseCount()
{
return 0;
}
@Override
public int getTerminatedCount()
{
return 0;
}
private static class QueuedEntry<P> implements Entry<P>
{
private final QueuedPool<P> pool;

View File

@ -13,16 +13,20 @@
package org.eclipse.jetty.io;
import java.lang.ref.Reference;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.eclipse.jetty.io.internal.CompoundPool;
import org.eclipse.jetty.util.ConcurrentPool;
import org.eclipse.jetty.util.Pool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
@ -30,6 +34,7 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ArrayByteBufferPoolTest
{
@ -41,25 +46,25 @@ public class ArrayByteBufferPoolTest
List<RetainableByteBuffer> buffers = new ArrayList<>();
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), is(0L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), greaterThan(0L));
assertThat(pool.getDirectMemory(), greaterThan(0L));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
buffers.forEach(RetainableByteBuffer::release);
@ -67,8 +72,8 @@ public class ArrayByteBufferPoolTest
assertThat(pool.getAvailableDirectByteBufferCount(), lessThan((long)buffers.size()));
assertThat(pool.getDirectByteBufferCount(), greaterThan(0L));
assertThat(pool.getDirectByteBufferCount(), lessThan((long)buffers.size()));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), greaterThan(0L));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
}
@Test
@ -108,10 +113,10 @@ public class ArrayByteBufferPoolTest
RetainableByteBuffer buf1 = pool.acquire(10, true);
assertThat(pool.getDirectMemory(), is(10L));
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(buf1.isRetained(), is(false));
buf1.retain();
@ -122,10 +127,10 @@ public class ArrayByteBufferPoolTest
assertThat(buf1.release(), is(false));
assertThat(buf1.isRetained(), is(false));
assertThat(pool.getDirectMemory(), is(10L));
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(buf1.release(), is(true));
assertThat(buf1.isRetained(), is(false));
@ -143,10 +148,10 @@ public class ArrayByteBufferPoolTest
RetainableByteBuffer buf1 = pool.acquire(10, true);
assertThat(pool.getDirectMemory(), is(10L));
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getDirectByteBufferCount(), is(0L));
buf1.release();
@ -168,49 +173,55 @@ public class ArrayByteBufferPoolTest
{
ArrayByteBufferPool pool = new ArrayByteBufferPool(0, 10, 20, 2);
RetainableByteBuffer buf1 = pool.acquire(1, true); // pooled
RetainableByteBuffer buf1 = pool.acquire(1, true);
assertThat(buf1.capacity(), is(10));
RetainableByteBuffer buf2 = pool.acquire(1, true); // pooled
RetainableByteBuffer buf2 = pool.acquire(1, true);
assertThat(buf2.capacity(), is(10));
RetainableByteBuffer buf3 = pool.acquire(1, true); // not pooled, bucket is full
assertThat(buf3.capacity(), is(1));
RetainableByteBuffer buf3 = pool.acquire(1, true);
assertThat(buf3.capacity(), is(10));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
assertTrue(buf1.release()); // pooled
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertTrue(buf2.release()); // pooled
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertTrue(buf3.release()); // not pooled, bucket is full.
assertThat(pool.getDirectByteBufferCount(), is(2L));
RetainableByteBuffer buf4 = pool.acquire(11, true);
assertThat(buf4.capacity(), is(20));
RetainableByteBuffer buf5 = pool.acquire(11, true);
assertThat(buf5.capacity(), is(20));
RetainableByteBuffer buf6 = pool.acquire(11, true);
assertThat(buf6.capacity(), is(20));
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(20L));
RetainableByteBuffer buf4 = pool.acquire(11, true); // pooled
assertThat(buf4.capacity(), is(20));
RetainableByteBuffer buf5 = pool.acquire(11, true); // pooled
assertThat(buf5.capacity(), is(20));
RetainableByteBuffer buf6 = pool.acquire(11, true); // not pooled, bucket is full
assertThat(buf6.capacity(), is(11));
// Need to keep the references around past the asserts above.
Reference.reachabilityFence(buf1);
Reference.reachabilityFence(buf2);
Reference.reachabilityFence(buf3);
Reference.reachabilityFence(buf4);
Reference.reachabilityFence(buf5);
Reference.reachabilityFence(buf6);
assertTrue(buf4.release()); // pooled
assertThat(pool.getDirectByteBufferCount(), is(3L));
assertTrue(buf5.release()); // pooled
assertThat(pool.getDirectByteBufferCount(), is(4L));
assertTrue(buf6.release()); // not pooled, bucket is full.
assertThat(pool.getDirectByteBufferCount(), is(4L));
assertThat(pool.getDirectMemory(), is(60L));
}
@Test
public void testBufferReleaseRepools()
public void testBufferReleaseRePools()
{
ArrayByteBufferPool pool = new ArrayByteBufferPool(0, 10, 20, 1);
List<RetainableByteBuffer> all = new ArrayList<>();
all.add(pool.acquire(1, true)); // pooled
all.add(pool.acquire(1, true)); // not pooled, bucket is full
all.add(pool.acquire(11, true)); // pooled
all.add(pool.acquire(11, true)); // not pooled, bucket is full
all.add(pool.acquire(1, true));
all.add(pool.acquire(1, true));
all.add(pool.acquire(11, true));
all.add(pool.acquire(11, true));
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(30L));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
@ -227,21 +238,25 @@ public class ArrayByteBufferPoolTest
{
ArrayByteBufferPool pool = new ArrayByteBufferPool(10, 10, 20, Integer.MAX_VALUE);
RetainableByteBuffer b1 = pool.acquire(1, true); // not pooled, < minCapacity
RetainableByteBuffer b2 = pool.acquire(10, true); // pooled
RetainableByteBuffer b3 = pool.acquire(20, true); // pooled
RetainableByteBuffer b4 = pool.acquire(30, true); // not pooled, > maxCapacity
RetainableByteBuffer buf1 = pool.acquire(1, true);
RetainableByteBuffer buf2 = pool.acquire(10, true);
RetainableByteBuffer buf3 = pool.acquire(20, true);
RetainableByteBuffer buf4 = pool.acquire(30, true);
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(30L));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
// Need to keep the references around past the asserts above.
Reference.reachabilityFence(b1);
Reference.reachabilityFence(b2);
Reference.reachabilityFence(b3);
Reference.reachabilityFence(b4);
assertTrue(buf1.release()); // not pooled, < minCapacity
assertTrue(buf2.release()); // pooled
assertTrue(buf3.release()); // pooled
assertTrue(buf4.release()); // not pooled, > maxCapacity
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(30L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(2L));
assertThat(pool.getAvailableDirectMemory(), is(30L));
}
@Test
@ -249,17 +264,19 @@ public class ArrayByteBufferPoolTest
{
ArrayByteBufferPool pool = new ArrayByteBufferPool();
RetainableByteBuffer b1 = pool.acquire(10, true);
RetainableByteBuffer b2 = pool.acquire(10, true);
RetainableByteBuffer buffer1 = pool.acquire(10, true);
RetainableByteBuffer buffer2 = pool.acquire(10, true);
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(2L * ArrayByteBufferPool.DEFAULT_FACTOR));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
// Need to keep the references around past the asserts above.
Reference.reachabilityFence(b1);
Reference.reachabilityFence(b2);
buffer2.release();
buffer1.release();
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(2L * ArrayByteBufferPool.DEFAULT_FACTOR));
pool.clear();
@ -273,23 +290,33 @@ public class ArrayByteBufferPoolTest
public void testRetainAfterRePooledThrows()
{
ArrayByteBufferPool pool = new ArrayByteBufferPool();
RetainableByteBuffer buf1 = pool.acquire(10, true);
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(buf1.release(), is(true));
assertThrows(IllegalStateException.class, buf1::retain);
assertThrows(IllegalStateException.class, buf1::release);
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(1L));
// check that the buffer is still available
// Check that the buffer is still available.
RetainableByteBuffer buf2 = pool.acquire(10, true);
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(buf2 == buf1, is(true)); // make sure it's not a new instance
// The ByteBuffer is re-wrapped by a different RetainableByteBuffer upon the first release.
assertThat(buf2, not(sameInstance(buf1)));
assertThat(buf2.getByteBuffer(), sameInstance(buf1.getByteBuffer()));
assertThat(buf2.release(), is(true));
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(1L));
RetainableByteBuffer buf3 = pool.acquire(10, true);
assertThat(buf3, sameInstance(buf2));
assertThat(buf3.release(), is(true));
}
@Test
@ -345,7 +372,8 @@ public class ArrayByteBufferPoolTest
RetainableByteBuffer retain5 = pool.acquire(5, false);
retain5.release();
RetainableByteBuffer retain6 = pool.acquire(6, false);
assertThat(retain6, sameInstance(retain5));
assertThat(retain6, not(sameInstance(retain5)));
assertThat(retain6.getByteBuffer(), sameInstance(retain5.getByteBuffer()));
retain6.release();
RetainableByteBuffer retain9 = pool.acquire(9, false);
assertThat(retain9, not(sameInstance(retain5)));
@ -386,4 +414,34 @@ public class ArrayByteBufferPoolTest
assertThat(buffer.release(), is(true));
assertThat(buffer.getByteBuffer().order(), Matchers.is(ByteOrder.BIG_ENDIAN));
}
@Test
public void testReleaseExcessMemory()
{
int maxCapacity = 20;
int maxBucketSize = ConcurrentPool.OPTIMAL_MAX_SIZE * 2;
int maxMemory = maxCapacity * maxBucketSize / 2;
ArrayByteBufferPool pool = new ArrayByteBufferPool(0, 10, maxCapacity, maxBucketSize, maxMemory, maxMemory);
// It is always possible to acquire beyond maxMemory, because
// the buffers are in use and not really retained in the pool.
List<RetainableByteBuffer> buffers = new ArrayList<>();
for (int i = 0; i < maxBucketSize; ++i)
{
buffers.add(pool.acquire(maxCapacity, true));
}
// The last entries acquired are from the queued pool.
// Release in reverse order to release first the queued
// entries, but then the concurrent entries should be
// pooled, and the queued entries removed.
Collections.reverse(buffers);
buffers.forEach(RetainableByteBuffer::release);
Pool<RetainableByteBuffer> bucketPool = pool.poolFor(maxCapacity, true);
assertThat(bucketPool, instanceOf(CompoundPool.class));
CompoundPool<RetainableByteBuffer> compoundPool = (CompoundPool<RetainableByteBuffer>)bucketPool;
assertThat(compoundPool.getPrimaryPool().size(), is(ConcurrentPool.OPTIMAL_MAX_SIZE));
assertThat(compoundPool.getSecondaryPool().size(), is(0));
}
}

View File

@ -8,5 +8,6 @@
<Arg type="int"><Property name="jetty.byteBufferPool.maxBucketSize" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="0"/></Arg>
<Set name="statisticsEnabled" property="jetty.byteBufferPool.statisticsEnabled" />
</New>
</Configure>

View File

@ -31,3 +31,6 @@ etc/jetty-bytebufferpool.xml
## Maximum direct memory held idle by the pool (0 for heuristic, -1 for unlimited).
#jetty.byteBufferPool.maxDirectMemory=0
## Whether statistics are enabled.
#jetty.byteBufferPool.statisticsEnabled=false

View File

@ -22,6 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import java.util.stream.Stream;
@ -148,6 +149,11 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
leaked.increment();
if (LOG.isDebugEnabled())
LOG.debug("Leaked " + holder);
leaked();
}
protected void leaked()
{
}
@Override
@ -194,8 +200,8 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
Holder<P> holder = entries.get(i);
if (holder.getEntry() == null)
{
leaked(holder);
entries.remove(i--);
leaked(holder);
}
}
}
@ -222,8 +228,8 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
ConcurrentEntry<P> entry = (ConcurrentEntry<P>)holder.getEntry();
if (entry == null)
{
leaked(holder);
entries.remove(index);
leaked(holder);
continue;
}
@ -348,6 +354,42 @@ public class ConcurrentPool<P> implements Pool<P>, Dumpable
return entries.stream().map(Holder::getEntry).filter(Objects::nonNull);
}
@Override
public int getReservedCount()
{
return getCount(Entry::isReserved);
}
@Override
public int getIdleCount()
{
return getCount(Entry::isIdle);
}
@Override
public int getInUseCount()
{
return getCount(Entry::isInUse);
}
@Override
public int getTerminatedCount()
{
return getCount(Entry::isTerminated);
}
private int getCount(Predicate<Entry<P>> predicate)
{
int count = 0;
for (Holder<P> holder : entries)
{
Entry<P> entry = holder.getEntry();
if (entry != null && predicate.test(entry))
count++;
}
return count;
}
@Override
public void dump(Appendable out, String indent) throws IOException
{

View File

@ -0,0 +1,127 @@
//
// ========================================================================
// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io.jmh;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.profile.AsyncProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;
@State(Scope.Benchmark)
public class ArrayByteBufferPoolBenchmark
{
public static void main(String[] args) throws RunnerException
{
String asyncProfilerPath = "/home/simon/programs/async-profiler/lib/libasyncProfiler.so";
Options opt = new OptionsBuilder()
.include(ArrayByteBufferPoolBenchmark.class.getSimpleName())
.warmupIterations(10)
.warmupTime(TimeValue.milliseconds(500))
.measurementIterations(10)
.measurementTime(TimeValue.milliseconds(500))
.addProfiler(AsyncProfiler.class, "dir=/tmp;output=flamegraph;event=cpu;interval=500000;libPath=" + asyncProfilerPath)
.forks(1)
.threads(10)
.build();
new Runner(opt).run();
}
@Param("0")
int minCapacity;
@Param("65536")
int maxCapacity;
@Param("4096")
int factor;
@Param("-1")
int maxBucketSize;
@Param({"0", "1048576"})
long maxMemory;
@Param({"true"})
boolean statisticsEnabled;
ArrayByteBufferPool pool;
@Setup
public void prepare()
{
pool = new ArrayByteBufferPool(minCapacity, factor, maxCapacity, maxBucketSize, maxMemory, maxMemory);
pool.setStatisticsEnabled(statisticsEnabled);
}
@TearDown
public void dispose()
{
System.out.println(pool.dump());
}
@Benchmark
@BenchmarkMode({Mode.Throughput})
public void inputFixedCapacityOutputRandomCapacity()
{
// Simulate a read from the network.
RetainableByteBuffer input = pool.acquire(61440, true);
// Simulate a write of random size from the application.
int capacity = ThreadLocalRandom.current().nextInt(minCapacity, maxCapacity);
RetainableByteBuffer output = pool.acquire(capacity, true);
output.release();
input.release();
}
int iterations;
@Setup(Level.Iteration)
public void migrate()
{
++iterations;
if (iterations == 15)
System.out.println(pool.dump());
}
@Benchmark
@BenchmarkMode({Mode.Throughput})
public void inputFixedCapacityOutputRandomCapacityMigrating()
{
// Simulate a read from the network.
RetainableByteBuffer input = pool.acquire(8192, true);
// Simulate a write of random size from the application.
// Simulate a change in buffer sizes after half of the iterations.
int capacity;
if (iterations <= 15)
capacity = ThreadLocalRandom.current().nextInt(minCapacity, maxCapacity / 2);
else
capacity = ThreadLocalRandom.current().nextInt(maxCapacity / 2, maxCapacity);
RetainableByteBuffer output = pool.acquire(capacity, true);
output.release();
input.release();
}
}