Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-08-22 17:35:25 +02:00
commit 823e713ee4
5 changed files with 43 additions and 44 deletions

View File

@ -68,7 +68,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount]; CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
for (int i = 0; i < connectionCount; i++) for (int i = 0; i < connectionCount; i++)
{ {
futures[i] = tryCreateReturningFuture(pool.getMaxEntries()); futures[i] = tryCreateAsync(getMaxConnectionCount());
} }
return CompletableFuture.allOf(futures); return CompletableFuture.allOf(futures);
} }
@ -138,6 +138,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
@Override @Override
public Connection acquire(boolean create) public Connection acquire(boolean create)
{ {
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate(); Connection connection = activate();
if (connection == null && create) if (connection == null && create)
{ {
@ -159,22 +161,21 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
*/ */
protected void tryCreate(int maxPending) protected void tryCreate(int maxPending)
{ {
tryCreateReturningFuture(maxPending); tryCreateAsync(maxPending);
} }
private CompletableFuture<Void> tryCreateReturningFuture(int maxPending) private CompletableFuture<Void> tryCreateAsync(int maxPending)
{ {
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{ LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);
LOG.debug("tryCreate {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
}
Pool<Connection>.Entry entry = pool.reserve(maxPending); Pool<Connection>.Entry entry = pool.reserve(maxPending);
if (entry == null) if (entry == null)
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("newConnection {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending); LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount());
CompletableFuture<Void> future = new CompletableFuture<>(); CompletableFuture<Void> future = new CompletableFuture<>();
destination.newConnection(new Promise<>() destination.newConnection(new Promise<>()
@ -183,7 +184,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
public void succeeded(Connection connection) public void succeeded(Connection connection)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", pool.size(), pool.getMaxEntries(), connection); LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection);
if (!(connection instanceof Attachable)) if (!(connection instanceof Attachable))
{ {
failed(new IllegalArgumentException("Invalid connection object: " + connection)); failed(new IllegalArgumentException("Invalid connection object: " + connection));
@ -201,7 +202,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
public void failed(Throwable x) public void failed(Throwable x)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Connection " + pool.size() + "/" + pool.getMaxEntries() + " creation failed", x); LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x);
entry.remove(); entry.remove();
future.completeExceptionally(x); future.completeExceptionally(x);
requester.failed(x); requester.failed(x);
@ -240,7 +241,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
if (entry != null) if (entry != null)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("activated {}", entry); LOG.debug("Activated {} {}", entry, pool);
Connection connection = entry.getPooled(); Connection connection = entry.getPooled();
acquired(connection); acquired(connection);
return connection; return connection;
@ -258,8 +259,6 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment(); Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null) if (entry == null)
return false; return false;
if (LOG.isDebugEnabled())
LOG.debug("isActive {}", entry);
return !entry.isIdle(); return !entry.isIdle();
} }
@ -283,7 +282,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
return true; return true;
boolean reusable = pool.release(entry); boolean reusable = pool.release(entry);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Released ({}) {}", reusable, entry); LOG.debug("Released ({}) {} {}", reusable, entry, pool);
if (reusable) if (reusable)
return true; return true;
remove(connection); remove(connection);
@ -308,7 +307,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
attachable.setAttachment(null); attachable.setAttachment(null);
boolean removed = pool.remove(entry); boolean removed = pool.remove(entry);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Removed ({}) {}", removed, entry); LOG.debug("Removed ({}) {} {}", removed, entry, pool);
if (removed || force) if (removed || force)
{ {
released(connection); released(connection);

View File

@ -370,7 +370,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
{ {
// Aggressively send other queued requests // Aggressively send other queued requests
// in case connections are multiplexed. // in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH; return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -427,7 +427,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
{ {
if (connectionPool.isActive(connection)) if (connectionPool.isActive(connection))
{ {
// trigger the next request after releasing the connection // Trigger the next request after releasing the connection.
if (connectionPool.release(connection)) if (connectionPool.release(connection))
send(false); send(false);
else else

View File

@ -50,7 +50,6 @@ import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.MethodSource;
@ -65,10 +64,15 @@ public class ConnectionPoolTest
private HttpClient client; private HttpClient client;
public static Stream<ConnectionPoolFactory> pools() public static Stream<ConnectionPoolFactory> pools()
{
return Stream.concat(poolsNoRoundRobin(),
Stream.of(new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination))));
}
public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{ {
return Stream.of( return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)), new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)) new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
); );
} }
@ -301,11 +305,11 @@ public class ConnectionPoolTest
} }
@ParameterizedTest @ParameterizedTest
@MethodSource("pools") @MethodSource("poolsNoRoundRobin")
public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{ {
// Round robin connection pool does open a few more connections than expected. // Round robin connection pool does open a few more
Assumptions.assumeFalse(factory.name.equals("round-robin")); // connections than expected, exclude it from this test.
startServer(new EmptyServerHandler()); startServer(new EmptyServerHandler());

View File

@ -362,7 +362,12 @@ public class Pool<T> implements AutoCloseable, Dumpable
@Override @Override
public String toString() public String toString()
{ {
return getClass().getSimpleName() + " size=" + sharedList.size() + " closed=" + closed + " entries=" + sharedList; return String.format("%s@%x[size=%d closed=%s entries=%s]",
getClass().getSimpleName(),
hashCode(),
sharedList.size(),
closed,
sharedList);
} }
public class Entry public class Entry
@ -544,11 +549,13 @@ public class Pool<T> implements AutoCloseable, Dumpable
public String toString() public String toString()
{ {
long encoded = state.get(); long encoded = state.get();
return String.format("%s@%x{usage=%d,multiplex=%d,pooled=%s}", return String.format("%s@%x{usage=%d/%d,multiplex=%d/%d,pooled=%s}",
getClass().getSimpleName(), getClass().getSimpleName(),
hashCode(), hashCode(),
AtomicBiInteger.getHi(encoded), AtomicBiInteger.getHi(encoded),
getMaxUsageCount(),
AtomicBiInteger.getLo(encoded), AtomicBiInteger.getLo(encoded),
getMaxMultiplex(),
pooled); pooled);
} }
} }

View File

@ -19,8 +19,9 @@
package org.eclipse.jetty.http.client; package org.eclipse.jetty.http.client;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -56,7 +57,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
{ {
init(transport); init(transport);
AtomicBoolean record = new AtomicBoolean(); AtomicBoolean record = new AtomicBoolean();
List<Integer> remotePorts = new ArrayList<>(); List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler() scenario.start(new EmptyServerHandler()
{ {
@Override @Override
@ -115,8 +116,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int maxConnections = 3; int maxConnections = 3;
int count = maxConnections * maxMultiplex; int count = maxConnections * maxMultiplex;
AtomicBoolean record = new AtomicBoolean(); List<Integer> remotePorts = new CopyOnWriteArrayList<>();
List<Integer> remotePorts = new ArrayList<>();
AtomicReference<CountDownLatch> requestLatch = new AtomicReference<>(); AtomicReference<CountDownLatch> requestLatch = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(count); CountDownLatch serverLatch = new CountDownLatch(count);
CyclicBarrier barrier = new CyclicBarrier(count + 1); CyclicBarrier barrier = new CyclicBarrier(count + 1);
@ -127,13 +127,10 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
{ {
try try
{ {
if (record.get()) remotePorts.add(request.getRemotePort());
{ requestLatch.get().countDown();
remotePorts.add(request.getRemotePort()); serverLatch.countDown();
requestLatch.get().countDown(); barrier.await();
serverLatch.countDown();
barrier.await();
}
} }
catch (Exception x) catch (Exception x)
{ {
@ -144,17 +141,9 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex)); scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex));
// Prime the connections, so that they are all opened // Do not prime the connections, to see if the behavior is
// before we actually test the round robin behavior. // correct even if the connections are not pre-created.
for (int i = 0; i < maxConnections; ++i)
{
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
record.set(true);
CountDownLatch clientLatch = new CountDownLatch(count); CountDownLatch clientLatch = new CountDownLatch(count);
AtomicInteger requests = new AtomicInteger(); AtomicInteger requests = new AtomicInteger();
for (int i = 0; i < count; ++i) for (int i = 0; i < count; ++i)