mirror of
https://github.com/apache/httpcomponents-client.git
synced 2025-02-16 23:16:33 +00:00
HTTPCLIENT-677: replacing synchronized with Lock/Condition
git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk@607287 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2e6c2957ef
commit
caf0b10534
@ -37,6 +37,8 @@
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -53,12 +55,21 @@
|
||||
/**
|
||||
* An abstract connection pool.
|
||||
* It is used by the {@link ThreadSafeClientConnManager}.
|
||||
* The abstract pool includes a {@link #poolLock}, which is used to
|
||||
* synchronize access to the internal pool datastructures.
|
||||
* Don't use <code>synchronized</code> for that purpose!
|
||||
*/
|
||||
public abstract class AbstractConnPool implements RefQueueHandler {
|
||||
|
||||
//@@@ protected, obtain with getClass()?
|
||||
private final Log LOG = LogFactory.getLog(AbstractConnPool.class);
|
||||
|
||||
/**
|
||||
* The global lock for this pool.
|
||||
*/
|
||||
protected final Lock poolLock;
|
||||
|
||||
|
||||
/**
|
||||
* References to issued connections.
|
||||
* Objects in this set are of class
|
||||
@ -135,6 +146,9 @@ protected AbstractConnPool(ClientConnectionManager mgr) {
|
||||
issuedConnections = new HashSet<BasicPoolEntryRef>();
|
||||
idleConnHandler = new IdleConnectionHandler();
|
||||
|
||||
boolean fair = false; //@@@ check parameters to decide
|
||||
poolLock = new ReentrantLock(fair);
|
||||
|
||||
boolean conngc = true; //@@@ check parameters to decide
|
||||
if (conngc) {
|
||||
refQueue = new ReferenceQueue<Object>();
|
||||
@ -185,25 +199,33 @@ public abstract void freeEntry(BasicPoolEntry entry)
|
||||
|
||||
|
||||
// non-javadoc, see interface RefQueueHandler
|
||||
public synchronized void handleReference(Reference<?> ref) {
|
||||
public void handleReference(Reference<?> ref) {
|
||||
|
||||
if (ref instanceof BasicPoolEntryRef) {
|
||||
// check if the GCed pool entry was still in use
|
||||
//@@@ find a way to detect this without lookup
|
||||
//@@@ flag in the BasicPoolEntryRef, to be reset when freed?
|
||||
final boolean lost = issuedConnections.remove(ref);
|
||||
if (lost) {
|
||||
final HttpRoute route = ((BasicPoolEntryRef)ref).getRoute();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Connection garbage collected. " + route);
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
if (ref instanceof BasicPoolEntryRef) {
|
||||
// check if the GCed pool entry was still in use
|
||||
//@@@ find a way to detect this without lookup
|
||||
//@@@ flag in the BasicPoolEntryRef, to be reset when freed?
|
||||
final boolean lost = issuedConnections.remove(ref);
|
||||
if (lost) {
|
||||
final HttpRoute route =
|
||||
((BasicPoolEntryRef)ref).getRoute();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Connection garbage collected. " + route);
|
||||
}
|
||||
handleLostEntry(route);
|
||||
}
|
||||
handleLostEntry(route);
|
||||
} else if (ref instanceof ConnMgrRef) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Connection manager garbage collected.");
|
||||
}
|
||||
shutdown();
|
||||
}
|
||||
} else if (ref instanceof ConnMgrRef) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Connection manager garbage collected. ");
|
||||
}
|
||||
shutdown();
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -225,8 +247,14 @@ protected abstract void handleLostEntry(HttpRoute route)
|
||||
* @param idletime the time the connections should have been idle
|
||||
* in order to be closed now
|
||||
*/
|
||||
public synchronized void closeIdleConnections(long idletime) {
|
||||
idleConnHandler.closeIdleConnections(idletime);
|
||||
public void closeIdleConnections(long idletime) {
|
||||
|
||||
try {
|
||||
poolLock.lock();
|
||||
idleConnHandler.closeIdleConnections(idletime);
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
//@@@ revise this cleanup stuff (closeIdle+deleteClosed), it's not good
|
||||
@ -242,31 +270,38 @@ public abstract void deleteClosedConnections()
|
||||
* Shuts down this pool and all associated resources.
|
||||
* Overriding methods MUST call the implementation here!
|
||||
*/
|
||||
public synchronized void shutdown() {
|
||||
public void shutdown() {
|
||||
|
||||
if (isShutDown)
|
||||
return;
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
// no point in monitoring GC anymore
|
||||
if (refWorker != null)
|
||||
refWorker.shutdown();
|
||||
if (isShutDown)
|
||||
return;
|
||||
|
||||
// close all connections that are issued to an application
|
||||
Iterator<BasicPoolEntryRef> iter = issuedConnections.iterator();
|
||||
while (iter.hasNext()) {
|
||||
BasicPoolEntryRef per = iter.next();
|
||||
iter.remove();
|
||||
BasicPoolEntry entry = per.get();
|
||||
if (entry != null) {
|
||||
closeConnection(entry.getConnection());
|
||||
// no point in monitoring GC anymore
|
||||
if (refWorker != null)
|
||||
refWorker.shutdown();
|
||||
|
||||
// close all connections that are issued to an application
|
||||
Iterator<BasicPoolEntryRef> iter = issuedConnections.iterator();
|
||||
while (iter.hasNext()) {
|
||||
BasicPoolEntryRef per = iter.next();
|
||||
iter.remove();
|
||||
BasicPoolEntry entry = per.get();
|
||||
if (entry != null) {
|
||||
closeConnection(entry.getConnection());
|
||||
}
|
||||
}
|
||||
|
||||
// remove all references to connections
|
||||
//@@@ use this for shutting them down instead?
|
||||
idleConnHandler.removeAll();
|
||||
|
||||
isShutDown = true;
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
|
||||
// remove all references to connections
|
||||
//@@@ use this for shutting them down instead?
|
||||
idleConnHandler.removeAll();
|
||||
|
||||
isShutDown = true;
|
||||
}
|
||||
|
||||
|
||||
|
@ -35,6 +35,8 @@
|
||||
import java.util.Queue;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -55,6 +57,9 @@
|
||||
* <li>connections are re-used only for the exact same route</li>
|
||||
* <li>connection limits are enforced per route rather than per host</li>
|
||||
* </ul>
|
||||
* Note that access to the pool datastructures is synchronized via the
|
||||
* {@link AbstractConnPool#poolLock poolLock} in the base class,
|
||||
* not via <code>synchronized</code> methods.
|
||||
*
|
||||
* @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
|
||||
* @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
|
||||
@ -66,6 +71,10 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
||||
private final Log LOG = LogFactory.getLog(ConnPoolByRoute.class);
|
||||
|
||||
|
||||
/** Temporary hack: @@@ a global condition that goes with the lock. */
|
||||
protected final Condition poolCondition;
|
||||
|
||||
|
||||
/** The list of free connections */
|
||||
private Queue<BasicPoolEntry> freeConnections;
|
||||
|
||||
@ -114,6 +123,8 @@ protected static class WaitingThread {
|
||||
public ConnPoolByRoute(ClientConnectionManager mgr) {
|
||||
super(mgr);
|
||||
|
||||
poolCondition = poolLock.newCondition(); //@@@ temporary hack
|
||||
|
||||
//@@@ use factory method, at least for waitingThreads
|
||||
freeConnections = new LinkedList<BasicPoolEntry>();
|
||||
waitingThreads = new LinkedList<WaitingThread>();
|
||||
@ -130,14 +141,22 @@ public ConnPoolByRoute(ClientConnectionManager mgr) {
|
||||
* @return the pool for the argument route,
|
||||
* never <code>null</code> if <code>create</code> is <code>true</code>
|
||||
*/
|
||||
protected synchronized RouteSpecificPool getRoutePool(HttpRoute route,
|
||||
boolean create) {
|
||||
protected RouteSpecificPool getRoutePool(HttpRoute route,
|
||||
boolean create) {
|
||||
RouteSpecificPool rospl = null;
|
||||
|
||||
RouteSpecificPool rospl = routeToPool.get(route);
|
||||
if ((rospl == null) && create) {
|
||||
// no pool for this route yet (or anymore)
|
||||
rospl = newRouteSpecificPool(route);
|
||||
routeToPool.put(route, rospl);
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
rospl = routeToPool.get(route);
|
||||
if ((rospl == null) && create) {
|
||||
// no pool for this route yet (or anymore)
|
||||
rospl = newRouteSpecificPool(route);
|
||||
routeToPool.put(route, rospl);
|
||||
}
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
|
||||
return rospl;
|
||||
@ -158,119 +177,133 @@ protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
|
||||
|
||||
|
||||
//@@@ consider alternatives for gathering statistics
|
||||
public synchronized int getConnectionsInPool(HttpRoute route) {
|
||||
//@@@ don't allow a pool to be created here!
|
||||
RouteSpecificPool rospl = getRoutePool(route, false);
|
||||
return (rospl != null) ? rospl.getEntryCount() : 0;
|
||||
public int getConnectionsInPool(HttpRoute route) {
|
||||
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
// don't allow a pool to be created here!
|
||||
RouteSpecificPool rospl = getRoutePool(route, false);
|
||||
return (rospl != null) ? rospl.getEntryCount() : 0;
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// non-javadoc, see base class AbstractConnPool
|
||||
public synchronized
|
||||
BasicPoolEntry getEntry(HttpRoute route, long timeout,
|
||||
ClientConnectionOperator operator)
|
||||
public BasicPoolEntry getEntry(HttpRoute route, long timeout,
|
||||
ClientConnectionOperator operator)
|
||||
throws ConnectionPoolTimeoutException, InterruptedException {
|
||||
|
||||
BasicPoolEntry entry = null;
|
||||
|
||||
int maxHostConnections = HttpConnectionManagerParams
|
||||
.getMaxConnectionsPerHost(this.params, route);
|
||||
int maxTotalConnections = HttpConnectionManagerParams
|
||||
.getMaxTotalConnections(this.params);
|
||||
|
||||
RouteSpecificPool rospl = getRoutePool(route, true);
|
||||
WaitingThread waitingThread = null;
|
||||
BasicPoolEntry entry = null;
|
||||
|
||||
boolean useTimeout = (timeout > 0);
|
||||
long timeToWait = timeout;
|
||||
long startWait = 0;
|
||||
long endWait = 0;
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
while (entry == null) {
|
||||
RouteSpecificPool rospl = getRoutePool(route, true);
|
||||
WaitingThread waitingThread = null;
|
||||
|
||||
if (isShutDown) {
|
||||
throw new IllegalStateException
|
||||
("Connection pool shut down.");
|
||||
}
|
||||
boolean useTimeout = (timeout > 0);
|
||||
long timeToWait = timeout;
|
||||
long startWait = 0;
|
||||
long endWait = 0;
|
||||
|
||||
// the cases to check for:
|
||||
// - have a free connection for that route
|
||||
// - allowed to create a free connection for that route
|
||||
// - can delete and replace a free connection for another route
|
||||
// - need to wait for one of the things above to come true
|
||||
while (entry == null) {
|
||||
|
||||
entry = getFreeEntry(rospl);
|
||||
if (entry != null) {
|
||||
// we're fine
|
||||
//@@@ yeah this is ugly, but historical... will be revised
|
||||
} else if ((rospl.getEntryCount() < maxHostConnections) &&
|
||||
(numConnections < maxTotalConnections)) {
|
||||
if (isShutDown) {
|
||||
throw new IllegalStateException
|
||||
("Connection pool shut down.");
|
||||
}
|
||||
|
||||
entry = createEntry(rospl, operator);
|
||||
// the cases to check for:
|
||||
// - have a free connection for that route
|
||||
// - allowed to create a free connection for that route
|
||||
// - can delete and replace a free connection for another route
|
||||
// - need to wait for one of the things above to come true
|
||||
|
||||
} else if ((rospl.getEntryCount() < maxHostConnections) &&
|
||||
(freeConnections.size() > 0)) {
|
||||
entry = getFreeEntry(rospl);
|
||||
if (entry != null) {
|
||||
// we're fine
|
||||
//@@@ yeah this is ugly, but historical... will be revised
|
||||
} else if ((rospl.getEntryCount() < maxHostConnections) &&
|
||||
(numConnections < maxTotalConnections)) {
|
||||
|
||||
deleteLeastUsedEntry();
|
||||
entry = createEntry(rospl, operator);
|
||||
entry = createEntry(rospl, operator);
|
||||
|
||||
} else {
|
||||
// TODO: keep track of which routes have waiting threads,
|
||||
// so they avoid being sacrificed before necessary
|
||||
} else if ((rospl.getEntryCount() < maxHostConnections) &&
|
||||
(freeConnections.size() > 0)) {
|
||||
|
||||
try {
|
||||
if (useTimeout && timeToWait <= 0) {
|
||||
throw new ConnectionPoolTimeoutException
|
||||
("Timeout waiting for connection");
|
||||
}
|
||||
deleteLeastUsedEntry();
|
||||
entry = createEntry(rospl, operator);
|
||||
|
||||
} else {
|
||||
// TODO: keep track of which routes have waiting threads,
|
||||
// so they avoid being sacrificed before necessary
|
||||
|
||||
try {
|
||||
if (useTimeout && timeToWait <= 0) {
|
||||
throw new ConnectionPoolTimeoutException
|
||||
("Timeout waiting for connection");
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Need to wait for connection. " + route);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Need to wait for connection. " + route);
|
||||
}
|
||||
|
||||
if (waitingThread == null) {
|
||||
waitingThread = new WaitingThread();
|
||||
waitingThread.pool = rospl;
|
||||
waitingThread.thread = Thread.currentThread();
|
||||
} else {
|
||||
waitingThread.interruptedByConnectionPool = false;
|
||||
}
|
||||
if (waitingThread == null) {
|
||||
waitingThread = new WaitingThread();
|
||||
waitingThread.pool = rospl;
|
||||
waitingThread.thread = Thread.currentThread();
|
||||
} else {
|
||||
waitingThread.interruptedByConnectionPool = false;
|
||||
}
|
||||
|
||||
if (useTimeout) {
|
||||
startWait = System.currentTimeMillis();
|
||||
}
|
||||
if (useTimeout) {
|
||||
startWait = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
rospl.queueThread(waitingThread);
|
||||
waitingThreads.add(waitingThread);
|
||||
wait(timeToWait);
|
||||
rospl.queueThread(waitingThread);
|
||||
waitingThreads.add(waitingThread);
|
||||
poolCondition.await(timeToWait, TimeUnit.MILLISECONDS);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
if (!waitingThread.interruptedByConnectionPool) {
|
||||
LOG.debug("Interrupted while waiting for connection.", e);
|
||||
throw e;
|
||||
}
|
||||
// Else, do nothing, we were interrupted by the
|
||||
// connection pool and should now have a connection
|
||||
// waiting for us. Continue in the loop and get it.
|
||||
// Or else we are shutting down, which is also
|
||||
// detected in the loop.
|
||||
} finally {
|
||||
if (!waitingThread.interruptedByConnectionPool) {
|
||||
// Either we timed out, experienced a
|
||||
// "spurious wakeup", or were interrupted by an
|
||||
// external thread. Regardless we need to
|
||||
// cleanup for ourselves in the wait queue.
|
||||
rospl.removeThread(waitingThread);
|
||||
waitingThreads.remove(waitingThread);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
if (!waitingThread.interruptedByConnectionPool) {
|
||||
LOG.debug("Interrupted while waiting for connection.", e);
|
||||
throw e;
|
||||
}
|
||||
// Else, do nothing, we were interrupted by the
|
||||
// connection pool and should now have a connection
|
||||
// waiting for us. Continue in the loop and get it.
|
||||
// Or else we are shutting down, which is also
|
||||
// detected in the loop.
|
||||
} finally {
|
||||
if (!waitingThread.interruptedByConnectionPool) {
|
||||
// Either we timed out, experienced a
|
||||
// "spurious wakeup", or were interrupted by an
|
||||
// external thread. Regardless we need to
|
||||
// cleanup for ourselves in the wait queue.
|
||||
rospl.removeThread(waitingThread);
|
||||
waitingThreads.remove(waitingThread);
|
||||
}
|
||||
|
||||
if (useTimeout) {
|
||||
endWait = System.currentTimeMillis();
|
||||
timeToWait -= (endWait - startWait);
|
||||
if (useTimeout) {
|
||||
endWait = System.currentTimeMillis();
|
||||
timeToWait -= (endWait - startWait);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} // while no entry
|
||||
} // while no entry
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
|
||||
return entry;
|
||||
|
||||
@ -278,38 +311,45 @@ BasicPoolEntry getEntry(HttpRoute route, long timeout,
|
||||
|
||||
|
||||
// non-javadoc, see base class AbstractConnPool
|
||||
public synchronized void freeEntry(BasicPoolEntry entry) {
|
||||
public void freeEntry(BasicPoolEntry entry) {
|
||||
|
||||
HttpRoute route = entry.getPlannedRoute();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Freeing connection. " + route);
|
||||
}
|
||||
|
||||
if (isShutDown) {
|
||||
// the pool is shut down, release the
|
||||
// connection's resources and get out of here
|
||||
closeConnection(entry.getConnection());
|
||||
return;
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
if (isShutDown) {
|
||||
// the pool is shut down, release the
|
||||
// connection's resources and get out of here
|
||||
closeConnection(entry.getConnection());
|
||||
return;
|
||||
}
|
||||
|
||||
// no longer issued, we keep a hard reference now
|
||||
issuedConnections.remove(entry.getWeakRef());
|
||||
|
||||
RouteSpecificPool rospl = getRoutePool(route, true);
|
||||
|
||||
rospl.freeEntry(entry);
|
||||
freeConnections.add(entry);
|
||||
|
||||
if (numConnections == 0) {
|
||||
// for some reason this pool didn't already exist
|
||||
LOG.error("Master connection pool not found. " + route);
|
||||
numConnections = 1;
|
||||
}
|
||||
|
||||
idleConnHandler.add(entry.getConnection());
|
||||
|
||||
notifyWaitingThread(rospl);
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
|
||||
// no longer issued, we keep a hard reference now
|
||||
issuedConnections.remove(entry.getWeakRef());
|
||||
|
||||
RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true???
|
||||
|
||||
rospl.freeEntry(entry);
|
||||
freeConnections.add(entry);
|
||||
|
||||
if (numConnections == 0) {
|
||||
// for some reason this pool didn't already exist
|
||||
LOG.error("Master connection pool not found. " + route);
|
||||
numConnections = 1;
|
||||
}
|
||||
|
||||
idleConnHandler.add(entry.getConnection());
|
||||
|
||||
notifyWaitingThread(rospl);
|
||||
|
||||
} // freeEntry
|
||||
|
||||
|
||||
@ -322,25 +362,33 @@ public synchronized void freeEntry(BasicPoolEntry entry) {
|
||||
* @return an available pool entry for the given route, or
|
||||
* <code>null</code> if none is available
|
||||
*/
|
||||
protected synchronized
|
||||
BasicPoolEntry getFreeEntry(RouteSpecificPool rospl) {
|
||||
protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl) {
|
||||
|
||||
BasicPoolEntry entry = rospl.allocEntry();
|
||||
BasicPoolEntry entry = null;
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
if (entry != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Getting free connection. " + rospl.getRoute());
|
||||
entry = rospl.allocEntry();
|
||||
|
||||
if (entry != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Getting free connection. " + rospl.getRoute());
|
||||
}
|
||||
freeConnections.remove(entry);
|
||||
idleConnHandler.remove(entry.getConnection());// no longer idle
|
||||
|
||||
issuedConnections.add(entry.getWeakRef());
|
||||
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No free connections. " + rospl.getRoute());
|
||||
}
|
||||
}
|
||||
freeConnections.remove(entry);
|
||||
idleConnHandler.remove(entry.getConnection()); // no longer idle
|
||||
|
||||
issuedConnections.add(entry.getWeakRef());
|
||||
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No free connections. " + rospl.getRoute());
|
||||
}
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
@ -355,20 +403,27 @@ BasicPoolEntry getFreeEntry(RouteSpecificPool rospl) {
|
||||
*
|
||||
* @return the new pool entry for a new connection
|
||||
*/
|
||||
protected synchronized
|
||||
BasicPoolEntry createEntry(RouteSpecificPool rospl,
|
||||
ClientConnectionOperator op) {
|
||||
protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
|
||||
ClientConnectionOperator op) {
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating new connection. " + rospl.getRoute());
|
||||
}
|
||||
// the entry will create the connection when needed
|
||||
BasicPoolEntry entry =
|
||||
new BasicPoolEntry(op, rospl.getRoute(), refQueue);
|
||||
rospl.createdEntry(entry);
|
||||
numConnections++;
|
||||
|
||||
BasicPoolEntry entry = null;
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
// the entry will create the connection when needed
|
||||
entry = new BasicPoolEntry(op, rospl.getRoute(), refQueue);
|
||||
rospl.createdEntry(entry);
|
||||
numConnections++;
|
||||
|
||||
issuedConnections.add(entry.getWeakRef());
|
||||
issuedConnections.add(entry.getWeakRef());
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
|
||||
return entry;
|
||||
}
|
||||
@ -385,7 +440,7 @@ BasicPoolEntry createEntry(RouteSpecificPool rospl,
|
||||
*
|
||||
* @param entry the pool entry for the connection to delete
|
||||
*/
|
||||
protected synchronized void deleteEntry(BasicPoolEntry entry) {
|
||||
protected void deleteEntry(BasicPoolEntry entry) {
|
||||
|
||||
HttpRoute route = entry.getPlannedRoute();
|
||||
|
||||
@ -393,16 +448,23 @@ protected synchronized void deleteEntry(BasicPoolEntry entry) {
|
||||
LOG.debug("Deleting connection. " + route);
|
||||
}
|
||||
|
||||
closeConnection(entry.getConnection());
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true???
|
||||
rospl.deleteEntry(entry);
|
||||
numConnections--;
|
||||
if (rospl.isUnused()) {
|
||||
routeToPool.remove(route);
|
||||
closeConnection(entry.getConnection());
|
||||
|
||||
RouteSpecificPool rospl = getRoutePool(route, true);
|
||||
rospl.deleteEntry(entry);
|
||||
numConnections--;
|
||||
if (rospl.isUnused()) {
|
||||
routeToPool.remove(route);
|
||||
}
|
||||
|
||||
idleConnHandler.remove(entry.getConnection());// not idle, but dead
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
|
||||
idleConnHandler.remove(entry.getConnection()); // not idle, but dead
|
||||
}
|
||||
|
||||
|
||||
@ -410,31 +472,45 @@ protected synchronized void deleteEntry(BasicPoolEntry entry) {
|
||||
* Delete an old, free pool entry to make room for a new one.
|
||||
* Used to replace pool entries with ones for a different route.
|
||||
*/
|
||||
protected synchronized void deleteLeastUsedEntry() {
|
||||
protected void deleteLeastUsedEntry() {
|
||||
|
||||
//@@@ with get() instead of remove, we could
|
||||
//@@@ leave the removing to deleteEntry()
|
||||
BasicPoolEntry entry = freeConnections.remove();
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
if (entry != null) {
|
||||
deleteEntry(entry);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No free connection to delete.");
|
||||
//@@@ with get() instead of remove, we could
|
||||
//@@@ leave the removing to deleteEntry()
|
||||
BasicPoolEntry entry = freeConnections.remove();
|
||||
|
||||
if (entry != null) {
|
||||
deleteEntry(entry);
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No free connection to delete.");
|
||||
}
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// non-javadoc, see base class AbstractConnPool
|
||||
protected synchronized void handleLostEntry(HttpRoute route) {
|
||||
protected void handleLostEntry(HttpRoute route) {
|
||||
|
||||
RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true???
|
||||
rospl.dropEntry();
|
||||
if (rospl.isUnused()) {
|
||||
routeToPool.remove(route);
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
RouteSpecificPool rospl = getRoutePool(route, true);
|
||||
rospl.dropEntry();
|
||||
if (rospl.isUnused()) {
|
||||
routeToPool.remove(route);
|
||||
}
|
||||
|
||||
numConnections--;
|
||||
notifyWaitingThread(rospl);
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
|
||||
numConnections--;
|
||||
notifyWaitingThread(rospl);
|
||||
}
|
||||
|
||||
|
||||
@ -446,7 +522,7 @@ protected synchronized void handleLostEntry(HttpRoute route) {
|
||||
*
|
||||
* @param rospl the pool in which to notify, or <code>null</code>
|
||||
*/
|
||||
protected synchronized void notifyWaitingThread(RouteSpecificPool rospl) {
|
||||
protected void notifyWaitingThread(RouteSpecificPool rospl) {
|
||||
|
||||
//@@@ while this strategy provides for best connection re-use,
|
||||
//@@@ is it fair? only do this if the connection is open?
|
||||
@ -455,28 +531,35 @@ protected synchronized void notifyWaitingThread(RouteSpecificPool rospl) {
|
||||
// it from all wait queues before interrupting.
|
||||
WaitingThread waitingThread = null;
|
||||
|
||||
if ((rospl != null) && rospl.hasThread()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Notifying thread waiting on pool. "
|
||||
+ rospl.getRoute());
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
if ((rospl != null) && rospl.hasThread()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Notifying thread waiting on pool. "
|
||||
+ rospl.getRoute());
|
||||
}
|
||||
waitingThread = rospl.dequeueThread();
|
||||
waitingThreads.remove(waitingThread);
|
||||
|
||||
} else if (!waitingThreads.isEmpty()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Notifying thread waiting on any pool.");
|
||||
}
|
||||
waitingThread = waitingThreads.remove();
|
||||
waitingThread.pool.removeThread(waitingThread);
|
||||
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Notifying no-one, there are no waiting threads");
|
||||
}
|
||||
waitingThread = rospl.dequeueThread();
|
||||
waitingThreads.remove(waitingThread);
|
||||
|
||||
} else if (!waitingThreads.isEmpty()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Notifying thread waiting on any pool.");
|
||||
if (waitingThread != null) {
|
||||
waitingThread.interruptedByConnectionPool = true;
|
||||
waitingThread.thread.interrupt();
|
||||
}
|
||||
waitingThread = waitingThreads.remove();
|
||||
waitingThread.pool.removeThread(waitingThread);
|
||||
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Notifying no-one, there are no waiting threads");
|
||||
}
|
||||
|
||||
if (waitingThread != null) {
|
||||
waitingThread.interruptedByConnectionPool = true;
|
||||
waitingThread.thread.interrupt();
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -484,44 +567,57 @@ protected synchronized void notifyWaitingThread(RouteSpecificPool rospl) {
|
||||
//@@@ revise this cleanup stuff
|
||||
//@@@ move method to base class when deleteEntry() is fixed
|
||||
// non-javadoc, see base class AbstractConnPool
|
||||
public synchronized void deleteClosedConnections() {
|
||||
public void deleteClosedConnections() {
|
||||
|
||||
Iterator<BasicPoolEntry> iter = freeConnections.iterator();
|
||||
while (iter.hasNext()) {
|
||||
BasicPoolEntry entry = iter.next();
|
||||
if (!entry.getConnection().isOpen()) {
|
||||
iter.remove();
|
||||
deleteEntry(entry);
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
Iterator<BasicPoolEntry> iter = freeConnections.iterator();
|
||||
while (iter.hasNext()) {
|
||||
BasicPoolEntry entry = iter.next();
|
||||
if (!entry.getConnection().isOpen()) {
|
||||
iter.remove();
|
||||
deleteEntry(entry);
|
||||
}
|
||||
}
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// non-javadoc, see base class AbstractConnPool
|
||||
public synchronized void shutdown() {
|
||||
public void shutdown() {
|
||||
|
||||
super.shutdown();
|
||||
try {
|
||||
poolLock.lock();
|
||||
|
||||
// close all free connections
|
||||
//@@@ move this to base class?
|
||||
Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
|
||||
while (ibpe.hasNext()) {
|
||||
BasicPoolEntry entry = ibpe.next();
|
||||
ibpe.remove();
|
||||
closeConnection(entry.getConnection());
|
||||
super.shutdown();
|
||||
|
||||
// close all free connections
|
||||
//@@@ move this to base class?
|
||||
Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
|
||||
while (ibpe.hasNext()) {
|
||||
BasicPoolEntry entry = ibpe.next();
|
||||
ibpe.remove();
|
||||
closeConnection(entry.getConnection());
|
||||
}
|
||||
|
||||
// interrupt all waiting threads
|
||||
Iterator<WaitingThread> iwth = waitingThreads.iterator();
|
||||
while (iwth.hasNext()) {
|
||||
WaitingThread waiter = iwth.next();
|
||||
iwth.remove();
|
||||
waiter.interruptedByConnectionPool = true;
|
||||
waiter.thread.interrupt();
|
||||
}
|
||||
|
||||
routeToPool.clear();
|
||||
|
||||
} finally {
|
||||
poolLock.unlock();
|
||||
}
|
||||
|
||||
|
||||
// interrupt all waiting threads
|
||||
Iterator<WaitingThread> iwth = waitingThreads.iterator();
|
||||
while (iwth.hasNext()) {
|
||||
WaitingThread waiter = iwth.next();
|
||||
iwth.remove();
|
||||
waiter.interruptedByConnectionPool = true;
|
||||
waiter.thread.interrupt();
|
||||
}
|
||||
|
||||
routeToPool.clear();
|
||||
}
|
||||
|
||||
|
||||
|
@ -0,0 +1,183 @@
|
||||
/*
|
||||
* $HeadURL$
|
||||
* $Revision$
|
||||
* $Date$
|
||||
*
|
||||
* ====================================================================
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* ====================================================================
|
||||
*
|
||||
* This software consists of voluntary contributions made by many
|
||||
* individuals on behalf of the Apache Software Foundation. For more
|
||||
* information on the Apache Software Foundation, please see
|
||||
* <http://www.apache.org/>.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.http.impl.conn.tsccm;
|
||||
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
|
||||
|
||||
/**
|
||||
* Represents a thread waiting for a connection.
|
||||
* This class implements throwaway objects. It is instantiated whenever
|
||||
* a thread needs to wait. Instances are not re-used, except if the
|
||||
* waiting thread experiences a spurious wakeup and continues to wait.
|
||||
* <br/>
|
||||
* All methods assume external synchronization on the condition
|
||||
* passed to the constructor.
|
||||
* Instances of this class do <i>not</i> synchronize access!
|
||||
*
|
||||
* @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
|
||||
*/
|
||||
public class WaitingThread {
|
||||
|
||||
/** The condition on which the thread is waiting. */
|
||||
private final Condition cond;
|
||||
|
||||
/** The route specific pool on which the thread is waiting. */
|
||||
//@@@ replace with generic pool interface
|
||||
private final RouteSpecificPool pool;
|
||||
|
||||
/** The thread that is waiting for an entry. */
|
||||
private Thread waiter;
|
||||
|
||||
|
||||
/**
|
||||
* Indicates the source of an interruption.
|
||||
* Set to <code>true</code> inside
|
||||
* {@link #notifyWaitingThread(RouteSpecificPool)}
|
||||
* and {@link #shutdown shutdown()}
|
||||
* before the thread is interrupted.
|
||||
* If not set, the thread was interrupted from the outside.
|
||||
*/
|
||||
private boolean interruptedByConnectionPool;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new entry for a waiting thread.
|
||||
*
|
||||
* @param cond the condition for which to wait
|
||||
* @param pool the pool on which the thread will be waiting,
|
||||
* or <code>null</code>
|
||||
*/
|
||||
public WaitingThread(Condition cond, RouteSpecificPool pool) {
|
||||
|
||||
if (cond == null) {
|
||||
throw new IllegalArgumentException("Condition must not be null.");
|
||||
}
|
||||
|
||||
this.cond = cond;
|
||||
this.pool = pool;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Blocks the calling thread.
|
||||
* This method returns when the thread is notified or interrupted,
|
||||
* if a timeout occurrs, or if there is a spurious wakeup.
|
||||
* <br/>
|
||||
* This method assumes external synchronization.
|
||||
*
|
||||
* @param timeout the timeout in milliseconds, or 0 for no timeout
|
||||
*
|
||||
* @see #wakeup
|
||||
*/
|
||||
public void await(int timeout)
|
||||
throws InterruptedException {
|
||||
|
||||
//@@@ check timeout for negative, or assume overflow?
|
||||
|
||||
// This is only a sanity check. We cannot not synchronize here,
|
||||
// the lock would not be released on calling cond.await() below.
|
||||
if (this.waiter != null) {
|
||||
throw new IllegalStateException
|
||||
("A thread is already waiting on this object." +
|
||||
"\ncaller: " + Thread.currentThread() +
|
||||
"\nwaiter: " + this.waiter);
|
||||
}
|
||||
|
||||
this.waiter = Thread.currentThread();
|
||||
|
||||
try {
|
||||
//@@@ how to convert the int timeout to the long argument?
|
||||
//@@@ (timeout & 0xffffffffL)? or check for negative above?
|
||||
this.cond.await(timeout, TimeUnit.MILLISECONDS);
|
||||
} finally {
|
||||
this.waiter = null;
|
||||
}
|
||||
} // await
|
||||
|
||||
|
||||
/**
|
||||
* Wakes up the waiting thread.
|
||||
* <br/>
|
||||
* This method assumes external synchronization.
|
||||
*/
|
||||
public void wakeup() {
|
||||
|
||||
// If external synchronization and pooling works properly,
|
||||
// this cannot happen. Just a sanity check.
|
||||
if (this.waiter == null) {
|
||||
throw new IllegalStateException
|
||||
("Nobody waiting on this object.");
|
||||
}
|
||||
|
||||
// One condition might be shared by several WaitingThread instances.
|
||||
// It probably isn't, but just in case: wake all, not just one.
|
||||
this.cond.signalAll();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Obtains the condition.
|
||||
*
|
||||
* @return the condition on which to wait, never <code>null</code>
|
||||
*/
|
||||
public final Condition getCondition() {
|
||||
// not synchronized
|
||||
return this.cond;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Obtains the pool, if there is one.
|
||||
*
|
||||
* @return the pool on which a thread is or was waiting,
|
||||
* or <code>null</code>
|
||||
*/
|
||||
public final RouteSpecificPool getPool() {
|
||||
// not synchronized
|
||||
return this.pool;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Obtains the thread, if there is one.
|
||||
*
|
||||
* @return the thread which is waiting, or <code>null</code>
|
||||
*/
|
||||
public final Thread getThread() {
|
||||
// not synchronized
|
||||
return this.waiter;
|
||||
}
|
||||
|
||||
|
||||
} // class WaitingThread
|
Loading…
x
Reference in New Issue
Block a user