Combined ByteBufferPool (#8171)

All `ByteBufferPool` can now be accessed as `RetainableByteBufferPools`.

Users now need to configure only a single buffer pool and there is just the additional retained parameter that needs consideration.
Default buffer pool has been changed to logarithmic, but we may wish to review that before next release.
Default factor size has been increased to 4096.
This commit is contained in:
Greg Wilkins 2022-07-04 10:38:30 +10:00 committed by GitHub
parent 27a89b284a
commit 2b817f06c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 619 additions and 313 deletions

View File

@ -64,7 +64,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive());
}
this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(httpClient, httpClient.getByteBufferPool());
this.retainableByteBufferPool = httpClient.getByteBufferPool().asRetainableByteBufferPool();
}
@Override

View File

@ -22,9 +22,9 @@ import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@ -708,6 +708,50 @@ public class HttpClientTLSTest
assertEquals(0, clientBytes.get());
}
protected class TestRetained extends ArrayRetainableByteBufferPool
{
private final ByteBufferPool _pool;
public TestRetained(ByteBufferPool pool, int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
super(0, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
_pool = pool;
}
@Override
protected ByteBuffer allocate(int capacity)
{
return _pool.acquire(capacity, false);
}
@Override
protected ByteBuffer allocateDirect(int capacity)
{
return _pool.acquire(capacity, true);
}
@Override
protected void removed(RetainableByteBuffer retainedBuffer)
{
_pool.release(retainedBuffer.getBuffer());
}
@Override
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return super.poolFor(capacity, direct);
}
}
private class TestByteBufferPool extends ArrayByteBufferPool
{
@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new TestRetained(this, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
}
@Test
public void testEncryptedInputBufferRepooling() throws Exception
{
@ -715,15 +759,10 @@ public class HttpClientTLSTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
var retainableByteBufferPool = new ArrayRetainableByteBufferPool()
{
@Override
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return super.poolFor(capacity, direct);
}
};
server.addBean(retainableByteBufferPool);
ArrayByteBufferPool byteBufferPool = new TestByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
server.addBean(byteBufferPool);
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
@ -765,9 +804,12 @@ public class HttpClientTLSTest
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
Pool<RetainableByteBuffer> bucket = retainableByteBufferPool.poolFor(16 * 1024 + 1, ssl.isDirectBuffersForEncryption());
Pool<RetainableByteBuffer> bucket = ((TestRetained)retainableByteBufferPool).poolFor(16 * 1024 + 1, connector.getConnectionFactory(HttpConnectionFactory.class).isUseInputDirectByteBuffers());
assertEquals(1, bucket.size());
assertEquals(1, bucket.getIdleCount());
long count = ssl.isDirectBuffersForDecryption() ? byteBufferPool.getDirectByteBufferCount() : byteBufferPool.getHeapByteBufferCount();
assertEquals(1, count);
}
@Test
@ -777,7 +819,7 @@ public class HttpClientTLSTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
List<ByteBuffer> leakedBuffers = new CopyOnWriteArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
@ -834,6 +876,7 @@ public class HttpClientTLSTest
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}
@ -845,7 +888,7 @@ public class HttpClientTLSTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
List<ByteBuffer> leakedBuffers = new CopyOnWriteArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
@ -916,6 +959,7 @@ public class HttpClientTLSTest
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}
@ -927,7 +971,7 @@ public class HttpClientTLSTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
List<ByteBuffer> leakedBuffers = new CopyOnWriteArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
@ -998,6 +1042,7 @@ public class HttpClientTLSTest
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
byteBufferPool.asRetainableByteBufferPool().clear();
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}

View File

@ -79,7 +79,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
HttpClient client = destination.getHttpClient();
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
this.networkByteBufferPool = client.getByteBufferPool().asRetainableByteBufferPool();
}
public HttpDestination getHttpDestination()

View File

@ -120,7 +120,7 @@ public class ServerGenerator extends Generator
private ByteBuffer generateEndRequest(int request, boolean aborted)
{
request &= 0xFF_FF;
ByteBuffer endRequestBuffer = acquire(8);
ByteBuffer endRequestBuffer = acquire(16);
BufferUtil.clearToFill(endRequestBuffer);
endRequestBuffer.putInt(0x01_03_00_00 + request);
endRequestBuffer.putInt(0x00_08_00_00);

View File

@ -50,7 +50,7 @@ public class ServerFCGIConnection extends AbstractConnection
{
super(endPoint, connector.getExecutor());
this.connector = connector;
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
this.networkByteBufferPool = connector.getByteBufferPool().asRetainableByteBufferPool();
this.flusher = new Flusher(endPoint);
this.configuration = configuration;
this.sendStatus200 = sendStatus200;

View File

@ -68,7 +68,7 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);

View File

@ -280,7 +280,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
parser.setMaxFrameLength(getMaxFrameLength());
parser.setMaxSettingsKeys(getMaxSettingsKeys());
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
RetainableByteBufferPool retainableByteBufferPool = connector.getByteBufferPool().asRetainableByteBufferPool();
HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);

View File

@ -57,7 +57,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
{
super(endPoint, executor);
this.buffers = RetainableByteBufferPool.findOrAdapt(null, byteBufferPool);
this.buffers = byteBufferPool.asRetainableByteBufferPool();
this.parser = parser;
parser.init(MessageListener::new);
}

View File

@ -14,10 +14,15 @@
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
@ -29,27 +34,64 @@ import org.eclipse.jetty.util.annotation.ManagedOperation;
@ManagedObject
abstract class AbstractByteBufferPool implements ByteBufferPool
{
public static final int DEFAULT_FACTOR = 4096;
public static final int DEFAULT_MAX_CAPACITY_BY_FACTOR = 16;
private final int _factor;
private final int _maxQueueLength;
private final int _maxCapacity;
private final int _maxBucketSize;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();
private final RetainableByteBufferPool _retainableByteBufferPool;
/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxBucketSize the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
protected AbstractByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
_factor = factor <= 0 ? 1024 : factor;
_maxQueueLength = maxQueueLength;
_maxHeapMemory = (maxHeapMemory != 0) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4;
_maxDirectMemory = (maxDirectMemory != 0) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4;
_factor = factor <= 0 ? DEFAULT_FACTOR : factor;
_maxCapacity = maxCapacity > 0 ? maxCapacity : DEFAULT_MAX_CAPACITY_BY_FACTOR * _factor;
_maxBucketSize = maxBucketSize;
_maxHeapMemory = memorySize(maxHeapMemory);
_maxDirectMemory = memorySize(maxDirectMemory);
_retainableByteBufferPool = (retainedHeapMemory == -2 && retainedDirectMemory == -2)
? RetainableByteBufferPool.from(this)
: newRetainableByteBufferPool(factor, maxCapacity, maxBucketSize, retainedSize(retainedHeapMemory), retainedSize(retainedDirectMemory));
}
static long retainedSize(long size)
{
if (size == -2)
return 0;
return memorySize(size);
}
static long memorySize(long size)
{
if (size < 0)
return -1;
if (size == 0)
return Runtime.getRuntime().maxMemory() / 4;
return size;
}
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return RetainableByteBufferPool.from(this);
}
@Override
public RetainableByteBufferPool asRetainableByteBufferPool()
{
return _retainableByteBufferPool;
}
protected int getCapacityFactor()
@ -57,9 +99,14 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
return _factor;
}
protected int getMaxQueueLength()
protected int getMaxCapacity()
{
return _maxQueueLength;
return _maxCapacity;
}
protected int getMaxBucketSize()
{
return _maxBucketSize;
}
@Deprecated
@ -123,6 +170,97 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
return memory.get();
}
protected static class Bucket
{
private final Queue<ByteBuffer> _queue = new ConcurrentLinkedQueue<>();
private final int _capacity;
private final int _maxSize;
private final AtomicInteger _size;
private final AtomicLong _lastUpdate = new AtomicLong(System.nanoTime());
private final IntConsumer _memoryFunction;
@Deprecated
public Bucket(int capacity, int maxSize)
{
this(capacity, maxSize, i -> {});
}
public Bucket(int capacity, int maxSize, IntConsumer memoryFunction)
{
_capacity = capacity;
_maxSize = maxSize;
_size = maxSize > 0 ? new AtomicInteger() : null;
_memoryFunction = Objects.requireNonNull(memoryFunction);
}
public ByteBuffer acquire()
{
ByteBuffer buffer = _queue.poll();
if (buffer != null)
{
if (_size != null)
_size.decrementAndGet();
_memoryFunction.accept(-buffer.capacity());
}
return buffer;
}
public void release(ByteBuffer buffer)
{
resetUpdateTime();
BufferUtil.reset(buffer);
if (_size == null || _size.incrementAndGet() <= _maxSize)
{
_queue.offer(buffer);
_memoryFunction.accept(buffer.capacity());
}
else
{
_size.decrementAndGet();
}
}
void resetUpdateTime()
{
_lastUpdate.lazySet(System.nanoTime());
}
public void clear()
{
int size = _size == null ? 0 : _size.get() - 1;
while (size >= 0)
{
ByteBuffer buffer = acquire();
if (buffer == null)
break;
if (_size != null)
--size;
}
}
boolean isEmpty()
{
return _queue.isEmpty();
}
int size()
{
return _queue.size();
}
long getLastUpdate()
{
return _lastUpdate.getOpaque();
}
@Override
public String toString()
{
return String.format("%s@%x{capacity=%d, size=%d, maxSize=%d}", getClass().getSimpleName(), hashCode(), _capacity, size(), _maxSize);
}
}
IntConsumer updateMemory(boolean direct)
{
return (direct) ? _directMemory::addAndGet : _heapMemory::addAndGet;

View File

@ -31,9 +31,9 @@ import org.slf4j.LoggerFactory;
/**
* <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.</p>
* <p>Given a capacity {@code factor} of 1024, the first array element holds a queue of ByteBuffers
* each of capacity 1024, the second array element holds a queue of ByteBuffers each of capacity
* 2048, and so on.</p>
* <p>Given a capacity {@code factor} of 4096, the first array element holds a bucket of ByteBuffers
* each of capacity 4096, the second array element holds a bucket of ByteBuffers each of capacity
* 8192, and so on.</p>
* <p>The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
* divided by 4.</p>
*/
@ -44,8 +44,8 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
private final int _maxCapacity;
private final int _minCapacity;
private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect;
private final Bucket[] _direct;
private final Bucket[] _indirect;
private boolean _detailedDump = false;
/**
@ -90,19 +90,35 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxBucketSize the maximum ByteBuffer queue length in a {@link Bucket}
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, maxHeapMemory, maxDirectMemory);
}
/**
* Creates a new ArrayByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxBucketSize the maximum ByteBuffer queue length in a {@link Bucket}
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
super(factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
maxCapacity = getMaxCapacity();
factor = getCapacityFactor();
if (minCapacity <= 0)
minCapacity = 0;
if (maxCapacity <= 0)
maxCapacity = 64 * 1024;
if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
_maxCapacity = maxCapacity;
@ -110,8 +126,8 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
// Initialize all buckets in constructor and never modify the array again.
int length = bucketFor(maxCapacity) + 1;
_direct = new ByteBufferPool.Bucket[length];
_indirect = new ByteBufferPool.Bucket[length];
_direct = new Bucket[length];
_indirect = new Bucket[length];
for (int i = 0; i < length; i++)
{
_direct[i] = newBucket(i, true);
@ -119,11 +135,17 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
}
}
@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new Retained(factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
int capacity = size < _minCapacity ? size : capacityFor(bucketFor(size));
ByteBufferPool.Bucket bucket = bucketFor(size, direct);
Bucket bucket = bucketFor(size, direct);
if (bucket == null)
return newByteBuffer(capacity, direct);
ByteBuffer buffer = bucket.acquire();
@ -152,7 +174,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
return;
boolean direct = buffer.isDirect();
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct);
Bucket bucket = bucketFor(capacity, direct);
if (bucket != null)
{
bucket.release(buffer);
@ -162,7 +184,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
private Bucket newBucket(int key, boolean direct)
{
return new Bucket(capacityFor(key), getMaxQueueLength(), updateMemory(direct));
return new Bucket(capacityFor(key), getMaxBucketSize(), updateMemory(direct));
}
@Override
@ -210,7 +232,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
return bucket * getCapacityFactor();
}
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct)
protected Bucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
@ -242,7 +264,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
}
// Package local for testing
ByteBufferPool.Bucket[] bucketsFor(boolean direct)
Bucket[] bucketsFor(boolean direct)
{
return direct ? _direct : _indirect;
}
@ -276,6 +298,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
dump.add("Indirect Buckets size=" + indirect.size());
dump.add("Direct Buckets size=" + direct.size());
}
dump.add(asRetainableByteBufferPool());
Dumpable.dumpObjects(out, indent, this, dump);
}
@ -286,7 +309,33 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
this.getClass().getSimpleName(), hashCode(),
_minCapacity,
_maxCapacity,
getMaxQueueLength(),
getMaxBucketSize(),
getCapacityFactor());
}
protected class Retained extends ArrayRetainableByteBufferPool
{
public Retained(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
super(0, factor, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
@Override
protected ByteBuffer allocate(int capacity)
{
return ArrayByteBufferPool.this.acquire(capacity, false);
}
@Override
protected ByteBuffer allocateDirect(int capacity)
{
return ArrayByteBufferPool.this.acquire(capacity, true);
}
@Override
protected void removed(RetainableByteBuffer retainedBuffer)
{
ArrayByteBufferPool.this.release(retainedBuffer.getBuffer());
}
}
}

View File

@ -39,13 +39,14 @@ import org.slf4j.LoggerFactory;
* <p>The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
* divided by 4.</p>
*/
@SuppressWarnings("resource")
@ManagedObject
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable
{
private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class);
private final Bucket[] _direct;
private final Bucket[] _indirect;
private final RetainedBucket[] _direct;
private final RetainedBucket[] _indirect;
private final int _minCapacity;
private final int _maxCapacity;
private final long _maxHeapMemory;
@ -109,34 +110,34 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
{
if (minCapacity <= 0)
minCapacity = 0;
factor = factor <= 0 ? AbstractByteBufferPool.DEFAULT_FACTOR : factor;
if (maxCapacity <= 0)
maxCapacity = 64 * 1024;
int f = factor <= 0 ? 1024 : factor;
if ((maxCapacity % f) != 0 || f >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
maxCapacity = AbstractByteBufferPool.DEFAULT_MAX_CAPACITY_BY_FACTOR * factor;
if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
throw new IllegalArgumentException(String.format("The capacity factor(%d) must be a divisor of maxCapacity(%d)", factor, maxCapacity));
int f = factor;
if (bucketIndexFor == null)
bucketIndexFor = c -> (c - 1) / f;
if (bucketCapacity == null)
bucketCapacity = i -> (i + 1) * f;
int length = bucketIndexFor.apply(maxCapacity) + 1;
Bucket[] directArray = new Bucket[length];
Bucket[] indirectArray = new Bucket[length];
RetainedBucket[] directArray = new RetainedBucket[length];
RetainedBucket[] indirectArray = new RetainedBucket[length];
for (int i = 0; i < directArray.length; i++)
{
int capacity = Math.min(bucketCapacity.apply(i), maxCapacity);
directArray[i] = new Bucket(capacity, maxBucketSize);
indirectArray[i] = new Bucket(capacity, maxBucketSize);
directArray[i] = new RetainedBucket(capacity, maxBucketSize);
indirectArray[i] = new RetainedBucket(capacity, maxBucketSize);
}
_minCapacity = minCapacity;
_maxCapacity = maxCapacity;
_direct = directArray;
_indirect = indirectArray;
_maxHeapMemory = (maxHeapMemory != 0L) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4;
_maxDirectMemory = (maxDirectMemory != 0L) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4;
_maxHeapMemory = AbstractByteBufferPool.retainedSize(maxHeapMemory);
_maxDirectMemory = AbstractByteBufferPool.retainedSize(maxDirectMemory);
_bucketIndexFor = bucketIndexFor;
}
@ -155,20 +156,20 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
Bucket bucket = bucketFor(size, direct);
RetainedBucket bucket = bucketFor(size, direct);
if (bucket == null)
return newRetainableByteBuffer(size, direct, byteBuffer -> {});
Bucket.Entry entry = bucket.acquire();
return newRetainableByteBuffer(size, direct, this::removed);
RetainedBucket.Entry entry = bucket.acquire();
RetainableByteBuffer buffer;
if (entry == null)
{
Bucket.Entry reservedEntry = bucket.reserve();
RetainedBucket.Entry reservedEntry = bucket.reserve();
if (reservedEntry != null)
{
buffer = newRetainableByteBuffer(bucket._capacity, direct, byteBuffer ->
buffer = newRetainableByteBuffer(bucket._capacity, direct, retainedBuffer ->
{
BufferUtil.reset(byteBuffer);
BufferUtil.reset(retainedBuffer.getBuffer());
reservedEntry.release();
});
reservedEntry.enable(buffer, true);
@ -180,7 +181,7 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
}
else
{
buffer = newRetainableByteBuffer(size, direct, byteBuffer -> {});
buffer = newRetainableByteBuffer(size, direct, this::removed);
}
}
else
@ -191,9 +192,23 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
return buffer;
}
private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<ByteBuffer> releaser)
protected ByteBuffer allocate(int capacity)
{
ByteBuffer buffer = direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
return ByteBuffer.allocate(capacity);
}
protected ByteBuffer allocateDirect(int capacity)
{
return ByteBuffer.allocateDirect(capacity);
}
protected void removed(RetainableByteBuffer retainedBuffer)
{
}
private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<RetainableByteBuffer> releaser)
{
ByteBuffer buffer = direct ? allocateDirect(capacity) : allocate(capacity);
BufferUtil.clear(buffer);
RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(buffer, releaser);
retainableByteBuffer.acquire();
@ -205,12 +220,12 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
return bucketFor(capacity, direct);
}
private Bucket bucketFor(int capacity, boolean direct)
private RetainedBucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
int idx = _bucketIndexFor.apply(capacity);
Bucket[] buckets = direct ? _direct : _indirect;
RetainedBucket[] buckets = direct ? _direct : _indirect;
if (idx >= buckets.length)
return null;
return buckets[idx];
@ -230,8 +245,8 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private long getByteBufferCount(boolean direct)
{
Bucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(Bucket::size).sum();
RetainedBucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(RetainedBucket::size).sum();
}
@ManagedAttribute("The number of pooled direct ByteBuffers that are available")
@ -248,7 +263,7 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private long getAvailableByteBufferCount(boolean direct)
{
Bucket[] buckets = direct ? _direct : _indirect;
RetainedBucket[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(bucket -> bucket.values().stream().filter(Pool.Entry::isIdle).count()).sum();
}
@ -286,9 +301,9 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private long getAvailableMemory(boolean direct)
{
Bucket[] buckets = direct ? _direct : _indirect;
RetainedBucket[] buckets = direct ? _direct : _indirect;
long total = 0L;
for (Bucket bucket : buckets)
for (RetainedBucket bucket : buckets)
{
int capacity = bucket._capacity;
total += bucket.values().stream().filter(Pool.Entry::isIdle).count() * capacity;
@ -303,14 +318,17 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
clearArray(_indirect, _currentHeapMemory);
}
private void clearArray(Bucket[] poolArray, AtomicLong memoryCounter)
private void clearArray(RetainedBucket[] poolArray, AtomicLong memoryCounter)
{
for (Bucket pool : poolArray)
for (RetainedBucket pool : poolArray)
{
for (Bucket.Entry entry : pool.values())
for (RetainedBucket.Entry entry : pool.values())
{
entry.remove();
memoryCounter.addAndGet(-entry.getPooled().capacity());
if (entry.remove())
{
memoryCounter.addAndGet(-entry.getPooled().capacity());
removed(entry.getPooled());
}
}
}
}
@ -338,13 +356,13 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
long now = System.nanoTime();
long totalClearedCapacity = 0L;
Bucket[] buckets = direct ? _direct : _indirect;
RetainedBucket[] buckets = direct ? _direct : _indirect;
while (totalClearedCapacity < excess)
{
for (Bucket bucket : buckets)
for (RetainedBucket bucket : buckets)
{
Bucket.Entry oldestEntry = findOldestEntry(now, bucket);
RetainedBucket.Entry oldestEntry = findOldestEntry(now, bucket);
if (oldestEntry == null)
continue;
@ -356,6 +374,7 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
else
_currentHeapMemory.addAndGet(-clearedCapacity);
totalClearedCapacity += clearedCapacity;
removed(oldestEntry.getPooled());
}
// else a concurrent thread evicted the same entry -> do not account for its capacity.
}
@ -389,8 +408,8 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<RetainableByteBuffer> bucket)
{
Bucket.Entry oldestEntry = null;
for (Bucket.Entry entry : bucket.values())
RetainedBucket.Entry oldestEntry = null;
for (RetainedBucket.Entry entry : bucket.values())
{
if (oldestEntry != null)
{
@ -406,11 +425,11 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
return oldestEntry;
}
private static class Bucket extends Pool<RetainableByteBuffer>
private static class RetainedBucket extends Pool<RetainableByteBuffer>
{
private final int _capacity;
Bucket(int capacity, int size)
RetainedBucket(int capacity, int size)
{
super(Pool.StrategyType.THREAD_ID, size, true);
_capacity = capacity;

View File

@ -16,12 +16,6 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import org.eclipse.jetty.util.BufferUtil;
@ -43,7 +37,7 @@ public interface ByteBufferPool
* @return the requested buffer
* @see #release(ByteBuffer)
*/
public ByteBuffer acquire(int size, boolean direct);
ByteBuffer acquire(int size, boolean direct);
/**
* <p>Returns a {@link ByteBuffer}, usually obtained with {@link #acquire(int, boolean)}
@ -52,7 +46,7 @@ public interface ByteBufferPool
* @param buffer the buffer to return
* @see #acquire(int, boolean)
*/
public void release(ByteBuffer buffer);
void release(ByteBuffer buffer);
/**
* <p>Removes a {@link ByteBuffer} that was previously obtained with {@link #acquire(int, boolean)}.</p>
@ -78,7 +72,14 @@ public interface ByteBufferPool
return direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
}
public static class Lease
/**
* Get this pool as a {@link RetainableByteBufferPool}, which supports reference counting of the
* buffers and possibly a more efficient lookup mechanism based on the {@link org.eclipse.jetty.util.Pool} class.
* @return This pool as a RetainableByteBufferPool. The same instance is always returned by multiple calls to this method.
*/
RetainableByteBufferPool asRetainableByteBufferPool();
class Lease
{
private final ByteBufferPool byteBufferPool;
private final List<ByteBuffer> buffers;
@ -147,95 +148,4 @@ public interface ByteBufferPool
byteBufferPool.release(buffer);
}
}
public static class Bucket
{
private final Queue<ByteBuffer> _queue = new ConcurrentLinkedQueue<>();
private final int _capacity;
private final int _maxSize;
private final AtomicInteger _size;
private final AtomicLong _lastUpdate = new AtomicLong(System.nanoTime());
private final IntConsumer _memoryFunction;
@Deprecated
public Bucket(int capacity, int maxSize)
{
this(capacity, maxSize, i -> {});
}
public Bucket(int capacity, int maxSize, IntConsumer memoryFunction)
{
_capacity = capacity;
_maxSize = maxSize;
_size = maxSize > 0 ? new AtomicInteger() : null;
_memoryFunction = Objects.requireNonNull(memoryFunction);
}
public ByteBuffer acquire()
{
ByteBuffer buffer = _queue.poll();
if (buffer != null)
{
if (_size != null)
_size.decrementAndGet();
_memoryFunction.accept(-buffer.capacity());
}
return buffer;
}
public void release(ByteBuffer buffer)
{
resetUpdateTime();
BufferUtil.reset(buffer);
if (_size == null || _size.incrementAndGet() <= _maxSize)
{
_queue.offer(buffer);
_memoryFunction.accept(buffer.capacity());
}
else
{
_size.decrementAndGet();
}
}
void resetUpdateTime()
{
_lastUpdate.lazySet(System.nanoTime());
}
public void clear()
{
int size = _size == null ? 0 : _size.get() - 1;
while (size >= 0)
{
ByteBuffer buffer = acquire();
if (buffer == null)
break;
if (_size != null)
--size;
}
}
boolean isEmpty()
{
return _queue.isEmpty();
}
int size()
{
return _queue.size();
}
long getLastUpdate()
{
return _lastUpdate.getOpaque();
}
@Override
public String toString()
{
return String.format("%s@%x{capacity=%d, size=%d, maxSize=%d}", getClass().getSimpleName(), hashCode(), _capacity, size(), _maxSize);
}
}
}

View File

@ -58,6 +58,13 @@ public class LeakTrackingByteBufferPool extends ContainerLifeCycle implements By
addBean(delegate);
}
@Override
public RetainableByteBufferPool asRetainableByteBufferPool()
{
// the retainable pool is just a client of the normal pool, so no special handling required.
return delegate.asRetainableByteBufferPool();
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{

View File

@ -63,7 +63,29 @@ public class LogarithmicArrayByteBufferPool extends ArrayByteBufferPool
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, 1, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory);
this(minCapacity, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory, maxHeapMemory, maxDirectMemory);
}
/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
* @param retainedHeapMemory the max heap memory in bytes, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -1 for unlimited retained memory or 0 to use default heuristic
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
super(minCapacity, -1, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
}
@Override
protected RetainableByteBufferPool newRetainableByteBufferPool(int factor, int maxCapacity, int maxBucketSize, long retainedHeapMemory, long retainedDirectMemory)
{
return new LogarithmicRetainablePool(0, maxCapacity, maxBucketSize, retainedHeapMemory, retainedDirectMemory);
}
@Override
@ -104,4 +126,34 @@ public class LogarithmicArrayByteBufferPool extends ArrayByteBufferPool
bucket.resetUpdateTime();
}
}
/**
* A variant of the {@link ArrayRetainableByteBufferPool} that
* uses buckets of buffers that increase in size by a power of
* 2 (eg 1k, 2k, 4k, 8k, etc.).
*/
public static class LogarithmicRetainablePool extends ArrayRetainableByteBufferPool
{
public LogarithmicRetainablePool()
{
this(0, -1, Integer.MAX_VALUE);
}
public LogarithmicRetainablePool(int minCapacity, int maxCapacity, int maxBucketSize)
{
this(minCapacity, maxCapacity, maxBucketSize, -1L, -1L);
}
public LogarithmicRetainablePool(int minCapacity, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity,
-1,
maxCapacity,
maxBucketSize,
maxHeapMemory,
maxDirectMemory,
c -> 32 - Integer.numberOfLeadingZeros(c - 1),
i -> 1 << i);
}
}
}

View File

@ -69,43 +69,73 @@ public class MappedByteBufferPool extends AbstractByteBufferPool implements Dump
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxBucketSize the maximum ByteBuffer bucket size
*/
public MappedByteBufferPool(int factor, int maxQueueLength)
public MappedByteBufferPool(int factor, int maxBucketSize)
{
this(factor, maxQueueLength, null);
this(factor, maxBucketSize, null);
}
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxBucketSize the maximum ByteBuffer bucket size
* @param newBucket the function that creates a Bucket
*/
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket)
private MappedByteBufferPool(int factor, int maxBucketSize, Function<Integer, Bucket> newBucket)
{
this(factor, maxQueueLength, newBucket, 0, 0);
this(factor, maxBucketSize, newBucket, 0, 0, 0, 0);
}
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param newBucket the function that creates a Bucket
* @param maxBucketSize the maximum ByteBuffer bucket size
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory)
public MappedByteBufferPool(int factor, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
this(factor, maxBucketSize, null, maxHeapMemory, maxDirectMemory, maxHeapMemory, maxDirectMemory);
}
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxBucketSize the maximum ByteBuffer bucket size
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
public MappedByteBufferPool(int factor, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
this(factor, maxBucketSize, null, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
}
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxBucketSize the maximum ByteBuffer bucket size
* @param newBucket the function that creates a Bucket
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param retainedHeapMemory the max heap memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
* @param retainedDirectMemory the max direct memory in bytes, -2 for no retained memory, -1 for unlimited retained memory or 0 to use default heuristic
*/
private MappedByteBufferPool(int factor, int maxBucketSize, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory, long retainedHeapMemory, long retainedDirectMemory)
{
super(factor, 0, maxBucketSize, maxHeapMemory, maxDirectMemory, retainedHeapMemory, retainedDirectMemory);
_newBucket = newBucket;
}
private Bucket newBucket(int key, boolean direct)
{
return (_newBucket != null) ? _newBucket.apply(key) : new Bucket(capacityFor(key), getMaxQueueLength(), updateMemory(direct));
return (_newBucket != null) ? _newBucket.apply(key) : new Bucket(capacityFor(key), getMaxBucketSize(), updateMemory(direct));
}
@Override
@ -269,7 +299,7 @@ public class MappedByteBufferPool extends AbstractByteBufferPool implements Dump
{
return String.format("%s@%x{maxQueueLength=%s, factor=%s}",
this.getClass().getSimpleName(), hashCode(),
getMaxQueueLength(),
getMaxBucketSize(),
getCapacityFactor());
}
}

View File

@ -19,6 +19,8 @@ import org.eclipse.jetty.util.BufferUtil;
public class NullByteBufferPool implements ByteBufferPool
{
private final RetainableByteBufferPool _retainableByteBufferPool = RetainableByteBufferPool.from(this);
@Override
public ByteBuffer acquire(int size, boolean direct)
{
@ -33,4 +35,10 @@ public class NullByteBufferPool implements ByteBufferPool
{
BufferUtil.clear(buffer);
}
@Override
public RetainableByteBufferPool asRetainableByteBufferPool()
{
return _retainableByteBufferPool;
}
}

View File

@ -37,10 +37,10 @@ public class RetainableByteBuffer implements Retainable
{
private final ByteBuffer buffer;
private final AtomicInteger references = new AtomicInteger();
private final Consumer<ByteBuffer> releaser;
private final Consumer<RetainableByteBuffer> releaser;
private final AtomicLong lastUpdate = new AtomicLong(System.nanoTime());
RetainableByteBuffer(ByteBuffer buffer, Consumer<ByteBuffer> releaser)
RetainableByteBuffer(ByteBuffer buffer, Consumer<RetainableByteBuffer> releaser)
{
this.releaser = releaser;
this.buffer = buffer;
@ -112,7 +112,7 @@ public class RetainableByteBuffer implements Retainable
if (ref == 0)
{
lastUpdate.setOpaque(System.nanoTime());
releaser.accept(buffer);
releaser.accept(this);
return true;
}
return false;

View File

@ -15,8 +15,6 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.component.Container;
/**
* <p>A {@link RetainableByteBuffer} pool.</p>
* <p>Acquired buffers <b>must</b> be released by calling {@link RetainableByteBuffer#release()} otherwise the memory they hold will
@ -32,27 +30,36 @@ public interface RetainableByteBufferPool
*/
RetainableByteBuffer acquire(int size, boolean direct);
/**
* Finds a {@link RetainableByteBufferPool} implementation in the given container, or wrap the given
* {@link ByteBufferPool} with an adapter.
* @param container the container to search for an existing memory pool.
* @param byteBufferPool the {@link ByteBufferPool} to wrap if no memory pool was found in the container.
* @return the {@link RetainableByteBufferPool} found or the wrapped one.
*/
static RetainableByteBufferPool findOrAdapt(Container container, ByteBufferPool byteBufferPool)
void clear();
static RetainableByteBufferPool from(ByteBufferPool byteBufferPool)
{
RetainableByteBufferPool retainableByteBufferPool = container == null ? null : container.getBean(RetainableByteBufferPool.class);
if (retainableByteBufferPool == null)
return new RetainableByteBufferPool()
{
// Wrap the ByteBufferPool instance.
retainableByteBufferPool = (size, direct) ->
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
ByteBuffer byteBuffer = byteBufferPool.acquire(size, direct);
RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(byteBuffer, byteBufferPool::release);
RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(byteBuffer, this::release);
retainableByteBuffer.acquire();
return retainableByteBuffer;
};
}
return retainableByteBufferPool;
}
private void release(RetainableByteBuffer retainedBuffer)
{
byteBufferPool.release(retainedBuffer.getBuffer());
}
@Override
public void clear()
{
}
@Override
public String toString()
{
return String.format("NonRetainableByteBufferPool@%x{%s}", hashCode(), byteBufferPool.toString());
}
};
}
}

View File

@ -173,7 +173,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine,
boolean useDirectBuffersForEncryption, boolean useDirectBuffersForDecryption)
{
this(RetainableByteBufferPool.findOrAdapt(null, byteBufferPool), byteBufferPool, executor, endPoint, sslEngine, useDirectBuffersForEncryption, useDirectBuffersForDecryption);
this(byteBufferPool.asRetainableByteBufferPool(), byteBufferPool, executor, endPoint, sslEngine, useDirectBuffersForEncryption, useDirectBuffersForDecryption);
}
public SslConnection(RetainableByteBufferPool retainableByteBufferPool, ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine,

View File

@ -18,7 +18,7 @@ import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Objects;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.eclipse.jetty.io.AbstractByteBufferPool.Bucket;
import org.eclipse.jetty.util.StringUtil;
import org.junit.jupiter.api.Test;
@ -37,7 +37,7 @@ public class ArrayByteBufferPoolTest
public void testMinimumRelease()
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size = 1; size <= 9; size++)
{
@ -45,7 +45,7 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertEquals(size, buffer.capacity());
for (ByteBufferPool.Bucket bucket : buckets)
for (Bucket bucket : buckets)
{
if (bucket != null)
assertTrue(bucket.isEmpty());
@ -53,7 +53,7 @@ public class ArrayByteBufferPoolTest
bufferPool.release(buffer);
for (ByteBufferPool.Bucket bucket : buckets)
for (Bucket bucket : buckets)
{
if (bucket != null)
assertTrue(bucket.isEmpty());
@ -68,7 +68,7 @@ public class ArrayByteBufferPoolTest
int factor = 1;
int maxCapacity = 1024;
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(minCapacity, factor, maxCapacity);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size = maxCapacity - 1; size <= maxCapacity + 1; size++)
{
@ -77,7 +77,7 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ByteBufferPool.Bucket bucket : buckets)
for (Bucket bucket : buckets)
{
if (bucket != null)
assertTrue(bucket.isEmpty());
@ -87,7 +87,7 @@ public class ArrayByteBufferPoolTest
int pooled = Arrays.stream(buckets)
.filter(Objects::nonNull)
.mapToInt(Bucket::size)
.mapToInt(AbstractByteBufferPool.Bucket::size)
.sum();
if (size <= maxCapacity)
@ -101,7 +101,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireRelease()
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size = 390; size <= 510; size++)
{
@ -110,7 +110,7 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ByteBufferPool.Bucket bucket : buckets)
for (Bucket bucket : buckets)
{
if (bucket != null)
assertTrue(bucket.isEmpty());
@ -120,7 +120,7 @@ public class ArrayByteBufferPoolTest
int pooled = Arrays.stream(buckets)
.filter(Objects::nonNull)
.mapToInt(Bucket::size)
.mapToInt(AbstractByteBufferPool.Bucket::size)
.sum();
assertEquals(1, pooled);
}
@ -130,7 +130,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireReleaseAcquire()
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size = 390; size <= 510; size++)
{
@ -144,7 +144,7 @@ public class ArrayByteBufferPoolTest
int pooled = Arrays.stream(buckets)
.filter(Objects::nonNull)
.mapToInt(Bucket::size)
.mapToInt(AbstractByteBufferPool.Bucket::size)
.sum();
assertEquals(1, pooled);

View File

@ -14,6 +14,7 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
@ -26,7 +27,9 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
@ -238,7 +241,7 @@ public class ArrayRetainableByteBufferPoolTest
pool.acquire(10, true);
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(2048L));
assertThat(pool.getDirectMemory(), is(2L * AbstractByteBufferPool.DEFAULT_FACTOR));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
@ -282,16 +285,16 @@ public class ArrayRetainableByteBufferPoolTest
{
RetainableByteBuffer buf1 = pool.acquire(10, true);
assertThat(buf1, is(notNullValue()));
assertThat(buf1.capacity(), is(1024));
assertThat(buf1.capacity(), is(AbstractByteBufferPool.DEFAULT_FACTOR));
RetainableByteBuffer buf2 = pool.acquire(10, true);
assertThat(buf2, is(notNullValue()));
assertThat(buf2.capacity(), is(1024));
assertThat(buf2.capacity(), is(AbstractByteBufferPool.DEFAULT_FACTOR));
buf1.release();
buf2.release();
RetainableByteBuffer buf3 = pool.acquire(16384 + 1, true);
assertThat(buf3, is(notNullValue()));
assertThat(buf3.capacity(), is(16384 + 1024));
assertThat(buf3.capacity(), is(16384 + AbstractByteBufferPool.DEFAULT_FACTOR));
buf3.release();
RetainableByteBuffer buf4 = pool.acquire(32768, true);
@ -307,7 +310,7 @@ public class ArrayRetainableByteBufferPoolTest
assertThat(pool.getDirectByteBufferCount(), is(4L));
assertThat(pool.getHeapByteBufferCount(), is(1L));
assertThat(pool.getDirectMemory(), is(1024 + 1024 + 16384 + 1024 + 32768L));
assertThat(pool.getDirectMemory(), is(AbstractByteBufferPool.DEFAULT_FACTOR * 3L + 16384 + 32768L));
assertThat(pool.getHeapMemory(), is(32768L));
pool.clear();
@ -391,4 +394,29 @@ public class ArrayRetainableByteBufferPoolTest
assertThat(buffer.release(), is(true));
assertThat(buffer.getBuffer().order(), Matchers.is(ByteOrder.BIG_ENDIAN));
}
@Test
public void testLogarithmic()
{
LogarithmicArrayByteBufferPool pool = new LogarithmicArrayByteBufferPool();
ByteBuffer buffer5 = pool.acquire(5, false);
pool.release(buffer5);
ByteBuffer buffer6 = pool.acquire(6, false);
assertThat(buffer6, sameInstance(buffer5));
pool.release(buffer6);
ByteBuffer buffer9 = pool.acquire(9, false);
assertThat(buffer9, not(sameInstance(buffer5)));
pool.release(buffer9);
RetainableByteBufferPool retainablePool = pool.asRetainableByteBufferPool();
RetainableByteBuffer retain5 = retainablePool.acquire(5, false);
retain5.release();
RetainableByteBuffer retain6 = retainablePool.acquire(6, false);
assertThat(retain6, sameInstance(retain5));
retain6.release();
RetainableByteBuffer retain9 = retainablePool.acquire(9, false);
assertThat(retain9, not(sameInstance(retain5)));
retain9.release();
}
}

View File

@ -16,7 +16,7 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.eclipse.jetty.io.AbstractByteBufferPool.Bucket;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.junit.jupiter.api.Test;
@ -138,7 +138,7 @@ public class MappedByteBufferPoolTest
{
int factor = 1024;
int maxMemory = 11 * factor;
MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, null, -1, maxMemory);
MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, -1, maxMemory);
ConcurrentMap<Integer, Bucket> buckets = bufferPool.bucketsFor(true);
// Create the buckets - the oldest is the larger.

View File

@ -4,8 +4,10 @@
<New id="byteBufferPool" class="org.eclipse.jetty.io.LogarithmicArrayByteBufferPool">
<Arg type="int"><Property name="jetty.byteBufferPool.minCapacity" default="0"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxCapacity" default="65536"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxBucketSize" deprecated="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.retainedHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.retainedDirectMemory" default="0"/></Arg>
</New>
</Configure>

View File

@ -3,10 +3,12 @@
<Configure>
<New id="byteBufferPool" class="org.eclipse.jetty.io.ArrayByteBufferPool">
<Arg type="int"><Property name="jetty.byteBufferPool.minCapacity" default="0"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.factor" default="1024"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.factor" default="4096"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxCapacity" default="65536"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxBucketSize" deprecated="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.retainedHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.retainedDirectMemory" default="0"/></Arg>
</New>
</Configure>

View File

@ -20,11 +20,17 @@ etc/jetty-bytebufferpool-logarithmic.xml
## Maximum capacity to pool ByteBuffers
#jetty.byteBufferPool.maxCapacity=65536
## Maximum queue length for each bucket (-1 for unbounded)
#jetty.byteBufferPool.maxQueueLength=-1
## Maximum size for each bucket (-1 for unbounded)
#jetty.byteBufferPool.maxBucketSize=-1
## Maximum heap memory retainable by the pool (0 for heuristic, -1 for unlimited)
## Maximum heap memory held idle by the pool (0 for heuristic, -1 for unlimited).
#jetty.byteBufferPool.maxHeapMemory=0
## Maximum direct memory retainable by the pool (0 for heuristic, -1 for unlimited)
## Maximum direct memory held idle by the pool (0 for heuristic, -1 for unlimited).
#jetty.byteBufferPool.maxDirectMemory=0
## Maximum heap memory retained whilst in use by the pool (0 for heuristic, -1 for unlimited, -2 for no retained).
#jetty.byteBufferPool.retainedHeapMemory=0
## Maximum direct memory retained whilst in use by the pool (0 for heuristic, -1 for unlimited, -2 for no retained).
#jetty.byteBufferPool.retainedDirectMemory=0

View File

@ -1,5 +1,6 @@
[description]
Configures the ByteBufferPool used by ServerConnectors.
Use module "bytebufferpool-logarithmic" for a pool may hold less granulated sized buffers.
[tags]
bytebufferpool
@ -19,13 +20,19 @@ etc/jetty-bytebufferpool.xml
## Bucket capacity factor.
## ByteBuffers are allocated out of buckets that have
## a capacity that is multiple of this factor.
#jetty.byteBufferPool.factor=1024
#jetty.byteBufferPool.factor=4096
## Maximum queue length for each bucket (-1 for unbounded).
#jetty.byteBufferPool.maxQueueLength=-1
## Maximum size for each bucket (-1 for unbounded).
#jetty.byteBufferPool.maxBucketSize=-1
## Maximum heap memory retainable by the pool (0 for heuristic, -1 for unlimited).
## Maximum heap memory held idle by the pool (0 for heuristic, -1 for unlimited).
#jetty.byteBufferPool.maxHeapMemory=0
## Maximum direct memory retainable by the pool (0 for heuristic, -1 for unlimited).
## Maximum direct memory held idle by the pool (0 for heuristic, -1 for unlimited).
#jetty.byteBufferPool.maxDirectMemory=0
## Maximum heap memory retained whilst in use by the pool (0 for heuristic, -1 for unlimited, -2 for no retained).
#jetty.byteBufferPool.retainedHeapMemory=0
## Maximum direct memory retained whilst in use by the pool (0 for heuristic, -1 for unlimited, -2 for no retained).
#jetty.byteBufferPool.retainedDirectMemory=0

View File

@ -32,10 +32,9 @@ import java.util.concurrent.locks.Condition;
import java.util.stream.Collectors;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.LogarithmicArrayByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.StringUtil;
@ -187,12 +186,26 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
scheduler = _server.getBean(Scheduler.class);
_scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x", hashCode()), false);
addBean(_scheduler);
if (pool == null)
pool = _server.getBean(ByteBufferPool.class);
_byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
addBean(_byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = _server.getBean(RetainableByteBufferPool.class);
addBean(retainableByteBufferPool == null ? new ArrayRetainableByteBufferPool() : retainableByteBufferPool, retainableByteBufferPool == null);
synchronized (server)
{
if (pool == null)
{
// Look for (and cache) a common pool on the server
pool = server.getBean(ByteBufferPool.class);
if (pool == null)
{
pool = new LogarithmicArrayByteBufferPool();
server.addBean(pool, true);
}
addBean(pool, false);
}
else
{
addBean(pool, true);
}
}
_byteBufferPool = pool;
addEventListener(new Container.Listener()
{

View File

@ -97,7 +97,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_config = config;
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, _bufferPool);
_retainableByteBufferPool = _bufferPool.asRetainableByteBufferPool();
_generator = newHttpGenerator();
_channel = newHttpChannel();
_input = _channel.getRequest().getHttpInput();

View File

@ -168,7 +168,7 @@ public class SslConnectionFactory extends AbstractConnectionFactory implements C
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption());
}

View File

@ -210,46 +210,39 @@ public class AsyncCompletionTest extends HttpServerTestFixture
os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1));
os.flush();
// wait for OWP to execute (proves we do not block in write APIs)
boolean completeCalled = handler.waitForOWPExit();
while (true)
{
// wait for threads to return to base level (proves we are really async)
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
Boolean owpExit = handler.pollForOWPExit();
if (owpExit == null)
{
// handle any callback written so far
while (delay != null)
{
delay.proceed();
delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
}
continue;
}
// OWP has exited, but we have a delay, so let's wait for thread to return to the pool to ensure we are async.
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT);
while (_threadPool.getBusyThreads() != base)
while (delay != null && _threadPool.getBusyThreads() > base)
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(POLL);
}
if (completeCalled)
break;
// We are now asynchronously waiting!
assertThat(__transportComplete.get(), is(false));
// If we are not complete, we must be waiting for one or more writes to complete
while (true)
// handle any callback written so far
while (delay != null)
{
PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
if (delay != null)
{
delay.proceed();
continue;
}
// No delay callback found, have we finished OWP again?
Boolean c = handler.pollForOWPExit();
if (c == null)
// No we haven't, so look for another delay callback
continue;
// We have a OWP result, so let's handle it.
completeCalled = c;
break;
delay.proceed();
delay = __queue.poll(POLL, TimeUnit.MILLISECONDS);
}
if (owpExit)
break;
}
// Wait for full completion

View File

@ -31,7 +31,7 @@ import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.server.HttpOutput.Interceptor;
import org.eclipse.jetty.server.LocalConnector.LocalEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
@ -69,19 +69,7 @@ public class HttpOutputTest
{
_server = new Server();
_server.addBean(new ByteBufferPool()
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
return direct ? BufferUtil.allocateDirect(size) : BufferUtil.allocate(size);
}
@Override
public void release(ByteBuffer buffer)
{
}
});
_server.addBean(new NullByteBufferPool());
HttpConnectionFactory http = new HttpConnectionFactory();
http.getHttpConfiguration().setRequestHeaderSize(1024);

View File

@ -19,6 +19,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.LocalConnector;
@ -49,6 +50,7 @@ public class BufferedResponseHandlerTest
public static void setUp() throws Exception
{
_server = new Server();
_server.addBean(new NullByteBufferPool()); // Avoid giving larger buffers than requested
_config = new HttpConfiguration();
_config.setOutputBufferSize(1024);
_config.setOutputAggregationSize(256);

View File

@ -339,7 +339,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
return null;
// If we have no space
if (entries.size() >= maxEntries)
if (maxEntries > 0 && entries.size() >= maxEntries)
return null;
Entry entry = newEntry();

View File

@ -440,7 +440,7 @@ public abstract class CoreClientUpgradeRequest extends HttpRequest implements Re
HttpClient httpClient = wsClient.getHttpClient();
ByteBufferPool bufferPool = wsClient.getWebSocketComponents().getBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(wsClient.getWebSocketComponents(), bufferPool);
RetainableByteBufferPool retainableByteBufferPool = bufferPool.asRetainableByteBufferPool();
WebSocketConnection wsConnection = new WebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), bufferPool, retainableByteBufferPool, coreSession);
wsClient.getEventListeners().forEach(wsConnection::addEventListener);
coreSession.setWebSocketConnection(wsConnection);

View File

@ -97,7 +97,7 @@ public final class RFC6455Handshaker extends AbstractHandshaker
HttpChannel httpChannel = baseRequest.getHttpChannel();
Connector connector = httpChannel.getConnector();
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
return newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), byteBufferPool, retainableByteBufferPool, coreSession);
}

View File

@ -81,7 +81,7 @@ public class RFC8441Handshaker extends AbstractHandshaker
Connector connector = httpChannel.getConnector();
EndPoint endPoint = httpChannel.getTunnellingEndPoint();
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = byteBufferPool.asRetainableByteBufferPool();
return newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), byteBufferPool, retainableByteBufferPool, coreSession);
}