diff --git a/RELEASE_NOTES.txt b/RELEASE_NOTES.txt index 289ae4a53..51bbbff20 100644 --- a/RELEASE_NOTES.txt +++ b/RELEASE_NOTES.txt @@ -1,6 +1,9 @@ Changes since 4.0 Alpha 4 ------------------- +* [HTTPCLIENT-776] Fixed concurrency issues with AbstractPoolEntry. + Contributed by Sam Berlin + * Resolved a long standing problem with HttpClient not taking into account the user context when pooling / re-using connections. HttpClient now correctly handles stateful / user specific connections such as persistent diff --git a/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java b/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java index 3328f2a4e..8459a0351 100644 --- a/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java +++ b/module-client/src/main/java/org/apache/http/impl/conn/AbstractPoolEntry.java @@ -72,7 +72,7 @@ public abstract class AbstractPoolEntry { protected final ClientConnectionOperator connOperator; /** The underlying connection being pooled or used. */ - protected volatile OperatedClientConnection connection; + protected final OperatedClientConnection connection; /** The route for which this entry gets allocated. */ //@@@ currently accessed from connection manager(s) as attribute @@ -167,10 +167,19 @@ public abstract class AbstractPoolEntry { route.getLocalAddress(), context, params); - if (proxy == null) - this.tracker.connectTarget(this.connection.isSecure()); - else - this.tracker.connectProxy(proxy, this.connection.isSecure()); + RouteTracker localTracker = tracker; // capture volatile + + // If this tracker was reset while connecting, + // fail early. + if (localTracker == null) { + throw new IOException("Request aborted"); + } + + if (proxy == null) { + localTracker.connectTarget(this.connection.isSecure()); + } else { + localTracker.connectProxy(proxy, this.connection.isSecure()); + } } // open @@ -299,9 +308,12 @@ public abstract class AbstractPoolEntry { /** - * Resets tracked route. + * Shuts down the entry. + * + * If {@link #open(HttpRoute, HttpContext, HttpParams)} is in progress, + * this will cause that open to possibly throw an {@link IOException}. */ - protected void resetTrackedRoute() { + protected void shutdownEntry() { tracker = null; } diff --git a/module-client/src/main/java/org/apache/http/impl/conn/AbstractPooledConnAdapter.java b/module-client/src/main/java/org/apache/http/impl/conn/AbstractPooledConnAdapter.java index 7876f27bd..68b6abd19 100644 --- a/module-client/src/main/java/org/apache/http/impl/conn/AbstractPooledConnAdapter.java +++ b/module-client/src/main/java/org/apache/http/impl/conn/AbstractPooledConnAdapter.java @@ -151,7 +151,7 @@ public abstract class AbstractPooledConnAdapter extends AbstractClientConnAdapte // non-javadoc, see interface HttpConnection public void close() throws IOException { if (poolEntry != null) - poolEntry.resetTrackedRoute(); + poolEntry.shutdownEntry(); OperatedClientConnection conn = getWrappedConnection(); if (conn != null) { @@ -162,7 +162,7 @@ public abstract class AbstractPooledConnAdapter extends AbstractClientConnAdapte // non-javadoc, see interface HttpConnection public void shutdown() throws IOException { if (poolEntry != null) - poolEntry.resetTrackedRoute(); + poolEntry.shutdownEntry(); OperatedClientConnection conn = getWrappedConnection(); if (conn != null) { diff --git a/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java b/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java index 4ce8a5274..f9418b713 100644 --- a/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java +++ b/module-client/src/main/java/org/apache/http/impl/conn/DefaultClientConnection.java @@ -79,6 +79,9 @@ public class DefaultClientConnection extends SocketHttpClientConnection /** Whether this connection is secure. */ private boolean connSecure; + + /** True if this connection was shutdown. */ + private volatile boolean shutdown; // public default constructor @@ -102,10 +105,17 @@ public class DefaultClientConnection extends SocketHttpClientConnection } - public void opening(Socket sock, HttpHost target) { - assertNotOpen(); + public void opening(Socket sock, HttpHost target) throws IOException { + assertNotOpen(); this.socket = sock; this.targetHost = target; + + // Check for shutdown after assigning socket, so that + if (this.shutdown) { + sock.close(); // allow this to throw... + // ...but if it doesn't, explicitly throw one ourselves. + throw new IOException("Connection already shutdown"); + } } @@ -127,14 +137,17 @@ public class DefaultClientConnection extends SocketHttpClientConnection * socket that is being connected to a remote address will be closed. * That will interrupt a thread that is blocked on connecting * the socket. + * If the connection is not yet open, this will prevent the connection + * from being opened. * * @throws IOException in case of a problem */ @Override public void shutdown() throws IOException { LOG.debug("Connection shut down"); + shutdown = true; - super.shutdown(); + super.shutdown(); Socket sock = this.socket; // copy volatile attribute if (sock != null) sock.close(); diff --git a/module-client/src/main/java/org/apache/http/impl/conn/SingleClientConnManager.java b/module-client/src/main/java/org/apache/http/impl/conn/SingleClientConnManager.java index b7ecc1e53..8932bd097 100644 --- a/module-client/src/main/java/org/apache/http/impl/conn/SingleClientConnManager.java +++ b/module-client/src/main/java/org/apache/http/impl/conn/SingleClientConnManager.java @@ -372,7 +372,7 @@ public class SingleClientConnManager implements ClientConnectionManager { protected void close() throws IOException { - resetTrackedRoute(); + shutdownEntry(); if (connection.isOpen()) connection.close(); } @@ -384,7 +384,7 @@ public class SingleClientConnManager implements ClientConnectionManager { protected void shutdown() throws IOException { - resetTrackedRoute(); + shutdownEntry(); if (connection.isOpen()) connection.shutdown(); } diff --git a/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java b/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java index 659a9aad6..8e86a7369 100644 --- a/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java +++ b/module-client/src/test/java/org/apache/http/impl/conn/TestTSCCMWithServer.java @@ -31,8 +31,15 @@ package org.apache.http.impl.conn; +import java.io.IOException; import java.lang.ref.WeakReference; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import junit.framework.Test; import junit.framework.TestSuite; @@ -42,21 +49,28 @@ import org.apache.http.HttpRequest; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.HttpVersion; -import org.apache.http.conn.ConnectionPoolTimeoutException; -import org.apache.http.conn.routing.HttpRoute; -import org.apache.http.conn.scheme.SchemeRegistry; import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.conn.ClientConnectionOperator; import org.apache.http.conn.ClientConnectionRequest; +import org.apache.http.conn.ConnectTimeoutException; +import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.http.conn.ManagedClientConnection; +import org.apache.http.conn.OperatedClientConnection; import org.apache.http.conn.params.ConnPerRouteBean; import org.apache.http.conn.params.HttpConnectionManagerParams; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.conn.scheme.PlainSocketFactory; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.scheme.SchemeRegistry; +import org.apache.http.conn.scheme.SocketFactory; +import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; import org.apache.http.localserver.ServerTestBase; import org.apache.http.message.BasicHttpRequest; import org.apache.http.params.HttpParams; import org.apache.http.protocol.BasicHttpContext; import org.apache.http.protocol.ExecutionContext; +import org.apache.http.protocol.HttpContext; import org.apache.http.util.EntityUtils; -import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; /** @@ -440,6 +454,364 @@ public class TestTSCCMWithServer extends ServerTestBase { assertNull("TSCCM not garbage collected", wref.get()); } + + public void testAbortDuringConnecting() throws Exception { + HttpParams mgrpar = defaultParams.copy(); + HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1); + + final CountDownLatch connectLatch = new CountDownLatch(1); + final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(connectLatch, WaitPolicy.BEFORE_CONNECT, PlainSocketFactory.getSocketFactory()); + Scheme scheme = new Scheme("http", stallingSocketFactory, 80); + SchemeRegistry registry = new SchemeRegistry(); + registry.register(scheme); + ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry); + + final HttpHost target = getServerHttp(); + final HttpRoute route = new HttpRoute(target, null, false); + + final ManagedClientConnection conn = getConnection(mgr, route); + assertTrue(conn instanceof AbstractClientConnAdapter); + + final AtomicReference throwRef = new AtomicReference(); + Thread abortingThread = new Thread(new Runnable() { + public void run() { + try { + stallingSocketFactory.waitForState(); + conn.abortConnection(); + connectLatch.countDown(); + } catch (Throwable e) { + throwRef.set(e); + } + } + }); + abortingThread.start(); + + try { + conn.open(route, httpContext, defaultParams); + fail("expected SocketException"); + } catch(SocketException expected) {} + + abortingThread.join(5000); + if(throwRef.get() != null) + throw new RuntimeException(throwRef.get()); + + assertFalse(conn.isOpen()); + assertEquals(0, localServer.getAcceptedConnectionCount()); + + // check that there are no connections available + try { + // this should fail quickly, connection has not been released + getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS); + fail("ConnectionPoolTimeoutException should have been thrown"); + } catch (ConnectionPoolTimeoutException e) { + // expected + } + + // return it back to the manager + ((AbstractClientConnAdapter) conn).releaseConnection(); + + // the connection is expected to be released back to the manager + ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS); + assertFalse("connection should have been closed", conn2.isOpen()); + + mgr.releaseConnection(conn2); + mgr.shutdown(); + } + + public void testAbortBeforeSocketCreate() throws Exception { + HttpParams mgrpar = defaultParams.copy(); + HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1); + + final CountDownLatch connectLatch = new CountDownLatch(1); + final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(connectLatch, WaitPolicy.BEFORE_CREATE, PlainSocketFactory.getSocketFactory()); + Scheme scheme = new Scheme("http", stallingSocketFactory, 80); + SchemeRegistry registry = new SchemeRegistry(); + registry.register(scheme); + + ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry); + + final HttpHost target = getServerHttp(); + final HttpRoute route = new HttpRoute(target, null, false); + + final ManagedClientConnection conn = getConnection(mgr, route); + assertTrue(conn instanceof AbstractClientConnAdapter); + + final AtomicReference throwRef = new AtomicReference(); + Thread abortingThread = new Thread(new Runnable() { + public void run() { + try { + stallingSocketFactory.waitForState(); + conn.abortConnection(); + connectLatch.countDown(); + } catch (Throwable e) { + throwRef.set(e); + } + } + }); + abortingThread.start(); + + try { + conn.open(route, httpContext, defaultParams); + fail("expected exception"); + } catch(IOException expected) { + assertEquals("Connection already shutdown", expected.getMessage()); + } + + abortingThread.join(5000); + if(throwRef.get() != null) + throw new RuntimeException(throwRef.get()); + + assertFalse(conn.isOpen()); + assertEquals(0, localServer.getAcceptedConnectionCount()); + + // check that there are no connections available + try { + // this should fail quickly, connection has not been released + getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS); + fail("ConnectionPoolTimeoutException should have been thrown"); + } catch (ConnectionPoolTimeoutException e) { + // expected + } + + // return it back to the manager + ((AbstractClientConnAdapter) conn).releaseConnection(); + + // the connection is expected to be released back to the manager + ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS); + assertFalse("connection should have been closed", conn2.isOpen()); + + mgr.releaseConnection(conn2); + mgr.shutdown(); + } + + public void testAbortAfterSocketConnect() throws Exception { + HttpParams mgrpar = defaultParams.copy(); + HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1); + + final CountDownLatch connectLatch = new CountDownLatch(1); + final StallingSocketFactory stallingSocketFactory = new StallingSocketFactory(connectLatch, WaitPolicy.AFTER_CONNECT, PlainSocketFactory.getSocketFactory()); + Scheme scheme = new Scheme("http", stallingSocketFactory, 80); + SchemeRegistry registry = new SchemeRegistry(); + registry.register(scheme); + + ThreadSafeClientConnManager mgr = createTSCCM(mgrpar, registry); + + final HttpHost target = getServerHttp(); + final HttpRoute route = new HttpRoute(target, null, false); + + final ManagedClientConnection conn = getConnection(mgr, route); + assertTrue(conn instanceof AbstractClientConnAdapter); + + final AtomicReference throwRef = new AtomicReference(); + Thread abortingThread = new Thread(new Runnable() { + public void run() { + try { + stallingSocketFactory.waitForState(); + conn.abortConnection(); + connectLatch.countDown(); + } catch (Throwable e) { + throwRef.set(e); + } + } + }); + abortingThread.start(); + + try { + conn.open(route, httpContext, defaultParams); + fail("expected SocketException"); + } catch(SocketException expected) {} + + abortingThread.join(5000); + if(throwRef.get() != null) + throw new RuntimeException(throwRef.get()); + + assertFalse(conn.isOpen()); + assertEquals(1, localServer.getAcceptedConnectionCount()); + + // check that there are no connections available + try { + // this should fail quickly, connection has not been released + getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS); + fail("ConnectionPoolTimeoutException should have been thrown"); + } catch (ConnectionPoolTimeoutException e) { + // expected + } + + // return it back to the manager + ((AbstractClientConnAdapter) conn).releaseConnection(); + + // the connection is expected to be released back to the manager + ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS); + assertFalse("connection should have been closed", conn2.isOpen()); + + mgr.releaseConnection(conn2); + mgr.shutdown(); + } + + public void testAbortAfterOperatorOpen() throws Exception { + HttpParams mgrpar = defaultParams.copy(); + HttpConnectionManagerParams.setMaxTotalConnections(mgrpar, 1); + + final CountDownLatch connectLatch = new CountDownLatch(1); + final AtomicReference operatorRef = new AtomicReference(); + + ThreadSafeClientConnManager mgr = new ThreadSafeClientConnManager(mgrpar, supportedSchemes) { + @Override + protected ClientConnectionOperator createConnectionOperator( + SchemeRegistry schreg) { + operatorRef.set(new StallingOperator(connectLatch, WaitPolicy.AFTER_OPEN, super.createConnectionOperator(schreg))); + return operatorRef.get(); + } + }; + assertNotNull(operatorRef.get()); + + final HttpHost target = getServerHttp(); + final HttpRoute route = new HttpRoute(target, null, false); + + final ManagedClientConnection conn = getConnection(mgr, route); + assertTrue(conn instanceof AbstractClientConnAdapter); + + final AtomicReference throwRef = new AtomicReference(); + Thread abortingThread = new Thread(new Runnable() { + public void run() { + try { + operatorRef.get().waitForState(); + conn.abortConnection(); + connectLatch.countDown(); + } catch (Throwable e) { + throwRef.set(e); + } + } + }); + abortingThread.start(); + + try { + conn.open(route, httpContext, defaultParams); + fail("expected exception"); + } catch(IOException iox) { + assertEquals("Request aborted", iox.getMessage()); + } + + abortingThread.join(5000); + if(throwRef.get() != null) + throw new RuntimeException(throwRef.get()); + + assertFalse(conn.isOpen()); + assertEquals(1, localServer.getAcceptedConnectionCount()); + + // check that there are no connections available + try { + // this should fail quickly, connection has not been released + getConnection(mgr, route, 100L, TimeUnit.MILLISECONDS); + fail("ConnectionPoolTimeoutException should have been thrown"); + } catch (ConnectionPoolTimeoutException e) { + // expected + } + + // return it back to the manager + ((AbstractClientConnAdapter) conn).releaseConnection(); + + // the connection is expected to be released back to the manager + ManagedClientConnection conn2 = getConnection(mgr, route, 5L, TimeUnit.SECONDS); + assertFalse("connection should have been closed", conn2.isOpen()); + + mgr.releaseConnection(conn2); + mgr.shutdown(); + } + + private static class LatchSupport { + private final CountDownLatch continueLatch; + private final CountDownLatch waitLatch = new CountDownLatch(1); + protected final WaitPolicy waitPolicy; + + LatchSupport(CountDownLatch continueLatch, WaitPolicy waitPolicy) { + this.continueLatch = continueLatch; + this.waitPolicy = waitPolicy; + } + + void waitForState() throws InterruptedException { + if(!waitLatch.await(1, TimeUnit.SECONDS)) + throw new RuntimeException("waited too long"); + } + + void latch() { + waitLatch.countDown(); + try { + if (!continueLatch.await(1, TimeUnit.SECONDS)) + throw new RuntimeException("waited too long!"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static class StallingOperator extends LatchSupport implements ClientConnectionOperator { + private final ClientConnectionOperator delegate; + + public StallingOperator(CountDownLatch continueLatch, + WaitPolicy waitPolicy, ClientConnectionOperator delegate) { + super(continueLatch, waitPolicy); + this.delegate = delegate; + } + + public OperatedClientConnection createConnection() { + return delegate.createConnection(); + } + + public void openConnection(OperatedClientConnection conn, + HttpHost target, InetAddress local, HttpContext context, + HttpParams params) throws IOException { + delegate.openConnection(conn, target, local, context, params); + if(waitPolicy == WaitPolicy.AFTER_OPEN) + latch(); + } + + public void updateSecureConnection(OperatedClientConnection conn, + HttpHost target, HttpContext context, HttpParams params) + throws IOException { + delegate.updateSecureConnection(conn, target, context, params); + } + } + + private static class StallingSocketFactory extends LatchSupport implements SocketFactory { + private final SocketFactory delegate; + + public StallingSocketFactory(CountDownLatch continueLatch, + WaitPolicy waitPolicy, SocketFactory delegate) { + super(continueLatch, waitPolicy); + this.delegate = delegate; + } + + public Socket connectSocket(Socket sock, String host, int port, + InetAddress localAddress, int localPort, HttpParams params) + throws IOException, UnknownHostException, + ConnectTimeoutException { + if(waitPolicy == WaitPolicy.BEFORE_CONNECT) + latch(); + + Socket socket = delegate.connectSocket(sock, host, port, localAddress, + localPort, params); + + if(waitPolicy == WaitPolicy.AFTER_CONNECT) + latch(); + + return socket; + } + + public Socket createSocket() throws IOException { + if(waitPolicy == WaitPolicy.BEFORE_CREATE) + latch(); + + return delegate.createSocket(); + } + + public boolean isSecure(Socket sock) throws IllegalArgumentException { + return delegate.isSecure(sock); + } + } + + private enum WaitPolicy { BEFORE_CREATE, BEFORE_CONNECT, AFTER_CONNECT, AFTER_OPEN } + } // class TestTSCCMWithServer diff --git a/module-client/src/test/java/org/apache/http/localserver/LocalTestServer.java b/module-client/src/test/java/org/apache/http/localserver/LocalTestServer.java index 64c087d05..4b795895a 100644 --- a/module-client/src/test/java/org/apache/http/localserver/LocalTestServer.java +++ b/module-client/src/test/java/org/apache/http/localserver/LocalTestServer.java @@ -39,6 +39,7 @@ import java.net.SocketAddress; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.http.ConnectionReuseStrategy; import org.apache.http.HttpException; @@ -104,6 +105,9 @@ public class LocalTestServer { /** The request listening thread, while listening. */ protected volatile Thread listenerThread; + + /** The number of connections this accepted. */ + private final AtomicInteger acceptedConnections = new AtomicInteger(0); /** @@ -161,7 +165,13 @@ public class LocalTestServer { "LocalTestServer/1.1"); return params; } - + + /** + * Returns the number of connections this test server has accepted. + */ + public int getAcceptedConnectionCount() { + return acceptedConnections.get(); + } /** * {@link #register Registers} a set of default request handlers. @@ -332,6 +342,7 @@ public class LocalTestServer { protected void accept() throws IOException { // Set up HTTP connection Socket socket = servicedSocket.accept(); + acceptedConnections.incrementAndGet(); DefaultHttpServerConnection conn = new DefaultHttpServerConnection(); conn.bind(socket, serverParams);