Improve #6322 extensible ArrayRetainableByteBufferPool (#6538)

Add a exponential bucket size impl to test ArrayRetainableByteBufferPool extensibility

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2021-07-26 10:43:50 +10:00 committed by GitHub
parent 41481669af
commit d781ec3546
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 187 additions and 46 deletions

View File

@ -13,36 +13,41 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation; import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ManagedObject @ManagedObject
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable
{ {
private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class); private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class);
private final Pool<RetainableByteBuffer>[] _direct; private final Bucket[] _direct;
private final Pool<RetainableByteBuffer>[] _indirect; private final Bucket[] _indirect;
private final int _factor;
private final int _minCapacity; private final int _minCapacity;
private final int _maxCapacity;
private final long _maxHeapMemory; private final long _maxHeapMemory;
private final long _maxDirectMemory; private final long _maxDirectMemory;
private final AtomicLong _currentHeapMemory = new AtomicLong(); private final AtomicLong _currentHeapMemory = new AtomicLong();
private final AtomicLong _currentDirectMemory = new AtomicLong(); private final AtomicLong _currentDirectMemory = new AtomicLong();
private final Function<Integer, Integer> _bucketIndexFor;
public ArrayRetainableByteBufferPool() public ArrayRetainableByteBufferPool()
{ {
this(0, 1024, 65536, Integer.MAX_VALUE, -1L, -1L); this(0, -1, -1, Integer.MAX_VALUE, -1L, -1L);
} }
public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize) public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
@ -52,48 +57,72 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{ {
_factor = factor <= 0 ? 1024 : factor; this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null);
this._maxHeapMemory = maxHeapMemory; }
this._maxDirectMemory = maxDirectMemory;
protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory,
Function<Integer, Integer> bucketIndexFor, Function<Integer, Integer> bucketCapacity)
{
if (minCapacity <= 0) if (minCapacity <= 0)
minCapacity = 0; minCapacity = 0;
_minCapacity = minCapacity;
if (maxCapacity <= 0) if (maxCapacity <= 0)
maxCapacity = 64 * 1024; maxCapacity = 64 * 1024;
if ((maxCapacity % _factor) != 0 || _factor >= maxCapacity)
int f = factor <= 0 ? 1024 : factor;
if ((maxCapacity % f) != 0 || f >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity"); throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
int length = maxCapacity / _factor; if (bucketIndexFor == null)
bucketIndexFor = c -> (c - 1) / f;
if (bucketCapacity == null)
bucketCapacity = i -> (i + 1) * f;
@SuppressWarnings("unchecked") int length = bucketIndexFor.apply(maxCapacity) + 1;
Pool<RetainableByteBuffer>[] directArray = new Pool[length]; Bucket[] directArray = new Bucket[length];
@SuppressWarnings("unchecked") Bucket[] indirectArray = new Bucket[length];
Pool<RetainableByteBuffer>[] indirectArray = new Pool[length];
for (int i = 0; i < directArray.length; i++) for (int i = 0; i < directArray.length; i++)
{ {
directArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true); int capacity = Math.min(bucketCapacity.apply(i), maxCapacity);
indirectArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true); directArray[i] = new Bucket(capacity, maxBucketSize);
indirectArray[i] = new Bucket(capacity, maxBucketSize);
} }
_minCapacity = minCapacity;
_maxCapacity = maxCapacity;
_direct = directArray; _direct = directArray;
_indirect = indirectArray; _indirect = indirectArray;
_maxHeapMemory = maxHeapMemory;
_maxDirectMemory = maxDirectMemory;
_bucketIndexFor = bucketIndexFor;
}
@ManagedAttribute("The minimum pooled buffer capacity")
public int getMinCapacity()
{
return _minCapacity;
}
@ManagedAttribute("The maximum pooled buffer capacity")
public int getMaxCapacity()
{
return _maxCapacity;
} }
@Override @Override
public RetainableByteBuffer acquire(int size, boolean direct) public RetainableByteBuffer acquire(int size, boolean direct)
{ {
int capacity = (bucketIndexFor(size) + 1) * _factor; Bucket bucket = bucketFor(size, direct);
Pool<RetainableByteBuffer> bucket = bucketFor(size, direct);
if (bucket == null) if (bucket == null)
return newRetainableByteBuffer(size, direct, byteBuffer -> {}); return newRetainableByteBuffer(size, direct, byteBuffer -> {});
Pool<RetainableByteBuffer>.Entry entry = bucket.acquire(); Bucket.Entry entry = bucket.acquire();
RetainableByteBuffer buffer; RetainableByteBuffer buffer;
if (entry == null) if (entry == null)
{ {
Pool<RetainableByteBuffer>.Entry reservedEntry = bucket.reserve(); Bucket.Entry reservedEntry = bucket.reserve();
if (reservedEntry != null) if (reservedEntry != null)
{ {
buffer = newRetainableByteBuffer(capacity, direct, byteBuffer -> buffer = newRetainableByteBuffer(bucket._capacity, direct, byteBuffer ->
{ {
BufferUtil.clear(byteBuffer); BufferUtil.clear(byteBuffer);
reservedEntry.release(); reservedEntry.release();
@ -127,22 +156,17 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
return retainableByteBuffer; return retainableByteBuffer;
} }
private Pool<RetainableByteBuffer> bucketFor(int capacity, boolean direct) private Bucket bucketFor(int capacity, boolean direct)
{ {
if (capacity < _minCapacity) if (capacity < _minCapacity)
return null; return null;
int idx = bucketIndexFor(capacity); int idx = _bucketIndexFor.apply(capacity);
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect; Bucket[] buckets = direct ? _direct : _indirect;
if (idx >= buckets.length) if (idx >= buckets.length)
return null; return null;
return buckets[idx]; return buckets[idx];
} }
private int bucketIndexFor(int capacity)
{
return (capacity - 1) / _factor;
}
@ManagedAttribute("The number of pooled direct ByteBuffers") @ManagedAttribute("The number of pooled direct ByteBuffers")
public long getDirectByteBufferCount() public long getDirectByteBufferCount()
{ {
@ -157,8 +181,8 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
private long getByteBufferCount(boolean direct) private long getByteBufferCount(boolean direct)
{ {
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect; Bucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(Pool::size).sum(); return Arrays.stream(buckets).mapToLong(Bucket::size).sum();
} }
@ManagedAttribute("The number of pooled direct ByteBuffers that are available") @ManagedAttribute("The number of pooled direct ByteBuffers that are available")
@ -175,8 +199,8 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
private long getAvailableByteBufferCount(boolean direct) private long getAvailableByteBufferCount(boolean direct)
{ {
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect; Bucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(pool -> pool.values().stream().filter(Pool.Entry::isIdle).count()).sum(); return Arrays.stream(buckets).mapToLong(bucket -> bucket.values().stream().filter(Pool.Entry::isIdle).count()).sum();
} }
@ManagedAttribute("The bytes retained by direct ByteBuffers") @ManagedAttribute("The bytes retained by direct ByteBuffers")
@ -213,12 +237,11 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
private long getAvailableMemory(boolean direct) private long getAvailableMemory(boolean direct)
{ {
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect; Bucket[] buckets = direct ? _direct : _indirect;
long total = 0L; long total = 0L;
for (int i = 0; i < buckets.length; i++) for (Bucket bucket : buckets)
{ {
Pool<RetainableByteBuffer> bucket = buckets[i]; int capacity = bucket._capacity;
long capacity = (i + 1L) * _factor;
total += bucket.values().stream().filter(Pool.Entry::isIdle).count() * capacity; total += bucket.values().stream().filter(Pool.Entry::isIdle).count() * capacity;
} }
return total; return total;
@ -231,11 +254,11 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
clearArray(_indirect, _currentHeapMemory); clearArray(_indirect, _currentHeapMemory);
} }
private void clearArray(Pool<RetainableByteBuffer>[] poolArray, AtomicLong memoryCounter) private void clearArray(Bucket[] poolArray, AtomicLong memoryCounter)
{ {
for (Pool<RetainableByteBuffer> retainableByteBufferPool : poolArray) for (Bucket pool : poolArray)
{ {
for (Pool<RetainableByteBuffer>.Entry entry : retainableByteBufferPool.values()) for (Bucket.Entry entry : pool.values())
{ {
entry.remove(); entry.remove();
memoryCounter.addAndGet(-entry.getPooled().capacity()); memoryCounter.addAndGet(-entry.getPooled().capacity());
@ -266,13 +289,13 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
long now = System.nanoTime(); long now = System.nanoTime();
long totalClearedCapacity = 0L; long totalClearedCapacity = 0L;
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect; Bucket[] buckets = direct ? _direct : _indirect;
while (totalClearedCapacity < excess) while (totalClearedCapacity < excess)
{ {
for (Pool<RetainableByteBuffer> bucket : buckets) for (Bucket bucket : buckets)
{ {
Pool<RetainableByteBuffer>.Entry oldestEntry = findOldestEntry(now, bucket); Bucket.Entry oldestEntry = findOldestEntry(now, bucket);
if (oldestEntry == null) if (oldestEntry == null)
continue; continue;
@ -293,10 +316,32 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
LOG.debug("eviction done, cleared {} bytes from {} pools", totalClearedCapacity, (direct ? "direct" : "heap")); LOG.debug("eviction done, cleared {} bytes from {} pools", totalClearedCapacity, (direct ? "direct" : "heap"));
} }
@Override
public String toString()
{
return String.format("%s{min=%d,max=%d,buckets=%d,heap=%d/%d,direct=%d/%d}",
super.toString(),
_minCapacity, _maxCapacity,
_direct.length,
_currentHeapMemory.get(), _maxHeapMemory,
_currentDirectMemory.get(), _maxDirectMemory);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(
out,
indent,
this,
DumpableCollection.fromArray("direct", _direct),
DumpableCollection.fromArray("indirect", _indirect));
}
private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<RetainableByteBuffer> bucket) private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<RetainableByteBuffer> bucket)
{ {
Pool<RetainableByteBuffer>.Entry oldestEntry = null; Bucket.Entry oldestEntry = null;
for (Pool<RetainableByteBuffer>.Entry entry : bucket.values()) for (Bucket.Entry entry : bucket.values())
{ {
if (oldestEntry != null) if (oldestEntry != null)
{ {
@ -311,4 +356,34 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
} }
return oldestEntry; return oldestEntry;
} }
private static class Bucket extends Pool<RetainableByteBuffer>
{
private final int _capacity;
Bucket(int capacity, int size)
{
super(Pool.StrategyType.THREAD_ID, size, true);
_capacity = capacity;
}
@Override
public String toString()
{
int entries = 0;
int inUse = 0;
for (Entry entry : values())
{
entries++;
if (entry.isInUse())
inUse++;
}
return String.format("%s{capacity=%d,inuse=%d(%d%%)}",
super.toString(),
_capacity,
inUse,
entries > 0 ? (inUse * 100) / entries : 0);
}
}
} }

View File

@ -13,12 +13,15 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -313,4 +316,67 @@ public class ArrayRetainableByteBufferPoolTest
assertThat(pool.getDirectMemory(), is(0L)); assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getHeapMemory(), is(0L)); assertThat(pool.getHeapMemory(), is(0L));
} }
@Test
public void testExponentialPool() throws IOException
{
ArrayRetainableByteBufferPool pool = new ExponentialPool();
assertThat(pool.acquire(1, false).capacity(), is(1));
assertThat(pool.acquire(2, false).capacity(), is(2));
RetainableByteBuffer b3 = pool.acquire(3, false);
assertThat(b3.capacity(), is(4));
RetainableByteBuffer b4 = pool.acquire(4, false);
assertThat(b4.capacity(), is(4));
int capacity = 4;
while (true)
{
RetainableByteBuffer b = pool.acquire(capacity - 1, false);
assertThat(b.capacity(), Matchers.is(capacity));
b = pool.acquire(capacity, false);
assertThat(b.capacity(), Matchers.is(capacity));
if (capacity >= pool.getMaxCapacity())
break;
b = pool.acquire(capacity + 1, false);
assertThat(b.capacity(), Matchers.is(capacity * 2));
capacity = capacity * 2;
}
b3.release();
b4.getBuffer().limit(b4.getBuffer().capacity() - 2);
assertThat(pool.dump(), containsString("[size=4 closed=false]{capacity=4,inuse=3(75%)"));
}
/**
* A variant of the {@link ArrayRetainableByteBufferPool} that
* uses buckets of buffers that increase in size by a power of
* 2 (eg 1k, 2k, 4k, 8k, etc.).
*/
public static class ExponentialPool extends ArrayRetainableByteBufferPool
{
public ExponentialPool()
{
this(0, -1, Integer.MAX_VALUE);
}
public ExponentialPool(int minCapacity, int maxCapacity, int maxBucketSize)
{
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
}
public ExponentialPool(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity,
-1,
maxCapacity,
maxBucketSize,
maxHeapMemory,
maxDirectMemory,
c -> 32 - Integer.numberOfLeadingZeros(c - 1),
i -> 1 << i);
}
}
} }