Improved logging and toString() implementations,
small refactorings in code and tests. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
70a679f5f4
commit
2d3f0e0c10
|
@ -78,7 +78,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
|
||||
for (int i = 0; i < connectionCount; i++)
|
||||
{
|
||||
futures[i] = tryCreateReturningFuture(pool.getMaxEntries());
|
||||
futures[i] = tryCreateAsync(getMaxConnectionCount());
|
||||
}
|
||||
return CompletableFuture.allOf(futures);
|
||||
}
|
||||
|
@ -175,6 +175,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
*/
|
||||
protected Connection acquire(boolean create)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Acquiring create={} on {}", create, this);
|
||||
Connection connection = activate();
|
||||
if (connection == null && create)
|
||||
{
|
||||
|
@ -196,20 +198,21 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
*/
|
||||
protected void tryCreate(int maxPending)
|
||||
{
|
||||
tryCreateReturningFuture(maxPending);
|
||||
tryCreateAsync(maxPending);
|
||||
}
|
||||
|
||||
private CompletableFuture<Void> tryCreateReturningFuture(int maxPending)
|
||||
private CompletableFuture<Void> tryCreateAsync(int maxPending)
|
||||
{
|
||||
int connectionCount = getConnectionCount();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("tryCreate {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
|
||||
LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);
|
||||
|
||||
Pool<Connection>.Entry entry = pool.reserve(maxPending);
|
||||
if (entry == null)
|
||||
return CompletableFuture.completedFuture(null);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("newConnection {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
|
||||
LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount());
|
||||
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
destination.newConnection(new Promise<Connection>()
|
||||
|
@ -218,7 +221,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
public void succeeded(Connection connection)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection {}/{} creation succeeded {}", pool.size(), pool.getMaxEntries(), connection);
|
||||
LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection);
|
||||
if (!(connection instanceof Attachable))
|
||||
{
|
||||
failed(new IllegalArgumentException("Invalid connection object: " + connection));
|
||||
|
@ -236,7 +239,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
public void failed(Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection " + pool.size() + "/" + pool.getMaxEntries() + " creation failed", x);
|
||||
LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x);
|
||||
entry.remove();
|
||||
future.completeExceptionally(x);
|
||||
requester.failed(x);
|
||||
|
@ -257,7 +260,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
if (entry != null)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("activated {}", entry);
|
||||
LOG.debug("Activated {} {}", entry, pool);
|
||||
Connection connection = entry.getPooled();
|
||||
acquired(connection);
|
||||
return connection;
|
||||
|
@ -275,8 +278,6 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
||||
if (entry == null)
|
||||
return false;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("isActive {}", entry);
|
||||
return !entry.isIdle();
|
||||
}
|
||||
|
||||
|
@ -300,7 +301,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
return true;
|
||||
boolean reusable = pool.release(entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Released ({}) {}", reusable, entry);
|
||||
LOG.debug("Released ({}) {} {}", reusable, entry, pool);
|
||||
if (reusable)
|
||||
return true;
|
||||
remove(connection);
|
||||
|
@ -325,7 +326,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
attachable.setAttachment(null);
|
||||
boolean removed = pool.remove(entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Removed ({}) {}", removed, entry);
|
||||
LOG.debug("Removed ({}) {} {}", removed, entry, pool);
|
||||
if (removed || force)
|
||||
{
|
||||
released(connection);
|
||||
|
|
|
@ -384,7 +384,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
{
|
||||
// Aggressively send other queued requests
|
||||
// in case connections are multiplexed.
|
||||
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
|
||||
return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -438,7 +438,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
{
|
||||
if (connectionPool.isActive(connection))
|
||||
{
|
||||
// trigger the next request after releasing the connection
|
||||
// Trigger the next request after releasing the connection.
|
||||
if (connectionPool.release(connection))
|
||||
send(false);
|
||||
else
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.eclipse.jetty.util.SocketAddressResolver;
|
|||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assumptions;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
|
@ -63,10 +62,15 @@ public class ConnectionPoolTest
|
|||
private HttpClient client;
|
||||
|
||||
public static Stream<ConnectionPoolFactory> pools()
|
||||
{
|
||||
return Stream.concat(poolsNoRoundRobin(),
|
||||
Stream.of(new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination))));
|
||||
}
|
||||
|
||||
public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
|
||||
{
|
||||
return Stream.of(
|
||||
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
|
||||
new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
|
||||
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
|
||||
);
|
||||
}
|
||||
|
@ -295,11 +299,11 @@ public class ConnectionPoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("pools")
|
||||
@MethodSource("poolsNoRoundRobin")
|
||||
public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
|
||||
{
|
||||
// Round robin connection pool does open a few more connections than expected.
|
||||
Assumptions.assumeFalse(factory.name.equals("round-robin"));
|
||||
// Round robin connection pool does open a few more
|
||||
// connections than expected, exclude it from this test.
|
||||
|
||||
startServer(new EmptyServerHandler());
|
||||
|
||||
|
|
|
@ -362,7 +362,12 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return getClass().getSimpleName() + " size=" + sharedList.size() + " closed=" + closed + " entries=" + sharedList;
|
||||
return String.format("%s@%x[size=%d closed=%s entries=%s]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
sharedList.size(),
|
||||
closed,
|
||||
sharedList);
|
||||
}
|
||||
|
||||
public class Entry
|
||||
|
@ -544,11 +549,13 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
public String toString()
|
||||
{
|
||||
long encoded = state.get();
|
||||
return String.format("%s@%x{usage=%d,multiplex=%d,pooled=%s}",
|
||||
return String.format("%s@%x{usage=%d/%d,multiplex=%d/%d,pooled=%s}",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
AtomicBiInteger.getHi(encoded),
|
||||
getMaxUsageCount(),
|
||||
AtomicBiInteger.getLo(encoded),
|
||||
getMaxMultiplex(),
|
||||
pooled);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,8 +19,9 @@
|
|||
package org.eclipse.jetty.http.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -56,7 +57,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
|
|||
{
|
||||
init(transport);
|
||||
AtomicBoolean record = new AtomicBoolean();
|
||||
List<Integer> remotePorts = new ArrayList<>();
|
||||
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
|
||||
scenario.start(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
|
@ -115,8 +116,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
|
|||
int maxConnections = 3;
|
||||
int count = maxConnections * maxMultiplex;
|
||||
|
||||
AtomicBoolean record = new AtomicBoolean();
|
||||
List<Integer> remotePorts = new ArrayList<>();
|
||||
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
|
||||
AtomicReference<CountDownLatch> requestLatch = new AtomicReference<>();
|
||||
CountDownLatch serverLatch = new CountDownLatch(count);
|
||||
CyclicBarrier barrier = new CyclicBarrier(count + 1);
|
||||
|
@ -127,13 +127,10 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
|
|||
{
|
||||
try
|
||||
{
|
||||
if (record.get())
|
||||
{
|
||||
remotePorts.add(request.getRemotePort());
|
||||
requestLatch.get().countDown();
|
||||
serverLatch.countDown();
|
||||
barrier.await();
|
||||
}
|
||||
remotePorts.add(request.getRemotePort());
|
||||
requestLatch.get().countDown();
|
||||
serverLatch.countDown();
|
||||
barrier.await();
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
|
@ -144,17 +141,9 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
|
|||
|
||||
scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex));
|
||||
|
||||
// Prime the connections, so that they are all opened
|
||||
// before we actually test the round robin behavior.
|
||||
for (int i = 0; i < maxConnections; ++i)
|
||||
{
|
||||
ContentResponse response = scenario.client.newRequest(scenario.newURI())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
}
|
||||
// Do not prime the connections, to see if the behavior is
|
||||
// correct even if the connections are not pre-created.
|
||||
|
||||
record.set(true);
|
||||
CountDownLatch clientLatch = new CountDownLatch(count);
|
||||
AtomicInteger requests = new AtomicInteger();
|
||||
for (int i = 0; i < count; ++i)
|
||||
|
|
Loading…
Reference in New Issue