Fixes #5855 - HttpClient may not send queued requests. (#5856)

Changed the AbstractConnectionPool.acquire() logic to call tryCreate() even
when create=false.

This is necessary when e.g. a sender thread T2 with create=true steals a
connection whose creation was triggered by another sender thread T1.
In the old code, T2 did not trigger the creation of a connection, possibly
leaving a request queued.
In the new code, T2 would call tryCreate(), possibly triggering
the creation of a connection.

This change re-introduces the fact that when sending e.g. 20 requests
concurrently, 20+ connections may be created.

However, it is better to err on creating more than creating less and leaving
requests queued.

Further refactoring moved field pending from Pool to AbstractConnectionPool.
As a consequence, AbstractConnectionPool.tryCreate() now performs a 
demand/supply calculation to decide whether to create a new connection.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Simone Bordet 2021-01-07 16:05:24 +01:00 committed by GitHub
parent b45c32616c
commit 403d5ec318
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 366 additions and 173 deletions

View File

@ -20,9 +20,12 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.api.Connection;
@ -46,6 +49,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);
private final AtomicInteger pending = new AtomicInteger();
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
@ -82,12 +86,23 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
@Override
public CompletableFuture<Void> preCreateConnections(int connectionCount)
{
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
if (LOG.isDebugEnabled())
LOG.debug("Pre-creating connections {}/{}", connectionCount, getMaxConnectionCount());
List<CompletableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < connectionCount; i++)
{
futures[i] = tryCreateAsync(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
break;
pending.incrementAndGet();
Promise.Completable<Connection> future = new FutureConnection(entry);
futures.add(future);
if (LOG.isDebugEnabled())
LOG.debug("Pre-creating connection {}/{} at {}", futures.size(), getMaxConnectionCount(), entry);
destination.newConnection(future);
}
return CompletableFuture.allOf(futures);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}
protected int getMaxMultiplex()
@ -148,7 +163,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
@ManagedAttribute(value = "The number of pending connections", readonly = true)
public int getPendingConnectionCount()
{
return pool.getReservedCount();
return pending.get();
}
@Override
@ -190,88 +205,82 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true}
* or {@link #isMaximizeConnections()} is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* then attempts to open a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
* otherwise it attempts to open a new connection, if the number of queued requests is
* greater than the number of pending connections;
* if no connection is available even after the attempts to open, return {@code null}.</p>
* <p>The {@code create} parameter is just a hint: the connection may be created even if
* {@code false}, or may not be created even if {@code true}.</p>
*
* @param create whether to schedule the opening of a connection if no idle connections are available
* @param create a hint to attempt to open a new connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
* @see #tryCreate(boolean)
*/
protected Connection acquire(boolean create)
{
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
if (connection == null && (create || isMaximizeConnections()))
if (connection == null)
{
tryCreate(destination.getQueuedRequestCount());
tryCreate(create);
connection = activate();
}
return connection;
}
/**
* <p>Schedules the opening of a new connection.</p>
* <p>Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter:
* if {@code maxPending} is greater than the current number of connections scheduled for opening,
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.</p>
* <p>Tries to create a new connection.</p>
* <p>Whether a new connection is created is determined by the {@code create} parameter
* and a count of demand and supply, where the demand is derived from the number of
* queued requests, and the supply is the number of pending connections time the
* {@link #getMaxMultiplex()} factor: if the demand is less than the supply, the
* connection will not be created.</p>
* <p>Since the number of queued requests used to derive the demand may be a stale
* value, it is possible that few more connections than strictly necessary may be
* created, but enough to satisfy the demand.</p>
*
* @param maxPending the max desired number of connections scheduled for opening,
* or a negative number to always trigger the opening of a new connection
* @param create a hint to request to create a connection
*/
protected void tryCreate(int maxPending)
{
tryCreateAsync(maxPending);
}
private CompletableFuture<Void> tryCreateAsync(int maxPending)
protected void tryCreate(boolean create)
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);
LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount());
Pool<Connection>.Entry entry = pool.reserve(maxPending);
// If we have already pending sufficient multiplexed connections, then do not create another.
int multiplexed = getMaxMultiplex();
while (true)
{
int pending = this.pending.get();
int supply = pending * multiplexed;
int demand = destination.getQueuedRequestCount() + (create ? 1 : 0);
boolean tryCreate = isMaximizeConnections() || supply < demand;
if (LOG.isDebugEnabled())
LOG.debug("Try creating({}) connection, pending/demand/supply: {}/{}/{}, result={}", create, pending, demand, supply, tryCreate);
if (!tryCreate)
return;
if (this.pending.compareAndSet(pending, pending + 1))
break;
}
// Create the connection.
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
return CompletableFuture.completedFuture(null);
{
pending.decrementAndGet();
return;
}
if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount());
CompletableFuture<Void> future = new CompletableFuture<>();
destination.newConnection(new Promise<Connection>()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection);
if (!(connection instanceof Attachable))
{
failed(new IllegalArgumentException("Invalid connection object: " + connection));
return;
}
((Attachable)connection).setAttachment(entry);
onCreated(connection);
entry.enable(connection, false);
idle(connection, false);
future.complete(null);
proceed();
}
@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x);
entry.remove();
future.completeExceptionally(x);
requester.failed(x);
}
});
return future;
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
Promise<Connection> future = new FutureConnection(entry);
destination.newConnection(future);
}
protected void proceed()
@ -444,13 +453,58 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
@Override
public String toString()
{
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]",
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d,q=%d]",
getClass().getSimpleName(),
hashCode(),
getPendingConnectionCount(),
getConnectionCount(),
getMaxConnectionCount(),
getActiveConnectionCount(),
getIdleConnectionCount());
getIdleConnectionCount(),
destination.getQueuedRequestCount());
}
private class FutureConnection extends Promise.Completable<Connection>
{
private final Pool<Connection>.Entry reserved;
public FutureConnection(Pool<Connection>.Entry reserved)
{
this.reserved = reserved;
}
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
if (connection instanceof Attachable)
{
((Attachable)connection).setAttachment(reserved);
onCreated(connection);
pending.decrementAndGet();
reserved.enable(connection, false);
idle(connection, false);
complete(null);
proceed();
}
else
{
// reduce pending on failure and if not multiplexing also reduce demand
failed(new IllegalArgumentException("Invalid connection object: " + connection));
}
}
@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation failed {}", reserved, x);
// reduce pending on failure and if not multiplexing also reduce demand
pending.decrementAndGet();
reserved.remove();
completeExceptionally(x);
requester.failed(x);
}
}
}

View File

@ -109,7 +109,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
protected void doStart() throws Exception
{
this.connectionPool = newConnectionPool(client);
addBean(connectionPool);
addBean(connectionPool, true);
super.doStart();
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
@ -311,9 +311,8 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
private void send(boolean create)
{
if (getHttpExchanges().isEmpty())
return;
process(create);
if (!getHttpExchanges().isEmpty())
process(create);
}
private void process(boolean create)
@ -321,7 +320,10 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
// In other cases looping is a work-stealing optimization.
// It is also necessary when thread T1 cannot acquire a connection
// (for example, it has been stolen by thread T2 and the pool has
// enough pending reservations). T1 returns without doing anything
// and therefore it is T2 that must send both queued requests.
while (true)
{
Connection connection;
@ -331,14 +333,15 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
connection = connectionPool.acquire();
if (connection == null)
break;
ProcessResult result = process(connection);
if (result == ProcessResult.FINISH)
boolean proceed = process(connection);
if (proceed)
create = false;
else
break;
create = result == ProcessResult.RESTART;
}
}
private ProcessResult process(Connection connection)
private boolean process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
@ -354,7 +357,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
LOG.debug("{} is stopping", client);
connection.close();
}
return ProcessResult.FINISH;
return false;
}
else
{
@ -372,9 +375,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0
? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
: ProcessResult.FINISH;
return getQueuedRequestCount() > 0;
}
SendFailure failure = send(connection, exchange);
@ -382,7 +383,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
{
// Aggressively send other queued requests
// in case connections are multiplexed.
return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
return getQueuedRequestCount() > 0;
}
if (LOG.isDebugEnabled())
@ -392,10 +393,10 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
return false;
}
request.abort(failure.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
return getQueuedRequestCount() > 0;
}
}
@ -474,7 +475,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
// Process queued requests that may be waiting.
// We may create a connection that is not
// needed, but it will eventually idle timeout.
process(true);
send(true);
}
return removed;
}
@ -541,8 +542,8 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
asString(),
hashCode(),
proxy == null ? "" : "(via " + proxy + ")",
exchanges.size(),
connectionPool);
getQueuedRequestCount(),
getConnectionPool());
}
/**
@ -610,9 +611,4 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
}
}
}
private enum ProcessResult
{
RESTART, CONTINUE, FINISH
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
public class ConnectionPoolHelper
{
@ -28,8 +27,8 @@ public class ConnectionPoolHelper
return connectionPool.acquire(create);
}
public static void tryCreate(AbstractConnectionPool connectionPool, int pending)
public static void tryCreate(AbstractConnectionPool connectionPool)
{
connectionPool.tryCreate(pending);
connectionPool.tryCreate(true);
}
}

View File

@ -23,10 +23,12 @@ import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -244,9 +246,12 @@ public class ConnectionPoolTest
}
@ParameterizedTest
@MethodSource("pools")
@MethodSource("poolsNoRoundRobin")
public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
// Round robin connection pool does open a few more
// connections than expected, exclude it from this test.
startServer(new EmptyServerHandler());
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
@ -300,11 +305,10 @@ public class ConnectionPoolTest
}
@ParameterizedTest
@MethodSource("poolsNoRoundRobin")
public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
@MethodSource("pools")
public void testConcurrentRequestsWithSlowAddressResolver(ConnectionPoolFactory factory) throws Exception
{
// Round robin connection pool does open a few more
// connections than expected, exclude it from this test.
// ConnectionPools may open a few more connections than expected.
startServer(new EmptyServerHandler());
@ -351,9 +355,81 @@ public class ConnectionPoolTest
assertTrue(latch.await(count, TimeUnit.SECONDS));
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
}
@ParameterizedTest
@MethodSource("pools")
public void testConcurrentRequestsAllBlockedOnServerWithLargeConnectionPool(ConnectionPoolFactory factory) throws Exception
{
int count = 50;
testConcurrentRequestsAllBlockedOnServer(factory, count, 2 * count);
}
@ParameterizedTest
@MethodSource("pools")
public void testConcurrentRequestsAllBlockedOnServerWithExactConnectionPool(ConnectionPoolFactory factory) throws Exception
{
int count = 50;
testConcurrentRequestsAllBlockedOnServer(factory, count, count);
}
private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception
{
CyclicBarrier barrier = new CyclicBarrier(count);
QueuedThreadPool serverThreads = new QueuedThreadPool(2 * count);
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerConnector(server);
server.addConnector(connector);
server.setHandler(new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
barrier.await();
}
catch (Exception x)
{
throw new ServletException(x);
}
}
});
server.start();
QueuedThreadPool clientThreads = new QueuedThreadPool(2 * count);
clientThreads.setName("client");
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
client.setExecutor(clientThreads);
client.setMaxConnectionsPerDestination(maxConnections);
client.start();
// Send N requests to the server, all waiting on the server.
// This should open N connections, and the test verifies that
// all N are sent (i.e. the client does not keep any queued).
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
int id = i;
clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort())
.path("/" + id)
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
}));
}
assertTrue(latch.await(5, TimeUnit.SECONDS), "server requests " + barrier.getNumberWaiting() + "<" + count + " - client: " + client.dump());
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertThat(connectionPool.getConnectionCount(), Matchers.lessThanOrEqualTo(count));
assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count));
}
@ParameterizedTest

View File

@ -651,7 +651,7 @@ public class HttpClientTLSTest
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger the creation of a new connection, but don't use it.
ConnectionPoolHelper.tryCreate(connectionPool, -1);
ConnectionPoolHelper.tryCreate(connectionPool);
// Verify that the connection has been created.
while (true)
{
@ -747,7 +747,7 @@ public class HttpClientTLSTest
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger the creation of a new connection, but don't use it.
ConnectionPoolHelper.tryCreate(connectionPool, -1);
ConnectionPoolHelper.tryCreate(connectionPool);
// Verify that the connection has been created.
while (true)
{

View File

@ -64,10 +64,12 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection = connectionPool.acquire();
assertNull(connection);
// There are no queued requests, so no connection should be created.
connection = peekIdleConnection(connectionPool, 1, TimeUnit.SECONDS);
assertNull(connection);
if (connection == null)
{
// There are no queued requests, so the newly created connection will be idle.
connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
}
assertNotNull(connection);
}
}
@ -83,7 +85,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);
Connection connection = ConnectionPoolHelper.acquire(connectionPool, false);
if (connection == null)
@ -104,7 +106,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);
Connection connection1 = connectionPool.acquire();
if (connection1 == null)
@ -156,7 +158,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);
// Make sure we entered idleCreated().
assertTrue(idleLatch.await(5, TimeUnit.SECONDS));
@ -167,7 +169,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
assertNull(connection1);
// Trigger creation of a second connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);
// Second attempt also returns null because we delayed idleCreated() above.
Connection connection2 = connectionPool.acquire();
@ -195,7 +197,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);
Connection connection1 = connectionPool.acquire();
if (connection1 == null)
@ -232,7 +234,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);
Connection connection1 = connectionPool.acquire();
if (connection1 == null)

View File

@ -54,7 +54,6 @@ public class Pool<T> implements AutoCloseable, Dumpable
private final List<Entry> entries = new CopyOnWriteArrayList<>();
private final int maxEntries;
private final AtomicInteger pending = new AtomicInteger();
private final StrategyType strategyType;
/*
@ -137,7 +136,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
public int getReservedCount()
{
return pending.get();
return (int)entries.stream().filter(Entry::isReserved).count();
}
public int getIdleCount()
@ -216,7 +215,9 @@ public class Pool<T> implements AutoCloseable, Dumpable
* @return a disabled entry that is contained in the pool,
* or null if the pool is closed or if the pool already contains
* {@link #getMaxEntries()} entries, or the allotment has already been reserved
* @deprecated Use {@link #reserve()} instead
*/
@Deprecated
public Entry reserve(int allotment)
{
try (Locker.Lock l = locker.lock())
@ -228,12 +229,35 @@ public class Pool<T> implements AutoCloseable, Dumpable
if (space <= 0)
return null;
// The pending count is an AtomicInteger that is only ever incremented here with
// the lock held. Thus the pending count can be reduced immediately after the
// test below, but never incremented. Thus the allotment limit can be enforced.
if (allotment >= 0 && (pending.get() * getMaxMultiplex()) >= allotment)
if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment)
return null;
Entry entry = new Entry();
entries.add(entry);
return entry;
}
}
/**
* Create a new disabled slot into the pool.
* The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
* method called or be removed via {@link Pool.Entry#remove()} or
* {@link Pool#remove(Pool.Entry)}.
*
* @return a disabled entry that is contained in the pool,
* or null if the pool is closed or if the pool already contains
* {@link #getMaxEntries()} entries
*/
public Entry reserve()
{
try (Locker.Lock l = locker.lock())
{
if (closed)
return null;
// If we have no space
if (entries.size() >= maxEntries)
return null;
pending.incrementAndGet();
Entry entry = new Entry();
entries.add(entry);
@ -342,7 +366,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
if (entry != null)
return entry;
entry = reserve(-1);
entry = reserve();
if (entry == null)
return null;
@ -457,12 +481,11 @@ public class Pool<T> implements AutoCloseable, Dumpable
@Override
public String toString()
{
return String.format("%s@%x[size=%d closed=%s pending=%d]",
return String.format("%s@%x[size=%d closed=%s]",
getClass().getSimpleName(),
hashCode(),
entries.size(),
closed,
pending.get());
closed);
}
public class Entry
@ -488,7 +511,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
}
/** Enable a reserved entry {@link Entry}.
* An entry returned from the {@link #reserve(int)} method must be enabled with this method,
* An entry returned from the {@link #reserve()} method must be enabled with this method,
* once and only once, before it is usable by the pool.
* The entry may be enabled and not acquired, in which case it is immediately available to be
* acquired, potentially by another thread; or it can be enabled and acquired atomically so that
@ -517,7 +540,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
pending.decrementAndGet();
return true;
}
@ -618,11 +641,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
boolean removed = state.compareAndSet(usageCount, -1, multiplexCount, newMultiplexCount);
if (removed)
{
if (usageCount == Integer.MIN_VALUE)
pending.decrementAndGet();
return newMultiplexCount == 0;
}
}
}
@ -631,6 +650,11 @@ public class Pool<T> implements AutoCloseable, Dumpable
return state.getHi() < 0;
}
public boolean isReserved()
{
return state.getHi() == Integer.MIN_VALUE;
}
public boolean isIdle()
{
long encoded = state.get();

View File

@ -50,7 +50,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class PoolTest
{
interface Factory
{
Pool<String> getPool(int maxSize);
@ -71,7 +70,7 @@ public class PoolTest
public void testAcquireRelease(Factory factory)
{
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
@ -115,7 +114,7 @@ public class PoolTest
public void testRemoveBeforeRelease(Factory factory)
{
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.remove(e1), is(true));
@ -128,7 +127,7 @@ public class PoolTest
public void testCloseBeforeRelease(Factory factory)
{
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.size(), is(1));
@ -143,15 +142,72 @@ public class PoolTest
{
Pool<String> pool = factory.getPool(1);
assertThat(pool.size(), is(0));
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.reserve(), notNullValue());
assertThat(pool.size(), is(1));
assertThat(pool.reserve(-1), nullValue());
assertThat(pool.reserve(), nullValue());
assertThat(pool.size(), is(1));
}
@ParameterizedTest
@MethodSource(value = "strategy")
public void testReserve(Factory factory)
{
Pool<String> pool = factory.getPool(2);
pool.setMaxMultiplex(2);
// Reserve an entry
Pool<String>.Entry e1 = pool.reserve();
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(0));
assertThat(pool.getInUseCount(), is(0));
// enable the entry
e1.enable("aaa", false);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// Reserve another entry
Pool<String>.Entry e2 = pool.reserve();
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// remove the reservation
e2.remove();
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// Reserve another entry
Pool<String>.Entry e3 = pool.reserve();
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// enable and acquire the entry
e3.enable("bbb", true);
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(1));
// can't reenable
assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false));
// Can't enable acquired entry
Pool<String>.Entry e = pool.acquire();
assertThrows(IllegalStateException.class, () -> e.enable("xxx", false));
}
@ParameterizedTest
@MethodSource(value = "strategy")
public void testDeprecatedReserve(Factory factory)
{
Pool<String> pool = factory.getPool(2);
@ -208,22 +264,8 @@ public class PoolTest
assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false));
// Can't enable acquired entry
assertThat(pool.acquire(), is(e1));
assertThrows(IllegalStateException.class, () -> e1.enable("xxx", false));
}
@ParameterizedTest
@MethodSource(value = "strategy")
public void testReserveMaxPending(Factory factory)
{
Pool<String> pool = factory.getPool(2);
assertThat(pool.reserve(0), nullValue());
assertThat(pool.reserve(1), notNullValue());
assertThat(pool.reserve(1), nullValue());
assertThat(pool.reserve(2), notNullValue());
assertThat(pool.reserve(2), nullValue());
assertThat(pool.reserve(3), nullValue());
assertThat(pool.reserve(-1), nullValue());
Pool<String>.Entry e = pool.acquire();
assertThrows(IllegalStateException.class, () -> e.enable("xxx", false));
}
@ParameterizedTest
@ -231,9 +273,9 @@ public class PoolTest
public void testReserveNegativeMaxPending(Factory factory)
{
Pool<String> pool = factory.getPool(2);
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.reserve(-1), nullValue());
assertThat(pool.reserve(), notNullValue());
assertThat(pool.reserve(), notNullValue());
assertThat(pool.reserve(), nullValue());
}
@ParameterizedTest
@ -241,7 +283,7 @@ public class PoolTest
public void testClose(Factory factory)
{
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
assertThat(pool.isClosed(), is(false));
pool.close();
pool.close();
@ -249,7 +291,7 @@ public class PoolTest
assertThat(pool.isClosed(), is(true));
assertThat(pool.size(), is(0));
assertThat(pool.acquire(), nullValue());
assertThat(pool.reserve(-1), nullValue());
assertThat(pool.reserve(), nullValue());
}
@Test
@ -258,7 +300,7 @@ public class PoolTest
AtomicBoolean closed = new AtomicBoolean();
Pool<Closeable> pool = new Pool<>(FIRST, 1);
Closeable pooled = () -> closed.set(true);
pool.reserve(-1).enable(pooled, false);
pool.reserve().enable(pooled, false);
assertThat(closed.get(), is(false));
pool.close();
assertThat(closed.get(), is(true));
@ -269,7 +311,7 @@ public class PoolTest
public void testRemove(Factory factory)
{
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.remove(e1), is(true));
@ -287,8 +329,8 @@ public class PoolTest
assertThat(pool.size(), is(0));
assertThat(pool.values().isEmpty(), is(true));
pool.reserve(-1).enable("aaa", false);
pool.reserve(-1).enable("bbb", false);
pool.reserve().enable("aaa", false);
pool.reserve().enable("bbb", false);
assertThat(pool.values().stream().map(Pool.Entry::getPooled).collect(toList()), equalTo(Arrays.asList("aaa", "bbb")));
assertThat(pool.size(), is(2));
}
@ -299,8 +341,8 @@ public class PoolTest
{
Pool<String> pool = factory.getPool(2);
pool.reserve(-1).enable("aaa", false);
pool.reserve(-1).enable("bbb", false);
pool.reserve().enable("aaa", false);
pool.reserve().enable("bbb", false);
assertThat(pool.acquire(), notNullValue());
assertThat(pool.acquire(), notNullValue());
assertThat(pool.acquire(), nullValue());
@ -329,7 +371,7 @@ public class PoolTest
{
Pool<String> pool = factory.getPool(1);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.release(e1), is(true));
@ -358,8 +400,8 @@ public class PoolTest
AtomicInteger b = new AtomicInteger();
counts.put("a", a);
counts.put("b", b);
pool.reserve(-1).enable("a", false);
pool.reserve(-1).enable("b", false);
pool.reserve().enable("a", false);
pool.reserve().enable("b", false);
counts.get(pool.acquire().getPooled()).incrementAndGet();
counts.get(pool.acquire().getPooled()).incrementAndGet();
@ -386,7 +428,7 @@ public class PoolTest
{
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(e1, notNullValue());
@ -416,7 +458,7 @@ public class PoolTest
{
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
@ -434,7 +476,7 @@ public class PoolTest
{
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.remove(e1), is(true));
@ -447,7 +489,7 @@ public class PoolTest
{
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
@ -471,7 +513,7 @@ public class PoolTest
public void testReleaseThenRemoveNonEnabledEntry(Factory factory)
{
Pool<String> pool = factory.getPool(1);
Pool<String>.Entry e = pool.reserve(-1);
Pool<String>.Entry e = pool.reserve();
assertThat(pool.size(), is(1));
assertThat(pool.release(e), is(false));
assertThat(pool.size(), is(1));
@ -484,7 +526,7 @@ public class PoolTest
public void testRemoveNonEnabledEntry(Factory factory)
{
Pool<String> pool = factory.getPool(1);
Pool<String>.Entry e = pool.reserve(-1);
Pool<String>.Entry e = pool.reserve();
assertThat(pool.size(), is(1));
assertThat(pool.remove(e), is(true));
assertThat(pool.size(), is(0));
@ -497,7 +539,7 @@ public class PoolTest
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e0 = pool.acquire();
@ -518,7 +560,7 @@ public class PoolTest
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e0 = pool.acquire();
@ -543,7 +585,7 @@ public class PoolTest
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(10);
pool.reserve(-1).enable("aaa", false);
pool.reserve().enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(e1.getUsageCount(), is(1));
@ -559,7 +601,7 @@ public class PoolTest
public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory)
{
Pool<String> pool = factory.getPool(1);
Pool<String>.Entry entry = pool.reserve(-1);
Pool<String>.Entry entry = pool.reserve();
entry.enable("aaa", false);
entry.setUsageCount(Integer.MAX_VALUE);
@ -577,9 +619,9 @@ public class PoolTest
public void testDynamicMaxUsageCountChangeSweep(Factory factory)
{
Pool<String> pool = factory.getPool(2);
Pool<String>.Entry entry1 = pool.reserve(-1);
Pool<String>.Entry entry1 = pool.reserve();
entry1.enable("aaa", false);
Pool<String>.Entry entry2 = pool.reserve(-1);
Pool<String>.Entry entry2 = pool.reserve();
entry2.enable("bbb", false);
Pool<String>.Entry acquired1 = pool.acquire();