diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index bfb498ad124..8da443ebd9d 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -20,8 +20,6 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; -import org.eclipse.jetty.util.BufferUtil; - public class ArrayByteBufferPool implements ByteBufferPool { private final int _min; @@ -63,8 +61,8 @@ public class ArrayByteBufferPool implements ByteBufferPool for (int i=0;i<_direct.length;i++) { size+=_inc; - _direct[i]=new ByteBufferPool.Bucket(size,_maxQueue); - _indirect[i]=new ByteBufferPool.Bucket(size,_maxQueue); + _direct[i]=new ByteBufferPool.Bucket(this,size,_maxQueue); + _indirect[i]=new ByteBufferPool.Bucket(this,size,_maxQueue); } } @@ -73,7 +71,7 @@ public class ArrayByteBufferPool implements ByteBufferPool { ByteBufferPool.Bucket bucket = bucketFor(size,direct); if (bucket==null) - return direct ? BufferUtil.allocateDirect(size) : BufferUtil.allocate(size); + return newByteBuffer(size,direct); return bucket.acquire(direct); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 4944dac91c5..4bb2afce13d 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -55,6 +55,11 @@ public interface ByteBufferPool * @see #acquire(int, boolean) */ public void release(ByteBuffer buffer); + + default ByteBuffer newByteBuffer(int capacity, boolean direct) + { + return direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity); + } public static class Lease { @@ -119,14 +124,17 @@ public interface ByteBufferPool } } + class Bucket { + private final ByteBufferPool _pool; private final int _capacity; private final AtomicInteger _space; private final Queue _queue= new ConcurrentArrayQueue<>(); - public Bucket(int bufferSize,int maxSize) + public Bucket(ByteBufferPool pool, int bufferSize,int maxSize) { + _pool=pool; _capacity=bufferSize; _space=maxSize>0?new AtomicInteger(maxSize):null; } @@ -146,7 +154,7 @@ public interface ByteBufferPool { ByteBuffer buffer = _queue.poll(); if (buffer == null) - return direct ? BufferUtil.allocateDirect(_capacity) : BufferUtil.allocate(_capacity); + return _pool.newByteBuffer(_capacity,direct); if (_space!=null) _space.incrementAndGet(); return buffer; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java index 335d6ccba7b..11af00c7273 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.eclipse.jetty.util.BufferUtil; @@ -31,6 +32,7 @@ public class MappedByteBufferPool implements ByteBufferPool private final ConcurrentMap heapBuffers = new ConcurrentHashMap<>(); private final int _factor; private final int _maxQueue; + private final Function _newBucket; public MappedByteBufferPool() { @@ -39,13 +41,19 @@ public class MappedByteBufferPool implements ByteBufferPool public MappedByteBufferPool(int factor) { - this(factor,-1); + this(factor,-1,null); } public MappedByteBufferPool(int factor,int maxQueue) + { + this(factor,maxQueue,null); + } + + public MappedByteBufferPool(int factor,int maxQueue,Function newBucket) { _factor = factor<=0?1024:factor; - _maxQueue=maxQueue; + _maxQueue = maxQueue; + _newBucket = newBucket!=null?newBucket:i->new Bucket(this,i*_factor,_maxQueue); } @Override @@ -60,12 +68,6 @@ public class MappedByteBufferPool implements ByteBufferPool return bucket.acquire(direct); } - protected ByteBuffer newByteBuffer(int capacity, boolean direct) - { - return direct ? BufferUtil.allocateDirect(capacity) - : BufferUtil.allocate(capacity); - } - @Override public void release(ByteBuffer buffer) { @@ -78,13 +80,15 @@ public class MappedByteBufferPool implements ByteBufferPool int b = bucketFor(buffer.capacity()); ConcurrentMap buckets = bucketsFor(buffer.isDirect()); - Bucket bucket = buckets.computeIfAbsent(b,bi->new Bucket(b*_factor,_maxQueue)); + Bucket bucket = buckets.computeIfAbsent(b,_newBucket); bucket.release(buffer); } public void clear() { + directBuffers.values().forEach(Bucket::clear); directBuffers.clear(); + heapBuffers.values().forEach(Bucket::clear); heapBuffers.clear(); } @@ -107,7 +111,7 @@ public class MappedByteBufferPool implements ByteBufferPool private final AtomicInteger tag = new AtomicInteger(); @Override - protected ByteBuffer newByteBuffer(int capacity, boolean direct) + public ByteBuffer newByteBuffer(int capacity, boolean direct) { ByteBuffer buffer = super.newByteBuffer(capacity + 4, direct); buffer.limit(buffer.capacity());