diff --git a/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java b/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java index df4c70be3..e1907bc41 100644 --- a/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java +++ b/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java @@ -74,53 +74,23 @@ public class ConnPoolByRoute extends AbstractConnPool { /** * A map of route-specific pools. * Keys are of class {@link HttpRoute}, - * values of class {@link RouteConnPool}. + * values of class {@link RouteSpecificPool}. */ private final Map routeToPool; - /** - * A simple struct-like class to combine the connection list - * and the count of created connections. - */ - protected static class RouteConnPool { - - /** The route this pool is for. */ - public final HttpRoute route; - - /** The list of free connections. */ - public LinkedList freeConnections; - - /** The list of WaitingThreads for this pool. */ - public LinkedList waitingThreads; - - /** The number of created connections. */ - public int numConnections; - - /** - * Creates a new route-specific pool. - * - * @param r the route for which to pool - */ - public RouteConnPool(HttpRoute r) { - this.route = r; - this.freeConnections = new LinkedList(); - this.waitingThreads = new LinkedList(); - this.numConnections = 0; - } - } // class RouteConnPool - /** * A thread and the pool in which it is waiting. + * */ - private static class WaitingThread { + protected static class WaitingThread { - /** The thread that is waiting for a connection */ + /** The thread that is waiting for an entry. */ public Thread thread; - /** The connection pool the thread is waiting for */ - public RouteConnPool pool; + /** The route specific pool the thread is waiting for. */ + public RouteSpecificPool pool; /** * Indicates the source of an interruption. @@ -153,19 +123,22 @@ public class ConnPoolByRoute extends AbstractConnPool { * Get a route-specific pool of available connections. * * @param route the route + * @param create whether to create the pool if it doesn't exist * - * @return the pool for the argument route, never null + * @return the pool for the argument route, + * never null if create is true */ - protected synchronized RouteConnPool getRoutePool(HttpRoute route) { + protected synchronized RouteSpecificPool getRoutePool(HttpRoute route, + boolean create) { - RouteConnPool rcp = (RouteConnPool) routeToPool.get(route); - if (rcp == null) { + RouteSpecificPool rospl = (RouteSpecificPool) routeToPool.get(route); + if ((rospl == null) && create) { // no pool for this route yet (or anymore) - rcp = newRouteConnPool(route); - routeToPool.put(route, rcp); + rospl = newRouteSpecificPool(route); + routeToPool.put(route, rospl); } - return rcp; + return rospl; } @@ -177,16 +150,16 @@ public class ConnPoolByRoute extends AbstractConnPool { * * @return the new pool */ - protected RouteConnPool newRouteConnPool(HttpRoute route) { - return new RouteConnPool(route); + protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) { + return new RouteSpecificPool(route); } //@@@ consider alternatives for gathering statistics public synchronized int getConnectionsInPool(HttpRoute route) { //@@@ don't allow a pool to be created here! - RouteConnPool rcp = getRoutePool(route); - return rcp.numConnections; + RouteSpecificPool rospl = getRoutePool(route, false); + return (rospl != null) ? rospl.getEntryCount() : 0; } @@ -203,7 +176,7 @@ public class ConnPoolByRoute extends AbstractConnPool { int maxTotalConnections = HttpConnectionManagerParams .getMaxTotalConnections(this.params); - RouteConnPool routePool = getRoutePool(route); + RouteSpecificPool rospl = getRoutePool(route, true); WaitingThread waitingThread = null; boolean useTimeout = (timeout > 0); @@ -224,19 +197,20 @@ public class ConnPoolByRoute extends AbstractConnPool { // - can delete and replace a free connection for another route // - need to wait for one of the things above to come true - if (routePool.freeConnections.size() > 0) { - entry = getFreeEntry(routePool); - - } else if ((routePool.numConnections < maxHostConnections) && + entry = getFreeEntry(rospl); + if (entry != null) { + // we're fine + //@@@ yeah this is ugly, but historical... will be revised + } else if ((rospl.getEntryCount() < maxHostConnections) && (numConnections < maxTotalConnections)) { - entry = createEntry(routePool, operator); + entry = createEntry(rospl, operator); - } else if ((routePool.numConnections < maxHostConnections) && + } else if ((rospl.getEntryCount() < maxHostConnections) && (freeConnections.size() > 0)) { deleteLeastUsedEntry(); - entry = createEntry(routePool, operator); + entry = createEntry(rospl, operator); } else { // TODO: keep track of which routes have waiting threads, @@ -254,7 +228,7 @@ public class ConnPoolByRoute extends AbstractConnPool { if (waitingThread == null) { waitingThread = new WaitingThread(); - waitingThread.pool = routePool; + waitingThread.pool = rospl; waitingThread.thread = Thread.currentThread(); } else { waitingThread.interruptedByConnectionPool = false; @@ -264,7 +238,7 @@ public class ConnPoolByRoute extends AbstractConnPool { startWait = System.currentTimeMillis(); } - routePool.waitingThreads.addLast(waitingThread); + rospl.waitingThreads.addLast(waitingThread); waitingThreads.addLast(waitingThread); wait(timeToWait); @@ -285,7 +259,7 @@ public class ConnPoolByRoute extends AbstractConnPool { // "spurious wakeup", or were interrupted by an // external thread. Regardless we need to // cleanup for ourselves in the wait queue. - routePool.waitingThreads.remove(waitingThread); + rospl.waitingThreads.remove(waitingThread); waitingThreads.remove(waitingThread); } @@ -320,15 +294,9 @@ public class ConnPoolByRoute extends AbstractConnPool { // no longer issued, we keep a hard reference now issuedConnections.remove(entry.getWeakRef()); - RouteConnPool routePool = getRoutePool(route); + RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true??? - // put the connection back in the available list and notify a waiter - routePool.freeConnections.add(entry); - if (routePool.numConnections == 0) { - // for some reason the route pool didn't already exist - LOG.error("Route connection pool not found. " + route); - routePool.numConnections = 1; - } + rospl.freeEntry(entry); freeConnections.add(entry); if (numConnections == 0) { @@ -337,10 +305,9 @@ public class ConnPoolByRoute extends AbstractConnPool { numConnections = 1; } - // register the connection with the timeout handler idleConnHandler.add(entry.getConnection()); - notifyWaitingThread(routePool); + notifyWaitingThread(rospl); } // freeEntry @@ -349,19 +316,20 @@ public class ConnPoolByRoute extends AbstractConnPool { /** * If available, get a free pool entry for a route. * - * @param rcp the route-specific pool from which to get an entry + * @param rospl the route-specific pool from which to get an entry * - * @return an available pool entry for the given route + * @return an available pool entry for the given route, or + * null if none is available */ - protected synchronized BasicPoolEntry getFreeEntry(RouteConnPool rcp) { + protected synchronized + BasicPoolEntry getFreeEntry(RouteSpecificPool rospl) { - BasicPoolEntry entry = null; + BasicPoolEntry entry = rospl.allocEntry(); - if (rcp.freeConnections.size() > 0) { + if (entry != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Getting free connection. " + rcp.route); + LOG.debug("Getting free connection. " + rospl.getRoute()); } - entry = (BasicPoolEntry) rcp.freeConnections.removeLast(); freeConnections.remove(entry); idleConnHandler.remove(entry.getConnection()); // no longer idle @@ -369,7 +337,7 @@ public class ConnPoolByRoute extends AbstractConnPool { } else { if (LOG.isDebugEnabled()) { - LOG.debug("No free connections. " + rcp.route); + LOG.debug("No free connections. " + rospl.getRoute()); } } return entry; @@ -381,22 +349,23 @@ public class ConnPoolByRoute extends AbstractConnPool { * This method assumes that the new connection will be handed * out immediately. * - * @param rcp the route-specific pool for which to create the entry + * @param rospl the route-specific pool for which to create the entry * @param op the operator for creating a connection * * @return the new pool entry for a new connection */ protected synchronized - BasicPoolEntry createEntry(RouteConnPool rcp, + BasicPoolEntry createEntry(RouteSpecificPool rospl, ClientConnectionOperator op) { if (LOG.isDebugEnabled()) { - LOG.debug("Creating new connection. " + rcp.route); + LOG.debug("Creating new connection. " + rospl.getRoute()); } // the entry will create the connection when needed - BasicPoolEntry entry = new BasicPoolEntry(op, rcp.route, refQueue); + BasicPoolEntry entry = + new BasicPoolEntry(op, rospl.getRoute(), refQueue); + rospl.createdEntry(entry); numConnections++; - rcp.numConnections++; issuedConnections.add(entry.getWeakRef()); @@ -425,13 +394,10 @@ public class ConnPoolByRoute extends AbstractConnPool { closeConnection(entry.getConnection()); - RouteConnPool routePool = getRoutePool(route); - routePool.freeConnections.remove(entry); - routePool.numConnections--; + RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true??? + rospl.deleteEntry(entry); numConnections--; - if ((routePool.numConnections < 1) && - routePool.waitingThreads.isEmpty()) { - + if (rospl.isUnused()) { routeToPool.remove(route); } @@ -460,17 +426,14 @@ public class ConnPoolByRoute extends AbstractConnPool { // non-javadoc, see base class AbstractConnPool protected synchronized void handleLostEntry(HttpRoute route) { - RouteConnPool routePool = getRoutePool(route); - routePool.numConnections--; - - if ((routePool.numConnections < 1) && - routePool.waitingThreads.isEmpty()) { - + RouteSpecificPool rospl = getRoutePool(route, true); //@@@ true??? + rospl.dropEntry(); + if (rospl.isUnused()) { routeToPool.remove(route); } numConnections--; - notifyWaitingThread(routePool); + notifyWaitingThread(rospl); } @@ -480,9 +443,9 @@ public class ConnPoolByRoute extends AbstractConnPool { * if there is one. * Otherwise, a thread in the connection pool will be notified. * - * @param routePool the pool in which to notify, or null + * @param rospl the pool in which to notify, or null */ - protected synchronized void notifyWaitingThread(RouteConnPool routePool) { + protected synchronized void notifyWaitingThread(RouteSpecificPool rospl) { //@@@ while this strategy provides for best connection re-use, //@@@ is it fair? only do this if the connection is open? @@ -491,13 +454,13 @@ public class ConnPoolByRoute extends AbstractConnPool { // it from all wait queues before interrupting. WaitingThread waitingThread = null; - if ((routePool != null) && !routePool.waitingThreads.isEmpty()) { + if ((rospl != null) && !rospl.waitingThreads.isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug("Notifying thread waiting on pool. " - + routePool.route); + + rospl.getRoute()); } waitingThread = (WaitingThread) - routePool.waitingThreads.removeFirst(); + rospl.waitingThreads.removeFirst(); waitingThreads.remove(waitingThread); } else if (!waitingThreads.isEmpty()) { diff --git a/module-client/src/main/java/org/apache/http/impl/conn/tsccm/RouteSpecificPool.java b/module-client/src/main/java/org/apache/http/impl/conn/tsccm/RouteSpecificPool.java new file mode 100644 index 000000000..0b9a213e6 --- /dev/null +++ b/module-client/src/main/java/org/apache/http/impl/conn/tsccm/RouteSpecificPool.java @@ -0,0 +1,208 @@ +/* + * $HeadURL$ + * $Revision$ + * $Date$ + * + * ==================================================================== + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.http.impl.conn.tsccm; + +import java.util.Iterator; +import java.util.LinkedList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.conn.HttpRoute; +//@@@ create entry w/o operator, set operator afterwards +import org.apache.http.conn.ClientConnectionOperator; //@@@ + + +/** + * A connection sub-pool for a specific route, used by {@link ConnPoolByRoute}. + * The methods in this class are unsynchronized. It is expected that the + * containing pool takes care of synchronization. + */ +public class RouteSpecificPool { + + //@@@ change attribute visibility to protected once it is ensured + //@@@ that there is no direct attribute access within this package + + /** The route this pool is for. */ + private final HttpRoute route; + + /** The list of free entries. */ + private LinkedList freeEntries; + + /** The list of threads waiting for this pool. */ + /*private@@@ currently still default*/ LinkedList waitingThreads; + + /** The number of created entries. */ + private int numEntries; + + + /** + * Creates a new route-specific pool. + * + * @param r the route for which to pool + */ + public RouteSpecificPool(HttpRoute r) { + this.route = r; + this.freeEntries = new LinkedList(); + this.waitingThreads = new LinkedList(); + this.numEntries = 0; + } + + + /** + * Obtains the route for which this pool is specific. + * + * @return the route + */ + public final HttpRoute getRoute() { + return route; + } + + + /** + * Indicates whether this pool is unused. + * A pool is unused if there is neither an entry nor a waiting thread. + * All entries count, not only the free but also the allocated ones. + * + * @return true if this pool is unused, + * false otherwise + */ + public boolean isUnused() { + return (numEntries < 1) && waitingThreads.isEmpty(); + } + + + /** + * Obtains the number of entries. + * This includes not only the free entries, but also those that + * have been created and are currently issued to an application. + * + * @return the number of entries for the route of this pool + */ + public final int getEntryCount() { + return numEntries; + } + + + /** + * Obtains a free entry from this pool, if one is available. + * + * @return an available pool entry, or null if there is none + */ + public BasicPoolEntry allocEntry() { + + BasicPoolEntry entry = null; + + if (!freeEntries.isEmpty()) { + entry = (BasicPoolEntry) freeEntries.removeLast(); + } + + return entry; + } + + + /** + * Returns an allocated entry to this pool. + * + * @param entry the entry obtained from {@link #allocEntry allocEntry} + * or presented to {@link #createdEntry createdEntry} + */ + public void freeEntry(BasicPoolEntry entry) { + + if (numEntries < 1) { + throw new IllegalStateException + ("No entry created for this pool. " + route); + } + if (numEntries <= freeEntries.size()) { + throw new IllegalStateException + ("No entry allocated from this pool. " + route); + } + freeEntries.add(entry); + } + + + /** + * Indicates creation of an entry for this pool. + * The entry will not be added to the list of free entries, + * it is only recognized as belonging to this pool now. It can then + * be passed to {@link #freeEntry freeEntry}. + * + * @param entry the entry that was created for this pool + * + * @return the new pool entry for a new entry + */ + public void createdEntry(BasicPoolEntry entry) { + + if (!route.equals(entry.getPlannedRoute())) { + throw new IllegalArgumentException + ("Entry not planned for this pool." + + "\npool: " + route + + "\nplan: " + entry.getPlannedRoute()); + } + + numEntries++; + } + + + /** + * Deletes an entry from this pool. + * Only entries that are currently free in this pool can be deleted. + * Allocated entries can not be deleted. + * + * @param entry the entry to delete from this pool + * + * @return true if the entry was found and deleted, or + * false if the entry was not found + */ + public boolean deleteEntry(BasicPoolEntry entry) { + + final boolean found = freeEntries.remove(entry); + if (found) + numEntries--; + return found; + } + + + /** + * Forgets about an entry from this pool. + * This method is used to indicate that an entry + * {@link #allocEntry allocated} + * from this pool has been lost and will not be returned. + */ + public void dropEntry() { + if (numEntries < 1) { + throw new IllegalStateException + ("There is no entry that could be dropped."); + } + numEntries--; + } + + +} // class RouteSpecificPool