factored RouteSpecificPool out from ConnPoolByRoute
git-svn-id: https://svn.apache.org/repos/asf/jakarta/httpcomponents/httpclient/trunk@561098 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6431580e82
commit
3476e5f3e2
|
@ -74,53 +74,23 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
/**
|
||||
* A map of route-specific pools.
|
||||
* Keys are of class {@link HttpRoute},
|
||||
* values of class {@link RouteConnPool}.
|
||||
* values of class {@link RouteSpecificPool}.
|
||||
*/
|
||||
private final Map routeToPool;
|
||||
|
||||
|
||||
/**
|
||||
* A simple struct-like class to combine the connection list
|
||||
* and the count of created connections.
|
||||
*/
|
||||
protected static class RouteConnPool {
|
||||
|
||||
/** The route this pool is for. */
|
||||
public final HttpRoute route;
|
||||
|
||||
/** The list of free connections. */
|
||||
public LinkedList freeConnections;
|
||||
|
||||
/** The list of WaitingThreads for this pool. */
|
||||
public LinkedList waitingThreads;
|
||||
|
||||
/** The number of created connections. */
|
||||
public int numConnections;
|
||||
|
||||
/**
|
||||
* Creates a new route-specific pool.
|
||||
*
|
||||
* @param r the route for which to pool
|
||||
*/
|
||||
public RouteConnPool(HttpRoute r) {
|
||||
this.route = r;
|
||||
this.freeConnections = new LinkedList();
|
||||
this.waitingThreads = new LinkedList();
|
||||
this.numConnections = 0;
|
||||
}
|
||||
} // class RouteConnPool
|
||||
|
||||
|
||||
/**
|
||||
* A thread and the pool in which it is waiting.
|
||||
* <!-- @@@ will be revised for HTTPCLIENT-677 -->
|
||||
*/
|
||||
private static class WaitingThread {
|
||||
protected static class WaitingThread {
|
||||
|
||||
/** The thread that is waiting for a connection */
|
||||
/** The thread that is waiting for an entry. */
|
||||
public Thread thread;
|
||||
|
||||
/** The connection pool the thread is waiting for */
|
||||
public RouteConnPool pool;
|
||||
/** The route specific pool the thread is waiting for. */
|
||||
public RouteSpecificPool pool;
|
||||
|
||||
/**
|
||||
* Indicates the source of an interruption.
|
||||
|
@ -153,19 +123,22 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
* Get a route-specific pool of available connections.
|
||||
*
|
||||
* @param route the route
|
||||
* @param create whether to create the pool if it doesn't exist
|
||||
*
|
||||
* @return the pool for the argument route, never <code>null</code>
|
||||
* @return the pool for the argument route,
|
||||
* never <code>null</code> if <code>create</code> is <code>true</code>
|
||||
*/
|
||||
protected synchronized RouteConnPool getRoutePool(HttpRoute route) {
|
||||
protected synchronized RouteSpecificPool getRoutePool(HttpRoute route,
|
||||
boolean create) {
|
||||
|
||||
RouteConnPool rcp = (RouteConnPool) routeToPool.get(route);
|
||||
if (rcp == null) {
|
||||
RouteSpecificPool rospl = (RouteSpecificPool) routeToPool.get(route);
|
||||
if ((rospl == null) && create) {
|
||||
// no pool for this route yet (or anymore)
|
||||
rcp = newRouteConnPool(route);
|
||||
routeToPool.put(route, rcp);
|
||||
rospl = newRouteSpecificPool(route);
|
||||
routeToPool.put(route, rospl);
|
||||
}
|
||||
|
||||
return rcp;
|
||||
return rospl;
|
||||
}
|
||||
|
||||
|
||||
|
@ -177,16 +150,16 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
*
|
||||
* @return the new pool
|
||||
*/
|
||||
protected RouteConnPool newRouteConnPool(HttpRoute route) {
|
||||
return new RouteConnPool(route);
|
||||
protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
|
||||
return new RouteSpecificPool(route);
|
||||
}
|
||||
|
||||
|
||||
//@@@ consider alternatives for gathering statistics
|
||||
public synchronized int getConnectionsInPool(HttpRoute route) {
|
||||
//@@@ don't allow a pool to be created here!
|
||||
RouteConnPool rcp = getRoutePool(route);
|
||||
return rcp.numConnections;
|
||||
RouteSpecificPool rospl = getRoutePool(route, false);
|
||||
return (rospl != null) ? rospl.getEntryCount() : 0;
|
||||
}
|
||||
|
||||
|
||||
|
@ -203,7 +176,7 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
int maxTotalConnections = HttpConnectionManagerParams
|
||||
.getMaxTotalConnections(this.params);
|
||||
|
||||
RouteConnPool routePool = getRoutePool(route);
|
||||
RouteSpecificPool rospl = getRoutePool(route, true);
|
||||
WaitingThread waitingThread = null;
|
||||
|
||||
boolean useTimeout = (timeout > 0);
|
||||
|
@ -224,19 +197,20 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
// - can delete and replace a free connection for another route
|
||||
// - need to wait for one of the things above to come true
|
||||
|
||||
if (routePool.freeConnections.size() > 0) {
|
||||
entry = getFreeEntry(routePool);
|
||||
|
||||
} else if ((routePool.numConnections < maxHostConnections) &&
|
||||
entry = getFreeEntry(rospl);
|
||||
if (entry != null) {
|
||||
// we're fine
|
||||
//@@@ yeah this is ugly, but historical... will be revised
|
||||
} else if ((rospl.getEntryCount() < maxHostConnections) &&
|
||||
(numConnections < maxTotalConnections)) {
|
||||
|
||||
entry = createEntry(routePool, operator);
|
||||
entry = createEntry(rospl, operator);
|
||||
|
||||
} else if ((routePool.numConnections < maxHostConnections) &&
|
||||
} else if ((rospl.getEntryCount() < maxHostConnections) &&
|
||||
(freeConnections.size() > 0)) {
|
||||
|
||||
deleteLeastUsedEntry();
|
||||
entry = createEntry(routePool, operator);
|
||||
entry = createEntry(rospl, operator);
|
||||
|
||||
} else {
|
||||
// TODO: keep track of which routes have waiting threads,
|
||||
|
@ -254,7 +228,7 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
|
||||
if (waitingThread == null) {
|
||||
waitingThread = new WaitingThread();
|
||||
waitingThread.pool = routePool;
|
||||
waitingThread.pool = rospl;
|
||||
waitingThread.thread = Thread.currentThread();
|
||||
} else {
|
||||
waitingThread.interruptedByConnectionPool = false;
|
||||
|
@ -264,7 +238,7 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
startWait = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
routePool.waitingThreads.addLast(waitingThread);
|
||||
rospl.waitingThreads.addLast(waitingThread);
|
||||
waitingThreads.addLast(waitingThread);
|
||||
wait(timeToWait);
|
||||
|
||||
|
@ -285,7 +259,7 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
// "spurious wakeup", or were interrupted by an
|
||||
// external thread. Regardless we need to
|
||||
// cleanup for ourselves in the wait queue.
|
||||
routePool.waitingThreads.remove(waitingThread);
|
||||
rospl.waitingThreads.remove(waitingThread);
|
||||
waitingThreads.remove(waitingThread);
|
||||
}
|
||||
|
||||
|
@ -320,15 +294,9 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
// no longer issued, we keep a hard reference now
|
||||
issuedConnections.remove(entry.getWeakRef());
|
||||
|
||||
RouteConnPool routePool = getRoutePool(route);
|
||||
RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true???
|
||||
|
||||
// put the connection back in the available list and notify a waiter
|
||||
routePool.freeConnections.add(entry);
|
||||
if (routePool.numConnections == 0) {
|
||||
// for some reason the route pool didn't already exist
|
||||
LOG.error("Route connection pool not found. " + route);
|
||||
routePool.numConnections = 1;
|
||||
}
|
||||
rospl.freeEntry(entry);
|
||||
freeConnections.add(entry);
|
||||
|
||||
if (numConnections == 0) {
|
||||
|
@ -337,10 +305,9 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
numConnections = 1;
|
||||
}
|
||||
|
||||
// register the connection with the timeout handler
|
||||
idleConnHandler.add(entry.getConnection());
|
||||
|
||||
notifyWaitingThread(routePool);
|
||||
notifyWaitingThread(rospl);
|
||||
|
||||
} // freeEntry
|
||||
|
||||
|
@ -349,19 +316,20 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
/**
|
||||
* If available, get a free pool entry for a route.
|
||||
*
|
||||
* @param rcp the route-specific pool from which to get an entry
|
||||
* @param rospl the route-specific pool from which to get an entry
|
||||
*
|
||||
* @return an available pool entry for the given route
|
||||
* @return an available pool entry for the given route, or
|
||||
* <code>null</code> if none is available
|
||||
*/
|
||||
protected synchronized BasicPoolEntry getFreeEntry(RouteConnPool rcp) {
|
||||
protected synchronized
|
||||
BasicPoolEntry getFreeEntry(RouteSpecificPool rospl) {
|
||||
|
||||
BasicPoolEntry entry = null;
|
||||
BasicPoolEntry entry = rospl.allocEntry();
|
||||
|
||||
if (rcp.freeConnections.size() > 0) {
|
||||
if (entry != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Getting free connection. " + rcp.route);
|
||||
LOG.debug("Getting free connection. " + rospl.getRoute());
|
||||
}
|
||||
entry = (BasicPoolEntry) rcp.freeConnections.removeLast();
|
||||
freeConnections.remove(entry);
|
||||
idleConnHandler.remove(entry.getConnection()); // no longer idle
|
||||
|
||||
|
@ -369,7 +337,7 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No free connections. " + rcp.route);
|
||||
LOG.debug("No free connections. " + rospl.getRoute());
|
||||
}
|
||||
}
|
||||
return entry;
|
||||
|
@ -381,22 +349,23 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
* This method assumes that the new connection will be handed
|
||||
* out immediately.
|
||||
*
|
||||
* @param rcp the route-specific pool for which to create the entry
|
||||
* @param rospl the route-specific pool for which to create the entry
|
||||
* @param op the operator for creating a connection
|
||||
*
|
||||
* @return the new pool entry for a new connection
|
||||
*/
|
||||
protected synchronized
|
||||
BasicPoolEntry createEntry(RouteConnPool rcp,
|
||||
BasicPoolEntry createEntry(RouteSpecificPool rospl,
|
||||
ClientConnectionOperator op) {
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating new connection. " + rcp.route);
|
||||
LOG.debug("Creating new connection. " + rospl.getRoute());
|
||||
}
|
||||
// the entry will create the connection when needed
|
||||
BasicPoolEntry entry = new BasicPoolEntry(op, rcp.route, refQueue);
|
||||
BasicPoolEntry entry =
|
||||
new BasicPoolEntry(op, rospl.getRoute(), refQueue);
|
||||
rospl.createdEntry(entry);
|
||||
numConnections++;
|
||||
rcp.numConnections++;
|
||||
|
||||
issuedConnections.add(entry.getWeakRef());
|
||||
|
||||
|
@ -425,13 +394,10 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
|
||||
closeConnection(entry.getConnection());
|
||||
|
||||
RouteConnPool routePool = getRoutePool(route);
|
||||
routePool.freeConnections.remove(entry);
|
||||
routePool.numConnections--;
|
||||
RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true???
|
||||
rospl.deleteEntry(entry);
|
||||
numConnections--;
|
||||
if ((routePool.numConnections < 1) &&
|
||||
routePool.waitingThreads.isEmpty()) {
|
||||
|
||||
if (rospl.isUnused()) {
|
||||
routeToPool.remove(route);
|
||||
}
|
||||
|
||||
|
@ -460,17 +426,14 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
// non-javadoc, see base class AbstractConnPool
|
||||
protected synchronized void handleLostEntry(HttpRoute route) {
|
||||
|
||||
RouteConnPool routePool = getRoutePool(route);
|
||||
routePool.numConnections--;
|
||||
|
||||
if ((routePool.numConnections < 1) &&
|
||||
routePool.waitingThreads.isEmpty()) {
|
||||
|
||||
RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true???
|
||||
rospl.dropEntry();
|
||||
if (rospl.isUnused()) {
|
||||
routeToPool.remove(route);
|
||||
}
|
||||
|
||||
numConnections--;
|
||||
notifyWaitingThread(routePool);
|
||||
notifyWaitingThread(rospl);
|
||||
}
|
||||
|
||||
|
||||
|
@ -480,9 +443,9 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
* if there is one.
|
||||
* Otherwise, a thread in the connection pool will be notified.
|
||||
*
|
||||
* @param routePool the pool in which to notify, or <code>null</code>
|
||||
* @param rospl the pool in which to notify, or <code>null</code>
|
||||
*/
|
||||
protected synchronized void notifyWaitingThread(RouteConnPool routePool) {
|
||||
protected synchronized void notifyWaitingThread(RouteSpecificPool rospl) {
|
||||
|
||||
//@@@ while this strategy provides for best connection re-use,
|
||||
//@@@ is it fair? only do this if the connection is open?
|
||||
|
@ -491,13 +454,13 @@ public class ConnPoolByRoute extends AbstractConnPool {
|
|||
// it from all wait queues before interrupting.
|
||||
WaitingThread waitingThread = null;
|
||||
|
||||
if ((routePool != null) && !routePool.waitingThreads.isEmpty()) {
|
||||
if ((rospl != null) && !rospl.waitingThreads.isEmpty()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Notifying thread waiting on pool. "
|
||||
+ routePool.route);
|
||||
+ rospl.getRoute());
|
||||
}
|
||||
waitingThread = (WaitingThread)
|
||||
routePool.waitingThreads.removeFirst();
|
||||
rospl.waitingThreads.removeFirst();
|
||||
waitingThreads.remove(waitingThread);
|
||||
|
||||
} else if (!waitingThreads.isEmpty()) {
|
||||
|
|
|
@ -0,0 +1,208 @@
|
|||
/*
|
||||
* $HeadURL$
|
||||
* $Revision$
|
||||
* $Date$
|
||||
*
|
||||
* ====================================================================
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
* ====================================================================
|
||||
*
|
||||
* This software consists of voluntary contributions made by many
|
||||
* individuals on behalf of the Apache Software Foundation. For more
|
||||
* information on the Apache Software Foundation, please see
|
||||
* <http://www.apache.org/>.
|
||||
*
|
||||
*/
|
||||
|
||||
package org.apache.http.impl.conn.tsccm;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.http.conn.HttpRoute;
|
||||
//@@@ create entry w/o operator, set operator afterwards
|
||||
import org.apache.http.conn.ClientConnectionOperator; //@@@
|
||||
|
||||
|
||||
/**
|
||||
* A connection sub-pool for a specific route, used by {@link ConnPoolByRoute}.
|
||||
* The methods in this class are unsynchronized. It is expected that the
|
||||
* containing pool takes care of synchronization.
|
||||
*/
|
||||
public class RouteSpecificPool {
|
||||
|
||||
//@@@ change attribute visibility to protected once it is ensured
|
||||
//@@@ that there is no direct attribute access within this package
|
||||
|
||||
/** The route this pool is for. */
|
||||
private final HttpRoute route;
|
||||
|
||||
/** The list of free entries. */
|
||||
private LinkedList freeEntries;
|
||||
|
||||
/** The list of threads waiting for this pool. */
|
||||
/*private@@@ currently still default*/ LinkedList waitingThreads;
|
||||
|
||||
/** The number of created entries. */
|
||||
private int numEntries;
|
||||
|
||||
|
||||
/**
|
||||
* Creates a new route-specific pool.
|
||||
*
|
||||
* @param r the route for which to pool
|
||||
*/
|
||||
public RouteSpecificPool(HttpRoute r) {
|
||||
this.route = r;
|
||||
this.freeEntries = new LinkedList();
|
||||
this.waitingThreads = new LinkedList();
|
||||
this.numEntries = 0;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Obtains the route for which this pool is specific.
|
||||
*
|
||||
* @return the route
|
||||
*/
|
||||
public final HttpRoute getRoute() {
|
||||
return route;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Indicates whether this pool is unused.
|
||||
* A pool is unused if there is neither an entry nor a waiting thread.
|
||||
* All entries count, not only the free but also the allocated ones.
|
||||
*
|
||||
* @return <code>true</code> if this pool is unused,
|
||||
* <code>false</code> otherwise
|
||||
*/
|
||||
public boolean isUnused() {
|
||||
return (numEntries < 1) && waitingThreads.isEmpty();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Obtains the number of entries.
|
||||
* This includes not only the free entries, but also those that
|
||||
* have been created and are currently issued to an application.
|
||||
*
|
||||
* @return the number of entries for the route of this pool
|
||||
*/
|
||||
public final int getEntryCount() {
|
||||
return numEntries;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Obtains a free entry from this pool, if one is available.
|
||||
*
|
||||
* @return an available pool entry, or <code>null</code> if there is none
|
||||
*/
|
||||
public BasicPoolEntry allocEntry() {
|
||||
|
||||
BasicPoolEntry entry = null;
|
||||
|
||||
if (!freeEntries.isEmpty()) {
|
||||
entry = (BasicPoolEntry) freeEntries.removeLast();
|
||||
}
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns an allocated entry to this pool.
|
||||
*
|
||||
* @param entry the entry obtained from {@link #allocEntry allocEntry}
|
||||
* or presented to {@link #createdEntry createdEntry}
|
||||
*/
|
||||
public void freeEntry(BasicPoolEntry entry) {
|
||||
|
||||
if (numEntries < 1) {
|
||||
throw new IllegalStateException
|
||||
("No entry created for this pool. " + route);
|
||||
}
|
||||
if (numEntries <= freeEntries.size()) {
|
||||
throw new IllegalStateException
|
||||
("No entry allocated from this pool. " + route);
|
||||
}
|
||||
freeEntries.add(entry);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Indicates creation of an entry for this pool.
|
||||
* The entry will <i>not</i> be added to the list of free entries,
|
||||
* it is only recognized as belonging to this pool now. It can then
|
||||
* be passed to {@link #freeEntry freeEntry}.
|
||||
*
|
||||
* @param entry the entry that was created for this pool
|
||||
*
|
||||
* @return the new pool entry for a new entry
|
||||
*/
|
||||
public void createdEntry(BasicPoolEntry entry) {
|
||||
|
||||
if (!route.equals(entry.getPlannedRoute())) {
|
||||
throw new IllegalArgumentException
|
||||
("Entry not planned for this pool." +
|
||||
"\npool: " + route +
|
||||
"\nplan: " + entry.getPlannedRoute());
|
||||
}
|
||||
|
||||
numEntries++;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Deletes an entry from this pool.
|
||||
* Only entries that are currently free in this pool can be deleted.
|
||||
* Allocated entries can not be deleted.
|
||||
*
|
||||
* @param entry the entry to delete from this pool
|
||||
*
|
||||
* @return <code>true</code> if the entry was found and deleted, or
|
||||
* <code>false</code> if the entry was not found
|
||||
*/
|
||||
public boolean deleteEntry(BasicPoolEntry entry) {
|
||||
|
||||
final boolean found = freeEntries.remove(entry);
|
||||
if (found)
|
||||
numEntries--;
|
||||
return found;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Forgets about an entry from this pool.
|
||||
* This method is used to indicate that an entry
|
||||
* {@link #allocEntry allocated}
|
||||
* from this pool has been lost and will not be returned.
|
||||
*/
|
||||
public void dropEntry() {
|
||||
if (numEntries < 1) {
|
||||
throw new IllegalStateException
|
||||
("There is no entry that could be dropped.");
|
||||
}
|
||||
numEntries--;
|
||||
}
|
||||
|
||||
|
||||
} // class RouteSpecificPool
|
Loading…
Reference in New Issue