#6322 Use RetainableByteBuffer and write a new pool for it

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2021-05-28 11:51:57 +02:00
parent c4021023ee
commit c9a5d8df58
20 changed files with 944 additions and 133 deletions

View File

@ -54,10 +54,12 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Jetty;
@ -193,12 +195,14 @@ public class HttpClient extends ContainerLifeCycle
threadPool.setName(name);
setExecutor(threadPool);
}
int maxBucketSize = executor instanceof ThreadPool.SizedThreadPool
? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2
: ProcessorUtils.availableProcessors() * 2;
ByteBufferPool byteBufferPool = getByteBufferPool();
if (byteBufferPool == null)
setByteBufferPool(new MappedByteBufferPool(2048,
executor instanceof ThreadPool.SizedThreadPool
? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2
: ProcessorUtils.availableProcessors() * 2));
setByteBufferPool(new MappedByteBufferPool(2048, maxBucketSize));
if (getBean(RetainableByteBufferPool.class) == null)
addBean(new ArrayRetainableByteBufferPool(0, 2048, 65536, maxBucketSize));
Scheduler scheduler = getScheduler();
if (scheduler == null)
setScheduler(new ScheduledExecutorScheduler(name + "-scheduler", false));

View File

@ -29,9 +29,9 @@ import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
@ -43,6 +43,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
private final LongAdder inMessages = new LongAdder();
private final HttpParser parser;
private final RetainableByteBufferPool retainableByteBufferPool;
private RetainableByteBuffer networkBuffer;
private boolean shutdown;
private boolean complete;
@ -61,6 +62,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
parser.setHeaderCacheSize(httpTransport.getHeaderCacheSize());
parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive());
}
this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(httpClient, httpClient.getByteBufferPool());
}
@Override
@ -111,9 +114,8 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
private RetainableByteBuffer newNetworkBuffer()
{
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
boolean direct = client.isUseInputDirectByteBuffers();
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), direct);
return retainableByteBufferPool.acquire(client.getResponseBufferSize(), direct);
}
private void releaseNetworkBuffer()
@ -166,7 +168,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res
return;
}
if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();
// The networkBuffer may have been reacquired.

View File

@ -44,9 +44,9 @@ import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -71,6 +71,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private final ClientParser parser;
private RetainableByteBuffer networkBuffer;
private Object attachment;
private final RetainableByteBufferPool retainableByteBufferPool;
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
@ -81,6 +82,9 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
this.delegate = new Delegate(destination);
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
HttpClient client = destination.getHttpClient();
this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
}
public HttpDestination getHttpDestination()
@ -135,8 +139,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private RetainableByteBuffer newNetworkBuffer()
{
HttpClient client = destination.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
return retainableByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
}
private void releaseNetworkBuffer()
@ -161,7 +164,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
if (parse(networkBuffer.getBuffer()))
return;
if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();
// The networkBuffer may have been reacquired.

View File

@ -31,6 +31,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
@ -67,7 +68,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, byteBufferPool);
HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
connection.setUseInputDirectByteBuffers(client.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(client.isUseOutputDirectByteBuffers());
@ -81,9 +84,9 @@ public class HTTP2ClientConnectionFactory implements ClientConnectionFactory
private final Promise<Session> promise;
private final Session.Listener listener;
private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
super(retainableByteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;
this.promise = promise;
this.listener = listener;

View File

@ -23,10 +23,10 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -45,7 +45,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private final Queue<Runnable> tasks = new ArrayDeque<>();
private final HTTP2Producer producer = new HTTP2Producer();
private final AtomicLong bytesIn = new AtomicLong();
private final ByteBufferPool byteBufferPool;
private final RetainableByteBufferPool retainableByteBufferPool;
private final Parser parser;
private final ISession session;
private final int bufferSize;
@ -53,10 +53,10 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;
public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
protected HTTP2Connection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.retainableByteBufferPool = retainableByteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
@ -287,7 +287,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
return task;
// If more references than 1 (ie not just us), don't refill into buffer and risk compaction.
if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();
}
@ -415,16 +415,43 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
}
}
private class NetworkBuffer extends RetainableByteBuffer implements Callback
private class NetworkBuffer implements Callback
{
private final RetainableByteBuffer delegate;
private NetworkBuffer()
{
super(byteBufferPool, bufferSize, isUseInputDirectByteBuffers());
delegate = retainableByteBufferPool.acquire(bufferSize, isUseInputDirectByteBuffers());
}
public ByteBuffer getBuffer()
{
return delegate.getBuffer();
}
public boolean isRetained()
{
return delegate.isRetained();
}
public boolean hasRemaining()
{
return delegate.hasRemaining();
}
public boolean release()
{
return delegate.release();
}
public void retain()
{
delegate.retain();
}
private void put(ByteBuffer source)
{
BufferUtil.append(getBuffer(), source);
BufferUtil.append(delegate.getBuffer(), source);
}
@Override
@ -441,7 +468,7 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private void completed(Throwable failure)
{
if (release() == 0)
if (delegate.release())
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", this, failure);

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.http2.parser.WindowRateControl;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
@ -279,7 +280,9 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne
parser.setMaxFrameLength(getMaxFrameLength());
parser.setMaxSettingsKeys(getMaxSettingsKeys());
HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());

View File

@ -45,8 +45,8 @@ import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.http2.parser.SettingsBodyParser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.BufferUtil;
@ -87,9 +87,9 @@ public class HTTP2ServerConnection extends HTTP2Connection
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels = true;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
public HTTP2ServerConnection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
super(retainableByteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
this.httpConfig = httpConfig;
}

View File

@ -0,0 +1,314 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ManagedObject
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool
{
private static final Logger LOG = LoggerFactory.getLogger(ArrayRetainableByteBufferPool.class);
private final Pool<RetainableByteBuffer>[] _direct;
private final Pool<RetainableByteBuffer>[] _indirect;
private final int _factor;
private final int _minCapacity;
private final long _maxHeapMemory;
private final long _maxDirectMemory;
private final AtomicLong _currentHeapMemory = new AtomicLong();
private final AtomicLong _currentDirectMemory = new AtomicLong();
public ArrayRetainableByteBufferPool()
{
this(0, 1024, 65536, Integer.MAX_VALUE, -1L, -1L);
}
public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
{
this(minCapacity, factor, maxCapacity, maxBucketSize, -1L, -1L);
}
public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
_factor = factor <= 0 ? 1024 : factor;
this._maxHeapMemory = maxHeapMemory;
this._maxDirectMemory = maxDirectMemory;
if (minCapacity <= 0)
minCapacity = 0;
_minCapacity = minCapacity;
if (maxCapacity <= 0)
maxCapacity = 64 * 1024;
if ((maxCapacity % _factor) != 0 || _factor >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
int length = maxCapacity / _factor;
@SuppressWarnings("unchecked")
Pool<RetainableByteBuffer>[] directArray = new Pool[length];
@SuppressWarnings("unchecked")
Pool<RetainableByteBuffer>[] indirectArray = new Pool[length];
for (int i = 0; i < directArray.length; i++)
{
directArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true);
indirectArray[i] = new Pool<>(Pool.StrategyType.THREAD_ID, maxBucketSize, true);
}
_direct = directArray;
_indirect = indirectArray;
}
@Override
public RetainableByteBuffer acquire(int size, boolean direct)
{
int capacity = (bucketIndexFor(size) + 1) * _factor;
Pool<RetainableByteBuffer> bucket = bucketFor(size, direct);
if (bucket == null)
return newRetainableByteBuffer(size, direct, byteBuffer -> {});
Pool<RetainableByteBuffer>.Entry entry = bucket.acquire();
RetainableByteBuffer buffer;
if (entry == null)
{
Pool<RetainableByteBuffer>.Entry reservedEntry = bucket.reserve();
if (reservedEntry != null)
{
buffer = newRetainableByteBuffer(capacity, direct, byteBuffer ->
{
BufferUtil.clear(byteBuffer);
reservedEntry.release();
});
reservedEntry.enable(buffer, true);
if (direct)
_currentDirectMemory.addAndGet(buffer.capacity());
else
_currentHeapMemory.addAndGet(buffer.capacity());
releaseExcessMemory(direct);
}
else
{
buffer = newRetainableByteBuffer(size, direct, byteBuffer -> {});
}
}
else
{
buffer = entry.getPooled();
buffer.acquire();
}
return buffer;
}
private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direct, Consumer<ByteBuffer> releaser)
{
ByteBuffer buffer = direct ? ByteBuffer.allocateDirect(capacity) : ByteBuffer.allocate(capacity);
BufferUtil.clear(buffer);
RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(buffer, releaser);
retainableByteBuffer.acquire();
return retainableByteBuffer;
}
private Pool<RetainableByteBuffer> bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
int idx = bucketIndexFor(capacity);
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
if (idx >= buckets.length)
return null;
return buckets[idx];
}
private int bucketIndexFor(int capacity)
{
return (capacity - 1) / _factor;
}
@ManagedAttribute("The number of pooled direct ByteBuffers")
public long getDirectByteBufferCount()
{
return getByteBufferCount(true);
}
@ManagedAttribute("The number of pooled heap ByteBuffers")
public long getHeapByteBufferCount()
{
return getByteBufferCount(false);
}
private long getByteBufferCount(boolean direct)
{
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(Pool::size).sum();
}
@ManagedAttribute("The number of pooled direct ByteBuffers that are available")
public long getAvailableDirectByteBufferCount()
{
return getAvailableByteBufferCount(true);
}
@ManagedAttribute("The number of pooled heap ByteBuffers that are available")
public long getAvailableHeapByteBufferCount()
{
return getAvailableByteBufferCount(false);
}
private long getAvailableByteBufferCount(boolean direct)
{
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
return Arrays.stream(buckets).mapToLong(pool -> pool.values().stream().filter(Pool.Entry::isIdle).count()).sum();
}
@ManagedAttribute("The bytes retained by direct ByteBuffers")
public long getDirectMemory()
{
return getMemory(true);
}
@ManagedAttribute("The bytes retained by heap ByteBuffers")
public long getHeapMemory()
{
return getMemory(false);
}
private long getMemory(boolean direct)
{
if (direct)
return _currentDirectMemory.get();
else
return _currentHeapMemory.get();
}
@ManagedAttribute("The available bytes retained by direct ByteBuffers")
public long getAvailableDirectMemory()
{
return getAvailableMemory(true);
}
@ManagedAttribute("The available bytes retained by heap ByteBuffers")
public long getAvailableHeapMemory()
{
return getAvailableMemory(false);
}
private long getAvailableMemory(boolean direct)
{
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
long total = 0L;
for (int i = 0; i < buckets.length; i++)
{
Pool<RetainableByteBuffer> bucket = buckets[i];
long capacity = (i + 1L) * _factor;
total += bucket.values().stream().filter(Pool.Entry::isIdle).count() * capacity;
}
return total;
}
@ManagedOperation(value = "Clears this RetainableByteBufferPool", impact = "ACTION")
public void clear()
{
clearArray(_direct, _currentDirectMemory);
clearArray(_indirect, _currentHeapMemory);
}
private void clearArray(Pool<RetainableByteBuffer>[] poolArray, AtomicLong memoryCounter)
{
for (Pool<RetainableByteBuffer> retainableByteBufferPool : poolArray)
{
for (Pool<RetainableByteBuffer>.Entry entry : retainableByteBufferPool.values())
{
entry.remove();
memoryCounter.addAndGet(-entry.getPooled().capacity());
}
}
}
private void releaseExcessMemory(boolean direct)
{
long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
if (maxMemory > 0)
{
long excess = getMemory(direct) - maxMemory;
if (excess > 0)
evict(direct, excess);
}
}
/**
* This eviction mechanism searches for the RetainableByteBuffers that were released the longest time ago.
* @param direct true to search in the direct buffers buckets, false to search in the heap buffers buckets.
* @param excess the amount of bytes to evict. At least this much will be removed from the buckets.
*/
private void evict(boolean direct, long excess)
{
if (LOG.isDebugEnabled())
LOG.debug("evicting {} bytes from {} pools", excess, (direct ? "direct" : "heap"));
long now = System.nanoTime();
long totalClearedCapacity = 0L;
Pool<RetainableByteBuffer>[] buckets = direct ? _direct : _indirect;
while (totalClearedCapacity < excess)
{
for (Pool<RetainableByteBuffer> bucket : buckets)
{
Pool<RetainableByteBuffer>.Entry oldestEntry = findOldestEntry(now, bucket);
if (oldestEntry == null)
continue;
if (oldestEntry.remove())
{
int clearedCapacity = oldestEntry.getPooled().capacity();
if (direct)
_currentDirectMemory.addAndGet(-clearedCapacity);
else
_currentHeapMemory.addAndGet(-clearedCapacity);
totalClearedCapacity += clearedCapacity;
}
// else a concurrent thread evicted the same entry -> do not account for its capacity.
}
}
if (LOG.isDebugEnabled())
LOG.debug("eviction done, cleared {} bytes from {} pools", totalClearedCapacity, (direct ? "direct" : "heap"));
}
private Pool<RetainableByteBuffer>.Entry findOldestEntry(long now, Pool<RetainableByteBuffer> bucket)
{
Pool<RetainableByteBuffer>.Entry oldestEntry = null;
for (Pool<RetainableByteBuffer>.Entry entry : bucket.values())
{
if (oldestEntry != null)
{
long entryAge = now - entry.getPooled().getLastUpdate();
if (entryAge > now - oldestEntry.getPooled().getLastUpdate())
oldestEntry = entry;
}
else
{
oldestEntry = entry;
}
}
return oldestEntry;
}
}

View File

@ -15,32 +15,40 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Retainable;
/**
* A Retainable ByteBuffer.
* <p>Acquires a ByteBuffer from a {@link ByteBufferPool} and maintains a reference count that is
* initially 1, incremented with {@link #retain()} and decremented with {@link #release()}. The buffer
* is released to the pool when the reference count is decremented to 0.</p>
* <p>A pooled ByteBuffer which maintains a reference count that is
* incremented with {@link #retain()} and decremented with {@link #release()}. The buffer
* is released to the pool when {@link #release()} is called one more time than {@link #retain()}.</p>
* <p>A {@code RetainableByteBuffer} can either be:
* <ul>
* <li>in pool; in this case {@link #isRetained()} returns {@code false} and calling {@link #release()} throws {@link IllegalStateException}</li>
* <li>out of pool but not retained; in this case {@link #isRetained()} returns {@code false} and calling {@link #release()} returns {@code true}</li>
* <li>out of pool and retained; in this case {@link #isRetained()} returns {@code true} and calling {@link #release()} returns {@code false}</li>
* </ul>
* <p>Calling {@link #release()} on a out of pool and retained instance does not re-pool it while that re-pools it on a out of pool but not retained instance.</p>
*/
public class RetainableByteBuffer implements Retainable
{
private final ByteBufferPool pool;
private final ByteBuffer buffer;
private final AtomicInteger references;
private final AtomicInteger references = new AtomicInteger();
private final Consumer<ByteBuffer> releaser;
private final AtomicLong lastUpdate = new AtomicLong(System.nanoTime());
public RetainableByteBuffer(ByteBufferPool pool, int size)
RetainableByteBuffer(ByteBuffer buffer, Consumer<ByteBuffer> releaser)
{
this(pool, size, false);
this.releaser = releaser;
this.buffer = buffer;
}
public RetainableByteBuffer(ByteBufferPool pool, int size, boolean direct)
public int capacity()
{
this.pool = pool;
this.buffer = pool.acquire(size, direct);
this.references = new AtomicInteger(1);
return buffer.capacity();
}
public ByteBuffer getBuffer()
@ -48,32 +56,66 @@ public class RetainableByteBuffer implements Retainable
return buffer;
}
public int getReferences()
public long getLastUpdate()
{
return references.get();
return lastUpdate.getOpaque();
}
/**
* Checks if {@link #retain()} has been called at least one more time than {@link #release()}.
* @return true if this buffer is retained, false otherwise.
*/
public boolean isRetained()
{
return references.get() > 1;
}
public boolean isDirect()
{
return buffer.isDirect();
}
/**
* Increments the retained counter of this buffer. It must be done internally by
* the pool right after creation and after each un-pooling.
* The reason why this method exists on top of {@link #retain()} is to be able to
* have some safety checks that must know why the ref counter is being incremented.
*/
void acquire()
{
if (references.getAndUpdate(c -> c == 0 ? 1 : c) != 0)
throw new IllegalStateException("re-pooled while still used " + this);
}
/**
* Increments the retained counter of this buffer.
*/
@Override
public void retain()
{
while (true)
{
int r = references.get();
if (r == 0)
if (references.getAndUpdate(c -> c == 0 ? 0 : c + 1) == 0)
throw new IllegalStateException("released " + this);
if (references.compareAndSet(r, r + 1))
break;
}
}
public int release()
/**
* Decrements the retained counter of this buffer.
* @return true if the buffer was re-pooled, false otherwise.
*/
public boolean release()
{
int ref = references.decrementAndGet();
if (ref == 0)
pool.release(buffer);
else if (ref < 0)
int ref = references.updateAndGet(c ->
{
if (c == 0)
throw new IllegalStateException("already released " + this);
return ref;
return c - 1;
});
if (ref == 0)
{
lastUpdate.setOpaque(System.nanoTime());
releaser.accept(buffer);
return true;
}
return false;
}
public int remaining()
@ -99,6 +141,6 @@ public class RetainableByteBuffer implements Retainable
@Override
public String toString()
{
return String.format("%s@%x{%s,r=%d}", getClass().getSimpleName(), hashCode(), BufferUtil.toDetailString(buffer), getReferences());
return String.format("%s@%x{%s,r=%d}", getClass().getSimpleName(), hashCode(), BufferUtil.toDetailString(buffer), references.get());
}
}

View File

@ -0,0 +1,58 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.component.Container;
/**
* <p>A {@link RetainableByteBuffer} pool.</p>
* <p>Acquired buffers <b>must</b> be released by calling {@link RetainableByteBuffer#release()} otherwise the memory they hold will
* be leaked.</p>
*/
public interface RetainableByteBufferPool
{
/**
* Acquires a memory buffer from the pool.
* @param size The size of the buffer. The returned buffer will have at least this capacity.
* @param direct true if a direct memory buffer is needed, false otherwise.
* @return a memory buffer.
*/
RetainableByteBuffer acquire(int size, boolean direct);
/**
* Finds a {@link RetainableByteBufferPool} implementation in the given container, or wrap the given
* {@link ByteBufferPool} with an adapter.
* @param container the container to search for an existing memory pool.
* @param byteBufferPool the {@link ByteBufferPool} to wrap if no memory pool was found in the container.
* @return the {@link RetainableByteBufferPool} found or the wrapped one.
*/
static RetainableByteBufferPool findOrAdapt(Container container, ByteBufferPool byteBufferPool)
{
RetainableByteBufferPool retainableByteBufferPool = container == null ? null : container.getBean(RetainableByteBufferPool.class);
if (retainableByteBufferPool == null)
{
// Wrap the ByteBufferPool instance.
retainableByteBufferPool = (size, direct) ->
{
ByteBuffer byteBuffer = byteBufferPool.acquire(size, direct);
RetainableByteBuffer retainableByteBuffer = new RetainableByteBuffer(byteBuffer, byteBufferPool::release);
retainableByteBuffer.acquire();
return retainableByteBuffer;
};
}
return retainableByteBufferPool;
}
}

View File

@ -35,6 +35,8 @@ import org.eclipse.jetty.io.AbstractEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -107,10 +109,11 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
private final AtomicLong _bytesIn = new AtomicLong();
private final AtomicLong _bytesOut = new AtomicLong();
private final ByteBufferPool _bufferPool;
private final RetainableByteBufferPool _retainableByteBufferPool;
private final SSLEngine _sslEngine;
private final DecryptedEndPoint _decryptedEndPoint;
private ByteBuffer _decryptedInput;
private ByteBuffer _encryptedInput;
private RetainableByteBuffer _encryptedInput;
private ByteBuffer _encryptedOutput;
private final boolean _encryptedDirectBuffers;
private final boolean _decryptedDirectBuffers;
@ -187,11 +190,18 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
public SslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine,
boolean useDirectBuffersForEncryption, boolean useDirectBuffersForDecryption)
{
this(RetainableByteBufferPool.findOrAdapt(null, byteBufferPool), byteBufferPool, executor, endPoint, sslEngine, useDirectBuffersForEncryption, useDirectBuffersForDecryption);
}
public SslConnection(RetainableByteBufferPool retainableByteBufferPool, ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine sslEngine,
boolean useDirectBuffersForEncryption, boolean useDirectBuffersForDecryption)
{
// This connection does not execute calls to onFillable(), so they will be called by the selector thread.
// onFillable() does not block and will only wakeup another thread to do the actual reading and handling.
super(endPoint, executor);
this._bufferPool = byteBufferPool;
this._retainableByteBufferPool = retainableByteBufferPool;
this._sslEngine = sslEngine;
this._decryptedEndPoint = newDecryptedEndPoint();
this._encryptedDirectBuffers = useDirectBuffersForEncryption;
@ -326,7 +336,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
private void acquireEncryptedInput()
{
if (_encryptedInput == null)
_encryptedInput = _bufferPool.acquire(getPacketBufferSize(), _encryptedDirectBuffers);
_encryptedInput = _retainableByteBufferPool.acquire(getPacketBufferSize(), _encryptedDirectBuffers);
}
private void acquireEncryptedOutput()
@ -339,7 +349,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
public void onUpgradeTo(ByteBuffer buffer)
{
acquireEncryptedInput();
BufferUtil.append(_encryptedInput, buffer);
BufferUtil.append(_encryptedInput.getBuffer(), buffer);
}
@Override
@ -409,7 +419,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
@Override
public String toConnectionString()
{
ByteBuffer b = _encryptedInput;
ByteBuffer b = _encryptedInput == null ? null : _encryptedInput.getBuffer();
int ei = b == null ? -1 : b.remaining();
b = _encryptedOutput;
int eo = b == null ? -1 : b.remaining();
@ -431,7 +441,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
{
if (_encryptedInput != null && !_encryptedInput.hasRemaining())
{
_bufferPool.release(_encryptedInput);
_encryptedInput.release();
_encryptedInput = null;
}
}
@ -672,14 +682,14 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
}
// Let's try reading some encrypted data... even if we have some already.
int netFilled = networkFill(_encryptedInput);
int netFilled = networkFill(_encryptedInput.getBuffer());
if (netFilled > 0)
_bytesIn.addAndGet(netFilled);
if (LOG.isDebugEnabled())
LOG.debug("net filled={}", netFilled);
// Workaround for Java 11 behavior.
if (netFilled < 0 && isHandshakeInitial() && BufferUtil.isEmpty(_encryptedInput))
if (netFilled < 0 && isHandshakeInitial() && (_encryptedInput == null || _encryptedInput.isEmpty()))
closeInbound();
if (netFilled > 0 && !isHandshakeComplete() && isOutboundDone())
@ -698,7 +708,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
try
{
_underflown = false;
unwrapResult = SslConnection.this.unwrap(_sslEngine, _encryptedInput, appIn);
unwrapResult = SslConnection.this.unwrap(_sslEngine, _encryptedInput.getBuffer(), appIn);
}
finally
{
@ -708,7 +718,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
LOG.debug("unwrap net_filled={} {} encryptedBuffer={} unwrapBuffer={} appBuffer={}",
netFilled,
StringUtil.replace(unwrapResult.toString(), '\n', ' '),
BufferUtil.toSummaryString(_encryptedInput),
_encryptedInput,
BufferUtil.toDetailString(appIn),
BufferUtil.toDetailString(buffer));
@ -729,13 +739,13 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
case BUFFER_UNDERFLOW:
// Continue if we can compact?
if (BufferUtil.compact(_encryptedInput))
if (BufferUtil.compact(_encryptedInput.getBuffer()))
continue;
// Are we out of space?
if (BufferUtil.space(_encryptedInput) == 0)
if (BufferUtil.space(_encryptedInput.getBuffer()) == 0)
{
BufferUtil.clear(_encryptedInput);
BufferUtil.clear(_encryptedInput.getBuffer());
throw new SSLHandshakeException("Encrypted buffer max length exceeded");
}
@ -847,7 +857,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
_flushState,
_fillState,
_underflown,
BufferUtil.toDetailString(_encryptedInput),
_encryptedInput,
BufferUtil.toDetailString(_decryptedInput),
SslConnection.this);
@ -855,7 +865,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
return;
// Fillable if we have decrypted input OR enough encrypted input.
fillable = BufferUtil.hasContent(_decryptedInput) || (BufferUtil.hasContent(_encryptedInput) && !_underflown);
fillable = BufferUtil.hasContent(_decryptedInput) || (_encryptedInput != null && _encryptedInput.hasRemaining() && !_underflown);
HandshakeStatus status = _sslEngine.getHandshakeStatus();
switch (status)

View File

@ -0,0 +1,316 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ArrayRetainableByteBufferPoolTest
{
@Test
public void testMaxMemoryEviction()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(0, 10, 20, Integer.MAX_VALUE, 40, 40);
List<RetainableByteBuffer> buffers = new ArrayList<>();
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
buffers.add(pool.acquire(10, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
buffers.add(pool.acquire(20, true));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), greaterThan(0L));
assertThat(pool.getDirectMemory(), greaterThan(0L));
buffers.forEach(RetainableByteBuffer::release);
assertThat(pool.getAvailableDirectByteBufferCount(), greaterThan(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), lessThan((long)buffers.size()));
assertThat(pool.getDirectByteBufferCount(), greaterThan(0L));
assertThat(pool.getDirectByteBufferCount(), lessThan((long)buffers.size()));
assertThat(pool.getDirectMemory(), lessThanOrEqualTo(40L));
assertThat(pool.getDirectMemory(), greaterThan(0L));
}
@Test
public void testBelowMinCapacityDoesNotPool()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE);
RetainableByteBuffer buf1 = pool.acquire(1, true);
assertThat(buf1.capacity(), is(1));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
buf1.release();
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
}
@Test
public void testOverMaxCapacityDoesNotPool()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE);
RetainableByteBuffer buf1 = pool.acquire(21, true);
assertThat(buf1.capacity(), is(21));
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
buf1.release();
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
}
@Test
public void testRetain()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE);
RetainableByteBuffer buf1 = pool.acquire(10, true);
assertThat(pool.getDirectMemory(), is(10L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(buf1.isRetained(), is(false));
buf1.retain();
buf1.retain();
assertThat(buf1.isRetained(), is(true));
assertThat(buf1.release(), is(false));
assertThat(buf1.isRetained(), is(true));
assertThat(buf1.release(), is(false));
assertThat(buf1.isRetained(), is(false));
assertThat(pool.getDirectMemory(), is(10L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(buf1.release(), is(true));
assertThat(buf1.isRetained(), is(false));
assertThat(pool.getDirectMemory(), is(10L));
assertThat(pool.getAvailableDirectMemory(), is(10L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(1L));
assertThat(pool.getDirectByteBufferCount(), is(1L));
}
@Test
public void testTooManyReleases()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE);
RetainableByteBuffer buf1 = pool.acquire(10, true);
assertThat(pool.getDirectMemory(), is(10L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectByteBufferCount(), is(1L));
buf1.release();
assertThat(pool.getDirectMemory(), is(10L));
assertThat(pool.getAvailableDirectMemory(), is(10L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(1L));
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThrows(IllegalStateException.class, buf1::release);
assertThat(pool.getDirectMemory(), is(10L));
assertThat(pool.getAvailableDirectMemory(), is(10L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(1L));
assertThat(pool.getDirectByteBufferCount(), is(1L));
}
@Test
public void testMaxBucketSize()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(0, 10, 20, 2);
RetainableByteBuffer buf1 = pool.acquire(1, true); // pooled
assertThat(buf1.capacity(), is(10));
RetainableByteBuffer buf2 = pool.acquire(1, true); // pooled
assertThat(buf2.capacity(), is(10));
RetainableByteBuffer buf3 = pool.acquire(1, true); // not pooled, bucket is full
assertThat(buf3.capacity(), is(1));
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(20L));
RetainableByteBuffer buf4 = pool.acquire(11, true); // pooled
assertThat(buf4.capacity(), is(20));
RetainableByteBuffer buf5 = pool.acquire(11, true); // pooled
assertThat(buf5.capacity(), is(20));
RetainableByteBuffer buf6 = pool.acquire(11, true); // not pooled, bucket is full
assertThat(buf6.capacity(), is(11));
assertThat(pool.getDirectByteBufferCount(), is(4L));
assertThat(pool.getDirectMemory(), is(60L));
}
@Test
public void testBufferReleaseRepools()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(0, 10, 20, 1);
List<RetainableByteBuffer> all = new ArrayList<>();
all.add(pool.acquire(1, true)); // pooled
all.add(pool.acquire(1, true)); // not pooled, bucket is full
all.add(pool.acquire(11, true)); // pooled
all.add(pool.acquire(11, true)); // not pooled, bucket is full
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(30L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
all.forEach(RetainableByteBuffer::release);
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(30L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(2L));
assertThat(pool.getAvailableDirectMemory(), is(30L));
}
@Test
public void testFactorAndCapacity()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool(10, 10, 20, Integer.MAX_VALUE);
pool.acquire(1, true); // not pooled, < minCapacity
pool.acquire(10, true); // pooled
pool.acquire(20, true); // pooled
pool.acquire(30, true); // not pooled, > maxCapacity
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(30L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
}
@Test
public void testClearUnlinksLeakedBuffers()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool();
pool.acquire(10, true);
pool.acquire(10, true);
assertThat(pool.getDirectByteBufferCount(), is(2L));
assertThat(pool.getDirectMemory(), is(2048L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
pool.clear();
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(pool.getAvailableDirectMemory(), is(0L));
}
@Test
public void testRetainAfterRePooledThrows()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool();
RetainableByteBuffer buf1 = pool.acquire(10, true);
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(buf1.release(), is(true));
assertThrows(IllegalStateException.class, buf1::retain);
assertThrows(IllegalStateException.class, buf1::release);
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(1L));
// check that the buffer is still available
RetainableByteBuffer buf2 = pool.acquire(10, true);
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(0L));
assertThat(buf2 == buf1, is(true)); // make sure it's not a new instance
assertThat(buf1.release(), is(true));
assertThat(pool.getDirectByteBufferCount(), is(1L));
assertThat(pool.getAvailableDirectByteBufferCount(), is(1L));
}
@Test
public void testAcquireRelease()
{
ArrayRetainableByteBufferPool pool = new ArrayRetainableByteBufferPool();
for (int i = 0; i < 3; i++)
{
RetainableByteBuffer buf1 = pool.acquire(10, true);
assertThat(buf1, is(notNullValue()));
assertThat(buf1.capacity(), is(1024));
RetainableByteBuffer buf2 = pool.acquire(10, true);
assertThat(buf2, is(notNullValue()));
assertThat(buf2.capacity(), is(1024));
buf1.release();
buf2.release();
RetainableByteBuffer buf3 = pool.acquire(16384 + 1, true);
assertThat(buf3, is(notNullValue()));
assertThat(buf3.capacity(), is(16384 + 1024));
buf3.release();
RetainableByteBuffer buf4 = pool.acquire(32768, true);
assertThat(buf4, is(notNullValue()));
assertThat(buf4.capacity(), is(32768));
buf4.release();
RetainableByteBuffer buf5 = pool.acquire(32768, false);
assertThat(buf5, is(notNullValue()));
assertThat(buf5.capacity(), is(32768));
buf5.release();
}
assertThat(pool.getDirectByteBufferCount(), is(4L));
assertThat(pool.getHeapByteBufferCount(), is(1L));
assertThat(pool.getDirectMemory(), is(1024 + 1024 + 16384 + 1024 + 32768L));
assertThat(pool.getHeapMemory(), is(32768L));
pool.clear();
assertThat(pool.getDirectByteBufferCount(), is(0L));
assertThat(pool.getHeapByteBufferCount(), is(0L));
assertThat(pool.getDirectMemory(), is(0L));
assertThat(pool.getHeapMemory(), is(0L));
}
}

View File

@ -32,8 +32,10 @@ import java.util.concurrent.locks.Condition;
import java.util.stream.Collectors;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.StringUtil;
@ -188,6 +190,8 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
pool = _server.getBean(ByteBufferPool.class);
_byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
addBean(_byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = _server.getBean(RetainableByteBufferPool.class);
addBean(retainableByteBufferPool == null ? new ArrayRetainableByteBufferPool() : retainableByteBufferPool, retainableByteBufferPool == null);
addEventListener(new Container.Listener()
{

View File

@ -17,7 +17,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.http.BadMessageException;
@ -35,6 +34,8 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -56,12 +57,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private final HttpConfiguration _config;
private final Connector _connector;
private final ByteBufferPool _bufferPool;
private final RetainableByteBufferPool _retainableByteBufferPool;
private final HttpInput _input;
private final HttpGenerator _generator;
private final HttpChannelOverHttp _channel;
private final HttpParser _parser;
private final AtomicInteger _contentBufferReferences = new AtomicInteger();
private volatile ByteBuffer _requestBuffer = null;
private volatile RetainableByteBuffer _retainableByteBuffer;
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback();
private final boolean _recordHttpComplianceViolations;
@ -96,6 +97,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_config = config;
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, _bufferPool);
_generator = newHttpGenerator();
_channel = newHttpChannel();
_input = _channel.getRequest().getHttpInput();
@ -198,10 +200,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
@Override
public ByteBuffer onUpgradeFrom()
{
if (BufferUtil.hasContent(_requestBuffer))
if (!isRequestBufferEmpty())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_requestBuffer.remaining());
unconsumed.put(_requestBuffer);
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining());
unconsumed.put(_retainableByteBuffer.getBuffer());
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
@ -225,36 +227,34 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
void releaseRequestBuffer()
{
if (_requestBuffer != null && !_requestBuffer.hasRemaining())
if (_retainableByteBuffer != null && !_retainableByteBuffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("releaseRequestBuffer {}", this);
ByteBuffer buffer = _requestBuffer;
_requestBuffer = null;
_bufferPool.release(buffer);
if (_retainableByteBuffer.release())
_retainableByteBuffer = null;
else
throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer);
}
}
public ByteBuffer getRequestBuffer()
private ByteBuffer getRequestBuffer()
{
if (_requestBuffer == null)
{
boolean useDirectByteBuffers = isUseInputDirectByteBuffers();
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), useDirectByteBuffers);
}
return _requestBuffer;
if (_retainableByteBuffer == null)
_retainableByteBuffer = _retainableByteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
return _retainableByteBuffer.getBuffer();
}
public boolean isRequestBufferEmpty()
{
return BufferUtil.isEmpty(_requestBuffer);
return _retainableByteBuffer == null || _retainableByteBuffer.isEmpty();
}
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable enter {} {}", this, _channel.getState(), BufferUtil.toDetailString(_requestBuffer));
LOG.debug("{} onFillable enter {} {}", this, _channel.getState(), _retainableByteBuffer);
HttpConnection last = setCurrentConnection(this);
try
@ -299,18 +299,27 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
}
}
catch (Throwable x)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("{} caught exception {}", this, _channel.getState(), x);
BufferUtil.clear(_requestBuffer);
if (_retainableByteBuffer != null)
{
_retainableByteBuffer.clear();
releaseRequestBuffer();
}
}
finally
{
getEndPoint().close(x);
}
}
finally
{
setCurrentConnection(last);
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable exit {} {}", this, _channel.getState(), BufferUtil.toDetailString(_requestBuffer));
LOG.debug("{} onFillable exit {} {}", this, _channel.getState(), _retainableByteBuffer);
}
}
@ -338,22 +347,22 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private int fillRequestBuffer()
{
if (_contentBufferReferences.get() > 0)
if (_retainableByteBuffer != null && _retainableByteBuffer.isRetained())
throw new IllegalStateException("fill with unconsumed content on " + this);
if (BufferUtil.isEmpty(_requestBuffer))
if (isRequestBufferEmpty())
{
// Get a buffer
// We are not in a race here for the request buffer as we have not yet received a request,
// so there are not an possible legal threads calling #parseContent or #completed.
_requestBuffer = getRequestBuffer();
ByteBuffer requestBuffer = getRequestBuffer();
// fill
try
{
int filled = getEndPoint().fill(_requestBuffer);
int filled = getEndPoint().fill(requestBuffer);
if (filled == 0) // Do a retry on fill 0 (optimization for SSL connections)
filled = getEndPoint().fill(_requestBuffer);
filled = getEndPoint().fill(requestBuffer);
if (filled > 0)
bytesIn.add(filled);
@ -361,7 +370,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_parser.atEOF();
if (LOG.isDebugEnabled())
LOG.debug("{} filled {} {}", this, filled, BufferUtil.toDetailString(_requestBuffer));
LOG.debug("{} filled {} {}", this, filled, _retainableByteBuffer);
return filled;
}
@ -379,15 +388,15 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
private boolean parseRequestBuffer()
{
if (LOG.isDebugEnabled())
LOG.debug("{} parse {}", this, BufferUtil.toDetailString(_requestBuffer));
LOG.debug("{} parse {}", this, _retainableByteBuffer);
boolean handle = _parser.parseNext(_requestBuffer == null ? BufferUtil.EMPTY_BUFFER : _requestBuffer);
boolean handle = _parser.parseNext(_retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : _retainableByteBuffer.getBuffer());
if (LOG.isDebugEnabled())
LOG.debug("{} parsed {} {}", this, handle, _parser);
// recycle buffer ?
if (_contentBufferReferences.get() == 0)
if (_retainableByteBuffer != null && !_retainableByteBuffer.isRetained())
releaseRequestBuffer();
return handle;
@ -406,15 +415,17 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
_channel.recycle();
_parser.reset();
_generator.reset();
if (_contentBufferReferences.get() == 0)
if (_retainableByteBuffer != null)
{
if (!_retainableByteBuffer.isRetained())
{
releaseRequestBuffer();
}
else
{
LOG.warn("{} lingering content references?!?!", this);
_requestBuffer = null; // Not returned to pool!
_contentBufferReferences.set(0);
_retainableByteBuffer = null; // Not returned to pool!
}
}
return true;
}
@ -472,7 +483,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
if (_parser.isStart())
{
// if the buffer is empty
if (BufferUtil.isEmpty(_requestBuffer))
if (isRequestBufferEmpty())
{
// look for more data
fillInterested();
@ -629,21 +640,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
public Content(ByteBuffer content)
{
super(content);
_contentBufferReferences.incrementAndGet();
_retainableByteBuffer.retain();
}
@Override
public void succeeded()
{
int counter = _contentBufferReferences.decrementAndGet();
if (counter == 0)
releaseRequestBuffer();
// TODO: this should do something (warn? fail?) if _contentBufferReferences goes below 0
if (counter < 0)
{
LOG.warn("Content reference counting went below zero: {}", counter);
_contentBufferReferences.incrementAndGet();
}
_retainableByteBuffer.release();
}
@Override

View File

@ -21,8 +21,10 @@ import javax.net.ssl.SSLSession;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.io.ssl.SslHandshakeListener;
import org.eclipse.jetty.util.annotation.Name;
@ -165,7 +167,9 @@ public class SslConnectionFactory extends AbstractConnectionFactory implements C
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption());
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool);
return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption());
}
@Override

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.QuotedStringTokenizer;
@ -439,7 +440,8 @@ public abstract class CoreClientUpgradeRequest extends HttpRequest implements Re
HttpClient httpClient = wsClient.getHttpClient();
ByteBufferPool bufferPool = wsClient.getWebSocketComponents().getBufferPool();
WebSocketConnection wsConnection = new WebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), bufferPool, coreSession);
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(wsClient.getWebSocketComponents(), bufferPool);
WebSocketConnection wsConnection = new WebSocketConnection(endPoint, httpClient.getExecutor(), httpClient.getScheduler(), bufferPool, retainableByteBufferPool, coreSession);
wsClient.getEventListeners().forEach(wsConnection::addEventListener);
coreSession.setWebSocketConnection(wsConnection);
Exception listenerError = notifyUpgradeListeners((listener) -> listener.onHandshakeResponse(this, response));

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Dumpable;
@ -53,6 +54,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
private final AutoLock lock = new AutoLock();
private final ByteBufferPool bufferPool;
private final RetainableByteBufferPool retainableByteBufferPool;
private final Generator generator;
private final Parser parser;
private final WebSocketCoreSession coreSession;
@ -78,9 +80,10 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
Executor executor,
Scheduler scheduler,
ByteBufferPool bufferPool,
RetainableByteBufferPool retainableByteBufferPool,
WebSocketCoreSession coreSession)
{
this(endp, executor, scheduler, bufferPool, coreSession, null);
this(endp, executor, scheduler, bufferPool, retainableByteBufferPool, coreSession, null);
}
/**
@ -93,6 +96,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
* @param executor A thread executor to use for WS callbacks.
* @param scheduler A scheduler to use for timeouts
* @param bufferPool A pool of buffers to use.
* @param retainableByteBufferPool A pool of retainable buffers to use.
* @param coreSession The WC core session to which frames are delivered.
* @param randomMask A Random used to mask frames. If null then SecureRandom will be created if needed.
*/
@ -100,6 +104,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
Executor executor,
Scheduler scheduler,
ByteBufferPool bufferPool,
RetainableByteBufferPool retainableByteBufferPool,
WebSocketCoreSession coreSession,
Random randomMask)
{
@ -109,8 +114,10 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
Objects.requireNonNull(coreSession, "Session");
Objects.requireNonNull(executor, "Executor");
Objects.requireNonNull(bufferPool, "ByteBufferPool");
Objects.requireNonNull(retainableByteBufferPool, "RetainableByteBufferPool");
this.bufferPool = bufferPool;
this.retainableByteBufferPool = retainableByteBufferPool;
this.coreSession = coreSession;
this.generator = new Generator();
this.parser = new Parser(bufferPool, coreSession);
@ -310,7 +317,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
private RetainableByteBuffer newNetworkBuffer(int capacity)
{
return new RetainableByteBuffer(bufferPool, capacity, isUseInputDirectByteBuffers());
return retainableByteBufferPool.acquire(capacity, isUseInputDirectByteBuffers());
}
private void releaseNetworkBuffer()
@ -464,7 +471,7 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
}
// If more references that 1(us), don't refill into buffer and risk compaction.
if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();
int filled = getEndPoint().fill(networkBuffer.getBuffer()); // TODO check if compact is possible.

View File

@ -24,6 +24,7 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpTransport;
@ -216,9 +217,9 @@ public abstract class AbstractHandshaker implements Handshaker
protected abstract WebSocketConnection createWebSocketConnection(Request baseRequest, WebSocketCoreSession coreSession);
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, WebSocketCoreSession coreSession)
protected WebSocketConnection newWebSocketConnection(EndPoint endPoint, Executor executor, Scheduler scheduler, ByteBufferPool byteBufferPool, RetainableByteBufferPool retainableByteBufferPool, WebSocketCoreSession coreSession)
{
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, coreSession);
return new WebSocketConnection(endPoint, executor, scheduler, byteBufferPool, retainableByteBufferPool, coreSession);
}
protected abstract void prepareResponse(Response response, WebSocketNegotiation negotiation);

View File

@ -23,6 +23,8 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
@ -94,7 +96,9 @@ public final class RFC6455Handshaker extends AbstractHandshaker
{
HttpChannel httpChannel = baseRequest.getHttpChannel();
Connector connector = httpChannel.getConnector();
return newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool);
return newWebSocketConnection(httpChannel.getEndPoint(), connector.getExecutor(), connector.getScheduler(), byteBufferPool, retainableByteBufferPool, coreSession);
}
@Override

View File

@ -19,7 +19,9 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.Request;
@ -78,7 +80,9 @@ public class RFC8441Handshaker extends AbstractHandshaker
HttpChannel httpChannel = baseRequest.getHttpChannel();
Connector connector = httpChannel.getConnector();
EndPoint endPoint = httpChannel.getTunnellingEndPoint();
return newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), connector.getByteBufferPool(), coreSession);
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, byteBufferPool);
return newWebSocketConnection(endPoint, connector.getExecutor(), connector.getScheduler(), byteBufferPool, retainableByteBufferPool, coreSession);
}
@Override