Improve the usage of Sized ByteBufferPool

This commit is contained in:
gregw 2024-07-16 18:15:22 +10:00 committed by Ludovic Orban
parent f72174790b
commit e699583c04
4 changed files with 71 additions and 49 deletions

View File

@ -136,7 +136,7 @@ public interface ByteBufferPool
{
super(Objects.requireNonNullElse(wrapped, NON_POOLING));
_direct = direct;
_size = size > 0 ? size : 4096;
_size = size >= 0 ? size : 8192;
}
public boolean isDirect()

View File

@ -1353,11 +1353,9 @@ public interface RetainableByteBuffer extends Retainable
{
private static final Logger LOG = LoggerFactory.getLogger(RetainableByteBuffer.DynamicCapacity.class);
private final ByteBufferPool _pool;
private final boolean _direct;
private final ByteBufferPool.Sized _pool;
private final long _maxSize;
private final List<RetainableByteBuffer> _buffers;
private final int _aggregationSize;
private final int _minRetainSize;
private Mutable _aggregate;
@ -1369,12 +1367,42 @@ public interface RetainableByteBuffer extends Retainable
this(null, false, -1, -1, -1);
}
/**
* @param sizedPool The pool from which to allocate buffers, with {@link ByteBufferPool.Sized#isDirect()} configured
* and {@link ByteBufferPool.Sized#getSize()} used for the size of aggregation buffers.
*/
public DynamicCapacity(ByteBufferPool.Sized sizedPool)
{
this(null, sizedPool, -1, -1);
}
/**
* @param sizedPool The pool from which to allocate buffers, with {@link ByteBufferPool.Sized#isDirect()} configured
* and {@link ByteBufferPool.Sized#getSize()} used for the size of aggregation buffers.
* @param maxSize The maximum length of the accumulated buffers or -1 for 2GB limit
*/
public DynamicCapacity(ByteBufferPool.Sized sizedPool, long maxSize)
{
this(null, sizedPool, maxSize, -1);
}
/**
* @param sizedPool The pool from which to allocate buffers, with {@link ByteBufferPool.Sized#isDirect()} configured
* and {@link ByteBufferPool.Sized#getSize()} used for the size of aggregation buffers.
* @param maxSize The maximum length of the accumulated buffers or -1 for 2GB limit
* @param minRetainSize The minimal size of a {@link RetainableByteBuffer} before it will be retained; or 0 to always retain; or -1 for a heuristic;
*/
public DynamicCapacity(ByteBufferPool.Sized sizedPool, long maxSize, int minRetainSize)
{
this(null, sizedPool, maxSize, minRetainSize);
}
/**
* @param pool The pool from which to allocate buffers
*/
public DynamicCapacity(ByteBufferPool pool)
{
this(pool, false, -1, -1, -1);
this(null, pool instanceof ByteBufferPool.Sized sized ? sized : new ByteBufferPool.Sized(pool), -1, -1);
}
/**
@ -1411,30 +1439,19 @@ public interface RetainableByteBuffer extends Retainable
*/
public DynamicCapacity(ByteBufferPool pool, boolean direct, long maxSize, int aggregationSize, int minRetainSize)
{
this(new ArrayList<>(), pool, direct, maxSize, aggregationSize, minRetainSize);
this(null, new ByteBufferPool.Sized(pool, direct, maxSize > 0 && maxSize < 8192L ? (int)maxSize : aggregationSize), maxSize, minRetainSize);
}
private DynamicCapacity(List<RetainableByteBuffer> buffers, ByteBufferPool pool, boolean direct, long maxSize, int aggregationSize, int minRetainSize)
private DynamicCapacity(List<RetainableByteBuffer> buffers, ByteBufferPool.Sized pool, long maxSize, int minRetainSize)
{
super();
_pool = pool == null ? ByteBufferPool.NON_POOLING : pool;
_direct = direct;
_pool = pool == null ? ByteBufferPool.SIZED_NON_POOLING : pool;
_maxSize = maxSize < 0 ? Long.MAX_VALUE : maxSize;
_buffers = buffers;
_buffers = buffers == null ? new ArrayList<>() : buffers;
if (aggregationSize < 0)
{
_aggregationSize = (int)Math.min(_maxSize, 8192L);
}
else
{
if (aggregationSize > _maxSize)
throw new IllegalArgumentException("aggregationSize(%d) must be <= maxCapacity(%d)".formatted(aggregationSize, _maxSize));
_aggregationSize = aggregationSize;
}
_minRetainSize = minRetainSize;
if (_aggregationSize == 0 && _maxSize >= Integer.MAX_VALUE && _minRetainSize != 0)
if (_pool.getSize() == 0 && _maxSize >= Integer.MAX_VALUE && _minRetainSize != 0)
throw new IllegalArgumentException("must always retain if cannot aggregate");
}
@ -1445,7 +1462,7 @@ public interface RetainableByteBuffer extends Retainable
public int getAggregationSize()
{
return _aggregationSize;
return _pool.getSize();
}
public int getMinRetainSize()
@ -1483,7 +1500,7 @@ public interface RetainableByteBuffer extends Retainable
throw new BufferOverflowException();
int length = (int)size;
RetainableByteBuffer combined = _pool.acquire(length, _direct);
RetainableByteBuffer combined = _pool.acquire(length, _pool.isDirect());
ByteBuffer byteBuffer = combined.getByteBuffer();
BufferUtil.flipToFill(byteBuffer);
for (RetainableByteBuffer buffer : _buffers)
@ -1562,7 +1579,7 @@ public interface RetainableByteBuffer extends Retainable
break;
}
}
return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize);
return new DynamicCapacity(buffers, _pool, _maxSize, _minRetainSize);
}
@Override
@ -1623,7 +1640,7 @@ public interface RetainableByteBuffer extends Retainable
skip = 0;
}
}
return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize);
return new DynamicCapacity(buffers, _pool, _maxSize, _minRetainSize);
}
/**
@ -1742,7 +1759,7 @@ public interface RetainableByteBuffer extends Retainable
@Override
public boolean isDirect()
{
return _direct;
return _pool.isDirect();
}
@Override
@ -1841,7 +1858,7 @@ public interface RetainableByteBuffer extends Retainable
private Mutable newSlice(List<RetainableByteBuffer> buffers)
{
return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize);
return new DynamicCapacity(buffers, _pool, _maxSize, _minRetainSize);
}
@Override
@ -1865,7 +1882,7 @@ public interface RetainableByteBuffer extends Retainable
for (RetainableByteBuffer rbb : _buffers)
buffers.add(rbb.copy());
return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize);
return new DynamicCapacity(buffers, _pool, _maxSize, _minRetainSize);
}
/**
@ -1981,7 +1998,7 @@ public interface RetainableByteBuffer extends Retainable
else
{
// acquire a new aggregate buffer
int aggregateSize = _aggregationSize;
int aggregateSize = _pool.getSize();
// If we cannot grow, allow a single allocation only if we have not already retained.
if (aggregateSize == 0 && _buffers.isEmpty() && _maxSize < Integer.MAX_VALUE)
@ -1990,8 +2007,7 @@ public interface RetainableByteBuffer extends Retainable
aggregateSize = Math.max(length, aggregateSize);
if (aggregateSize > space)
aggregateSize = (int)space;
_aggregate = _pool.acquire(aggregateSize, _direct).asMutable(); // TODO don't allocate more than space
_aggregate = _pool.acquire(aggregateSize, _pool.isDirect()).asMutable();
checkAggregateLimit(space);
_buffers.add(_aggregate);
}
@ -2215,12 +2231,14 @@ public interface RetainableByteBuffer extends Retainable
}
// We need a new aggregate, acquire a new aggregate buffer
int aggregateSize = _aggregationSize;
int aggregateSize = _pool.getSize();
// If we cannot grow, allow a single allocation only if we have not already retained.
if (aggregateSize == 0 && _buffers.isEmpty() && _maxSize < Integer.MAX_VALUE)
aggregateSize = (int)_maxSize;
_aggregate = _pool.acquire(Math.max(needed, aggregateSize), _direct).asMutable();
_aggregate = _pool.acquire(Math.toIntExact(_maxSize));
else if (needed > aggregateSize)
_aggregate = _pool.acquire(needed);
else
_aggregate = _pool.acquire();
// If the new aggregate buffer is larger than the space available, then adjust the capacity
checkAggregateLimit(space);
@ -2347,8 +2365,8 @@ public interface RetainableByteBuffer extends Retainable
protected void addExtraStringInfo(StringBuilder builder)
{
super.addExtraStringInfo(builder);
builder.append(",aggSize=");
builder.append(_aggregationSize);
builder.append(",pool=");
builder.append(_pool);
builder.append(",minRetain=");
builder.append(_minRetainSize);
builder.append(",buffers=");

View File

@ -43,23 +43,27 @@ public class BufferedContentSink implements Content.Sink
private static final Logger LOG = LoggerFactory.getLogger(BufferedContentSink.class);
private final Content.Sink _delegate;
private final int _maxAggregationSize;
private final RetainableByteBuffer.DynamicCapacity _aggregator;
private final SerializedInvoker _serializer = new SerializedInvoker();
private boolean _firstWrite = true;
private boolean _lastWritten;
public BufferedContentSink(Content.Sink delegate, ByteBufferPool bufferPool, boolean direct, int maxAggregationSize, int maxBufferSize)
{
this(delegate, new ByteBufferPool.Sized(bufferPool, direct, maxAggregationSize), maxBufferSize);
}
public BufferedContentSink(Content.Sink delegate, ByteBufferPool.Sized sizedPool, int maxBufferSize)
{
if (maxBufferSize <= 0)
throw new IllegalArgumentException("maxBufferSize must be > 0, was: " + maxBufferSize);
if (maxAggregationSize <= 0)
throw new IllegalArgumentException("maxAggregationSize must be > 0, was: " + maxAggregationSize);
if (maxBufferSize < maxAggregationSize)
throw new IllegalArgumentException("maxBufferSize (" + maxBufferSize + ") must be >= maxAggregationSize (" + maxAggregationSize + ")");
if (sizedPool.getSize() <= 0)
throw new IllegalArgumentException("pool.size must be > 0, was: " + sizedPool.getSize());
if (maxBufferSize < sizedPool.getSize())
throw new IllegalArgumentException("maxBufferSize (" + maxBufferSize + ") must be >= pool.size (" + sizedPool.getSize() + ")");
_delegate = delegate;
_maxAggregationSize = maxAggregationSize;
_aggregator = new RetainableByteBuffer.DynamicCapacity(bufferPool, direct, maxBufferSize);
_aggregator = new RetainableByteBuffer.DynamicCapacity(sizedPool, maxBufferSize);
}
@Override
@ -86,7 +90,7 @@ public class BufferedContentSink implements Content.Sink
}
ByteBuffer current = byteBuffer != null ? byteBuffer : BufferUtil.EMPTY_BUFFER;
if (current.remaining() <= _maxAggregationSize && !last && byteBuffer != FLUSH_BUFFER)
if (current.remaining() <= _aggregator.getAggregationSize() && !last && byteBuffer != FLUSH_BUFFER)
{
// current buffer can be aggregated
aggregateAndFlush(current, callback);
@ -128,7 +132,7 @@ public class BufferedContentSink implements Content.Sink
LOG.debug("flushing aggregate {}", _aggregator);
_aggregator.writeTo(_delegate, last, callback);
}
else if (last && currentBuffer.remaining() <= Math.min(_maxAggregationSize, _aggregator.space()) && _aggregator.append(currentBuffer))
else if (last && currentBuffer.remaining() <= Math.min(_aggregator.getAggregationSize(), _aggregator.space()) && _aggregator.append(currentBuffer))
{
if (LOG.isDebugEnabled())
LOG.debug("flushing aggregated {}", _aggregator);

View File

@ -110,9 +110,6 @@ public class ServletMultiPartFormData
{
// No existing core parts, so we need to configure the parser.
ServletContextHandler contextHandler = servletContextRequest.getServletContext().getServletContextHandler();
ByteBufferPool byteBufferPool = servletContextRequest.getComponents().getByteBufferPool();
ConnectionMetaData connectionMetaData = servletContextRequest.getConnectionMetaData();
Connection connection = connectionMetaData.getConnection();
Content.Source source;
if (servletRequest instanceof ServletApiRequest servletApiRequest)
@ -122,6 +119,9 @@ public class ServletMultiPartFormData
else
{
// TODO use the size specified in ByteBufferPool.SIZED_NON_POOLING instead of specifying a 2K buffer size?
ByteBufferPool byteBufferPool = servletContextRequest.getComponents().getByteBufferPool();
ConnectionMetaData connectionMetaData = servletContextRequest.getConnectionMetaData();
Connection connection = connectionMetaData.getConnection();
int bufferSize = connection instanceof AbstractConnection c ? c.getInputBufferSize() : 2048;
source = new InputStreamContentSource(servletRequest.getInputStream(), new ByteBufferPool.Sized(byteBufferPool, false, bufferSize));
}