Fixes #6603 - HTTP/2 max local stream count exceeded (#6639) (#6682)

* Fixes #6603 - HTTP/2 max local stream count exceeded (#6639)

Made MAX_CONCURRENT_STREAMS setting work on a per-connection basis.
Updated Pool javadocs.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Signed-off-by: Greg Wilkins <gregw@webtide.com>
Co-authored-by: Greg Wilkins <gregw@webtide.com>
(cherry picked from commit 525fcb3119)
This commit is contained in:
Simone Bordet 2021-09-01 10:27:40 +02:00 committed by GitHub
parent f129770f4e
commit e2690cc420
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 604 additions and 211 deletions

View File

@ -55,7 +55,12 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}
protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(strategy, maxConnections, cache), requester);
}
protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
@ -63,6 +68,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
this.destination = destination;
this.requester = requester;
this.pool = pool;
pool.setMaxMultiplex(1); // Force the use of multiplexing.
addBean(pool);
}

View File

@ -24,7 +24,7 @@ import org.eclipse.jetty.client.api.Connection;
public interface ConnectionPool extends Closeable
{
/**
* Optionally pre-create up to <code>connectionCount</code>
* Optionally pre-create up to {@code connectionCount}
* connections so they are immediately ready for use.
* @param connectionCount the number of connections to pre-start.
*/
@ -106,7 +106,7 @@ public interface ConnectionPool extends Closeable
}
/**
* Marks a connection pool as supporting multiplexed connections.
* Marks a connection as supporting multiplexed requests.
*/
interface Multiplexable
{
@ -117,7 +117,11 @@ public interface ConnectionPool extends Closeable
/**
* @param maxMultiplex the max number of requests multiplexable on a single connection
* @deprecated do not use, as the maxMultiplex value is pulled, rather than pushed
*/
void setMaxMultiplex(int maxMultiplex);
@Deprecated
default void setMaxMultiplex(int maxMultiplex)
{
}
}
}

View File

@ -29,9 +29,10 @@ public class DuplexConnectionPool extends AbstractConnectionPool
public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}
@Deprecated
public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
super(destination, pool, requester);

View File

@ -20,7 +20,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
public class MultiplexConnectionPool extends AbstractConnectionPool
{
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
@ -29,9 +29,26 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements C
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex);
}
public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, new Pool<Connection>(strategy, maxConnections, cache)
{
@Override
protected int getMaxMultiplex(Connection connection)
{
int multiplex = (connection instanceof Multiplexable)
? ((Multiplexable)connection).getMaxMultiplex()
: super.getMaxMultiplex(connection);
return multiplex > 0 ? multiplex : 1;
}
}, requester);
setMaxMultiplex(maxMultiplex);
}
@Deprecated
public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
{
super(destination, pool, requester);

View File

@ -35,15 +35,15 @@ public class MultiplexHttpDestination extends HttpDestination implements HttpDes
public int getMaxRequestsPerConnection()
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex();
if (connectionPool instanceof AbstractConnectionPool)
return ((AbstractConnectionPool)connectionPool).getMaxMultiplex();
return 1;
}
public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
if (connectionPool instanceof AbstractConnectionPool)
((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
}
}

View File

@ -26,6 +26,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex);
}
}

View File

@ -51,7 +51,7 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace

View File

@ -786,7 +786,10 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
failFn.accept(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount);
if (LOG.isDebugEnabled())
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
failFn.accept(failure);
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
@ -799,7 +802,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
LOG.debug("Created local {} for {}", stream, this);
return stream;
}
else
@ -834,6 +837,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing);
if (LOG.isDebugEnabled())
LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure);
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
@ -847,7 +853,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
LOG.debug("Created remote {} for {}", stream, this);
return stream;
}
else
@ -1019,7 +1025,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
LOG.debug("Created stream #{} for {}", streamId, this);
LOG.debug("Creating stream #{} for {}", streamId, this);
streamsState.onStreamCreated();
}

View File

@ -65,14 +65,6 @@ class HTTPSessionListenerPromise extends Session.Listener.Adapter implements Pro
@Override
public void onSettings(Session session, SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
{
HttpDestination destination = destination();
if (destination instanceof HttpDestination.Multiplexed)
((HttpDestination.Multiplexed)destination).setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
}
// The first SETTINGS frame is the server preface reply.
if (!connection.isMarked())
onServerPreface(session);
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
@ -46,7 +47,7 @@ import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
@ -78,6 +79,12 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
this.recycleHttpChannels = recycleHttpChannels;
}
@Override
public int getMaxMultiplex()
{
return ((HTTP2Session)session).getMaxLocalStreams();
}
@Override
protected Iterator<HttpChannel> getHttpChannels()
{

View File

@ -16,6 +16,7 @@ package org.eclipse.jetty.http2.client.http;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@ -71,6 +72,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class MaxConcurrentStreamsTest extends AbstractTest
{
@ -538,6 +540,111 @@ public class MaxConcurrentStreamsTest extends AbstractTest
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception
{
long processing = 125;
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
private Session session1;
private Session session2;
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
switch (request.getURI().getPath())
{
case "/prime":
{
session1 = stream.getSession();
// Send another request from here to force the opening of the 2nd connection.
client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result ->
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
});
break;
}
case "/prime2":
{
session2 = stream.getSession();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
case "/update_max_streams":
{
Session session = stream.getSession() == session1 ? session2 : session1;
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2);
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
default:
{
sleep(processing);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
}
return null;
}
});
http2.setMaxConcurrentStreams(1);
prepareServer(http2);
server.start();
prepareClient();
client.setMaxConnectionsPerDestination(2);
client.start();
// Prime the 2 connections.
primeConnection();
String host = "localhost";
int port = connector.getLocalPort();
assertEquals(1, client.getDestinations().size());
HttpDestination destination = (HttpDestination)client.getDestinations().get(0);
AbstractConnectionPool pool = (AbstractConnectionPool)destination.getConnectionPool();
assertEquals(2, pool.getConnectionCount());
// Send a request on one connection, which sends back a SETTINGS frame on the other connection.
ContentResponse response = client.newRequest(host, port)
.path("/update_max_streams")
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
// Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued.
int count = 4;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
client.newRequest(host, port)
.path("/" + i)
.send(result ->
{
if (result.isSucceeded())
{
int status = result.getResponse().getStatus();
if (status == HttpStatus.OK_200)
latch.countDown();
else
fail("unexpected status " + status);
}
else
{
fail(result.getFailure());
}
});
}
assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS));
}
private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.

View File

@ -95,9 +95,7 @@ public class MultiplexedConnectionPoolTest
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
poolRef.set(pool);
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
@Override
protected void onCreated(Connection connection)
@ -111,6 +109,7 @@ public class MultiplexedConnectionPoolTest
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(connectionPool.getBean(Pool.class));
connectionPool.setMaxDuration(maxDuration);
return connectionPool;
});
@ -156,9 +155,7 @@ public class MultiplexedConnectionPoolTest
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
poolRef.set(pool);
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
@Override
protected void onCreated(Connection connection)
@ -172,6 +169,7 @@ public class MultiplexedConnectionPoolTest
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(connectionPool.getBean(Pool.class));
connectionPool.setMaxDuration(maxDuration);
return connectionPool;
});

View File

@ -347,7 +347,7 @@ public class ArrayRetainableByteBufferPoolTest
b3.release();
b4.getBuffer().limit(b4.getBuffer().capacity() - 2);
assertThat(pool.dump(), containsString("[size=4 closed=false]{capacity=4,inuse=3(75%)"));
assertThat(pool.dump(), containsString("]{capacity=4,inuse=3(75%)"));
}
/**

View File

@ -26,6 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.AutoLock;
@ -33,24 +35,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A fast pool of objects, with optional support for
* multiplexing, max usage count and several optimized strategies plus
* an optional {@link ThreadLocal} cache of the last release entry.
* <p>
* When the method {@link #close()} is called, all {@link Closeable}s in the pool
* are also closed.
* </p>
* @param <T>
* <p>A pool of objects, with optional support for multiplexing,
* max usage count and several optimized strategies plus
* an optional {@link ThreadLocal} cache of the last release entry.</p>
* <p>When the method {@link #close()} is called, all {@link Closeable}s
* object pooled by the pool are also closed.</p>
*
* @param <T> the type of the pooled objects
*/
@ManagedObject
public class Pool<T> implements AutoCloseable, Dumpable
{
private static final Logger LOGGER = LoggerFactory.getLogger(Pool.class);
private final List<Entry> entries = new CopyOnWriteArrayList<>();
private final int maxEntries;
private final StrategyType strategyType;
/*
* The cache is used to avoid hammering on the first index of the entry list.
* Caches can become poisoned (i.e.: containing entries that are in use) when
@ -63,8 +63,10 @@ public class Pool<T> implements AutoCloseable, Dumpable
private final ThreadLocal<Entry> cache;
private final AtomicInteger nextIndex;
private volatile boolean closed;
private volatile int maxMultiplex = 1;
private volatile int maxUsageCount = -1;
@Deprecated
private volatile int maxUsage = -1;
@Deprecated
private volatile int maxMultiplex = -1;
/**
* The type of the strategy to use for the pool.
@ -99,7 +101,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
* random strategy but with more predictable behaviour.
* No entries are favoured and contention is reduced.
*/
ROUND_ROBIN,
ROUND_ROBIN
}
/**
@ -117,6 +119,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
/**
* Construct a Pool with the specified thread-local cache size and
* an optional {@link ThreadLocal} cache.
*
* @param strategyType The strategy to used for looking up entries.
* @param maxEntries the maximum amount of entries that the pool will accept.
* @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
@ -126,66 +129,141 @@ public class Pool<T> implements AutoCloseable, Dumpable
this.maxEntries = maxEntries;
this.strategyType = strategyType;
this.cache = cache ? new ThreadLocal<>() : null;
nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
}
/**
* @return the number of reserved entries
*/
@ManagedAttribute("The number of reserved entries")
public int getReservedCount()
{
return (int)entries.stream().filter(Entry::isReserved).count();
}
/**
* @return the number of idle entries
*/
@ManagedAttribute("The number of idle entries")
public int getIdleCount()
{
return (int)entries.stream().filter(Entry::isIdle).count();
}
/**
* @return the number of in-use entries
*/
@ManagedAttribute("The number of in-use entries")
public int getInUseCount()
{
return (int)entries.stream().filter(Entry::isInUse).count();
}
/**
* @return the number of closed entries
*/
@ManagedAttribute("The number of closed entries")
public int getClosedCount()
{
return (int)entries.stream().filter(Entry::isClosed).count();
}
/**
* @return the maximum number of entries
*/
@ManagedAttribute("The maximum number of entries")
public int getMaxEntries()
{
return maxEntries;
}
/**
* @return the default maximum multiplex count of entries
* @deprecated Multiplex functionalities will be removed
*/
@ManagedAttribute("The default maximum multiplex count of entries")
@Deprecated
public int getMaxMultiplex()
{
return maxMultiplex;
return maxMultiplex == -1 ? 1 : maxMultiplex;
}
/**
* <p>Retrieves the max multiplex count for the given pooled object.</p>
*
* @param pooled the pooled object
* @return the max multiplex count for the given pooled object
* @deprecated Multiplex functionalities will be removed
*/
@Deprecated
protected int getMaxMultiplex(T pooled)
{
return getMaxMultiplex();
}
/**
* <p>Sets the default maximum multiplex count for the Pool's entries.</p>
*
* @param maxMultiplex the default maximum multiplex count of entries
* @deprecated Multiplex functionalities will be removed
*/
@Deprecated
public final void setMaxMultiplex(int maxMultiplex)
{
if (maxMultiplex < 1)
throw new IllegalArgumentException("Max multiplex must be >= 1");
this.maxMultiplex = maxMultiplex;
try (AutoLock l = lock.lock())
{
if (closed)
return;
if (entries.stream().anyMatch(MonoEntry.class::isInstance))
throw new IllegalStateException("Pool entries do not support multiplexing");
this.maxMultiplex = maxMultiplex;
}
}
/**
* Get the maximum number of times the entries of the pool
* can be acquired.
* @return the max usage count.
* <p>Returns the maximum number of times the entries of the pool
* can be acquired.</p>
*
* @return the default maximum usage count of entries
* @deprecated MaxUsage functionalities will be removed
*/
@ManagedAttribute("The default maximum usage count of entries")
@Deprecated
public int getMaxUsageCount()
{
return maxUsageCount;
return maxUsage;
}
/**
* Change the max usage count of the pool's entries. All existing
* idle entries over this new max usage are removed and closed.
* @param maxUsageCount the max usage count.
* <p>Retrieves the max usage count for the given pooled object.</p>
*
* @param pooled the pooled object
* @return the max usage count for the given pooled object
* @deprecated MaxUsage functionalities will be removed
*/
@Deprecated
protected int getMaxUsageCount(T pooled)
{
return getMaxUsageCount();
}
/**
* <p>Sets the maximum usage count for the Pool's entries.</p>
* <p>All existing idle entries that have a usage count larger
* than this new value are removed from the Pool and closed.</p>
*
* @param maxUsageCount the default maximum usage count of entries
* @deprecated MaxUsage functionalities will be removed
*/
@Deprecated
public final void setMaxUsageCount(int maxUsageCount)
{
if (maxUsageCount == 0)
throw new IllegalArgumentException("Max usage count must be != 0");
this.maxUsageCount = maxUsageCount;
// Iterate the entries, remove overused ones and collect a list of the closeable removed ones.
List<Closeable> copy;
@ -194,6 +272,11 @@ public class Pool<T> implements AutoCloseable, Dumpable
if (closed)
return;
if (entries.stream().anyMatch(MonoEntry.class::isInstance))
throw new IllegalStateException("Pool entries do not support max usage");
this.maxUsage = maxUsageCount;
copy = entries.stream()
.filter(entry -> entry.isIdleAndOverUsed() && remove(entry) && entry.pooled instanceof Closeable)
.map(entry -> (Closeable)entry.pooled)
@ -205,10 +288,10 @@ public class Pool<T> implements AutoCloseable, Dumpable
}
/**
* Create a new disabled slot into the pool.
* The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
* <p>Creates a new disabled slot into the pool.</p>
* <p>The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
* method called or be removed via {@link Pool.Entry#remove()} or
* {@link Pool#remove(Pool.Entry)}.
* {@link Pool#remove(Pool.Entry)}.</p>
*
* @param allotment the desired allotment, where each entry handles an allotment of maxMultiplex,
* or a negative number to always trigger the reservation of a new entry.
@ -232,17 +315,17 @@ public class Pool<T> implements AutoCloseable, Dumpable
if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment)
return null;
Entry entry = new Entry();
Entry entry = newEntry();
entries.add(entry);
return entry;
}
}
/**
* Create a new disabled slot into the pool.
* The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
* <p>Creates a new disabled slot into the pool.</p>
* <p>The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
* method called or be removed via {@link Pool.Entry#remove()} or
* {@link Pool#remove(Pool.Entry)}.
* {@link Pool#remove(Pool.Entry)}.</p>
*
* @return a disabled entry that is contained in the pool,
* or null if the pool is closed or if the pool already contains
@ -259,15 +342,27 @@ public class Pool<T> implements AutoCloseable, Dumpable
if (entries.size() >= maxEntries)
return null;
Entry entry = new Entry();
Entry entry = newEntry();
entries.add(entry);
return entry;
}
}
private Entry newEntry()
{
// Do not allow more than 2 implementations of Entry, otherwise call sites in Pool
// referencing Entry methods will become mega-morphic and kill the performance.
if (maxMultiplex >= 0 || maxUsage >= 0)
return new MultiEntry();
return new MonoEntry();
}
/**
* Acquire an entry from the pool.
* Only enabled entries will be returned from this method and their enable method must not be called.
* <p>Acquires an entry from the pool.</p>
* <p>Only enabled entries will be returned from this method
* and their {@link Entry#enable(Object, boolean)}
* method must not be called.</p>
*
* @return an entry from the pool or null if none is available.
*/
public Entry acquire()
@ -329,8 +424,8 @@ public class Pool<T> implements AutoCloseable, Dumpable
}
/**
* Utility method to acquire an entry from the pool,
* reserving and creating a new entry if necessary.
* <p>Acquires an entry from the pool,
* reserving and creating a new entry if necessary.</p>
*
* @param creator a function to create the pooled value for a reserved entry.
* @return an entry from the pool or null if none is available.
@ -366,15 +461,14 @@ public class Pool<T> implements AutoCloseable, Dumpable
}
/**
* This method will return an acquired object to the pool. Objects
* that are acquired from the pool but never released will result
* in a memory leak.
* <p>Releases an {@link #acquire() acquired} entry to the pool.</p>
* <p>Entries that are acquired from the pool but never released
* will result in a memory leak.</p>
*
* @param entry the value to return to the pool
* @return true if the entry was released and could be acquired again,
* false if the entry should be removed by calling {@link #remove(Pool.Entry)}
* and the object contained by the entry should be disposed.
* @throws NullPointerException if value is null
*/
public boolean release(Entry entry)
{
@ -388,7 +482,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
}
/**
* Remove a value from the pool.
* <p>Removes an entry from the pool.</p>
*
* @param entry the value to remove
* @return true if the entry was removed, false otherwise
@ -465,78 +559,72 @@ public class Pool<T> implements AutoCloseable, Dumpable
@Override
public String toString()
{
return String.format("%s@%x[size=%d closed=%s]",
return String.format("%s@%x[inUse=%d,size=%d,max=%d,closed=%b]",
getClass().getSimpleName(),
hashCode(),
entries.size(),
closed);
getInUseCount(),
size(),
getMaxEntries(),
isClosed());
}
public class Entry
/**
* <p>A Pool entry that holds metadata and a pooled object.</p>
*/
public abstract class Entry
{
// hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending
// lo: multiplexing counter
private final AtomicBiInteger state;
// The pooled item. This is not volatile as it is set once and then never changed.
// The pooled object. This is not volatile as it is set once and then never changed.
// Other threads accessing must check the state field above first, so a good before/after
// relationship exists to make a memory barrier.
private T pooled;
Entry()
{
this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
}
// for testing only
void setUsageCount(int usageCount)
{
this.state.getAndSetHi(usageCount);
}
/** Enable a reserved entry {@link Entry}.
* An entry returned from the {@link #reserve()} method must be enabled with this method,
* once and only once, before it is usable by the pool.
* The entry may be enabled and not acquired, in which case it is immediately available to be
/**
* <p>Enables this, previously {@link #reserve() reserved}, Entry.</p>
* <p>An entry returned from the {@link #reserve()} method must be enabled with this method,
* once and only once, before it is usable by the pool.</p>
* <p>The entry may be enabled and not acquired, in which case it is immediately available to be
* acquired, potentially by another thread; or it can be enabled and acquired atomically so that
* no other thread can acquire it, although the acquire may still fail if the pool has been closed.
* @param pooled The pooled item for the entry
* @param acquire If true the entry is atomically enabled and acquired.
* @return true If the entry was enabled.
* @throws IllegalStateException if the entry was already enabled
* no other thread can acquire it, although the acquire may still fail if the pool has been closed.</p>
*
* @param pooled the pooled object for this Entry
* @param acquire whether this Entry should be atomically enabled and acquired
* @return whether this Entry was enabled
* @throws IllegalStateException if this Entry was already enabled
*/
public boolean enable(T pooled, boolean acquire)
{
Objects.requireNonNull(pooled);
if (state.getHi() != Integer.MIN_VALUE)
if (!isReserved())
{
if (state.getHi() == -1)
if (isClosed())
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
this.pooled = pooled;
int usage = acquire ? 1 : 0;
if (!state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage))
{
this.pooled = null;
if (state.getHi() == -1)
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
return true;
if (tryEnable(acquire))
return true;
this.pooled = null;
if (isClosed())
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
/**
* @return the pooled object
*/
public T getPooled()
{
return pooled;
}
/**
* Release the entry.
* This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry.
* @return true if released.
* <p>Releases this Entry.</p>
* <p>This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry.</p>
*
* @return whether this Entry was released
*/
public boolean release()
{
@ -544,9 +632,10 @@ public class Pool<T> implements AutoCloseable, Dumpable
}
/**
* Remove the entry.
* This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry.
* @return true if remove.
* <p>Removes this Entry from the Pool.</p>
* <p>This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry.</p>
*
* @return whether this Entry was removed
*/
public boolean remove()
{
@ -554,40 +643,257 @@ public class Pool<T> implements AutoCloseable, Dumpable
}
/**
* Try to acquire the entry if possible by incrementing both the usage
* count and the multiplex count.
* @return true if the usage count is &lt;= maxUsageCount and
* the multiplex count is maxMultiplex and the entry is not closed,
* false otherwise.
* <p>Tries to enable, and possible also acquire, this Entry.</p>
*
* @param acquire whether to also acquire this Entry
* @return whether this Entry was enabled
*/
abstract boolean tryEnable(boolean acquire);
/**
* <p>Tries to acquire this Entry.</p>
*
* @return whether this Entry was acquired
*/
abstract boolean tryAcquire();
/**
* <p>Tries to release this Entry.</p>
*
* @return true if this Entry was released,
* false if {@link #tryRemove()} should be called.
*/
abstract boolean tryRelease();
/**
* <p>Tries to remove the entry by marking it as closed.</p>
*
* @return whether the entry can be removed from the containing pool
*/
abstract boolean tryRemove();
/**
* @return whether this Entry is closed
*/
public abstract boolean isClosed();
/**
* @return whether this Entry is reserved
*/
public abstract boolean isReserved();
/**
* @return whether this Entry is idle
*/
public abstract boolean isIdle();
/**
* @return whether this entry is in use.
*/
public abstract boolean isInUse();
/**
* @return whether this entry has been used beyond {@link #getMaxUsageCount()}
* @deprecated MaxUsage functionalities will be removed
*/
@Deprecated
public boolean isOverUsed()
{
return false;
}
boolean isIdleAndOverUsed()
{
return false;
}
// Only for testing.
int getUsageCount()
{
return 0;
}
// Only for testing.
void setUsageCount(int usageCount)
{
}
}
/**
* <p>A Pool entry that holds metadata and a pooled object,
* that can only be acquired concurrently at most once, and
* can be acquired/released multiple times.</p>
*/
private class MonoEntry extends Entry
{
// MIN_VALUE => pending; -1 => closed; 0 => idle; 1 => active;
private final AtomicInteger state = new AtomicInteger(Integer.MIN_VALUE);
@Override
protected boolean tryEnable(boolean acquire)
{
return state.compareAndSet(Integer.MIN_VALUE, acquire ? 1 : 0);
}
@Override
boolean tryAcquire()
{
while (true)
{
int s = state.get();
if (s != 0)
return false;
if (state.compareAndSet(s, 1))
return true;
}
}
@Override
boolean tryRelease()
{
while (true)
{
int s = state.get();
if (s < 0)
return false;
if (s == 0)
throw new IllegalStateException("Cannot release an already released entry");
if (state.compareAndSet(s, 0))
return true;
}
}
@Override
boolean tryRemove()
{
state.set(-1);
return true;
}
@Override
public boolean isClosed()
{
return state.get() < 0;
}
@Override
public boolean isReserved()
{
return state.get() == Integer.MIN_VALUE;
}
@Override
public boolean isIdle()
{
return state.get() == 0;
}
@Override
public boolean isInUse()
{
return state.get() == 1;
}
@Override
public String toString()
{
String s;
switch (state.get())
{
case Integer.MIN_VALUE:
s = "PENDING";
break;
case -1:
s = "CLOSED";
break;
case 0:
s = "IDLE";
break;
default:
s = "ACTIVE";
}
return String.format("%s@%x{%s,pooled=%s}",
getClass().getSimpleName(),
hashCode(),
s,
getPooled());
}
}
/**
* <p>A Pool entry that holds metadata and a pooled object,
* that can be acquired concurrently multiple times, and
* can be acquired/released multiple times.</p>
*/
class MultiEntry extends Entry
{
// hi: MIN_VALUE => pending; -1 => closed; 0+ => usage counter;
// lo: 0 => idle; positive => multiplex counter
private final AtomicBiInteger state;
MultiEntry()
{
this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
}
@Override
void setUsageCount(int usageCount)
{
this.state.getAndSetHi(usageCount);
}
@Override
protected boolean tryEnable(boolean acquire)
{
int usage = acquire ? 1 : 0;
return state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage);
}
/**
* <p>Tries to acquire the entry if possible by incrementing both the usage
* count and the multiplex count.</p>
*
* @return true if the usage count is less than {@link #getMaxUsageCount()} and
* the multiplex count is less than {@link #getMaxMultiplex(Object)} and
* the entry is not closed, false otherwise.
*/
@Override
boolean tryAcquire()
{
while (true)
{
long encoded = state.get();
int usageCount = AtomicBiInteger.getHi(encoded);
int multiplexCount = AtomicBiInteger.getLo(encoded);
boolean closed = usageCount < 0;
int multiplexingCount = AtomicBiInteger.getLo(encoded);
int currentMaxUsageCount = maxUsageCount;
if (closed || multiplexingCount >= maxMultiplex || (currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount))
if (closed)
return false;
T pooled = getPooled();
int maxUsageCount = getMaxUsageCount(pooled);
int maxMultiplexed = getMaxMultiplex(pooled);
if (maxMultiplexed > 0 && multiplexCount >= maxMultiplexed)
return false;
if (maxUsageCount > 0 && usageCount >= maxUsageCount)
return false;
// Prevent overflowing the usage counter by capping it at Integer.MAX_VALUE.
int newUsageCount = usageCount == Integer.MAX_VALUE ? Integer.MAX_VALUE : usageCount + 1;
if (state.compareAndSet(encoded, newUsageCount, multiplexingCount + 1))
if (state.compareAndSet(encoded, newUsageCount, multiplexCount + 1))
return true;
}
}
/**
* Try to release the entry if possible by decrementing the multiplexing
* count unless the entity is closed.
* <p>Tries to release the entry if possible by decrementing the multiplex
* count unless the entity is closed.</p>
*
* @return true if the entry was released,
* false if {@link #tryRemove()} should be called.
*/
@Override
boolean tryRelease()
{
int newMultiplexingCount;
int newMultiplexCount;
int usageCount;
while (true)
{
@ -597,24 +903,26 @@ public class Pool<T> implements AutoCloseable, Dumpable
if (closed)
return false;
newMultiplexingCount = AtomicBiInteger.getLo(encoded) - 1;
if (newMultiplexingCount < 0)
newMultiplexCount = AtomicBiInteger.getLo(encoded) - 1;
if (newMultiplexCount < 0)
throw new IllegalStateException("Cannot release an already released entry");
if (state.compareAndSet(encoded, usageCount, newMultiplexingCount))
if (state.compareAndSet(encoded, usageCount, newMultiplexCount))
break;
}
int currentMaxUsageCount = maxUsageCount;
int currentMaxUsageCount = maxUsage;
boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
return !(overUsed && newMultiplexingCount == 0);
return !(overUsed && newMultiplexCount == 0);
}
/**
* Try to remove the entry by marking it as closed and decrementing the multiplexing counter.
* The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.
* <p>Tries to remove the entry by marking it as closed and decrementing the multiplex counter.</p>
* <p>The multiplex counter will never go below zero and if it reaches zero, the entry is considered removed.</p>
*
* @return true if the entry can be removed from the containing pool, false otherwise.
*/
@Override
boolean tryRemove()
{
while (true)
@ -630,45 +938,52 @@ public class Pool<T> implements AutoCloseable, Dumpable
}
}
@Override
public boolean isClosed()
{
return state.getHi() < 0;
}
@Override
public boolean isReserved()
{
return state.getHi() == Integer.MIN_VALUE;
}
@Override
public boolean isIdle()
{
long encoded = state.get();
return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) == 0;
}
@Override
public boolean isInUse()
{
long encoded = state.get();
return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) > 0;
}
@Override
public boolean isOverUsed()
{
int currentMaxUsageCount = maxUsageCount;
int maxUsageCount = getMaxUsageCount();
int usageCount = state.getHi();
return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
return maxUsageCount > 0 && usageCount >= maxUsageCount;
}
@Override
boolean isIdleAndOverUsed()
{
int currentMaxUsageCount = maxUsageCount;
int maxUsageCount = getMaxUsageCount();
long encoded = state.get();
int usageCount = AtomicBiInteger.getHi(encoded);
int multiplexCount = AtomicBiInteger.getLo(encoded);
return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount && multiplexCount == 0;
return maxUsageCount > 0 && usageCount >= maxUsageCount && multiplexCount == 0;
}
public int getUsageCount()
@Override
int getUsageCount()
{
return Math.max(state.getHi(), 0);
}
@ -680,16 +995,17 @@ public class Pool<T> implements AutoCloseable, Dumpable
int usageCount = AtomicBiInteger.getHi(encoded);
int multiplexCount = AtomicBiInteger.getLo(encoded);
String state = usageCount < 0 ? "CLOSED" : multiplexCount == 0 ? "IDLE" : "INUSE";
String state = usageCount < 0
? (usageCount == Integer.MIN_VALUE ? "PENDING" : "CLOSED")
: (multiplexCount == 0 ? "IDLE" : "ACTIVE");
return String.format("%s@%x{%s, usage=%d, multiplex=%d/%d, pooled=%s}",
return String.format("%s@%x{%s,usage=%d,multiplex=%d,pooled=%s}",
getClass().getSimpleName(),
hashCode(),
state,
Math.max(usageCount, 0),
Math.max(multiplexCount, 0),
getMaxMultiplex(),
pooled);
getPooled());
}
}
}

View File

@ -217,69 +217,6 @@ public class PoolTest
assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false));
}
@ParameterizedTest
@MethodSource(value = "strategy")
public void testDeprecatedReserve(Factory factory)
{
Pool<CloseableHolder> pool = factory.getPool(2);
// Reserve an entry
Pool<CloseableHolder>.Entry e1 = pool.reserve(-1);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(0));
assertThat(pool.getInUseCount(), is(0));
// max reservations
assertNull(pool.reserve(1));
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(0));
assertThat(pool.getInUseCount(), is(0));
// enable the entry
e1.enable(new CloseableHolder("aaa"), false);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// Reserve another entry
Pool<CloseableHolder>.Entry e2 = pool.reserve(-1);
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// remove the reservation
e2.remove();
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// Reserve another entry
Pool<CloseableHolder>.Entry e3 = pool.reserve(-1);
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// enable and acquire the entry
e3.enable(new CloseableHolder("bbb"), true);
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(1));
// can't reenable
assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false));
// Can't enable acquired entry
Pool<CloseableHolder>.Entry e = pool.acquire();
assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false));
}
@ParameterizedTest
@MethodSource(value = "strategy")
public void testReserveNegativeMaxPending(Factory factory)
@ -587,6 +524,7 @@ public class PoolTest
public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory)
{
Pool<CloseableHolder> pool = factory.getPool(1);
pool.setMaxMultiplex(1);
Pool<CloseableHolder>.Entry entry = pool.reserve();
entry.enable(new CloseableHolder("aaa"), false);
entry.setUsageCount(Integer.MAX_VALUE);
@ -606,6 +544,7 @@ public class PoolTest
public void testDynamicMaxUsageCountChangeSweep(Factory factory)
{
Pool<CloseableHolder> pool = factory.getPool(2);
pool.setMaxUsageCount(100);
Pool<CloseableHolder>.Entry entry1 = pool.reserve();
entry1.enable(new CloseableHolder("aaa"), false);
Pool<CloseableHolder>.Entry entry2 = pool.reserve();