From e699583c046d6c61480d9fc7d7251820c5ae50d6 Mon Sep 17 00:00:00 2001 From: gregw Date: Tue, 16 Jul 2024 18:15:22 +1000 Subject: [PATCH] Improve the usage of Sized ByteBufferPool --- .../org/eclipse/jetty/io/ByteBufferPool.java | 2 +- .../jetty/io/RetainableByteBuffer.java | 90 +++++++++++-------- .../jetty/io/content/BufferedContentSink.java | 22 +++-- .../servlet/ServletMultiPartFormData.java | 6 +- 4 files changed, 71 insertions(+), 49 deletions(-) diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 4f2c4dfcb1a..bfb14d4d800 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -136,7 +136,7 @@ public interface ByteBufferPool { super(Objects.requireNonNullElse(wrapped, NON_POOLING)); _direct = direct; - _size = size > 0 ? size : 4096; + _size = size >= 0 ? size : 8192; } public boolean isDirect() diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java index f570d2dd3f2..4a25de06bbd 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java @@ -1353,11 +1353,9 @@ public interface RetainableByteBuffer extends Retainable { private static final Logger LOG = LoggerFactory.getLogger(RetainableByteBuffer.DynamicCapacity.class); - private final ByteBufferPool _pool; - private final boolean _direct; + private final ByteBufferPool.Sized _pool; private final long _maxSize; private final List _buffers; - private final int _aggregationSize; private final int _minRetainSize; private Mutable _aggregate; @@ -1369,12 +1367,42 @@ public interface RetainableByteBuffer extends Retainable this(null, false, -1, -1, -1); } + /** + * @param sizedPool The pool from which to allocate buffers, with {@link ByteBufferPool.Sized#isDirect()} configured + * and {@link ByteBufferPool.Sized#getSize()} used for the size of aggregation buffers. + */ + public DynamicCapacity(ByteBufferPool.Sized sizedPool) + { + this(null, sizedPool, -1, -1); + } + + /** + * @param sizedPool The pool from which to allocate buffers, with {@link ByteBufferPool.Sized#isDirect()} configured + * and {@link ByteBufferPool.Sized#getSize()} used for the size of aggregation buffers. + * @param maxSize The maximum length of the accumulated buffers or -1 for 2GB limit + */ + public DynamicCapacity(ByteBufferPool.Sized sizedPool, long maxSize) + { + this(null, sizedPool, maxSize, -1); + } + + /** + * @param sizedPool The pool from which to allocate buffers, with {@link ByteBufferPool.Sized#isDirect()} configured + * and {@link ByteBufferPool.Sized#getSize()} used for the size of aggregation buffers. + * @param maxSize The maximum length of the accumulated buffers or -1 for 2GB limit + * @param minRetainSize The minimal size of a {@link RetainableByteBuffer} before it will be retained; or 0 to always retain; or -1 for a heuristic; + */ + public DynamicCapacity(ByteBufferPool.Sized sizedPool, long maxSize, int minRetainSize) + { + this(null, sizedPool, maxSize, minRetainSize); + } + /** * @param pool The pool from which to allocate buffers */ public DynamicCapacity(ByteBufferPool pool) { - this(pool, false, -1, -1, -1); + this(null, pool instanceof ByteBufferPool.Sized sized ? sized : new ByteBufferPool.Sized(pool), -1, -1); } /** @@ -1411,30 +1439,19 @@ public interface RetainableByteBuffer extends Retainable */ public DynamicCapacity(ByteBufferPool pool, boolean direct, long maxSize, int aggregationSize, int minRetainSize) { - this(new ArrayList<>(), pool, direct, maxSize, aggregationSize, minRetainSize); + this(null, new ByteBufferPool.Sized(pool, direct, maxSize > 0 && maxSize < 8192L ? (int)maxSize : aggregationSize), maxSize, minRetainSize); } - private DynamicCapacity(List buffers, ByteBufferPool pool, boolean direct, long maxSize, int aggregationSize, int minRetainSize) + private DynamicCapacity(List buffers, ByteBufferPool.Sized pool, long maxSize, int minRetainSize) { super(); - _pool = pool == null ? ByteBufferPool.NON_POOLING : pool; - _direct = direct; + _pool = pool == null ? ByteBufferPool.SIZED_NON_POOLING : pool; _maxSize = maxSize < 0 ? Long.MAX_VALUE : maxSize; - _buffers = buffers; + _buffers = buffers == null ? new ArrayList<>() : buffers; - if (aggregationSize < 0) - { - _aggregationSize = (int)Math.min(_maxSize, 8192L); - } - else - { - if (aggregationSize > _maxSize) - throw new IllegalArgumentException("aggregationSize(%d) must be <= maxCapacity(%d)".formatted(aggregationSize, _maxSize)); - _aggregationSize = aggregationSize; - } _minRetainSize = minRetainSize; - if (_aggregationSize == 0 && _maxSize >= Integer.MAX_VALUE && _minRetainSize != 0) + if (_pool.getSize() == 0 && _maxSize >= Integer.MAX_VALUE && _minRetainSize != 0) throw new IllegalArgumentException("must always retain if cannot aggregate"); } @@ -1445,7 +1462,7 @@ public interface RetainableByteBuffer extends Retainable public int getAggregationSize() { - return _aggregationSize; + return _pool.getSize(); } public int getMinRetainSize() @@ -1483,7 +1500,7 @@ public interface RetainableByteBuffer extends Retainable throw new BufferOverflowException(); int length = (int)size; - RetainableByteBuffer combined = _pool.acquire(length, _direct); + RetainableByteBuffer combined = _pool.acquire(length, _pool.isDirect()); ByteBuffer byteBuffer = combined.getByteBuffer(); BufferUtil.flipToFill(byteBuffer); for (RetainableByteBuffer buffer : _buffers) @@ -1562,7 +1579,7 @@ public interface RetainableByteBuffer extends Retainable break; } } - return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize); + return new DynamicCapacity(buffers, _pool, _maxSize, _minRetainSize); } @Override @@ -1623,7 +1640,7 @@ public interface RetainableByteBuffer extends Retainable skip = 0; } } - return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize); + return new DynamicCapacity(buffers, _pool, _maxSize, _minRetainSize); } /** @@ -1742,7 +1759,7 @@ public interface RetainableByteBuffer extends Retainable @Override public boolean isDirect() { - return _direct; + return _pool.isDirect(); } @Override @@ -1841,7 +1858,7 @@ public interface RetainableByteBuffer extends Retainable private Mutable newSlice(List buffers) { - return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize); + return new DynamicCapacity(buffers, _pool, _maxSize, _minRetainSize); } @Override @@ -1865,7 +1882,7 @@ public interface RetainableByteBuffer extends Retainable for (RetainableByteBuffer rbb : _buffers) buffers.add(rbb.copy()); - return new DynamicCapacity(buffers, _pool, _direct, _maxSize, _aggregationSize, _minRetainSize); + return new DynamicCapacity(buffers, _pool, _maxSize, _minRetainSize); } /** @@ -1981,7 +1998,7 @@ public interface RetainableByteBuffer extends Retainable else { // acquire a new aggregate buffer - int aggregateSize = _aggregationSize; + int aggregateSize = _pool.getSize(); // If we cannot grow, allow a single allocation only if we have not already retained. if (aggregateSize == 0 && _buffers.isEmpty() && _maxSize < Integer.MAX_VALUE) @@ -1990,8 +2007,7 @@ public interface RetainableByteBuffer extends Retainable aggregateSize = Math.max(length, aggregateSize); if (aggregateSize > space) aggregateSize = (int)space; - - _aggregate = _pool.acquire(aggregateSize, _direct).asMutable(); // TODO don't allocate more than space + _aggregate = _pool.acquire(aggregateSize, _pool.isDirect()).asMutable(); checkAggregateLimit(space); _buffers.add(_aggregate); } @@ -2215,12 +2231,14 @@ public interface RetainableByteBuffer extends Retainable } // We need a new aggregate, acquire a new aggregate buffer - int aggregateSize = _aggregationSize; - + int aggregateSize = _pool.getSize(); // If we cannot grow, allow a single allocation only if we have not already retained. if (aggregateSize == 0 && _buffers.isEmpty() && _maxSize < Integer.MAX_VALUE) - aggregateSize = (int)_maxSize; - _aggregate = _pool.acquire(Math.max(needed, aggregateSize), _direct).asMutable(); + _aggregate = _pool.acquire(Math.toIntExact(_maxSize)); + else if (needed > aggregateSize) + _aggregate = _pool.acquire(needed); + else + _aggregate = _pool.acquire(); // If the new aggregate buffer is larger than the space available, then adjust the capacity checkAggregateLimit(space); @@ -2347,8 +2365,8 @@ public interface RetainableByteBuffer extends Retainable protected void addExtraStringInfo(StringBuilder builder) { super.addExtraStringInfo(builder); - builder.append(",aggSize="); - builder.append(_aggregationSize); + builder.append(",pool="); + builder.append(_pool); builder.append(",minRetain="); builder.append(_minRetainSize); builder.append(",buffers="); diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java index 11fe54811be..aa77f5fdd79 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/content/BufferedContentSink.java @@ -43,23 +43,27 @@ public class BufferedContentSink implements Content.Sink private static final Logger LOG = LoggerFactory.getLogger(BufferedContentSink.class); private final Content.Sink _delegate; - private final int _maxAggregationSize; private final RetainableByteBuffer.DynamicCapacity _aggregator; private final SerializedInvoker _serializer = new SerializedInvoker(); private boolean _firstWrite = true; private boolean _lastWritten; public BufferedContentSink(Content.Sink delegate, ByteBufferPool bufferPool, boolean direct, int maxAggregationSize, int maxBufferSize) + { + this(delegate, new ByteBufferPool.Sized(bufferPool, direct, maxAggregationSize), maxBufferSize); + } + + public BufferedContentSink(Content.Sink delegate, ByteBufferPool.Sized sizedPool, int maxBufferSize) { if (maxBufferSize <= 0) throw new IllegalArgumentException("maxBufferSize must be > 0, was: " + maxBufferSize); - if (maxAggregationSize <= 0) - throw new IllegalArgumentException("maxAggregationSize must be > 0, was: " + maxAggregationSize); - if (maxBufferSize < maxAggregationSize) - throw new IllegalArgumentException("maxBufferSize (" + maxBufferSize + ") must be >= maxAggregationSize (" + maxAggregationSize + ")"); + if (sizedPool.getSize() <= 0) + throw new IllegalArgumentException("pool.size must be > 0, was: " + sizedPool.getSize()); + if (maxBufferSize < sizedPool.getSize()) + throw new IllegalArgumentException("maxBufferSize (" + maxBufferSize + ") must be >= pool.size (" + sizedPool.getSize() + ")"); + _delegate = delegate; - _maxAggregationSize = maxAggregationSize; - _aggregator = new RetainableByteBuffer.DynamicCapacity(bufferPool, direct, maxBufferSize); + _aggregator = new RetainableByteBuffer.DynamicCapacity(sizedPool, maxBufferSize); } @Override @@ -86,7 +90,7 @@ public class BufferedContentSink implements Content.Sink } ByteBuffer current = byteBuffer != null ? byteBuffer : BufferUtil.EMPTY_BUFFER; - if (current.remaining() <= _maxAggregationSize && !last && byteBuffer != FLUSH_BUFFER) + if (current.remaining() <= _aggregator.getAggregationSize() && !last && byteBuffer != FLUSH_BUFFER) { // current buffer can be aggregated aggregateAndFlush(current, callback); @@ -128,7 +132,7 @@ public class BufferedContentSink implements Content.Sink LOG.debug("flushing aggregate {}", _aggregator); _aggregator.writeTo(_delegate, last, callback); } - else if (last && currentBuffer.remaining() <= Math.min(_maxAggregationSize, _aggregator.space()) && _aggregator.append(currentBuffer)) + else if (last && currentBuffer.remaining() <= Math.min(_aggregator.getAggregationSize(), _aggregator.space()) && _aggregator.append(currentBuffer)) { if (LOG.isDebugEnabled()) LOG.debug("flushing aggregated {}", _aggregator); diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletMultiPartFormData.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletMultiPartFormData.java index 1aa62fb558f..8b032ba6573 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletMultiPartFormData.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletMultiPartFormData.java @@ -110,9 +110,6 @@ public class ServletMultiPartFormData { // No existing core parts, so we need to configure the parser. ServletContextHandler contextHandler = servletContextRequest.getServletContext().getServletContextHandler(); - ByteBufferPool byteBufferPool = servletContextRequest.getComponents().getByteBufferPool(); - ConnectionMetaData connectionMetaData = servletContextRequest.getConnectionMetaData(); - Connection connection = connectionMetaData.getConnection(); Content.Source source; if (servletRequest instanceof ServletApiRequest servletApiRequest) @@ -122,6 +119,9 @@ public class ServletMultiPartFormData else { // TODO use the size specified in ByteBufferPool.SIZED_NON_POOLING instead of specifying a 2K buffer size? + ByteBufferPool byteBufferPool = servletContextRequest.getComponents().getByteBufferPool(); + ConnectionMetaData connectionMetaData = servletContextRequest.getConnectionMetaData(); + Connection connection = connectionMetaData.getConnection(); int bufferSize = connection instanceof AbstractConnection c ? c.getInputBufferSize() : 2048; source = new InputStreamContentSource(servletRequest.getInputStream(), new ByteBufferPool.Sized(byteBufferPool, false, bufferSize)); }