HTTPCLIENT-776: Fixed concurrency issues with AbstractPoolEntry

Contributed by Sam Berlin <sberlin at gmail.com>
Reviewed by Oleg Kalnichevski



git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk@658775 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Oleg Kalnichevski 2008-05-21 17:30:45 +00:00
parent 6d0c809900
commit 1b4ecf0b57
7 changed files with 430 additions and 19 deletions

View File

@ -1,6 +1,9 @@
Changes since 4.0 Alpha 4
-------------------
* [HTTPCLIENT-776] Fixed concurrency issues with AbstractPoolEntry.
Contributed by Sam Berlin <sberlin at gmail.com>
* Resolved a long standing problem with HttpClient not taking into account
the user context when pooling / re-using connections. HttpClient now
correctly handles stateful / user specific connections such as persistent

View File

@ -72,7 +72,7 @@ public abstract class AbstractPoolEntry {
protected final ClientConnectionOperator connOperator;
/** The underlying connection being pooled or used. */
protected volatile OperatedClientConnection connection;
protected final OperatedClientConnection connection;
/** The route for which this entry gets allocated. */
//@@@ currently accessed from connection manager(s) as attribute
@ -167,10 +167,19 @@ public abstract class AbstractPoolEntry {
route.getLocalAddress(),
context, params);
if (proxy == null)
this.tracker.connectTarget(this.connection.isSecure());
else
this.tracker.connectProxy(proxy, this.connection.isSecure());
RouteTracker localTracker = tracker; // capture volatile
// If this tracker was reset while connecting,
// fail early.
if (localTracker == null) {
throw new IOException("Request aborted");
}
if (proxy == null) {
localTracker.connectTarget(this.connection.isSecure());
} else {
localTracker.connectProxy(proxy, this.connection.isSecure());
}
} // open
@ -299,9 +308,12 @@ public abstract class AbstractPoolEntry {
/**
* Resets tracked route.
* Shuts down the entry.
*
* If {@link #open(HttpRoute, HttpContext, HttpParams)} is in progress,
* this will cause that open to possibly throw an {@link IOException}.
*/
protected void resetTrackedRoute() {
protected void shutdownEntry() {
tracker = null;
}

View File

@ -151,7 +151,7 @@ public abstract class AbstractPooledConnAdapter extends AbstractClientConnAdapte
// non-javadoc, see interface HttpConnection
public void close() throws IOException {
if (poolEntry != null)
poolEntry.resetTrackedRoute();
poolEntry.shutdownEntry();
OperatedClientConnection conn = getWrappedConnection();
if (conn != null) {
@ -162,7 +162,7 @@ public abstract class AbstractPooledConnAdapter extends AbstractClientConnAdapte
// non-javadoc, see interface HttpConnection
public void shutdown() throws IOException {
if (poolEntry != null)
poolEntry.resetTrackedRoute();
poolEntry.shutdownEntry();
OperatedClientConnection conn = getWrappedConnection();
if (conn != null) {

View File

@ -79,6 +79,9 @@ public class DefaultClientConnection extends SocketHttpClientConnection
/** Whether this connection is secure. */
private boolean connSecure;
/** True if this connection was shutdown. */
private volatile boolean shutdown;
// public default constructor
@ -102,10 +105,17 @@ public class DefaultClientConnection extends SocketHttpClientConnection
}
public void opening(Socket sock, HttpHost target) {
assertNotOpen();
public void opening(Socket sock, HttpHost target) throws IOException {
assertNotOpen();
this.socket = sock;
this.targetHost = target;
// Check for shutdown after assigning socket, so that
if (this.shutdown) {
sock.close(); // allow this to throw...
// ...but if it doesn't, explicitly throw one ourselves.
throw new IOException("Connection already shutdown");
}
}
@ -127,14 +137,17 @@ public class DefaultClientConnection extends SocketHttpClientConnection
* socket that is being connected to a remote address will be closed.
* That will interrupt a thread that is blocked on connecting
* the socket.
* If the connection is not yet open, this will prevent the connection
* from being opened.
*
* @throws IOException in case of a problem
*/
@Override
public void shutdown() throws IOException {
LOG.debug("Connection shut down");
shutdown = true;
super.shutdown();
super.shutdown();
Socket sock = this.socket; // copy volatile attribute
if (sock != null)
sock.close();

View File

@ -372,7 +372,7 @@ public class SingleClientConnManager implements ClientConnectionManager {
protected void close()
throws IOException {
resetTrackedRoute();
shutdownEntry();
if (connection.isOpen())
connection.close();
}
@ -384,7 +384,7 @@ public class SingleClientConnManager implements ClientConnectionManager {
protected void shutdown()
throws IOException {
resetTrackedRoute();
shutdownEntry();
if (connection.isOpen())
connection.shutdown();
}

View File

@ -31,8 +31,15 @@
package org.apache.http.impl.conn;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.Test;
import junit.framework.TestSuite;
@ -42,21 +49,28 @@ import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.HttpVersion;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.ClientConnectionOperator;
import org.apache.http.conn.ClientConnectionRequest;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ManagedClientConnection;
import org.apache.http.conn.OperatedClientConnection;
import org.apache.http.conn.params.ConnPerRouteBean;
import org.apache.http.conn.params.HttpConnectionManagerParams;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.scheme.SocketFactory;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.localserver.ServerTestBase;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
/**
@ -440,6 +454,364 @@ public class TestTSCCMWithServer extends ServerTestBase {
assertNull("TSCCM not garbage collected", wref.get());
}
public void testAbortDuringConnecting() throws Exception {
HttpParams mgrpar = defaultParams.copy();
HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1);
final CountDownLatch connectLatch = new CountDownLatch(1);
final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(connectLatch, WaitPolicy.BEFORE_CONNECT, PlainSocketFactory.getSocketFactory());
Scheme scheme = new Scheme("http", stallingSocketFactory, 80);
SchemeRegistry registry = new SchemeRegistry();
registry.register(scheme);
ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry);
final HttpHost target = getServerHttp();
final HttpRoute route = new HttpRoute(target, null, false);
final ManagedClientConnection conn = getConnection(mgr, route);
assertTrue(conn instanceof AbstractClientConnAdapter);
final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
Thread abortingThread = new Thread(new Runnable() {
public void run() {
try {
stallingSocketFactory.waitForState();
conn.abortConnection();
connectLatch.countDown();
} catch (Throwable e) {
throwRef.set(e);
}
}
});
abortingThread.start();
try {
conn.open(route, httpContext, defaultParams);
fail("expected SocketException");
} catch(SocketException expected) {}
abortingThread.join(5000);
if(throwRef.get() != null)
throw new RuntimeException(throwRef.get());
assertFalse(conn.isOpen());
assertEquals(0, localServer.getAcceptedConnectionCount());
// check that there are no connections available
try {
// this should fail quickly, connection has not been released
getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS);
fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
// return it back to the manager
((AbstractClientConnAdapter) conn).releaseConnection();
// the connection is expected to be released back to the manager
ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS);
assertFalse("connection should have been closed", conn2.isOpen());
mgr.releaseConnection(conn2);
mgr.shutdown();
}
public void testAbortBeforeSocketCreate() throws Exception {
HttpParams mgrpar = defaultParams.copy();
HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1);
final CountDownLatch connectLatch = new CountDownLatch(1);
final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(connectLatch, WaitPolicy.BEFORE_CREATE, PlainSocketFactory.getSocketFactory());
Scheme scheme = new Scheme("http", stallingSocketFactory, 80);
SchemeRegistry registry = new SchemeRegistry();
registry.register(scheme);
ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry);
final HttpHost target = getServerHttp();
final HttpRoute route = new HttpRoute(target, null, false);
final ManagedClientConnection conn = getConnection(mgr, route);
assertTrue(conn instanceof AbstractClientConnAdapter);
final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
Thread abortingThread = new Thread(new Runnable() {
public void run() {
try {
stallingSocketFactory.waitForState();
conn.abortConnection();
connectLatch.countDown();
} catch (Throwable e) {
throwRef.set(e);
}
}
});
abortingThread.start();
try {
conn.open(route, httpContext, defaultParams);
fail("expected exception");
} catch(IOException expected) {
assertEquals("Connection already shutdown", expected.getMessage());
}
abortingThread.join(5000);
if(throwRef.get() != null)
throw new RuntimeException(throwRef.get());
assertFalse(conn.isOpen());
assertEquals(0, localServer.getAcceptedConnectionCount());
// check that there are no connections available
try {
// this should fail quickly, connection has not been released
getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS);
fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
// return it back to the manager
((AbstractClientConnAdapter) conn).releaseConnection();
// the connection is expected to be released back to the manager
ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS);
assertFalse("connection should have been closed", conn2.isOpen());
mgr.releaseConnection(conn2);
mgr.shutdown();
}
public void testAbortAfterSocketConnect() throws Exception {
HttpParams mgrpar = defaultParams.copy();
HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1);
final CountDownLatch connectLatch = new CountDownLatch(1);
final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(connectLatch, WaitPolicy.AFTER_CONNECT, PlainSocketFactory.getSocketFactory());
Scheme scheme = new Scheme("http", stallingSocketFactory, 80);
SchemeRegistry registry = new SchemeRegistry();
registry.register(scheme);
ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry);
final HttpHost target = getServerHttp();
final HttpRoute route = new HttpRoute(target, null, false);
final ManagedClientConnection conn = getConnection(mgr, route);
assertTrue(conn instanceof AbstractClientConnAdapter);
final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
Thread abortingThread = new Thread(new Runnable() {
public void run() {
try {
stallingSocketFactory.waitForState();
conn.abortConnection();
connectLatch.countDown();
} catch (Throwable e) {
throwRef.set(e);
}
}
});
abortingThread.start();
try {
conn.open(route, httpContext, defaultParams);
fail("expected SocketException");
} catch(SocketException expected) {}
abortingThread.join(5000);
if(throwRef.get() != null)
throw new RuntimeException(throwRef.get());
assertFalse(conn.isOpen());
assertEquals(1, localServer.getAcceptedConnectionCount());
// check that there are no connections available
try {
// this should fail quickly, connection has not been released
getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS);
fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
// return it back to the manager
((AbstractClientConnAdapter) conn).releaseConnection();
// the connection is expected to be released back to the manager
ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS);
assertFalse("connection should have been closed", conn2.isOpen());
mgr.releaseConnection(conn2);
mgr.shutdown();
}
public void testAbortAfterOperatorOpen() throws Exception {
HttpParams mgrpar = defaultParams.copy();
HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1);
final CountDownLatch connectLatch = new CountDownLatch(1);
final AtomicReference<StallingOperator> operatorRef = new AtomicReference<StallingOperator>();
ThreadSafeClientConnManager mgr = new ThreadSafeClientConnManager(mgrpar, supportedSchemes) {
@Override
protected ClientConnectionOperator createConnectionOperator(
SchemeRegistry schreg) {
operatorRef.set(new StallingOperator(connectLatch, WaitPolicy.AFTER_OPEN, super.createConnectionOperator(schreg)));
return operatorRef.get();
}
};
assertNotNull(operatorRef.get());
final HttpHost target = getServerHttp();
final HttpRoute route = new HttpRoute(target, null, false);
final ManagedClientConnection conn = getConnection(mgr, route);
assertTrue(conn instanceof AbstractClientConnAdapter);
final AtomicReference<Throwable> throwRef = new AtomicReference<Throwable>();
Thread abortingThread = new Thread(new Runnable() {
public void run() {
try {
operatorRef.get().waitForState();
conn.abortConnection();
connectLatch.countDown();
} catch (Throwable e) {
throwRef.set(e);
}
}
});
abortingThread.start();
try {
conn.open(route, httpContext, defaultParams);
fail("expected exception");
} catch(IOException iox) {
assertEquals("Request aborted", iox.getMessage());
}
abortingThread.join(5000);
if(throwRef.get() != null)
throw new RuntimeException(throwRef.get());
assertFalse(conn.isOpen());
assertEquals(1, localServer.getAcceptedConnectionCount());
// check that there are no connections available
try {
// this should fail quickly, connection has not been released
getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS);
fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
// return it back to the manager
((AbstractClientConnAdapter) conn).releaseConnection();
// the connection is expected to be released back to the manager
ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS);
assertFalse("connection should have been closed", conn2.isOpen());
mgr.releaseConnection(conn2);
mgr.shutdown();
}
private static class LatchSupport {
private final CountDownLatch continueLatch;
private final CountDownLatch waitLatch = new CountDownLatch(1);
protected final WaitPolicy waitPolicy;
LatchSupport(CountDownLatch continueLatch, WaitPolicy waitPolicy) {
this.continueLatch = continueLatch;
this.waitPolicy = waitPolicy;
}
void waitForState() throws InterruptedException {
if(!waitLatch.await(1, TimeUnit.SECONDS))
throw new RuntimeException("waited too long");
}
void latch() {
waitLatch.countDown();
try {
if (!continueLatch.await(1, TimeUnit.SECONDS))
throw new RuntimeException("waited too long!");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private static class StallingOperator extends LatchSupport implements ClientConnectionOperator {
private final ClientConnectionOperator delegate;
public StallingOperator(CountDownLatch continueLatch,
WaitPolicy waitPolicy, ClientConnectionOperator delegate) {
super(continueLatch, waitPolicy);
this.delegate = delegate;
}
public OperatedClientConnection createConnection() {
return delegate.createConnection();
}
public void openConnection(OperatedClientConnection conn,
HttpHost target, InetAddress local, HttpContext context,
HttpParams params) throws IOException {
delegate.openConnection(conn, target, local, context, params);
if(waitPolicy == WaitPolicy.AFTER_OPEN)
latch();
}
public void updateSecureConnection(OperatedClientConnection conn,
HttpHost target, HttpContext context, HttpParams params)
throws IOException {
delegate.updateSecureConnection(conn, target, context, params);
}
}
private static class StallingSocketFactory extends LatchSupport implements SocketFactory {
private final SocketFactory delegate;
public StallingSocketFactory(CountDownLatch continueLatch,
WaitPolicy waitPolicy, SocketFactory delegate) {
super(continueLatch, waitPolicy);
this.delegate = delegate;
}
public Socket connectSocket(Socket sock, String host, int port,
InetAddress localAddress, int localPort, HttpParams params)
throws IOException, UnknownHostException,
ConnectTimeoutException {
if(waitPolicy == WaitPolicy.BEFORE_CONNECT)
latch();
Socket socket = delegate.connectSocket(sock, host, port, localAddress,
localPort, params);
if(waitPolicy == WaitPolicy.AFTER_CONNECT)
latch();
return socket;
}
public Socket createSocket() throws IOException {
if(waitPolicy == WaitPolicy.BEFORE_CREATE)
latch();
return delegate.createSocket();
}
public boolean isSecure(Socket sock) throws IllegalArgumentException {
return delegate.isSecure(sock);
}
}
private enum WaitPolicy { BEFORE_CREATE, BEFORE_CONNECT, AFTER_CONNECT, AFTER_OPEN }
} // class TestTSCCMWithServer

View File

@ -39,6 +39,7 @@ import java.net.SocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpException;
@ -104,6 +105,9 @@ public class LocalTestServer {
/** The request listening thread, while listening. */
protected volatile Thread listenerThread;
/** The number of connections this accepted. */
private final AtomicInteger acceptedConnections = new AtomicInteger(0);
/**
@ -161,7 +165,13 @@ public class LocalTestServer {
"LocalTestServer/1.1");
return params;
}
/**
* Returns the number of connections this test server has accepted.
*/
public int getAcceptedConnectionCount() {
return acceptedConnections.get();
}
/**
* {@link #register Registers} a set of default request handlers.
@ -332,6 +342,7 @@ public class LocalTestServer {
protected void accept() throws IOException {
// Set up HTTP connection
Socket socket = servicedSocket.accept();
acceptedConnections.incrementAndGet();
DefaultHttpServerConnection conn =
new DefaultHttpServerConnection();
conn.bind(socket, serverParams);