diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index ac2b806ffa8..6919631d05a 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -18,69 +18,113 @@ package org.eclipse.jetty.client; +import java.io.IOException; +import java.util.ArrayDeque; import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; -import org.eclipse.jetty.util.AtomicBiInteger; +import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Sweeper; + +import static java.util.stream.Collectors.toCollection; @ManagedObject -public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable +public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable, Sweeper.Sweepable { private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class); - /** - * The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed. - * The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections. - */ - private final AtomicBiInteger connections = new AtomicBiInteger(); - private final AtomicBoolean closed = new AtomicBoolean(); private final HttpDestination destination; - private final int maxConnections; private final Callback requester; + private final Pool pool; /** - * @param destination the correspondent destination - * @param maxConnections the max number of connections - * @param requester the callback to notify about new connection creation/failure - * @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, Callback)} instead + * @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead */ @Deprecated protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester) { - this((HttpDestination)destination, maxConnections, requester); + this((HttpDestination)destination, maxConnections, true, requester); } - protected AbstractConnectionPool(HttpDestination destination, int maxConnections, Callback requester) + protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) { this.destination = destination; - this.maxConnections = maxConnections; this.requester = requester; + @SuppressWarnings("unchecked") + Pool pool = destination.getBean(Pool.class); + if (pool == null) + { + pool = new Pool<>(maxConnections, cache ? 1 : 0); + destination.addBean(pool); + } + this.pool = pool; } - protected HttpDestination getHttpDestination() + @Override + public CompletableFuture preCreateConnections(int connectionCount) { - return destination; + CompletableFuture[] futures = new CompletableFuture[connectionCount]; + for (int i = 0; i < connectionCount; i++) + { + futures[i] = tryCreateReturningFuture(pool.getMaxEntries()); + } + return CompletableFuture.allOf(futures); + } + + protected int getMaxMultiplex() + { + return pool.getMaxMultiplex(); + } + + protected void setMaxMultiplex(int maxMultiplex) + { + pool.setMaxMultiplex(maxMultiplex); + } + + protected int getMaxUsageCount() + { + return pool.getMaxUsageCount(); + } + + protected void setMaxUsageCount(int maxUsageCount) + { + pool.setMaxUsageCount(maxUsageCount); + } + + @ManagedAttribute(value = "The number of active connections", readonly = true) + public int getActiveConnectionCount() + { + return pool.getInUseConnectionCount(); + } + + @ManagedAttribute(value = "The number of idle connections", readonly = true) + public int getIdleConnectionCount() + { + return pool.getIdleConnectionCount(); } @ManagedAttribute(value = "The max number of connections", readonly = true) public int getMaxConnectionCount() { - return maxConnections; + return pool.getMaxEntries(); } @ManagedAttribute(value = "The number of connections", readonly = true) public int getConnectionCount() { - return connections.getLo(); + return pool.size(); } /** @@ -97,19 +141,19 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable @ManagedAttribute(value = "The number of pending connections", readonly = true) public int getPendingConnectionCount() { - return connections.getHi(); + return pool.getPendingConnectionCount(); } @Override public boolean isEmpty() { - return connections.getLo() == 0; + return pool.size() == 0; } @Override public boolean isClosed() { - return closed.get(); + return pool.isClosed(); } @Override @@ -152,88 +196,165 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable */ protected void tryCreate(int maxPending) { - while (true) - { - long encoded = connections.get(); - int pending = AtomicBiInteger.getHi(encoded); - int total = AtomicBiInteger.getLo(encoded); - - if (LOG.isDebugEnabled()) - LOG.debug("tryCreate {}/{} connections {}/{} pending", total, maxConnections, pending, maxPending); - - if (total >= maxConnections) - return; - - if (maxPending >= 0 && pending >= maxPending) - return; - - if (connections.compareAndSet(encoded, pending + 1, total + 1)) - { - if (LOG.isDebugEnabled()) - LOG.debug("newConnection {}/{} connections {}/{} pending", total + 1, maxConnections, pending + 1, maxPending); - - destination.newConnection(new Promise() - { - @Override - public void succeeded(Connection connection) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection {}/{} creation succeeded {}", total + 1, maxConnections, connection); - connections.add(-1, 0); - onCreated(connection); - proceed(); - } - - @Override - public void failed(Throwable x) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection " + (total + 1) + "/" + maxConnections + " creation failed", x); - connections.add(-1, -1); - requester.failed(x); - } - }); - - return; - } - } + tryCreateReturningFuture(maxPending); } - protected abstract void onCreated(Connection connection); + private CompletableFuture tryCreateReturningFuture(int maxPending) + { + CompletableFuture future = new CompletableFuture<>(); + + if (LOG.isDebugEnabled()) + LOG.debug("tryCreate {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending); + + Pool.Entry entry = pool.reserve(maxPending); + if (entry == null) + { + future.complete(null); + return future; + } + + if (LOG.isDebugEnabled()) + LOG.debug("newConnection {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending); + + destination.newConnection(new Promise() + { + @Override + public void succeeded(Connection connection) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection {}/{} creation succeeded {}", pool.size(), pool.getMaxEntries(), connection); + adopt(entry, connection); + future.complete(null); + proceed(); + } + + @Override + public void failed(Throwable x) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection " + pool.size() + "/" + pool.getMaxEntries() + " creation failed", x); + pool.remove(entry); + future.completeExceptionally(x); + requester.failed(x); + } + }); + return future; + } protected void proceed() { requester.succeeded(); } - protected abstract Connection activate(); - - protected Connection active(Connection connection) + private void adopt(Pool.Entry entry, Connection connection) { + if (!(connection instanceof Attachable)) + throw new IllegalArgumentException("Invalid connection object: " + connection); + Attachable attachable = (Attachable)connection; + attachable.setAttachment(entry); if (LOG.isDebugEnabled()) - LOG.debug("Connection active {}", connection); - acquired(connection); - return connection; + LOG.debug("onCreating {}", entry); + onCreated(connection); + entry.enable(connection); + idle(connection, false); } - protected void acquired(Connection connection) + protected Connection activate() + { + Pool.Entry entry = pool.acquire(); + if (LOG.isDebugEnabled()) + LOG.debug("activated '{}'", entry); + if (entry != null) + { + Connection connection = entry.getPooled(); + acquired(connection); + return connection; + } + return null; + } + + @Override + public boolean isActive(Connection connection) + { + if (!(connection instanceof Attachable)) + throw new IllegalArgumentException("Invalid connection object: " + connection); + Attachable attachable = (Attachable)connection; + @SuppressWarnings("unchecked") + Pool.Entry entry = (Pool.Entry)attachable.getAttachment(); + if (entry == null) + return false; + if (LOG.isDebugEnabled()) + LOG.debug("isActive {}", entry); + return !entry.isIdle(); + } + + @Override + public boolean release(Connection connection) + { + if (!deactivate(connection)) + return false; + released(connection); + return idle(connection, isClosed()); + } + + protected boolean deactivate(Connection connection) + { + if (!(connection instanceof Attachable)) + throw new IllegalArgumentException("Invalid connection object: " + connection); + Attachable attachable = (Attachable)connection; + @SuppressWarnings("unchecked") + Pool.Entry entry = (Pool.Entry)attachable.getAttachment(); + if (entry == null) + return true; + if (LOG.isDebugEnabled()) + LOG.debug("releasing {}", entry); + boolean reusable = pool.release(entry); + if (!reusable) + { + remove(connection); + return false; + } + return true; + } + + @Override + public boolean remove(Connection connection) + { + return remove(connection, false); + } + + protected boolean remove(Connection connection, boolean force) + { + if (!(connection instanceof Attachable)) + throw new IllegalArgumentException("Invalid connection object: " + connection); + Attachable attachable = (Attachable)connection; + @SuppressWarnings("unchecked") + Pool.Entry entry = (Pool.Entry)attachable.getAttachment(); + if (entry == null) + return false; + attachable.setAttachment(null); + if (LOG.isDebugEnabled()) + LOG.debug("removing {}", entry); + boolean removed = pool.remove(entry); + if (removed || force) + { + released(connection); + removed(connection); + } + return removed; + } + + protected void onCreated(Connection connection) { } protected boolean idle(Connection connection, boolean close) { - if (close) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection idle close {}", connection); - return false; - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection idle {}", connection); - return true; - } + return !close; + } + + protected void acquired(Connection connection) + { } protected void released(Connection connection) @@ -242,28 +363,78 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable protected void removed(Connection connection) { - int pooled = connections.addAndGetLo(-1); - if (LOG.isDebugEnabled()) - LOG.debug("Connection removed {} - pooled: {}", connection, pooled); + } + + /** + * @deprecated Relying on this method indicates a reliance on the implementation details. + * @return an unmodifiable queue working as a view of the idle connections. + */ + @Deprecated + public Queue getIdleConnections() + { + return pool.values().stream() + .filter(Pool.Entry::isIdle) + .filter(entry -> !entry.isClosed()) + .map(Pool.Entry::getPooled) + .collect(toCollection(ArrayDeque::new)); + } + + /** + * @deprecated Relying on this method indicates a reliance on the implementation details. + * @return an unmodifiable collection working as a view of the active connections. + */ + @Deprecated + public Collection getActiveConnections() + { + return pool.values().stream() + .filter(entry -> !entry.isIdle()) + .filter(entry -> !entry.isClosed()) + .map(Pool.Entry::getPooled) + .collect(Collectors.toList()); } @Override public void close() { - if (closed.compareAndSet(false, true)) - { - connections.set(0, 0); - } - } - - protected void close(Collection connections) - { - connections.forEach(Connection::close); + pool.close(); } @Override - public String dump() + public void dump(Appendable out, String indent) throws IOException { - return Dumpable.dump(this); + Dumpable.dumpObjects(out, indent, this); + } + + @Override + public boolean sweep() + { + pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry -> + { + Connection connection = entry.getPooled(); + if (((Sweeper.Sweepable)connection).sweep()) + { + boolean removed = remove(connection); + LOG.warn("Connection swept: {}{}{} from active connections{}{}", + connection, + System.lineSeparator(), + removed ? "Removed" : "Not removed", + System.lineSeparator(), + dump()); + } + }); + return false; + } + + @Override + public String toString() + { + return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]", + getClass().getSimpleName(), + hashCode(), + getPendingConnectionCount(), + getConnectionCount(), + getMaxConnectionCount(), + getActiveConnectionCount(), + getIdleConnectionCount()); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index d60ee3aaf02..5db36554f59 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.client; import java.io.Closeable; +import java.util.concurrent.CompletableFuture; import org.eclipse.jetty.client.api.Connection; @@ -27,6 +28,16 @@ import org.eclipse.jetty.client.api.Connection; */ public interface ConnectionPool extends Closeable { + /** + * Optionally pre-create up to connectionCount + * connections so they are immediately ready for use. + * @param connectionCount the number of connections to pre-start. + */ + default CompletableFuture preCreateConnections(int connectionCount) + { + return CompletableFuture.completedFuture(null); + } + /** * @param connection the connection to test * @return whether the given connection is currently in use diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java index c0cd9d6ebca..7139a96b951 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java @@ -18,304 +18,33 @@ package org.eclipse.jetty.client; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Deque; -import java.util.HashSet; -import java.util.List; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; - -import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; -import org.eclipse.jetty.util.component.Dumpable; -import org.eclipse.jetty.util.component.DumpableCollection; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Sweeper; @ManagedObject -public class DuplexConnectionPool extends AbstractConnectionPool implements Sweeper.Sweepable +public class DuplexConnectionPool extends AbstractConnectionPool { - private static final Logger LOG = Log.getLogger(DuplexConnectionPool.class); - - private final ReentrantLock lock = new ReentrantLock(); - private final Deque idleConnections; - private final Set activeConnections; - - public DuplexConnectionPool(Destination destination, int maxConnections, Callback requester) + public DuplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester) { - super((HttpDestination)destination, maxConnections, requester); - this.idleConnections = new ArrayDeque<>(maxConnections); - this.activeConnections = new HashSet<>(maxConnections); + this(destination, maxConnections, true, requester); } - protected void lock() + public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) { - lock.lock(); - } - - protected void unlock() - { - lock.unlock(); - } - - @ManagedAttribute(value = "The number of idle connections", readonly = true) - public int getIdleConnectionCount() - { - lock(); - try - { - return idleConnections.size(); - } - finally - { - unlock(); - } - } - - @ManagedAttribute(value = "The number of active connections", readonly = true) - public int getActiveConnectionCount() - { - lock(); - try - { - return activeConnections.size(); - } - finally - { - unlock(); - } - } - - public Queue getIdleConnections() - { - return idleConnections; - } - - public Collection getActiveConnections() - { - return activeConnections; + super(destination, maxConnections, cache, requester); } @Override - public boolean isActive(Connection connection) + @ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed") + public int getMaxUsageCount() { - lock(); - try - { - return activeConnections.contains(connection); - } - finally - { - unlock(); - } + return super.getMaxUsageCount(); } @Override - protected void onCreated(Connection connection) + public void setMaxUsageCount(int maxUsageCount) { - lock(); - try - { - // Use "cold" new connections as last. - idleConnections.offer(connection); - } - finally - { - unlock(); - } - - idle(connection, false); - } - - @Override - protected Connection activate() - { - Connection connection; - lock(); - try - { - connection = idleConnections.poll(); - if (connection == null) - return null; - activeConnections.add(connection); - } - finally - { - unlock(); - } - - return active(connection); - } - - @Override - public boolean release(Connection connection) - { - boolean closed = isClosed(); - lock(); - try - { - if (!activeConnections.remove(connection)) - return false; - - if (!closed) - { - // Make sure we use "hot" connections first. - deactivate(connection); - } - } - finally - { - unlock(); - } - - released(connection); - return idle(connection, closed); - } - - protected boolean deactivate(Connection connection) - { - return idleConnections.offerFirst(connection); - } - - @Override - public boolean remove(Connection connection) - { - return remove(connection, false); - } - - protected boolean remove(Connection connection, boolean force) - { - boolean activeRemoved; - boolean idleRemoved; - lock(); - try - { - activeRemoved = activeConnections.remove(connection); - idleRemoved = idleConnections.remove(connection); - } - finally - { - unlock(); - } - - if (activeRemoved || force) - released(connection); - boolean removed = activeRemoved || idleRemoved || force; - if (removed) - removed(connection); - return removed; - } - - @Override - public void close() - { - super.close(); - - List connections = new ArrayList<>(); - lock(); - try - { - connections.addAll(idleConnections); - idleConnections.clear(); - connections.addAll(activeConnections); - activeConnections.clear(); - } - finally - { - unlock(); - } - - close(connections); - } - - @Override - public void dump(Appendable out, String indent) throws IOException - { - DumpableCollection active; - DumpableCollection idle; - lock(); - try - { - active = new DumpableCollection("active", new ArrayList<>(activeConnections)); - idle = new DumpableCollection("idle", new ArrayList<>(idleConnections)); - } - finally - { - unlock(); - } - dump(out, indent, active, idle); - } - - protected void dump(Appendable out, String indent, Object... items) throws IOException - { - Dumpable.dumpObjects(out, indent, this, items); - } - - @Override - public boolean sweep() - { - List toSweep; - lock(); - try - { - toSweep = activeConnections.stream() - .filter(connection -> connection instanceof Sweeper.Sweepable) - .collect(Collectors.toList()); - } - finally - { - unlock(); - } - - for (Connection connection : toSweep) - { - if (((Sweeper.Sweepable)connection).sweep()) - { - boolean removed = remove(connection, true); - LOG.warn("Connection swept: {}{}{} from active connections{}{}", - connection, - System.lineSeparator(), - removed ? "Removed" : "Not removed", - System.lineSeparator(), - dump()); - } - } - - return false; - } - - @Override - public String toString() - { - int activeSize; - int idleSize; - lock(); - try - { - activeSize = activeConnections.size(); - idleSize = idleConnections.size(); - } - finally - { - unlock(); - } - - return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]", - getClass().getSimpleName(), - hashCode(), - getPendingConnectionCount(), - getConnectionCount(), - getMaxConnectionCount(), - activeSize, - idleSize); + super.setMaxUsageCount(maxUsageCount); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index d0d8857708e..6930ca19b6b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -35,15 +35,17 @@ import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.HttpCookieStore; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public abstract class HttpConnection implements Connection +public abstract class HttpConnection implements Connection, Attachable { private static final Logger LOG = Log.getLogger(HttpConnection.class); private final HttpDestination destination; + private Object attachment; private int idleTimeoutGuard; private long idleTimeoutStamp; @@ -272,6 +274,18 @@ public abstract class HttpConnection implements Connection } } + @Override + public void setAttachment(Object obj) + { + this.attachment = obj; + } + + @Override + public Object getAttachment() + { + return attachment; + } + @Override public String toString() { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 9fbf47474e5..59ff82753f8 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -438,6 +438,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest { if (connectionPool.isActive(connection)) { + // trigger the next request after releasing the connection if (connectionPool.release(connection)) send(false); else diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpProxy.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpProxy.java index 14404a7c528..9d25a63905b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpProxy.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpProxy.java @@ -35,6 +35,7 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ssl.SslClientConnectionFactory; +import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -233,11 +234,12 @@ public class HttpProxy extends ProxyConfiguration.Proxy } } - private static class ProxyConnection implements Connection + private static class ProxyConnection implements Connection, Attachable { private final Destination destination; private final Connection connection; private final Promise promise; + private Object attachment; private ProxyConnection(Destination destination, Connection connection, Promise promise) { @@ -270,6 +272,18 @@ public class HttpProxy extends ProxyConfiguration.Proxy { return connection.isClosed(); } + + @Override + public void setAttachment(Object obj) + { + this.attachment = obj; + } + + @Override + public Object getAttachment() + { + return attachment; + } } private static class TunnelPromise implements Promise diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java index 7baad6d3174..48150a9b643 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java @@ -40,7 +40,7 @@ public class LeakTrackingConnectionPool extends DuplexConnectionPool public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester) { - super(destination, maxConnections, requester); + super((HttpDestination)destination, maxConnections, requester); start(); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index 43f124e85a8..e78b9450407 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -18,405 +18,47 @@ package org.eclipse.jetty.client; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; - -import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.component.Dumpable; -import org.eclipse.jetty.util.component.DumpableCollection; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Sweeper; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; -public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable, Sweeper.Sweepable +@ManagedObject +public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable { - private static final Logger LOG = Log.getLogger(MultiplexConnectionPool.class); - - private final ReentrantLock lock = new ReentrantLock(); - private final Deque idleConnections; - private final Map muxedConnections; - private final Map busyConnections; - private int maxMultiplex; - public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, maxConnections, requester); - this.idleConnections = new ArrayDeque<>(maxConnections); - this.muxedConnections = new HashMap<>(maxConnections); - this.busyConnections = new HashMap<>(maxConnections); - this.maxMultiplex = maxMultiplex; - } - - @Override - protected Connection acquire(boolean create) - { - Connection connection = activate(); - if (connection == null && create) - { - int queuedRequests = getHttpDestination().getQueuedRequestCount(); - int maxMultiplex = getMaxMultiplex(); - int maxPending = ceilDiv(queuedRequests, maxMultiplex); - tryCreate(maxPending); - connection = activate(); - } - return connection; - } - - /** - * @param a the dividend - * @param b the divisor - * @return the ceiling of the algebraic quotient - */ - private static int ceilDiv(int a, int b) - { - return (a + b - 1) / b; - } - - protected void lock() - { - lock.lock(); - } - - protected void unlock() - { - lock.unlock(); + this(destination, maxConnections, true, requester, maxMultiplex); + } + + public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex) + { + super(destination, maxConnections, cache, requester); + setMaxMultiplex(maxMultiplex); } @Override + @ManagedAttribute(value = "The multiplexing factor of connections") public int getMaxMultiplex() { - lock(); - try - { - return maxMultiplex; - } - finally - { - unlock(); - } + return super.getMaxMultiplex(); } @Override public void setMaxMultiplex(int maxMultiplex) { - lock(); - try - { - this.maxMultiplex = maxMultiplex; - } - finally - { - unlock(); - } + super.setMaxMultiplex(maxMultiplex); } @Override - public boolean isActive(Connection connection) + @ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed") + public int getMaxUsageCount() { - lock(); - try - { - if (muxedConnections.containsKey(connection)) - return true; - return busyConnections.containsKey(connection); - } - finally - { - unlock(); - } + return super.getMaxUsageCount(); } @Override - protected void onCreated(Connection connection) + public void setMaxUsageCount(int maxUsageCount) { - lock(); - try - { - // Use "cold" connections as last. - idleConnections.offer(new Holder(connection)); - } - finally - { - unlock(); - } - - idle(connection, false); - } - - @Override - protected Connection activate() - { - Holder holder; - lock(); - try - { - while (true) - { - if (muxedConnections.isEmpty()) - { - holder = idleConnections.poll(); - if (holder == null) - return null; - muxedConnections.put(holder.connection, holder); - } - else - { - holder = muxedConnections.values().iterator().next(); - } - - if (holder.count < maxMultiplex) - { - ++holder.count; - break; - } - else - { - muxedConnections.remove(holder.connection); - busyConnections.put(holder.connection, holder); - } - } - } - finally - { - unlock(); - } - - return active(holder.connection); - } - - @Override - public boolean release(Connection connection) - { - boolean closed = isClosed(); - boolean idle = false; - Holder holder; - lock(); - try - { - holder = muxedConnections.get(connection); - if (holder != null) - { - int count = --holder.count; - if (count == 0) - { - muxedConnections.remove(connection); - if (!closed) - { - idleConnections.offerFirst(holder); - idle = true; - } - } - } - else - { - holder = busyConnections.remove(connection); - if (holder != null) - { - int count = --holder.count; - if (!closed) - { - if (count == 0) - { - idleConnections.offerFirst(holder); - idle = true; - } - else - { - muxedConnections.put(connection, holder); - } - } - } - } - } - finally - { - unlock(); - } - - if (holder == null) - return false; - - released(connection); - if (idle || closed) - return idle(connection, closed); - return true; - } - - @Override - public boolean remove(Connection connection) - { - return remove(connection, false); - } - - protected boolean remove(Connection connection, boolean force) - { - boolean activeRemoved = true; - boolean idleRemoved = false; - lock(); - try - { - Holder holder = muxedConnections.remove(connection); - if (holder == null) - holder = busyConnections.remove(connection); - if (holder == null) - { - activeRemoved = false; - for (Iterator iterator = idleConnections.iterator(); iterator.hasNext(); ) - { - holder = iterator.next(); - if (holder.connection == connection) - { - idleRemoved = true; - iterator.remove(); - break; - } - } - } - } - finally - { - unlock(); - } - - if (activeRemoved || force) - released(connection); - boolean removed = activeRemoved || idleRemoved || force; - if (removed) - removed(connection); - return removed; - } - - @Override - public void close() - { - super.close(); - - List connections; - lock(); - try - { - connections = idleConnections.stream().map(holder -> holder.connection).collect(Collectors.toList()); - connections.addAll(muxedConnections.keySet()); - connections.addAll(busyConnections.keySet()); - } - finally - { - unlock(); - } - - close(connections); - } - - @Override - public void dump(Appendable out, String indent) throws IOException - { - DumpableCollection busy; - DumpableCollection muxed; - DumpableCollection idle; - lock(); - try - { - busy = new DumpableCollection("busy", new ArrayList<>(busyConnections.values())); - muxed = new DumpableCollection("muxed", new ArrayList<>(muxedConnections.values())); - idle = new DumpableCollection("idle", new ArrayList<>(idleConnections)); - } - finally - { - unlock(); - } - - Dumpable.dumpObjects(out, indent, this, busy, muxed, idle); - } - - @Override - public boolean sweep() - { - List toSweep = new ArrayList<>(); - lock(); - try - { - busyConnections.values().stream() - .map(holder -> holder.connection) - .filter(connection -> connection instanceof Sweeper.Sweepable) - .collect(Collectors.toCollection(() -> toSweep)); - muxedConnections.values().stream() - .map(holder -> holder.connection) - .filter(connection -> connection instanceof Sweeper.Sweepable) - .collect(Collectors.toCollection(() -> toSweep)); - } - finally - { - unlock(); - } - - for (Connection connection : toSweep) - { - if (((Sweeper.Sweepable)connection).sweep()) - { - boolean removed = remove(connection, true); - LOG.warn("Connection swept: {}{}{} from active connections{}{}", - connection, - System.lineSeparator(), - removed ? "Removed" : "Not removed", - System.lineSeparator(), - dump()); - } - } - - return false; - } - - @Override - public String toString() - { - int busySize; - int muxedSize; - int idleSize; - lock(); - try - { - busySize = busyConnections.size(); - muxedSize = muxedConnections.size(); - idleSize = idleConnections.size(); - } - finally - { - unlock(); - } - return String.format("%s@%x[c=%d/%d/%d,b=%d,m=%d,i=%d]", - getClass().getSimpleName(), - hashCode(), - getPendingConnectionCount(), - getConnectionCount(), - getMaxConnectionCount(), - busySize, - muxedSize, - idleSize); - } - - private static class Holder - { - private final Connection connection; - private int count; - - private Holder(Connection connection) - { - this.connection = connection; - } - - @Override - public String toString() - { - return String.format("%s[%d]", connection, count); - } + super.setMaxUsageCount(maxUsageCount); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 181fecf5dcb..57486da0360 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -18,244 +18,55 @@ package org.eclipse.jetty.client; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.client.api.Connection; -import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedObject; -import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; @ManagedObject -public class RoundRobinConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable +public class RoundRobinConnectionPool extends MultiplexConnectionPool { - private final List entries; - private int maxMultiplex; - private int index; + private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class); - public RoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester) + private final AtomicInteger offset = new AtomicInteger(); + private final Pool pool; + + public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester) { this(destination, maxConnections, requester, 1); } - public RoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester, int maxMultiplex) + public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super((HttpDestination)destination, maxConnections, requester); - entries = new ArrayList<>(maxConnections); - for (int i = 0; i < maxConnections; ++i) - { - entries.add(new Entry()); - } - this.maxMultiplex = maxMultiplex; - } - - @Override - public int getMaxMultiplex() - { - synchronized (this) - { - return maxMultiplex; - } - } - - @Override - public void setMaxMultiplex(int maxMultiplex) - { - synchronized (this) - { - this.maxMultiplex = maxMultiplex; - } - } - - /** - *

Returns an idle connection, if available, following a round robin algorithm; - * otherwise it always tries to create a new connection, up until the max connection count.

- * - * @param create this parameter is ignored and assumed to be always {@code true} - * @return an idle connection or {@code null} if no idle connections are available - */ - @Override - protected Connection acquire(boolean create) - { - // The nature of this connection pool is such that a - // connection must always be present in the next slot. - return super.acquire(true); - } - - @Override - protected void onCreated(Connection connection) - { - synchronized (this) - { - for (Entry entry : entries) - { - if (entry.connection == null) - { - entry.connection = connection; - break; - } - } - } - idle(connection, false); + super(destination, maxConnections, false, requester, maxMultiplex); + pool = destination.getBean(Pool.class); } @Override protected Connection activate() { - Connection connection = null; - synchronized (this) - { - int offset = 0; - int capacity = getMaxConnectionCount(); - while (offset < capacity) - { - int idx = index + offset; - if (idx >= capacity) - idx -= capacity; - - Entry entry = entries.get(idx); - - if (entry.connection == null) - break; - - if (entry.active < getMaxMultiplex()) - { - ++entry.active; - ++entry.used; - connection = entry.connection; - index += offset + 1; - if (index >= capacity) - index -= capacity; - break; - } - - ++offset; - } - } - return connection == null ? null : active(connection); + int offset = this.offset.get(); + Connection connection = activate(offset); + if (connection != null) + this.offset.getAndIncrement(); + return connection; } - @Override - public boolean isActive(Connection connection) + private Connection activate(int offset) { - synchronized (this) + Pool.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries())); + if (LOG.isDebugEnabled()) + LOG.debug("activated '{}'", entry); + if (entry != null) { - for (Entry entry : entries) - { - if (entry.connection == connection) - return entry.active > 0; - } - return false; - } - } - - @Override - public boolean release(Connection connection) - { - boolean found = false; - boolean idle = false; - synchronized (this) - { - for (Entry entry : entries) - { - if (entry.connection == connection) - { - found = true; - int active = --entry.active; - idle = active == 0; - break; - } - } - } - if (!found) - return false; - released(connection); - if (idle) - return idle(connection, isClosed()); - return true; - } - - @Override - public boolean remove(Connection connection) - { - boolean found = false; - synchronized (this) - { - for (Entry entry : entries) - { - if (entry.connection == connection) - { - found = true; - entry.reset(); - break; - } - } - } - if (found) - { - released(connection); - removed(connection); - } - return found; - } - - @Override - public void dump(Appendable out, String indent) throws IOException - { - List connections; - synchronized (this) - { - connections = new ArrayList<>(entries); - } - Dumpable.dumpObjects(out, indent, out, connections); - } - - @Override - public String toString() - { - int present = 0; - int active = 0; - synchronized (this) - { - for (Entry entry : entries) - { - if (entry.connection != null) - { - ++present; - if (entry.active > 0) - ++active; - } - } - } - return String.format("%s@%x[c=%d/%d/%d,a=%d]", - getClass().getSimpleName(), - hashCode(), - getPendingConnectionCount(), - present, - getMaxConnectionCount(), - active - ); - } - - private static class Entry - { - private Connection connection; - private int active; - private long used; - - private void reset() - { - connection = null; - active = 0; - used = 0; - } - - @Override - public String toString() - { - return String.format("{u=%d,c=%s}", used, connection); + Connection connection = entry.getPooled(); + acquired(connection); + return connection; } + return null; } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java index b45c4004fe7..e18c55f07bb 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ValidatingConnectionPool.java @@ -19,16 +19,16 @@ package org.eclipse.jetty.client; import java.io.IOException; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Stream; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -67,10 +67,10 @@ public class ValidatingConnectionPool extends DuplexConnectionPool public ValidatingConnectionPool(Destination destination, int maxConnections, Callback requester, Scheduler scheduler, long timeout) { - super(destination, maxConnections, requester); + super((HttpDestination)destination, maxConnections, requester); this.scheduler = scheduler; this.timeout = timeout; - this.quarantine = new HashMap<>(maxConnections); + this.quarantine = new ConcurrentHashMap<>(maxConnections); } @ManagedAttribute(value = "The number of validating connections", readonly = true) @@ -82,21 +82,11 @@ public class ValidatingConnectionPool extends DuplexConnectionPool @Override public boolean release(Connection connection) { - lock(); - try - { - if (!getActiveConnections().remove(connection)) - return false; - Holder holder = new Holder(connection); - holder.task = scheduler.schedule(holder, timeout, TimeUnit.MILLISECONDS); - quarantine.put(connection, holder); - if (LOG.isDebugEnabled()) - LOG.debug("Validating for {}ms {}", timeout, connection); - } - finally - { - unlock(); - } + Holder holder = new Holder(connection); + holder.task = scheduler.schedule(holder, timeout, TimeUnit.MILLISECONDS); + quarantine.put(connection, holder); + if (LOG.isDebugEnabled()) + LOG.debug("Validating for {}ms {}", timeout, connection); released(connection); return true; @@ -105,16 +95,7 @@ public class ValidatingConnectionPool extends DuplexConnectionPool @Override public boolean remove(Connection connection) { - Holder holder; - lock(); - try - { - holder = quarantine.remove(connection); - } - finally - { - unlock(); - } + Holder holder = quarantine.remove(connection); if (holder == null) return super.remove(connection); @@ -130,25 +111,16 @@ public class ValidatingConnectionPool extends DuplexConnectionPool } @Override - protected void dump(Appendable out, String indent, Object... items) throws IOException + public void dump(Appendable out, String indent) throws IOException { DumpableCollection toDump = new DumpableCollection("quarantine", quarantine.values()); - super.dump(out, indent, Stream.concat(Stream.of(items), Stream.of(toDump))); + Dumpable.dumpObjects(out, indent, this, toDump); } @Override public String toString() { - int size; - lock(); - try - { - size = quarantine.size(); - } - finally - { - unlock(); - } + int size = quarantine.size(); return String.format("%s[v=%d]", super.toString(), size); } @@ -170,20 +142,11 @@ public class ValidatingConnectionPool extends DuplexConnectionPool if (done.compareAndSet(false, true)) { boolean closed = isClosed(); - lock(); - try - { - if (LOG.isDebugEnabled()) - LOG.debug("Validated {}", connection); - quarantine.remove(connection); - if (!closed) - deactivate(connection); - } - finally - { - unlock(); - } - + if (LOG.isDebugEnabled()) + LOG.debug("Validated {}", connection); + quarantine.remove(connection); + if (!closed) + deactivate(connection); idle(connection, closed); proceed(); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index c0e02dfaa63..d5408d70d16 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -34,12 +34,13 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Sweeper; -public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable +public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable, Attachable { private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class); @@ -135,6 +136,18 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec return closed.get(); } + @Override + public void setAttachment(Object obj) + { + delegate.setAttachment(obj); + } + + @Override + public Object getAttachment() + { + return delegate.getAttachment(); + } + @Override public boolean onIdleExpired() { diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java new file mode 100644 index 00000000000..ffe72e1ee27 --- /dev/null +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java @@ -0,0 +1,35 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.Destination; + +public class ConnectionPoolHelper +{ + public static Connection acquire(AbstractConnectionPool connectionPool, boolean create) + { + return connectionPool.acquire(create); + } + + public static void tryCreate(AbstractConnectionPool connectionPool, int pending) + { + connectionPool.tryCreate(pending); + } +} diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java index 39a60a16c6e..5d54401f2e8 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java @@ -650,7 +650,7 @@ public class HttpClientTLSTest HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger the creation of a new connection, but don't use it. - connectionPool.tryCreate(-1); + ConnectionPoolHelper.tryCreate(connectionPool, -1); // Verify that the connection has been created. while (true) { @@ -746,7 +746,7 @@ public class HttpClientTLSTest HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger the creation of a new connection, but don't use it. - connectionPool.tryCreate(-1); + ConnectionPoolHelper.tryCreate(connectionPool, -1); // Verify that the connection has been created. while (true) { diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java index fcad29b9b03..c3d0dfc976d 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java @@ -67,13 +67,9 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest String host = "localhost"; int port = connector.getLocalPort(); HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scenario.getScheme(), host, port); - DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Collection idleConnections = connectionPool.getIdleConnections(); - assertEquals(0, idleConnections.size()); - - final Collection activeConnections = connectionPool.getActiveConnections(); - assertEquals(0, activeConnections.size()); + assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getIdleConnections().size()); + assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getActiveConnections().size()); final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch successLatch = new CountDownLatch(3); @@ -82,8 +78,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest .onRequestSuccess(request -> successLatch.countDown()) .onResponseHeaders(response -> { - assertEquals(0, idleConnections.size()); - assertEquals(1, activeConnections.size()); + assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getIdleConnections().size()); + assertEquals(1, ((DuplexConnectionPool)destination.getConnectionPool()).getActiveConnections().size()); headersLatch.countDown(); }) .send(new Response.Listener.Adapter() @@ -105,8 +101,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest assertTrue(headersLatch.await(30, TimeUnit.SECONDS)); assertTrue(successLatch.await(30, TimeUnit.SECONDS)); - assertEquals(1, idleConnections.size()); - assertEquals(0, activeConnections.size()); + assertEquals(1, ((DuplexConnectionPool)destination.getConnectionPool()).getIdleConnections().size()); + assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getActiveConnections().size()); } @ParameterizedTest @@ -120,11 +116,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scenario.getScheme(), host, port); DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - final Collection idleConnections = connectionPool.getIdleConnections(); - assertEquals(0, idleConnections.size()); - - final Collection activeConnections = connectionPool.getActiveConnections(); - assertEquals(0, activeConnections.size()); + assertEquals(0, connectionPool.getIdleConnections().size()); + assertEquals(0, connectionPool.getActiveConnections().size()); final CountDownLatch beginLatch = new CountDownLatch(1); final CountDownLatch failureLatch = new CountDownLatch(2); @@ -133,7 +126,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest @Override public void onBegin(Request request) { - activeConnections.iterator().next().close(); + connectionPool.getActiveConnections().iterator().next().close(); beginLatch.countDown(); } @@ -148,8 +141,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest public void onComplete(Result result) { assertTrue(result.isFailed()); - assertEquals(0, idleConnections.size()); - assertEquals(0, activeConnections.size()); + assertEquals(0, connectionPool.getIdleConnections().size()); + assertEquals(0, connectionPool.getActiveConnections().size()); failureLatch.countDown(); } }); @@ -157,8 +150,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest assertTrue(beginLatch.await(30, TimeUnit.SECONDS)); assertTrue(failureLatch.await(30, TimeUnit.SECONDS)); - assertEquals(0, idleConnections.size()); - assertEquals(0, activeConnections.size()); + assertEquals(0, connectionPool.getIdleConnections().size()); + assertEquals(0, connectionPool.getActiveConnections().size()); } @ParameterizedTest diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java index a00363683e8..edc150c5fd8 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java @@ -26,6 +26,7 @@ import java.util.function.Supplier; import org.eclipse.jetty.client.AbstractHttpClientServerTest; import org.eclipse.jetty.client.ConnectionPool; +import org.eclipse.jetty.client.ConnectionPoolHelper; import org.eclipse.jetty.client.DuplexConnectionPool; import org.eclipse.jetty.client.EmptyServerHandler; import org.eclipse.jetty.client.HttpClient; @@ -37,7 +38,6 @@ import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; -import org.eclipse.jetty.util.Callback; import org.hamcrest.Matchers; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -66,7 +66,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest Connection connection = connectionPool.acquire(); assertNull(connection); // There are no queued requests, so no connection should be created. - connection = pollIdleConnection(connectionPool, 1, TimeUnit.SECONDS); + connection = peekIdleConnection(connectionPool, 1, TimeUnit.SECONDS); assertNull(connection); } } @@ -77,17 +77,17 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest { start(scenario, new EmptyServerHandler()); - try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))) + try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))) { destination.start(); - TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - connectionPool.tryCreate(1); + ConnectionPoolHelper.tryCreate(connectionPool, 1); - Connection connection = connectionPool.acquire(false); + Connection connection = ConnectionPoolHelper.acquire(connectionPool, false); if (connection == null) - connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS); + connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS); assertNotNull(connection); } } @@ -98,13 +98,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest { start(scenario, new EmptyServerHandler()); - try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))) + try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))) { destination.start(); - TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - connectionPool.tryCreate(1); + ConnectionPoolHelper.tryCreate(connectionPool, 1); Connection connection1 = connectionPool.acquire(); if (connection1 == null) @@ -127,12 +127,12 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest CountDownLatch idleLatch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1); - try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())) + try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())) { @Override protected ConnectionPool newConnectionPool(HttpClient client) { - return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this) + return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this) { @Override protected void onCreated(Connection connection) @@ -153,10 +153,10 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest }) { destination.start(); - TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - connectionPool.tryCreate(1); + ConnectionPoolHelper.tryCreate(connectionPool, 1); // Make sure we entered idleCreated(). assertTrue(idleLatch.await(5, TimeUnit.SECONDS)); @@ -167,7 +167,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest assertNull(connection1); // Trigger creation of a second connection. - connectionPool.tryCreate(1); + ConnectionPoolHelper.tryCreate(connectionPool, 1); // Second attempt also returns null because we delayed idleCreated() above. Connection connection2 = connectionPool.acquire(); @@ -176,9 +176,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest latch.countDown(); // There must be 2 idle connections. - Connection connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS); + Connection connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS); assertNotNull(connection); - connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS); + connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS); assertNotNull(connection); } } @@ -189,13 +189,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest { start(scenario, new EmptyServerHandler()); - try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))) + try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))) { destination.start(); - TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - connectionPool.tryCreate(1); + ConnectionPoolHelper.tryCreate(connectionPool, 1); Connection connection1 = connectionPool.acquire(); if (connection1 == null) @@ -226,13 +226,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest long idleTimeout = 1000; client.setIdleTimeout(idleTimeout); - try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))) + try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))) { destination.start(); - TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool(); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); // Trigger creation of one connection. - connectionPool.tryCreate(1); + ConnectionPoolHelper.tryCreate(connectionPool, 1); Connection connection1 = connectionPool.acquire(); if (connection1 == null) @@ -243,7 +243,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest TimeUnit.MILLISECONDS.sleep(2 * idleTimeout); - connection1 = connectionPool.getIdleConnections().poll(); + connection1 = connectionPool.getIdleConnections().peek(); assertNull(connection1); } } @@ -350,11 +350,6 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest assertTrue(client.getDestinations().isEmpty(), "Destination must be removed after connection error"); } - private Connection pollIdleConnection(DuplexConnectionPool connectionPool, long time, TimeUnit unit) throws InterruptedException - { - return await(() -> connectionPool.getIdleConnections().poll(), time, unit); - } - private Connection peekIdleConnection(DuplexConnectionPool connectionPool, long time, TimeUnit unit) throws InterruptedException { return await(() -> connectionPool.getIdleConnections().peek(), time, unit); @@ -372,38 +367,4 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest } return null; } - - private static class TestDestination extends HttpDestinationOverHTTP - { - public TestDestination(HttpClient client, Origin origin) - { - super(client, origin); - } - - @Override - protected ConnectionPool newConnectionPool(HttpClient client) - { - return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this); - } - - public static class TestConnectionPool extends DuplexConnectionPool - { - public TestConnectionPool(Destination destination, int maxConnections, Callback requester) - { - super(destination, maxConnections, requester); - } - - @Override - public void tryCreate(int maxPending) - { - super.tryCreate(maxPending); - } - - @Override - public Connection acquire(boolean create) - { - return super.acquire(create); - } - } - } } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index 52a0916936d..b8e484ea529 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -49,13 +49,14 @@ import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -public class HttpConnectionOverFCGI extends AbstractConnection implements Connection +public class HttpConnectionOverFCGI extends AbstractConnection implements Connection, Attachable { private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class); @@ -70,6 +71,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec private final Delegate delegate; private final ClientParser parser; private RetainableByteBuffer networkBuffer; + private Object attachment; public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise promise, boolean multiplexed) { @@ -267,6 +269,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec return closed.get(); } + @Override + public void setAttachment(Object obj) + { + this.attachment = obj; + } + + @Override + public Object getAttachment() + { + return attachment; + } + protected boolean closeByHTTP(HttpFields fields) { if (multiplexed) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 2b8d4cb73d0..52db438d073 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -22,6 +22,7 @@ import java.io.Closeable; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.Frame; +import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.Callback; /** @@ -29,21 +30,8 @@ import org.eclipse.jetty.util.Callback; *

This class extends {@link Stream} by adding the methods required to * implement the HTTP/2 stream functionalities.

*/ -public interface IStream extends Stream, Closeable +public interface IStream extends Stream, Attachable, Closeable { - /** - * @return the object attached to this stream - * @see #setAttachment(Object) - */ - Object getAttachment(); - - /** - * Attaches the given object to this stream for later retrieval. - * - * @param attachment the object to attach to this stream - */ - void setAttachment(Object attachment); - /** * @return whether this stream is local or remote */ diff --git a/jetty-jmh/pom.xml b/jetty-jmh/pom.xml index eb1a92181b7..5b185ad170f 100644 --- a/jetty-jmh/pom.xml +++ b/jetty-jmh/pom.xml @@ -93,6 +93,11 @@ jetty-http ${project.version} + + org.eclipse.jetty + jetty-client + ${project.version} + javax.servlet javax.servlet-api diff --git a/jetty-jmh/src/main/java/org/eclipse/jetty/client/jmh/ConnectionPoolsBenchmark.java b/jetty-jmh/src/main/java/org/eclipse/jetty/client/jmh/ConnectionPoolsBenchmark.java new file mode 100644 index 00000000000..0849bf57858 --- /dev/null +++ b/jetty-jmh/src/main/java/org/eclipse/jetty/client/jmh/ConnectionPoolsBenchmark.java @@ -0,0 +1,180 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client.jmh; + +import java.net.URI; +import java.util.ArrayList; +import java.util.concurrent.ThreadLocalRandom; + +import org.eclipse.jetty.client.ConnectionPool; +import org.eclipse.jetty.client.DuplexConnectionPool; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpConversation; +import org.eclipse.jetty.client.HttpDestination; +import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.HttpRequest; +import org.eclipse.jetty.client.MultiplexConnectionPool; +import org.eclipse.jetty.client.Origin; +import org.eclipse.jetty.client.RoundRobinConnectionPool; +import org.eclipse.jetty.client.SendFailure; +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.util.Attachable; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Promise; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.infra.Blackhole; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@State(Scope.Benchmark) +public class ConnectionPoolsBenchmark +{ + private ConnectionPool pool; + + @Param({"round-robin", "cached/multiplex", "uncached/multiplex", "cached/duplex", "uncached/duplex"}) + public static String POOL_TYPE; + + @Setup + public void setUp() throws Exception + { + HttpClient httpClient = new HttpClient() + { + @Override + protected void newConnection(HttpDestination destination, Promise promise) + { + promise.succeeded(new MockConnection()); + } + }; + HttpDestination httpDestination = new HttpDestination(httpClient, new Origin("http", "localhost", 8080)) + { + @Override + protected SendFailure send(Connection connection, HttpExchange exchange) + { + return null; + } + }; + + HttpConversation httpConversation = new HttpConversation(); + HttpRequest httpRequest = new HttpRequest(httpClient, httpConversation, new URI("http://localhost:8080")) {}; + HttpExchange httpExchange = new HttpExchange(httpDestination, httpRequest, new ArrayList<>()); + httpDestination.getHttpExchanges().add(httpExchange); + + int initialConnections = 12; + int maxConnections = 100; + switch (POOL_TYPE) + { + case "uncached/duplex": + pool = new DuplexConnectionPool(httpDestination, maxConnections, false, Callback.NOOP); + pool.preCreateConnections(initialConnections).get(); + break; + case "cached/duplex": + pool = new DuplexConnectionPool(httpDestination, maxConnections, true, Callback.NOOP); + pool.preCreateConnections(initialConnections).get(); + break; + case "uncached/multiplex": + pool = new MultiplexConnectionPool(httpDestination, maxConnections,false, Callback.NOOP, 12); + pool.preCreateConnections(initialConnections).get(); + break; + case "cached/multiplex": + pool = new MultiplexConnectionPool(httpDestination, maxConnections,true, Callback.NOOP, 12); + pool.preCreateConnections(initialConnections).get(); + break; + case "round-robin": + pool = new RoundRobinConnectionPool(httpDestination, maxConnections, Callback.NOOP); + pool.preCreateConnections(maxConnections).get(); + break; + default: + throw new AssertionError("Unknown pool type: " + POOL_TYPE); + } + } + + @TearDown + public void tearDown() + { + pool.close(); + pool = null; + } + + @Benchmark + public void testPool() + { + Connection connection = pool.acquire(); + if (connection == null && !POOL_TYPE.equals("round-robin")) + throw new AssertionError("from thread " + Thread.currentThread().getName()); + Blackhole.consumeCPU(ThreadLocalRandom.current().nextInt(10, 20)); + if (connection != null) + pool.release(connection); + } + + public static void main(String[] args) throws RunnerException + { + Options opt = new OptionsBuilder() + .include(ConnectionPoolsBenchmark.class.getSimpleName()) + .warmupIterations(3) + .measurementIterations(3) + .forks(1) + .threads(12) + //.addProfiler(LinuxPerfProfiler.class) + .build(); + + new Runner(opt).run(); + } + + static class MockConnection implements Connection, Attachable + { + private Object attachment; + + @Override + public void close() + { + } + + @Override + public boolean isClosed() + { + return false; + } + + @Override + public void send(Request request, Response.CompleteListener listener) + { + } + + @Override + public void setAttachment(Object obj) + { + this.attachment = obj; + } + + @Override + public Object getAttachment() + { + return attachment; + } + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Attachable.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Attachable.java new file mode 100644 index 00000000000..3494a6410ed --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Attachable.java @@ -0,0 +1,38 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +/** + * Abstract mechanism to support attachment of miscellaneous objects. + */ +public interface Attachable +{ + /** + * @return the object attached to this stream + * @see #setAttachment(Object) + */ + Object getAttachment(); + + /** + * Attaches the given object to this stream for later retrieval. + * + * @param attachment the object to attach to this stream + */ + void setAttachment(Object attachment); +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java new file mode 100644 index 00000000000..5c118efdd97 --- /dev/null +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -0,0 +1,457 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + * A fast container of poolable objects, with optional support for + * multiplexing, max usage count and thread-local caching. + *

+ * The thread-local caching mechanism is about remembering up to N previously + * used entries into a thread-local single-threaded collection. + * When that collection is not empty, its entries are removed one by one + * during acquisition until an entry that can be acquired is found. + * This can greatly speed up acquisition when both the acquisition and the + * release of the entries is done on the same thread as this avoids iterating + * the global, thread-safe collection of entries. + * @param + */ +public class Pool implements AutoCloseable, Dumpable +{ + private static final Logger LOGGER = Log.getLogger(Pool.class); + + private final List sharedList = new CopyOnWriteArrayList<>(); + /* + * The cache is used to avoid hammering on the first index of the entry list. + * Caches can become poisoned (i.e.: containing entries that are in use) when + * the release isn't done by the acquiring thread or when the entry pool is + * undersized compared to the load applied on it. + * When an entry can't be found in the cache, the global list is iterated + * normally so the cache has no visible effect besides performance. + */ + private final ThreadLocal> cache; + private final Lock lock = new ReentrantLock(); + private final int maxEntries; + private final int cacheSize; + private volatile boolean closed; + private volatile int maxMultiplex = 1; + private volatile int maxUsageCount = -1; + + /** + * Construct a Pool with the specified thread-local cache size. + * + * @param maxEntries the maximum amount of entries that the pool will accept. + * @param cacheSize the thread-local cache size. A value less than 1 means the cache is disabled. + */ + public Pool(int maxEntries, int cacheSize) + { + this.maxEntries = maxEntries; + this.cacheSize = cacheSize; + if (cacheSize > 0) + this.cache = ThreadLocal.withInitial(() -> new ArrayList(cacheSize)); + else + this.cache = null; + } + + public int getPendingConnectionCount() + { + return (int)sharedList.stream().filter(entry -> entry.getPooled() == null).count(); + } + + public int getIdleConnectionCount() + { + return (int)sharedList.stream().filter(Entry::isIdle).count(); + } + + public int getInUseConnectionCount() + { + return (int)sharedList.stream().filter(entry -> !entry.isIdle()).count(); + } + + public int getMaxEntries() + { + return maxEntries; + } + + public int getMaxMultiplex() + { + return maxMultiplex; + } + + public final void setMaxMultiplex(int maxMultiplex) + { + if (maxMultiplex < 1) + throw new IllegalArgumentException("Max multiplex must be >= 1"); + this.maxMultiplex = maxMultiplex; + } + + public int getMaxUsageCount() + { + return maxUsageCount; + } + + public final void setMaxUsageCount(int maxUsageCount) + { + if (maxUsageCount == 0) + throw new IllegalArgumentException("Max usage count must be != 0"); + this.maxUsageCount = maxUsageCount; + } + + /** + * Create a new disabled slot into the pool. The returned entry + * won't be acquirable as long as {@link Entry#enable(Object)} + * has not been called. + * + * @param maxReservations the max desired number of reserved entries, + * or a negative number to always trigger the reservation of a new entry. + * @return a disabled entry that is contained in the pool, + * or null if the pool is closed or if the pool already contains + * {@link #getMaxEntries()} entries. + */ + public Entry reserve(int maxReservations) + { + if (maxReservations >= 0 && getPendingConnectionCount() >= maxReservations) + return null; + + lock.lock(); + try + { + if (!closed && sharedList.size() < maxEntries) + { + Entry entry = new Entry(); + sharedList.add(entry); + return entry; + } + return null; + } + finally + { + lock.unlock(); + } + } + + /** + * Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism. + * + * @param idx the index of the entry to acquire. + * @return the specified entry or null if there is none at the specified index or if it is not available. + */ + public Entry acquireAt(int idx) + { + if (closed) + return null; + + try + { + Entry entry = sharedList.get(idx); + if (entry.tryAcquire()) + return entry; + } + catch (IndexOutOfBoundsException e) + { + // no entry at that index + } + return null; + } + + /** + * Acquire an entry from the pool. + * + * @return an entry from the pool or null if none is available. + */ + public Entry acquire() + { + if (closed) + return null; + + // first check the thread-local cache + if (cache != null) + { + List cachedList = cache.get(); + while (!cachedList.isEmpty()) + { + Entry cachedEntry = cachedList.remove(cachedList.size() - 1); + if (cachedEntry.tryAcquire()) + return cachedEntry; + } + } + + // then iterate the shared list + for (Entry entry : sharedList) + { + if (entry.tryAcquire()) + return entry; + } + return null; + } + + /** + * This method will return an acquired object to the pool. Objects + * that are acquired from the pool but never released will result + * in a memory leak. + * + * @param entry the value to return to the pool + * @return true if the entry was released and could be acquired again, + * false if the entry should be removed by calling {@link #remove(Pool.Entry)} + * and the object contained by the entry should be disposed. + * @throws NullPointerException if value is null + */ + public boolean release(Entry entry) + { + if (closed) + return false; + + // first mark it as unused + boolean reusable = entry.tryRelease(); + + // then cache the released entry + if (cache != null && reusable) + { + List cachedList = cache.get(); + if (cachedList.size() < cacheSize) + cachedList.add(entry); + } + return reusable; + } + + /** + * Remove a value from the pool. + * + * @param entry the value to remove + * @return true if the entry was removed, false otherwise + */ + public boolean remove(Entry entry) + { + if (closed) + return false; + + if (!entry.tryRemove()) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Attempt to remove an object from the pool that is still in use: {}", entry); + return false; + } + + boolean removed = sharedList.remove(entry); + if (!removed) + { + if (LOGGER.isDebugEnabled()) + LOGGER.debug("Attempt to remove an object from the pool that does not exist: {}", entry); + } + + return removed; + } + + public boolean isClosed() + { + return closed; + } + + @Override + public void close() + { + List copy; + lock.lock(); + try + { + closed = true; + copy = new ArrayList<>(sharedList); + sharedList.clear(); + } + finally + { + lock.unlock(); + } + + // iterate the copy and close its entries + for (Entry entry : copy) + { + if (entry.tryRemove() && entry.pooled instanceof Closeable) + { + try + { + ((Closeable)entry.pooled).close(); + } + catch (IOException e) + { + LOGGER.warn("Error closing entry {}", entry, e); + } + } + } + } + + public int size() + { + return sharedList.size(); + } + + public Collection values() + { + return Collections.unmodifiableCollection(sharedList); + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + Dumpable.dumpObjects(out, indent, this); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + " size=" + sharedList.size() + " closed=" + closed + " entries=" + sharedList; + } + + public class Entry + { + // hi: positive=open/maxUsage counter,negative=closed lo: multiplexing counter + private final AtomicBiInteger state; + private volatile T pooled; + + public Entry() + { + this.state = new AtomicBiInteger(-1, 0); + } + + public T getPooled() + { + return pooled; + } + + public void enable(T pooled) + { + if (!isClosed()) + throw new IllegalStateException("Open entries cannot be enabled : " + this); + Objects.requireNonNull(pooled); + this.pooled = pooled; + state.set(0, 0); + } + + /** + * Try to acquire the entry if possible by incrementing both the usage + * count and the multiplex count. + * @return true if the usage count is <= maxUsageCount and + * the multiplex count is maxMultiplex and the entry is not closed, + * false otherwise. + */ + public boolean tryAcquire() + { + while (true) + { + long encoded = state.get(); + int usageCount = AtomicBiInteger.getHi(encoded); + boolean closed = usageCount < 0; + int multiplexingCount = AtomicBiInteger.getLo(encoded); + int currentMaxUsageCount = maxUsageCount; + if (closed || multiplexingCount >= maxMultiplex || (currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount)) + return false; + + if (state.compareAndSet(encoded, usageCount + 1, multiplexingCount + 1)) + return true; + } + } + + /** + * Try to release the entry if possible by decrementing the multiplexing + * count unless the entity is closed. + * @return true if the entry was released, + * false if {@link #tryRemove()} should be called. + */ + public boolean tryRelease() + { + int newMultiplexingCount; + int usageCount; + while (true) + { + long encoded = state.get(); + usageCount = AtomicBiInteger.getHi(encoded); + boolean closed = usageCount < 0; + if (closed) + return false; + + newMultiplexingCount = AtomicBiInteger.getLo(encoded) - 1; + if (newMultiplexingCount < 0) + throw new IllegalStateException("Cannot release an already released entry"); + + if (state.compareAndSet(encoded, usageCount, newMultiplexingCount)) + break; + } + + int currentMaxUsageCount = maxUsageCount; + boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount; + return !(overUsed && newMultiplexingCount == 0); + } + + /** + * Try to mark the entry as removed. + * @return true if the entry has to be removed from the containing pool, false otherwise. + */ + public boolean tryRemove() + { + while (true) + { + long encoded = state.get(); + int usageCount = AtomicBiInteger.getHi(encoded); + int multiplexCount = AtomicBiInteger.getLo(encoded); + int newMultiplexCount = Math.max(multiplexCount - 1, 0); + + boolean removed = state.compareAndSet(usageCount, -1, multiplexCount, newMultiplexCount); + if (removed) + return newMultiplexCount == 0; + } + } + + public boolean isClosed() + { + return state.getHi() < 0; + } + + public boolean isIdle() + { + return state.getLo() <= 0; + } + + public int getUsageCount() + { + return Math.max(state.getHi(), 0); + } + + @Override + public String toString() + { + long encoded = state.get(); + return super.toString() + " stateHi=" + AtomicBiInteger.getHi(encoded) + + " stateLo=" + AtomicBiInteger.getLo(encoded) + " pooled=" + pooled; + } + } +} diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java new file mode 100644 index 00000000000..1259c8503ce --- /dev/null +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java @@ -0,0 +1,442 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.io.Closeable; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.jupiter.api.Test; + +import static java.util.stream.Collectors.toList; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.sameInstance; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class PoolTest +{ + + @Test + public void testAcquireRelease() + { + Pool pool = new Pool<>(1,0); + pool.reserve(-1).enable("aaa"); + + assertThat(pool.values().stream().findFirst().get().isIdle(), is(true)); + Pool.Entry e1 = pool.acquire(); + assertThat(e1.getPooled(), equalTo("aaa")); + assertThat(pool.values().stream().findFirst().get().isIdle(), is(false)); + assertThat(pool.acquire(), nullValue()); + assertThat(pool.release(e1), is(true)); + assertThat(pool.values().stream().findFirst().get().isIdle(), is(true)); + assertThrows(IllegalStateException.class, () -> pool.release(e1)); + Pool.Entry e2 = pool.acquire(); + assertThat(e2.getPooled(), equalTo("aaa")); + assertThat(pool.release(e2), is(true)); + assertThrows(NullPointerException.class, () -> pool.release(null)); + } + + @Test + public void testRemoveBeforeRelease() + { + Pool pool = new Pool<>(1,0); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e1 = pool.acquire(); + assertThat(pool.remove(e1), is(true)); + assertThat(pool.remove(e1), is(false)); + assertThat(pool.release(e1), is(false)); + } + + @Test + public void testCloseBeforeRelease() + { + Pool pool = new Pool<>(1,0); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e1 = pool.acquire(); + assertThat(pool.size(), is(1)); + pool.close(); + assertThat(pool.size(), is(0)); + assertThat(pool.release(e1), is(false)); + } + + @Test + public void testMaxPoolSize() + { + Pool pool = new Pool<>(1, 0); + assertThat(pool.size(), is(0)); + assertThat(pool.reserve(-1), notNullValue()); + assertThat(pool.size(), is(1)); + assertThat(pool.reserve(-1), nullValue()); + assertThat(pool.size(), is(1)); + } + + @Test + public void testReserve() + { + Pool pool = new Pool<>(2, 0); + Pool.Entry entry = pool.reserve(-1); + assertThat(pool.size(), is(1)); + assertThat(pool.acquire(), nullValue()); + assertThat(entry.isClosed(), is(true)); + + assertThrows(NullPointerException.class, () -> entry.enable(null)); + assertThat(pool.acquire(), nullValue()); + assertThat(entry.isClosed(), is(true)); + + entry.enable("aaa"); + assertThat(entry.isClosed(), is(false)); + assertThat(pool.acquire().getPooled(), notNullValue()); + + assertThrows(IllegalStateException.class, () -> entry.enable("bbb")); + + Pool.Entry e2 = pool.reserve(-1); + assertThat(pool.size(), is(2)); + assertThat(pool.remove(e2), is(true)); + assertThat(pool.size(), is(1)); + + pool.reserve(-1); + assertThat(pool.size(), is(2)); + pool.close(); + assertThat(pool.size(), is(0)); + assertThat(pool.reserve(-1), nullValue()); + assertThat(entry.isClosed(), is(true)); + } + + @Test + public void testReserveMaxPending() + { + Pool pool = new Pool<>(2, 0); + assertThat(pool.reserve(0), nullValue()); + assertThat(pool.reserve(1), notNullValue()); + assertThat(pool.reserve(1), nullValue()); + assertThat(pool.reserve(2), notNullValue()); + assertThat(pool.reserve(2), nullValue()); + assertThat(pool.reserve(3), nullValue()); + assertThat(pool.reserve(-1), nullValue()); + } + + @Test + public void testReserveNegativeMaxPending() + { + Pool pool = new Pool<>(2, 0); + assertThat(pool.reserve(-1), notNullValue()); + assertThat(pool.reserve(-1), notNullValue()); + assertThat(pool.reserve(-1), nullValue()); + } + + @Test + public void testClose() + { + Pool pool = new Pool<>(1, 0); + pool.reserve(-1).enable("aaa"); + assertThat(pool.isClosed(), is(false)); + pool.close(); + pool.close(); + + assertThat(pool.isClosed(), is(true)); + assertThat(pool.size(), is(0)); + assertThat(pool.acquire(), nullValue()); + assertThat(pool.reserve(-1), nullValue()); + } + + @Test + public void testClosingCloseable() + { + AtomicBoolean closed = new AtomicBoolean(); + Pool pool = new Pool<>(1,0); + Closeable pooled = () -> closed.set(true); + pool.reserve(-1).enable(pooled); + assertThat(closed.get(), is(false)); + pool.close(); + assertThat(closed.get(), is(true)); + } + + @Test + public void testRemove() + { + Pool pool = new Pool<>(1, 0); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e1 = pool.acquire(); + assertThat(pool.remove(e1), is(true)); + assertThat(pool.remove(e1), is(false)); + assertThat(pool.release(e1), is(false)); + assertThat(pool.acquire(), nullValue()); + assertThrows(NullPointerException.class, () -> pool.remove(null)); + } + + @Test + public void testValuesSize() + { + Pool pool = new Pool<>(2, 0); + + assertThat(pool.size(), is(0)); + assertThat(pool.values().isEmpty(), is(true)); + pool.reserve(-1).enable("aaa"); + pool.reserve(-1).enable("bbb"); + assertThat(pool.values().stream().map(Pool.Entry::getPooled).collect(toList()), equalTo(Arrays.asList("aaa", "bbb"))); + assertThat(pool.size(), is(2)); + } + + @Test + public void testValuesContainsAcquiredEntries() + { + Pool pool = new Pool<>(2, 0); + + pool.reserve(-1).enable("aaa"); + pool.reserve(-1).enable("bbb"); + assertThat(pool.acquire(), notNullValue()); + assertThat(pool.acquire(), notNullValue()); + assertThat(pool.acquire(), nullValue()); + assertThat(pool.values().isEmpty(), is(false)); + } + + @Test + public void testAcquireAt() + { + Pool pool = new Pool<>(2, 0); + + pool.reserve(-1).enable("aaa"); + pool.reserve(-1).enable("bbb"); + + assertThat(pool.acquireAt(2), nullValue()); + assertThat(pool.acquireAt(0), notNullValue()); + assertThat(pool.acquireAt(0), nullValue()); + assertThat(pool.acquireAt(1), notNullValue()); + assertThat(pool.acquireAt(1), nullValue()); + } + + @Test + public void testMaxUsageCount() + { + Pool pool = new Pool<>(1, 0); + pool.setMaxUsageCount(3); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e1 = pool.acquire(); + assertThat(pool.release(e1), is(true)); + e1 = pool.acquire(); + assertThat(pool.release(e1), is(true)); + e1 = pool.acquire(); + assertThat(pool.release(e1), is(false)); + assertThat(pool.acquire(), nullValue()); + assertThat(pool.size(), is(1)); + assertThat(pool.remove(e1), is(true)); + assertThat(pool.remove(e1), is(false)); + assertThat(pool.size(), is(0)); + Pool.Entry e1Copy = e1; + assertThat(pool.release(e1Copy), is(false)); + } + + @Test + public void testMaxMultiplex() + { + Pool pool = new Pool<>(2, 0); + pool.setMaxMultiplex(3); + pool.reserve(-1).enable("aaa"); + pool.reserve(-1).enable("bbb"); + + Pool.Entry e1 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); + Pool.Entry e3 = pool.acquire(); + Pool.Entry e4 = pool.acquire(); + assertThat(e1.getPooled(), equalTo("aaa")); + assertThat(e1, sameInstance(e2)); + assertThat(e1, sameInstance(e3)); + assertThat(e4.getPooled(), equalTo("bbb")); + assertThat(pool.release(e1), is(true)); + Pool.Entry e5 = pool.acquire(); + assertThat(e2, sameInstance(e5)); + Pool.Entry e6 = pool.acquire(); + assertThat(e4, sameInstance(e6)); + } + + @Test + public void testRemoveMultiplexed() + { + Pool pool = new Pool<>(1, 0); + pool.setMaxMultiplex(2); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e1 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); + assertThat(pool.values().stream().findFirst().get().isIdle(), is(false)); + + assertThat(pool.remove(e1), is(false)); + assertThat(pool.values().stream().findFirst().get().isIdle(), is(false)); + assertThat(pool.values().stream().findFirst().get().isClosed(), is(true)); + assertThat(pool.remove(e1), is(true)); + assertThat(pool.size(), is(0)); + + assertThat(pool.remove(e1), is(false)); + + assertThat(pool.release(e1), is(false)); + + assertThat(pool.remove(e1), is(false)); + } + + @Test + public void testMultiplexRemoveThenAcquireThenReleaseRemove() + { + Pool pool = new Pool<>(1, 0); + pool.setMaxMultiplex(2); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e1 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); + + assertThat(pool.remove(e1), is(false)); + assertThat(e1.isClosed(), is(true)); + assertThat(pool.acquire(), nullValue()); + assertThat(pool.release(e2), is(false)); + assertThat(pool.remove(e2), is(true)); + } + + @Test + public void testNonMultiplexRemoveAfterAcquire() + { + Pool pool = new Pool<>(1, 0); + pool.setMaxMultiplex(2); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e1 = pool.acquire(); + assertThat(pool.remove(e1), is(true)); + assertThat(pool.size(), is(0)); + } + + @Test + public void testMultiplexRemoveAfterAcquire() + { + Pool pool = new Pool<>(1, 0); + pool.setMaxMultiplex(2); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e1 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); + + assertThat(pool.remove(e1), is(false)); + assertThat(pool.remove(e2), is(true)); + assertThat(pool.size(), is(0)); + + assertThat(pool.release(e1), is(false)); + assertThat(pool.size(), is(0)); + + Pool.Entry e3 = pool.acquire(); + assertThat(e3, nullValue()); + + assertThat(pool.release(e2), is(false)); + assertThat(pool.size(), is(0)); + } + + @Test + public void testReleaseThenRemoveNonEnabledEntry() + { + Pool pool = new Pool<>(1, 0); + Pool.Entry e = pool.reserve(-1); + assertThat(pool.size(), is(1)); + assertThat(pool.release(e), is(false)); + assertThat(pool.size(), is(1)); + assertThat(pool.remove(e), is(true)); + assertThat(pool.size(), is(0)); + } + + @Test + public void testRemoveNonEnabledEntry() + { + Pool pool = new Pool<>(1, 0); + Pool.Entry e = pool.reserve(-1); + assertThat(pool.size(), is(1)); + assertThat(pool.remove(e), is(true)); + assertThat(pool.size(), is(0)); + } + + @Test + public void testMultiplexMaxUsageReachedAcquireThenRemove() + { + Pool pool = new Pool<>(1, 0); + pool.setMaxMultiplex(2); + pool.setMaxUsageCount(3); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e0 = pool.acquire(); + + Pool.Entry e1 = pool.acquire(); + assertThat(pool.release(e1), is(true)); + Pool.Entry e2 = pool.acquire(); + assertThat(pool.release(e2), is(true)); + assertThat(pool.acquire(), nullValue()); + + assertThat(pool.remove(e0), is(true)); + assertThat(pool.size(), is(0)); + } + + @Test + public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove() + { + Pool pool = new Pool<>(1, 0); + pool.setMaxMultiplex(2); + pool.setMaxUsageCount(3); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e0 = pool.acquire(); + + Pool.Entry e1 = pool.acquire(); + assertThat(pool.release(e1), is(true)); + Pool.Entry e2 = pool.acquire(); + assertThat(pool.release(e2), is(true)); + assertThat(pool.acquire(), nullValue()); + + assertThat(pool.release(e0), is(false)); + assertThat(pool.values().stream().findFirst().get().isIdle(), is(true)); + assertThat(pool.values().stream().findFirst().get().isClosed(), is(false)); + assertThat(pool.size(), is(1)); + assertThat(pool.remove(e0), is(true)); + assertThat(pool.size(), is(0)); + } + + @Test + public void testUsageCountAfterReachingMaxMultiplexLimit() + { + Pool pool = new Pool<>(1, 0); + pool.setMaxMultiplex(2); + pool.setMaxUsageCount(10); + pool.reserve(-1).enable("aaa"); + + Pool.Entry e1 = pool.acquire(); + assertThat(e1.getUsageCount(), is(1)); + Pool.Entry e2 = pool.acquire(); + assertThat(e1.getUsageCount(), is(2)); + assertThat(pool.acquire(), nullValue()); + assertThat(e1.getUsageCount(), is(2)); + } + + @Test + public void testConfigLimits() + { + assertThrows(IllegalArgumentException.class, () -> new Pool(1, 0).setMaxMultiplex(0)); + assertThrows(IllegalArgumentException.class, () -> new Pool(1, 0).setMaxMultiplex(-1)); + assertThrows(IllegalArgumentException.class, () -> new Pool(1, 0).setMaxUsageCount(0)); + } +}