HTTPCLIENT-769: Do not pool connection marked non-reusable

git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk@652945 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Oleg Kalnichevski 2008-05-02 23:10:20 +00:00
parent 70ec62d7c4
commit 18ee596a4d
15 changed files with 419 additions and 64 deletions

View File

@ -23,6 +23,9 @@ HttpClient 3.x features that have NOT yet been ported:
Changelog:
-------------------
* [HTTPCLIENT-769] Do not pool connection marked non-reusable.
Contributed by Oleg Kalnichevski <olegk at apache.org>
* [HTTPCLIENT-763] Fixed problem with AbstractClientConnAdapter#abortConnection()
not releasing the connection if called from the main execution thread while
there is no blocking I/O operation.

View File

@ -359,30 +359,25 @@ public abstract class AbstractClientConnAdapter
}
aborted = true;
unmarkReusable();
OperatedClientConnection conn = getWrappedConnection();
if (conn != null) {
try {
conn.shutdown();
} catch (IOException ignore) {
}
// Usually #abortConnection() is expected to be called from
// a helper thread in order to unblock the main execution thread
// blocked in an I/O operation. It may be unsafe to call
// #releaseConnection() from the helper thread, so we have to rely
// on an IOException thrown by the closed socket on the main thread
// to trigger the release of the connection back to the
// connection manager.
//
// However, if this method is called from the main execution thread
// it should be safe to release the connection immediately. Besides,
// this also helps ensure the connection gets released back to the
// manager if #abortConnection() is called from the main execution
// thread while there is no blocking I/O operation.
if (executionThread.equals(Thread.currentThread())) {
releaseConnection();
}
try {
shutdown();
} catch (IOException ignore) {
}
// Usually #abortConnection() is expected to be called from
// a helper thread in order to unblock the main execution thread
// blocked in an I/O operation. It may be unsafe to call
// #releaseConnection() from the helper thread, so we have to rely
// on an IOException thrown by the closed socket on the main thread
// to trigger the release of the connection back to the
// connection manager.
//
// However, if this method is called from the main execution thread
// it should be safe to release the connection immediately. Besides,
// this also helps ensure the connection gets released back to the
// manager if #abortConnection() is called from the main execution
// thread while there is no blocking I/O operation.
if (executionThread.equals(Thread.currentThread())) {
releaseConnection();
}
}

View File

@ -299,12 +299,9 @@ public abstract class AbstractPoolEntry {
/**
* Tracks close or shutdown of the connection.
* There is no distinction between the two, the route is dropped
* in both cases. This method should be called regardless of
* whether the close or shutdown succeeds or triggers an error.
* Resets tracked route.
*/
public void closing() {
protected void resetTrackedRoute() {
tracker = null;
}

View File

@ -151,7 +151,7 @@ public abstract class AbstractPooledConnAdapter extends AbstractClientConnAdapte
// non-javadoc, see interface HttpConnection
public void close() throws IOException {
if (poolEntry != null)
poolEntry.closing();
poolEntry.resetTrackedRoute();
OperatedClientConnection conn = getWrappedConnection();
if (conn != null) {
@ -162,7 +162,7 @@ public abstract class AbstractPooledConnAdapter extends AbstractClientConnAdapte
// non-javadoc, see interface HttpConnection
public void shutdown() throws IOException {
if (poolEntry != null)
poolEntry.closing();
poolEntry.resetTrackedRoute();
OperatedClientConnection conn = getWrappedConnection();
if (conn != null) {

View File

@ -283,7 +283,6 @@ public class SingleClientConnManager implements ClientConnectionManager {
} finally {
sca.detach();
managedConn = null;
uniquePoolEntry.tracker = null;
lastReleaseTime = System.currentTimeMillis();
}
} // releaseConnection
@ -374,7 +373,7 @@ public class SingleClientConnManager implements ClientConnectionManager {
protected void close()
throws IOException {
closing();
resetTrackedRoute();
if (connection.isOpen())
connection.close();
}
@ -386,7 +385,7 @@ public class SingleClientConnManager implements ClientConnectionManager {
protected void shutdown()
throws IOException {
closing();
resetTrackedRoute();
if (connection.isOpen())
connection.shutdown();
}

View File

@ -183,8 +183,10 @@ public abstract class AbstractConnPool implements RefQueueHandler {
* attempt to determine whether it can be re-used or not.
*
* @param entry the entry for the connection to release
* @param reusable <code>true</code> if the entry is deemed
* reusable, <code>false</code> otherwise.
*/
public abstract void freeEntry(BasicPoolEntry entry)
public abstract void freeEntry(BasicPoolEntry entry, boolean reusable)
;

View File

@ -369,7 +369,7 @@ public class ConnPoolByRoute extends AbstractConnPool {
// non-javadoc, see base class AbstractConnPool
@Override
public void freeEntry(BasicPoolEntry entry) {
public void freeEntry(BasicPoolEntry entry, boolean reusable) {
HttpRoute route = entry.getPlannedRoute();
if (LOG.isDebugEnabled()) {
@ -391,17 +391,15 @@ public class ConnPoolByRoute extends AbstractConnPool {
RouteSpecificPool rospl = getRoutePool(route, true);
rospl.freeEntry(entry);
freeConnections.add(entry);
if (numConnections == 0) {
// for some reason this pool didn't already exist
LOG.error("Master connection pool not found: " + route);
numConnections = 1;
if (reusable) {
rospl.freeEntry(entry);
freeConnections.add(entry);
idleConnHandler.add(entry.getConnection());
} else {
rospl.dropEntry();
numConnections--;
}
idleConnHandler.add(entry.getConnection());
notifyWaitingThread(rospl);
} finally {

View File

@ -222,9 +222,11 @@ public class ThreadSafeClientConnManager
iox);
} finally {
BasicPoolEntry entry = (BasicPoolEntry) hca.getPoolEntry();
boolean reusable = hca.isMarkedReusable();
hca.detach();
if (entry != null) // is it worth to bother with this check? @@@
connectionPool.freeEntry(entry);
if (entry != null) {
connectionPool.freeEntry(entry, reusable);
}
}
}

View File

@ -50,6 +50,7 @@ public class TestAllConn extends TestCase {
suite.addTest(TestScheme.suite());
suite.addTest(TestExceptions.suite());
suite.addTest(TestConnectionReuse.suite());
suite.addTest(TestConnectionAutoRelease.suite());
suite.addTest(TestAllConnParams.suite());
suite.addTest(TestAllRouting.suite());

View File

@ -200,8 +200,8 @@ public class TestConnectionAutoRelease extends ServerTestBase {
assertNotNull(e);
httpget.abort();
// Expect one connection in the pool
assertEquals(1, mgr.getConnectionsInPool());
// Expect zero connections in the pool
assertEquals(0, mgr.getConnectionsInPool());
// Make sure one connection is available
connreq = mgr.requestConnection(new HttpRoute(target), null);
@ -281,8 +281,8 @@ public class TestConnectionAutoRelease extends ServerTestBase {
}
// Expect one connection in the pool
assertEquals(1, mgr.getConnectionsInPool());
// Expect zero connections in the pool
assertEquals(0, mgr.getConnectionsInPool());
// Make sure one connection is available
connreq = mgr.requestConnection(new HttpRoute(target), null);

View File

@ -0,0 +1,345 @@
/*
* $HeadURL:$
* $Revision:$
* $Date:$
*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.conn;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpVersion;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.conn.params.ConnPerRouteBean;
import org.apache.http.conn.params.HttpConnectionManagerParams;
import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.scheme.SocketFactory;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.localserver.LocalTestServer;
import org.apache.http.localserver.RandomHandler;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams;
import org.apache.http.protocol.BasicHttpProcessor;
import org.apache.http.protocol.HTTP;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
public class TestConnectionReuse extends TestCase {
public TestConnectionReuse(String testName) {
super(testName);
}
public static void main(String args[]) {
String[] testCaseName = { TestConnectionReuse.class.getName() };
junit.textui.TestRunner.main(testCaseName);
}
public static Test suite() {
return new TestSuite(TestConnectionReuse.class);
}
protected LocalTestServer localServer;
@Override
protected void tearDown() throws Exception {
if (this.localServer != null) {
this.localServer.stop();
}
}
public void testReuseOfPersistentConnections() throws Exception {
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new ResponseDate());
httpproc.addInterceptor(new ResponseServer());
httpproc.addInterceptor(new ResponseContent());
httpproc.addInterceptor(new ResponseConnControl());
this.localServer = new LocalTestServer(httpproc, null);
this.localServer.register("/random/*", new RandomHandler());
this.localServer.start();
InetSocketAddress saddress = (InetSocketAddress) this.localServer.getServiceAddress();
HttpParams params = new BasicHttpParams();
HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
HttpProtocolParams.setContentCharset(params, "UTF-8");
HttpProtocolParams.setUserAgent(params, "TestAgent/1.1");
HttpProtocolParams.setUseExpectContinue(params, false);
HttpConnectionParams.setStaleCheckingEnabled(params, false);
HttpConnectionManagerParams.setMaxTotalConnections(params, 5);
HttpConnectionManagerParams.setMaxConnectionsPerRoute(params,
new ConnPerRouteBean(5));
SchemeRegistry supportedSchemes = new SchemeRegistry();
SocketFactory sf = PlainSocketFactory.getSocketFactory();
supportedSchemes.register(new Scheme("http", sf, 80));
ThreadSafeClientConnManager mgr = new ThreadSafeClientConnManager(
params, supportedSchemes);
DefaultHttpClient client = new DefaultHttpClient(mgr, params);
HttpHost target = new HttpHost(saddress.getHostName(), saddress.getPort(), "http");
WorkerThread[] workers = new WorkerThread[10];
for (int i = 0; i < workers.length; i++) {
workers[i] = new WorkerThread(
client,
target,
new URI("/random/2000"),
10, false);
}
for (int i = 0; i < workers.length; i++) {
WorkerThread worker = workers[i];
worker.start();
}
for (int i = 0; i < workers.length; i++) {
WorkerThread worker = workers[i];
workers[i].join(10000);
Exception ex = worker.getException();
if (ex != null) {
throw ex;
}
}
// Expect some connection in the pool
assertTrue(mgr.getConnectionsInPool() > 0);
mgr.shutdown();
}
private static class AlwaysCloseConn implements HttpResponseInterceptor {
public void process(
final HttpResponse response,
final HttpContext context) throws HttpException, IOException {
response.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
}
}
public void testReuseOfClosedConnections() throws Exception {
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new ResponseDate());
httpproc.addInterceptor(new ResponseServer());
httpproc.addInterceptor(new ResponseContent());
httpproc.addInterceptor(new AlwaysCloseConn());
this.localServer = new LocalTestServer(httpproc, null);
this.localServer.register("/random/*", new RandomHandler());
this.localServer.start();
InetSocketAddress saddress = (InetSocketAddress) this.localServer.getServiceAddress();
HttpParams params = new BasicHttpParams();
HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
HttpProtocolParams.setContentCharset(params, "UTF-8");
HttpProtocolParams.setUserAgent(params, "TestAgent/1.1");
HttpProtocolParams.setUseExpectContinue(params, false);
HttpConnectionParams.setStaleCheckingEnabled(params, false);
HttpConnectionManagerParams.setMaxTotalConnections(params, 5);
HttpConnectionManagerParams.setMaxConnectionsPerRoute(params,
new ConnPerRouteBean(5));
SchemeRegistry supportedSchemes = new SchemeRegistry();
SocketFactory sf = PlainSocketFactory.getSocketFactory();
supportedSchemes.register(new Scheme("http", sf, 80));
ThreadSafeClientConnManager mgr = new ThreadSafeClientConnManager(
params, supportedSchemes);
DefaultHttpClient client = new DefaultHttpClient(mgr, params);
HttpHost target = new HttpHost(saddress.getHostName(), saddress.getPort(), "http");
WorkerThread[] workers = new WorkerThread[10];
for (int i = 0; i < workers.length; i++) {
workers[i] = new WorkerThread(
client,
target,
new URI("/random/2000"),
10, false);
}
for (int i = 0; i < workers.length; i++) {
WorkerThread worker = workers[i];
worker.start();
}
for (int i = 0; i < workers.length; i++) {
WorkerThread worker = workers[i];
workers[i].join(10000);
Exception ex = worker.getException();
if (ex != null) {
throw ex;
}
}
// Expect zero connections in the pool
assertEquals(0, mgr.getConnectionsInPool());
mgr.shutdown();
}
public void testReuseOfAbortedConnections() throws Exception {
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new ResponseDate());
httpproc.addInterceptor(new ResponseServer());
httpproc.addInterceptor(new ResponseContent());
httpproc.addInterceptor(new ResponseConnControl());
this.localServer = new LocalTestServer(httpproc, null);
this.localServer.register("/random/*", new RandomHandler());
this.localServer.start();
InetSocketAddress saddress = (InetSocketAddress) this.localServer.getServiceAddress();
HttpParams params = new BasicHttpParams();
HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
HttpProtocolParams.setContentCharset(params, "UTF-8");
HttpProtocolParams.setUserAgent(params, "TestAgent/1.1");
HttpProtocolParams.setUseExpectContinue(params, false);
HttpConnectionParams.setStaleCheckingEnabled(params, false);
HttpConnectionManagerParams.setMaxTotalConnections(params, 5);
HttpConnectionManagerParams.setMaxConnectionsPerRoute(params,
new ConnPerRouteBean(5));
SchemeRegistry supportedSchemes = new SchemeRegistry();
SocketFactory sf = PlainSocketFactory.getSocketFactory();
supportedSchemes.register(new Scheme("http", sf, 80));
ThreadSafeClientConnManager mgr = new ThreadSafeClientConnManager(
params, supportedSchemes);
DefaultHttpClient client = new DefaultHttpClient(mgr, params);
HttpHost target = new HttpHost(saddress.getHostName(), saddress.getPort(), "http");
WorkerThread[] workers = new WorkerThread[10];
for (int i = 0; i < workers.length; i++) {
workers[i] = new WorkerThread(
client,
target,
new URI("/random/2000"),
10, true);
}
for (int i = 0; i < workers.length; i++) {
WorkerThread worker = workers[i];
worker.start();
}
for (int i = 0; i < workers.length; i++) {
WorkerThread worker = workers[i];
workers[i].join(10000);
Exception ex = worker.getException();
if (ex != null) {
throw ex;
}
}
// Expect zero connections in the pool
assertEquals(0, mgr.getConnectionsInPool());
mgr.shutdown();
}
private static class WorkerThread extends Thread {
private final URI requestURI;
private final HttpHost target;
private final HttpClient httpclient;
private final int repetitions;
private final boolean forceClose;
private volatile Exception exception;
public WorkerThread(
final HttpClient httpclient,
final HttpHost target,
final URI requestURI,
int repetitions,
boolean forceClose) {
super();
this.httpclient = httpclient;
this.requestURI = requestURI;
this.target = target;
this.repetitions = repetitions;
this.forceClose = forceClose;
}
@Override
public void run() {
try {
for (int i = 0; i < this.repetitions; i++) {
HttpGet httpget = new HttpGet(this.requestURI);
HttpResponse response = this.httpclient.execute(
this.target,
httpget);
if (this.forceClose) {
httpget.abort();
} else {
HttpEntity entity = response.getEntity();
if (entity != null) {
entity.consumeContent();
}
}
}
} catch (Exception ex) {
this.exception = ex;
}
}
public Exception getException() {
return exception;
}
}
}

View File

@ -64,7 +64,6 @@ public class ClientConnAdapterMockup extends AbstractClientConnAdapter {
}
public void shutdown() {
throw new UnsupportedOperationException("just a mockup");
}
public void tunnelTarget(boolean secure, HttpParams params) {

View File

@ -1,7 +1,7 @@
/*
* $HeadURL:$
* $Revision:$
* $Date:$
* $HeadURL$
* $Revision$
* $Date$
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -95,7 +95,7 @@ public class TestConnPoolByRoute extends ServerTestBase {
}
// Free one
connPool.freeEntry(e3);
connPool.freeEntry(e3, true);
// This time the request should succeed
PoolEntryRequest r5 = connPool.requestPoolEntry(route, null);
@ -136,9 +136,9 @@ public class TestConnPoolByRoute extends ServerTestBase {
e3.setState(Integer.valueOf(3));
// Release entries
connPool.freeEntry(e1);
connPool.freeEntry(e2);
connPool.freeEntry(e3);
connPool.freeEntry(e1, true);
connPool.freeEntry(e2, true);
connPool.freeEntry(e3, true);
// Request statefull entries
PoolEntryRequest r4 = connPool.requestPoolEntry(route, Integer.valueOf(2));
@ -160,9 +160,9 @@ public class TestConnPoolByRoute extends ServerTestBase {
assertTrue(e6 == e1);
// Release entries again
connPool.freeEntry(e4);
connPool.freeEntry(e5);
connPool.freeEntry(e6);
connPool.freeEntry(e4, true);
connPool.freeEntry(e5, true);
connPool.freeEntry(e6, true);
// Request an entry with a state not avaialable in the pool
PoolEntryRequest r7 = connPool.requestPoolEntry(route, Integer.valueOf(4));

View File

@ -35,6 +35,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@ -157,7 +158,7 @@ public class LocalTestServer {
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY,
true)
.setParameter(CoreProtocolPNames.ORIGIN_SERVER,
"Jakarta-HttpComponents-LocalTestServer/1.1");
"LocalTestServer/1.1");
return params;
}
@ -279,6 +280,19 @@ public class LocalTestServer {
}
/**
* Obtains the local address the server is listening on
*
* @return the service address
*/
public SocketAddress getServiceAddress() {
ServerSocket ssock = servicedSocket; // avoid synchronization
if (ssock == null)
throw new IllegalStateException("not running");
return ssock.getLocalSocketAddress();
}
/**
* The request listener.
* Accepts incoming connections and launches a service thread.

View File

@ -119,7 +119,7 @@ public abstract class ServerTestBase extends TestCase {
HttpProtocolParams.setContentCharset
(defaultParams, "UTF-8");
HttpProtocolParams.setUserAgent
(defaultParams, "Jakarta-HttpComponents-Test/1.1");
(defaultParams, "TestAgent/1.1");
HttpProtocolParams.setUseExpectContinue
(defaultParams, false);
}