diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index 84f19e47f9a..2e89a292a4d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -54,10 +54,12 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.io.ArrayRetainableByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.io.ssl.SslClientConnectionFactory; import org.eclipse.jetty.util.Fields; import org.eclipse.jetty.util.Jetty; @@ -193,12 +195,14 @@ public class HttpClient extends ContainerLifeCycle threadPool.setName(name); setExecutor(threadPool); } + int maxBucketSize = executor instanceof ThreadPool.SizedThreadPool + ? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2 + : ProcessorUtils.availableProcessors() * 2; ByteBufferPool byteBufferPool = getByteBufferPool(); if (byteBufferPool == null) - setByteBufferPool(new MappedByteBufferPool(2048, - executor instanceof ThreadPool.SizedThreadPool - ? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2 - : ProcessorUtils.availableProcessors() * 2)); + setByteBufferPool(new MappedByteBufferPool(2048, maxBucketSize)); + if (getBean(RetainableByteBufferPool.class) == null) + addBean(new ArrayRetainableByteBufferPool(0, 2048, 65536, maxBucketSize)); Scheduler scheduler = getScheduler(); if (scheduler == null) setScheduler(new ScheduledExecutorScheduler(name + "-scheduler", false)); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java index 04b4843775d..339cf86db85 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTP.java @@ -29,9 +29,9 @@ import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.slf4j.Logger; @@ -43,6 +43,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res private final LongAdder inMessages = new LongAdder(); private final HttpParser parser; + private final RetainableByteBufferPool retainableByteBufferPool; private RetainableByteBuffer networkBuffer; private boolean shutdown; private boolean complete; @@ -61,6 +62,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res parser.setHeaderCacheSize(httpTransport.getHeaderCacheSize()); parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive()); } + + this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(httpClient, httpClient.getByteBufferPool()); } @Override @@ -111,9 +114,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res private RetainableByteBuffer newNetworkBuffer() { HttpClient client = getHttpDestination().getHttpClient(); - ByteBufferPool bufferPool = client.getByteBufferPool(); boolean direct = client.isUseInputDirectByteBuffers(); - return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), direct); + return retainableByteBufferPool.acquire(client.getResponseBufferSize(), direct); } private void releaseNetworkBuffer() @@ -166,7 +168,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res return; } - if (networkBuffer.getReferences() > 1) + if (networkBuffer.isRetained()) reacquireNetworkBuffer(); // The networkBuffer may have been reacquired. diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index a78f14ddeb4..cf49a458e29 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -44,9 +44,9 @@ import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.io.AbstractConnection; -import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -71,6 +71,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne private final ClientParser parser; private RetainableByteBuffer networkBuffer; private Object attachment; + private final RetainableByteBufferPool retainableByteBufferPool; public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise promise) { @@ -81,6 +82,9 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne this.delegate = new Delegate(destination); this.parser = new ClientParser(new ResponseListener()); requests.addLast(0); + + HttpClient client = destination.getHttpClient(); + this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool()); } public HttpDestination getHttpDestination() @@ -135,8 +139,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne private RetainableByteBuffer newNetworkBuffer() { HttpClient client = destination.getHttpClient(); - ByteBufferPool bufferPool = client.getByteBufferPool(); - return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), client.isUseInputDirectByteBuffers()); + return retainableByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers()); } private void releaseNetworkBuffer() @@ -161,7 +164,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne if (parse(networkBuffer.getBuffer())) return; - if (networkBuffer.getReferences() > 1) + if (networkBuffer.isRetained()) reacquireNetworkBuffer(); // The networkBuffer may have been reacquired. diff --git a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 228267b308f..2a6be2b23be 100644 --- a/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-http2/http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.LifeCycle; @@ -67,7 +68,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory parser.setMaxFrameLength(client.getMaxFrameLength()); parser.setMaxSettingsKeys(client.getMaxSettingsKeys()); - HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint, + RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, byteBufferPool); + + HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint, parser, session, client.getInputBufferSize(), promise, listener); connection.setUseInputDirectByteBuffers(client.isUseInputDirectByteBuffers()); connection.setUseOutputDirectByteBuffers(client.isUseOutputDirectByteBuffers()); @@ -81,9 +84,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory private final Promise promise; private final Session.Listener listener; - private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener) + private HTTP2ClientConnection(HTTP2Client client, RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise promise, Session.Listener listener) { - super(byteBufferPool, executor, endpoint, parser, session, bufferSize); + super(retainableByteBufferPool, executor, endpoint, parser, session, bufferSize); this.client = client; this.promise = promise; this.listener = listener; diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 02dabdb702a..2632cfab6c0 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -23,10 +23,10 @@ import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.parser.Parser; import org.eclipse.jetty.io.AbstractConnection; -import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -45,7 +45,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. private final Queue tasks = new ArrayDeque<>(); private final HTTP2Producer producer = new HTTP2Producer(); private final AtomicLong bytesIn = new AtomicLong(); - private final ByteBufferPool byteBufferPool; + private final RetainableByteBufferPool retainableByteBufferPool; private final Parser parser; private final ISession session; private final int bufferSize; @@ -53,10 +53,10 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. private boolean useInputDirectByteBuffers; private boolean useOutputDirectByteBuffers; - public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize) + protected HTTP2Connection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize) { super(endPoint, executor); - this.byteBufferPool = byteBufferPool; + this.retainableByteBufferPool = retainableByteBufferPool; this.parser = parser; this.session = session; this.bufferSize = bufferSize; @@ -287,7 +287,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. return task; // If more references than 1 (ie not just us), don't refill into buffer and risk compaction. - if (networkBuffer.getReferences() > 1) + if (networkBuffer.isRetained()) reacquireNetworkBuffer(); } @@ -415,16 +415,43 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. } } - private class NetworkBuffer extends RetainableByteBuffer implements Callback + private class NetworkBuffer implements Callback { + private final RetainableByteBuffer delegate; + private NetworkBuffer() { - super(byteBufferPool, bufferSize, isUseInputDirectByteBuffers()); + delegate = retainableByteBufferPool.acquire(bufferSize, isUseInputDirectByteBuffers()); + } + + public ByteBuffer getBuffer() + { + return delegate.getBuffer(); + } + + public boolean isRetained() + { + return delegate.isRetained(); + } + + public boolean hasRemaining() + { + return delegate.hasRemaining(); + } + + public boolean release() + { + return delegate.release(); + } + + public void retain() + { + delegate.retain(); } private void put(ByteBuffer source) { - BufferUtil.append(getBuffer(), source); + BufferUtil.append(delegate.getBuffer(), source); } @Override @@ -441,7 +468,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher. private void completed(Throwable failure) { - if (release() == 0) + if (delegate.release()) { if (LOG.isDebugEnabled()) LOG.debug("Released retained {}", this, failure); diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java index 76b91c92776..ff386f198fa 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java @@ -37,6 +37,7 @@ import org.eclipse.jetty.http2.parser.ServerParser; import org.eclipse.jetty.http2.parser.WindowRateControl; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.server.AbstractConnectionFactory; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; @@ -279,7 +280,9 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne parser.setMaxFrameLength(getMaxFrameLength()); parser.setMaxSettingsKeys(getMaxSettingsKeys()); - HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(), + RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool()); + + HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector.getExecutor(), endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener); connection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers()); connection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers()); diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index f3037941374..09bd604979a 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -45,8 +45,8 @@ import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.http2.parser.ServerParser; import org.eclipse.jetty.http2.parser.SettingsBodyParser; -import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.BufferUtil; @@ -87,9 +87,9 @@ public class HTTP2ServerConnection extends HTTP2Connection private final HttpConfiguration httpConfig; private boolean recycleHttpChannels = true; - public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) + public HTTP2ServerConnection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener) { - super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize); + super(retainableByteBufferPool, executor, endPoint, parser, session, inputBufferSize); this.listener = listener; this.httpConfig = httpConfig; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java new file mode 100644 index 00000000000..ff358e8f0f8 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java @@ -0,0 +1,314 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Pool; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.ManagedOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@ManagedObject +public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool +{ + private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class); + + private final Pool[] _direct; + private final Pool[] _indirect; + private final int _factor; + private final int _minCapacity; + private final long _maxHeapMemory; + private final long _maxDirectMemory; + private final AtomicLong _currentHeapMemory = new AtomicLong(); + private final AtomicLong _currentDirectMemory = new AtomicLong(); + + public ArrayRetainableByteBufferPool() + { + this(0, 1024, 65536, Integer.MAX_VALUE, -1L, -1L); + } + + public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize) + { + this(minCapacity, factor, maxCapacity, maxBucketSize, -1L, -1L); + } + + public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) + { + _factor = factor <= 0 ? 1024 : factor; + this._maxHeapMemory = maxHeapMemory; + this._maxDirectMemory = maxDirectMemory; + if (minCapacity <= 0) + minCapacity = 0; + _minCapacity = minCapacity; + if (maxCapacity <= 0) + maxCapacity = 64 * 1024; + if ((maxCapacity % _factor) != 0 || _factor >= maxCapacity) + throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity"); + + int length = maxCapacity / _factor; + + @SuppressWarnings("unchecked") + Pool[] directArray = new Pool[length]; + @SuppressWarnings("unchecked") + Pool[] indirectArray = new Pool[length]; + for (int i = 0; i < directArray.length; i++) + { + directArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true); + indirectArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true); + } + _direct = directArray; + _indirect = indirectArray; + } + + @Override + public RetainableByteBuffer acquire(int size, boolean direct) + { + int capacity = (bucketIndexFor(size) + 1) * _factor; + Pool bucket = bucketFor(size, direct); + if (bucket == null) + return newRetainableByteBuffer(size, direct, byteBuffer -> {}); + Pool.Entry entry = bucket.acquire(); + + RetainableByteBuffer buffer; + if (entry == null) + { + Pool.Entry reservedEntry = bucket.reserve(); + if (reservedEntry != null) + { + buffer = newRetainableByteBuffer(capacity, direct, byteBuffer -> + { + BufferUtil.clear(byteBuffer); + reservedEntry.release(); + }); + reservedEntry.enable(buffer, true); + if (direct) + _currentDirectMemory.addAndGet(buffer.capacity()); + else + _currentHeapMemory.addAndGet(buffer.capacity()); + releaseExcessMemory(direct); + } + else + { + buffer = newRetainableByteBuffer(size, direct, byteBuffer -> {}); + } + } + else + { + buffer = entry.getPooled(); + buffer.acquire(); + } + return buffer; + } + + private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer releaser) + { + ByteBuffer buffer = direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity); + BufferUtil.clear(buffer); + RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(buffer, releaser); + retainableByteBuffer.acquire(); + return retainableByteBuffer; + } + + private Pool bucketFor(int capacity, boolean direct) + { + if (capacity < _minCapacity) + return null; + int idx = bucketIndexFor(capacity); + Pool[] buckets = direct ? _direct : _indirect; + if (idx >= buckets.length) + return null; + return buckets[idx]; + } + + private int bucketIndexFor(int capacity) + { + return (capacity - 1) / _factor; + } + + @ManagedAttribute("The number of pooled direct ByteBuffers") + public long getDirectByteBufferCount() + { + return getByteBufferCount(true); + } + + @ManagedAttribute("The number of pooled heap ByteBuffers") + public long getHeapByteBufferCount() + { + return getByteBufferCount(false); + } + + private long getByteBufferCount(boolean direct) + { + Pool[] buckets = direct ? _direct : _indirect; + return Arrays.stream(buckets).mapToLong(Pool::size).sum(); + } + + @ManagedAttribute("The number of pooled direct ByteBuffers that are available") + public long getAvailableDirectByteBufferCount() + { + return getAvailableByteBufferCount(true); + } + + @ManagedAttribute("The number of pooled heap ByteBuffers that are available") + public long getAvailableHeapByteBufferCount() + { + return getAvailableByteBufferCount(false); + } + + private long getAvailableByteBufferCount(boolean direct) + { + Pool[] buckets = direct ? _direct : _indirect; + return Arrays.stream(buckets).mapToLong(pool -> pool.values().stream().filter(Pool.Entry::isIdle).count()).sum(); + } + + @ManagedAttribute("The bytes retained by direct ByteBuffers") + public long getDirectMemory() + { + return getMemory(true); + } + + @ManagedAttribute("The bytes retained by heap ByteBuffers") + public long getHeapMemory() + { + return getMemory(false); + } + + private long getMemory(boolean direct) + { + if (direct) + return _currentDirectMemory.get(); + else + return _currentHeapMemory.get(); + } + + @ManagedAttribute("The available bytes retained by direct ByteBuffers") + public long getAvailableDirectMemory() + { + return getAvailableMemory(true); + } + + @ManagedAttribute("The available bytes retained by heap ByteBuffers") + public long getAvailableHeapMemory() + { + return getAvailableMemory(false); + } + + private long getAvailableMemory(boolean direct) + { + Pool[] buckets = direct ? _direct : _indirect; + long total = 0L; + for (int i = 0; i < buckets.length; i++) + { + Pool bucket = buckets[i]; + long capacity = (i + 1L) * _factor; + total += bucket.values().stream().filter(Pool.Entry::isIdle).count() * capacity; + } + return total; + } + + @ManagedOperation(value = "Clears this RetainableByteBufferPool", impact = "ACTION") + public void clear() + { + clearArray(_direct, _currentDirectMemory); + clearArray(_indirect, _currentHeapMemory); + } + + private void clearArray(Pool[] poolArray, AtomicLong memoryCounter) + { + for (Pool retainableByteBufferPool : poolArray) + { + for (Pool.Entry entry : retainableByteBufferPool.values()) + { + entry.remove(); + memoryCounter.addAndGet(-entry.getPooled().capacity()); + } + } + } + + private void releaseExcessMemory(boolean direct) + { + long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory; + if (maxMemory > 0) + { + long excess = getMemory(direct) - maxMemory; + if (excess > 0) + evict(direct, excess); + } + } + + /** + * This eviction mechanism searches for the RetainableByteBuffers that were released the longest time ago. + * @param direct true to search in the direct buffers buckets, false to search in the heap buffers buckets. + * @param excess the amount of bytes to evict. At least this much will be removed from the buckets. + */ + private void evict(boolean direct, long excess) + { + if (LOG.isDebugEnabled()) + LOG.debug("evicting {} bytes from {} pools", excess, (direct ? "direct" : "heap")); + long now = System.nanoTime(); + long totalClearedCapacity = 0L; + + Pool[] buckets = direct ? _direct : _indirect; + + while (totalClearedCapacity < excess) + { + for (Pool bucket : buckets) + { + Pool.Entry oldestEntry = findOldestEntry(now, bucket); + if (oldestEntry == null) + continue; + + if (oldestEntry.remove()) + { + int clearedCapacity = oldestEntry.getPooled().capacity(); + if (direct) + _currentDirectMemory.addAndGet(-clearedCapacity); + else + _currentHeapMemory.addAndGet(-clearedCapacity); + totalClearedCapacity += clearedCapacity; + } + // else a concurrent thread evicted the same entry -> do not account for its capacity. + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("eviction done, cleared {} bytes from {} pools", totalClearedCapacity, (direct ? "direct" : "heap")); + } + + private Pool.Entry findOldestEntry(long now, Pool bucket) + { + Pool.Entry oldestEntry = null; + for (Pool.Entry entry : bucket.values()) + { + if (oldestEntry != null) + { + long entryAge = now - entry.getPooled().getLastUpdate(); + if (entryAge > now - oldestEntry.getPooled().getLastUpdate()) + oldestEntry = entry; + } + else + { + oldestEntry = entry; + } + } + return oldestEntry; + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java b/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java index 1ab6d1ffb2e..b760d00b877 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBuffer.java @@ -15,32 +15,40 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Retainable; /** - * A Retainable ByteBuffer. - *

Acquires a ByteBuffer from a {@link ByteBufferPool} and maintains a reference count that is - * initially 1, incremented with {@link #retain()} and decremented with {@link #release()}. The buffer - * is released to the pool when the reference count is decremented to 0.

+ *

A pooled ByteBuffer which maintains a reference count that is + * incremented with {@link #retain()} and decremented with {@link #release()}. The buffer + * is released to the pool when {@link #release()} is called one more time than {@link #retain()}.

+ *

A {@code RetainableByteBuffer} can either be: + *

    + *
  • in pool; in this case {@link #isRetained()} returns {@code false} and calling {@link #release()} throws {@link IllegalStateException}
  • + *
  • out of pool but not retained; in this case {@link #isRetained()} returns {@code false} and calling {@link #release()} returns {@code true}
  • + *
  • out of pool and retained; in this case {@link #isRetained()} returns {@code true} and calling {@link #release()} returns {@code false}
  • + *
+ *

Calling {@link #release()} on a out of pool and retained instance does not re-pool it while that re-pools it on a out of pool but not retained instance.

*/ public class RetainableByteBuffer implements Retainable { - private final ByteBufferPool pool; private final ByteBuffer buffer; - private final AtomicInteger references; + private final AtomicInteger references = new AtomicInteger(); + private final Consumer releaser; + private final AtomicLong lastUpdate = new AtomicLong(System.nanoTime()); - public RetainableByteBuffer(ByteBufferPool pool, int size) + RetainableByteBuffer(ByteBuffer buffer, Consumer releaser) { - this(pool, size, false); + this.releaser = releaser; + this.buffer = buffer; } - public RetainableByteBuffer(ByteBufferPool pool, int size, boolean direct) + public int capacity() { - this.pool = pool; - this.buffer = pool.acquire(size, direct); - this.references = new AtomicInteger(1); + return buffer.capacity(); } public ByteBuffer getBuffer() @@ -48,32 +56,66 @@ public class RetainableByteBuffer implements Retainable return buffer; } - public int getReferences() + public long getLastUpdate() { - return references.get(); + return lastUpdate.getOpaque(); } + /** + * Checks if {@link #retain()} has been called at least one more time than {@link #release()}. + * @return true if this buffer is retained, false otherwise. + */ + public boolean isRetained() + { + return references.get() > 1; + } + + public boolean isDirect() + { + return buffer.isDirect(); + } + + /** + * Increments the retained counter of this buffer. It must be done internally by + * the pool right after creation and after each un-pooling. + * The reason why this method exists on top of {@link #retain()} is to be able to + * have some safety checks that must know why the ref counter is being incremented. + */ + void acquire() + { + if (references.getAndUpdate(c -> c == 0 ? 1 : c) != 0) + throw new IllegalStateException("re-pooled while still used " + this); + } + + /** + * Increments the retained counter of this buffer. + */ @Override public void retain() { - while (true) - { - int r = references.get(); - if (r == 0) - throw new IllegalStateException("released " + this); - if (references.compareAndSet(r, r + 1)) - break; - } + if (references.getAndUpdate(c -> c == 0 ? 0 : c + 1) == 0) + throw new IllegalStateException("released " + this); } - public int release() + /** + * Decrements the retained counter of this buffer. + * @return true if the buffer was re-pooled, false otherwise. + */ + public boolean release() { - int ref = references.decrementAndGet(); + int ref = references.updateAndGet(c -> + { + if (c == 0) + throw new IllegalStateException("already released " + this); + return c - 1; + }); if (ref == 0) - pool.release(buffer); - else if (ref < 0) - throw new IllegalStateException("already released " + this); - return ref; + { + lastUpdate.setOpaque(System.nanoTime()); + releaser.accept(buffer); + return true; + } + return false; } public int remaining() @@ -99,6 +141,6 @@ public class RetainableByteBuffer implements Retainable @Override public String toString() { - return String.format("%s@%x{%s,r=%d}", getClass().getSimpleName(), hashCode(), BufferUtil.toDetailString(buffer), getReferences()); + return String.format("%s@%x{%s,r=%d}", getClass().getSimpleName(), hashCode(), BufferUtil.toDetailString(buffer), references.get()); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBufferPool.java new file mode 100644 index 00000000000..a4bd9d3754c --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/RetainableByteBufferPool.java @@ -0,0 +1,58 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.nio.ByteBuffer; + +import org.eclipse.jetty.util.component.Container; + +/** + *

A {@link RetainableByteBuffer} pool.

+ *

Acquired buffers must be released by calling {@link RetainableByteBuffer#release()} otherwise the memory they hold will + * be leaked.

+ */ +public interface RetainableByteBufferPool +{ + /** + * Acquires a memory buffer from the pool. + * @param size The size of the buffer. The returned buffer will have at least this capacity. + * @param direct true if a direct memory buffer is needed, false otherwise. + * @return a memory buffer. + */ + RetainableByteBuffer acquire(int size, boolean direct); + + /** + * Finds a {@link RetainableByteBufferPool} implementation in the given container, or wrap the given + * {@link ByteBufferPool} with an adapter. + * @param container the container to search for an existing memory pool. + * @param byteBufferPool the {@link ByteBufferPool} to wrap if no memory pool was found in the container. + * @return the {@link RetainableByteBufferPool} found or the wrapped one. + */ + static RetainableByteBufferPool findOrAdapt(Container container, ByteBufferPool byteBufferPool) + { + RetainableByteBufferPool retainableByteBufferPool = container == null ? null : container.getBean(RetainableByteBufferPool.class); + if (retainableByteBufferPool == null) + { + // Wrap the ByteBufferPool instance. + retainableByteBufferPool = (size, direct) -> + { + ByteBuffer byteBuffer = byteBufferPool.acquire(size, direct); + RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(byteBuffer, byteBufferPool::release); + retainableByteBuffer.acquire(); + return retainableByteBuffer; + }; + } + return retainableByteBufferPool; + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index ea896ee7f1e..728d8eb0ad9 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -35,6 +35,8 @@ import org.eclipse.jetty.io.AbstractEndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -107,10 +109,11 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr private final AtomicLong _bytesIn = new AtomicLong(); private final AtomicLong _bytesOut = new AtomicLong(); private final ByteBufferPool _bufferPool; + private final RetainableByteBufferPool _retainableByteBufferPool; private final SSLEngine _sslEngine; private final DecryptedEndPoint _decryptedEndPoint; private ByteBuffer _decryptedInput; - private ByteBuffer _encryptedInput; + private RetainableByteBuffer _encryptedInput; private ByteBuffer _encryptedOutput; private final boolean _encryptedDirectBuffers; private final boolean _decryptedDirectBuffers; @@ -187,11 +190,18 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine, boolean useDirectBuffersForEncryption, boolean useDirectBuffersForDecryption) + { + this(RetainableByteBufferPool.findOrAdapt(null, byteBufferPool), byteBufferPool, executor, endPoint, sslEngine, useDirectBuffersForEncryption, useDirectBuffersForDecryption); + } + + public SslConnection(RetainableByteBufferPool retainableByteBufferPool, ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine, + boolean useDirectBuffersForEncryption, boolean useDirectBuffersForDecryption) { // This connection does not execute calls to onFillable(), so they will be called by the selector thread. // onFillable() does not block and will only wakeup another thread to do the actual reading and handling. super(endPoint, executor); this._bufferPool = byteBufferPool; + this._retainableByteBufferPool = retainableByteBufferPool; this._sslEngine = sslEngine; this._decryptedEndPoint = newDecryptedEndPoint(); this._encryptedDirectBuffers = useDirectBuffersForEncryption; @@ -326,7 +336,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr private void acquireEncryptedInput() { if (_encryptedInput == null) - _encryptedInput = _bufferPool.acquire(getPacketBufferSize(), _encryptedDirectBuffers); + _encryptedInput = _retainableByteBufferPool.acquire(getPacketBufferSize(), _encryptedDirectBuffers); } private void acquireEncryptedOutput() @@ -339,7 +349,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr public void onUpgradeTo(ByteBuffer buffer) { acquireEncryptedInput(); - BufferUtil.append(_encryptedInput, buffer); + BufferUtil.append(_encryptedInput.getBuffer(), buffer); } @Override @@ -409,7 +419,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr @Override public String toConnectionString() { - ByteBuffer b = _encryptedInput; + ByteBuffer b = _encryptedInput == null ? null : _encryptedInput.getBuffer(); int ei = b == null ? -1 : b.remaining(); b = _encryptedOutput; int eo = b == null ? -1 : b.remaining(); @@ -431,7 +441,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr { if (_encryptedInput != null && !_encryptedInput.hasRemaining()) { - _bufferPool.release(_encryptedInput); + _encryptedInput.release(); _encryptedInput = null; } } @@ -672,14 +682,14 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr } // Let's try reading some encrypted data... even if we have some already. - int netFilled = networkFill(_encryptedInput); + int netFilled = networkFill(_encryptedInput.getBuffer()); if (netFilled > 0) _bytesIn.addAndGet(netFilled); if (LOG.isDebugEnabled()) LOG.debug("net filled={}", netFilled); // Workaround for Java 11 behavior. - if (netFilled < 0 && isHandshakeInitial() && BufferUtil.isEmpty(_encryptedInput)) + if (netFilled < 0 && isHandshakeInitial() && (_encryptedInput == null || _encryptedInput.isEmpty())) closeInbound(); if (netFilled > 0 && !isHandshakeComplete() && isOutboundDone()) @@ -698,7 +708,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr try { _underflown = false; - unwrapResult = SslConnection.this.unwrap(_sslEngine, _encryptedInput, appIn); + unwrapResult = SslConnection.this.unwrap(_sslEngine, _encryptedInput.getBuffer(), appIn); } finally { @@ -708,7 +718,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr LOG.debug("unwrap net_filled={} {} encryptedBuffer={} unwrapBuffer={} appBuffer={}", netFilled, StringUtil.replace(unwrapResult.toString(), '\n', ' '), - BufferUtil.toSummaryString(_encryptedInput), + _encryptedInput, BufferUtil.toDetailString(appIn), BufferUtil.toDetailString(buffer)); @@ -729,13 +739,13 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr case BUFFER_UNDERFLOW: // Continue if we can compact? - if (BufferUtil.compact(_encryptedInput)) + if (BufferUtil.compact(_encryptedInput.getBuffer())) continue; // Are we out of space? - if (BufferUtil.space(_encryptedInput) == 0) + if (BufferUtil.space(_encryptedInput.getBuffer()) == 0) { - BufferUtil.clear(_encryptedInput); + BufferUtil.clear(_encryptedInput.getBuffer()); throw new SSLHandshakeException("Encrypted buffer max length exceeded"); } @@ -847,7 +857,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr _flushState, _fillState, _underflown, - BufferUtil.toDetailString(_encryptedInput), + _encryptedInput, BufferUtil.toDetailString(_decryptedInput), SslConnection.this); @@ -855,7 +865,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr return; // Fillable if we have decrypted input OR enough encrypted input. - fillable = BufferUtil.hasContent(_decryptedInput) || (BufferUtil.hasContent(_encryptedInput) && !_underflown); + fillable = BufferUtil.hasContent(_decryptedInput) || (_encryptedInput != null && _encryptedInput.hasRemaining() && !_underflown); HandshakeStatus status = _sslEngine.getHandshakeStatus(); switch (status) diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java new file mode 100644 index 00000000000..9dba1740f57 --- /dev/null +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java @@ -0,0 +1,316 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.io; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class ArrayRetainableByteBufferPoolTest +{ + @Test + public void testMaxMemoryEviction() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(0, 10, 20, Integer.MAX_VALUE, 40, 40); + + List buffers = new ArrayList<>(); + + buffers.add(pool.acquire(10, true)); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + buffers.add(pool.acquire(10, true)); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + buffers.add(pool.acquire(20, true)); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + buffers.add(pool.acquire(20, true)); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + buffers.add(pool.acquire(10, true)); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + buffers.add(pool.acquire(20, true)); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + buffers.add(pool.acquire(10, true)); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + buffers.add(pool.acquire(20, true)); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectByteBufferCount(), greaterThan(0L)); + assertThat(pool.getDirectMemory(), greaterThan(0L)); + + buffers.forEach(RetainableByteBuffer::release); + + assertThat(pool.getAvailableDirectByteBufferCount(), greaterThan(0L)); + assertThat(pool.getAvailableDirectByteBufferCount(), lessThan((long)buffers.size())); + assertThat(pool.getDirectByteBufferCount(), greaterThan(0L)); + assertThat(pool.getDirectByteBufferCount(), lessThan((long)buffers.size())); + assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L)); + assertThat(pool.getDirectMemory(), greaterThan(0L)); + } + + @Test + public void testBelowMinCapacityDoesNotPool() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE); + + RetainableByteBuffer buf1 = pool.acquire(1, true); + assertThat(buf1.capacity(), is(1)); + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); + + buf1.release(); + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); + } + + @Test + public void testOverMaxCapacityDoesNotPool() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE); + + RetainableByteBuffer buf1 = pool.acquire(21, true); + assertThat(buf1.capacity(), is(21)); + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); + + buf1.release(); + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); + } + + @Test + public void testRetain() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE); + + RetainableByteBuffer buf1 = pool.acquire(10, true); + + assertThat(pool.getDirectMemory(), is(10L)); + assertThat(pool.getAvailableDirectMemory(), is(0L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + + assertThat(buf1.isRetained(), is(false)); + buf1.retain(); + buf1.retain(); + assertThat(buf1.isRetained(), is(true)); + assertThat(buf1.release(), is(false)); + assertThat(buf1.isRetained(), is(true)); + assertThat(buf1.release(), is(false)); + assertThat(buf1.isRetained(), is(false)); + + assertThat(pool.getDirectMemory(), is(10L)); + assertThat(pool.getAvailableDirectMemory(), is(0L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + + assertThat(buf1.release(), is(true)); + assertThat(buf1.isRetained(), is(false)); + + assertThat(pool.getDirectMemory(), is(10L)); + assertThat(pool.getAvailableDirectMemory(), is(10L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(1L)); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + } + + @Test + public void testTooManyReleases() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE); + + RetainableByteBuffer buf1 = pool.acquire(10, true); + + assertThat(pool.getDirectMemory(), is(10L)); + assertThat(pool.getAvailableDirectMemory(), is(0L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + + buf1.release(); + + assertThat(pool.getDirectMemory(), is(10L)); + assertThat(pool.getAvailableDirectMemory(), is(10L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(1L)); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + + assertThrows(IllegalStateException.class, buf1::release); + + assertThat(pool.getDirectMemory(), is(10L)); + assertThat(pool.getAvailableDirectMemory(), is(10L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(1L)); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + } + + @Test + public void testMaxBucketSize() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(0, 10, 20, 2); + + RetainableByteBuffer buf1 = pool.acquire(1, true); // pooled + assertThat(buf1.capacity(), is(10)); + RetainableByteBuffer buf2 = pool.acquire(1, true); // pooled + assertThat(buf2.capacity(), is(10)); + RetainableByteBuffer buf3 = pool.acquire(1, true); // not pooled, bucket is full + assertThat(buf3.capacity(), is(1)); + + assertThat(pool.getDirectByteBufferCount(), is(2L)); + assertThat(pool.getDirectMemory(), is(20L)); + + RetainableByteBuffer buf4 = pool.acquire(11, true); // pooled + assertThat(buf4.capacity(), is(20)); + RetainableByteBuffer buf5 = pool.acquire(11, true); // pooled + assertThat(buf5.capacity(), is(20)); + RetainableByteBuffer buf6 = pool.acquire(11, true); // not pooled, bucket is full + assertThat(buf6.capacity(), is(11)); + + assertThat(pool.getDirectByteBufferCount(), is(4L)); + assertThat(pool.getDirectMemory(), is(60L)); + } + + @Test + public void testBufferReleaseRepools() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(0, 10, 20, 1); + + List all = new ArrayList<>(); + + all.add(pool.acquire(1, true)); // pooled + all.add(pool.acquire(1, true)); // not pooled, bucket is full + all.add(pool.acquire(11, true)); // pooled + all.add(pool.acquire(11, true)); // not pooled, bucket is full + + assertThat(pool.getDirectByteBufferCount(), is(2L)); + assertThat(pool.getDirectMemory(), is(30L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(pool.getAvailableDirectMemory(), is(0L)); + + all.forEach(RetainableByteBuffer::release); + + assertThat(pool.getDirectByteBufferCount(), is(2L)); + assertThat(pool.getDirectMemory(), is(30L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(2L)); + assertThat(pool.getAvailableDirectMemory(), is(30L)); + } + + @Test + public void testFactorAndCapacity() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE); + + pool.acquire(1, true); // not pooled, < minCapacity + pool.acquire(10, true); // pooled + pool.acquire(20, true); // pooled + pool.acquire(30, true); // not pooled, > maxCapacity + + assertThat(pool.getDirectByteBufferCount(), is(2L)); + assertThat(pool.getDirectMemory(), is(30L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(pool.getAvailableDirectMemory(), is(0L)); + } + + @Test + public void testClearUnlinksLeakedBuffers() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(); + + pool.acquire(10, true); + pool.acquire(10, true); + + assertThat(pool.getDirectByteBufferCount(), is(2L)); + assertThat(pool.getDirectMemory(), is(2048L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(pool.getAvailableDirectMemory(), is(0L)); + + pool.clear(); + + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(pool.getAvailableDirectMemory(), is(0L)); + } + + @Test + public void testRetainAfterRePooledThrows() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(); + RetainableByteBuffer buf1 = pool.acquire(10, true); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(buf1.release(), is(true)); + assertThrows(IllegalStateException.class, buf1::retain); + assertThrows(IllegalStateException.class, buf1::release); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(1L)); + + // check that the buffer is still available + RetainableByteBuffer buf2 = pool.acquire(10, true); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(0L)); + assertThat(buf2 == buf1, is(true)); // make sure it's not a new instance + assertThat(buf1.release(), is(true)); + assertThat(pool.getDirectByteBufferCount(), is(1L)); + assertThat(pool.getAvailableDirectByteBufferCount(), is(1L)); + } + + @Test + public void testAcquireRelease() + { + ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(); + + for (int i = 0; i < 3; i++) + { + RetainableByteBuffer buf1 = pool.acquire(10, true); + assertThat(buf1, is(notNullValue())); + assertThat(buf1.capacity(), is(1024)); + RetainableByteBuffer buf2 = pool.acquire(10, true); + assertThat(buf2, is(notNullValue())); + assertThat(buf2.capacity(), is(1024)); + buf1.release(); + buf2.release(); + + RetainableByteBuffer buf3 = pool.acquire(16384 + 1, true); + assertThat(buf3, is(notNullValue())); + assertThat(buf3.capacity(), is(16384 + 1024)); + buf3.release(); + + RetainableByteBuffer buf4 = pool.acquire(32768, true); + assertThat(buf4, is(notNullValue())); + assertThat(buf4.capacity(), is(32768)); + buf4.release(); + + RetainableByteBuffer buf5 = pool.acquire(32768, false); + assertThat(buf5, is(notNullValue())); + assertThat(buf5.capacity(), is(32768)); + buf5.release(); + } + + assertThat(pool.getDirectByteBufferCount(), is(4L)); + assertThat(pool.getHeapByteBufferCount(), is(1L)); + assertThat(pool.getDirectMemory(), is(1024 + 1024 + 16384 + 1024 + 32768L)); + assertThat(pool.getHeapMemory(), is(32768L)); + + pool.clear(); + + assertThat(pool.getDirectByteBufferCount(), is(0L)); + assertThat(pool.getHeapByteBufferCount(), is(0L)); + assertThat(pool.getDirectMemory(), is(0L)); + assertThat(pool.getHeapMemory(), is(0L)); + } +} diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 7903f7f65ab..0a020a44c01 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -32,8 +32,10 @@ import java.util.concurrent.locks.Condition; import java.util.stream.Collectors; import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.ArrayRetainableByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.util.ProcessorUtils; import org.eclipse.jetty.util.StringUtil; @@ -188,6 +190,8 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co pool = _server.getBean(ByteBufferPool.class); _byteBufferPool = pool != null ? pool : new ArrayByteBufferPool(); addBean(_byteBufferPool); + RetainableByteBufferPool retainableByteBufferPool = _server.getBean(RetainableByteBufferPool.class); + addBean(retainableByteBufferPool == null ? new ArrayRetainableByteBufferPool() : retainableByteBufferPool, retainableByteBufferPool == null); addEventListener(new Container.Listener() { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 728214a7572..b0fb50dc661 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -17,7 +17,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.WritePendingException; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import org.eclipse.jetty.http.BadMessageException; @@ -35,6 +34,8 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -56,12 +57,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http private final HttpConfiguration _config; private final Connector _connector; private final ByteBufferPool _bufferPool; + private final RetainableByteBufferPool _retainableByteBufferPool; private final HttpInput _input; private final HttpGenerator _generator; private final HttpChannelOverHttp _channel; private final HttpParser _parser; - private final AtomicInteger _contentBufferReferences = new AtomicInteger(); - private volatile ByteBuffer _requestBuffer = null; + private volatile RetainableByteBuffer _retainableByteBuffer; private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback(); private final SendCallback _sendCallback = new SendCallback(); private final boolean _recordHttpComplianceViolations; @@ -96,6 +97,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _config = config; _connector = connector; _bufferPool = _connector.getByteBufferPool(); + _retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, _bufferPool); _generator = newHttpGenerator(); _channel = newHttpChannel(); _input = _channel.getRequest().getHttpInput(); @@ -198,10 +200,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http @Override public ByteBuffer onUpgradeFrom() { - if (BufferUtil.hasContent(_requestBuffer)) + if (!isRequestBufferEmpty()) { - ByteBuffer unconsumed = ByteBuffer.allocateDirect(_requestBuffer.remaining()); - unconsumed.put(_requestBuffer); + ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining()); + unconsumed.put(_retainableByteBuffer.getBuffer()); unconsumed.flip(); releaseRequestBuffer(); return unconsumed; @@ -225,36 +227,34 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http void releaseRequestBuffer() { - if (_requestBuffer != null && !_requestBuffer.hasRemaining()) + if (_retainableByteBuffer != null && !_retainableByteBuffer.hasRemaining()) { if (LOG.isDebugEnabled()) LOG.debug("releaseRequestBuffer {}", this); - ByteBuffer buffer = _requestBuffer; - _requestBuffer = null; - _bufferPool.release(buffer); + if (_retainableByteBuffer.release()) + _retainableByteBuffer = null; + else + throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer); } } - public ByteBuffer getRequestBuffer() + private ByteBuffer getRequestBuffer() { - if (_requestBuffer == null) - { - boolean useDirectByteBuffers = isUseInputDirectByteBuffers(); - _requestBuffer = _bufferPool.acquire(getInputBufferSize(), useDirectByteBuffers); - } - return _requestBuffer; + if (_retainableByteBuffer == null) + _retainableByteBuffer = _retainableByteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); + return _retainableByteBuffer.getBuffer(); } public boolean isRequestBufferEmpty() { - return BufferUtil.isEmpty(_requestBuffer); + return _retainableByteBuffer == null || _retainableByteBuffer.isEmpty(); } @Override public void onFillable() { if (LOG.isDebugEnabled()) - LOG.debug("{} onFillable enter {} {}", this, _channel.getState(), BufferUtil.toDetailString(_requestBuffer)); + LOG.debug("{} onFillable enter {} {}", this, _channel.getState(), _retainableByteBuffer); HttpConnection last = setCurrentConnection(this); try @@ -300,17 +300,26 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } catch (Throwable x) { - if (LOG.isDebugEnabled()) - LOG.debug("{} caught exception {}", this, _channel.getState(), x); - BufferUtil.clear(_requestBuffer); - releaseRequestBuffer(); - getEndPoint().close(x); + try + { + if (LOG.isDebugEnabled()) + LOG.debug("{} caught exception {}", this, _channel.getState(), x); + if (_retainableByteBuffer != null) + { + _retainableByteBuffer.clear(); + releaseRequestBuffer(); + } + } + finally + { + getEndPoint().close(x); + } } finally { setCurrentConnection(last); if (LOG.isDebugEnabled()) - LOG.debug("{} onFillable exit {} {}", this, _channel.getState(), BufferUtil.toDetailString(_requestBuffer)); + LOG.debug("{} onFillable exit {} {}", this, _channel.getState(), _retainableByteBuffer); } } @@ -338,22 +347,22 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http private int fillRequestBuffer() { - if (_contentBufferReferences.get() > 0) + if (_retainableByteBuffer != null && _retainableByteBuffer.isRetained()) throw new IllegalStateException("fill with unconsumed content on " + this); - if (BufferUtil.isEmpty(_requestBuffer)) + if (isRequestBufferEmpty()) { // Get a buffer // We are not in a race here for the request buffer as we have not yet received a request, // so there are not an possible legal threads calling #parseContent or #completed. - _requestBuffer = getRequestBuffer(); + ByteBuffer requestBuffer = getRequestBuffer(); // fill try { - int filled = getEndPoint().fill(_requestBuffer); + int filled = getEndPoint().fill(requestBuffer); if (filled == 0) // Do a retry on fill 0 (optimization for SSL connections) - filled = getEndPoint().fill(_requestBuffer); + filled = getEndPoint().fill(requestBuffer); if (filled > 0) bytesIn.add(filled); @@ -361,7 +370,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _parser.atEOF(); if (LOG.isDebugEnabled()) - LOG.debug("{} filled {} {}", this, filled, BufferUtil.toDetailString(_requestBuffer)); + LOG.debug("{} filled {} {}", this, filled, _retainableByteBuffer); return filled; } @@ -379,15 +388,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http private boolean parseRequestBuffer() { if (LOG.isDebugEnabled()) - LOG.debug("{} parse {}", this, BufferUtil.toDetailString(_requestBuffer)); + LOG.debug("{} parse {}", this, _retainableByteBuffer); - boolean handle = _parser.parseNext(_requestBuffer == null ? BufferUtil.EMPTY_BUFFER : _requestBuffer); + boolean handle = _parser.parseNext(_retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : _retainableByteBuffer.getBuffer()); if (LOG.isDebugEnabled()) LOG.debug("{} parsed {} {}", this, handle, _parser); // recycle buffer ? - if (_contentBufferReferences.get() == 0) + if (_retainableByteBuffer != null && !_retainableByteBuffer.isRetained()) releaseRequestBuffer(); return handle; @@ -406,15 +415,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http _channel.recycle(); _parser.reset(); _generator.reset(); - if (_contentBufferReferences.get() == 0) + if (_retainableByteBuffer != null) { - releaseRequestBuffer(); - } - else - { - LOG.warn("{} lingering content references?!?!", this); - _requestBuffer = null; // Not returned to pool! - _contentBufferReferences.set(0); + if (!_retainableByteBuffer.isRetained()) + { + releaseRequestBuffer(); + } + else + { + LOG.warn("{} lingering content references?!?!", this); + _retainableByteBuffer = null; // Not returned to pool! + } } return true; } @@ -472,7 +483,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http if (_parser.isStart()) { // if the buffer is empty - if (BufferUtil.isEmpty(_requestBuffer)) + if (isRequestBufferEmpty()) { // look for more data fillInterested(); @@ -629,21 +640,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http public Content(ByteBuffer content) { super(content); - _contentBufferReferences.incrementAndGet(); + _retainableByteBuffer.retain(); } @Override public void succeeded() { - int counter = _contentBufferReferences.decrementAndGet(); - if (counter == 0) - releaseRequestBuffer(); - // TODO: this should do something (warn? fail?) if _contentBufferReferences goes below 0 - if (counter < 0) - { - LOG.warn("Content reference counting went below zero: {}", counter); - _contentBufferReferences.incrementAndGet(); - } + _retainableByteBuffer.release(); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/SslConnectionFactory.java b/jetty-server/src/main/java/org/eclipse/jetty/server/SslConnectionFactory.java index f73ce1f087a..97dd019ffaf 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/SslConnectionFactory.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/SslConnectionFactory.java @@ -21,8 +21,10 @@ import javax.net.ssl.SSLSession; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.AbstractConnection; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.io.ssl.SslHandshakeListener; import org.eclipse.jetty.util.annotation.Name; @@ -165,7 +167,9 @@ public class SslConnectionFactory extends AbstractConnectionFactory implements C protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine) { - return new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption()); + ByteBufferPool byteBufferPool = connector.getByteBufferPool(); + RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool); + return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption()); } @Override diff --git a/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/CoreClientUpgradeRequest.java b/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/CoreClientUpgradeRequest.java index cc45b3f001a..9182681390e 100644 --- a/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/CoreClientUpgradeRequest.java +++ b/jetty-websocket/websocket-core-client/src/main/java/org/eclipse/jetty/websocket/core/client/CoreClientUpgradeRequest.java @@ -39,6 +39,7 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.MultiException; import org.eclipse.jetty.util.QuotedStringTokenizer; @@ -439,7 +440,8 @@ public abstract class CoreClientUpgradeRequest extends HttpRequest implements Re HttpClient httpClient = wsClient.getHttpClient(); ByteBufferPool bufferPool = wsClient.getWebSocketComponents().getBufferPool(); - WebSocketConnection wsConnection = new WebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), bufferPool, coreSession); + RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(wsClient.getWebSocketComponents(), bufferPool); + WebSocketConnection wsConnection = new WebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), bufferPool, retainableByteBufferPool, coreSession); wsClient.getEventListeners().forEach(wsConnection::addEventListener); coreSession.setWebSocketConnection(wsConnection); Exception listenerError = notifyUpgradeListeners((listener) -> listener.onHandshakeResponse(this, response)); diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java index dc796cc102a..3c14c78075f 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.Dumpable; @@ -53,6 +54,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio private final AutoLock lock = new AutoLock(); private final ByteBufferPool bufferPool; + private final RetainableByteBufferPool retainableByteBufferPool; private final Generator generator; private final Parser parser; private final WebSocketCoreSession coreSession; @@ -78,9 +80,10 @@ public class WebSocketConnection extends AbstractConnection implements Connectio Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, + RetainableByteBufferPool retainableByteBufferPool, WebSocketCoreSession coreSession) { - this(endp, executor, scheduler, bufferPool, coreSession, null); + this(endp, executor, scheduler, bufferPool, retainableByteBufferPool, coreSession, null); } /** @@ -93,6 +96,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio * @param executor A thread executor to use for WS callbacks. * @param scheduler A scheduler to use for timeouts * @param bufferPool A pool of buffers to use. + * @param retainableByteBufferPool A pool of retainable buffers to use. * @param coreSession The WC core session to which frames are delivered. * @param randomMask A Random used to mask frames. If null then SecureRandom will be created if needed. */ @@ -100,6 +104,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, + RetainableByteBufferPool retainableByteBufferPool, WebSocketCoreSession coreSession, Random randomMask) { @@ -109,8 +114,10 @@ public class WebSocketConnection extends AbstractConnection implements Connectio Objects.requireNonNull(coreSession, "Session"); Objects.requireNonNull(executor, "Executor"); Objects.requireNonNull(bufferPool, "ByteBufferPool"); + Objects.requireNonNull(retainableByteBufferPool, "RetainableByteBufferPool"); this.bufferPool = bufferPool; + this.retainableByteBufferPool = retainableByteBufferPool; this.coreSession = coreSession; this.generator = new Generator(); this.parser = new Parser(bufferPool, coreSession); @@ -310,7 +317,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio private RetainableByteBuffer newNetworkBuffer(int capacity) { - return new RetainableByteBuffer(bufferPool, capacity, isUseInputDirectByteBuffers()); + return retainableByteBufferPool.acquire(capacity, isUseInputDirectByteBuffers()); } private void releaseNetworkBuffer() @@ -464,7 +471,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio } // If more references that 1(us), don't refill into buffer and risk compaction. - if (networkBuffer.getReferences() > 1) + if (networkBuffer.isRetained()) reacquireNetworkBuffer(); int filled = getEndPoint().fill(networkBuffer.getBuffer()); // TODO check if compact is possible. diff --git a/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/AbstractHandshaker.java b/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/AbstractHandshaker.java index 18174074ef1..f898dcfb890 100644 --- a/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/AbstractHandshaker.java +++ b/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/AbstractHandshaker.java @@ -24,6 +24,7 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.PreEncodedHttpField; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpTransport; @@ -216,9 +217,9 @@ public abstract class AbstractHandshaker implements Handshaker protected abstract WebSocketConnection createWebSocketConnection(Request baseRequest, WebSocketCoreSession coreSession); - protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession) + protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, RetainableByteBufferPool retainableByteBufferPool, WebSocketCoreSession coreSession) { - return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession); + return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, retainableByteBufferPool, coreSession); } protected abstract void prepareResponse(Response response, WebSocketNegotiation negotiation); diff --git a/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java b/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java index f9d53267996..bb40d1207a9 100644 --- a/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java +++ b/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC6455Handshaker.java @@ -22,6 +22,8 @@ import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.PreEncodedHttpField; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.Request; @@ -93,7 +95,9 @@ public final class RFC6455Handshaker extends AbstractHandshaker { HttpChannel httpChannel = baseRequest.getHttpChannel(); Connector connector = httpChannel.getConnector(); - return newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession); + ByteBufferPool byteBufferPool = connector.getByteBufferPool(); + RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool); + return newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), byteBufferPool, retainableByteBufferPool, coreSession); } @Override diff --git a/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Handshaker.java b/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Handshaker.java index a842154302e..8e2ec4a13f6 100644 --- a/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Handshaker.java +++ b/jetty-websocket/websocket-core-server/src/main/java/org/eclipse/jetty/websocket/core/server/internal/RFC8441Handshaker.java @@ -18,7 +18,9 @@ import jakarta.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.Request; @@ -77,7 +79,9 @@ public class RFC8441Handshaker extends AbstractHandshaker HttpChannel httpChannel = baseRequest.getHttpChannel(); Connector connector = httpChannel.getConnector(); EndPoint endPoint = httpChannel.getTunnellingEndPoint(); - return newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession); + ByteBufferPool byteBufferPool = connector.getByteBufferPool(); + RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool); + return newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), byteBufferPool, retainableByteBufferPool, coreSession); } @Override