diff --git a/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java b/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java index 756ad825f..0870b83f5 100644 --- a/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java +++ b/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java @@ -30,6 +30,7 @@ package org.apache.http.impl.conn.tsccm; +import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Queue; @@ -171,19 +172,18 @@ public BasicPoolEntry getEntry(HttpRoute route, long timeout, int maxTotalConnections = HttpConnectionManagerParams .getMaxTotalConnections(this.params); - BasicPoolEntry entry = null; + Date deadline = null; + if (timeout > 0) { + deadline = new Date(System.currentTimeMillis() + timeout); + } + BasicPoolEntry entry = null; try { poolLock.lock(); RouteSpecificPool rospl = getRoutePool(route, true); WaitingThread waitingThread = null; - boolean useTimeout = (timeout > 0); - long timeToWait = timeout; - long startWait = 0; - long endWait = 0; - while (entry == null) { if (isShutDown) { @@ -213,34 +213,28 @@ public BasicPoolEntry getEntry(HttpRoute route, long timeout, entry = createEntry(rospl, operator); } else { - // TODO: keep track of which routes have waiting threads, - // so they avoid being sacrificed before necessary + + if (LOG.isDebugEnabled()) { + LOG.debug("Need to wait for connection. " + route); + } + + if (waitingThread == null) { + //@@@ use factory method? + waitingThread = new WaitingThread + (poolLock.newCondition(), rospl); + } boolean success = false; try { - if (useTimeout && timeToWait <= 0) { - throw new ConnectionPoolTimeoutException - ("Timeout waiting for connection"); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Need to wait for connection. " + route); - } - - if (waitingThread == null) { - //@@@ use factory method? - waitingThread = new WaitingThread - (poolLock.newCondition(), rospl); - } - - if (useTimeout) { - startWait = System.currentTimeMillis(); - } - rospl.queueThread(waitingThread); waitingThreads.add(waitingThread); - success = waitingThread.await(timeToWait); //@@@, TimeUnit.MILLISECONDS); or deadline + success = waitingThread.await(deadline); + } finally { + // In case of 'success', we were woken up by the + // connection pool and should now have a connection + // waiting for us, or else we're shutting down. + // Just continue in the loop, both cases are checked. if (!success) { // Either we timed out, experienced a // "spurious wakeup", or were interrupted by @@ -249,15 +243,13 @@ public BasicPoolEntry getEntry(HttpRoute route, long timeout, rospl.removeThread(waitingThread); waitingThreads.remove(waitingThread); } - // In case of 'success', we were woken up by the - // connection pool and should now have a connection - // waiting for us, or else we're shutting down. - // Just continue in the loop, both cases are checked. + } - if (useTimeout) { - endWait = System.currentTimeMillis(); - timeToWait -= (endWait - startWait); - } + // check for spurious wakeup vs. timeout + if (!success && (deadline != null) && + (deadline.getTime() <= System.currentTimeMillis())) { + throw new ConnectionPoolTimeoutException + ("Timeout waiting for connection"); } } } // while no entry diff --git a/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java b/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java index 409b38440..9a4ab9629 100644 --- a/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java +++ b/module-client/src/main/java/org/apache/http/impl/conn/tsccm/WaitingThread.java @@ -31,6 +31,7 @@ package org.apache.http.impl.conn.tsccm; +import java.util.Date; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -78,73 +79,6 @@ public WaitingThread(Condition cond, RouteSpecificPool pool) { } - /** - * Blocks the calling thread. - * This method returns when the thread is notified or interrupted, - * if a timeout occurrs, or if there is a spurious wakeup. - *
- * This method assumes external synchronization. - * - * @param timeout the timeout in milliseconds, or 0 for no timeout - * - * @return true if the condition was satisfied, - * false in case of a timeout. - * Typically, a call to {@link #wakeup} is used to indicate - * that the condition was satisfied. Since the condition can - * be accessed from outside, this cannot be guaranteed though. - * - * @throws InterruptedException if the waiting thread was interrupted - * - * @see #wakeup - */ - public boolean await(long timeout) - throws InterruptedException { - - //@@@ check timeout for negative, or assume overflow? - //@@@ for now, leave the check to the condition - - // This is only a sanity check. We cannot not synchronize here, - // the lock would not be released on calling cond.await() below. - if (this.waiter != null) { - throw new IllegalStateException - ("A thread is already waiting on this object." + - "\ncaller: " + Thread.currentThread() + - "\nwaiter: " + this.waiter); - } - - this.waiter = Thread.currentThread(); - - boolean success = false; - try { - success = this.cond.await(timeout, TimeUnit.MILLISECONDS); - } finally { - this.waiter = null; - } - return success; - - } // await - - - /** - * Wakes up the waiting thread. - *
- * This method assumes external synchronization. - */ - public void wakeup() { - - // If external synchronization and pooling works properly, - // this cannot happen. Just a sanity check. - if (this.waiter == null) { - throw new IllegalStateException - ("Nobody waiting on this object."); - } - - // One condition might be shared by several WaitingThread instances. - // It probably isn't, but just in case: wake all, not just one. - this.cond.signalAll(); - } - - /** * Obtains the condition. * @@ -179,4 +113,73 @@ public final Thread getThread() { } + /** + * Blocks the calling thread. + * This method returns when the thread is notified or interrupted, + * if a timeout occurrs, or if there is a spurious wakeup. + *
+ * This method assumes external synchronization. + * + * @param deadline when to time out, or null for no timeout + * + * @return true if the condition was satisfied, + * false in case of a timeout. + * Typically, a call to {@link #wakeup} is used to indicate + * that the condition was satisfied. Since the condition is + * accessible outside, this cannot be guaranteed though. + * + * @throws InterruptedException if the waiting thread was interrupted + * + * @see #wakeup + */ + public boolean await(Date deadline) + throws InterruptedException { + + // This is only a sanity check. We cannot synchronize here, + // the lock would not be released on calling cond.await() below. + if (this.waiter != null) { + throw new IllegalStateException + ("A thread is already waiting on this object." + + "\ncaller: " + Thread.currentThread() + + "\nwaiter: " + this.waiter); + } + + this.waiter = Thread.currentThread(); + + boolean success = false; + try { + if (deadline != null) { + success = this.cond.awaitUntil(deadline); + } else { + this.cond.await(); + success = true; + } + } finally { + this.waiter = null; + } + return success; + + } // await + + + /** + * Wakes up the waiting thread. + *
+ * This method assumes external synchronization. + */ + public void wakeup() { + + // If external synchronization and pooling works properly, + // this cannot happen. Just a sanity check. + if (this.waiter == null) { + throw new IllegalStateException + ("Nobody waiting on this object."); + } + + // One condition might be shared by several WaitingThread instances. + // It probably isn't, but just in case: wake all, not just one. + this.cond.signalAll(); + } + + } // class WaitingThread