Issue #890 Review MappedByteBuffer
Moved newByteBuffer to a default method on ByteBufferPool preallocate newBuffer function clear buckets
This commit is contained in:
parent
e0c83757aa
commit
13696b54f2
|
@ -20,8 +20,6 @@ package org.eclipse.jetty.io;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
|
||||||
|
|
||||||
public class ArrayByteBufferPool implements ByteBufferPool
|
public class ArrayByteBufferPool implements ByteBufferPool
|
||||||
{
|
{
|
||||||
private final int _min;
|
private final int _min;
|
||||||
|
@ -63,8 +61,8 @@ public class ArrayByteBufferPool implements ByteBufferPool
|
||||||
for (int i=0;i<_direct.length;i++)
|
for (int i=0;i<_direct.length;i++)
|
||||||
{
|
{
|
||||||
size+=_inc;
|
size+=_inc;
|
||||||
_direct[i]=new ByteBufferPool.Bucket(size,_maxQueue);
|
_direct[i]=new ByteBufferPool.Bucket(this,size,_maxQueue);
|
||||||
_indirect[i]=new ByteBufferPool.Bucket(size,_maxQueue);
|
_indirect[i]=new ByteBufferPool.Bucket(this,size,_maxQueue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,7 +71,7 @@ public class ArrayByteBufferPool implements ByteBufferPool
|
||||||
{
|
{
|
||||||
ByteBufferPool.Bucket bucket = bucketFor(size,direct);
|
ByteBufferPool.Bucket bucket = bucketFor(size,direct);
|
||||||
if (bucket==null)
|
if (bucket==null)
|
||||||
return direct ? BufferUtil.allocateDirect(size) : BufferUtil.allocate(size);
|
return newByteBuffer(size,direct);
|
||||||
|
|
||||||
return bucket.acquire(direct);
|
return bucket.acquire(direct);
|
||||||
|
|
||||||
|
|
|
@ -56,6 +56,11 @@ public interface ByteBufferPool
|
||||||
*/
|
*/
|
||||||
public void release(ByteBuffer buffer);
|
public void release(ByteBuffer buffer);
|
||||||
|
|
||||||
|
default ByteBuffer newByteBuffer(int capacity, boolean direct)
|
||||||
|
{
|
||||||
|
return direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
|
||||||
|
}
|
||||||
|
|
||||||
public static class Lease
|
public static class Lease
|
||||||
{
|
{
|
||||||
private final ByteBufferPool byteBufferPool;
|
private final ByteBufferPool byteBufferPool;
|
||||||
|
@ -119,14 +124,17 @@ public interface ByteBufferPool
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class Bucket
|
class Bucket
|
||||||
{
|
{
|
||||||
|
private final ByteBufferPool _pool;
|
||||||
private final int _capacity;
|
private final int _capacity;
|
||||||
private final AtomicInteger _space;
|
private final AtomicInteger _space;
|
||||||
private final Queue<ByteBuffer> _queue= new ConcurrentArrayQueue<>();
|
private final Queue<ByteBuffer> _queue= new ConcurrentArrayQueue<>();
|
||||||
|
|
||||||
public Bucket(int bufferSize,int maxSize)
|
public Bucket(ByteBufferPool pool, int bufferSize,int maxSize)
|
||||||
{
|
{
|
||||||
|
_pool=pool;
|
||||||
_capacity=bufferSize;
|
_capacity=bufferSize;
|
||||||
_space=maxSize>0?new AtomicInteger(maxSize):null;
|
_space=maxSize>0?new AtomicInteger(maxSize):null;
|
||||||
}
|
}
|
||||||
|
@ -146,7 +154,7 @@ public interface ByteBufferPool
|
||||||
{
|
{
|
||||||
ByteBuffer buffer = _queue.poll();
|
ByteBuffer buffer = _queue.poll();
|
||||||
if (buffer == null)
|
if (buffer == null)
|
||||||
return direct ? BufferUtil.allocateDirect(_capacity) : BufferUtil.allocate(_capacity);
|
return _pool.newByteBuffer(_capacity,direct);
|
||||||
if (_space!=null)
|
if (_space!=null)
|
||||||
_space.incrementAndGet();
|
_space.incrementAndGet();
|
||||||
return buffer;
|
return buffer;
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.function.Function;
|
||||||
|
|
||||||
import org.eclipse.jetty.util.BufferUtil;
|
import org.eclipse.jetty.util.BufferUtil;
|
||||||
|
|
||||||
|
@ -31,6 +32,7 @@ public class MappedByteBufferPool implements ByteBufferPool
|
||||||
private final ConcurrentMap<Integer, Bucket> heapBuffers = new ConcurrentHashMap<>();
|
private final ConcurrentMap<Integer, Bucket> heapBuffers = new ConcurrentHashMap<>();
|
||||||
private final int _factor;
|
private final int _factor;
|
||||||
private final int _maxQueue;
|
private final int _maxQueue;
|
||||||
|
private final Function<Integer, Bucket> _newBucket;
|
||||||
|
|
||||||
public MappedByteBufferPool()
|
public MappedByteBufferPool()
|
||||||
{
|
{
|
||||||
|
@ -39,13 +41,19 @@ public class MappedByteBufferPool implements ByteBufferPool
|
||||||
|
|
||||||
public MappedByteBufferPool(int factor)
|
public MappedByteBufferPool(int factor)
|
||||||
{
|
{
|
||||||
this(factor,-1);
|
this(factor,-1,null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public MappedByteBufferPool(int factor,int maxQueue)
|
public MappedByteBufferPool(int factor,int maxQueue)
|
||||||
|
{
|
||||||
|
this(factor,maxQueue,null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MappedByteBufferPool(int factor,int maxQueue,Function<Integer, Bucket> newBucket)
|
||||||
{
|
{
|
||||||
_factor = factor<=0?1024:factor;
|
_factor = factor<=0?1024:factor;
|
||||||
_maxQueue=maxQueue;
|
_maxQueue = maxQueue;
|
||||||
|
_newBucket = newBucket!=null?newBucket:i->new Bucket(this,i*_factor,_maxQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -60,12 +68,6 @@ public class MappedByteBufferPool implements ByteBufferPool
|
||||||
return bucket.acquire(direct);
|
return bucket.acquire(direct);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ByteBuffer newByteBuffer(int capacity, boolean direct)
|
|
||||||
{
|
|
||||||
return direct ? BufferUtil.allocateDirect(capacity)
|
|
||||||
: BufferUtil.allocate(capacity);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void release(ByteBuffer buffer)
|
public void release(ByteBuffer buffer)
|
||||||
{
|
{
|
||||||
|
@ -78,13 +80,15 @@ public class MappedByteBufferPool implements ByteBufferPool
|
||||||
int b = bucketFor(buffer.capacity());
|
int b = bucketFor(buffer.capacity());
|
||||||
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(buffer.isDirect());
|
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(buffer.isDirect());
|
||||||
|
|
||||||
Bucket bucket = buckets.computeIfAbsent(b,bi->new Bucket(b*_factor,_maxQueue));
|
Bucket bucket = buckets.computeIfAbsent(b,_newBucket);
|
||||||
bucket.release(buffer);
|
bucket.release(buffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear()
|
public void clear()
|
||||||
{
|
{
|
||||||
|
directBuffers.values().forEach(Bucket::clear);
|
||||||
directBuffers.clear();
|
directBuffers.clear();
|
||||||
|
heapBuffers.values().forEach(Bucket::clear);
|
||||||
heapBuffers.clear();
|
heapBuffers.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +111,7 @@ public class MappedByteBufferPool implements ByteBufferPool
|
||||||
private final AtomicInteger tag = new AtomicInteger();
|
private final AtomicInteger tag = new AtomicInteger();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ByteBuffer newByteBuffer(int capacity, boolean direct)
|
public ByteBuffer newByteBuffer(int capacity, boolean direct)
|
||||||
{
|
{
|
||||||
ByteBuffer buffer = super.newByteBuffer(capacity + 4, direct);
|
ByteBuffer buffer = super.newByteBuffer(capacity + 4, direct);
|
||||||
buffer.limit(buffer.capacity());
|
buffer.limit(buffer.capacity());
|
||||||
|
|
Loading…
Reference in New Issue