HTTPCLIENT-734: Request abort will unblock the thread waiting for a connection

Contributed by Sam Berlin <sberlin at gmail.com>
Reviewed by Oleg Kalnichevski



git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk@638979 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Oleg Kalnichevski 2008-03-19 19:36:02 +00:00
parent 1e153a504c
commit a72e883d9c
15 changed files with 643 additions and 74 deletions

View File

@ -1,6 +1,9 @@
Changes since 4.0 Alpha 3 Changes since 4.0 Alpha 3
------------------- -------------------
* [HTTPCLIENT-734] Request abort will unblock the thread waiting for a connection
Contributed by Sam Berlin <sberlin at gmail.com>
* [HTTPCLIENT-759] Ensure release of connections back to the connection manager * [HTTPCLIENT-759] Ensure release of connections back to the connection manager
on exceptions. on exceptions.
Contributed by Sam Berlin <sberlin at gmail.com> Contributed by Sam Berlin <sberlin at gmail.com>

View File

@ -119,6 +119,15 @@ public interface ClientConnectionManager {
; ;
/**
* Returns a new {@link ClientConnectionRequest}, from which a
* {@link ManagedClientConnection} can be obtained, or the request can be
* aborted.
*/
ClientConnectionRequest newConnectionRequest()
;
/** /**
* Releases a connection for use by others. * Releases a connection for use by others.
* If the argument connection has been released before, * If the argument connection has been released before,

View File

@ -0,0 +1,46 @@
package org.apache.http.conn;
import java.util.concurrent.TimeUnit;
import org.apache.http.conn.routing.HttpRoute;
/**
* Encapsulates a request for a {@link ManagedClientConnection}.
*/
public interface ClientConnectionRequest {
/**
* Obtains a connection within a given time.
* This method will block until a connection becomes available,
* the timeout expires, or the connection manager is
* {@link #shutdown shut down}.
* Timeouts are handled with millisecond precision.
*
* If {@link #abortRequest()} is called while this is blocking or
* before this began, an {@link InterruptedException} will
* be thrown.
*
* @param route where the connection should point to
* @param timeout the timeout, 0 or negative for no timeout
* @param tunit the unit for the <code>timeout</code>,
* may be <code>null</code> only if there is no timeout
*
* @return a connection that can be used to communicate
* along the given route
*
* @throws ConnectionPoolTimeoutException
* in case of a timeout
* @throws InterruptedException
* if the calling thread is interrupted while waiting
*/
ManagedClientConnection getConnection(HttpRoute route, long timeout,
TimeUnit unit) throws InterruptedException,
ConnectionPoolTimeoutException;
/**
* Aborts the call to {@link #getConnection(HttpRoute, long, TimeUnit)},
* causing it to throw an {@link InterruptedException}.
*/
void abortRequest();
}

View File

@ -47,8 +47,8 @@ import org.apache.http.HttpException;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.HttpRequest; import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.ProtocolException; import org.apache.http.ProtocolException;
import org.apache.http.ProtocolVersion;
import org.apache.http.auth.AuthScheme; import org.apache.http.auth.AuthScheme;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.AuthScope;
import org.apache.http.auth.AuthenticationException; import org.apache.http.auth.AuthenticationException;
@ -69,21 +69,22 @@ import org.apache.http.client.protocol.ClientContext;
import org.apache.http.client.utils.URLUtils; import org.apache.http.client.utils.URLUtils;
import org.apache.http.conn.BasicManagedEntity; import org.apache.http.conn.BasicManagedEntity;
import org.apache.http.conn.ClientConnectionManager; import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.http.conn.ClientConnectionRequest;
import org.apache.http.conn.routing.HttpRoute; import org.apache.http.conn.ConnectionReleaseTrigger;
import org.apache.http.conn.routing.HttpRoutePlanner;
import org.apache.http.conn.routing.HttpRouteDirector;
import org.apache.http.conn.routing.BasicRouteDirector;
import org.apache.http.conn.Scheme;
import org.apache.http.conn.ManagedClientConnection; import org.apache.http.conn.ManagedClientConnection;
import org.apache.http.conn.Scheme;
import org.apache.http.conn.routing.BasicRouteDirector;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.routing.HttpRouteDirector;
import org.apache.http.conn.routing.HttpRoutePlanner;
import org.apache.http.entity.BufferedHttpEntity; import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.message.BasicHttpRequest; import org.apache.http.message.BasicHttpRequest;
import org.apache.http.params.HttpConnectionParams; import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams; import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams; import org.apache.http.params.HttpProtocolParams;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HTTP; import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext; import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpProcessor; import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestExecutor; import org.apache.http.protocol.HttpRequestExecutor;
@ -288,15 +289,20 @@ public class DefaultClientRequestDirector
HttpRoute route = roureq.getRoute(); HttpRoute route = roureq.getRoute();
// Allocate connection if needed ReleaseTrigger releaseTrigger = new ReleaseTrigger();
if (managedConn == null) { if (orig instanceof AbortableHttpRequest) {
managedConn = allocateConnection(route, timeout); ((AbortableHttpRequest) orig).setReleaseTrigger(releaseTrigger);
} }
if (orig instanceof AbortableHttpRequest) { // Allocate connection if needed
((AbortableHttpRequest) orig).setReleaseTrigger(managedConn); if (managedConn == null) {
ClientConnectionRequest connectionRequest = allocateConnection();
releaseTrigger.setClientConnectionRequest(connectionRequest);
managedConn = connectionRequest.getConnection(route, timeout, TimeUnit.MILLISECONDS);
} }
releaseTrigger.setConnectionReleaseTrigger(managedConn);
// Reopen connection if needed // Reopen connection if needed
if (!managedConn.isOpen()) { if (!managedConn.isOpen()) {
managedConn.open(route, context, params); managedConn.open(route, context, params);
@ -489,23 +495,10 @@ public class DefaultClientRequestDirector
/** /**
* Obtains a connection for the target route. * Obtains a connection request, from which the connection can be retrieved.
*
* @param route the route for which to allocate a connection
* @param timeout the timeout in milliseconds,
* 0 or negative for no timeout
*
* @throws HttpException in case of a (protocol) problem
* @throws ConnectionPoolTimeoutException in case of a timeout
* @throws InterruptedException in case of an interrupt
*/ */
protected ManagedClientConnection allocateConnection(HttpRoute route, protected ClientConnectionRequest allocateConnection() {
long timeout) return connManager.newConnectionRequest();
throws HttpException, ConnectionPoolTimeoutException,
InterruptedException {
return connManager.getConnection
(route, timeout, TimeUnit.MILLISECONDS);
} // allocateConnection } // allocateConnection
@ -1027,4 +1020,69 @@ public class DefaultClientRequestDirector
authState.setCredentials(creds); authState.setCredentials(creds);
} }
/**
* A {@link ConnectionReleaseTrigger} that delegates either a
* {@link ClientConnectionRequest} or another ConnectionReleaseTrigger
* for aborting.
*/
private static class ReleaseTrigger implements ConnectionReleaseTrigger {
private boolean aborted = false;
private ClientConnectionRequest delegateRequest;
private ConnectionReleaseTrigger delegateTrigger;
void setConnectionReleaseTrigger(ConnectionReleaseTrigger releaseTrigger) throws IOException {
synchronized(this) {
if(aborted) {
throw new IOException("already aborted!");
}
this.delegateTrigger = releaseTrigger;
this.delegateRequest = null;
}
}
void setClientConnectionRequest(ClientConnectionRequest connectionRequest) throws IOException {
synchronized(this) {
if(aborted) {
throw new IOException("already aborted");
}
this.delegateRequest = connectionRequest;
this.delegateTrigger = null;
}
}
public void abortConnection() throws IOException {
ConnectionReleaseTrigger releaseTrigger;
ClientConnectionRequest connectionRequest;
synchronized(this) {
if(aborted)
throw new IOException("already aborted");
aborted = true;
// capture references within lock
releaseTrigger = delegateTrigger;
connectionRequest = delegateRequest;
}
if(connectionRequest != null)
connectionRequest.abortRequest();
if(releaseTrigger != null) {
releaseTrigger.abortConnection();
}
}
public void releaseConnection() throws IOException {
ConnectionReleaseTrigger releaseTrigger;
synchronized(this) {
releaseTrigger = delegateTrigger; // capture reference within lock
}
if(releaseTrigger != null)
releaseTrigger.releaseConnection();
}
}
} // class DefaultClientRequestDirector } // class DefaultClientRequestDirector

View File

@ -36,12 +36,13 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.ClientConnectionManager; import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.ClientConnectionOperator; import org.apache.http.conn.ClientConnectionOperator;
import org.apache.http.conn.ClientConnectionRequest;
import org.apache.http.conn.ManagedClientConnection; import org.apache.http.conn.ManagedClientConnection;
import org.apache.http.conn.OperatedClientConnection; import org.apache.http.conn.OperatedClientConnection;
import org.apache.http.conn.SchemeRegistry; import org.apache.http.conn.SchemeRegistry;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.params.HttpParams; import org.apache.http.params.HttpParams;
@ -195,6 +196,22 @@ public class SingleClientConnManager implements ClientConnectionManager {
return getConnection(route); return getConnection(route);
} }
public final ClientConnectionRequest newConnectionRequest() {
return new ClientConnectionRequest() {
public void abortRequest() {
// Nothing to abort, since requests are immediate.
}
public ManagedClientConnection getConnection(HttpRoute route,
long timeout, TimeUnit tunit) {
return SingleClientConnManager.this.getConnection(route);
}
};
}
/** /**
* Obtains a connection. * Obtains a connection.

View File

@ -0,0 +1,62 @@
/*
* $HeadURL:$
* $Revision:$
* $Date:$
*
* ====================================================================
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl.conn.tsccm;
/** A simple class that can interrupt a {@link WaitingThread}. */
public class Aborter {
private WaitingThread waitingThread;
private boolean aborted;
/**
* If a waiting thread has been set, interrupts it.
*/
public void abort() {
aborted = true;
if (waitingThread != null)
waitingThread.interrupt();
}
/**
* Sets the waiting thread. If this has already been aborted,
* the waiting thread is immediately interrupted.
*
* @param waitingThread The thread to interrupt when aborting.
*/
public void setWaitingThread(WaitingThread waitingThread) {
this.waitingThread = waitingThread;
if (aborted)
waitingThread.interrupt();
}
}

View File

@ -208,11 +208,18 @@ public abstract class AbstractConnPool implements RefQueueHandler {
* @throws InterruptedException * @throws InterruptedException
* if the calling thread was interrupted * if the calling thread was interrupted
*/ */
public abstract public final
BasicPoolEntry getEntry(HttpRoute route, long timeout, TimeUnit tunit, BasicPoolEntry getEntry(HttpRoute route, long timeout, TimeUnit tunit,
ClientConnectionOperator operator) ClientConnectionOperator operator)
throws ConnectionPoolTimeoutException, InterruptedException throws ConnectionPoolTimeoutException, InterruptedException {
; return newPoolEntryRequest().getPoolEntry(route, timeout, tunit, operator);
}
/**
* Returns a new {@link PoolEntryRequest}, from which a {@link BasicPoolEntry}
* can be obtained, or the request can be aborted.
*/
public abstract PoolEntryRequest newPoolEntryRequest();
/** /**

View File

@ -206,12 +206,55 @@ public class ConnPoolByRoute extends AbstractConnPool {
} }
} }
// non-javadoc, see base class AbstractConnPool
@Override @Override
public BasicPoolEntry getEntry(HttpRoute route, public PoolEntryRequest newPoolEntryRequest() {
final Aborter aborter = new Aborter();
return new PoolEntryRequest() {
public void abortRequest() {
try {
poolLock.lock();
aborter.abort();
} finally {
poolLock.unlock();
}
}
public BasicPoolEntry getPoolEntry(HttpRoute route, long timeout,
TimeUnit tunit, ClientConnectionOperator operator)
throws InterruptedException, ConnectionPoolTimeoutException {
return getEntryBlocking(route, timeout, tunit, operator, aborter);
}
};
}
/**
* Obtains a pool entry with a connection within the given timeout.
* If a {@link WaitingThread} is used to block, {@link Aborter#setWaitingThread(WaitingThread)}
* must be called before blocking, to allow the thread to be interrupted.
*
* @param route the route for which to get the connection
* @param timeout the timeout, 0 or negative for no timeout
* @param tunit the unit for the <code>timeout</code>,
* may be <code>null</code> only if there is no timeout
* @param operator the connection operator, in case
* a connection has to be created
* @param aborter an object which can abort a {@link WaitingThread}.
*
* @return pool entry holding a connection for the route
*
* @throws ConnectionPoolTimeoutException
* if the timeout expired
* @throws InterruptedException
* if the calling thread was interrupted
*/
protected BasicPoolEntry getEntryBlocking(HttpRoute route,
long timeout, TimeUnit tunit, long timeout, TimeUnit tunit,
ClientConnectionOperator operator) ClientConnectionOperator operator,
Aborter aborter)
throws ConnectionPoolTimeoutException, InterruptedException { throws ConnectionPoolTimeoutException, InterruptedException {
int maxHostConnections = HttpConnectionManagerParams int maxHostConnections = HttpConnectionManagerParams
@ -269,6 +312,7 @@ public class ConnPoolByRoute extends AbstractConnPool {
if (waitingThread == null) { if (waitingThread == null) {
waitingThread = waitingThread =
newWaitingThread(poolLock.newCondition(), rospl); newWaitingThread(poolLock.newCondition(), rospl);
aborter.setWaitingThread(waitingThread);
} }
boolean success = false; boolean success = false;

View File

@ -0,0 +1,73 @@
/*
* $HeadURL:$
* $Revision:$
* $Date:$
*
* ====================================================================
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl.conn.tsccm;
import java.util.concurrent.TimeUnit;
import org.apache.http.conn.ClientConnectionOperator;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.routing.HttpRoute;
/**
* Encapsulates a request for a {@link BasicPoolEntry}.
*/
public interface PoolEntryRequest {
/**
* Obtains a pool entry with a connection within the given timeout.
* If {@link #abortRequest()} is called before this completes,
* an {@link InterruptedException} is thrown.
*
* @param route the route for which to get the connection
* @param timeout the timeout, 0 or negative for no timeout
* @param tunit the unit for the <code>timeout</code>,
* may be <code>null</code> only if there is no timeout
* @param operator the connection operator, in case
* a connection has to be created
*
* @return pool entry holding a connection for the route
*
* @throws ConnectionPoolTimeoutException
* if the timeout expired
* @throws InterruptedException
* if the calling thread was interrupted
*/
BasicPoolEntry getPoolEntry(HttpRoute route, long timeout, TimeUnit unit,
ClientConnectionOperator operator) throws InterruptedException,
ConnectionPoolTimeoutException;
/**
* Aborts the active or next call to
* {@link #getPoolEntry(HttpRoute, long, TimeUnit, ClientConnectionOperator)}.
*/
void abortRequest();
}

View File

@ -38,6 +38,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.http.conn.routing.HttpRoute; import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.ClientConnectionManager; import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.ClientConnectionOperator; import org.apache.http.conn.ClientConnectionOperator;
import org.apache.http.conn.ClientConnectionRequest;
import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ManagedClientConnection; import org.apache.http.conn.ManagedClientConnection;
import org.apache.http.conn.OperatedClientConnection; import org.apache.http.conn.OperatedClientConnection;
@ -147,7 +148,7 @@ public class ThreadSafeClientConnManager
// non-javadoc, see interface ClientConnectionManager // non-javadoc, see interface ClientConnectionManager
public ManagedClientConnection getConnection(HttpRoute route) public final ManagedClientConnection getConnection(HttpRoute route)
throws InterruptedException { throws InterruptedException {
while (true) { while (true) {
@ -166,24 +167,44 @@ public class ThreadSafeClientConnManager
// non-javadoc, see interface ClientConnectionManager // non-javadoc, see interface ClientConnectionManager
public ManagedClientConnection getConnection(HttpRoute route, public final ManagedClientConnection getConnection(HttpRoute route,
long timeout, long timeout,
TimeUnit tunit) TimeUnit tunit)
throws ConnectionPoolTimeoutException, InterruptedException { throws ConnectionPoolTimeoutException, InterruptedException {
if (route == null) { return newConnectionRequest().getConnection(route, timeout, tunit);
throw new IllegalArgumentException("Route may not be null."); }
}
if (LOG.isDebugEnabled()) {
LOG.debug("ThreadSafeClientConnManager.getConnection: "
+ route + ", timeout = " + timeout);
}
final BasicPoolEntry entry = public ClientConnectionRequest newConnectionRequest() {
connectionPool.getEntry(route, timeout, tunit, connOperator);
final PoolEntryRequest poolRequest = connectionPool.newPoolEntryRequest();
return new ClientConnectionRequest() {
public void abortRequest() {
poolRequest.abortRequest();
}
public ManagedClientConnection getConnection(HttpRoute route,
long timeout, TimeUnit tunit) throws InterruptedException,
ConnectionPoolTimeoutException {
if (route == null) {
throw new IllegalArgumentException("Route may not be null.");
}
if (LOG.isDebugEnabled()) {
LOG.debug("ThreadSafeClientConnManager.getConnection: "
+ route + ", timeout = " + timeout);
}
final BasicPoolEntry entry = poolRequest.getPoolEntry(route, timeout, tunit, connOperator);
return new BasicPooledConnAdapter(ThreadSafeClientConnManager.this, entry);
}
};
return new BasicPooledConnAdapter(this, entry);
} }

View File

@ -59,6 +59,9 @@ public class WaitingThread {
/** The thread that is waiting for an entry. */ /** The thread that is waiting for an entry. */
private Thread waiter; private Thread waiter;
/** True if this was interrupted. */
private boolean aborted;
/** /**
* Creates a new entry for a waiting thread. * Creates a new entry for a waiting thread.
@ -143,6 +146,9 @@ public class WaitingThread {
"\nwaiter: " + this.waiter); "\nwaiter: " + this.waiter);
} }
if (aborted)
throw new InterruptedException("interrupted already");
this.waiter = Thread.currentThread(); this.waiter = Thread.currentThread();
boolean success = false; boolean success = false;
@ -180,5 +186,12 @@ public class WaitingThread {
this.cond.signalAll(); this.cond.signalAll();
} }
public void interrupt() {
aborted = true;
if (this.waiter != null)
this.waiter.interrupt();
}
} // class WaitingThread } // class WaitingThread

View File

@ -1,7 +1,7 @@
/* /*
* $HeadURL:$ * $HeadURL$
* $Revision:$ * $Revision$
* $Date:$ * $Date$
* ==================================================================== * ====================================================================
* *
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
@ -30,7 +30,9 @@ package org.apache.http.impl.client;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.Test; import junit.framework.Test;
import junit.framework.TestSuite; import junit.framework.TestSuite;
@ -40,6 +42,7 @@ import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse; import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpGet;
import org.apache.http.conn.ClientConnectionManager; import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.ClientConnectionRequest;
import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ManagedClientConnection; import org.apache.http.conn.ManagedClientConnection;
import org.apache.http.conn.PlainSocketFactory; import org.apache.http.conn.PlainSocketFactory;
@ -73,6 +76,43 @@ public class TestDefaultClientRequestDirector extends ServerTestBase {
return new TestSuite(TestDefaultClientRequestDirector.class); return new TestSuite(TestDefaultClientRequestDirector.class);
} }
/**
* Tests that if abort is called on an {@link AbortableHttpRequest} while
* {@link DefaultClientRequestDirector} is allocating a connection, that the
* connection is properly aborted.
*/
public void testAbortInAllocate() throws Exception {
CountDownLatch connLatch = new CountDownLatch(1);
CountDownLatch awaitLatch = new CountDownLatch(1);
final ConMan conMan = new ConMan(connLatch, awaitLatch);
final AtomicReference<Throwable> throwableRef = new AtomicReference<Throwable>();
final CountDownLatch getLatch = new CountDownLatch(1);
final DefaultHttpClient client = new DefaultHttpClient(conMan, new BasicHttpParams());
final HttpContext context = client.getDefaultContext();
final HttpGet httpget = new HttpGet("http://www.example.com/a");
new Thread(new Runnable() {
public void run() {
try {
client.execute(httpget, context);
} catch(Throwable t) {
throwableRef.set(t);
} finally {
getLatch.countDown();
}
}
}).start();
assertTrue("should have tried to get a connection", connLatch.await(1, TimeUnit.SECONDS));
httpget.abort();
assertTrue("should have finished get request", getLatch.await(1, TimeUnit.SECONDS));
assertTrue("should be instanceof InterruptedException, was: " + throwableRef.get(),
throwableRef.get() instanceof InterruptedException);
}
/** /**
* Tests that if a socket fails to connect, the allocated connection is * Tests that if a socket fails to connect, the allocated connection is
* properly released back to the connection manager. * properly released back to the connection manager.
@ -160,16 +200,29 @@ public class TestDefaultClientRequestDirector extends ServerTestBase {
} }
public ManagedClientConnection getConnection(HttpRoute route, public ManagedClientConnection getConnection(HttpRoute route,
long timeout, TimeUnit tunit) long timeout, TimeUnit tunit) {
throws ConnectionPoolTimeoutException, InterruptedException { throw new UnsupportedOperationException("just a mockup");
allocatedConnection = new ClientConnAdapterMockup() { }
@Override
public void open(HttpRoute route, HttpContext context, public ClientConnectionRequest newConnectionRequest() {
HttpParams params) throws IOException { return new ClientConnectionRequest() {
throw new ConnectException(); public void abortRequest() {
throw new UnsupportedOperationException("just a mockup");
}
public ManagedClientConnection getConnection(HttpRoute route,
long timeout, TimeUnit unit)
throws InterruptedException,
ConnectionPoolTimeoutException {
allocatedConnection = new ClientConnAdapterMockup() {
@Override
public void open(HttpRoute route, HttpContext context,
HttpParams params) throws IOException {
throw new ConnectException();
}
};
return allocatedConnection;
} }
}; };
return allocatedConnection;
} }
public HttpParams getParams() { public HttpParams getParams() {
@ -191,4 +244,71 @@ public class TestDefaultClientRequestDirector extends ServerTestBase {
} }
} }
private static class ConMan implements ClientConnectionManager {
private final CountDownLatch connLatch;
private final CountDownLatch awaitLatch;
public ConMan(CountDownLatch connLatch, CountDownLatch awaitLatch) {
this.connLatch = connLatch;
this.awaitLatch = awaitLatch;
}
public void closeIdleConnections(long idletime, TimeUnit tunit) {
throw new UnsupportedOperationException("just a mockup");
}
public ManagedClientConnection getConnection(HttpRoute route)
throws InterruptedException {
throw new UnsupportedOperationException("just a mockup");
}
public ManagedClientConnection getConnection(HttpRoute route,
long timeout, TimeUnit tunit) {
throw new UnsupportedOperationException("just a mockup");
}
public ClientConnectionRequest newConnectionRequest() {
final Thread currentThread = Thread.currentThread();
return new ClientConnectionRequest() {
public void abortRequest() {
currentThread.interrupt();
}
public ManagedClientConnection getConnection(HttpRoute route,
long timeout, TimeUnit tunit)
throws InterruptedException,
ConnectionPoolTimeoutException {
connLatch.countDown(); // notify waiter that we're getting a connection
// zero usually means sleep forever, but CountDownLatch doesn't interpret it that way.
if(timeout == 0)
timeout = Integer.MAX_VALUE;
if(!awaitLatch.await(timeout, tunit))
throw new ConnectionPoolTimeoutException();
return new ClientConnAdapterMockup();
}
};
}
public HttpParams getParams() {
throw new UnsupportedOperationException("just a mockup");
}
public SchemeRegistry getSchemeRegistry() {
SchemeRegistry registry = new SchemeRegistry();
registry.register(new Scheme("http", new SocketFactoryMockup(null), 80));
return registry;
}
public void releaseConnection(ManagedClientConnection conn) {
throw new UnsupportedOperationException("just a mockup");
}
public void shutdown() {
throw new UnsupportedOperationException("just a mockup");
}
}
} }

View File

@ -50,7 +50,8 @@ import org.apache.http.util.EntityUtils;
*/ */
public class ExecReqThread extends GetConnThread { public class ExecReqThread extends GetConnThread {
protected RequestSpec request_spec; protected final ClientConnectionManager conn_manager;
protected final RequestSpec request_spec;
protected volatile HttpResponse response; protected volatile HttpResponse response;
protected volatile byte[] response_data; protected volatile byte[] response_data;
@ -71,6 +72,7 @@ public class ExecReqThread extends GetConnThread {
HttpRoute route, long timeout, HttpRoute route, long timeout,
RequestSpec reqspec) { RequestSpec reqspec) {
super(mgr, route, timeout); super(mgr, route, timeout);
this.conn_manager = mgr;
request_spec = reqspec; request_spec = reqspec;
} }

View File

@ -33,6 +33,7 @@ package org.apache.http.impl.conn;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.http.conn.ClientConnectionManager; import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.ClientConnectionRequest;
import org.apache.http.conn.routing.HttpRoute; import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.ManagedClientConnection; import org.apache.http.conn.ManagedClientConnection;
@ -44,26 +45,36 @@ import org.apache.http.conn.ManagedClientConnection;
*/ */
public class GetConnThread extends Thread { public class GetConnThread extends Thread {
protected ClientConnectionManager conn_manager; protected final HttpRoute conn_route;
protected HttpRoute conn_route; protected final long conn_timeout;
protected long conn_timeout; protected final ClientConnectionRequest conn_request;
protected volatile ManagedClientConnection connection; protected volatile ManagedClientConnection connection;
protected volatile Throwable exception; protected volatile Throwable exception;
/** /**
* Creates a new thread. * Creates a new thread for requesting a connection from the given manager.
*
* When this thread is started, it will try to obtain a connection. * When this thread is started, it will try to obtain a connection.
* The timeout is in milliseconds. * The timeout is in milliseconds.
*/ */
public GetConnThread(ClientConnectionManager mgr, public GetConnThread(ClientConnectionManager mgr,
HttpRoute route, long timeout) { HttpRoute route, long timeout) {
this(mgr.newConnectionRequest(), route, timeout);
conn_manager = mgr;
conn_route = route;
conn_timeout = timeout;
} }
/**
* Creates a new for requesting a connection from the given request object.
*
* When this thread is started, it will try to obtain a connection.
* The timeout is in milliseconds.
*/
public GetConnThread(ClientConnectionRequest connectionRequest,
HttpRoute route, long timeout) {
conn_route = route;
conn_timeout = timeout;
conn_request = connectionRequest;
}
/** /**
* This method is executed when the thread is started. * This method is executed when the thread is started.
@ -71,7 +82,7 @@ public class GetConnThread extends Thread {
@Override @Override
public void run() { public void run() {
try { try {
connection = conn_manager.getConnection connection = conn_request.getConnection
(conn_route, conn_timeout, TimeUnit.MILLISECONDS); (conn_route, conn_timeout, TimeUnit.MILLISECONDS);
} catch (Throwable dart) { } catch (Throwable dart) {
exception = dart; exception = dart;

View File

@ -38,18 +38,19 @@ import junit.framework.TestSuite;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.HttpVersion; import org.apache.http.HttpVersion;
import org.apache.http.conn.ClientConnectionRequest;
import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.ManagedClientConnection; import org.apache.http.conn.ManagedClientConnection;
import org.apache.http.conn.PlainSocketFactory; import org.apache.http.conn.PlainSocketFactory;
import org.apache.http.conn.Scheme; import org.apache.http.conn.Scheme;
import org.apache.http.conn.SchemeRegistry; import org.apache.http.conn.SchemeRegistry;
import org.apache.http.conn.SocketFactory; import org.apache.http.conn.SocketFactory;
import org.apache.http.conn.params.HttpConnectionManagerParams; import org.apache.http.conn.params.HttpConnectionManagerParams;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.params.BasicHttpParams; import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpParams; import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams; import org.apache.http.params.HttpProtocolParams;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
/** /**
@ -543,5 +544,87 @@ public class TestTSCCMNoServer extends TestCase {
mgr.shutdown(); mgr.shutdown();
} }
public void testAbortAfterRequestStarts() throws Exception {
HttpParams params = createDefaultParams();
HttpConnectionManagerParams.setMaxTotalConnections(params, 1);
ThreadSafeClientConnManager mgr = createTSCCM(params, null);
HttpHost target = new HttpHost("www.test.invalid", 80, "http");
HttpRoute route = new HttpRoute(target, null, false);
// get the only connection, then start an extra thread
ManagedClientConnection conn = mgr.getConnection(route, 1L, TimeUnit.MILLISECONDS);
ClientConnectionRequest request = mgr.newConnectionRequest();
GetConnThread gct = new GetConnThread(request, route, 0L); // no timeout
gct.start();
Thread.sleep(100); // give extra thread time to block
request.abortRequest();
gct.join(10000);
assertNotNull("thread should have gotten an exception",
gct.getException());
assertSame("thread got wrong exception",
InterruptedException.class,
gct.getException().getClass());
// make sure the manager is still working
try {
mgr.getConnection(route, 10L, TimeUnit.MILLISECONDS);
fail("should have gotten a timeout");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
mgr.releaseConnection(conn);
// this time: no exception
conn = mgr.getConnection(route, 10L, TimeUnit.MILLISECONDS);
assertNotNull("should have gotten a connection", conn);
mgr.shutdown();
}
public void testAbortBeforeRequestStarts() throws Exception {
HttpParams params = createDefaultParams();
HttpConnectionManagerParams.setMaxTotalConnections(params, 1);
ThreadSafeClientConnManager mgr = createTSCCM(params, null);
HttpHost target = new HttpHost("www.test.invalid", 80, "http");
HttpRoute route = new HttpRoute(target, null, false);
// get the only connection, then start an extra thread
ManagedClientConnection conn = mgr.getConnection(route, 1L, TimeUnit.MILLISECONDS);
ClientConnectionRequest request = mgr.newConnectionRequest();
request.abortRequest();
GetConnThread gct = new GetConnThread(request, route, 0L); // no timeout
gct.start();
Thread.sleep(100); // give extra thread time to block
gct.join(10000);
assertNotNull("thread should have gotten an exception",
gct.getException());
assertSame("thread got wrong exception",
InterruptedException.class,
gct.getException().getClass());
// make sure the manager is still working
try {
mgr.getConnection(route, 10L, TimeUnit.MILLISECONDS);
fail("should have gotten a timeout");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
mgr.releaseConnection(conn);
// this time: no exception
conn = mgr.getConnection(route, 10L, TimeUnit.MILLISECONDS);
assertNotNull("should have gotten a connection", conn);
mgr.shutdown();
}
} // class TestTSCCMNoServer } // class TestTSCCMNoServer