Re-implement HTTP connection pooling with a non-queuing algorithm
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
66ec16006e
commit
e9dad975e6
|
@ -18,69 +18,113 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.util.AtomicBiInteger;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
import static java.util.stream.Collectors.toCollection;
|
||||
|
||||
@ManagedObject
|
||||
public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
||||
public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable, Sweeper.Sweepable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);
|
||||
|
||||
/**
|
||||
* The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed.
|
||||
* The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections.
|
||||
*/
|
||||
private final AtomicBiInteger connections = new AtomicBiInteger();
|
||||
private final AtomicBoolean closed = new AtomicBoolean();
|
||||
private final HttpDestination destination;
|
||||
private final int maxConnections;
|
||||
private final Callback requester;
|
||||
private final Pool<Connection> pool;
|
||||
|
||||
/**
|
||||
* @param destination the correspondent destination
|
||||
* @param maxConnections the max number of connections
|
||||
* @param requester the callback to notify about new connection creation/failure
|
||||
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, Callback)} instead
|
||||
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
|
||||
*/
|
||||
@Deprecated
|
||||
protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
this((HttpDestination)destination, maxConnections, requester);
|
||||
this((HttpDestination)destination, maxConnections, true, requester);
|
||||
}
|
||||
|
||||
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
|
||||
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
|
||||
{
|
||||
this.destination = destination;
|
||||
this.maxConnections = maxConnections;
|
||||
this.requester = requester;
|
||||
@SuppressWarnings("unchecked")
|
||||
Pool<Connection> pool = destination.getBean(Pool.class);
|
||||
if (pool == null)
|
||||
{
|
||||
pool = new Pool<>(maxConnections, cache ? 1 : 0);
|
||||
destination.addBean(pool);
|
||||
}
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
protected HttpDestination getHttpDestination()
|
||||
@Override
|
||||
public CompletableFuture<Void> preCreateConnections(int connectionCount)
|
||||
{
|
||||
return destination;
|
||||
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
|
||||
for (int i = 0; i < connectionCount; i++)
|
||||
{
|
||||
futures[i] = tryCreateReturningFuture(pool.getMaxEntries());
|
||||
}
|
||||
return CompletableFuture.allOf(futures);
|
||||
}
|
||||
|
||||
protected int getMaxMultiplex()
|
||||
{
|
||||
return pool.getMaxMultiplex();
|
||||
}
|
||||
|
||||
protected void setMaxMultiplex(int maxMultiplex)
|
||||
{
|
||||
pool.setMaxMultiplex(maxMultiplex);
|
||||
}
|
||||
|
||||
protected int getMaxUsageCount()
|
||||
{
|
||||
return pool.getMaxUsageCount();
|
||||
}
|
||||
|
||||
protected void setMaxUsageCount(int maxUsageCount)
|
||||
{
|
||||
pool.setMaxUsageCount(maxUsageCount);
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The number of active connections", readonly = true)
|
||||
public int getActiveConnectionCount()
|
||||
{
|
||||
return pool.getInUseConnectionCount();
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The number of idle connections", readonly = true)
|
||||
public int getIdleConnectionCount()
|
||||
{
|
||||
return pool.getIdleConnectionCount();
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The max number of connections", readonly = true)
|
||||
public int getMaxConnectionCount()
|
||||
{
|
||||
return maxConnections;
|
||||
return pool.getMaxEntries();
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The number of connections", readonly = true)
|
||||
public int getConnectionCount()
|
||||
{
|
||||
return connections.getLo();
|
||||
return pool.size();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,19 +141,19 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
@ManagedAttribute(value = "The number of pending connections", readonly = true)
|
||||
public int getPendingConnectionCount()
|
||||
{
|
||||
return connections.getHi();
|
||||
return pool.getPendingConnectionCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEmpty()
|
||||
{
|
||||
return connections.getLo() == 0;
|
||||
return pool.size() == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed()
|
||||
{
|
||||
return closed.get();
|
||||
return pool.isClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -152,88 +196,165 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
*/
|
||||
protected void tryCreate(int maxPending)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
long encoded = connections.get();
|
||||
int pending = AtomicBiInteger.getHi(encoded);
|
||||
int total = AtomicBiInteger.getLo(encoded);
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("tryCreate {}/{} connections {}/{} pending", total, maxConnections, pending, maxPending);
|
||||
|
||||
if (total >= maxConnections)
|
||||
return;
|
||||
|
||||
if (maxPending >= 0 && pending >= maxPending)
|
||||
return;
|
||||
|
||||
if (connections.compareAndSet(encoded, pending + 1, total + 1))
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("newConnection {}/{} connections {}/{} pending", total + 1, maxConnections, pending + 1, maxPending);
|
||||
|
||||
destination.newConnection(new Promise<Connection>()
|
||||
{
|
||||
@Override
|
||||
public void succeeded(Connection connection)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection {}/{} creation succeeded {}", total + 1, maxConnections, connection);
|
||||
connections.add(-1, 0);
|
||||
onCreated(connection);
|
||||
proceed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection " + (total + 1) + "/" + maxConnections + " creation failed", x);
|
||||
connections.add(-1, -1);
|
||||
requester.failed(x);
|
||||
}
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
tryCreateReturningFuture(maxPending);
|
||||
}
|
||||
|
||||
protected abstract void onCreated(Connection connection);
|
||||
private CompletableFuture<Void> tryCreateReturningFuture(int maxPending)
|
||||
{
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("tryCreate {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
|
||||
|
||||
Pool<Connection>.Entry entry = pool.reserve(maxPending);
|
||||
if (entry == null)
|
||||
{
|
||||
future.complete(null);
|
||||
return future;
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("newConnection {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
|
||||
|
||||
destination.newConnection(new Promise<Connection>()
|
||||
{
|
||||
@Override
|
||||
public void succeeded(Connection connection)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection {}/{} creation succeeded {}", pool.size(), pool.getMaxEntries(), connection);
|
||||
adopt(entry, connection);
|
||||
future.complete(null);
|
||||
proceed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void failed(Throwable x)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection " + pool.size() + "/" + pool.getMaxEntries() + " creation failed", x);
|
||||
pool.remove(entry);
|
||||
future.completeExceptionally(x);
|
||||
requester.failed(x);
|
||||
}
|
||||
});
|
||||
return future;
|
||||
}
|
||||
|
||||
protected void proceed()
|
||||
{
|
||||
requester.succeeded();
|
||||
}
|
||||
|
||||
protected abstract Connection activate();
|
||||
|
||||
protected Connection active(Connection connection)
|
||||
private void adopt(Pool<Connection>.Entry entry, Connection connection)
|
||||
{
|
||||
if (!(connection instanceof Attachable))
|
||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||
Attachable attachable = (Attachable)connection;
|
||||
attachable.setAttachment(entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection active {}", connection);
|
||||
acquired(connection);
|
||||
return connection;
|
||||
LOG.debug("onCreating {}", entry);
|
||||
onCreated(connection);
|
||||
entry.enable(connection);
|
||||
idle(connection, false);
|
||||
}
|
||||
|
||||
protected void acquired(Connection connection)
|
||||
protected Connection activate()
|
||||
{
|
||||
Pool<Connection>.Entry entry = pool.acquire();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("activated '{}'", entry);
|
||||
if (entry != null)
|
||||
{
|
||||
Connection connection = entry.getPooled();
|
||||
acquired(connection);
|
||||
return connection;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive(Connection connection)
|
||||
{
|
||||
if (!(connection instanceof Attachable))
|
||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||
Attachable attachable = (Attachable)connection;
|
||||
@SuppressWarnings("unchecked")
|
||||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
||||
if (entry == null)
|
||||
return false;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("isActive {}", entry);
|
||||
return !entry.isIdle();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release(Connection connection)
|
||||
{
|
||||
if (!deactivate(connection))
|
||||
return false;
|
||||
released(connection);
|
||||
return idle(connection, isClosed());
|
||||
}
|
||||
|
||||
protected boolean deactivate(Connection connection)
|
||||
{
|
||||
if (!(connection instanceof Attachable))
|
||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||
Attachable attachable = (Attachable)connection;
|
||||
@SuppressWarnings("unchecked")
|
||||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
||||
if (entry == null)
|
||||
return true;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("releasing {}", entry);
|
||||
boolean reusable = pool.release(entry);
|
||||
if (!reusable)
|
||||
{
|
||||
remove(connection);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Connection connection)
|
||||
{
|
||||
return remove(connection, false);
|
||||
}
|
||||
|
||||
protected boolean remove(Connection connection, boolean force)
|
||||
{
|
||||
if (!(connection instanceof Attachable))
|
||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||
Attachable attachable = (Attachable)connection;
|
||||
@SuppressWarnings("unchecked")
|
||||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
||||
if (entry == null)
|
||||
return false;
|
||||
attachable.setAttachment(null);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("removing {}", entry);
|
||||
boolean removed = pool.remove(entry);
|
||||
if (removed || force)
|
||||
{
|
||||
released(connection);
|
||||
removed(connection);
|
||||
}
|
||||
return removed;
|
||||
}
|
||||
|
||||
protected void onCreated(Connection connection)
|
||||
{
|
||||
}
|
||||
|
||||
protected boolean idle(Connection connection, boolean close)
|
||||
{
|
||||
if (close)
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection idle close {}", connection);
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection idle {}", connection);
|
||||
return true;
|
||||
}
|
||||
return !close;
|
||||
}
|
||||
|
||||
protected void acquired(Connection connection)
|
||||
{
|
||||
}
|
||||
|
||||
protected void released(Connection connection)
|
||||
|
@ -242,28 +363,78 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
|
||||
protected void removed(Connection connection)
|
||||
{
|
||||
int pooled = connections.addAndGetLo(-1);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Relying on this method indicates a reliance on the implementation details.
|
||||
* @return an unmodifiable queue working as a view of the idle connections.
|
||||
*/
|
||||
@Deprecated
|
||||
public Queue<Connection> getIdleConnections()
|
||||
{
|
||||
return pool.values().stream()
|
||||
.filter(Pool.Entry::isIdle)
|
||||
.filter(entry -> !entry.isClosed())
|
||||
.map(Pool.Entry::getPooled)
|
||||
.collect(toCollection(ArrayDeque::new));
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Relying on this method indicates a reliance on the implementation details.
|
||||
* @return an unmodifiable collection working as a view of the active connections.
|
||||
*/
|
||||
@Deprecated
|
||||
public Collection<Connection> getActiveConnections()
|
||||
{
|
||||
return pool.values().stream()
|
||||
.filter(entry -> !entry.isIdle())
|
||||
.filter(entry -> !entry.isClosed())
|
||||
.map(Pool.Entry::getPooled)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
if (closed.compareAndSet(false, true))
|
||||
{
|
||||
connections.set(0, 0);
|
||||
}
|
||||
}
|
||||
|
||||
protected void close(Collection<Connection> connections)
|
||||
{
|
||||
connections.forEach(Connection::close);
|
||||
pool.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String dump()
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
return Dumpable.dump(this);
|
||||
Dumpable.dumpObjects(out, indent, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sweep()
|
||||
{
|
||||
pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry ->
|
||||
{
|
||||
Connection connection = entry.getPooled();
|
||||
if (((Sweeper.Sweepable)connection).sweep())
|
||||
{
|
||||
boolean removed = remove(connection);
|
||||
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
|
||||
connection,
|
||||
System.lineSeparator(),
|
||||
removed ? "Removed" : "Not removed",
|
||||
System.lineSeparator(),
|
||||
dump());
|
||||
}
|
||||
});
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
getPendingConnectionCount(),
|
||||
getConnectionCount(),
|
||||
getMaxConnectionCount(),
|
||||
getActiveConnectionCount(),
|
||||
getIdleConnectionCount());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
|
||||
|
@ -27,6 +28,16 @@ import org.eclipse.jetty.client.api.Connection;
|
|||
*/
|
||||
public interface ConnectionPool extends Closeable
|
||||
{
|
||||
/**
|
||||
* Optionally pre-create up to <code>connectionCount</code>
|
||||
* connections so they are immediately ready for use.
|
||||
* @param connectionCount the number of connections to pre-start.
|
||||
*/
|
||||
default CompletableFuture<Void> preCreateConnections(int connectionCount)
|
||||
{
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param connection the connection to test
|
||||
* @return whether the given connection is currently in use
|
||||
|
|
|
@ -18,304 +18,33 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Deque;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
@ManagedObject
|
||||
public class DuplexConnectionPool extends AbstractConnectionPool implements Sweeper.Sweepable
|
||||
public class DuplexConnectionPool extends AbstractConnectionPool
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(DuplexConnectionPool.class);
|
||||
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
private final Deque<Connection> idleConnections;
|
||||
private final Set<Connection> activeConnections;
|
||||
|
||||
public DuplexConnectionPool(Destination destination, int maxConnections, Callback requester)
|
||||
public DuplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
super((HttpDestination)destination, maxConnections, requester);
|
||||
this.idleConnections = new ArrayDeque<>(maxConnections);
|
||||
this.activeConnections = new HashSet<>(maxConnections);
|
||||
this(destination, maxConnections, true, requester);
|
||||
}
|
||||
|
||||
protected void lock()
|
||||
public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
|
||||
{
|
||||
lock.lock();
|
||||
}
|
||||
|
||||
protected void unlock()
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The number of idle connections", readonly = true)
|
||||
public int getIdleConnectionCount()
|
||||
{
|
||||
lock();
|
||||
try
|
||||
{
|
||||
return idleConnections.size();
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The number of active connections", readonly = true)
|
||||
public int getActiveConnectionCount()
|
||||
{
|
||||
lock();
|
||||
try
|
||||
{
|
||||
return activeConnections.size();
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Queue<Connection> getIdleConnections()
|
||||
{
|
||||
return idleConnections;
|
||||
}
|
||||
|
||||
public Collection<Connection> getActiveConnections()
|
||||
{
|
||||
return activeConnections;
|
||||
super(destination, maxConnections, cache, requester);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive(Connection connection)
|
||||
@ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed")
|
||||
public int getMaxUsageCount()
|
||||
{
|
||||
lock();
|
||||
try
|
||||
{
|
||||
return activeConnections.contains(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
return super.getMaxUsageCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
public void setMaxUsageCount(int maxUsageCount)
|
||||
{
|
||||
lock();
|
||||
try
|
||||
{
|
||||
// Use "cold" new connections as last.
|
||||
idleConnections.offer(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
idle(connection, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection activate()
|
||||
{
|
||||
Connection connection;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
connection = idleConnections.poll();
|
||||
if (connection == null)
|
||||
return null;
|
||||
activeConnections.add(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
return active(connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release(Connection connection)
|
||||
{
|
||||
boolean closed = isClosed();
|
||||
lock();
|
||||
try
|
||||
{
|
||||
if (!activeConnections.remove(connection))
|
||||
return false;
|
||||
|
||||
if (!closed)
|
||||
{
|
||||
// Make sure we use "hot" connections first.
|
||||
deactivate(connection);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
released(connection);
|
||||
return idle(connection, closed);
|
||||
}
|
||||
|
||||
protected boolean deactivate(Connection connection)
|
||||
{
|
||||
return idleConnections.offerFirst(connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Connection connection)
|
||||
{
|
||||
return remove(connection, false);
|
||||
}
|
||||
|
||||
protected boolean remove(Connection connection, boolean force)
|
||||
{
|
||||
boolean activeRemoved;
|
||||
boolean idleRemoved;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
activeRemoved = activeConnections.remove(connection);
|
||||
idleRemoved = idleConnections.remove(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
if (activeRemoved || force)
|
||||
released(connection);
|
||||
boolean removed = activeRemoved || idleRemoved || force;
|
||||
if (removed)
|
||||
removed(connection);
|
||||
return removed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
super.close();
|
||||
|
||||
List<Connection> connections = new ArrayList<>();
|
||||
lock();
|
||||
try
|
||||
{
|
||||
connections.addAll(idleConnections);
|
||||
idleConnections.clear();
|
||||
connections.addAll(activeConnections);
|
||||
activeConnections.clear();
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
close(connections);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
DumpableCollection active;
|
||||
DumpableCollection idle;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
active = new DumpableCollection("active", new ArrayList<>(activeConnections));
|
||||
idle = new DumpableCollection("idle", new ArrayList<>(idleConnections));
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
dump(out, indent, active, idle);
|
||||
}
|
||||
|
||||
protected void dump(Appendable out, String indent, Object... items) throws IOException
|
||||
{
|
||||
Dumpable.dumpObjects(out, indent, this, items);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sweep()
|
||||
{
|
||||
List<Connection> toSweep;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
toSweep = activeConnections.stream()
|
||||
.filter(connection -> connection instanceof Sweeper.Sweepable)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
for (Connection connection : toSweep)
|
||||
{
|
||||
if (((Sweeper.Sweepable)connection).sweep())
|
||||
{
|
||||
boolean removed = remove(connection, true);
|
||||
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
|
||||
connection,
|
||||
System.lineSeparator(),
|
||||
removed ? "Removed" : "Not removed",
|
||||
System.lineSeparator(),
|
||||
dump());
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
int activeSize;
|
||||
int idleSize;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
activeSize = activeConnections.size();
|
||||
idleSize = idleConnections.size();
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
getPendingConnectionCount(),
|
||||
getConnectionCount(),
|
||||
getMaxConnectionCount(),
|
||||
activeSize,
|
||||
idleSize);
|
||||
super.setMaxUsageCount(maxUsageCount);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,15 +35,17 @@ import org.eclipse.jetty.client.api.Response;
|
|||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.HttpCookieStore;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
public abstract class HttpConnection implements Connection
|
||||
public abstract class HttpConnection implements Connection, Attachable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpConnection.class);
|
||||
|
||||
private final HttpDestination destination;
|
||||
private Object attachment;
|
||||
private int idleTimeoutGuard;
|
||||
private long idleTimeoutStamp;
|
||||
|
||||
|
@ -272,6 +274,18 @@ public abstract class HttpConnection implements Connection
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAttachment(Object obj)
|
||||
{
|
||||
this.attachment = obj;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAttachment()
|
||||
{
|
||||
return attachment;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -438,6 +438,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
|||
{
|
||||
if (connectionPool.isActive(connection))
|
||||
{
|
||||
// trigger the next request after releasing the connection
|
||||
if (connectionPool.release(connection))
|
||||
send(false);
|
||||
else
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.eclipse.jetty.http.HttpStatus;
|
|||
import org.eclipse.jetty.io.ClientConnectionFactory;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
@ -233,11 +234,12 @@ public class HttpProxy extends ProxyConfiguration.Proxy
|
|||
}
|
||||
}
|
||||
|
||||
private static class ProxyConnection implements Connection
|
||||
private static class ProxyConnection implements Connection, Attachable
|
||||
{
|
||||
private final Destination destination;
|
||||
private final Connection connection;
|
||||
private final Promise<Connection> promise;
|
||||
private Object attachment;
|
||||
|
||||
private ProxyConnection(Destination destination, Connection connection, Promise<Connection> promise)
|
||||
{
|
||||
|
@ -270,6 +272,18 @@ public class HttpProxy extends ProxyConfiguration.Proxy
|
|||
{
|
||||
return connection.isClosed();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAttachment(Object obj)
|
||||
{
|
||||
this.attachment = obj;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAttachment()
|
||||
{
|
||||
return attachment;
|
||||
}
|
||||
}
|
||||
|
||||
private static class TunnelPromise implements Promise<Connection>
|
||||
|
|
|
@ -40,7 +40,7 @@ public class LeakTrackingConnectionPool extends DuplexConnectionPool
|
|||
|
||||
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
super(destination, maxConnections, requester);
|
||||
super((HttpDestination)destination, maxConnections, requester);
|
||||
start();
|
||||
}
|
||||
|
||||
|
|
|
@ -18,405 +18,47 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
||||
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable, Sweeper.Sweepable
|
||||
@ManagedObject
|
||||
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(MultiplexConnectionPool.class);
|
||||
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
private final Deque<Holder> idleConnections;
|
||||
private final Map<Connection, Holder> muxedConnections;
|
||||
private final Map<Connection, Holder> busyConnections;
|
||||
private int maxMultiplex;
|
||||
|
||||
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, maxConnections, requester);
|
||||
this.idleConnections = new ArrayDeque<>(maxConnections);
|
||||
this.muxedConnections = new HashMap<>(maxConnections);
|
||||
this.busyConnections = new HashMap<>(maxConnections);
|
||||
this.maxMultiplex = maxMultiplex;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection acquire(boolean create)
|
||||
{
|
||||
Connection connection = activate();
|
||||
if (connection == null && create)
|
||||
{
|
||||
int queuedRequests = getHttpDestination().getQueuedRequestCount();
|
||||
int maxMultiplex = getMaxMultiplex();
|
||||
int maxPending = ceilDiv(queuedRequests, maxMultiplex);
|
||||
tryCreate(maxPending);
|
||||
connection = activate();
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param a the dividend
|
||||
* @param b the divisor
|
||||
* @return the ceiling of the algebraic quotient
|
||||
*/
|
||||
private static int ceilDiv(int a, int b)
|
||||
{
|
||||
return (a + b - 1) / b;
|
||||
}
|
||||
|
||||
protected void lock()
|
||||
{
|
||||
lock.lock();
|
||||
}
|
||||
|
||||
protected void unlock()
|
||||
{
|
||||
lock.unlock();
|
||||
this(destination, maxConnections, true, requester, maxMultiplex);
|
||||
}
|
||||
|
||||
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, maxConnections, cache, requester);
|
||||
setMaxMultiplex(maxMultiplex);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ManagedAttribute(value = "The multiplexing factor of connections")
|
||||
public int getMaxMultiplex()
|
||||
{
|
||||
lock();
|
||||
try
|
||||
{
|
||||
return maxMultiplex;
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
return super.getMaxMultiplex();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxMultiplex(int maxMultiplex)
|
||||
{
|
||||
lock();
|
||||
try
|
||||
{
|
||||
this.maxMultiplex = maxMultiplex;
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
super.setMaxMultiplex(maxMultiplex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive(Connection connection)
|
||||
@ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed")
|
||||
public int getMaxUsageCount()
|
||||
{
|
||||
lock();
|
||||
try
|
||||
{
|
||||
if (muxedConnections.containsKey(connection))
|
||||
return true;
|
||||
return busyConnections.containsKey(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
return super.getMaxUsageCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
public void setMaxUsageCount(int maxUsageCount)
|
||||
{
|
||||
lock();
|
||||
try
|
||||
{
|
||||
// Use "cold" connections as last.
|
||||
idleConnections.offer(new Holder(connection));
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
idle(connection, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection activate()
|
||||
{
|
||||
Holder holder;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
if (muxedConnections.isEmpty())
|
||||
{
|
||||
holder = idleConnections.poll();
|
||||
if (holder == null)
|
||||
return null;
|
||||
muxedConnections.put(holder.connection, holder);
|
||||
}
|
||||
else
|
||||
{
|
||||
holder = muxedConnections.values().iterator().next();
|
||||
}
|
||||
|
||||
if (holder.count < maxMultiplex)
|
||||
{
|
||||
++holder.count;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
muxedConnections.remove(holder.connection);
|
||||
busyConnections.put(holder.connection, holder);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
return active(holder.connection);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release(Connection connection)
|
||||
{
|
||||
boolean closed = isClosed();
|
||||
boolean idle = false;
|
||||
Holder holder;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
holder = muxedConnections.get(connection);
|
||||
if (holder != null)
|
||||
{
|
||||
int count = --holder.count;
|
||||
if (count == 0)
|
||||
{
|
||||
muxedConnections.remove(connection);
|
||||
if (!closed)
|
||||
{
|
||||
idleConnections.offerFirst(holder);
|
||||
idle = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
holder = busyConnections.remove(connection);
|
||||
if (holder != null)
|
||||
{
|
||||
int count = --holder.count;
|
||||
if (!closed)
|
||||
{
|
||||
if (count == 0)
|
||||
{
|
||||
idleConnections.offerFirst(holder);
|
||||
idle = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
muxedConnections.put(connection, holder);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
if (holder == null)
|
||||
return false;
|
||||
|
||||
released(connection);
|
||||
if (idle || closed)
|
||||
return idle(connection, closed);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Connection connection)
|
||||
{
|
||||
return remove(connection, false);
|
||||
}
|
||||
|
||||
protected boolean remove(Connection connection, boolean force)
|
||||
{
|
||||
boolean activeRemoved = true;
|
||||
boolean idleRemoved = false;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
Holder holder = muxedConnections.remove(connection);
|
||||
if (holder == null)
|
||||
holder = busyConnections.remove(connection);
|
||||
if (holder == null)
|
||||
{
|
||||
activeRemoved = false;
|
||||
for (Iterator<Holder> iterator = idleConnections.iterator(); iterator.hasNext(); )
|
||||
{
|
||||
holder = iterator.next();
|
||||
if (holder.connection == connection)
|
||||
{
|
||||
idleRemoved = true;
|
||||
iterator.remove();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
if (activeRemoved || force)
|
||||
released(connection);
|
||||
boolean removed = activeRemoved || idleRemoved || force;
|
||||
if (removed)
|
||||
removed(connection);
|
||||
return removed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
super.close();
|
||||
|
||||
List<Connection> connections;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
connections = idleConnections.stream().map(holder -> holder.connection).collect(Collectors.toList());
|
||||
connections.addAll(muxedConnections.keySet());
|
||||
connections.addAll(busyConnections.keySet());
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
close(connections);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
DumpableCollection busy;
|
||||
DumpableCollection muxed;
|
||||
DumpableCollection idle;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
busy = new DumpableCollection("busy", new ArrayList<>(busyConnections.values()));
|
||||
muxed = new DumpableCollection("muxed", new ArrayList<>(muxedConnections.values()));
|
||||
idle = new DumpableCollection("idle", new ArrayList<>(idleConnections));
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
Dumpable.dumpObjects(out, indent, this, busy, muxed, idle);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean sweep()
|
||||
{
|
||||
List<Connection> toSweep = new ArrayList<>();
|
||||
lock();
|
||||
try
|
||||
{
|
||||
busyConnections.values().stream()
|
||||
.map(holder -> holder.connection)
|
||||
.filter(connection -> connection instanceof Sweeper.Sweepable)
|
||||
.collect(Collectors.toCollection(() -> toSweep));
|
||||
muxedConnections.values().stream()
|
||||
.map(holder -> holder.connection)
|
||||
.filter(connection -> connection instanceof Sweeper.Sweepable)
|
||||
.collect(Collectors.toCollection(() -> toSweep));
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
for (Connection connection : toSweep)
|
||||
{
|
||||
if (((Sweeper.Sweepable)connection).sweep())
|
||||
{
|
||||
boolean removed = remove(connection, true);
|
||||
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
|
||||
connection,
|
||||
System.lineSeparator(),
|
||||
removed ? "Removed" : "Not removed",
|
||||
System.lineSeparator(),
|
||||
dump());
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
int busySize;
|
||||
int muxedSize;
|
||||
int idleSize;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
busySize = busyConnections.size();
|
||||
muxedSize = muxedConnections.size();
|
||||
idleSize = idleConnections.size();
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
return String.format("%s@%x[c=%d/%d/%d,b=%d,m=%d,i=%d]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
getPendingConnectionCount(),
|
||||
getConnectionCount(),
|
||||
getMaxConnectionCount(),
|
||||
busySize,
|
||||
muxedSize,
|
||||
idleSize);
|
||||
}
|
||||
|
||||
private static class Holder
|
||||
{
|
||||
private final Connection connection;
|
||||
private int count;
|
||||
|
||||
private Holder(Connection connection)
|
||||
{
|
||||
this.connection = connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s[%d]", connection, count);
|
||||
}
|
||||
super.setMaxUsageCount(maxUsageCount);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,244 +18,55 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
@ManagedObject
|
||||
public class RoundRobinConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
|
||||
public class RoundRobinConnectionPool extends MultiplexConnectionPool
|
||||
{
|
||||
private final List<Entry> entries;
|
||||
private int maxMultiplex;
|
||||
private int index;
|
||||
private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);
|
||||
|
||||
public RoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester)
|
||||
private final AtomicInteger offset = new AtomicInteger();
|
||||
private final Pool<Connection> pool;
|
||||
|
||||
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
this(destination, maxConnections, requester, 1);
|
||||
}
|
||||
|
||||
public RoundRobinConnectionPool(Destination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super((HttpDestination)destination, maxConnections, requester);
|
||||
entries = new ArrayList<>(maxConnections);
|
||||
for (int i = 0; i < maxConnections; ++i)
|
||||
{
|
||||
entries.add(new Entry());
|
||||
}
|
||||
this.maxMultiplex = maxMultiplex;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxMultiplex()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return maxMultiplex;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxMultiplex(int maxMultiplex)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
this.maxMultiplex = maxMultiplex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Returns an idle connection, if available, following a round robin algorithm;
|
||||
* otherwise it always tries to create a new connection, up until the max connection count.</p>
|
||||
*
|
||||
* @param create this parameter is ignored and assumed to be always {@code true}
|
||||
* @return an idle connection or {@code null} if no idle connections are available
|
||||
*/
|
||||
@Override
|
||||
protected Connection acquire(boolean create)
|
||||
{
|
||||
// The nature of this connection pool is such that a
|
||||
// connection must always be present in the next slot.
|
||||
return super.acquire(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
for (Entry entry : entries)
|
||||
{
|
||||
if (entry.connection == null)
|
||||
{
|
||||
entry.connection = connection;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
idle(connection, false);
|
||||
super(destination, maxConnections, false, requester, maxMultiplex);
|
||||
pool = destination.getBean(Pool.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Connection activate()
|
||||
{
|
||||
Connection connection = null;
|
||||
synchronized (this)
|
||||
{
|
||||
int offset = 0;
|
||||
int capacity = getMaxConnectionCount();
|
||||
while (offset < capacity)
|
||||
{
|
||||
int idx = index + offset;
|
||||
if (idx >= capacity)
|
||||
idx -= capacity;
|
||||
|
||||
Entry entry = entries.get(idx);
|
||||
|
||||
if (entry.connection == null)
|
||||
break;
|
||||
|
||||
if (entry.active < getMaxMultiplex())
|
||||
{
|
||||
++entry.active;
|
||||
++entry.used;
|
||||
connection = entry.connection;
|
||||
index += offset + 1;
|
||||
if (index >= capacity)
|
||||
index -= capacity;
|
||||
break;
|
||||
}
|
||||
|
||||
++offset;
|
||||
}
|
||||
}
|
||||
return connection == null ? null : active(connection);
|
||||
int offset = this.offset.get();
|
||||
Connection connection = activate(offset);
|
||||
if (connection != null)
|
||||
this.offset.getAndIncrement();
|
||||
return connection;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive(Connection connection)
|
||||
private Connection activate(int offset)
|
||||
{
|
||||
synchronized (this)
|
||||
Pool<Connection>.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries()));
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("activated '{}'", entry);
|
||||
if (entry != null)
|
||||
{
|
||||
for (Entry entry : entries)
|
||||
{
|
||||
if (entry.connection == connection)
|
||||
return entry.active > 0;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean release(Connection connection)
|
||||
{
|
||||
boolean found = false;
|
||||
boolean idle = false;
|
||||
synchronized (this)
|
||||
{
|
||||
for (Entry entry : entries)
|
||||
{
|
||||
if (entry.connection == connection)
|
||||
{
|
||||
found = true;
|
||||
int active = --entry.active;
|
||||
idle = active == 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!found)
|
||||
return false;
|
||||
released(connection);
|
||||
if (idle)
|
||||
return idle(connection, isClosed());
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean remove(Connection connection)
|
||||
{
|
||||
boolean found = false;
|
||||
synchronized (this)
|
||||
{
|
||||
for (Entry entry : entries)
|
||||
{
|
||||
if (entry.connection == connection)
|
||||
{
|
||||
found = true;
|
||||
entry.reset();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (found)
|
||||
{
|
||||
released(connection);
|
||||
removed(connection);
|
||||
}
|
||||
return found;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
List<Entry> connections;
|
||||
synchronized (this)
|
||||
{
|
||||
connections = new ArrayList<>(entries);
|
||||
}
|
||||
Dumpable.dumpObjects(out, indent, out, connections);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
int present = 0;
|
||||
int active = 0;
|
||||
synchronized (this)
|
||||
{
|
||||
for (Entry entry : entries)
|
||||
{
|
||||
if (entry.connection != null)
|
||||
{
|
||||
++present;
|
||||
if (entry.active > 0)
|
||||
++active;
|
||||
}
|
||||
}
|
||||
}
|
||||
return String.format("%s@%x[c=%d/%d/%d,a=%d]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
getPendingConnectionCount(),
|
||||
present,
|
||||
getMaxConnectionCount(),
|
||||
active
|
||||
);
|
||||
}
|
||||
|
||||
private static class Entry
|
||||
{
|
||||
private Connection connection;
|
||||
private int active;
|
||||
private long used;
|
||||
|
||||
private void reset()
|
||||
{
|
||||
connection = null;
|
||||
active = 0;
|
||||
used = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("{u=%d,c=%s}", used, connection);
|
||||
Connection connection = entry.getPooled();
|
||||
acquired(connection);
|
||||
return connection;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,16 +19,16 @@
|
|||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
@ -67,10 +67,10 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
|
|||
|
||||
public ValidatingConnectionPool(Destination destination, int maxConnections, Callback requester, Scheduler scheduler, long timeout)
|
||||
{
|
||||
super(destination, maxConnections, requester);
|
||||
super((HttpDestination)destination, maxConnections, requester);
|
||||
this.scheduler = scheduler;
|
||||
this.timeout = timeout;
|
||||
this.quarantine = new HashMap<>(maxConnections);
|
||||
this.quarantine = new ConcurrentHashMap<>(maxConnections);
|
||||
}
|
||||
|
||||
@ManagedAttribute(value = "The number of validating connections", readonly = true)
|
||||
|
@ -82,21 +82,11 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
|
|||
@Override
|
||||
public boolean release(Connection connection)
|
||||
{
|
||||
lock();
|
||||
try
|
||||
{
|
||||
if (!getActiveConnections().remove(connection))
|
||||
return false;
|
||||
Holder holder = new Holder(connection);
|
||||
holder.task = scheduler.schedule(holder, timeout, TimeUnit.MILLISECONDS);
|
||||
quarantine.put(connection, holder);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Validating for {}ms {}", timeout, connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
Holder holder = new Holder(connection);
|
||||
holder.task = scheduler.schedule(holder, timeout, TimeUnit.MILLISECONDS);
|
||||
quarantine.put(connection, holder);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Validating for {}ms {}", timeout, connection);
|
||||
|
||||
released(connection);
|
||||
return true;
|
||||
|
@ -105,16 +95,7 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
|
|||
@Override
|
||||
public boolean remove(Connection connection)
|
||||
{
|
||||
Holder holder;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
holder = quarantine.remove(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
Holder holder = quarantine.remove(connection);
|
||||
|
||||
if (holder == null)
|
||||
return super.remove(connection);
|
||||
|
@ -130,25 +111,16 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void dump(Appendable out, String indent, Object... items) throws IOException
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
DumpableCollection toDump = new DumpableCollection("quarantine", quarantine.values());
|
||||
super.dump(out, indent, Stream.concat(Stream.of(items), Stream.of(toDump)));
|
||||
Dumpable.dumpObjects(out, indent, this, toDump);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
int size;
|
||||
lock();
|
||||
try
|
||||
{
|
||||
size = quarantine.size();
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
int size = quarantine.size();
|
||||
return String.format("%s[v=%d]", super.toString(), size);
|
||||
}
|
||||
|
||||
|
@ -170,20 +142,11 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
|
|||
if (done.compareAndSet(false, true))
|
||||
{
|
||||
boolean closed = isClosed();
|
||||
lock();
|
||||
try
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Validated {}", connection);
|
||||
quarantine.remove(connection);
|
||||
if (!closed)
|
||||
deactivate(connection);
|
||||
}
|
||||
finally
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Validated {}", connection);
|
||||
quarantine.remove(connection);
|
||||
if (!closed)
|
||||
deactivate(connection);
|
||||
idle(connection, closed);
|
||||
proceed();
|
||||
}
|
||||
|
|
|
@ -34,12 +34,13 @@ import org.eclipse.jetty.client.api.Request;
|
|||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.io.AbstractConnection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Sweeper;
|
||||
|
||||
public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable
|
||||
public class HttpConnectionOverHTTP extends AbstractConnection implements Connection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable, Attachable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpConnectionOverHTTP.class);
|
||||
|
||||
|
@ -135,6 +136,18 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
|
|||
return closed.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAttachment(Object obj)
|
||||
{
|
||||
delegate.setAttachment(obj);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAttachment()
|
||||
{
|
||||
return delegate.getAttachment();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean onIdleExpired()
|
||||
{
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
|
||||
public class ConnectionPoolHelper
|
||||
{
|
||||
public static Connection acquire(AbstractConnectionPool connectionPool, boolean create)
|
||||
{
|
||||
return connectionPool.acquire(create);
|
||||
}
|
||||
|
||||
public static void tryCreate(AbstractConnectionPool connectionPool, int pending)
|
||||
{
|
||||
connectionPool.tryCreate(pending);
|
||||
}
|
||||
}
|
|
@ -650,7 +650,7 @@ public class HttpClientTLSTest
|
|||
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
// Trigger the creation of a new connection, but don't use it.
|
||||
connectionPool.tryCreate(-1);
|
||||
ConnectionPoolHelper.tryCreate(connectionPool, -1);
|
||||
// Verify that the connection has been created.
|
||||
while (true)
|
||||
{
|
||||
|
@ -746,7 +746,7 @@ public class HttpClientTLSTest
|
|||
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
// Trigger the creation of a new connection, but don't use it.
|
||||
connectionPool.tryCreate(-1);
|
||||
ConnectionPoolHelper.tryCreate(connectionPool, -1);
|
||||
// Verify that the connection has been created.
|
||||
while (true)
|
||||
{
|
||||
|
|
|
@ -67,13 +67,9 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
|
|||
String host = "localhost";
|
||||
int port = connector.getLocalPort();
|
||||
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scenario.getScheme(), host, port);
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
|
||||
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
|
||||
assertEquals(0, idleConnections.size());
|
||||
|
||||
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
|
||||
assertEquals(0, activeConnections.size());
|
||||
assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getIdleConnections().size());
|
||||
assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getActiveConnections().size());
|
||||
|
||||
final CountDownLatch headersLatch = new CountDownLatch(1);
|
||||
final CountDownLatch successLatch = new CountDownLatch(3);
|
||||
|
@ -82,8 +78,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
|
|||
.onRequestSuccess(request -> successLatch.countDown())
|
||||
.onResponseHeaders(response ->
|
||||
{
|
||||
assertEquals(0, idleConnections.size());
|
||||
assertEquals(1, activeConnections.size());
|
||||
assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getIdleConnections().size());
|
||||
assertEquals(1, ((DuplexConnectionPool)destination.getConnectionPool()).getActiveConnections().size());
|
||||
headersLatch.countDown();
|
||||
})
|
||||
.send(new Response.Listener.Adapter()
|
||||
|
@ -105,8 +101,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
|
|||
assertTrue(headersLatch.await(30, TimeUnit.SECONDS));
|
||||
assertTrue(successLatch.await(30, TimeUnit.SECONDS));
|
||||
|
||||
assertEquals(1, idleConnections.size());
|
||||
assertEquals(0, activeConnections.size());
|
||||
assertEquals(1, ((DuplexConnectionPool)destination.getConnectionPool()).getIdleConnections().size());
|
||||
assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getActiveConnections().size());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
@ -120,11 +116,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
|
|||
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scenario.getScheme(), host, port);
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
|
||||
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
|
||||
assertEquals(0, idleConnections.size());
|
||||
|
||||
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
|
||||
assertEquals(0, activeConnections.size());
|
||||
assertEquals(0, connectionPool.getIdleConnections().size());
|
||||
assertEquals(0, connectionPool.getActiveConnections().size());
|
||||
|
||||
final CountDownLatch beginLatch = new CountDownLatch(1);
|
||||
final CountDownLatch failureLatch = new CountDownLatch(2);
|
||||
|
@ -133,7 +126,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
|
|||
@Override
|
||||
public void onBegin(Request request)
|
||||
{
|
||||
activeConnections.iterator().next().close();
|
||||
connectionPool.getActiveConnections().iterator().next().close();
|
||||
beginLatch.countDown();
|
||||
}
|
||||
|
||||
|
@ -148,8 +141,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
|
|||
public void onComplete(Result result)
|
||||
{
|
||||
assertTrue(result.isFailed());
|
||||
assertEquals(0, idleConnections.size());
|
||||
assertEquals(0, activeConnections.size());
|
||||
assertEquals(0, connectionPool.getIdleConnections().size());
|
||||
assertEquals(0, connectionPool.getActiveConnections().size());
|
||||
failureLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
@ -157,8 +150,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
|
|||
assertTrue(beginLatch.await(30, TimeUnit.SECONDS));
|
||||
assertTrue(failureLatch.await(30, TimeUnit.SECONDS));
|
||||
|
||||
assertEquals(0, idleConnections.size());
|
||||
assertEquals(0, activeConnections.size());
|
||||
assertEquals(0, connectionPool.getIdleConnections().size());
|
||||
assertEquals(0, connectionPool.getActiveConnections().size());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.function.Supplier;
|
|||
|
||||
import org.eclipse.jetty.client.AbstractHttpClientServerTest;
|
||||
import org.eclipse.jetty.client.ConnectionPool;
|
||||
import org.eclipse.jetty.client.ConnectionPoolHelper;
|
||||
import org.eclipse.jetty.client.DuplexConnectionPool;
|
||||
import org.eclipse.jetty.client.EmptyServerHandler;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
|
@ -37,7 +38,6 @@ import org.eclipse.jetty.client.api.Destination;
|
|||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.http.HttpHeader;
|
||||
import org.eclipse.jetty.http.HttpHeaderValue;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||
|
@ -66,7 +66,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
Connection connection = connectionPool.acquire();
|
||||
assertNull(connection);
|
||||
// There are no queued requests, so no connection should be created.
|
||||
connection = pollIdleConnection(connectionPool, 1, TimeUnit.SECONDS);
|
||||
connection = peekIdleConnection(connectionPool, 1, TimeUnit.SECONDS);
|
||||
assertNull(connection);
|
||||
}
|
||||
}
|
||||
|
@ -77,17 +77,17 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
{
|
||||
start(scenario, new EmptyServerHandler());
|
||||
|
||||
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
|
||||
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())))
|
||||
{
|
||||
destination.start();
|
||||
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
|
||||
// Trigger creation of one connection.
|
||||
connectionPool.tryCreate(1);
|
||||
ConnectionPoolHelper.tryCreate(connectionPool, 1);
|
||||
|
||||
Connection connection = connectionPool.acquire(false);
|
||||
Connection connection = ConnectionPoolHelper.acquire(connectionPool, false);
|
||||
if (connection == null)
|
||||
connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
|
||||
connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
|
||||
assertNotNull(connection);
|
||||
}
|
||||
}
|
||||
|
@ -98,13 +98,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
{
|
||||
start(scenario, new EmptyServerHandler());
|
||||
|
||||
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
|
||||
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())))
|
||||
{
|
||||
destination.start();
|
||||
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
|
||||
// Trigger creation of one connection.
|
||||
connectionPool.tryCreate(1);
|
||||
ConnectionPoolHelper.tryCreate(connectionPool, 1);
|
||||
|
||||
Connection connection1 = connectionPool.acquire();
|
||||
if (connection1 == null)
|
||||
|
@ -127,12 +127,12 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
|
||||
CountDownLatch idleLatch = new CountDownLatch(1);
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))
|
||||
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))
|
||||
{
|
||||
@Override
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
|
||||
return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
|
||||
{
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
|
@ -153,10 +153,10 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
})
|
||||
{
|
||||
destination.start();
|
||||
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
|
||||
// Trigger creation of one connection.
|
||||
connectionPool.tryCreate(1);
|
||||
ConnectionPoolHelper.tryCreate(connectionPool, 1);
|
||||
|
||||
// Make sure we entered idleCreated().
|
||||
assertTrue(idleLatch.await(5, TimeUnit.SECONDS));
|
||||
|
@ -167,7 +167,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
assertNull(connection1);
|
||||
|
||||
// Trigger creation of a second connection.
|
||||
connectionPool.tryCreate(1);
|
||||
ConnectionPoolHelper.tryCreate(connectionPool, 1);
|
||||
|
||||
// Second attempt also returns null because we delayed idleCreated() above.
|
||||
Connection connection2 = connectionPool.acquire();
|
||||
|
@ -176,9 +176,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
latch.countDown();
|
||||
|
||||
// There must be 2 idle connections.
|
||||
Connection connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
|
||||
Connection connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
|
||||
assertNotNull(connection);
|
||||
connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
|
||||
connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
|
||||
assertNotNull(connection);
|
||||
}
|
||||
}
|
||||
|
@ -189,13 +189,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
{
|
||||
start(scenario, new EmptyServerHandler());
|
||||
|
||||
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
|
||||
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())))
|
||||
{
|
||||
destination.start();
|
||||
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
|
||||
// Trigger creation of one connection.
|
||||
connectionPool.tryCreate(1);
|
||||
ConnectionPoolHelper.tryCreate(connectionPool, 1);
|
||||
|
||||
Connection connection1 = connectionPool.acquire();
|
||||
if (connection1 == null)
|
||||
|
@ -226,13 +226,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
long idleTimeout = 1000;
|
||||
client.setIdleTimeout(idleTimeout);
|
||||
|
||||
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
|
||||
try (HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())))
|
||||
{
|
||||
destination.start();
|
||||
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
|
||||
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
|
||||
|
||||
// Trigger creation of one connection.
|
||||
connectionPool.tryCreate(1);
|
||||
ConnectionPoolHelper.tryCreate(connectionPool, 1);
|
||||
|
||||
Connection connection1 = connectionPool.acquire();
|
||||
if (connection1 == null)
|
||||
|
@ -243,7 +243,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
|
||||
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
|
||||
|
||||
connection1 = connectionPool.getIdleConnections().poll();
|
||||
connection1 = connectionPool.getIdleConnections().peek();
|
||||
assertNull(connection1);
|
||||
}
|
||||
}
|
||||
|
@ -350,11 +350,6 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
assertTrue(client.getDestinations().isEmpty(), "Destination must be removed after connection error");
|
||||
}
|
||||
|
||||
private Connection pollIdleConnection(DuplexConnectionPool connectionPool, long time, TimeUnit unit) throws InterruptedException
|
||||
{
|
||||
return await(() -> connectionPool.getIdleConnections().poll(), time, unit);
|
||||
}
|
||||
|
||||
private Connection peekIdleConnection(DuplexConnectionPool connectionPool, long time, TimeUnit unit) throws InterruptedException
|
||||
{
|
||||
return await(() -> connectionPool.getIdleConnections().peek(), time, unit);
|
||||
|
@ -372,38 +367,4 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static class TestDestination extends HttpDestinationOverHTTP
|
||||
{
|
||||
public TestDestination(HttpClient client, Origin origin)
|
||||
{
|
||||
super(client, origin);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ConnectionPool newConnectionPool(HttpClient client)
|
||||
{
|
||||
return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
|
||||
}
|
||||
|
||||
public static class TestConnectionPool extends DuplexConnectionPool
|
||||
{
|
||||
public TestConnectionPool(Destination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
super(destination, maxConnections, requester);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tryCreate(int maxPending)
|
||||
{
|
||||
super.tryCreate(maxPending);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Connection acquire(boolean create)
|
||||
{
|
||||
return super.acquire(create);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,13 +49,14 @@ import org.eclipse.jetty.io.AbstractConnection;
|
|||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.io.RetainableByteBuffer;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
public class HttpConnectionOverFCGI extends AbstractConnection implements Connection
|
||||
public class HttpConnectionOverFCGI extends AbstractConnection implements Connection, Attachable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(HttpConnectionOverFCGI.class);
|
||||
|
||||
|
@ -70,6 +71,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
private final Delegate delegate;
|
||||
private final ClientParser parser;
|
||||
private RetainableByteBuffer networkBuffer;
|
||||
private Object attachment;
|
||||
|
||||
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise, boolean multiplexed)
|
||||
{
|
||||
|
@ -267,6 +269,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
|
|||
return closed.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAttachment(Object obj)
|
||||
{
|
||||
this.attachment = obj;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAttachment()
|
||||
{
|
||||
return attachment;
|
||||
}
|
||||
|
||||
protected boolean closeByHTTP(HttpFields fields)
|
||||
{
|
||||
if (multiplexed)
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.Closeable;
|
|||
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.frames.Frame;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
|
||||
/**
|
||||
|
@ -29,21 +30,8 @@ import org.eclipse.jetty.util.Callback;
|
|||
* <p>This class extends {@link Stream} by adding the methods required to
|
||||
* implement the HTTP/2 stream functionalities.</p>
|
||||
*/
|
||||
public interface IStream extends Stream, Closeable
|
||||
public interface IStream extends Stream, Attachable, Closeable
|
||||
{
|
||||
/**
|
||||
* @return the object attached to this stream
|
||||
* @see #setAttachment(Object)
|
||||
*/
|
||||
Object getAttachment();
|
||||
|
||||
/**
|
||||
* Attaches the given object to this stream for later retrieval.
|
||||
*
|
||||
* @param attachment the object to attach to this stream
|
||||
*/
|
||||
void setAttachment(Object attachment);
|
||||
|
||||
/**
|
||||
* @return whether this stream is local or remote
|
||||
*/
|
||||
|
|
|
@ -93,6 +93,11 @@
|
|||
<artifactId>jetty-http</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.eclipse.jetty</groupId>
|
||||
<artifactId>jetty-client</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>javax.servlet</groupId>
|
||||
<artifactId>javax.servlet-api</artifactId>
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client.jmh;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.eclipse.jetty.client.ConnectionPool;
|
||||
import org.eclipse.jetty.client.DuplexConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpConversation;
|
||||
import org.eclipse.jetty.client.HttpDestination;
|
||||
import org.eclipse.jetty.client.HttpExchange;
|
||||
import org.eclipse.jetty.client.HttpRequest;
|
||||
import org.eclipse.jetty.client.MultiplexConnectionPool;
|
||||
import org.eclipse.jetty.client.Origin;
|
||||
import org.eclipse.jetty.client.RoundRobinConnectionPool;
|
||||
import org.eclipse.jetty.client.SendFailure;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.TearDown;
|
||||
import org.openjdk.jmh.infra.Blackhole;
|
||||
import org.openjdk.jmh.runner.Runner;
|
||||
import org.openjdk.jmh.runner.RunnerException;
|
||||
import org.openjdk.jmh.runner.options.Options;
|
||||
import org.openjdk.jmh.runner.options.OptionsBuilder;
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
public class ConnectionPoolsBenchmark
|
||||
{
|
||||
private ConnectionPool pool;
|
||||
|
||||
@Param({"round-robin", "cached/multiplex", "uncached/multiplex", "cached/duplex", "uncached/duplex"})
|
||||
public static String POOL_TYPE;
|
||||
|
||||
@Setup
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
HttpClient httpClient = new HttpClient()
|
||||
{
|
||||
@Override
|
||||
protected void newConnection(HttpDestination destination, Promise<Connection> promise)
|
||||
{
|
||||
promise.succeeded(new MockConnection());
|
||||
}
|
||||
};
|
||||
HttpDestination httpDestination = new HttpDestination(httpClient, new Origin("http", "localhost", 8080))
|
||||
{
|
||||
@Override
|
||||
protected SendFailure send(Connection connection, HttpExchange exchange)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
HttpConversation httpConversation = new HttpConversation();
|
||||
HttpRequest httpRequest = new HttpRequest(httpClient, httpConversation, new URI("http://localhost:8080")) {};
|
||||
HttpExchange httpExchange = new HttpExchange(httpDestination, httpRequest, new ArrayList<>());
|
||||
httpDestination.getHttpExchanges().add(httpExchange);
|
||||
|
||||
int initialConnections = 12;
|
||||
int maxConnections = 100;
|
||||
switch (POOL_TYPE)
|
||||
{
|
||||
case "uncached/duplex":
|
||||
pool = new DuplexConnectionPool(httpDestination, maxConnections, false, Callback.NOOP);
|
||||
pool.preCreateConnections(initialConnections).get();
|
||||
break;
|
||||
case "cached/duplex":
|
||||
pool = new DuplexConnectionPool(httpDestination, maxConnections, true, Callback.NOOP);
|
||||
pool.preCreateConnections(initialConnections).get();
|
||||
break;
|
||||
case "uncached/multiplex":
|
||||
pool = new MultiplexConnectionPool(httpDestination, maxConnections,false, Callback.NOOP, 12);
|
||||
pool.preCreateConnections(initialConnections).get();
|
||||
break;
|
||||
case "cached/multiplex":
|
||||
pool = new MultiplexConnectionPool(httpDestination, maxConnections,true, Callback.NOOP, 12);
|
||||
pool.preCreateConnections(initialConnections).get();
|
||||
break;
|
||||
case "round-robin":
|
||||
pool = new RoundRobinConnectionPool(httpDestination, maxConnections, Callback.NOOP);
|
||||
pool.preCreateConnections(maxConnections).get();
|
||||
break;
|
||||
default:
|
||||
throw new AssertionError("Unknown pool type: " + POOL_TYPE);
|
||||
}
|
||||
}
|
||||
|
||||
@TearDown
|
||||
public void tearDown()
|
||||
{
|
||||
pool.close();
|
||||
pool = null;
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public void testPool()
|
||||
{
|
||||
Connection connection = pool.acquire();
|
||||
if (connection == null && !POOL_TYPE.equals("round-robin"))
|
||||
throw new AssertionError("from thread " + Thread.currentThread().getName());
|
||||
Blackhole.consumeCPU(ThreadLocalRandom.current().nextInt(10, 20));
|
||||
if (connection != null)
|
||||
pool.release(connection);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws RunnerException
|
||||
{
|
||||
Options opt = new OptionsBuilder()
|
||||
.include(ConnectionPoolsBenchmark.class.getSimpleName())
|
||||
.warmupIterations(3)
|
||||
.measurementIterations(3)
|
||||
.forks(1)
|
||||
.threads(12)
|
||||
//.addProfiler(LinuxPerfProfiler.class)
|
||||
.build();
|
||||
|
||||
new Runner(opt).run();
|
||||
}
|
||||
|
||||
static class MockConnection implements Connection, Attachable
|
||||
{
|
||||
private Object attachment;
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClosed()
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void send(Request request, Response.CompleteListener listener)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAttachment(Object obj)
|
||||
{
|
||||
this.attachment = obj;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAttachment()
|
||||
{
|
||||
return attachment;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util;
|
||||
|
||||
/**
|
||||
* Abstract mechanism to support attachment of miscellaneous objects.
|
||||
*/
|
||||
public interface Attachable
|
||||
{
|
||||
/**
|
||||
* @return the object attached to this stream
|
||||
* @see #setAttachment(Object)
|
||||
*/
|
||||
Object getAttachment();
|
||||
|
||||
/**
|
||||
* Attaches the given object to this stream for later retrieval.
|
||||
*
|
||||
* @param attachment the object to attach to this stream
|
||||
*/
|
||||
void setAttachment(Object attachment);
|
||||
}
|
|
@ -0,0 +1,457 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* A fast container of poolable objects, with optional support for
|
||||
* multiplexing, max usage count and thread-local caching.
|
||||
* <p>
|
||||
* The thread-local caching mechanism is about remembering up to N previously
|
||||
* used entries into a thread-local single-threaded collection.
|
||||
* When that collection is not empty, its entries are removed one by one
|
||||
* during acquisition until an entry that can be acquired is found.
|
||||
* This can greatly speed up acquisition when both the acquisition and the
|
||||
* release of the entries is done on the same thread as this avoids iterating
|
||||
* the global, thread-safe collection of entries.
|
||||
* @param <T>
|
||||
*/
|
||||
public class Pool<T> implements AutoCloseable, Dumpable
|
||||
{
|
||||
private static final Logger LOGGER = Log.getLogger(Pool.class);
|
||||
|
||||
private final List<Entry> sharedList = new CopyOnWriteArrayList<>();
|
||||
/*
|
||||
* The cache is used to avoid hammering on the first index of the entry list.
|
||||
* Caches can become poisoned (i.e.: containing entries that are in use) when
|
||||
* the release isn't done by the acquiring thread or when the entry pool is
|
||||
* undersized compared to the load applied on it.
|
||||
* When an entry can't be found in the cache, the global list is iterated
|
||||
* normally so the cache has no visible effect besides performance.
|
||||
*/
|
||||
private final ThreadLocal<List<Entry>> cache;
|
||||
private final Lock lock = new ReentrantLock();
|
||||
private final int maxEntries;
|
||||
private final int cacheSize;
|
||||
private volatile boolean closed;
|
||||
private volatile int maxMultiplex = 1;
|
||||
private volatile int maxUsageCount = -1;
|
||||
|
||||
/**
|
||||
* Construct a Pool with the specified thread-local cache size.
|
||||
*
|
||||
* @param maxEntries the maximum amount of entries that the pool will accept.
|
||||
* @param cacheSize the thread-local cache size. A value less than 1 means the cache is disabled.
|
||||
*/
|
||||
public Pool(int maxEntries, int cacheSize)
|
||||
{
|
||||
this.maxEntries = maxEntries;
|
||||
this.cacheSize = cacheSize;
|
||||
if (cacheSize > 0)
|
||||
this.cache = ThreadLocal.withInitial(() -> new ArrayList<Entry>(cacheSize));
|
||||
else
|
||||
this.cache = null;
|
||||
}
|
||||
|
||||
public int getPendingConnectionCount()
|
||||
{
|
||||
return (int)sharedList.stream().filter(entry -> entry.getPooled() == null).count();
|
||||
}
|
||||
|
||||
public int getIdleConnectionCount()
|
||||
{
|
||||
return (int)sharedList.stream().filter(Entry::isIdle).count();
|
||||
}
|
||||
|
||||
public int getInUseConnectionCount()
|
||||
{
|
||||
return (int)sharedList.stream().filter(entry -> !entry.isIdle()).count();
|
||||
}
|
||||
|
||||
public int getMaxEntries()
|
||||
{
|
||||
return maxEntries;
|
||||
}
|
||||
|
||||
public int getMaxMultiplex()
|
||||
{
|
||||
return maxMultiplex;
|
||||
}
|
||||
|
||||
public final void setMaxMultiplex(int maxMultiplex)
|
||||
{
|
||||
if (maxMultiplex < 1)
|
||||
throw new IllegalArgumentException("Max multiplex must be >= 1");
|
||||
this.maxMultiplex = maxMultiplex;
|
||||
}
|
||||
|
||||
public int getMaxUsageCount()
|
||||
{
|
||||
return maxUsageCount;
|
||||
}
|
||||
|
||||
public final void setMaxUsageCount(int maxUsageCount)
|
||||
{
|
||||
if (maxUsageCount == 0)
|
||||
throw new IllegalArgumentException("Max usage count must be != 0");
|
||||
this.maxUsageCount = maxUsageCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new disabled slot into the pool. The returned entry
|
||||
* won't be acquirable as long as {@link Entry#enable(Object)}
|
||||
* has not been called.
|
||||
*
|
||||
* @param maxReservations the max desired number of reserved entries,
|
||||
* or a negative number to always trigger the reservation of a new entry.
|
||||
* @return a disabled entry that is contained in the pool,
|
||||
* or null if the pool is closed or if the pool already contains
|
||||
* {@link #getMaxEntries()} entries.
|
||||
*/
|
||||
public Entry reserve(int maxReservations)
|
||||
{
|
||||
if (maxReservations >= 0 && getPendingConnectionCount() >= maxReservations)
|
||||
return null;
|
||||
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
if (!closed && sharedList.size() < maxEntries)
|
||||
{
|
||||
Entry entry = new Entry();
|
||||
sharedList.add(entry);
|
||||
return entry;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism.
|
||||
*
|
||||
* @param idx the index of the entry to acquire.
|
||||
* @return the specified entry or null if there is none at the specified index or if it is not available.
|
||||
*/
|
||||
public Entry acquireAt(int idx)
|
||||
{
|
||||
if (closed)
|
||||
return null;
|
||||
|
||||
try
|
||||
{
|
||||
Entry entry = sharedList.get(idx);
|
||||
if (entry.tryAcquire())
|
||||
return entry;
|
||||
}
|
||||
catch (IndexOutOfBoundsException e)
|
||||
{
|
||||
// no entry at that index
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire an entry from the pool.
|
||||
*
|
||||
* @return an entry from the pool or null if none is available.
|
||||
*/
|
||||
public Entry acquire()
|
||||
{
|
||||
if (closed)
|
||||
return null;
|
||||
|
||||
// first check the thread-local cache
|
||||
if (cache != null)
|
||||
{
|
||||
List<Entry> cachedList = cache.get();
|
||||
while (!cachedList.isEmpty())
|
||||
{
|
||||
Entry cachedEntry = cachedList.remove(cachedList.size() - 1);
|
||||
if (cachedEntry.tryAcquire())
|
||||
return cachedEntry;
|
||||
}
|
||||
}
|
||||
|
||||
// then iterate the shared list
|
||||
for (Entry entry : sharedList)
|
||||
{
|
||||
if (entry.tryAcquire())
|
||||
return entry;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method will return an acquired object to the pool. Objects
|
||||
* that are acquired from the pool but never released will result
|
||||
* in a memory leak.
|
||||
*
|
||||
* @param entry the value to return to the pool
|
||||
* @return true if the entry was released and could be acquired again,
|
||||
* false if the entry should be removed by calling {@link #remove(Pool.Entry)}
|
||||
* and the object contained by the entry should be disposed.
|
||||
* @throws NullPointerException if value is null
|
||||
*/
|
||||
public boolean release(Entry entry)
|
||||
{
|
||||
if (closed)
|
||||
return false;
|
||||
|
||||
// first mark it as unused
|
||||
boolean reusable = entry.tryRelease();
|
||||
|
||||
// then cache the released entry
|
||||
if (cache != null && reusable)
|
||||
{
|
||||
List<Entry> cachedList = cache.get();
|
||||
if (cachedList.size() < cacheSize)
|
||||
cachedList.add(entry);
|
||||
}
|
||||
return reusable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a value from the pool.
|
||||
*
|
||||
* @param entry the value to remove
|
||||
* @return true if the entry was removed, false otherwise
|
||||
*/
|
||||
public boolean remove(Entry entry)
|
||||
{
|
||||
if (closed)
|
||||
return false;
|
||||
|
||||
if (!entry.tryRemove())
|
||||
{
|
||||
if (LOGGER.isDebugEnabled())
|
||||
LOGGER.debug("Attempt to remove an object from the pool that is still in use: {}", entry);
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean removed = sharedList.remove(entry);
|
||||
if (!removed)
|
||||
{
|
||||
if (LOGGER.isDebugEnabled())
|
||||
LOGGER.debug("Attempt to remove an object from the pool that does not exist: {}", entry);
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
public boolean isClosed()
|
||||
{
|
||||
return closed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
List<Entry> copy;
|
||||
lock.lock();
|
||||
try
|
||||
{
|
||||
closed = true;
|
||||
copy = new ArrayList<>(sharedList);
|
||||
sharedList.clear();
|
||||
}
|
||||
finally
|
||||
{
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
// iterate the copy and close its entries
|
||||
for (Entry entry : copy)
|
||||
{
|
||||
if (entry.tryRemove() && entry.pooled instanceof Closeable)
|
||||
{
|
||||
try
|
||||
{
|
||||
((Closeable)entry.pooled).close();
|
||||
}
|
||||
catch (IOException e)
|
||||
{
|
||||
LOGGER.warn("Error closing entry {}", entry, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public int size()
|
||||
{
|
||||
return sharedList.size();
|
||||
}
|
||||
|
||||
public Collection<Entry> values()
|
||||
{
|
||||
return Collections.unmodifiableCollection(sharedList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
Dumpable.dumpObjects(out, indent, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return getClass().getSimpleName() + " size=" + sharedList.size() + " closed=" + closed + " entries=" + sharedList;
|
||||
}
|
||||
|
||||
public class Entry
|
||||
{
|
||||
// hi: positive=open/maxUsage counter,negative=closed lo: multiplexing counter
|
||||
private final AtomicBiInteger state;
|
||||
private volatile T pooled;
|
||||
|
||||
public Entry()
|
||||
{
|
||||
this.state = new AtomicBiInteger(-1, 0);
|
||||
}
|
||||
|
||||
public T getPooled()
|
||||
{
|
||||
return pooled;
|
||||
}
|
||||
|
||||
public void enable(T pooled)
|
||||
{
|
||||
if (!isClosed())
|
||||
throw new IllegalStateException("Open entries cannot be enabled : " + this);
|
||||
Objects.requireNonNull(pooled);
|
||||
this.pooled = pooled;
|
||||
state.set(0, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to acquire the entry if possible by incrementing both the usage
|
||||
* count and the multiplex count.
|
||||
* @return true if the usage count is <= maxUsageCount and
|
||||
* the multiplex count is maxMultiplex and the entry is not closed,
|
||||
* false otherwise.
|
||||
*/
|
||||
public boolean tryAcquire()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
long encoded = state.get();
|
||||
int usageCount = AtomicBiInteger.getHi(encoded);
|
||||
boolean closed = usageCount < 0;
|
||||
int multiplexingCount = AtomicBiInteger.getLo(encoded);
|
||||
int currentMaxUsageCount = maxUsageCount;
|
||||
if (closed || multiplexingCount >= maxMultiplex || (currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount))
|
||||
return false;
|
||||
|
||||
if (state.compareAndSet(encoded, usageCount + 1, multiplexingCount + 1))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to release the entry if possible by decrementing the multiplexing
|
||||
* count unless the entity is closed.
|
||||
* @return true if the entry was released,
|
||||
* false if {@link #tryRemove()} should be called.
|
||||
*/
|
||||
public boolean tryRelease()
|
||||
{
|
||||
int newMultiplexingCount;
|
||||
int usageCount;
|
||||
while (true)
|
||||
{
|
||||
long encoded = state.get();
|
||||
usageCount = AtomicBiInteger.getHi(encoded);
|
||||
boolean closed = usageCount < 0;
|
||||
if (closed)
|
||||
return false;
|
||||
|
||||
newMultiplexingCount = AtomicBiInteger.getLo(encoded) - 1;
|
||||
if (newMultiplexingCount < 0)
|
||||
throw new IllegalStateException("Cannot release an already released entry");
|
||||
|
||||
if (state.compareAndSet(encoded, usageCount, newMultiplexingCount))
|
||||
break;
|
||||
}
|
||||
|
||||
int currentMaxUsageCount = maxUsageCount;
|
||||
boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
|
||||
return !(overUsed && newMultiplexingCount == 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to mark the entry as removed.
|
||||
* @return true if the entry has to be removed from the containing pool, false otherwise.
|
||||
*/
|
||||
public boolean tryRemove()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
long encoded = state.get();
|
||||
int usageCount = AtomicBiInteger.getHi(encoded);
|
||||
int multiplexCount = AtomicBiInteger.getLo(encoded);
|
||||
int newMultiplexCount = Math.max(multiplexCount - 1, 0);
|
||||
|
||||
boolean removed = state.compareAndSet(usageCount, -1, multiplexCount, newMultiplexCount);
|
||||
if (removed)
|
||||
return newMultiplexCount == 0;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isClosed()
|
||||
{
|
||||
return state.getHi() < 0;
|
||||
}
|
||||
|
||||
public boolean isIdle()
|
||||
{
|
||||
return state.getLo() <= 0;
|
||||
}
|
||||
|
||||
public int getUsageCount()
|
||||
{
|
||||
return Math.max(state.getHi(), 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
long encoded = state.get();
|
||||
return super.toString() + " stateHi=" + AtomicBiInteger.getHi(encoded) +
|
||||
" stateLo=" + AtomicBiInteger.getLo(encoded) + " pooled=" + pooled;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,442 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
public class PoolTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testAcquireRelease()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1,0);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
assertThat(pool.values().stream().findFirst().get().isIdle(), is(true));
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(e1.getPooled(), equalTo("aaa"));
|
||||
assertThat(pool.values().stream().findFirst().get().isIdle(), is(false));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(pool.release(e1), is(true));
|
||||
assertThat(pool.values().stream().findFirst().get().isIdle(), is(true));
|
||||
assertThrows(IllegalStateException.class, () -> pool.release(e1));
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
assertThat(e2.getPooled(), equalTo("aaa"));
|
||||
assertThat(pool.release(e2), is(true));
|
||||
assertThrows(NullPointerException.class, () -> pool.release(null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveBeforeRelease()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1,0);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.remove(e1), is(true));
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(pool.release(e1), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCloseBeforeRelease()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1,0);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.size(), is(1));
|
||||
pool.close();
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.release(e1), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxPoolSize()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.reserve(-1), notNullValue());
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.reserve(-1), nullValue());
|
||||
assertThat(pool.size(), is(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReserve()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, 0);
|
||||
Pool<String>.Entry entry = pool.reserve(-1);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(entry.isClosed(), is(true));
|
||||
|
||||
assertThrows(NullPointerException.class, () -> entry.enable(null));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(entry.isClosed(), is(true));
|
||||
|
||||
entry.enable("aaa");
|
||||
assertThat(entry.isClosed(), is(false));
|
||||
assertThat(pool.acquire().getPooled(), notNullValue());
|
||||
|
||||
assertThrows(IllegalStateException.class, () -> entry.enable("bbb"));
|
||||
|
||||
Pool<String>.Entry e2 = pool.reserve(-1);
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.remove(e2), is(true));
|
||||
assertThat(pool.size(), is(1));
|
||||
|
||||
pool.reserve(-1);
|
||||
assertThat(pool.size(), is(2));
|
||||
pool.close();
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.reserve(-1), nullValue());
|
||||
assertThat(entry.isClosed(), is(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReserveMaxPending()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, 0);
|
||||
assertThat(pool.reserve(0), nullValue());
|
||||
assertThat(pool.reserve(1), notNullValue());
|
||||
assertThat(pool.reserve(1), nullValue());
|
||||
assertThat(pool.reserve(2), notNullValue());
|
||||
assertThat(pool.reserve(2), nullValue());
|
||||
assertThat(pool.reserve(3), nullValue());
|
||||
assertThat(pool.reserve(-1), nullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReserveNegativeMaxPending()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, 0);
|
||||
assertThat(pool.reserve(-1), notNullValue());
|
||||
assertThat(pool.reserve(-1), notNullValue());
|
||||
assertThat(pool.reserve(-1), nullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClose()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
assertThat(pool.isClosed(), is(false));
|
||||
pool.close();
|
||||
pool.close();
|
||||
|
||||
assertThat(pool.isClosed(), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(pool.reserve(-1), nullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClosingCloseable()
|
||||
{
|
||||
AtomicBoolean closed = new AtomicBoolean();
|
||||
Pool<Closeable> pool = new Pool<>(1,0);
|
||||
Closeable pooled = () -> closed.set(true);
|
||||
pool.reserve(-1).enable(pooled);
|
||||
assertThat(closed.get(), is(false));
|
||||
pool.close();
|
||||
assertThat(closed.get(), is(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemove()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.remove(e1), is(true));
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(pool.release(e1), is(false));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThrows(NullPointerException.class, () -> pool.remove(null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValuesSize()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, 0);
|
||||
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.values().isEmpty(), is(true));
|
||||
pool.reserve(-1).enable("aaa");
|
||||
pool.reserve(-1).enable("bbb");
|
||||
assertThat(pool.values().stream().map(Pool.Entry::getPooled).collect(toList()), equalTo(Arrays.asList("aaa", "bbb")));
|
||||
assertThat(pool.size(), is(2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValuesContainsAcquiredEntries()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, 0);
|
||||
|
||||
pool.reserve(-1).enable("aaa");
|
||||
pool.reserve(-1).enable("bbb");
|
||||
assertThat(pool.acquire(), notNullValue());
|
||||
assertThat(pool.acquire(), notNullValue());
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(pool.values().isEmpty(), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAcquireAt()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, 0);
|
||||
|
||||
pool.reserve(-1).enable("aaa");
|
||||
pool.reserve(-1).enable("bbb");
|
||||
|
||||
assertThat(pool.acquireAt(2), nullValue());
|
||||
assertThat(pool.acquireAt(0), notNullValue());
|
||||
assertThat(pool.acquireAt(0), nullValue());
|
||||
assertThat(pool.acquireAt(1), notNullValue());
|
||||
assertThat(pool.acquireAt(1), nullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxUsageCount()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.setMaxUsageCount(3);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.release(e1), is(true));
|
||||
e1 = pool.acquire();
|
||||
assertThat(pool.release(e1), is(true));
|
||||
e1 = pool.acquire();
|
||||
assertThat(pool.release(e1), is(false));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.remove(e1), is(true));
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(pool.size(), is(0));
|
||||
Pool<String>.Entry e1Copy = e1;
|
||||
assertThat(pool.release(e1Copy), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxMultiplex()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, 0);
|
||||
pool.setMaxMultiplex(3);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
pool.reserve(-1).enable("bbb");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
Pool<String>.Entry e3 = pool.acquire();
|
||||
Pool<String>.Entry e4 = pool.acquire();
|
||||
assertThat(e1.getPooled(), equalTo("aaa"));
|
||||
assertThat(e1, sameInstance(e2));
|
||||
assertThat(e1, sameInstance(e3));
|
||||
assertThat(e4.getPooled(), equalTo("bbb"));
|
||||
assertThat(pool.release(e1), is(true));
|
||||
Pool<String>.Entry e5 = pool.acquire();
|
||||
assertThat(e2, sameInstance(e5));
|
||||
Pool<String>.Entry e6 = pool.acquire();
|
||||
assertThat(e4, sameInstance(e6));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveMultiplexed()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
assertThat(pool.values().stream().findFirst().get().isIdle(), is(false));
|
||||
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(pool.values().stream().findFirst().get().isIdle(), is(false));
|
||||
assertThat(pool.values().stream().findFirst().get().isClosed(), is(true));
|
||||
assertThat(pool.remove(e1), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
|
||||
assertThat(pool.release(e1), is(false));
|
||||
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiplexRemoveThenAcquireThenReleaseRemove()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(e1.isClosed(), is(true));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(pool.release(e2), is(false));
|
||||
assertThat(pool.remove(e2), is(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonMultiplexRemoveAfterAcquire()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.remove(e1), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiplexRemoveAfterAcquire()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(pool.remove(e2), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
|
||||
assertThat(pool.release(e1), is(false));
|
||||
assertThat(pool.size(), is(0));
|
||||
|
||||
Pool<String>.Entry e3 = pool.acquire();
|
||||
assertThat(e3, nullValue());
|
||||
|
||||
assertThat(pool.release(e2), is(false));
|
||||
assertThat(pool.size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReleaseThenRemoveNonEnabledEntry()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
Pool<String>.Entry e = pool.reserve(-1);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.release(e), is(false));
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.remove(e), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveNonEnabledEntry()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
Pool<String>.Entry e = pool.reserve(-1);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.remove(e), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiplexMaxUsageReachedAcquireThenRemove()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.setMaxUsageCount(3);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e0 = pool.acquire();
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.release(e1), is(true));
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
assertThat(pool.release(e2), is(true));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
|
||||
assertThat(pool.remove(e0), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.setMaxUsageCount(3);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e0 = pool.acquire();
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.release(e1), is(true));
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
assertThat(pool.release(e2), is(true));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
|
||||
assertThat(pool.release(e0), is(false));
|
||||
assertThat(pool.values().stream().findFirst().get().isIdle(), is(true));
|
||||
assertThat(pool.values().stream().findFirst().get().isClosed(), is(false));
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.remove(e0), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUsageCountAfterReachingMaxMultiplexLimit()
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, 0);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.setMaxUsageCount(10);
|
||||
pool.reserve(-1).enable("aaa");
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(e1.getUsageCount(), is(1));
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
assertThat(e1.getUsageCount(), is(2));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(e1.getUsageCount(), is(2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigLimits()
|
||||
{
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(-1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxUsageCount(0));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue