Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-06-01 16:34:33 +02:00
commit cf998fb321
15 changed files with 432 additions and 139 deletions

View File

@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
@ -37,18 +36,17 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractConnectionPool.class);
private final AtomicBoolean closed = new AtomicBoolean();
/**
* The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed.
* The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections.
*/
private final AtomicBiInteger connections = new AtomicBiInteger();
private final Destination destination;
private final AtomicBoolean closed = new AtomicBoolean();
private final HttpDestination destination;
private final int maxConnections;
private final Callback requester;
protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
this.destination = destination;
this.maxConnections = maxConnections;
@ -86,17 +84,28 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
}
@Override
public Connection acquire()
public Connection acquire(boolean create)
{
Connection connection = activate();
if (connection == null)
{
tryCreate(-1);
if (create)
tryCreate(destination.getQueuedRequestCount());
connection = activate();
}
return connection;
}
/**
* <p>Schedules the opening of a new connection.</p>
* <p>Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter:
* if {@code maxPending} is greater than the current number of connections scheduled for opening,
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.</p>
*
* @param maxPending the max desired number of connections scheduled for opening,
* or a negative number to always trigger the opening of a new connection
*/
protected void tryCreate(int maxPending)
{
while (true)

View File

@ -45,12 +45,16 @@ public interface ConnectionPool extends Closeable
boolean isClosed();
/**
* <p>Returns an idle connection, if available, or schedules the opening
* of a new connection and returns {@code null}.</p>
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
*
* @return an available connection, or null
* @param create whether to schedule the opening of a connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
*/
Connection acquire();
Connection acquire(boolean create);
/**
* <p>Accepts the given connection to be managed by this ConnectionPool.</p>
@ -61,7 +65,7 @@ public interface ConnectionPool extends Closeable
boolean accept(Connection connection);
/**
* <p>Returns the given connection, previously obtained via {@link #acquire()},
* <p>Returns the given connection, previously obtained via {@link #acquire(boolean)},
* back to this ConnectionPool.</p>
*
* @param connection the connection to release

View File

@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -50,7 +49,7 @@ public class DuplexConnectionPool extends AbstractConnectionPool implements Swee
private final Deque<Connection> idleConnections;
private final Set<Connection> activeConnections;
public DuplexConnectionPool(Destination destination, int maxConnections, Callback requester)
public DuplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, requester);
this.idleConnections = new ArrayDeque<>(maxConnections);

View File

@ -105,6 +105,8 @@ public abstract class HttpConnection implements IConnection
}
else
{
// Association may fail, for example if the application
// aborted the request, so we must release the channel.
channel.release();
result = new SendFailure(new HttpRequestException("Could not associate request to connection", request), false);
}
@ -119,6 +121,8 @@ public abstract class HttpConnection implements IConnection
}
else
{
// This connection has been timed out by another thread
// that will take care of removing it from the pool.
return new SendFailure(new TimeoutException(), true);
}
}

View File

@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
@ManagedObject
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable
{
protected static final Logger LOG = LoggerFactory.getLogger(HttpDestination.class);
private static final Logger LOG = LoggerFactory.getLogger(HttpDestination.class);
private final HttpClient client;
private final Origin origin;
@ -234,7 +234,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
@Override
public void succeeded()
{
send();
send(false);
}
@Override
@ -291,32 +291,42 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
}
}
public void send()
{
if (getHttpExchanges().isEmpty())
return;
process();
}
protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
{
return queue.offer(exchange);
}
private void process()
public void send()
{
send(true);
}
private void send(boolean create)
{
if (getHttpExchanges().isEmpty())
return;
process(create);
}
private void process(boolean create)
{
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
// In other cases looping is a work-stealing optimization.
while (true)
{
Connection connection = connectionPool.acquire();
Connection connection = connectionPool.acquire(create);
if (connection == null)
break;
boolean proceed = process(connection);
if (!proceed)
ProcessResult result = process(connection);
if (result == ProcessResult.FINISH)
break;
create = result == ProcessResult.RESTART;
}
}
public boolean process(Connection connection)
private ProcessResult process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
@ -332,7 +342,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
LOG.debug("{} is stopping", client);
connection.close();
}
return false;
return ProcessResult.FINISH;
}
else
{
@ -343,31 +353,37 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// Won't use this connection, release it back.
if (!connectionPool.release(connection))
boolean released = connectionPool.release(connection);
if (!released)
connection.close();
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0
? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
: ProcessResult.FINISH;
}
else
SendFailure failure = send((IConnection)connection, exchange);
if (failure == null)
{
SendFailure result = send((IConnection)connection, exchange);
if (result != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", result, exchange);
if (result.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return false;
}
request.abort(result.failure);
}
// Aggressively send other queued requests
// in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}
return getHttpExchanges().peek() != null;
if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", failure, exchange);
if (failure.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
}
request.abort(failure.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
}
}
@ -392,11 +408,6 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
return exchanges.remove(exchange);
}
public boolean remove(Connection connection)
{
return connectionPool.remove(connection);
}
@Override
public void close()
{
@ -407,24 +418,6 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
timeout.destroy();
}
public void close(Connection connection)
{
boolean removed = remove(connection);
if (getHttpExchanges().isEmpty())
{
tryRemoveIdleDestination();
}
else
{
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries.
if (removed)
process();
}
}
public void release(Connection connection)
{
if (LOG.isDebugEnabled())
@ -435,7 +428,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
if (connectionPool.isActive(connection))
{
if (connectionPool.release(connection))
send();
send(false);
else
connection.close();
}
@ -453,6 +446,24 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
}
}
public boolean remove(Connection connection)
{
boolean removed = connectionPool.remove(connection);
if (getHttpExchanges().isEmpty())
{
tryRemoveIdleDestination();
}
else if (removed)
{
// Process queued requests that may be waiting.
// We may create a connection that is not
// needed, but it will eventually idle timeout.
process(true);
}
return removed;
}
/**
* Aborts all the {@link HttpExchange}s queued in this destination.
*
@ -580,4 +591,9 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
}
}
}
private enum ProcessResult
{
RESTART, CONTINUE, FINISH
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.LeakDetector;
import org.slf4j.Logger;
@ -38,7 +37,7 @@ public class LeakTrackingConnectionPool extends DuplexConnectionPool
}
};
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester)
public LeakTrackingConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, requester);
start();

View File

@ -55,18 +55,30 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements C
}
@Override
public Connection acquire()
public Connection acquire(boolean create)
{
Connection connection = activate();
if (connection == null)
{
int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex();
int queuedRequests = destination.getQueuedRequestCount();
int maxMultiplex = getMaxMultiplex();
int maxPending = ceilDiv(queuedRequests, maxMultiplex);
tryCreate(maxPending);
connection = activate();
}
return connection;
}
/**
* @param a the dividend
* @param b the divisor
* @return the ceiling of the algebraic quotient
*/
private static int ceilDiv(int a, int b)
{
return (a + b - 1) / b;
}
@Override
public int getMaxMultiplex()
{

View File

@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
@ -35,12 +34,12 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool implements
private int maxMultiplex;
private int index;
public RoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester)
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
this(destination, maxConnections, requester, 1);
}
public RoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester, int maxMultiplex)
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester);
entries = new ArrayList<>(maxConnections);
@ -69,6 +68,21 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool implements
}
}
/**
* <p>Returns an idle connection, if available, following a round robin algorithm;
* otherwise it always tries to create a new connection, up until the max connection count.</p>
*
* @param create this parameter is ignored and assumed to be always {@code true}
* @return an idle connection or {@code null} if no idle connections are available
*/
@Override
public Connection acquire(boolean create)
{
// The nature of this connection pool is such that a
// connection must always be present in the next slot.
return super.acquire(true);
}
@Override
protected void onCreated(Connection connection)
{

View File

@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.DumpableCollection;
@ -65,7 +64,7 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
private final long timeout;
private final Map<Connection, Holder> quarantine;
public ValidatingConnectionPool(Destination destination, int maxConnections, Callback requester, Scheduler scheduler, long timeout)
public ValidatingConnectionPool(HttpDestination destination, int maxConnections, Callback requester, Scheduler scheduler, long timeout)
{
super(destination, maxConnections, requester);
this.scheduler = scheduler;

View File

@ -206,7 +206,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
{
if (closed.compareAndSet(false, true))
{
getHttpDestination().close(this);
getHttpDestination().remove(this);
abort(failure);
channel.destroy();
getEndPoint().shutdownOutput();

View File

@ -119,9 +119,10 @@ public abstract class BufferingResponseListener extends Listener.Adapter
int length = content.remaining();
if (length > BufferUtil.space(buffer))
{
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
if (requiredCapacity > maxLength)
int remaining = buffer == null ? 0 : buffer.remaining();
if (remaining + length > maxLength)
response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded"));
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
int newCapacity = Math.min(Integer.highestOneBit(requiredCapacity) << 1, maxLength);
buffer = BufferUtil.ensureCapacity(buffer, newCapacity);
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -30,6 +31,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.BytesRequestContent;
@ -38,44 +40,62 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled // Disabled by @gregw on issue #2540 - commit 621b946b10884e7308eacca241dcf8b5d6f6cff2
public class ConnectionPoolTest
{
private Server server;
private ServerConnector connector;
private HttpClient client;
public static Stream<ConnectionPool.Factory> pools()
public static Stream<ConnectionPoolFactory> pools()
{
return Stream.of(destination -> new DuplexConnectionPool(destination, 8, destination),
destination -> new RoundRobinConnectionPool(destination, 8, destination));
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
}
private void start(final ConnectionPool.Factory factory, Handler handler) throws Exception
private void start(ConnectionPool.Factory factory, Handler handler) throws Exception
{
startServer(handler);
startClient(factory);
}
private void startClient(ConnectionPool.Factory factory) throws Exception
{
ClientConnector connector = new ClientConnector();
connector.setSelectors(1);
HttpClientTransport transport = new HttpClientTransportOverHTTP(connector);
transport.setConnectionPoolFactory(factory);
client = new HttpClient(transport);
client.start();
}
private void startServer(Handler handler) throws Exception
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
server.setHandler(handler);
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory);
server.start();
client = new HttpClient(transport);
client.start();
}
@AfterEach
@ -99,11 +119,11 @@ public class ConnectionPoolTest
}
}
@ParameterizedTest(name = "[{index}] {0}")
@ParameterizedTest
@MethodSource("pools")
public void test(ConnectionPool.Factory factory) throws Exception
public void test(ConnectionPoolFactory factory) throws Exception
{
start(factory, new EmptyServerHandler()
start(factory.factory, new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
@ -221,4 +241,140 @@ public class ConnectionPoolTest
failures.add(x);
}
}
@ParameterizedTest
@MethodSource("pools")
public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
startServer(new EmptyServerHandler());
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
HttpClientTransport transport = new HttpClientTransportOverHTTP(clientConnector);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport);
long delay = 1000;
client.setSocketAddressResolver(new SocketAddressResolver.Sync()
{
@Override
public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
{
client.getExecutor().execute(() ->
{
try
{
Thread.sleep(delay);
super.resolve(host, port, promise);
}
catch (InterruptedException x)
{
promise.failed(x);
}
});
}
});
client.start();
CountDownLatch latch = new CountDownLatch(2);
client.newRequest("localhost", connector.getLocalPort())
.path("/one")
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
});
Thread.sleep(delay / 2);
client.newRequest("localhost", connector.getLocalPort())
.path("/two")
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
});
assertTrue(latch.await(2 * delay, TimeUnit.MILLISECONDS));
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertEquals(2, connectionPool.getConnectionCount());
}
@ParameterizedTest
@MethodSource("pools")
public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
// Round robin connection pool does open a few more connections than expected.
Assumptions.assumeFalse(factory.name.equals("round-robin"));
startServer(new EmptyServerHandler());
int count = 500;
ClientConnector clientConnector = new ClientConnector();
clientConnector.setSelectors(1);
QueuedThreadPool clientThreads = new QueuedThreadPool(2 * count);
clientThreads.setName("client");
clientConnector.setExecutor(clientThreads);
HttpClientTransport transport = new HttpClientTransportOverHTTP(clientConnector);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport);
client.setExecutor(clientThreads);
client.setMaxConnectionsPerDestination(2 * count);
client.setSocketAddressResolver(new SocketAddressResolver.Sync()
{
@Override
public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
{
client.getExecutor().execute(() ->
{
try
{
Thread.sleep(100);
super.resolve(host, port, promise);
}
catch (InterruptedException x)
{
promise.failed(x);
}
});
}
});
client.start();
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort())
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
}));
}
assertTrue(latch.await(count, TimeUnit.SECONDS));
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertThat(connectionPool.getConnectionCount(), Matchers.lessThanOrEqualTo(count));
}
private static class ConnectionPoolFactory
{
private final String name;
private final ConnectionPool.Factory factory;
private ConnectionPoolFactory(String name, ConnectionPool.Factory factory)
{
this.name = name;
this.factory = factory;
}
@Override
public String toString()
{
return name;
}
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client.http;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@ -37,6 +38,7 @@ import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.util.Callback;
import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -54,7 +56,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testFirstAcquireWithEmptyQueue(Scenario scenario) throws Exception
public void testAcquireWithEmptyQueue(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
@ -62,7 +64,29 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection = connectionPool.acquire();
Connection connection = connectionPool.acquire(true);
assertNull(connection);
// There are no queued requests, so no connection should be created.
connection = pollIdleConnection(connectionPool, 1, TimeUnit.SECONDS);
assertNull(connection);
}
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAcquireWithOneExchangeQueued(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
Connection connection = connectionPool.acquire(false);
if (connection == null)
{
// There are no queued requests, so the newly created connection will be idle
@ -78,19 +102,22 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
start(scenario, new EmptyServerHandler());
try (HttpDestination destination = new DuplexHttpDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
Connection connection1 = connectionPool.acquire(true);
if (connection1 == null)
{
// There are no queued requests, so the newly created connection will be idle
connection1 = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
assertNotNull(connection1);
Connection connection2 = connectionPool.acquire();
Connection connection2 = connectionPool.acquire(true);
assertSame(connection1, connection2);
}
}
@ -102,14 +129,14 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
start(scenario, new EmptyServerHandler());
final CountDownLatch idleLatch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
HttpDestination destination = new DuplexHttpDestination(client, new Origin("http", "localhost", connector.getLocalPort()))
CountDownLatch idleLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))
{
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void onCreated(Connection connection)
@ -127,30 +154,37 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
}
};
}
};
})
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
// Make sure we entered idleCreated().
assertTrue(idleLatch.await(5, TimeUnit.SECONDS));
// Make sure we entered idleCreated().
assertTrue(idleLatch.await(5, TimeUnit.SECONDS));
// There are no available existing connections, so acquire()
// returns null because we delayed idleCreated() above
assertNull(connection1);
// There are no available existing connections, so acquire()
// returns null because we delayed idleCreated() above.
Connection connection1 = connectionPool.acquire(true);
assertNull(connection1);
// Second attempt also returns null because we delayed idleCreated() above.
Connection connection2 = connectionPool.acquire();
assertNull(connection2);
// Trigger creation of a second connection.
connectionPool.tryCreate(1);
latch.countDown();
// Second attempt also returns null because we delayed idleCreated() above.
Connection connection2 = connectionPool.acquire(true);
assertNull(connection2);
// There must be 2 idle connections.
Connection connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
assertNotNull(connection);
connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
assertNotNull(connection);
latch.countDown();
// There must be 2 idle connections.
Connection connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
assertNotNull(connection);
connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
assertNotNull(connection);
}
}
@ParameterizedTest
@ -159,23 +193,30 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
start(scenario, new EmptyServerHandler());
try (HttpDestination destination = new DuplexHttpDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
Connection connection1 = connectionPool.acquire(true);
if (connection1 == null)
{
connection1 = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
assertNotNull(connection1);
// Acquire the connection to make it active.
assertSame(connection1, connectionPool.acquire(), "From idle");
assertSame(connection1, connectionPool.acquire(true), "From idle");
}
destination.process(connection1);
// There are no exchanges so process() is a no-op.
Method process = HttpDestination.class.getDeclaredMethod("process", Connection.class);
process.setAccessible(true);
process.invoke(destination, connection1);
destination.release(connection1);
Connection connection2 = connectionPool.acquire();
Connection connection2 = connectionPool.acquire(true);
assertSame(connection1, connection2, "After release");
}
}
@ -184,15 +225,20 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
@ArgumentsSource(ScenarioProvider.class)
public void testIdleConnectionIdleTimeout(Scenario scenario) throws Exception
{
startServer(scenario, new EmptyServerHandler());
start(scenario, new EmptyServerHandler());
long idleTimeout = 1000;
startClient(scenario, httpClient -> httpClient.setIdleTimeout(idleTimeout));
try (HttpDestination destination = new DuplexHttpDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
Connection connection1 = connectionPool.acquire(true);
if (connection1 == null)
{
connection1 = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
@ -261,7 +307,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
Request request = client.newRequest(host, port)
.scheme(scenario.getScheme())
.scheme(scenario.getScheme())
.headers(headers -> headers.put(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE));
Destination destinationBefore = client.resolveDestination(request);
ContentResponse response = request.send();
@ -297,7 +343,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
server.stop();
Request request = client.newRequest(host, port).scheme(scenario.getScheme());
assertThrows(Exception.class, () -> request.send());
assertThrows(Exception.class, request::send);
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
while (!client.getDestinations().isEmpty() && System.nanoTime() < deadline)
@ -329,4 +375,38 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
}
return null;
}
private static class TestDestination extends DuplexHttpDestination
{
public TestDestination(HttpClient client, Origin origin)
{
super(client, origin);
}
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
}
public static class TestConnectionPool extends DuplexConnectionPool
{
public TestConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, requester);
}
@Override
public void tryCreate(int maxPending)
{
super.tryCreate(maxPending);
}
@Override
public Connection acquire(boolean create)
{
return super.acquire(create);
}
}
}
}

View File

@ -246,7 +246,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
if (closed.compareAndSet(false, true))
{
getHttpDestination().close(this);
getHttpDestination().remove(this);
abort(failure);

View File

@ -196,7 +196,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
{
if (closed.compareAndSet(false, true))
{
getHttpDestination().close(this);
getHttpDestination().remove(this);
abort(failure);