Improve ssl buffers handling (#8165)

* Fixes #8161 improve SSLConnection buffers handling

Added memory heuristic to ArrayRetainableByteBufferPool

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
Ludovic Orban 2022-06-15 15:10:50 +02:00 committed by GitHub
parent 0699bc5326
commit 66de7ba618
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 459 additions and 57 deletions

View File

@ -22,12 +22,14 @@ import java.net.Socket;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
@ -36,6 +38,8 @@ import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSocket; import javax.net.ssl.SSLSocket;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
@ -43,12 +47,16 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ConnectionStatistics; import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory; import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.io.ssl.SslHandshakeListener; import org.eclipse.jetty.io.ssl.SslHandshakeListener;
@ -56,11 +64,13 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.toolchain.test.Net; import org.eclipse.jetty.toolchain.test.Net;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutorThreadPool; import org.eclipse.jetty.util.thread.ExecutorThreadPool;
@ -71,9 +81,14 @@ import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange; import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.api.condition.JRE;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -682,12 +697,7 @@ public class HttpClientTLSTest
// Trigger the creation of a new connection, but don't use it. // Trigger the creation of a new connection, but don't use it.
ConnectionPoolHelper.tryCreate(connectionPool); ConnectionPoolHelper.tryCreate(connectionPool);
// Verify that the connection has been created. // Verify that the connection has been created.
while (true) await().atMost(5, TimeUnit.SECONDS).until(connectionPool::getConnectionCount, is(1));
{
Thread.sleep(50);
if (connectionPool.getConnectionCount() == 1)
break;
}
// Wait for the server to idle timeout the connection. // Wait for the server to idle timeout the connection.
Thread.sleep(idleTimeout + idleTimeout / 2); Thread.sleep(idleTimeout + idleTimeout / 2);
@ -698,6 +708,299 @@ public class HttpClientTLSTest
assertEquals(0, clientBytes.get()); assertEquals(0, clientBytes.get());
} }
@Test
public void testEncryptedInputBufferRepooling() throws Exception
{
SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
var retainableByteBufferPool = new ArrayRetainableByteBufferPool()
{
@Override
public Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return super.poolFor(capacity, direct);
}
};
server.addBean(retainableByteBufferPool);
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol())
{
@Override
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class);
return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected int networkFill(ByteBuffer input) throws IOException
{
int n = super.networkFill(input);
if (n > 0)
throw new IOException("boom");
return n;
}
};
}
};
connector = new ServerConnector(server, 1, 1, ssl, http);
server.addConnector(connector);
server.setHandler(new EmptyServerHandler());
server.start();
SslContextFactory.Client clientTLSFactory = createClientSslContextFactory();
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(clientTLSFactory);
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
clientConnector.setExecutor(clientThreads);
client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector));
client.setExecutor(clientThreads);
client.start();
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
Pool<RetainableByteBuffer> bucket = retainableByteBufferPool.poolFor(16 * 1024 + 1, ssl.isDirectBuffersForEncryption());
assertEquals(1, bucket.size());
assertEquals(1, bucket.getIdleCount());
}
@Test
public void testEncryptedOutputBufferRepooling() throws Exception
{
SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
ByteBuffer acquired = super.acquire(size, direct);
leakedBuffers.add(acquired);
return acquired;
}
@Override
public void release(ByteBuffer buffer)
{
leakedBuffers.remove(buffer);
super.release(buffer);
}
};
server.addBean(byteBufferPool);
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol())
{
@Override
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class);
return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected boolean networkFlush(ByteBuffer output) throws IOException
{
throw new IOException("bang");
}
};
}
};
connector = new ServerConnector(server, 1, 1, ssl, http);
server.addConnector(connector);
server.setHandler(new EmptyServerHandler());
server.start();
SslContextFactory.Client clientTLSFactory = createClientSslContextFactory();
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(clientTLSFactory);
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
clientConnector.setExecutor(clientThreads);
client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector));
client.setExecutor(clientThreads);
client.start();
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEncryptedOutputBufferRepoolingAfterNetworkFlushReturnsFalse(boolean close) throws Exception
{
SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
ByteBuffer acquired = super.acquire(size, direct);
leakedBuffers.add(acquired);
return acquired;
}
@Override
public void release(ByteBuffer buffer)
{
leakedBuffers.remove(buffer);
super.release(buffer);
}
};
server.addBean(byteBufferPool);
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
AtomicBoolean failFlush = new AtomicBoolean(false);
SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol())
{
@Override
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class);
return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected boolean networkFlush(ByteBuffer output) throws IOException
{
if (failFlush.get())
return false;
return super.networkFlush(output);
}
};
}
};
connector = new ServerConnector(server, 1, 1, ssl, http);
server.addConnector(connector);
server.setHandler(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
failFlush.set(true);
if (close)
jettyRequest.getHttpChannel().getEndPoint().close();
else
jettyRequest.getHttpChannel().getEndPoint().shutdownOutput();
}
});
server.start();
SslContextFactory.Client clientTLSFactory = createClientSslContextFactory();
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(clientTLSFactory);
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
clientConnector.setExecutor(clientThreads);
client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector));
client.setExecutor(clientThreads);
client.start();
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testEncryptedOutputBufferRepoolingAfterNetworkFlushThrows(boolean close) throws Exception
{
SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
List<ByteBuffer> leakedBuffers = new ArrayList<>();
ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
{
@Override
public ByteBuffer acquire(int size, boolean direct)
{
ByteBuffer acquired = super.acquire(size, direct);
leakedBuffers.add(acquired);
return acquired;
}
@Override
public void release(ByteBuffer buffer)
{
leakedBuffers.remove(buffer);
super.release(buffer);
}
};
server.addBean(byteBufferPool);
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.addCustomizer(new SecureRequestCustomizer());
HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
AtomicBoolean failFlush = new AtomicBoolean(false);
SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol())
{
@Override
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class);
return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected boolean networkFlush(ByteBuffer output) throws IOException
{
if (failFlush.get())
throw new IOException();
return super.networkFlush(output);
}
};
}
};
connector = new ServerConnector(server, 1, 1, ssl, http);
server.addConnector(connector);
server.setHandler(new EmptyServerHandler()
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
failFlush.set(true);
if (close)
jettyRequest.getHttpChannel().getEndPoint().close();
else
jettyRequest.getHttpChannel().getEndPoint().shutdownOutput();
}
});
server.start();
SslContextFactory.Client clientTLSFactory = createClientSslContextFactory();
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
clientConnector.setSslContextFactory(clientTLSFactory);
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
clientConnector.setExecutor(clientThreads);
client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector));
client.setExecutor(clientThreads);
client.start();
assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
}
@Test @Test
public void testNeverUsedConnectionThenClientIdleTimeout() throws Exception public void testNeverUsedConnectionThenClientIdleTimeout() throws Exception
{ {
@ -780,12 +1083,7 @@ public class HttpClientTLSTest
// Trigger the creation of a new connection, but don't use it. // Trigger the creation of a new connection, but don't use it.
ConnectionPoolHelper.tryCreate(connectionPool); ConnectionPoolHelper.tryCreate(connectionPool);
// Verify that the connection has been created. // Verify that the connection has been created.
while (true) await().atMost(5, TimeUnit.SECONDS).until(connectionPool::getConnectionCount, is(1));
{
Thread.sleep(50);
if (connectionPool.getConnectionCount() == 1)
break;
}
// Wait for the client to idle timeout the connection. // Wait for the client to idle timeout the connection.
Thread.sleep(idleTimeout + idleTimeout / 2); Thread.sleep(idleTimeout + idleTimeout / 2);

View File

@ -22,6 +22,10 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation; import org.eclipse.jetty.util.annotation.ManagedOperation;
/**
* The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
* divided by 4.</p>
*/
@ManagedObject @ManagedObject
abstract class AbstractByteBufferPool implements ByteBufferPool abstract class AbstractByteBufferPool implements ByteBufferPool
{ {
@ -37,8 +41,8 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
* *
* @param factor the capacity factor * @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length * @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic. * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic. * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
*/ */
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory) protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{ {

View File

@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
* <p>Given a capacity {@code factor} of 1024, the first array element holds a queue of ByteBuffers * <p>Given a capacity {@code factor} of 1024, the first array element holds a queue of ByteBuffers
* each of capacity 1024, the second array element holds a queue of ByteBuffers each of capacity * each of capacity 1024, the second array element holds a queue of ByteBuffers each of capacity
* 2048, and so on.</p> * 2048, and so on.</p>
* The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
* divided by 4.</p>
*/ */
@ManagedObject @ManagedObject
public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpable public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpable
@ -48,6 +50,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
/** /**
* Creates a new ArrayByteBufferPool with a default configuration. * Creates a new ArrayByteBufferPool with a default configuration.
* Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
*/ */
public ArrayByteBufferPool() public ArrayByteBufferPool()
{ {
@ -56,6 +59,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
/** /**
* Creates a new ArrayByteBufferPool with the given configuration. * Creates a new ArrayByteBufferPool with the given configuration.
* Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
* *
* @param minCapacity the minimum ByteBuffer capacity * @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor * @param factor the capacity factor
@ -68,6 +72,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
/** /**
* Creates a new ArrayByteBufferPool with the given configuration. * Creates a new ArrayByteBufferPool with the given configuration.
* Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
* *
* @param minCapacity the minimum ByteBuffer capacity * @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor * @param factor the capacity factor
@ -86,8 +91,8 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
* @param factor the capacity factor * @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity * @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length * @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic. * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic. * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
*/ */
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory) public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{ {

View File

@ -30,6 +30,15 @@ import org.eclipse.jetty.util.component.DumpableCollection;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* <p>A {@link RetainableByteBuffer} pool where RetainableByteBuffers are held in {@link Pool}s that are
* held in array elements.</p>
* <p>Given a capacity {@code factor} of 1024, the first array element holds a Pool of RetainableByteBuffers
* each of capacity 1024, the second array element holds a Pool of RetainableByteBuffers each of capacity
* 2048, and so on.</p>
* The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
* divided by 4.</p>
*/
@ManagedObject @ManagedObject
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable
{ {
@ -45,21 +54,56 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private final AtomicLong _currentDirectMemory = new AtomicLong(); private final AtomicLong _currentDirectMemory = new AtomicLong();
private final Function<Integer, Integer> _bucketIndexFor; private final Function<Integer, Integer> _bucketIndexFor;
/**
* Creates a new ArrayRetainableByteBufferPool with a default configuration.
* Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
*/
public ArrayRetainableByteBufferPool() public ArrayRetainableByteBufferPool()
{ {
this(0, -1, -1, Integer.MAX_VALUE, -1L, -1L); this(0, -1, -1, Integer.MAX_VALUE);
} }
/**
* Creates a new ArrayRetainableByteBufferPool with the given configuration.
* Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxBucketSize the maximum number of ByteBuffers for each bucket
*/
public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize) public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
{ {
this(minCapacity, factor, maxCapacity, maxBucketSize, -1L, -1L); this(minCapacity, factor, maxCapacity, maxBucketSize, 0L, 0L);
} }
/**
* Creates a new ArrayRetainableByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxBucketSize the maximum number of ByteBuffers for each bucket
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
*/
public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{ {
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null); this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null);
} }
/**
* Creates a new ArrayRetainableByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxBucketSize the maximum number of ByteBuffers for each bucket
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
* @param bucketIndexFor a {@link Function} that takes a capacity and returns a bucket index
* @param bucketCapacity a {@link Function} that takes a bucket index and returns a capacity
*/
protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory,
Function<Integer, Integer> bucketIndexFor, Function<Integer, Integer> bucketCapacity) Function<Integer, Integer> bucketIndexFor, Function<Integer, Integer> bucketCapacity)
{ {
@ -91,8 +135,8 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
_maxCapacity = maxCapacity; _maxCapacity = maxCapacity;
_direct = directArray; _direct = directArray;
_indirect = indirectArray; _indirect = indirectArray;
_maxHeapMemory = maxHeapMemory; _maxHeapMemory = (maxHeapMemory != 0L) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4;
_maxDirectMemory = maxDirectMemory; _maxDirectMemory = (maxDirectMemory != 0L) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4;
_bucketIndexFor = bucketIndexFor; _bucketIndexFor = bucketIndexFor;
} }
@ -156,6 +200,11 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
return retainableByteBuffer; return retainableByteBuffer;
} }
protected Pool<RetainableByteBuffer> poolFor(int capacity, boolean direct)
{
return bucketFor(capacity, direct);
}
private Bucket bucketFor(int capacity, boolean direct) private Bucket bucketFor(int capacity, boolean direct)
{ {
if (capacity < _minCapacity) if (capacity < _minCapacity)

View File

@ -419,8 +419,10 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
connection instanceof AbstractConnection ? ((AbstractConnection)connection).toConnectionString() : connection); connection instanceof AbstractConnection ? ((AbstractConnection)connection).toConnectionString() : connection);
} }
private void releaseEncryptedInputBuffer() private void releaseEmptyEncryptedInputBuffer()
{ {
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException();
if (_encryptedInput != null && !_encryptedInput.hasRemaining()) if (_encryptedInput != null && !_encryptedInput.hasRemaining())
{ {
_encryptedInput.release(); _encryptedInput.release();
@ -428,8 +430,10 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
} }
} }
protected void releaseDecryptedInputBuffer() private void releaseEmptyDecryptedInputBuffer()
{ {
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException();
if (_decryptedInput != null && !_decryptedInput.hasRemaining()) if (_decryptedInput != null && !_decryptedInput.hasRemaining())
{ {
_bufferPool.release(_decryptedInput); _bufferPool.release(_decryptedInput);
@ -437,7 +441,31 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
} }
} }
private void releaseEncryptedOutputBuffer() private void discardInputBuffers()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException();
if (_encryptedInput != null)
_encryptedInput.clear();
BufferUtil.clear(_decryptedInput);
releaseEmptyInputBuffers();
}
private void releaseEmptyInputBuffers()
{
releaseEmptyEncryptedInputBuffer();
releaseEmptyDecryptedInputBuffer();
}
private void discardEncryptedOutputBuffer()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException();
BufferUtil.clear(_encryptedOutput);
releaseEmptyEncryptedOutputBuffer();
}
private void releaseEmptyEncryptedOutputBuffer()
{ {
if (!_lock.isHeldByCurrentThread()) if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException(); throw new IllegalStateException();
@ -759,7 +787,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
// See also system property "jsse.SSLEngine.acceptLargeFragments". // See also system property "jsse.SSLEngine.acceptLargeFragments".
if (BufferUtil.isEmpty(_decryptedInput) && appBufferSize < getApplicationBufferSize()) if (BufferUtil.isEmpty(_decryptedInput) && appBufferSize < getApplicationBufferSize())
{ {
releaseDecryptedInputBuffer(); releaseEmptyDecryptedInputBuffer();
continue; continue;
} }
throw new IllegalStateException("Unexpected unwrap result " + unwrap); throw new IllegalStateException("Unexpected unwrap result " + unwrap);
@ -790,6 +818,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
} }
catch (Throwable x) catch (Throwable x)
{ {
discardInputBuffers();
Throwable f = handleException(x, "fill"); Throwable f = handleException(x, "fill");
Throwable failure = handshakeFailed(f); Throwable failure = handshakeFailed(f);
if (_flushState == FlushState.WAIT_FOR_FILL) if (_flushState == FlushState.WAIT_FOR_FILL)
@ -801,8 +830,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
} }
finally finally
{ {
releaseEncryptedInputBuffer(); releaseEmptyInputBuffers();
releaseDecryptedInputBuffer();
if (_flushState == FlushState.WAIT_FOR_FILL) if (_flushState == FlushState.WAIT_FOR_FILL)
{ {
@ -988,26 +1016,26 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
} }
} }
// finish of any previous flushes
if (_encryptedOutput != null)
{
int remaining = _encryptedOutput.remaining();
if (remaining > 0)
{
boolean flushed = networkFlush(_encryptedOutput);
int written = remaining - _encryptedOutput.remaining();
if (written > 0)
_bytesOut.addAndGet(written);
if (!flushed)
return false;
}
}
boolean isEmpty = BufferUtil.isEmpty(appOuts);
Boolean result = null; Boolean result = null;
try try
{ {
// finish of any previous flushes
if (_encryptedOutput != null)
{
int remaining = _encryptedOutput.remaining();
if (remaining > 0)
{
boolean flushed = networkFlush(_encryptedOutput);
int written = remaining - _encryptedOutput.remaining();
if (written > 0)
_bytesOut.addAndGet(written);
if (!flushed)
return false;
}
}
boolean isEmpty = BufferUtil.isEmpty(appOuts);
if (_flushState != FlushState.IDLE) if (_flushState != FlushState.IDLE)
return result = false; return result = false;
@ -1121,7 +1149,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
// See also system property "jsse.SSLEngine.acceptLargeFragments". // See also system property "jsse.SSLEngine.acceptLargeFragments".
if (packetBufferSize < getPacketBufferSize()) if (packetBufferSize < getPacketBufferSize())
{ {
releaseEncryptedOutputBuffer(); releaseEmptyEncryptedOutputBuffer();
continue; continue;
} }
throw new IllegalStateException("Unexpected wrap result " + wrap); throw new IllegalStateException("Unexpected wrap result " + wrap);
@ -1159,12 +1187,13 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
} }
catch (Throwable x) catch (Throwable x)
{ {
discardEncryptedOutputBuffer();
Throwable failure = handleException(x, "flush"); Throwable failure = handleException(x, "flush");
throw handshakeFailed(failure); throw handshakeFailed(failure);
} }
finally finally
{ {
releaseEncryptedOutputBuffer(); releaseEmptyEncryptedOutputBuffer();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("<flush {} {}", result, SslConnection.this); LOG.debug("<flush {} {}", result, SslConnection.this);
} }
@ -1274,11 +1303,15 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
@Override @Override
public void doShutdownOutput() public void doShutdownOutput()
{
doShutdownOutput(false);
}
private void doShutdownOutput(boolean close)
{ {
EndPoint endPoint = getEndPoint(); EndPoint endPoint = getEndPoint();
try try
{ {
boolean close;
boolean flush = false; boolean flush = false;
try (AutoLock l = _lock.lock()) try (AutoLock l = _lock.lock())
{ {
@ -1296,7 +1329,8 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
flush = !oshut; flush = !oshut;
} }
close = ishut; if (!close)
close = ishut;
} }
if (flush) if (flush)
@ -1321,25 +1355,35 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
try (AutoLock l = _lock.lock()) try (AutoLock l = _lock.lock())
{ {
_flushState = FlushState.IDLE; _flushState = FlushState.IDLE;
releaseEncryptedOutputBuffer(); releaseEmptyEncryptedOutputBuffer();
} }
}, t -> endPoint.close()), write); }, t -> disconnect()), write);
} }
} }
} }
if (close) if (close)
endPoint.close(); disconnect();
else else
ensureFillInterested(); ensureFillInterested();
} }
catch (Throwable x) catch (Throwable x)
{ {
LOG.trace("IGNORED", x); if (LOG.isTraceEnabled())
endPoint.close(); LOG.trace("IGNORED", x);
disconnect();
} }
} }
private void disconnect()
{
try (AutoLock l = _lock.lock())
{
discardEncryptedOutputBuffer();
}
getEndPoint().close();
}
private void closeOutbound() private void closeOutbound()
{ {
try try
@ -1382,9 +1426,12 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
@Override @Override
public void doClose() public void doClose()
{ {
try (AutoLock l = _lock.lock())
{
discardInputBuffers();
}
// First send the TLS Close Alert, then the FIN. // First send the TLS Close Alert, then the FIN.
doShutdownOutput(); doShutdownOutput(true);
getEndPoint().close();
super.doClose(); super.doClose();
} }
@ -1537,7 +1584,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("IncompleteWriteCB succeeded {}", SslConnection.this); LOG.debug("IncompleteWriteCB succeeded {}", SslConnection.this);
releaseEncryptedOutputBuffer(); releaseEmptyEncryptedOutputBuffer();
_flushState = FlushState.IDLE; _flushState = FlushState.IDLE;
interested = _fillState == FillState.INTERESTED; interested = _fillState == FillState.INTERESTED;
@ -1563,8 +1610,7 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("IncompleteWriteCB failed {}", SslConnection.this, x); LOG.debug("IncompleteWriteCB failed {}", SslConnection.this, x);
BufferUtil.clear(_encryptedOutput); discardEncryptedOutputBuffer();
releaseEncryptedOutputBuffer();
_flushState = FlushState.IDLE; _flushState = FlushState.IDLE;
failFillInterest = _fillState == FillState.WAIT_FOR_FLUSH || failFillInterest = _fillState == FillState.WAIT_FOR_FLUSH ||