Made MAX_CONCURRENT_STREAMS setting work on a per-connection basis. Updated Pool javadocs. Signed-off-by: Simone Bordet <simone.bordet@gmail.com> Co-authored-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
parent
ef95c9b3ad
commit
525fcb3119
|
@ -70,7 +70,12 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
|
||||
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
|
||||
{
|
||||
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
|
||||
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
|
||||
}
|
||||
|
||||
protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester)
|
||||
{
|
||||
this(destination, new Pool<>(strategy, maxConnections, cache), requester);
|
||||
}
|
||||
|
||||
protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
|
||||
|
@ -78,6 +83,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
this.destination = destination;
|
||||
this.requester = requester;
|
||||
this.pool = pool;
|
||||
pool.setMaxMultiplex(1); // Force the use of multiplexing.
|
||||
addBean(pool);
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.eclipse.jetty.client.api.Connection;
|
|||
public interface ConnectionPool extends Closeable
|
||||
{
|
||||
/**
|
||||
* Optionally pre-create up to <code>connectionCount</code>
|
||||
* Optionally pre-create up to {@code connectionCount}
|
||||
* connections so they are immediately ready for use.
|
||||
* @param connectionCount the number of connections to pre-start.
|
||||
*/
|
||||
|
@ -104,7 +104,7 @@ public interface ConnectionPool extends Closeable
|
|||
}
|
||||
|
||||
/**
|
||||
* Marks a connection pool as supporting multiplexed connections.
|
||||
* Marks a connection as supporting multiplexed requests.
|
||||
*/
|
||||
interface Multiplexable
|
||||
{
|
||||
|
@ -115,7 +115,11 @@ public interface ConnectionPool extends Closeable
|
|||
|
||||
/**
|
||||
* @param maxMultiplex the max number of requests multiplexable on a single connection
|
||||
* @deprecated do not use, as the maxMultiplex value is pulled, rather than pushed
|
||||
*/
|
||||
void setMaxMultiplex(int maxMultiplex);
|
||||
@Deprecated
|
||||
default void setMaxMultiplex(int maxMultiplex)
|
||||
{
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,9 +34,10 @@ public class DuplexConnectionPool extends AbstractConnectionPool
|
|||
|
||||
public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
|
||||
{
|
||||
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
|
||||
super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
|
||||
{
|
||||
super(destination, pool, requester);
|
||||
|
|
|
@ -25,7 +25,7 @@ import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
|||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
||||
@ManagedObject
|
||||
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
|
||||
public class MultiplexConnectionPool extends AbstractConnectionPool
|
||||
{
|
||||
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
{
|
||||
|
@ -34,9 +34,26 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements C
|
|||
|
||||
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
|
||||
{
|
||||
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
|
||||
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex);
|
||||
}
|
||||
|
||||
public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, new Pool<Connection>(strategy, maxConnections, cache)
|
||||
{
|
||||
@Override
|
||||
protected int getMaxMultiplex(Connection connection)
|
||||
{
|
||||
int multiplex = (connection instanceof Multiplexable)
|
||||
? ((Multiplexable)connection).getMaxMultiplex()
|
||||
: super.getMaxMultiplex(connection);
|
||||
return multiplex > 0 ? multiplex : 1;
|
||||
}
|
||||
}, requester);
|
||||
setMaxMultiplex(maxMultiplex);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, pool, requester);
|
||||
|
|
|
@ -28,15 +28,15 @@ public abstract class MultiplexHttpDestination extends HttpDestination
|
|||
public int getMaxRequestsPerConnection()
|
||||
{
|
||||
ConnectionPool connectionPool = getConnectionPool();
|
||||
if (connectionPool instanceof ConnectionPool.Multiplexable)
|
||||
return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex();
|
||||
if (connectionPool instanceof AbstractConnectionPool)
|
||||
return ((AbstractConnectionPool)connectionPool).getMaxMultiplex();
|
||||
return 1;
|
||||
}
|
||||
|
||||
public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
|
||||
{
|
||||
ConnectionPool connectionPool = getConnectionPool();
|
||||
if (connectionPool instanceof ConnectionPool.Multiplexable)
|
||||
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
|
||||
if (connectionPool instanceof AbstractConnectionPool)
|
||||
((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool
|
|||
{
|
||||
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
|
||||
super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool
|
|||
|
||||
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
|
||||
super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex);
|
||||
// If there are queued requests and connections get
|
||||
// closed due to idle timeout or overuse, we want to
|
||||
// aggressively try to open new connections to replace
|
||||
|
|
|
@ -738,7 +738,10 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
int maxCount = getMaxLocalStreams();
|
||||
if (maxCount >= 0 && localCount >= maxCount)
|
||||
{
|
||||
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
|
||||
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
|
||||
promise.failed(failure);
|
||||
return null;
|
||||
}
|
||||
if (localStreamCount.compareAndSet(localCount, localCount + 1))
|
||||
|
@ -751,7 +754,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
stream.setIdleTimeout(getStreamIdleTimeout());
|
||||
flowControl.onStreamCreated(stream);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Created local {}", stream);
|
||||
LOG.debug("Created local {} for {}", stream, this);
|
||||
return stream;
|
||||
}
|
||||
else
|
||||
|
@ -786,6 +789,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
int maxCount = getMaxRemoteStreams();
|
||||
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
|
||||
{
|
||||
IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure);
|
||||
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
|
||||
return null;
|
||||
}
|
||||
|
@ -799,7 +805,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
stream.setIdleTimeout(getStreamIdleTimeout());
|
||||
flowControl.onStreamCreated(stream);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Created remote {}", stream);
|
||||
LOG.debug("Created remote {} for {}", stream, this);
|
||||
return stream;
|
||||
}
|
||||
else
|
||||
|
@ -945,7 +951,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
private void onStreamCreated(int streamId)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Created stream #{} for {}", streamId, this);
|
||||
LOG.debug("Creating stream #{} for {}", streamId, this);
|
||||
streamsState.onStreamCreated();
|
||||
}
|
||||
|
||||
|
|
|
@ -60,7 +60,9 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
|
|||
setConnectionPoolFactory(destination ->
|
||||
{
|
||||
HttpClient httpClient = getHttpClient();
|
||||
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, httpClient.getMaxRequestsQueuedPerDestination());
|
||||
// Start with the minimum maxMultiplex; the SETTINGS frame from the
|
||||
// server preface will override this value before any request is sent.
|
||||
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, 1);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -211,9 +213,6 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport
|
|||
@Override
|
||||
public void onSettings(Session session, SettingsFrame frame)
|
||||
{
|
||||
Map<Integer, Integer> settings = frame.getSettings();
|
||||
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
|
||||
destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
|
||||
if (!connection.isMarked())
|
||||
onServerPreface(session);
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.TimeoutException;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.client.ConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpChannel;
|
||||
import org.eclipse.jetty.client.HttpConnection;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
|
@ -36,13 +37,14 @@ import org.eclipse.jetty.client.HttpRequest;
|
|||
import org.eclipse.jetty.client.SendFailure;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.HTTP2Session;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
|
||||
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpConnection.class);
|
||||
|
||||
|
@ -74,6 +76,12 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
|
|||
this.recycleHttpChannels = recycleHttpChannels;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxMultiplex()
|
||||
{
|
||||
return ((HTTP2Session)session).getMaxLocalStreams();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Iterator<HttpChannel> getHttpChannels()
|
||||
{
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.eclipse.jetty.http2.client.http;
|
|||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
|
@ -40,6 +41,7 @@ import org.eclipse.jetty.client.HttpClient;
|
|||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.HttpResponseException;
|
||||
import org.eclipse.jetty.client.MultiplexConnectionPool;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
|
@ -76,6 +78,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
|||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
public class MaxConcurrentStreamsTest extends AbstractTest
|
||||
{
|
||||
|
@ -545,6 +548,109 @@ public class MaxConcurrentStreamsTest extends AbstractTest
|
|||
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception
|
||||
{
|
||||
long processing = 125;
|
||||
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
|
||||
{
|
||||
private Session session1;
|
||||
private Session session2;
|
||||
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Request request = (MetaData.Request)frame.getMetaData();
|
||||
switch (request.getURI().getPath())
|
||||
{
|
||||
case "/prime":
|
||||
{
|
||||
session1 = stream.getSession();
|
||||
// Send another request from here to force the opening of the 2nd connection.
|
||||
client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result ->
|
||||
{
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), new HttpFields());
|
||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||
});
|
||||
break;
|
||||
}
|
||||
case "/prime2":
|
||||
{
|
||||
session2 = stream.getSession();
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||
break;
|
||||
}
|
||||
case "/update_max_streams":
|
||||
{
|
||||
Session session = stream.getSession() == session1 ? session2 : session1;
|
||||
Map<Integer, Integer> settings = new HashMap<>();
|
||||
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2);
|
||||
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
sleep(processing);
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
http2.setMaxConcurrentStreams(1);
|
||||
prepareServer(http2);
|
||||
server.start();
|
||||
prepareClient();
|
||||
client.setMaxConnectionsPerDestination(2);
|
||||
client.start();
|
||||
|
||||
// Prime the 2 connections.
|
||||
primeConnection();
|
||||
|
||||
String host = "localhost";
|
||||
int port = connector.getLocalPort();
|
||||
|
||||
AbstractConnectionPool pool = (AbstractConnectionPool)client.resolveDestination(new Origin("http", host, port)).getConnectionPool();
|
||||
assertEquals(2, pool.getConnectionCount());
|
||||
|
||||
// Send a request on one connection, which sends back a SETTINGS frame on the other connection.
|
||||
ContentResponse response = client.newRequest(host, port)
|
||||
.path("/update_max_streams")
|
||||
.send();
|
||||
assertEquals(HttpStatus.OK_200, response.getStatus());
|
||||
|
||||
// Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued.
|
||||
int count = 4;
|
||||
CountDownLatch latch = new CountDownLatch(count);
|
||||
for (int i = 0; i < count; ++i)
|
||||
{
|
||||
client.newRequest(host, port)
|
||||
.path("/" + i)
|
||||
.send(result ->
|
||||
{
|
||||
if (result.isSucceeded())
|
||||
{
|
||||
int status = result.getResponse().getStatus();
|
||||
if (status == HttpStatus.OK_200)
|
||||
latch.countDown();
|
||||
else
|
||||
fail("unexpected status " + status);
|
||||
}
|
||||
else
|
||||
{
|
||||
fail(result.getFailure());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
private void primeConnection() throws Exception
|
||||
{
|
||||
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
|
||||
|
|
|
@ -100,9 +100,7 @@ public class MultiplexedConnectionPoolTest
|
|||
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||
{
|
||||
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
|
||||
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
|
||||
poolRef.set(pool);
|
||||
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
|
||||
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
|
||||
{
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
|
@ -116,6 +114,7 @@ public class MultiplexedConnectionPoolTest
|
|||
poolRemoveCounter.incrementAndGet();
|
||||
}
|
||||
};
|
||||
poolRef.set(connectionPool.getBean(Pool.class));
|
||||
connectionPool.setMaxDuration(maxDuration);
|
||||
return connectionPool;
|
||||
});
|
||||
|
@ -161,9 +160,7 @@ public class MultiplexedConnectionPoolTest
|
|||
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||
{
|
||||
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
|
||||
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
|
||||
poolRef.set(pool);
|
||||
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
|
||||
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
|
||||
{
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
|
@ -177,6 +174,7 @@ public class MultiplexedConnectionPoolTest
|
|||
poolRemoveCounter.incrementAndGet();
|
||||
}
|
||||
};
|
||||
poolRef.set(connectionPool.getBean(Pool.class));
|
||||
connectionPool.setMaxDuration(maxDuration);
|
||||
return connectionPool;
|
||||
});
|
||||
|
|
|
@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
@ -38,24 +40,22 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
import org.eclipse.jetty.util.thread.Locker;
|
||||
|
||||
/**
|
||||
* A fast pool of objects, with optional support for
|
||||
* multiplexing, max usage count and several optimized strategies plus
|
||||
* an optional {@link ThreadLocal} cache of the last release entry.
|
||||
* <p>
|
||||
* When the method {@link #close()} is called, all {@link Closeable}s in the pool
|
||||
* are also closed.
|
||||
* </p>
|
||||
* @param <T>
|
||||
* <p>A pool of objects, with optional support for multiplexing,
|
||||
* max usage count and several optimized strategies plus
|
||||
* an optional {@link ThreadLocal} cache of the last release entry.</p>
|
||||
* <p>When the method {@link #close()} is called, all {@link Closeable}s
|
||||
* object pooled by the pool are also closed.</p>
|
||||
*
|
||||
* @param <T> the type of the pooled objects
|
||||
*/
|
||||
@ManagedObject
|
||||
public class Pool<T> implements AutoCloseable, Dumpable
|
||||
{
|
||||
private static final Logger LOGGER = Log.getLogger(Pool.class);
|
||||
|
||||
private final List<Entry> entries = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final int maxEntries;
|
||||
private final StrategyType strategyType;
|
||||
|
||||
/*
|
||||
* The cache is used to avoid hammering on the first index of the entry list.
|
||||
* Caches can become poisoned (i.e.: containing entries that are in use) when
|
||||
|
@ -68,8 +68,10 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
private final ThreadLocal<Entry> cache;
|
||||
private final AtomicInteger nextIndex;
|
||||
private volatile boolean closed;
|
||||
private volatile int maxMultiplex = 1;
|
||||
private volatile int maxUsageCount = -1;
|
||||
@Deprecated
|
||||
private volatile int maxUsage = -1;
|
||||
@Deprecated
|
||||
private volatile int maxMultiplex = -1;
|
||||
|
||||
/**
|
||||
* The type of the strategy to use for the pool.
|
||||
|
@ -104,7 +106,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
* random strategy but with more predictable behaviour.
|
||||
* No entries are favoured and contention is reduced.
|
||||
*/
|
||||
ROUND_ROBIN,
|
||||
ROUND_ROBIN
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -122,6 +124,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
/**
|
||||
* Construct a Pool with the specified thread-local cache size and
|
||||
* an optional {@link ThreadLocal} cache.
|
||||
*
|
||||
* @param strategyType The strategy to used for looking up entries.
|
||||
* @param maxEntries the maximum amount of entries that the pool will accept.
|
||||
* @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
|
||||
|
@ -131,66 +134,141 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
this.maxEntries = maxEntries;
|
||||
this.strategyType = strategyType;
|
||||
this.cache = cache ? new ThreadLocal<>() : null;
|
||||
nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
|
||||
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of reserved entries
|
||||
*/
|
||||
@ManagedAttribute("The number of reserved entries")
|
||||
public int getReservedCount()
|
||||
{
|
||||
return (int)entries.stream().filter(Entry::isReserved).count();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of idle entries
|
||||
*/
|
||||
@ManagedAttribute("The number of idle entries")
|
||||
public int getIdleCount()
|
||||
{
|
||||
return (int)entries.stream().filter(Entry::isIdle).count();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of in-use entries
|
||||
*/
|
||||
@ManagedAttribute("The number of in-use entries")
|
||||
public int getInUseCount()
|
||||
{
|
||||
return (int)entries.stream().filter(Entry::isInUse).count();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of closed entries
|
||||
*/
|
||||
@ManagedAttribute("The number of closed entries")
|
||||
public int getClosedCount()
|
||||
{
|
||||
return (int)entries.stream().filter(Entry::isClosed).count();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum number of entries
|
||||
*/
|
||||
@ManagedAttribute("The maximum number of entries")
|
||||
public int getMaxEntries()
|
||||
{
|
||||
return maxEntries;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the default maximum multiplex count of entries
|
||||
* @deprecated Multiplex functionalities will be removed
|
||||
*/
|
||||
@ManagedAttribute("The default maximum multiplex count of entries")
|
||||
@Deprecated
|
||||
public int getMaxMultiplex()
|
||||
{
|
||||
return maxMultiplex;
|
||||
return maxMultiplex == -1 ? 1 : maxMultiplex;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Retrieves the max multiplex count for the given pooled object.</p>
|
||||
*
|
||||
* @param pooled the pooled object
|
||||
* @return the max multiplex count for the given pooled object
|
||||
* @deprecated Multiplex functionalities will be removed
|
||||
*/
|
||||
@Deprecated
|
||||
protected int getMaxMultiplex(T pooled)
|
||||
{
|
||||
return getMaxMultiplex();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Sets the default maximum multiplex count for the Pool's entries.</p>
|
||||
*
|
||||
* @param maxMultiplex the default maximum multiplex count of entries
|
||||
* @deprecated Multiplex functionalities will be removed
|
||||
*/
|
||||
@Deprecated
|
||||
public final void setMaxMultiplex(int maxMultiplex)
|
||||
{
|
||||
if (maxMultiplex < 1)
|
||||
throw new IllegalArgumentException("Max multiplex must be >= 1");
|
||||
this.maxMultiplex = maxMultiplex;
|
||||
try (Locker.Lock l = locker.lock())
|
||||
{
|
||||
if (closed)
|
||||
return;
|
||||
|
||||
if (entries.stream().anyMatch(MonoEntry.class::isInstance))
|
||||
throw new IllegalStateException("Pool entries do not support multiplexing");
|
||||
|
||||
this.maxMultiplex = maxMultiplex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the maximum number of times the entries of the pool
|
||||
* can be acquired.
|
||||
* @return the max usage count.
|
||||
* <p>Returns the maximum number of times the entries of the pool
|
||||
* can be acquired.</p>
|
||||
*
|
||||
* @return the default maximum usage count of entries
|
||||
* @deprecated MaxUsage functionalities will be removed
|
||||
*/
|
||||
@ManagedAttribute("The default maximum usage count of entries")
|
||||
@Deprecated
|
||||
public int getMaxUsageCount()
|
||||
{
|
||||
return maxUsageCount;
|
||||
return maxUsage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Change the max usage count of the pool's entries. All existing
|
||||
* idle entries over this new max usage are removed and closed.
|
||||
* @param maxUsageCount the max usage count.
|
||||
* <p>Retrieves the max usage count for the given pooled object.</p>
|
||||
*
|
||||
* @param pooled the pooled object
|
||||
* @return the max usage count for the given pooled object
|
||||
* @deprecated MaxUsage functionalities will be removed
|
||||
*/
|
||||
@Deprecated
|
||||
protected int getMaxUsageCount(T pooled)
|
||||
{
|
||||
return getMaxUsageCount();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Sets the maximum usage count for the Pool's entries.</p>
|
||||
* <p>All existing idle entries that have a usage count larger
|
||||
* than this new value are removed from the Pool and closed.</p>
|
||||
*
|
||||
* @param maxUsageCount the default maximum usage count of entries
|
||||
* @deprecated MaxUsage functionalities will be removed
|
||||
*/
|
||||
@Deprecated
|
||||
public final void setMaxUsageCount(int maxUsageCount)
|
||||
{
|
||||
if (maxUsageCount == 0)
|
||||
throw new IllegalArgumentException("Max usage count must be != 0");
|
||||
this.maxUsageCount = maxUsageCount;
|
||||
|
||||
// Iterate the entries, remove overused ones and collect a list of the closeable removed ones.
|
||||
List<Closeable> copy;
|
||||
|
@ -199,6 +277,11 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
if (closed)
|
||||
return;
|
||||
|
||||
if (entries.stream().anyMatch(MonoEntry.class::isInstance))
|
||||
throw new IllegalStateException("Pool entries do not support max usage");
|
||||
|
||||
this.maxUsage = maxUsageCount;
|
||||
|
||||
copy = entries.stream()
|
||||
.filter(entry -> entry.isIdleAndOverUsed() && remove(entry) && entry.pooled instanceof Closeable)
|
||||
.map(entry -> (Closeable)entry.pooled)
|
||||
|
@ -210,10 +293,10 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* Create a new disabled slot into the pool.
|
||||
* The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
|
||||
* <p>Creates a new disabled slot into the pool.</p>
|
||||
* <p>The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
|
||||
* method called or be removed via {@link Pool.Entry#remove()} or
|
||||
* {@link Pool#remove(Pool.Entry)}.
|
||||
* {@link Pool#remove(Pool.Entry)}.</p>
|
||||
*
|
||||
* @param allotment the desired allotment, where each entry handles an allotment of maxMultiplex,
|
||||
* or a negative number to always trigger the reservation of a new entry.
|
||||
|
@ -237,17 +320,17 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment)
|
||||
return null;
|
||||
|
||||
Entry entry = new Entry();
|
||||
Entry entry = newEntry();
|
||||
entries.add(entry);
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new disabled slot into the pool.
|
||||
* The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
|
||||
* <p>Creates a new disabled slot into the pool.</p>
|
||||
* <p>The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
|
||||
* method called or be removed via {@link Pool.Entry#remove()} or
|
||||
* {@link Pool#remove(Pool.Entry)}.
|
||||
* {@link Pool#remove(Pool.Entry)}.</p>
|
||||
*
|
||||
* @return a disabled entry that is contained in the pool,
|
||||
* or null if the pool is closed or if the pool already contains
|
||||
|
@ -264,17 +347,28 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
if (entries.size() >= maxEntries)
|
||||
return null;
|
||||
|
||||
Entry entry = new Entry();
|
||||
Entry entry = newEntry();
|
||||
entries.add(entry);
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
|
||||
private Entry newEntry()
|
||||
{
|
||||
// Do not allow more than 2 implementations of Entry, otherwise call sites in Pool
|
||||
// referencing Entry methods will become mega-morphic and kill the performance.
|
||||
if (maxMultiplex >= 0 || maxUsage >= 0)
|
||||
return new MultiEntry();
|
||||
return new MonoEntry();
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism.
|
||||
* @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool.
|
||||
* <p>Acquires the entry from the pool at the specified index.</p>
|
||||
* <p>This method bypasses the thread-local cache mechanism.</p>
|
||||
*
|
||||
* @param idx the index of the entry to acquire.
|
||||
* @return the specified entry or null if there is none at the specified index or if it is not available.
|
||||
* @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool.
|
||||
*/
|
||||
@Deprecated
|
||||
public Entry acquireAt(int idx)
|
||||
|
@ -296,8 +390,11 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* Acquire an entry from the pool.
|
||||
* Only enabled entries will be returned from this method and their enable method must not be called.
|
||||
* <p>Acquires an entry from the pool.</p>
|
||||
* <p>Only enabled entries will be returned from this method
|
||||
* and their {@link Entry#enable(Object, boolean)}
|
||||
* method must not be called.</p>
|
||||
*
|
||||
* @return an entry from the pool or null if none is available.
|
||||
*/
|
||||
public Entry acquire()
|
||||
|
@ -359,8 +456,8 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* Utility method to acquire an entry from the pool,
|
||||
* reserving and creating a new entry if necessary.
|
||||
* <p>Acquires an entry from the pool,
|
||||
* reserving and creating a new entry if necessary.</p>
|
||||
*
|
||||
* @param creator a function to create the pooled value for a reserved entry.
|
||||
* @return an entry from the pool or null if none is available.
|
||||
|
@ -396,15 +493,14 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* This method will return an acquired object to the pool. Objects
|
||||
* that are acquired from the pool but never released will result
|
||||
* in a memory leak.
|
||||
* <p>Releases an {@link #acquire() acquired} entry to the pool.</p>
|
||||
* <p>Entries that are acquired from the pool but never released
|
||||
* will result in a memory leak.</p>
|
||||
*
|
||||
* @param entry the value to return to the pool
|
||||
* @return true if the entry was released and could be acquired again,
|
||||
* false if the entry should be removed by calling {@link #remove(Pool.Entry)}
|
||||
* and the object contained by the entry should be disposed.
|
||||
* @throws NullPointerException if value is null
|
||||
*/
|
||||
public boolean release(Entry entry)
|
||||
{
|
||||
|
@ -418,7 +514,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* Remove a value from the pool.
|
||||
* <p>Removes an entry from the pool.</p>
|
||||
*
|
||||
* @param entry the value to remove
|
||||
* @return true if the entry was removed, false otherwise
|
||||
|
@ -495,78 +591,72 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[size=%d closed=%s]",
|
||||
return String.format("%s@%x[inUse=%d,size=%d,capacity=%d,closed=%b]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
entries.size(),
|
||||
closed);
|
||||
getInUseCount(),
|
||||
size(),
|
||||
getMaxEntries(),
|
||||
isClosed());
|
||||
}
|
||||
|
||||
public class Entry
|
||||
/**
|
||||
* <p>A Pool entry that holds metadata and a pooled object.</p>
|
||||
*/
|
||||
public abstract class Entry
|
||||
{
|
||||
// hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending
|
||||
// lo: multiplexing counter
|
||||
private final AtomicBiInteger state;
|
||||
|
||||
// The pooled item. This is not volatile as it is set once and then never changed.
|
||||
// The pooled object. This is not volatile as it is set once and then never changed.
|
||||
// Other threads accessing must check the state field above first, so a good before/after
|
||||
// relationship exists to make a memory barrier.
|
||||
private T pooled;
|
||||
|
||||
Entry()
|
||||
{
|
||||
this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
|
||||
}
|
||||
|
||||
// for testing only
|
||||
void setUsageCount(int usageCount)
|
||||
{
|
||||
this.state.getAndSetHi(usageCount);
|
||||
}
|
||||
|
||||
/** Enable a reserved entry {@link Entry}.
|
||||
* An entry returned from the {@link #reserve()} method must be enabled with this method,
|
||||
* once and only once, before it is usable by the pool.
|
||||
* The entry may be enabled and not acquired, in which case it is immediately available to be
|
||||
/**
|
||||
* <p>Enables this, previously {@link #reserve() reserved}, Entry.</p>
|
||||
* <p>An entry returned from the {@link #reserve()} method must be enabled with this method,
|
||||
* once and only once, before it is usable by the pool.</p>
|
||||
* <p>The entry may be enabled and not acquired, in which case it is immediately available to be
|
||||
* acquired, potentially by another thread; or it can be enabled and acquired atomically so that
|
||||
* no other thread can acquire it, although the acquire may still fail if the pool has been closed.
|
||||
* @param pooled The pooled item for the entry
|
||||
* @param acquire If true the entry is atomically enabled and acquired.
|
||||
* @return true If the entry was enabled.
|
||||
* @throws IllegalStateException if the entry was already enabled
|
||||
* no other thread can acquire it, although the acquire may still fail if the pool has been closed.</p>
|
||||
*
|
||||
* @param pooled the pooled object for this Entry
|
||||
* @param acquire whether this Entry should be atomically enabled and acquired
|
||||
* @return whether this Entry was enabled
|
||||
* @throws IllegalStateException if this Entry was already enabled
|
||||
*/
|
||||
public boolean enable(T pooled, boolean acquire)
|
||||
{
|
||||
Objects.requireNonNull(pooled);
|
||||
|
||||
if (state.getHi() != Integer.MIN_VALUE)
|
||||
if (!isReserved())
|
||||
{
|
||||
if (state.getHi() == -1)
|
||||
if (isClosed())
|
||||
return false; // Pool has been closed
|
||||
throw new IllegalStateException("Entry already enabled: " + this);
|
||||
}
|
||||
this.pooled = pooled;
|
||||
int usage = acquire ? 1 : 0;
|
||||
if (!state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage))
|
||||
{
|
||||
this.pooled = null;
|
||||
if (state.getHi() == -1)
|
||||
return false; // Pool has been closed
|
||||
throw new IllegalStateException("Entry already enabled: " + this);
|
||||
}
|
||||
|
||||
return true;
|
||||
if (tryEnable(acquire))
|
||||
return true;
|
||||
|
||||
this.pooled = null;
|
||||
if (isClosed())
|
||||
return false; // Pool has been closed
|
||||
throw new IllegalStateException("Entry already enabled: " + this);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the pooled object
|
||||
*/
|
||||
public T getPooled()
|
||||
{
|
||||
return pooled;
|
||||
}
|
||||
|
||||
/**
|
||||
* Release the entry.
|
||||
* This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry.
|
||||
* @return true if released.
|
||||
* <p>Releases this Entry.</p>
|
||||
* <p>This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry.</p>
|
||||
*
|
||||
* @return whether this Entry was released
|
||||
*/
|
||||
public boolean release()
|
||||
{
|
||||
|
@ -574,9 +664,10 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* Remove the entry.
|
||||
* This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry.
|
||||
* @return true if remove.
|
||||
* <p>Removes this Entry from the Pool.</p>
|
||||
* <p>This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry.</p>
|
||||
*
|
||||
* @return whether this Entry was removed
|
||||
*/
|
||||
public boolean remove()
|
||||
{
|
||||
|
@ -584,40 +675,257 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* Try to acquire the entry if possible by incrementing both the usage
|
||||
* count and the multiplex count.
|
||||
* @return true if the usage count is <= maxUsageCount and
|
||||
* the multiplex count is maxMultiplex and the entry is not closed,
|
||||
* false otherwise.
|
||||
* <p>Tries to enable, and possible also acquire, this Entry.</p>
|
||||
*
|
||||
* @param acquire whether to also acquire this Entry
|
||||
* @return whether this Entry was enabled
|
||||
*/
|
||||
abstract boolean tryEnable(boolean acquire);
|
||||
|
||||
/**
|
||||
* <p>Tries to acquire this Entry.</p>
|
||||
*
|
||||
* @return whether this Entry was acquired
|
||||
*/
|
||||
abstract boolean tryAcquire();
|
||||
|
||||
/**
|
||||
* <p>Tries to release this Entry.</p>
|
||||
*
|
||||
* @return true if this Entry was released,
|
||||
* false if {@link #tryRemove()} should be called.
|
||||
*/
|
||||
abstract boolean tryRelease();
|
||||
|
||||
/**
|
||||
* <p>Tries to remove the entry by marking it as closed.</p>
|
||||
*
|
||||
* @return whether the entry can be removed from the containing pool
|
||||
*/
|
||||
abstract boolean tryRemove();
|
||||
|
||||
/**
|
||||
* @return whether this Entry is closed
|
||||
*/
|
||||
public abstract boolean isClosed();
|
||||
|
||||
/**
|
||||
* @return whether this Entry is reserved
|
||||
*/
|
||||
public abstract boolean isReserved();
|
||||
|
||||
/**
|
||||
* @return whether this Entry is idle
|
||||
*/
|
||||
public abstract boolean isIdle();
|
||||
|
||||
/**
|
||||
* @return whether this entry is in use.
|
||||
*/
|
||||
public abstract boolean isInUse();
|
||||
|
||||
/**
|
||||
* @return whether this entry has been used beyond {@link #getMaxUsageCount()}
|
||||
* @deprecated MaxUsage functionalities will be removed
|
||||
*/
|
||||
@Deprecated
|
||||
public boolean isOverUsed()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean isIdleAndOverUsed()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
// Only for testing.
|
||||
int getUsageCount()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Only for testing.
|
||||
void setUsageCount(int usageCount)
|
||||
{
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>A Pool entry that holds metadata and a pooled object,
|
||||
* that can only be acquired concurrently at most once, and
|
||||
* can be acquired/released multiple times.</p>
|
||||
*/
|
||||
private class MonoEntry extends Entry
|
||||
{
|
||||
// MIN_VALUE => pending; -1 => closed; 0 => idle; 1 => active;
|
||||
private final AtomicInteger state = new AtomicInteger(Integer.MIN_VALUE);
|
||||
|
||||
@Override
|
||||
protected boolean tryEnable(boolean acquire)
|
||||
{
|
||||
return state.compareAndSet(Integer.MIN_VALUE, acquire ? 1 : 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean tryAcquire()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
int s = state.get();
|
||||
if (s != 0)
|
||||
return false;
|
||||
if (state.compareAndSet(s, 1))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean tryRelease()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
int s = state.get();
|
||||
if (s < 0)
|
||||
return false;
|
||||
if (s == 0)
|
||||
throw new IllegalStateException("Cannot release an already released entry");
|
||||
if (state.compareAndSet(s, 0))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean tryRemove()
|
||||
{
|
||||
state.set(-1);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed()
|
||||
{
|
||||
return state.get() < 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReserved()
|
||||
{
|
||||
return state.get() == Integer.MIN_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isIdle()
|
||||
{
|
||||
return state.get() == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInUse()
|
||||
{
|
||||
return state.get() == 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
String s;
|
||||
switch (state.get())
|
||||
{
|
||||
case Integer.MIN_VALUE:
|
||||
s = "PENDING";
|
||||
break;
|
||||
case -1:
|
||||
s = "CLOSED";
|
||||
break;
|
||||
case 0:
|
||||
s = "IDLE";
|
||||
break;
|
||||
default:
|
||||
s = "ACTIVE";
|
||||
}
|
||||
return String.format("%s@%x{%s,pooled=%s}",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
s,
|
||||
getPooled());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>A Pool entry that holds metadata and a pooled object,
|
||||
* that can be acquired concurrently multiple times, and
|
||||
* can be acquired/released multiple times.</p>
|
||||
*/
|
||||
class MultiEntry extends Entry
|
||||
{
|
||||
// hi: MIN_VALUE => pending; -1 => closed; 0+ => usage counter;
|
||||
// lo: 0 => idle; positive => multiplex counter
|
||||
private final AtomicBiInteger state;
|
||||
|
||||
MultiEntry()
|
||||
{
|
||||
this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
void setUsageCount(int usageCount)
|
||||
{
|
||||
this.state.getAndSetHi(usageCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean tryEnable(boolean acquire)
|
||||
{
|
||||
int usage = acquire ? 1 : 0;
|
||||
return state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Tries to acquire the entry if possible by incrementing both the usage
|
||||
* count and the multiplex count.</p>
|
||||
*
|
||||
* @return true if the usage count is less than {@link #getMaxUsageCount()} and
|
||||
* the multiplex count is less than {@link #getMaxMultiplex(Object)} and
|
||||
* the entry is not closed, false otherwise.
|
||||
*/
|
||||
@Override
|
||||
boolean tryAcquire()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
long encoded = state.get();
|
||||
int usageCount = AtomicBiInteger.getHi(encoded);
|
||||
int multiplexCount = AtomicBiInteger.getLo(encoded);
|
||||
boolean closed = usageCount < 0;
|
||||
int multiplexingCount = AtomicBiInteger.getLo(encoded);
|
||||
int currentMaxUsageCount = maxUsageCount;
|
||||
if (closed || multiplexingCount >= maxMultiplex || (currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount))
|
||||
if (closed)
|
||||
return false;
|
||||
T pooled = getPooled();
|
||||
int maxUsageCount = getMaxUsageCount(pooled);
|
||||
int maxMultiplexed = getMaxMultiplex(pooled);
|
||||
if (maxMultiplexed > 0 && multiplexCount >= maxMultiplexed)
|
||||
return false;
|
||||
if (maxUsageCount > 0 && usageCount >= maxUsageCount)
|
||||
return false;
|
||||
|
||||
// Prevent overflowing the usage counter by capping it at Integer.MAX_VALUE.
|
||||
int newUsageCount = usageCount == Integer.MAX_VALUE ? Integer.MAX_VALUE : usageCount + 1;
|
||||
if (state.compareAndSet(encoded, newUsageCount, multiplexingCount + 1))
|
||||
if (state.compareAndSet(encoded, newUsageCount, multiplexCount + 1))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to release the entry if possible by decrementing the multiplexing
|
||||
* count unless the entity is closed.
|
||||
* <p>Tries to release the entry if possible by decrementing the multiplex
|
||||
* count unless the entity is closed.</p>
|
||||
*
|
||||
* @return true if the entry was released,
|
||||
* false if {@link #tryRemove()} should be called.
|
||||
*/
|
||||
@Override
|
||||
boolean tryRelease()
|
||||
{
|
||||
int newMultiplexingCount;
|
||||
int newMultiplexCount;
|
||||
int usageCount;
|
||||
while (true)
|
||||
{
|
||||
|
@ -627,24 +935,26 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
if (closed)
|
||||
return false;
|
||||
|
||||
newMultiplexingCount = AtomicBiInteger.getLo(encoded) - 1;
|
||||
if (newMultiplexingCount < 0)
|
||||
newMultiplexCount = AtomicBiInteger.getLo(encoded) - 1;
|
||||
if (newMultiplexCount < 0)
|
||||
throw new IllegalStateException("Cannot release an already released entry");
|
||||
|
||||
if (state.compareAndSet(encoded, usageCount, newMultiplexingCount))
|
||||
if (state.compareAndSet(encoded, usageCount, newMultiplexCount))
|
||||
break;
|
||||
}
|
||||
|
||||
int currentMaxUsageCount = maxUsageCount;
|
||||
int currentMaxUsageCount = maxUsage;
|
||||
boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
|
||||
return !(overUsed && newMultiplexingCount == 0);
|
||||
return !(overUsed && newMultiplexCount == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to remove the entry by marking it as closed and decrementing the multiplexing counter.
|
||||
* The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.
|
||||
* <p>Tries to remove the entry by marking it as closed and decrementing the multiplex counter.</p>
|
||||
* <p>The multiplex counter will never go below zero and if it reaches zero, the entry is considered removed.</p>
|
||||
*
|
||||
* @return true if the entry can be removed from the containing pool, false otherwise.
|
||||
*/
|
||||
@Override
|
||||
boolean tryRemove()
|
||||
{
|
||||
while (true)
|
||||
|
@ -660,45 +970,52 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed()
|
||||
{
|
||||
return state.getHi() < 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isReserved()
|
||||
{
|
||||
return state.getHi() == Integer.MIN_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isIdle()
|
||||
{
|
||||
long encoded = state.get();
|
||||
return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isInUse()
|
||||
{
|
||||
long encoded = state.get();
|
||||
return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOverUsed()
|
||||
{
|
||||
int currentMaxUsageCount = maxUsageCount;
|
||||
int maxUsageCount = getMaxUsageCount();
|
||||
int usageCount = state.getHi();
|
||||
return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
|
||||
return maxUsageCount > 0 && usageCount >= maxUsageCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean isIdleAndOverUsed()
|
||||
{
|
||||
int currentMaxUsageCount = maxUsageCount;
|
||||
int maxUsageCount = getMaxUsageCount();
|
||||
long encoded = state.get();
|
||||
int usageCount = AtomicBiInteger.getHi(encoded);
|
||||
int multiplexCount = AtomicBiInteger.getLo(encoded);
|
||||
return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount && multiplexCount == 0;
|
||||
return maxUsageCount > 0 && usageCount >= maxUsageCount && multiplexCount == 0;
|
||||
}
|
||||
|
||||
public int getUsageCount()
|
||||
@Override
|
||||
int getUsageCount()
|
||||
{
|
||||
return Math.max(state.getHi(), 0);
|
||||
}
|
||||
|
@ -710,16 +1027,17 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
int usageCount = AtomicBiInteger.getHi(encoded);
|
||||
int multiplexCount = AtomicBiInteger.getLo(encoded);
|
||||
|
||||
String state = usageCount < 0 ? "CLOSED" : multiplexCount == 0 ? "IDLE" : "INUSE";
|
||||
String state = usageCount < 0
|
||||
? (usageCount == Integer.MIN_VALUE ? "PENDING" : "CLOSED")
|
||||
: (multiplexCount == 0 ? "IDLE" : "ACTIVE");
|
||||
|
||||
return String.format("%s@%x{%s, usage=%d, multiplex=%d/%d, pooled=%s}",
|
||||
return String.format("%s@%x{%s,usage=%d,multiplex=%d,pooled=%s}",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
state,
|
||||
Math.max(usageCount, 0),
|
||||
Math.max(multiplexCount, 0),
|
||||
getMaxMultiplex(),
|
||||
pooled);
|
||||
getPooled());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,7 +86,7 @@ public class PoolTest
|
|||
public void testAcquireRelease(Factory factory)
|
||||
{
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.reserve(-1).enable(new CloseableHolder("aaa"), false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
|
@ -130,7 +130,7 @@ public class PoolTest
|
|||
public void testRemoveBeforeRelease(Factory factory)
|
||||
{
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.reserve(-1).enable(new CloseableHolder("aaa"), false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.remove(e1), is(true));
|
||||
|
@ -222,69 +222,6 @@ public class PoolTest
|
|||
assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testDeprecatedReserve(Factory factory)
|
||||
{
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
|
||||
// Reserve an entry
|
||||
Pool<CloseableHolder>.Entry e1 = pool.reserve(-1);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(0));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// max reservations
|
||||
assertNull(pool.reserve(1));
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(0));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// enable the entry
|
||||
e1.enable(new CloseableHolder("aaa"), false);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// Reserve another entry
|
||||
Pool<CloseableHolder>.Entry e2 = pool.reserve(-1);
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// remove the reservation
|
||||
e2.remove();
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// Reserve another entry
|
||||
Pool<CloseableHolder>.Entry e3 = pool.reserve(-1);
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// enable and acquire the entry
|
||||
e3.enable(new CloseableHolder("bbb"), true);
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(1));
|
||||
|
||||
// can't reenable
|
||||
assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false));
|
||||
|
||||
// Can't enable acquired entry
|
||||
Pool<CloseableHolder>.Entry e = pool.acquire();
|
||||
assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testReserveNegativeMaxPending(Factory factory)
|
||||
|
@ -356,22 +293,6 @@ public class PoolTest
|
|||
assertThat(pool.values().isEmpty(), is(false));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testAcquireAt(Factory factory)
|
||||
{
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
|
||||
pool.reserve(-1).enable(new CloseableHolder("aaa"), false);
|
||||
pool.reserve(-1).enable(new CloseableHolder("bbb"), false);
|
||||
|
||||
assertThat(pool.acquireAt(2), nullValue());
|
||||
assertThat(pool.acquireAt(0), notNullValue());
|
||||
assertThat(pool.acquireAt(0), nullValue());
|
||||
assertThat(pool.acquireAt(1), notNullValue());
|
||||
assertThat(pool.acquireAt(1), nullValue());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testMaxUsageCount(Factory factory)
|
||||
|
@ -608,6 +529,7 @@ public class PoolTest
|
|||
public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory)
|
||||
{
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(1);
|
||||
Pool<CloseableHolder>.Entry entry = pool.reserve();
|
||||
entry.enable(new CloseableHolder("aaa"), false);
|
||||
entry.setUsageCount(Integer.MAX_VALUE);
|
||||
|
@ -627,6 +549,7 @@ public class PoolTest
|
|||
public void testDynamicMaxUsageCountChangeSweep(Factory factory)
|
||||
{
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
pool.setMaxUsageCount(100);
|
||||
Pool<CloseableHolder>.Entry entry1 = pool.reserve();
|
||||
entry1.enable(new CloseableHolder("aaa"), false);
|
||||
Pool<CloseableHolder>.Entry entry2 = pool.reserve();
|
||||
|
|
Loading…
Reference in New Issue