From d02406c1640ba47647775946f59893f9a2453116 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 20 Feb 2024 11:02:11 +0100 Subject: [PATCH] Fixes #11371 - Review ArrayByteBufferPool eviction. (#11400) * Fixes #11371 - Review ArrayByteBufferPool eviction. * Eviction is now performed on release(), rather than acquire(). * Memory accounting is done on release(), rather than acquire(). This is because we were always exceeding the memory usage on acquire(), by returning a non-pooled buffer. We only need to account for what is idle in the pool, and that is done more efficiently on release(), and it is leak-resistant (i.e. if the buffer is not returned, the memory is already non accounted for, keeping the pool consistent). * Released entries now give precedence to Concurrent.Entry, rather than Queued.Entry, so the queued pool is always kept at minimum size. * Changed eviction algorithm to be simpler: one pass through the buckets excluding the current, trying to remove idle buffers until enough memory is recovered. If successful, the buffer being released is pooled, otherwise it is also discarded. * Added detailed statistics to ArrayByteBufferPool.RetainedBuckets. * Added statisticsEnabled property in Jetty module bytebufferpool.mod. Signed-off-by: Simone Bordet --- .../eclipse/jetty/io/ArrayByteBufferPool.java | 442 +++++++++++------- .../jetty/io/internal/CompoundPool.java | 34 ++ .../eclipse/jetty/io/internal/QueuedPool.java | 24 + .../jetty/io/ArrayByteBufferPoolTest.java | 192 +++++--- .../main/config/etc/jetty-bytebufferpool.xml | 1 + .../main/config/modules/bytebufferpool.mod | 3 + .../eclipse/jetty/util/ConcurrentPool.java | 46 +- .../io/jmh/ArrayByteBufferPoolBenchmark.java | 127 +++++ 8 files changed, 628 insertions(+), 241 deletions(-) create mode 100644 tests/jetty-jmh/src/main/java/org/eclipse/jetty/io/jmh/ArrayByteBufferPoolBenchmark.java diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index 04baf08262d..bd4d71a0925 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -23,7 +23,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.function.Consumer; import java.util.function.IntUnaryOperator; import java.util.stream.Collectors; @@ -32,7 +34,6 @@ import org.eclipse.jetty.io.internal.CompoundPool; import org.eclipse.jetty.io.internal.QueuedPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.ConcurrentPool; -import org.eclipse.jetty.util.NanoTime; import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -54,7 +55,6 @@ import org.slf4j.LoggerFactory; @ManagedObject public class ArrayByteBufferPool implements ByteBufferPool, Dumpable { - private static final Logger LOG = LoggerFactory.getLogger(ArrayByteBufferPool.class); static final int DEFAULT_FACTOR = 4096; static final int DEFAULT_MAX_CAPACITY_BY_FACTOR = 16; @@ -64,9 +64,10 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable private final int _maxCapacity; private final long _maxHeapMemory; private final long _maxDirectMemory; - private final AtomicLong _currentHeapMemory = new AtomicLong(); - private final AtomicLong _currentDirectMemory = new AtomicLong(); + private final AtomicLong _heapMemory = new AtomicLong(); + private final AtomicLong _directMemory = new AtomicLong(); private final IntUnaryOperator _bucketIndexFor; + private boolean _statisticsEnabled; /** * Creates a new ArrayByteBufferPool with a default configuration. @@ -175,6 +176,17 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable return maxMemory; } + @ManagedAttribute("Whether statistics are enabled") + public boolean isStatisticsEnabled() + { + return _statisticsEnabled; + } + + public void setStatisticsEnabled(boolean enabled) + { + _statisticsEnabled = enabled; + } + @ManagedAttribute("The minimum pooled buffer capacity") public int getMinCapacity() { @@ -191,40 +203,131 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable public RetainableByteBuffer acquire(int size, boolean direct) { RetainedBucket bucket = bucketFor(size, direct); + + // No bucket, return non-pooled. if (bucket == null) - return newRetainableByteBuffer(size, direct, this::removed); + return newRetainableByteBuffer(size, direct, null); + + bucket.recordAcquire(); + + // Try to acquire a pooled entry. Pool.Entry entry = bucket.getPool().acquire(); - - RetainableByteBuffer buffer; - if (entry == null) + if (entry != null) { - Pool.Entry reservedEntry = bucket.getPool().reserve(); - if (reservedEntry != null) - { - buffer = newRetainableByteBuffer(bucket._capacity, direct, retainedBuffer -> - { - BufferUtil.reset(retainedBuffer.getByteBuffer()); - if (!reservedEntry.release()) - reservedEntry.remove(); - }); - - // A reserved entry may become old and be evicted before it is enabled. - if (reservedEntry.enable(buffer, true)) - { - if (direct) - _currentDirectMemory.addAndGet(buffer.capacity()); - else - _currentHeapMemory.addAndGet(buffer.capacity()); - releaseExcessMemory(direct); - return buffer; - } - } - return newRetainableByteBuffer(size, direct, this::removed); + bucket.recordPooled(); + subtractMemory(bucket.getCapacity(), direct); + RetainableByteBuffer buffer = entry.getPooled(); + ((Buffer)buffer).acquire(); + return buffer; } - buffer = entry.getPooled(); - ((Buffer)buffer).acquire(); - return buffer; + return newRetainableByteBuffer(bucket.getCapacity(), direct, buffer -> reserve(bucket, buffer)); + } + + private void reserve(RetainedBucket bucket, RetainableByteBuffer buffer) + { + bucket.recordRelease(); + + boolean direct = buffer.isDirect(); + int capacity = bucket.getCapacity(); + + // Discard the buffer if maxMemory is exceeded. + long excessMemory = addMemoryAndGetExcess(bucket, direct); + if (excessMemory > 0) + { + subtractMemory(capacity, direct); + bucket.recordNonPooled(); + return; + } + + Pool.Entry entry = bucket.getPool().reserve(); + // Cannot reserve, discard the buffer. + if (entry == null) + { + subtractMemory(capacity, direct); + bucket.recordNonPooled(); + return; + } + + ByteBuffer byteBuffer = buffer.getByteBuffer(); + BufferUtil.reset(byteBuffer); + Buffer pooledBuffer = new Buffer(byteBuffer, b -> release(bucket, entry)); + if (entry.enable(pooledBuffer, false)) + return; + + // Discard the buffer if the entry cannot be enabled. + subtractMemory(capacity, direct); + bucket.recordNonPooled(); + entry.remove(); + } + + private void release(RetainedBucket bucket, Pool.Entry entry) + { + bucket.recordRelease(); + + RetainableByteBuffer buffer = entry.getPooled(); + BufferUtil.reset(buffer.getByteBuffer()); + boolean direct = buffer.isDirect(); + int capacity = bucket.getCapacity(); + long excessMemory = addMemoryAndGetExcess(bucket, direct); + if (excessMemory > 0) + { + bucket.recordEvict(); + // If we cannot free enough space for the entry, remove it. + if (!evict(excessMemory, bucket, direct)) + { + subtractMemory(capacity, direct); + bucket.recordRemove(); + entry.remove(); + return; + } + } + + // We have enough space for this entry, pool it. + if (entry.release()) + return; + + // Not enough space, discard this buffer. + subtractMemory(capacity, direct); + bucket.recordRemove(); + entry.remove(); + } + + private long addMemoryAndGetExcess(RetainedBucket bucket, boolean direct) + { + long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory; + if (maxMemory < 0) + return -1; + + AtomicLong memory = direct ? _directMemory : _heapMemory; + int capacity = bucket.getCapacity(); + long newMemory = memory.addAndGet(capacity); + // Account the excess at most for the bucket capacity. + return Math.min(capacity, newMemory - maxMemory); + } + + private boolean evict(long excessMemory, RetainedBucket target, boolean direct) + { + RetainedBucket[] buckets = direct ? _direct : _indirect; + int length = buckets.length; + int index = ThreadLocalRandom.current().nextInt(length); + for (int c = 0; c < length; ++c) + { + RetainedBucket bucket = buckets[index++]; + if (index == length) + index = 0; + // Do not evict from the bucket the buffer is released into. + if (bucket == target) + continue; + + int evicted = bucket.evict(); + subtractMemory(evicted, direct); + + excessMemory -= evicted; + if (excessMemory <= 0) + return true; + } + return false; } protected ByteBuffer allocate(int capacity) @@ -237,10 +340,6 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable return ByteBuffer.allocateDirect(capacity); } - protected void removed(RetainableByteBuffer retainedBuffer) - { - } - private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer releaser) { ByteBuffer buffer = BufferUtil.allocate(capacity, direct); @@ -257,7 +356,7 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable private RetainedBucket bucketFor(int capacity, boolean direct) { - if (capacity < _minCapacity) + if (capacity < getMinCapacity()) return null; int idx = _bucketIndexFor.applyAsInt(capacity); RetainedBucket[] buckets = direct ? _direct : _indirect; @@ -316,10 +415,14 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable private long getMemory(boolean direct) { - if (direct) - return _currentDirectMemory.get(); - else - return _currentHeapMemory.get(); + AtomicLong memory = direct ? _directMemory : _heapMemory; + return memory.longValue(); + } + + private void subtractMemory(int amount, boolean direct) + { + AtomicLong memory = direct ? _directMemory : _heapMemory; + memory.addAndGet(-amount); } @ManagedAttribute("The available bytes retained by direct ByteBuffers") @@ -340,7 +443,7 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable long total = 0L; for (RetainedBucket bucket : buckets) { - long capacity = bucket._capacity; + long capacity = bucket.getCapacity(); total += bucket.getPool().getIdleCount() * capacity; } return total; @@ -349,105 +452,20 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable @ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION") public void clear() { - clearArray(_direct, _currentDirectMemory); - clearArray(_indirect, _currentHeapMemory); + clearBuckets(_direct); + _directMemory.set(0); + clearBuckets(_indirect); + _heapMemory.set(0); } - private void clearArray(RetainedBucket[] poolArray, AtomicLong memoryCounter) + private void clearBuckets(RetainedBucket[] buckets) { - for (RetainedBucket bucket : poolArray) + for (RetainedBucket bucket : buckets) { - bucket.getPool().stream().forEach(entry -> - { - if (entry.remove()) - { - RetainableByteBuffer pooled = entry.getPooled(); - // Calling getPooled can return null if the entry was not yet enabled. - if (pooled != null) - { - memoryCounter.addAndGet(-pooled.capacity()); - removed(pooled); - } - } - }); + bucket.clear(); } } - private void releaseExcessMemory(boolean direct) - { - long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory; - if (maxMemory > 0) - { - long excess = getMemory(direct) - maxMemory; - if (excess > 0) - evict(direct, excess); - } - } - - /** - * This eviction mechanism searches for the RetainableByteBuffers that were released the longest time ago. - * - * @param direct true to search in the direct buffers buckets, false to search in the heap buffers buckets. - * @param excess the amount of bytes to evict. At least this much will be removed from the buckets. - */ - private void evict(boolean direct, long excess) - { - if (LOG.isDebugEnabled()) - LOG.debug("evicting {} bytes from {} pools", excess, (direct ? "direct" : "heap")); - long now = NanoTime.now(); - long totalClearedCapacity = 0L; - - RetainedBucket[] buckets = direct ? _direct : _indirect; - - while (totalClearedCapacity < excess) - { - // Run through all the buckets to avoid removing - // the buffers only from the first bucket(s). - for (RetainedBucket bucket : buckets) - { - Pool.Entry oldestEntry = findOldestEntry(now, bucket.getPool()); - if (oldestEntry == null) - continue; - - // Get the pooled buffer now in case we can evict below. - // The buffer may be null if the entry has been reserved but - // not yet enabled, or the entry has been removed by a concurrent - // thread, that may also have nulled-out the pooled buffer. - RetainableByteBuffer buffer = oldestEntry.getPooled(); - if (buffer == null) - continue; - - // A concurrent thread evicted the same entry. - // A successful Entry.remove() call may null-out the pooled buffer. - if (!oldestEntry.remove()) - continue; - - // We can evict, clear the buffer capacity. - int clearedCapacity = buffer.capacity(); - if (direct) - _currentDirectMemory.addAndGet(-clearedCapacity); - else - _currentHeapMemory.addAndGet(-clearedCapacity); - totalClearedCapacity += clearedCapacity; - removed(buffer); - } - } - - if (LOG.isDebugEnabled()) - LOG.debug("eviction done, cleared {} bytes from {} pools", totalClearedCapacity, (direct ? "direct" : "heap")); - } - - @Override - public String toString() - { - return String.format("%s{min=%d,max=%d,buckets=%d,heap=%d/%d,direct=%d/%d}", - super.toString(), - _minCapacity, _maxCapacity, - _direct.length, - _currentHeapMemory.get(), _maxHeapMemory, - _currentDirectMemory.get(), _maxDirectMemory); - } - @Override public void dump(Appendable out, String indent) throws IOException { @@ -459,35 +477,25 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable DumpableCollection.fromArray("indirect", _indirect)); } - private Pool.Entry findOldestEntry(long now, Pool bucket) + @Override + public String toString() { - // This method may be in the hot path, do not use Java streams. - - Pool.Entry oldestEntry = null; - RetainableByteBuffer oldestBuffer = null; - long oldestAge = 0; - // TODO: improve Pool APIs to avoid stream().toList(). - for (Pool.Entry entry : bucket.stream().toList()) - { - Buffer buffer = (Buffer)entry.getPooled(); - // A null buffer means the entry is reserved - // but not acquired yet, try the next. - if (buffer != null) - { - long age = NanoTime.elapsed(buffer.getLastNanoTime(), now); - if (oldestBuffer == null || age > oldestAge) - { - oldestEntry = entry; - oldestBuffer = buffer; - oldestAge = age; - } - } - } - return oldestEntry; + return String.format("%s{min=%d,max=%d,buckets=%d,heap=%d/%d,direct=%d/%d}", + super.toString(), + _minCapacity, _maxCapacity, + _direct.length, + getMemory(false), _maxHeapMemory, + getMemory(true), _maxDirectMemory); } - private static class RetainedBucket + private class RetainedBucket { + private final LongAdder _acquires = new LongAdder(); + private final LongAdder _pooled = new LongAdder(); + private final LongAdder _nonPooled = new LongAdder(); + private final LongAdder _evicts = new LongAdder(); + private final LongAdder _removes = new LongAdder(); + private final LongAdder _releases = new LongAdder(); private final Pool _pool; private final int _capacity; @@ -496,18 +504,87 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable if (poolSize <= ConcurrentPool.OPTIMAL_MAX_SIZE) _pool = new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, poolSize, e -> 1); else - _pool = new CompoundPool<>( + _pool = new BucketCompoundPool( new ConcurrentPool<>(ConcurrentPool.StrategyType.THREAD_ID, ConcurrentPool.OPTIMAL_MAX_SIZE, e -> 1), new QueuedPool<>(poolSize - ConcurrentPool.OPTIMAL_MAX_SIZE) ); _capacity = capacity; } + public void recordAcquire() + { + if (isStatisticsEnabled()) + _acquires.increment(); + } + + public void recordEvict() + { + if (isStatisticsEnabled()) + _evicts.increment(); + } + + public void recordNonPooled() + { + if (isStatisticsEnabled()) + _nonPooled.increment(); + } + + public void recordPooled() + { + if (isStatisticsEnabled()) + _pooled.increment(); + } + + public void recordRelease() + { + if (isStatisticsEnabled()) + _releases.increment(); + } + + public void recordRemove() + { + if (isStatisticsEnabled()) + _removes.increment(); + } + + private int getCapacity() + { + return _capacity; + } + private Pool getPool() { return _pool; } + private int evict() + { + Pool.Entry entry; + if (_pool instanceof BucketCompoundPool compound) + entry = compound.evict(); + else + entry = _pool.acquire(); + + if (entry == null) + return 0; + + recordRemove(); + entry.remove(); + + return getCapacity(); + } + + public void clear() + { + _acquires.reset(); + _pooled.reset(); + _nonPooled.reset(); + _evicts.reset(); + _removes.reset(); + _releases.reset(); + getPool().stream().forEach(Pool.Entry::remove); + } + @Override public String toString() { @@ -520,18 +597,44 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable inUse++; } - return String.format("%s{capacity=%d,inuse=%d(%d%%)}", + long pooled = _pooled.longValue(); + long acquires = _acquires.longValue(); + float hitRatio = acquires == 0 ? Float.NaN : pooled * 100F / acquires; + return String.format("%s{capacity=%d,in-use=%d/%d,pooled/acquires=%d/%d(%.3f%%),non-pooled/evicts/removes/releases=%d/%d/%d/%d}", super.toString(), - _capacity, + getCapacity(), inUse, - entries > 0 ? (inUse * 100) / entries : 0); + entries, + pooled, + acquires, + hitRatio, + _nonPooled.longValue(), + _evicts.longValue(), + _removes.longValue(), + _releases.longValue() + ); + } + + private static class BucketCompoundPool extends CompoundPool + { + private BucketCompoundPool(ConcurrentPool concurrentBucket, QueuedPool queuedBucket) + { + super(concurrentBucket, queuedBucket); + } + + private Pool.Entry evict() + { + Entry entry = getSecondaryPool().acquire(); + if (entry == null) + entry = getPrimaryPool().acquire(); + return entry; + } } } private static class Buffer extends AbstractRetainableByteBuffer { private final Consumer releaser; - private final AtomicLong lastNanoTime = new AtomicLong(NanoTime.now()); private Buffer(ByteBuffer buffer, Consumer releaser) { @@ -545,16 +648,11 @@ public class ArrayByteBufferPool implements ByteBufferPool, Dumpable boolean released = super.release(); if (released) { - lastNanoTime.setOpaque(NanoTime.now()); - releaser.accept(this); + if (releaser != null) + releaser.accept(this); } return released; } - - public long getLastNanoTime() - { - return lastNanoTime.getOpaque(); - } } /** diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/CompoundPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/CompoundPool.java index 19800783547..b66ac5d2fe4 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/CompoundPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/CompoundPool.java @@ -35,6 +35,16 @@ public class CompoundPool

implements Pool

this.secondaryPool = secondaryPool; } + public Pool

getPrimaryPool() + { + return primaryPool; + } + + public Pool

getSecondaryPool() + { + return secondaryPool; + } + @Override public Entry

reserve() { @@ -81,4 +91,28 @@ public class CompoundPool

implements Pool

{ return Stream.concat(primaryPool.stream(), secondaryPool.stream()); } + + @Override + public int getReservedCount() + { + return primaryPool.getReservedCount() + secondaryPool.getReservedCount(); + } + + @Override + public int getIdleCount() + { + return primaryPool.getIdleCount() + secondaryPool.getIdleCount(); + } + + @Override + public int getInUseCount() + { + return primaryPool.getInUseCount() + secondaryPool.getInUseCount(); + } + + @Override + public int getTerminatedCount() + { + return primaryPool.getTerminatedCount() + secondaryPool.getTerminatedCount(); + } } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/QueuedPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/QueuedPool.java index d6c5bc1b1c7..e0f65310bec 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/QueuedPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/QueuedPool.java @@ -171,6 +171,30 @@ public class QueuedPool

implements Pool

return queue.stream(); } + @Override + public int getReservedCount() + { + return 0; + } + + @Override + public int getIdleCount() + { + return size(); + } + + @Override + public int getInUseCount() + { + return 0; + } + + @Override + public int getTerminatedCount() + { + return 0; + } + private static class QueuedEntry

implements Entry

{ private final QueuedPool

pool; diff --git a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java index 153042fc5de..11f2f6da606 100644 --- a/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java +++ b/jetty-core/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java @@ -13,16 +13,20 @@ package org.eclipse.jetty.io; -import java.lang.ref.Reference; import java.nio.ByteOrder; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import org.eclipse.jetty.io.internal.CompoundPool; +import org.eclipse.jetty.util.ConcurrentPool; +import org.eclipse.jetty.util.Pool; import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.not; @@ -30,6 +34,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ArrayByteBufferPoolTest { @@ -41,25 +46,25 @@ public class ArrayByteBufferPoolTest List buffers = new ArrayList<>(); buffers.add(pool.acquire(10, true)); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), is(0L)); buffers.add(pool.acquire(10, true)); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), is(0L)); buffers.add(pool.acquire(20, true)); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), is(0L)); buffers.add(pool.acquire(20, true)); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), is(0L)); buffers.add(pool.acquire(10, true)); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), is(0L)); buffers.add(pool.acquire(20, true)); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), is(0L)); buffers.add(pool.acquire(10, true)); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), is(0L)); buffers.add(pool.acquire(20, true)); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); - assertThat(pool.getDirectByteBufferCount(), greaterThan(0L)); - assertThat(pool.getDirectMemory(), greaterThan(0L)); + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); buffers.forEach(RetainableByteBuffer::release); @@ -67,8 +72,8 @@ public class ArrayByteBufferPoolTest assertThat(pool.getAvailableDirectByteBufferCount(), lessThan((long)buffers.size())); assertThat(pool.getDirectByteBufferCount(), greaterThan(0L)); assertThat(pool.getDirectByteBufferCount(), lessThan((long)buffers.size())); - assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); assertThat(pool.getDirectMemory(), greaterThan(0L)); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); } @Test @@ -108,10 +113,10 @@ public class ArrayByteBufferPoolTest RetainableByteBuffer buf1 = pool.acquire(10, true); - assertThat(pool.getDirectMemory(), is(10L)); + assertThat(pool.getDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); - assertThat(pool.getDirectByteBufferCount(), is(1L)); + assertThat(pool.getDirectByteBufferCount(), is(0L)); assertThat(buf1.isRetained(), is(false)); buf1.retain(); @@ -122,10 +127,10 @@ public class ArrayByteBufferPoolTest assertThat(buf1.release(), is(false)); assertThat(buf1.isRetained(), is(false)); - assertThat(pool.getDirectMemory(), is(10L)); + assertThat(pool.getDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); - assertThat(pool.getDirectByteBufferCount(), is(1L)); + assertThat(pool.getDirectByteBufferCount(), is(0L)); assertThat(buf1.release(), is(true)); assertThat(buf1.isRetained(), is(false)); @@ -143,10 +148,10 @@ public class ArrayByteBufferPoolTest RetainableByteBuffer buf1 = pool.acquire(10, true); - assertThat(pool.getDirectMemory(), is(10L)); + assertThat(pool.getDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); - assertThat(pool.getDirectByteBufferCount(), is(1L)); + assertThat(pool.getDirectByteBufferCount(), is(0L)); buf1.release(); @@ -168,49 +173,55 @@ public class ArrayByteBufferPoolTest { ArrayByteBufferPool pool = new ArrayByteBufferPool(0, 10, 20, 2); - RetainableByteBuffer buf1 = pool.acquire(1, true); // pooled + RetainableByteBuffer buf1 = pool.acquire(1, true); assertThat(buf1.capacity(), is(10)); - RetainableByteBuffer buf2 = pool.acquire(1, true); // pooled + RetainableByteBuffer buf2 = pool.acquire(1, true); assertThat(buf2.capacity(), is(10)); - RetainableByteBuffer buf3 = pool.acquire(1, true); // not pooled, bucket is full - assertThat(buf3.capacity(), is(1)); + RetainableByteBuffer buf3 = pool.acquire(1, true); + assertThat(buf3.capacity(), is(10)); + + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); + + assertTrue(buf1.release()); // pooled + assertThat(pool.getDirectByteBufferCount(), is(1L)); + assertTrue(buf2.release()); // pooled + assertThat(pool.getDirectByteBufferCount(), is(2L)); + assertTrue(buf3.release()); // not pooled, bucket is full. + assertThat(pool.getDirectByteBufferCount(), is(2L)); + + RetainableByteBuffer buf4 = pool.acquire(11, true); + assertThat(buf4.capacity(), is(20)); + RetainableByteBuffer buf5 = pool.acquire(11, true); + assertThat(buf5.capacity(), is(20)); + RetainableByteBuffer buf6 = pool.acquire(11, true); + assertThat(buf6.capacity(), is(20)); assertThat(pool.getDirectByteBufferCount(), is(2L)); assertThat(pool.getDirectMemory(), is(20L)); - RetainableByteBuffer buf4 = pool.acquire(11, true); // pooled - assertThat(buf4.capacity(), is(20)); - RetainableByteBuffer buf5 = pool.acquire(11, true); // pooled - assertThat(buf5.capacity(), is(20)); - RetainableByteBuffer buf6 = pool.acquire(11, true); // not pooled, bucket is full - assertThat(buf6.capacity(), is(11)); - - // Need to keep the references around past the asserts above. - Reference.reachabilityFence(buf1); - Reference.reachabilityFence(buf2); - Reference.reachabilityFence(buf3); - Reference.reachabilityFence(buf4); - Reference.reachabilityFence(buf5); - Reference.reachabilityFence(buf6); - + assertTrue(buf4.release()); // pooled + assertThat(pool.getDirectByteBufferCount(), is(3L)); + assertTrue(buf5.release()); // pooled + assertThat(pool.getDirectByteBufferCount(), is(4L)); + assertTrue(buf6.release()); // not pooled, bucket is full. assertThat(pool.getDirectByteBufferCount(), is(4L)); - assertThat(pool.getDirectMemory(), is(60L)); } @Test - public void testBufferReleaseRepools() + public void testBufferReleaseRePools() { ArrayByteBufferPool pool = new ArrayByteBufferPool(0, 10, 20, 1); List all = new ArrayList<>(); - all.add(pool.acquire(1, true)); // pooled - all.add(pool.acquire(1, true)); // not pooled, bucket is full - all.add(pool.acquire(11, true)); // pooled - all.add(pool.acquire(11, true)); // not pooled, bucket is full + all.add(pool.acquire(1, true)); + all.add(pool.acquire(1, true)); + all.add(pool.acquire(11, true)); + all.add(pool.acquire(11, true)); - assertThat(pool.getDirectByteBufferCount(), is(2L)); - assertThat(pool.getDirectMemory(), is(30L)); + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); assertThat(pool.getAvailableDirectMemory(), is(0L)); @@ -227,21 +238,25 @@ public class ArrayByteBufferPoolTest { ArrayByteBufferPool pool = new ArrayByteBufferPool(10, 10, 20, Integer.MAX_VALUE); - RetainableByteBuffer b1 = pool.acquire(1, true); // not pooled, < minCapacity - RetainableByteBuffer b2 = pool.acquire(10, true); // pooled - RetainableByteBuffer b3 = pool.acquire(20, true); // pooled - RetainableByteBuffer b4 = pool.acquire(30, true); // not pooled, > maxCapacity + RetainableByteBuffer buf1 = pool.acquire(1, true); + RetainableByteBuffer buf2 = pool.acquire(10, true); + RetainableByteBuffer buf3 = pool.acquire(20, true); + RetainableByteBuffer buf4 = pool.acquire(30, true); - assertThat(pool.getDirectByteBufferCount(), is(2L)); - assertThat(pool.getDirectMemory(), is(30L)); + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); assertThat(pool.getAvailableDirectMemory(), is(0L)); - // Need to keep the references around past the asserts above. - Reference.reachabilityFence(b1); - Reference.reachabilityFence(b2); - Reference.reachabilityFence(b3); - Reference.reachabilityFence(b4); + assertTrue(buf1.release()); // not pooled, < minCapacity + assertTrue(buf2.release()); // pooled + assertTrue(buf3.release()); // pooled + assertTrue(buf4.release()); // not pooled, > maxCapacity + + assertThat(pool.getDirectByteBufferCount(), is(2L)); + assertThat(pool.getDirectMemory(), is(30L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(2L)); + assertThat(pool.getAvailableDirectMemory(), is(30L)); } @Test @@ -249,17 +264,19 @@ public class ArrayByteBufferPoolTest { ArrayByteBufferPool pool = new ArrayByteBufferPool(); - RetainableByteBuffer b1 = pool.acquire(10, true); - RetainableByteBuffer b2 = pool.acquire(10, true); + RetainableByteBuffer buffer1 = pool.acquire(10, true); + RetainableByteBuffer buffer2 = pool.acquire(10, true); - assertThat(pool.getDirectByteBufferCount(), is(2L)); - assertThat(pool.getDirectMemory(), is(2L * ArrayByteBufferPool.DEFAULT_FACTOR)); + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); assertThat(pool.getAvailableDirectMemory(), is(0L)); - // Need to keep the references around past the asserts above. - Reference.reachabilityFence(b1); - Reference.reachabilityFence(b2); + buffer2.release(); + buffer1.release(); + + assertThat(pool.getDirectByteBufferCount(), is(2L)); + assertThat(pool.getDirectMemory(), is(2L * ArrayByteBufferPool.DEFAULT_FACTOR)); pool.clear(); @@ -273,23 +290,33 @@ public class ArrayByteBufferPoolTest public void testRetainAfterRePooledThrows() { ArrayByteBufferPool pool = new ArrayByteBufferPool(); + RetainableByteBuffer buf1 = pool.acquire(10, true); - assertThat(pool.getDirectByteBufferCount(), is(1L)); + + assertThat(pool.getDirectByteBufferCount(), is(0L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(buf1.release(), is(true)); assertThrows(IllegalStateException.class, buf1::retain); assertThrows(IllegalStateException.class, buf1::release); assertThat(pool.getDirectByteBufferCount(), is(1L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(1L)); - // check that the buffer is still available + // Check that the buffer is still available. RetainableByteBuffer buf2 = pool.acquire(10, true); assertThat(pool.getDirectByteBufferCount(), is(1L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); - assertThat(buf2 == buf1, is(true)); // make sure it's not a new instance + // The ByteBuffer is re-wrapped by a different RetainableByteBuffer upon the first release. + assertThat(buf2, not(sameInstance(buf1))); + assertThat(buf2.getByteBuffer(), sameInstance(buf1.getByteBuffer())); + assertThat(buf2.release(), is(true)); assertThat(pool.getDirectByteBufferCount(), is(1L)); assertThat(pool.getAvailableDirectByteBufferCount(), is(1L)); + + RetainableByteBuffer buf3 = pool.acquire(10, true); + assertThat(buf3, sameInstance(buf2)); + assertThat(buf3.release(), is(true)); } @Test @@ -345,7 +372,8 @@ public class ArrayByteBufferPoolTest RetainableByteBuffer retain5 = pool.acquire(5, false); retain5.release(); RetainableByteBuffer retain6 = pool.acquire(6, false); - assertThat(retain6, sameInstance(retain5)); + assertThat(retain6, not(sameInstance(retain5))); + assertThat(retain6.getByteBuffer(), sameInstance(retain5.getByteBuffer())); retain6.release(); RetainableByteBuffer retain9 = pool.acquire(9, false); assertThat(retain9, not(sameInstance(retain5))); @@ -386,4 +414,34 @@ public class ArrayByteBufferPoolTest assertThat(buffer.release(), is(true)); assertThat(buffer.getByteBuffer().order(), Matchers.is(ByteOrder.BIG_ENDIAN)); } + + @Test + public void testReleaseExcessMemory() + { + int maxCapacity = 20; + int maxBucketSize = ConcurrentPool.OPTIMAL_MAX_SIZE * 2; + int maxMemory = maxCapacity * maxBucketSize / 2; + ArrayByteBufferPool pool = new ArrayByteBufferPool(0, 10, maxCapacity, maxBucketSize, maxMemory, maxMemory); + + // It is always possible to acquire beyond maxMemory, because + // the buffers are in use and not really retained in the pool. + List buffers = new ArrayList<>(); + for (int i = 0; i < maxBucketSize; ++i) + { + buffers.add(pool.acquire(maxCapacity, true)); + } + + // The last entries acquired are from the queued pool. + // Release in reverse order to release first the queued + // entries, but then the concurrent entries should be + // pooled, and the queued entries removed. + Collections.reverse(buffers); + buffers.forEach(RetainableByteBuffer::release); + + Pool bucketPool = pool.poolFor(maxCapacity, true); + assertThat(bucketPool, instanceOf(CompoundPool.class)); + CompoundPool compoundPool = (CompoundPool)bucketPool; + assertThat(compoundPool.getPrimaryPool().size(), is(ConcurrentPool.OPTIMAL_MAX_SIZE)); + assertThat(compoundPool.getSecondaryPool().size(), is(0)); + } } diff --git a/jetty-core/jetty-server/src/main/config/etc/jetty-bytebufferpool.xml b/jetty-core/jetty-server/src/main/config/etc/jetty-bytebufferpool.xml index d95e7773182..fae80ed9a71 100644 --- a/jetty-core/jetty-server/src/main/config/etc/jetty-bytebufferpool.xml +++ b/jetty-core/jetty-server/src/main/config/etc/jetty-bytebufferpool.xml @@ -8,5 +8,6 @@ + diff --git a/jetty-core/jetty-server/src/main/config/modules/bytebufferpool.mod b/jetty-core/jetty-server/src/main/config/modules/bytebufferpool.mod index 1cd2c0283d7..4bdfaaecd54 100644 --- a/jetty-core/jetty-server/src/main/config/modules/bytebufferpool.mod +++ b/jetty-core/jetty-server/src/main/config/modules/bytebufferpool.mod @@ -31,3 +31,6 @@ etc/jetty-bytebufferpool.xml ## Maximum direct memory held idle by the pool (0 for heuristic, -1 for unlimited). #jetty.byteBufferPool.maxDirectMemory=0 + +## Whether statistics are enabled. +#jetty.byteBufferPool.statisticsEnabled=false diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java index fba6f78f7b1..22cae03b0d2 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/ConcurrentPool.java @@ -22,6 +22,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; +import java.util.function.Predicate; import java.util.function.ToIntFunction; import java.util.stream.Stream; @@ -148,6 +149,11 @@ public class ConcurrentPool

implements Pool

, Dumpable leaked.increment(); if (LOG.isDebugEnabled()) LOG.debug("Leaked " + holder); + leaked(); + } + + protected void leaked() + { } @Override @@ -194,8 +200,8 @@ public class ConcurrentPool

implements Pool

, Dumpable Holder

holder = entries.get(i); if (holder.getEntry() == null) { - leaked(holder); entries.remove(i--); + leaked(holder); } } } @@ -222,8 +228,8 @@ public class ConcurrentPool

implements Pool

, Dumpable ConcurrentEntry

entry = (ConcurrentEntry

)holder.getEntry(); if (entry == null) { - leaked(holder); entries.remove(index); + leaked(holder); continue; } @@ -348,6 +354,42 @@ public class ConcurrentPool

implements Pool

, Dumpable return entries.stream().map(Holder::getEntry).filter(Objects::nonNull); } + @Override + public int getReservedCount() + { + return getCount(Entry::isReserved); + } + + @Override + public int getIdleCount() + { + return getCount(Entry::isIdle); + } + + @Override + public int getInUseCount() + { + return getCount(Entry::isInUse); + } + + @Override + public int getTerminatedCount() + { + return getCount(Entry::isTerminated); + } + + private int getCount(Predicate> predicate) + { + int count = 0; + for (Holder

holder : entries) + { + Entry

entry = holder.getEntry(); + if (entry != null && predicate.test(entry)) + count++; + } + return count; + } + @Override public void dump(Appendable out, String indent) throws IOException { diff --git a/tests/jetty-jmh/src/main/java/org/eclipse/jetty/io/jmh/ArrayByteBufferPoolBenchmark.java b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/io/jmh/ArrayByteBufferPoolBenchmark.java new file mode 100644 index 00000000000..a2bd546316d --- /dev/null +++ b/tests/jetty-jmh/src/main/java/org/eclipse/jetty/io/jmh/ArrayByteBufferPoolBenchmark.java @@ -0,0 +1,127 @@ +// +// ======================================================================== +// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io.jmh; + +import java.util.concurrent.ThreadLocalRandom; + +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.profile.AsyncProfiler; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; + +@State(Scope.Benchmark) +public class ArrayByteBufferPoolBenchmark +{ + public static void main(String[] args) throws RunnerException + { + String asyncProfilerPath = "/home/simon/programs/async-profiler/lib/libasyncProfiler.so"; + Options opt = new OptionsBuilder() + .include(ArrayByteBufferPoolBenchmark.class.getSimpleName()) + .warmupIterations(10) + .warmupTime(TimeValue.milliseconds(500)) + .measurementIterations(10) + .measurementTime(TimeValue.milliseconds(500)) + .addProfiler(AsyncProfiler.class, "dir=/tmp;output=flamegraph;event=cpu;interval=500000;libPath=" + asyncProfilerPath) + .forks(1) + .threads(10) + .build(); + new Runner(opt).run(); + } + + @Param("0") + int minCapacity; + @Param("65536") + int maxCapacity; + @Param("4096") + int factor; + @Param("-1") + int maxBucketSize; + @Param({"0", "1048576"}) + long maxMemory; + @Param({"true"}) + boolean statisticsEnabled; + + ArrayByteBufferPool pool; + + @Setup + public void prepare() + { + pool = new ArrayByteBufferPool(minCapacity, factor, maxCapacity, maxBucketSize, maxMemory, maxMemory); + pool.setStatisticsEnabled(statisticsEnabled); + } + + @TearDown + public void dispose() + { + System.out.println(pool.dump()); + } + + @Benchmark + @BenchmarkMode({Mode.Throughput}) + public void inputFixedCapacityOutputRandomCapacity() + { + // Simulate a read from the network. + RetainableByteBuffer input = pool.acquire(61440, true); + + // Simulate a write of random size from the application. + int capacity = ThreadLocalRandom.current().nextInt(minCapacity, maxCapacity); + RetainableByteBuffer output = pool.acquire(capacity, true); + + output.release(); + input.release(); + } + + int iterations; + + @Setup(Level.Iteration) + public void migrate() + { + ++iterations; + if (iterations == 15) + System.out.println(pool.dump()); + } + + @Benchmark + @BenchmarkMode({Mode.Throughput}) + public void inputFixedCapacityOutputRandomCapacityMigrating() + { + // Simulate a read from the network. + RetainableByteBuffer input = pool.acquire(8192, true); + + // Simulate a write of random size from the application. + // Simulate a change in buffer sizes after half of the iterations. + int capacity; + if (iterations <= 15) + capacity = ThreadLocalRandom.current().nextInt(minCapacity, maxCapacity / 2); + else + capacity = ThreadLocalRandom.current().nextInt(maxCapacity / 2, maxCapacity); + RetainableByteBuffer output = pool.acquire(capacity, true); + + output.release(); + input.release(); + } +}