Fixes #4904 - WebsocketClient creates more connections than needed. (#4911)

* Fixes #4904 - WebsocketClient creates more connections than needed.

Fixed connection pool's `acquire()` methods to correctly take into account the number of queued requests.
Now the connection creation is conditional, triggered by
explicit send() or failures.
The connection creation is not triggered _after_ a send(),
where we aggressively send more queued requests - or
in release(), where we send queued request after a previous
one was completed.
Now the connection close/removal aggressively sends more
requests triggering the connection creation.

Also fixed a collateral bug in `BufferingResponseListener` - wrong calculation of the max content length.

Restored `ConnectionPoolTest` that was disabled in #2540, cleaned it up, and let it run for hours without failures.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-06-01 15:48:44 +02:00 committed by GitHub
parent 646010e309
commit 0ae2fff361
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 394 additions and 99 deletions

View File

@ -37,20 +37,19 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);
private final AtomicBoolean closed = new AtomicBoolean();
/**
* The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed.
* The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections.
*/
private final AtomicBiInteger connections = new AtomicBiInteger();
private final Destination destination;
private final AtomicBoolean closed = new AtomicBoolean();
private final HttpDestination destination;
private final int maxConnections;
private final Callback requester;
protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
{
this.destination = destination;
this.destination = (HttpDestination)destination;
this.maxConnections = maxConnections;
this.requester = requester;
}
@ -98,16 +97,43 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
@Override
public Connection acquire()
{
return acquire(true);
}
/**
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
*
* @param create whether to schedule the opening of a connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
*/
protected Connection acquire(boolean create)
{
Connection connection = activate();
if (connection == null)
{
tryCreate(-1);
if (create)
tryCreate(destination.getQueuedRequestCount());
connection = activate();
}
return connection;
}
/**
* <p>Schedules the opening of a new connection.</p>
* <p>Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter:
* if {@code maxPending} is greater than the current number of connections scheduled for opening,
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.</p>
*
* @param maxPending the max desired number of connections scheduled for opening,
* or a negative number to always trigger the opening of a new connection
*/
protected void tryCreate(int maxPending)
{
while (true)

View File

@ -228,6 +228,8 @@ public abstract class HttpConnection implements Connection
}
else
{
// Association may fail, for example if the application
// aborted the request, so we must release the channel.
channel.release();
result = new SendFailure(new HttpRequestException("Could not associate request to connection", request), false);
}
@ -242,6 +244,8 @@ public abstract class HttpConnection implements Connection
}
else
{
// This connection has been timed out by another thread
// that will take care of removing it from the pool.
return new SendFailure(new TimeoutException(), true);
}
}

View File

@ -244,7 +244,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
@Override
public void succeeded()
{
send();
send(false);
}
@Override
@ -308,25 +308,39 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
public void send()
{
if (getHttpExchanges().isEmpty())
return;
process();
send(true);
}
private void process()
private void send(boolean create)
{
if (getHttpExchanges().isEmpty())
return;
process(create);
}
private void process(boolean create)
{
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
// In other cases looping is a work-stealing optimization.
while (true)
{
Connection connection = connectionPool.acquire();
Connection connection;
if (connectionPool instanceof AbstractConnectionPool)
connection = ((AbstractConnectionPool)connectionPool).acquire(create);
else
connection = connectionPool.acquire();
if (connection == null)
break;
boolean proceed = process(connection);
if (!proceed)
ProcessResult result = process(connection);
if (result == ProcessResult.FINISH)
break;
create = result == ProcessResult.RESTART;
}
}
public boolean process(Connection connection)
public ProcessResult process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
@ -342,7 +356,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
LOG.debug("{} is stopping", client);
connection.close();
}
return false;
return ProcessResult.FINISH;
}
else
{
@ -353,31 +367,37 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// Won't use this connection, release it back.
if (!connectionPool.release(connection))
boolean released = connectionPool.release(connection);
if (!released)
connection.close();
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0
? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
: ProcessResult.FINISH;
}
else
SendFailure failure = send(connection, exchange);
if (failure == null)
{
SendFailure result = send(connection, exchange);
if (result != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", result, exchange);
if (result.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return false;
}
request.abort(result.failure);
}
// Aggressively send other queued requests
// in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}
return getHttpExchanges().peek() != null;
if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", failure, exchange);
if (failure.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
}
request.abort(failure.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
}
}
@ -419,7 +439,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
if (connectionPool.isActive(connection))
{
if (connectionPool.release(connection))
send();
send(false);
else
connection.close();
}
@ -439,25 +459,30 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
public boolean remove(Connection connection)
{
return connectionPool.remove(connection);
}
public void close(Connection connection)
{
boolean removed = remove(connection);
boolean removed = connectionPool.remove(connection);
if (getHttpExchanges().isEmpty())
{
tryRemoveIdleDestination();
}
else
else if (removed)
{
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries.
if (removed)
process();
// Process queued requests that may be waiting.
// We may create a connection that is not
// needed, but it will eventually idle timeout.
process(true);
}
return removed;
}
/**
* @param connection the connection to remove
* @deprecated use {@link #remove(Connection)} instead
*/
@Deprecated
public void close(Connection connection)
{
remove(connection);
}
/**
@ -581,4 +606,9 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
}
}
}
private enum ProcessResult
{
RESTART, CONTINUE, FINISH
}
}

View File

@ -64,13 +64,25 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements C
Connection connection = activate();
if (connection == null)
{
int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex();
int queuedRequests = destination.getQueuedRequestCount();
int maxMultiplex = getMaxMultiplex();
int maxPending = ceilDiv(queuedRequests, maxMultiplex);
tryCreate(maxPending);
connection = activate();
}
return connection;
}
/**
* @param a the dividend
* @param b the divisor
* @return the ceiling of the algebraic quotient
*/
private static int ceilDiv(int a, int b)
{
return (a + b - 1) / b;
}
protected void lock()
{
lock.lock();

View File

@ -69,6 +69,21 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool implements
}
}
/**
* <p>Returns an idle connection, if available, following a round robin algorithm;
* otherwise it always tries to create a new connection, up until the max connection count.</p>
*
* @param create this parameter is ignored and assumed to be always {@code true}
* @return an idle connection or {@code null} if no idle connections are available
*/
@Override
protected Connection acquire(boolean create)
{
// The nature of this connection pool is such that a
// connection must always be present in the next slot.
return super.acquire(true);
}
@Override
protected void onCreated(Connection connection)
{

View File

@ -180,7 +180,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
{
if (closed.compareAndSet(false, true))
{
getHttpDestination().close(this);
getHttpDestination().remove(this);
abort(failure);
channel.destroy();
getEndPoint().shutdownOutput();

View File

@ -119,9 +119,10 @@ public abstract class BufferingResponseListener extends Listener.Adapter
int length = content.remaining();
if (length > BufferUtil.space(buffer))
{
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
if (requiredCapacity > maxLength)
int remaining = buffer == null ? 0 : buffer.remaining();
if (remaining + length > maxLength)
response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded"));
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
int newCapacity = Math.min(Integer.highestOneBit(requiredCapacity) << 1, maxLength);
buffer = BufferUtil.ensureCapacity(buffer, newCapacity);
}

View File

@ -19,7 +19,7 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.ArrayList;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@ -27,11 +27,11 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
@ -43,51 +43,55 @@ import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled // Disabled by @gregw on issue #2540 - commit 621b946b10884e7308eacca241dcf8b5d6f6cff2
public class ConnectionPoolTest
{
private Server server;
private ServerConnector connector;
private HttpClient client;
public static Stream<Arguments> pools()
public static Stream<ConnectionPoolFactory> pools()
{
List<Object[]> pools = new ArrayList<>();
pools.add(new Object[]{
DuplexConnectionPool.class,
(ConnectionPool.Factory)
destination -> new DuplexConnectionPool(destination, 8, destination)
});
pools.add(new Object[]{
RoundRobinConnectionPool.class,
(ConnectionPool.Factory)
destination -> new RoundRobinConnectionPool(destination, 8, destination)
});
return pools.stream().map(Arguments::of);
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
}
private void start(final ConnectionPool.Factory factory, Handler handler) throws Exception
private void start(ConnectionPool.Factory factory, Handler handler) throws Exception
{
startServer(handler);
startClient(factory);
}
private void startClient(ConnectionPool.Factory factory) throws Exception
{
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory);
client = new HttpClient(transport, null);
client.start();
}
private void startServer(Handler handler) throws Exception
{
server = new Server();
connector = new ServerConnector(server);
server.addConnector(connector);
server.setHandler(handler);
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory);
server.start();
client = new HttpClient(transport, null);
client.start();
}
@AfterEach
@ -111,14 +115,14 @@ public class ConnectionPoolTest
}
}
@ParameterizedTest(name = "[{index}] {0}")
@ParameterizedTest
@MethodSource("pools")
public void test(Class<? extends ConnectionPool> connectionPoolClass, ConnectionPool.Factory factory) throws Exception
public void test(ConnectionPoolFactory factory) throws Exception
{
start(factory, new EmptyServerHandler()
start(factory.factory, new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
switch (HttpMethod.fromString(request.getMethod()))
{
@ -233,4 +237,135 @@ public class ConnectionPoolTest
failures.add(x);
}
}
@ParameterizedTest
@MethodSource("pools")
public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
startServer(new EmptyServerHandler());
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
long delay = 1000;
client.setSocketAddressResolver(new SocketAddressResolver.Sync()
{
@Override
public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
{
client.getExecutor().execute(() ->
{
try
{
Thread.sleep(delay);
super.resolve(host, port, promise);
}
catch (InterruptedException x)
{
promise.failed(x);
}
});
}
});
client.start();
CountDownLatch latch = new CountDownLatch(2);
client.newRequest("localhost", connector.getLocalPort())
.path("/one")
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
});
Thread.sleep(delay / 2);
client.newRequest("localhost", connector.getLocalPort())
.path("/two")
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
});
assertTrue(latch.await(2 * delay, TimeUnit.MILLISECONDS));
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertEquals(2, connectionPool.getConnectionCount());
}
@ParameterizedTest
@MethodSource("pools")
public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
// Round robin connection pool does open a few more connections than expected.
Assumptions.assumeFalse(factory.name.equals("round-robin"));
startServer(new EmptyServerHandler());
int count = 500;
QueuedThreadPool clientThreads = new QueuedThreadPool(2 * count);
clientThreads.setName("client");
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
client.setExecutor(clientThreads);
client.setMaxConnectionsPerDestination(2 * count);
client.setSocketAddressResolver(new SocketAddressResolver.Sync()
{
@Override
public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
{
client.getExecutor().execute(() ->
{
try
{
Thread.sleep(100);
super.resolve(host, port, promise);
}
catch (InterruptedException x)
{
promise.failed(x);
}
});
}
});
client.start();
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort())
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
}));
}
assertTrue(latch.await(count, TimeUnit.SECONDS));
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertThat(connectionPool.getConnectionCount(), Matchers.lessThanOrEqualTo(count));
}
private static class ConnectionPoolFactory
{
private final String name;
private final ConnectionPool.Factory factory;
private ConnectionPoolFactory(String name, ConnectionPool.Factory factory)
{
this.name = name;
this.factory = factory;
}
@Override
public String toString()
{
return name;
}
}
}

View File

@ -35,6 +35,7 @@ import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.util.Callback;
import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -52,7 +53,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testFirstAcquireWithEmptyQueue(Scenario scenario) throws Exception
public void testAcquireWithEmptyQueue(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
@ -61,11 +62,30 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection = connectionPool.acquire();
assertNull(connection);
// There are no queued requests, so no connection should be created.
connection = pollIdleConnection(connectionPool, 1, TimeUnit.SECONDS);
assertNull(connection);
}
}
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testAcquireWithOneExchangeQueued(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
Connection connection = connectionPool.acquire(false);
if (connection == null)
{
// There are no queued requests, so the newly created connection will be idle
connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
}
assertNotNull(connection);
}
}
@ -76,11 +96,14 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
start(scenario, new EmptyServerHandler());
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())))
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
if (connection1 == null)
{
@ -100,14 +123,14 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
start(scenario, new EmptyServerHandler());
final CountDownLatch idleLatch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))
CountDownLatch idleLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))
{
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void onCreated(Connection connection)
@ -125,19 +148,25 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
}
};
}
};
})
{
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
// Make sure we entered idleCreated().
assertTrue(idleLatch.await(5, TimeUnit.SECONDS));
// There are no available existing connections, so acquire()
// returns null because we delayed idleCreated() above
// returns null because we delayed idleCreated() above.
Connection connection1 = connectionPool.acquire();
assertNull(connection1);
// Trigger creation of a second connection.
connectionPool.tryCreate(1);
// Second attempt also returns null because we delayed idleCreated() above.
Connection connection2 = connectionPool.acquire();
assertNull(connection2);
@ -158,10 +187,14 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
start(scenario, new EmptyServerHandler());
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())))
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
Connection connection1 = connectionPool.acquire();
if (connection1 == null)
{
@ -171,6 +204,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
assertSame(connection1, connectionPool.acquire(), "From idle");
}
// There are no exchanges so process() is a no-op.
destination.process(connection1);
destination.release(connection1);
@ -188,10 +222,14 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
long idleTimeout = 1000;
client.setIdleTimeout(idleTimeout);
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())))
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
Connection connection1 = connectionPool.acquire();
if (connection1 == null)
{
@ -298,7 +336,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
server.stop();
Request request = client.newRequest(host, port).scheme(scenario.getScheme());
assertThrows(Exception.class, () -> request.send());
assertThrows(Exception.class, request::send);
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
while (!client.getDestinations().isEmpty() && System.nanoTime() < deadline)
@ -330,4 +368,38 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
}
return null;
}
private static class TestDestination extends HttpDestinationOverHTTP
{
public TestDestination(HttpClient client, Origin origin)
{
super(client, origin);
}
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
}
public static class TestConnectionPool extends DuplexConnectionPool
{
public TestConnectionPool(Destination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, requester);
}
@Override
public void tryCreate(int maxPending)
{
super.tryCreate(maxPending);
}
@Override
public Connection acquire(boolean create)
{
return super.acquire(create);
}
}
}
}

View File

@ -248,7 +248,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
{
if (closed.compareAndSet(false, true))
{
getHttpDestination().close(this);
getHttpDestination().remove(this);
abort(failure);

View File

@ -148,7 +148,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
{
if (closed.compareAndSet(false, true))
{
getHttpDestination().close(this);
getHttpDestination().remove(this);
abort(failure);