Added methods to set SocketConfig and ConnectionConfig for PoolingHttpClientConnectionManager and HttpClientBuilder

git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk@1421309 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Oleg Kalnichevski 2012-12-13 14:22:28 +00:00
parent 6425e97a02
commit bfbc573cbd
6 changed files with 411 additions and 981 deletions

View File

@ -60,8 +60,10 @@ import org.apache.http.client.protocol.RequestDefaultHeaders;
import org.apache.http.client.protocol.RequestExpectContinue; import org.apache.http.client.protocol.RequestExpectContinue;
import org.apache.http.client.protocol.ResponseContentEncoding; import org.apache.http.client.protocol.ResponseContentEncoding;
import org.apache.http.client.protocol.ResponseProcessCookies; import org.apache.http.client.protocol.ResponseProcessCookies;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Lookup; import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder; import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ConnectionKeepAliveStrategy; import org.apache.http.conn.ConnectionKeepAliveStrategy;
import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.SchemePortResolver; import org.apache.http.conn.SchemePortResolver;
@ -125,7 +127,7 @@ import org.apache.http.util.VersionInfo;
* <li>http.nonProxyHosts</li> * <li>http.nonProxyHosts</li>
* <li>http.keepAlive</li> * <li>http.keepAlive</li>
* <li>http.maxConnections</li> * <li>http.maxConnections</li>
* <li>http.user</li> * <li>http.agent</li>
* </ul> * </ul>
* </p> * </p>
* *
@ -164,7 +166,9 @@ public class HttpClientBuilder {
private CredentialsProvider credentialsProvider; private CredentialsProvider credentialsProvider;
private String userAgent; private String userAgent;
private Collection<? extends Header> defaultHeaders; private Collection<? extends Header> defaultHeaders;
private RequestConfig defaultConfig; private SocketConfig defaultSocketConfig;
private ConnectionConfig defaultConnectionConfig;
private RequestConfig defaultRequestConfig;
private boolean systemProperties; private boolean systemProperties;
private boolean redirectHandlingDisabled; private boolean redirectHandlingDisabled;
@ -379,8 +383,18 @@ public class HttpClientBuilder {
return this; return this;
} }
public final HttpClientBuilder setDefaultConfig(final RequestConfig defaultConfig) { public final HttpClientBuilder setDefaultSocketConfig(final SocketConfig config) {
this.defaultConfig = defaultConfig; this.defaultSocketConfig = config;
return this;
}
public final HttpClientBuilder setDefaultConnectionConfig(final ConnectionConfig config) {
this.defaultConnectionConfig = config;
return this;
}
public final HttpClientBuilder setDefaultRequestConfig(final RequestConfig config) {
this.defaultRequestConfig = config;
return this; return this;
} }
@ -438,6 +452,12 @@ public class HttpClientBuilder {
.register("http", PlainSocketFactory.getSocketFactory()) .register("http", PlainSocketFactory.getSocketFactory())
.register("https", sslSocketFactory) .register("https", sslSocketFactory)
.build()); .build());
if (defaultSocketConfig != null) {
poolingmgr.setDefaultSocketConfig(defaultSocketConfig);
}
if (defaultConnectionConfig != null) {
poolingmgr.setDefaultConnectionConfig(defaultConnectionConfig);
}
if (systemProperties) { if (systemProperties) {
String s = System.getProperty("http.keepAlive"); String s = System.getProperty("http.keepAlive");
if ("true".equalsIgnoreCase(s)) { if ("true".equalsIgnoreCase(s)) {
@ -659,7 +679,7 @@ public class HttpClientBuilder {
authSchemeRegistry, authSchemeRegistry,
defaultCookieStore, defaultCookieStore,
defaultCredentialsProvider, defaultCredentialsProvider,
defaultConfig != null ? defaultConfig : RequestConfig.DEFAULT); defaultRequestConfig != null ? defaultRequestConfig : RequestConfig.DEFAULT);
} }
} }

View File

@ -26,7 +26,6 @@
*/ */
package org.apache.http.impl.conn; package org.apache.http.impl.conn;
import java.io.IOException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -52,9 +51,10 @@ class CPool extends AbstractConnPool<HttpRoute, SocketClientConnection, CPoolEnt
private final TimeUnit tunit; private final TimeUnit tunit;
public CPool( public CPool(
final ConnFactory<HttpRoute, SocketClientConnection> connFactory,
final int defaultMaxPerRoute, final int maxTotal, final int defaultMaxPerRoute, final int maxTotal,
final long timeToLive, final TimeUnit tunit) { final long timeToLive, final TimeUnit tunit) {
super(new InternalConnFactory(), defaultMaxPerRoute, maxTotal); super(connFactory, defaultMaxPerRoute, maxTotal);
this.timeToLive = timeToLive; this.timeToLive = timeToLive;
this.tunit = tunit; this.tunit = tunit;
} }
@ -65,12 +65,4 @@ class CPool extends AbstractConnPool<HttpRoute, SocketClientConnection, CPoolEnt
return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.tunit); return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.tunit);
} }
static class InternalConnFactory implements ConnFactory<HttpRoute, SocketClientConnection> {
public SocketClientConnection create(final HttpRoute route) throws IOException {
return new SocketClientConnectionImpl(8 * 1024);
}
}
} }

View File

@ -1,269 +0,0 @@
/*
* ====================================================================
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl.conn;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.http.HttpClientConnection;
import org.apache.http.HttpHost;
import org.apache.http.annotation.ThreadSafe;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Lookup;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ConnectionRequest;
import org.apache.http.conn.DnsResolver;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.SchemePortResolver;
import org.apache.http.conn.SocketClientConnection;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.pool.ConnPool;
import org.apache.http.protocol.HttpContext;
/**
* Base class for {@link HttpClientConnectionManager} implementations.
* This class primarily provides consistent implementation of
* {@link #connect(HttpClientConnection, HttpHost, InetAddress, HttpContext, HttpParams)}
* and {@link #upgrade(HttpClientConnection, HttpHost, HttpContext, HttpParams)}
* methods for all standard connection managers shipped with HttpClient.
*
* @since 4.3
*/
@ThreadSafe
abstract class HttpClientConnectionManagerBase implements HttpClientConnectionManager {
private final ConnPool<HttpRoute, CPoolEntry> pool;
private final HttpClientConnectionOperator connectionOperator;
private final Map<HttpHost, SocketConfig> socketConfigMap;
private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
private volatile SocketConfig defaultSocketConfig;
private volatile ConnectionConfig defaultConnectionConfig;
HttpClientConnectionManagerBase(
final ConnPool<HttpRoute, CPoolEntry> pool,
final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver) {
super();
if (pool == null) {
throw new IllegalArgumentException("Connection pool may nor be null");
}
this.pool = pool;
this.connectionOperator = new HttpClientConnectionOperator(
socketFactoryRegistry, schemePortResolver, dnsResolver);
this.socketConfigMap = new ConcurrentHashMap<HttpHost, SocketConfig>();
this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
this.defaultSocketConfig = SocketConfig.DEFAULT;
this.defaultConnectionConfig = ConnectionConfig.DEFAULT;
}
@Override
protected void finalize() throws Throwable {
try {
shutdown();
} finally {
super.finalize();
}
}
protected void onConnectionLeaseRequest(final HttpRoute route, final Object state) {
}
protected void onConnectionLease(final CPoolEntry entry) {
}
protected void onConnectionKeepAlive(final CPoolEntry entry) {
}
protected void onConnectionRelease(final CPoolEntry entry) {
}
public ConnectionRequest requestConnection(
final HttpRoute route,
final Object state) {
if (route == null) {
throw new IllegalArgumentException("HTTP route may not be null");
}
onConnectionLeaseRequest(route, state);
final Future<CPoolEntry> future = this.pool.lease(route, state, null);
return new ConnectionRequest() {
public boolean cancel() {
return future.cancel(true);
}
public HttpClientConnection get(
final long timeout,
final TimeUnit tunit) throws InterruptedException, ConnectionPoolTimeoutException {
return leaseConnection(future, timeout, tunit);
}
};
}
protected HttpClientConnection leaseConnection(
final Future<CPoolEntry> future,
final long timeout,
final TimeUnit tunit) throws InterruptedException, ConnectionPoolTimeoutException {
CPoolEntry entry;
try {
entry = future.get(timeout, tunit);
if (entry == null || future.isCancelled()) {
throw new InterruptedException();
}
if (entry.getConnection() == null) {
throw new IllegalStateException("Pool entry with no connection");
}
onConnectionLease(entry);
return CPoolProxy.newProxy(entry);
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
if (cause == null) {
cause = ex;
}
InterruptedException intex = new InterruptedException();
intex.initCause(cause);
throw intex;
} catch (TimeoutException ex) {
throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
}
}
public void releaseConnection(
final HttpClientConnection managedConn,
final Object state,
final long keepalive, final TimeUnit tunit) {
if (managedConn == null) {
throw new IllegalArgumentException("Managed connection may not be null");
}
synchronized (managedConn) {
CPoolEntry entry = CPoolProxy.detach(managedConn);
if (entry == null) {
return;
}
SocketClientConnection conn = entry.getConnection();
try {
if (conn.isOpen()) {
entry.setState(state);
entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
onConnectionKeepAlive(entry);
}
} finally {
this.pool.release(entry, conn.isOpen());
onConnectionRelease(entry);
}
}
}
public void connect(
final HttpClientConnection managedConn,
final HttpHost host,
final InetAddress local,
final int connectTimeout,
final HttpContext context) throws IOException {
if (managedConn == null) {
throw new IllegalArgumentException("Connection may not be null");
}
SocketClientConnection conn;
synchronized (managedConn) {
CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
conn = entry.getConnection();
}
SocketConfig socketConfig = this.socketConfigMap.get(host);
if (socketConfig == null) {
socketConfig = this.defaultSocketConfig;
}
ConnectionConfig connConfig = this.connectionConfigMap.get(host);
if (connConfig == null) {
connConfig = this.defaultConnectionConfig;
}
InetSocketAddress localAddress = local != null ? new InetSocketAddress(local, 0) : null;
this.connectionOperator.connect(conn, host, localAddress,
connectTimeout, socketConfig, context);
}
public void upgrade(
final HttpClientConnection managedConn,
final HttpHost host,
final HttpContext context) throws IOException {
if (managedConn == null) {
throw new IllegalArgumentException("Connection may not be null");
}
SocketClientConnection conn;
synchronized (managedConn) {
CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
conn = entry.getConnection();
}
this.connectionOperator.upgrade(conn, host, context);
}
public SocketConfig getDefaultSocketConfig() {
return this.defaultSocketConfig;
}
public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
this.defaultSocketConfig = defaultSocketConfig != null ? defaultSocketConfig :
SocketConfig.DEFAULT;
}
public ConnectionConfig getDefaultConnectionConfig() {
return this.defaultConnectionConfig;
}
public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
this.defaultConnectionConfig = defaultConnectionConfig != null ? defaultConnectionConfig :
ConnectionConfig.DEFAULT;
}
public SocketConfig getSocketConfig(final HttpHost host) {
return this.socketConfigMap.get(host);
}
public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
this.socketConfigMap.put(host, socketConfig);
}
public ConnectionConfig getConnectionConfig(final HttpHost host) {
return this.connectionConfigMap.get(host);
}
public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
this.connectionConfigMap.put(host, connectionConfig);
}
}

View File

@ -27,22 +27,40 @@
package org.apache.http.impl.conn; package org.apache.http.impl.conn;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpClientConnection; import org.apache.http.HttpClientConnection;
import org.apache.http.HttpHost;
import org.apache.http.annotation.ThreadSafe; import org.apache.http.annotation.ThreadSafe;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Lookup;
import org.apache.http.config.Registry; import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder; import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ConnectionRequest;
import org.apache.http.conn.DnsResolver; import org.apache.http.conn.DnsResolver;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.HttpConnectionFactory;
import org.apache.http.conn.SchemePortResolver; import org.apache.http.conn.SchemePortResolver;
import org.apache.http.conn.SocketClientConnection;
import org.apache.http.conn.routing.HttpRoute; import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.socket.ConnectionSocketFactory; import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainSocketFactory; import org.apache.http.conn.socket.PlainSocketFactory;
import org.apache.http.conn.ssl.SSLSocketFactory; import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.pool.ConnFactory;
import org.apache.http.pool.ConnPoolControl; import org.apache.http.pool.ConnPoolControl;
import org.apache.http.pool.PoolStats; import org.apache.http.pool.PoolStats;
import org.apache.http.protocol.HttpContext;
/** /**
* <tt>ClientConnectionPoolManager</tt> maintains a pool of * <tt>ClientConnectionPoolManager</tt> maintains a pool of
@ -63,11 +81,13 @@ import org.apache.http.pool.PoolStats;
* @since 4.3 * @since 4.3
*/ */
@ThreadSafe @ThreadSafe
public class PoolingHttpClientConnectionManager extends HttpClientConnectionManagerBase { public class PoolingHttpClientConnectionManager implements HttpClientConnectionManager {
private final Log log = LogFactory.getLog(getClass()); private final Log log = LogFactory.getLog(getClass());
private final ConfigData configData;
private final CPool pool; private final CPool pool;
private final HttpClientConnectionOperator connectionOperator;
private static Registry<ConnectionSocketFactory> getDefaultRegistry() { private static Registry<ConnectionSocketFactory> getDefaultRegistry() {
return RegistryBuilder.<ConnectionSocketFactory>create() return RegistryBuilder.<ConnectionSocketFactory>create()
@ -81,42 +101,51 @@ public class PoolingHttpClientConnectionManager extends HttpClientConnectionMana
} }
public PoolingHttpClientConnectionManager(final long timeToLive, final TimeUnit tunit) { public PoolingHttpClientConnectionManager(final long timeToLive, final TimeUnit tunit) {
this(getDefaultRegistry(), timeToLive, tunit); this(getDefaultRegistry(), null, timeToLive, tunit);
} }
public PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry) { final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
this(socketFactoryRegistry, -1, TimeUnit.MILLISECONDS); this(socketFactoryRegistry, null, -1, TimeUnit.MILLISECONDS);
} }
public PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry, final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final DnsResolver dnsResolver) { final DnsResolver dnsResolver) {
this(socketFactoryRegistry, null, dnsResolver, -1, TimeUnit.MILLISECONDS); this(socketFactoryRegistry, null, null, dnsResolver, -1, TimeUnit.MILLISECONDS);
} }
public PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry, final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final HttpConnectionFactory<SocketClientConnection> connFactory,
final long timeToLive, final TimeUnit tunit) { final long timeToLive, final TimeUnit tunit) {
this(new CPool(2, 20, timeToLive, tunit), socketFactoryRegistry, null, null); this(socketFactoryRegistry, connFactory, null, null, timeToLive, tunit);
} }
public PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry, final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final HttpConnectionFactory<SocketClientConnection> connFactory,
final SchemePortResolver schemePortResolver, final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver, final DnsResolver dnsResolver,
final long timeToLive, final TimeUnit tunit) { final long timeToLive, final TimeUnit tunit) {
this(new CPool(2, 20, timeToLive, tunit), super();
this.configData = new ConfigData();
this.pool = new CPool(
new InternalConnectionFactory(this.configData, connFactory), 2, 20, timeToLive, tunit);
this.connectionOperator = new HttpClientConnectionOperator(
socketFactoryRegistry, schemePortResolver, dnsResolver); socketFactoryRegistry, schemePortResolver, dnsResolver);
} }
PoolingHttpClientConnectionManager( PoolingHttpClientConnectionManager(
final CPool pool, final CPool pool,
final Registry<ConnectionSocketFactory> socketFactoryRegistry, final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
final SchemePortResolver schemePortResolver, final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver) { final DnsResolver dnsResolver) {
super(pool, socketFactoryRegistry, schemePortResolver, dnsResolver); super();
this.configData = new ConfigData();
this.pool = pool; this.pool = pool;
this.connectionOperator = new HttpClientConnectionOperator(
socketFactoryRegistry, schemePortResolver, dnsResolver);
} }
@Override @Override
@ -160,24 +189,80 @@ public class PoolingHttpClientConnectionManager extends HttpClientConnectionMana
return buf.toString(); return buf.toString();
} }
@Override public ConnectionRequest requestConnection(
protected void onConnectionLeaseRequest(final HttpRoute route, final Object state) { final HttpRoute route,
final Object state) {
if (route == null) {
throw new IllegalArgumentException("HTTP route may not be null");
}
if (this.log.isDebugEnabled()) { if (this.log.isDebugEnabled()) {
this.log.debug("Connection request: " + format(route, state) + formatStats(route)); this.log.debug("Connection request: " + format(route, state) + formatStats(route));
} }
final Future<CPoolEntry> future = this.pool.lease(route, state, null);
return new ConnectionRequest() {
public boolean cancel() {
return future.cancel(true);
} }
@Override public HttpClientConnection get(
protected void onConnectionLease(final CPoolEntry entry) { final long timeout,
final TimeUnit tunit) throws InterruptedException, ConnectionPoolTimeoutException {
return leaseConnection(future, timeout, tunit);
}
};
}
protected HttpClientConnection leaseConnection(
final Future<CPoolEntry> future,
final long timeout,
final TimeUnit tunit) throws InterruptedException, ConnectionPoolTimeoutException {
CPoolEntry entry;
try {
entry = future.get(timeout, tunit);
if (entry == null || future.isCancelled()) {
throw new InterruptedException();
}
if (entry.getConnection() == null) {
throw new IllegalStateException("Pool entry with no connection");
}
if (this.log.isDebugEnabled()) { if (this.log.isDebugEnabled()) {
this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute())); this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute()));
} }
return CPoolProxy.newProxy(entry);
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
if (cause == null) {
cause = ex;
}
InterruptedException intex = new InterruptedException();
intex.initCause(cause);
throw intex;
} catch (TimeoutException ex) {
throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
}
} }
@Override public void releaseConnection(
protected void onConnectionKeepAlive(final CPoolEntry entry) { final HttpClientConnection managedConn,
final Object state,
final long keepalive, final TimeUnit tunit) {
if (managedConn == null) {
throw new IllegalArgumentException("Managed connection may not be null");
}
synchronized (managedConn) {
CPoolEntry entry = CPoolProxy.detach(managedConn);
if (entry == null) {
return;
}
SocketClientConnection conn = entry.getConnection();
try {
if (conn.isOpen()) {
entry.setState(state);
entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
if (this.log.isDebugEnabled()) { if (this.log.isDebugEnabled()) {
long keepalive = entry.getExpiry();
String s; String s;
if (keepalive > 0) { if (keepalive > 0) {
s = "for " + (double) keepalive / 1000 + " seconds"; s = "for " + (double) keepalive / 1000 + " seconds";
@ -187,13 +272,55 @@ public class PoolingHttpClientConnectionManager extends HttpClientConnectionMana
this.log.debug("Connection " + format(entry) + " can be kept alive " + s); this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
} }
} }
} finally {
@Override this.pool.release(entry, conn.isOpen());
protected void onConnectionRelease(final CPoolEntry entry) {
if (this.log.isDebugEnabled()) { if (this.log.isDebugEnabled()) {
this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute())); this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
} }
} }
}
}
public void connect(
final HttpClientConnection managedConn,
final HttpHost host,
final InetAddress local,
final int connectTimeout,
final HttpContext context) throws IOException {
if (managedConn == null) {
throw new IllegalArgumentException("Connection may not be null");
}
SocketClientConnection conn;
synchronized (managedConn) {
CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
conn = entry.getConnection();
}
SocketConfig socketConfig = this.configData.getSocketConfig(host);
if (socketConfig == null) {
socketConfig = this.configData.getDefaultSocketConfig();
}
if (socketConfig == null) {
socketConfig = SocketConfig.DEFAULT;
}
InetSocketAddress localAddress = local != null ? new InetSocketAddress(local, 0) : null;
this.connectionOperator.connect(
conn, host, localAddress, connectTimeout, socketConfig, context);
}
public void upgrade(
final HttpClientConnection managedConn,
final HttpHost host,
final HttpContext context) throws IOException {
if (managedConn == null) {
throw new IllegalArgumentException("Connection may not be null");
}
SocketClientConnection conn;
synchronized (managedConn) {
CPoolEntry entry = CPoolProxy.getPoolEntry(managedConn);
conn = entry.getConnection();
}
this.connectionOperator.upgrade(conn, host, context);
}
public void shutdown() { public void shutdown() {
this.log.debug("Connection manager is shutting down"); this.log.debug("Connection manager is shutting down");
@ -249,4 +376,116 @@ public class PoolingHttpClientConnectionManager extends HttpClientConnectionMana
return this.pool.getStats(route); return this.pool.getStats(route);
} }
public SocketConfig getDefaultSocketConfig() {
return this.configData.getDefaultSocketConfig();
}
public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
this.configData.setDefaultSocketConfig(defaultSocketConfig);
}
public ConnectionConfig getDefaultConnectionConfig() {
return this.configData.getDefaultConnectionConfig();
}
public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
this.configData.setDefaultConnectionConfig(defaultConnectionConfig);
}
public SocketConfig getSocketConfig(final HttpHost host) {
return this.configData.getSocketConfig(host);
}
public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
this.configData.setSocketConfig(host, socketConfig);
}
public ConnectionConfig getConnectionConfig(final HttpHost host) {
return this.configData.getConnectionConfig(host);
}
public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
this.configData.setConnectionConfig(host, connectionConfig);
}
static class ConfigData {
private final Map<HttpHost, SocketConfig> socketConfigMap;
private final Map<HttpHost, ConnectionConfig> connectionConfigMap;
private volatile SocketConfig defaultSocketConfig;
private volatile ConnectionConfig defaultConnectionConfig;
ConfigData() {
super();
this.socketConfigMap = new ConcurrentHashMap<HttpHost, SocketConfig>();
this.connectionConfigMap = new ConcurrentHashMap<HttpHost, ConnectionConfig>();
}
public SocketConfig getDefaultSocketConfig() {
return this.defaultSocketConfig;
}
public void setDefaultSocketConfig(final SocketConfig defaultSocketConfig) {
this.defaultSocketConfig = defaultSocketConfig;
}
public ConnectionConfig getDefaultConnectionConfig() {
return this.defaultConnectionConfig;
}
public void setDefaultConnectionConfig(final ConnectionConfig defaultConnectionConfig) {
this.defaultConnectionConfig = defaultConnectionConfig;
}
public SocketConfig getSocketConfig(final HttpHost host) {
return this.socketConfigMap.get(host);
}
public void setSocketConfig(final HttpHost host, final SocketConfig socketConfig) {
this.socketConfigMap.put(host, socketConfig);
}
public ConnectionConfig getConnectionConfig(final HttpHost host) {
return this.connectionConfigMap.get(host);
}
public void setConnectionConfig(final HttpHost host, final ConnectionConfig connectionConfig) {
this.connectionConfigMap.put(host, connectionConfig);
}
}
static class InternalConnectionFactory implements ConnFactory<HttpRoute, SocketClientConnection> {
private final ConfigData configData;
private final HttpConnectionFactory<SocketClientConnection> connFactory;
InternalConnectionFactory(
final ConfigData configData,
final HttpConnectionFactory<SocketClientConnection> connFactory) {
super();
this.configData = configData != null ? configData : new ConfigData();
this.connFactory = connFactory != null ? connFactory :
DefaultClientConnectionFactory.INSTANCE;
}
public SocketClientConnection create(final HttpRoute route) throws IOException {
ConnectionConfig config = null;
if (route.getProxyHost() != null) {
config = this.configData.getConnectionConfig(route.getProxyHost());
}
if (config == null) {
config = this.configData.getConnectionConfig(route.getTargetHost());
}
if (config == null) {
config = this.configData.getDefaultConnectionConfig();
}
if (config == null) {
config = ConnectionConfig.DEFAULT;
}
return this.connFactory.create(config);
}
}
} }

View File

@ -1,198 +0,0 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.impl.conn;
import java.net.Socket;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpClientConnection;
import org.apache.http.HttpHost;
import org.apache.http.config.Lookup;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ConnectionRequest;
import org.apache.http.conn.DnsResolver;
import org.apache.http.conn.SchemePortResolver;
import org.apache.http.conn.SocketClientConnection;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.pool.ConnPool;
import org.apache.http.protocol.HttpContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestHttpClientConnectionManagerBase {
private SocketClientConnection conn;
private Socket socket;
private ConnectionSocketFactory plainSocketFactory;
private Lookup<ConnectionSocketFactory> socketFactoryRegistry;
private SchemePortResolver schemePortResolver;
private DnsResolver dnsResolver;
private Future<CPoolEntry> future;
private ConnPool<HttpRoute, CPoolEntry> pool;
private HttpClientConnectionManagerBase mgr;
@SuppressWarnings("unchecked")
@Before
public void setup() throws Exception {
conn = Mockito.mock(SocketClientConnection.class);
socket = Mockito.mock(Socket.class);
plainSocketFactory = Mockito.mock(ConnectionSocketFactory.class);
socketFactoryRegistry = Mockito.mock(Lookup.class);
schemePortResolver = Mockito.mock(SchemePortResolver.class);
dnsResolver = Mockito.mock(DnsResolver.class);
pool = Mockito.mock(ConnPool.class);
future = Mockito.mock(Future.class);
mgr = new HttpClientConnectionManagerBase(
pool, socketFactoryRegistry, schemePortResolver, dnsResolver) {
public void closeIdleConnections(long idletime, TimeUnit tunit) {
}
public void closeExpiredConnections() {
}
public void shutdown() {
}
};
}
@Test
public void testLeaseRelease() throws Exception {
HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
CPoolEntry entry = new CPoolEntry(LogFactory.getLog(getClass()), "id", route, conn,
-1, TimeUnit.MILLISECONDS);
Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(socketFactoryRegistry.lookup("http")).thenReturn(plainSocketFactory);
Mockito.when(schemePortResolver.resolve(target)).thenReturn(80);
Mockito.when(plainSocketFactory.createSocket(Mockito.<HttpContext>any())).thenReturn(socket);
Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(route, null, null)).thenReturn(future);
ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
HttpClientConnection conn1 = connRequest1.get(1, TimeUnit.SECONDS);
Assert.assertNotNull(conn1);
Assert.assertFalse(conn1.isOpen());
Assert.assertNotSame(conn, conn1);
mgr.releaseConnection(conn1, null, 0, TimeUnit.MILLISECONDS);
Mockito.verify(pool).release(entry, false);
}
@Test(expected=InterruptedException.class)
public void testLeaseFutureCancelled() throws Exception {
HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
CPoolEntry entry = new CPoolEntry(LogFactory.getLog(getClass()), "id", route, conn,
-1, TimeUnit.MILLISECONDS);
Mockito.when(future.isCancelled()).thenReturn(Boolean.TRUE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(route, null, null)).thenReturn(future);
ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
connRequest1.get(1, TimeUnit.SECONDS);
}
@Test(expected=ConnectionPoolTimeoutException.class)
public void testLeaseFutureTimeout() throws Exception {
HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
Mockito.when(future.isCancelled()).thenReturn(Boolean.TRUE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenThrow(new TimeoutException());
Mockito.when(pool.lease(route, null, null)).thenReturn(future);
ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
connRequest1.get(1, TimeUnit.SECONDS);
}
@Test
public void testReleaseReusable() throws Exception {
HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
CPoolEntry entry = Mockito.spy(new CPoolEntry(LogFactory.getLog(getClass()), "id", route, conn,
-1, TimeUnit.MILLISECONDS));
Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(route, null, null)).thenReturn(future);
Mockito.when(conn.isOpen()).thenReturn(Boolean.TRUE);
ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
HttpClientConnection conn1 = connRequest1.get(1, TimeUnit.SECONDS);
Assert.assertNotNull(conn1);
Assert.assertTrue(conn1.isOpen());
mgr.releaseConnection(conn1, "some state", 0, TimeUnit.MILLISECONDS);
Mockito.verify(pool).release(entry, true);
Mockito.verify(entry).setState("some state");
Mockito.verify(entry).updateExpiry(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS));
}
@Test
public void testReleaseNonReusable() throws Exception {
HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
CPoolEntry entry = Mockito.spy(new CPoolEntry(LogFactory.getLog(getClass()), "id", route, conn,
-1, TimeUnit.MILLISECONDS));
Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(route, null, null)).thenReturn(future);
Mockito.when(conn.isOpen()).thenReturn(Boolean.FALSE);
ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
HttpClientConnection conn1 = connRequest1.get(1, TimeUnit.SECONDS);
Assert.assertNotNull(conn1);
Assert.assertFalse(conn1.isOpen());
mgr.releaseConnection(conn1, "some state", 0, TimeUnit.MILLISECONDS);
Mockito.verify(pool).release(entry, false);
Mockito.verify(entry, Mockito.never()).setState(Mockito.anyObject());
Mockito.verify(entry, Mockito.never()).updateExpiry(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS));
}
}

View File

@ -27,15 +27,25 @@
package org.apache.http.impl.conn; package org.apache.http.impl.conn;
import java.net.Socket;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpClientConnection; import org.apache.http.HttpClientConnection;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.config.Lookup;
import org.apache.http.conn.ConnectionRequest; import org.apache.http.conn.ConnectionRequest;
import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.conn.DnsResolver;
import org.apache.http.conn.SchemePortResolver;
import org.apache.http.conn.SocketClientConnection;
import org.apache.http.conn.routing.HttpRoute; import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.protocol.HttpContext;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -44,500 +54,136 @@ import org.mockito.Mockito;
*/ */
public class TestPoolingHttpClientConnectionManager { public class TestPoolingHttpClientConnectionManager {
private static HttpClientConnection getConnection( private SocketClientConnection conn;
final HttpClientConnectionManager mgr, private Socket socket;
final HttpRoute route, private ConnectionSocketFactory plainSocketFactory;
long timeout, private Lookup<ConnectionSocketFactory> socketFactoryRegistry;
TimeUnit unit) throws ConnectionPoolTimeoutException, InterruptedException { private SchemePortResolver schemePortResolver;
ConnectionRequest connRequest = mgr.requestConnection(route, null); private DnsResolver dnsResolver;
return connRequest.get(timeout, unit); private Future<CPoolEntry> future;
private CPool pool;
private PoolingHttpClientConnectionManager mgr;
@SuppressWarnings("unchecked")
@Before
public void setup() throws Exception {
conn = Mockito.mock(SocketClientConnection.class);
socket = Mockito.mock(Socket.class);
plainSocketFactory = Mockito.mock(ConnectionSocketFactory.class);
socketFactoryRegistry = Mockito.mock(Lookup.class);
schemePortResolver = Mockito.mock(SchemePortResolver.class);
dnsResolver = Mockito.mock(DnsResolver.class);
pool = Mockito.mock(CPool.class);
future = Mockito.mock(Future.class);
mgr = new PoolingHttpClientConnectionManager(
pool, socketFactoryRegistry, schemePortResolver, dnsResolver);
} }
private static HttpClientConnection getConnection(
final HttpClientConnectionManager mgr,
final HttpRoute route) throws ConnectionPoolTimeoutException, InterruptedException {
ConnectionRequest connRequest = mgr.requestConnection(route, null);
return connRequest.get(0, null);
}
@Test(expected=IllegalArgumentException.class)
public void testIllegalConstructor() {
new PoolingHttpClientConnectionManager(null);
}
@Test(expected=IllegalArgumentException.class)
public void testGetConnection()
throws InterruptedException, ConnectionPoolTimeoutException {
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager();
HttpHost target = new HttpHost("www.test.invalid", 80, "http");
HttpRoute route = new HttpRoute(target, null, false);
HttpClientConnection conn = getConnection(mgr, route);
Assert.assertNotNull(conn);
Assert.assertFalse(conn.isOpen());
mgr.releaseConnection(conn, null, -1, null);
try {
getConnection(mgr, null);
} finally {
mgr.shutdown();
}
}
// testTimeout in 3.x TestHttpConnectionManager is redundant
// several other tests here rely on timeout behavior
@Test @Test
public void testMaxConnTotal() public void testLeaseRelease() throws Exception {
throws InterruptedException, ConnectionPoolTimeoutException { HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager(); CPoolEntry entry = new CPoolEntry(LogFactory.getLog(getClass()), "id", route, conn,
mgr.setMaxTotal(2); -1, TimeUnit.MILLISECONDS);
mgr.setDefaultMaxPerRoute(1);
HttpHost target1 = new HttpHost("www.test1.invalid", 80, "http"); Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
HttpRoute route1 = new HttpRoute(target1, null, false); Mockito.when(socketFactoryRegistry.lookup("http")).thenReturn(plainSocketFactory);
HttpHost target2 = new HttpHost("www.test2.invalid", 80, "http"); Mockito.when(schemePortResolver.resolve(target)).thenReturn(80);
HttpRoute route2 = new HttpRoute(target2, null, false); Mockito.when(plainSocketFactory.createSocket(Mockito.<HttpContext>any())).thenReturn(socket);
HttpClientConnection conn1 = getConnection(mgr, route1); Mockito.when(future.isCancelled()).thenReturn(false);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(route, null, null)).thenReturn(future);
ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
HttpClientConnection conn1 = connRequest1.get(1, TimeUnit.SECONDS);
Assert.assertNotNull(conn1); Assert.assertNotNull(conn1);
HttpClientConnection conn2 = getConnection(mgr, route2); Assert.assertFalse(conn1.isOpen());
Assert.assertNotNull(conn2); Assert.assertNotSame(conn, conn1);
try { mgr.releaseConnection(conn1, null, 0, TimeUnit.MILLISECONDS);
// this should fail quickly, connection has not been released
getConnection(mgr, route2, 100L, TimeUnit.MILLISECONDS); Mockito.verify(pool).release(entry, false);
Assert.fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
} }
// release one of the connections @Test(expected=InterruptedException.class)
mgr.releaseConnection(conn2, null, -1, null); public void testLeaseFutureCancelled() throws Exception {
conn2 = null; HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
// there should be a connection available now CPoolEntry entry = new CPoolEntry(LogFactory.getLog(getClass()), "id", route, conn,
try { -1, TimeUnit.MILLISECONDS);
getConnection(mgr, route2, 100L, TimeUnit.MILLISECONDS);
} catch (ConnectionPoolTimeoutException cptx) { Mockito.when(future.isCancelled()).thenReturn(Boolean.TRUE);
Assert.fail("connection should have been available: " + cptx); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
Mockito.when(pool.lease(route, null, null)).thenReturn(future);
ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
connRequest1.get(1, TimeUnit.SECONDS);
} }
mgr.shutdown(); @Test(expected=ConnectionPoolTimeoutException.class)
public void testLeaseFutureTimeout() throws Exception {
HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
Mockito.when(future.isCancelled()).thenReturn(Boolean.TRUE);
Mockito.when(future.get(1, TimeUnit.SECONDS)).thenThrow(new TimeoutException());
Mockito.when(pool.lease(route, null, null)).thenReturn(future);
ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
connRequest1.get(1, TimeUnit.SECONDS);
} }
@Test @Test
public void testMaxConnPerHost() throws Exception { public void testReleaseReusable() throws Exception {
HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
HttpHost target1 = new HttpHost("www.test1.invalid", 80, "http"); CPoolEntry entry = Mockito.spy(new CPoolEntry(LogFactory.getLog(getClass()), "id", route, conn,
HttpRoute route1 = new HttpRoute(target1, null, false); -1, TimeUnit.MILLISECONDS));
HttpHost target2 = new HttpHost("www.test2.invalid", 80, "http");
HttpRoute route2 = new HttpRoute(target2, null, false);
HttpHost target3 = new HttpHost("www.test3.invalid", 80, "http");
HttpRoute route3 = new HttpRoute(target3, null, false);
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager(); Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
mgr.setMaxTotal(100); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
mgr.setDefaultMaxPerRoute(1); Mockito.when(pool.lease(route, null, null)).thenReturn(future);
mgr.setMaxPerRoute(route2, 2); Mockito.when(conn.isOpen()).thenReturn(Boolean.TRUE);
mgr.setMaxPerRoute(route3, 3);
// route 3, limit 3 ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
HttpClientConnection conn1 = HttpClientConnection conn1 = connRequest1.get(1, TimeUnit.SECONDS);
getConnection(mgr, route3, 10L, TimeUnit.MILLISECONDS);
Assert.assertNotNull(conn1); Assert.assertNotNull(conn1);
HttpClientConnection conn2 = Assert.assertTrue(conn1.isOpen());
getConnection(mgr, route3, 10L, TimeUnit.MILLISECONDS);
Assert.assertNotNull(conn2);
HttpClientConnection conn3 =
getConnection(mgr, route3, 10L, TimeUnit.MILLISECONDS);
Assert.assertNotNull(conn3);
try {
// should fail quickly, connection has not been released
getConnection(mgr, route3, 10L, TimeUnit.MILLISECONDS);
Assert.fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
// route 2, limit 2 mgr.releaseConnection(conn1, "some state", 0, TimeUnit.MILLISECONDS);
conn1 = getConnection(mgr, route2, 10L, TimeUnit.MILLISECONDS);
conn2 = getConnection(mgr, route2, 10L, TimeUnit.MILLISECONDS);
try {
// should fail quickly, connection has not been released
getConnection(mgr, route2, 10L, TimeUnit.MILLISECONDS);
Assert.fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
// route 1, should use default limit of 1 Mockito.verify(pool).release(entry, true);
conn1 = getConnection(mgr, route1, 10L, TimeUnit.MILLISECONDS); Mockito.verify(entry).setState("some state");
try { Mockito.verify(entry).updateExpiry(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS));
// should fail quickly, connection has not been released
getConnection(mgr, route1, 10L, TimeUnit.MILLISECONDS);
Assert.fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
// check releaseConnection with invalid arguments
try {
mgr.releaseConnection(null, null, -1, null);
Assert.fail("null connection adapter not detected");
} catch (IllegalArgumentException iax) {
// expected
}
try {
HttpClientConnection conn = Mockito.mock(HttpClientConnection.class);
mgr.releaseConnection(conn, null, -1, null);
Assert.fail("foreign connection adapter not detected");
} catch (IllegalArgumentException iax) {
// expected
}
mgr.shutdown();
} }
@Test @Test
public void testReleaseConnection() throws Exception { public void testReleaseNonReusable() throws Exception {
HttpHost target = new HttpHost("localhost");
HttpRoute route = new HttpRoute(target);
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager(); CPoolEntry entry = Mockito.spy(new CPoolEntry(LogFactory.getLog(getClass()), "id", route, conn,
mgr.setMaxTotal(3); -1, TimeUnit.MILLISECONDS));
mgr.setDefaultMaxPerRoute(1);
HttpHost target1 = new HttpHost("www.test1.invalid", 80, "http"); Mockito.when(future.isCancelled()).thenReturn(Boolean.FALSE);
HttpRoute route1 = new HttpRoute(target1, null, false); Mockito.when(future.get(1, TimeUnit.SECONDS)).thenReturn(entry);
HttpHost target2 = new HttpHost("www.test2.invalid", 80, "http"); Mockito.when(pool.lease(route, null, null)).thenReturn(future);
HttpRoute route2 = new HttpRoute(target2, null, false); Mockito.when(conn.isOpen()).thenReturn(Boolean.FALSE);
HttpHost target3 = new HttpHost("www.test3.invalid", 80, "http");
HttpRoute route3 = new HttpRoute(target3, null, false);
// the first three allocations should pass ConnectionRequest connRequest1 = mgr.requestConnection(route, null);
HttpClientConnection conn1 = HttpClientConnection conn1 = connRequest1.get(1, TimeUnit.SECONDS);
getConnection(mgr, route1, 10L, TimeUnit.MILLISECONDS);
HttpClientConnection conn2 =
getConnection(mgr, route2, 10L, TimeUnit.MILLISECONDS);
HttpClientConnection conn3 =
getConnection(mgr, route3, 10L, TimeUnit.MILLISECONDS);
Assert.assertNotNull(conn1); Assert.assertNotNull(conn1);
Assert.assertNotNull(conn2); Assert.assertFalse(conn1.isOpen());
Assert.assertNotNull(conn3);
// obtaining another connection for either of the three should fail mgr.releaseConnection(conn1, "some state", 0, TimeUnit.MILLISECONDS);
// this is somehow redundant with testMaxConnPerHost
try {
getConnection(mgr, route1, 10L, TimeUnit.MILLISECONDS);
Assert.fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
try {
getConnection(mgr, route2, 10L, TimeUnit.MILLISECONDS);
Assert.fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
try {
getConnection(mgr, route3, 10L, TimeUnit.MILLISECONDS);
Assert.fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
// now release one and check that exactly that one can be obtained then
mgr.releaseConnection(conn2, null, -1, null);
conn2 = null;
try {
getConnection(mgr, route1, 10L, TimeUnit.MILLISECONDS);
Assert.fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
// this one succeeds
conn2 = getConnection(mgr, route2, 10L, TimeUnit.MILLISECONDS);
Assert.assertNotNull(conn2);
try {
getConnection(mgr, route3, 10L, TimeUnit.MILLISECONDS);
Assert.fail("ConnectionPoolTimeoutException should have been thrown");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
mgr.shutdown();
}
@Test
public void testDeleteClosedConnections() throws Exception {
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager();
HttpHost target = new HttpHost("www.test.invalid", 80, "http");
HttpRoute route = new HttpRoute(target, null, false);
HttpClientConnection conn = getConnection(mgr, route);
Assert.assertEquals(1, mgr.getTotalStats().getLeased());
Assert.assertEquals(1, mgr.getStats(route).getLeased());
mgr.releaseConnection(conn, null, -1, null);
Assert.assertEquals(0, mgr.getTotalStats().getAvailable());
Assert.assertEquals(0, mgr.getStats(route).getAvailable());
mgr.shutdown();
}
@Test
public void testShutdown() throws Exception {
// 3.x: TestHttpConnectionManager.testShutdown
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager();
mgr.setMaxTotal(1);
mgr.setDefaultMaxPerRoute(1);
HttpHost target = new HttpHost("www.test.invalid", 80, "http");
HttpRoute route = new HttpRoute(target, null, false);
// get the only connection, then start an extra thread
// on shutdown, the extra thread should get an exception
HttpClientConnection conn =
getConnection(mgr, route, 1L, TimeUnit.MILLISECONDS);
GetConnThread gct = new GetConnThread(mgr, route, 0L); // no timeout
gct.start();
Thread.sleep(100); // give extra thread time to block
mgr.shutdown();
// First release the connection. If the manager keeps working
// despite the shutdown, this will deblock the extra thread.
// The release itself should turn into a no-op, without exception.
mgr.releaseConnection(conn, null, -1, null);
gct.join(10000);
Assert.assertNull("thread should not have obtained connection",
gct.getConnection());
Assert.assertNotNull("thread should have gotten an exception",
gct.getException());
Assert.assertSame("thread got wrong exception",
InterruptedException.class, gct.getException().getClass());
// the manager is down, we should not be able to get a connection
try {
getConnection(mgr, route, 1L, TimeUnit.MILLISECONDS);
Assert.fail("shut-down manager does not raise exception");
} catch (IllegalStateException isx) {
// expected
}
}
@Test
public void testInterruptThread() throws Exception {
// 3.x: TestHttpConnectionManager.testWaitingThreadInterrupted
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager();
mgr.setMaxTotal(1);
HttpHost target = new HttpHost("www.test.invalid", 80, "http");
HttpRoute route = new HttpRoute(target, null, false);
// get the only connection, then start an extra thread
HttpClientConnection conn =
getConnection(mgr, route, 1L, TimeUnit.MILLISECONDS);
GetConnThread gct = new GetConnThread(mgr, route, 0L); // no timeout
gct.start();
Thread.sleep(100); // give extra thread time to block
// interrupt the thread, it should cancel waiting with an exception
gct.interrupt();
gct.join(10000);
Assert.assertNotNull("thread should have gotten an exception",
gct.getException());
Assert.assertSame("thread got wrong exception",
InterruptedException.class,
gct.getException().getClass());
// make sure the manager is still working
try {
getConnection(mgr, route, 10L, TimeUnit.MILLISECONDS);
Assert.fail("should have gotten a timeout");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
mgr.releaseConnection(conn, null, -1, null);
// this time: no exception
conn = getConnection(mgr, route, 10L, TimeUnit.MILLISECONDS);
Assert.assertNotNull("should have gotten a connection", conn);
mgr.shutdown();
}
@Test
public void testReusePreference() throws Exception {
// 3.x: TestHttpConnectionManager.testHostReusePreference
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager();
mgr.setMaxTotal(1);
HttpHost target1 = new HttpHost("www.test1.invalid", 80, "http");
HttpRoute route1 = new HttpRoute(target1, null, false);
HttpHost target2 = new HttpHost("www.test2.invalid", 80, "http");
HttpRoute route2 = new HttpRoute(target2, null, false);
// get the only connection, then start two extra threads
HttpClientConnection conn =
getConnection(mgr, route1, 1L, TimeUnit.MILLISECONDS);
GetConnThread gct1 = new GetConnThread(mgr, route1, 1000L);
GetConnThread gct2 = new GetConnThread(mgr, route2, 1000L);
// the second thread is started first, to distinguish the
// route-based reuse preference from first-come, first-served
gct2.start();
Thread.sleep(100); // give the thread time to block
gct1.start();
Thread.sleep(100); // give the thread time to block
// releasing the connection for route1 should deblock thread1
// the other thread gets a timeout
mgr.releaseConnection(conn, null, -1, null);
gct1.join(10000);
gct2.join(10000);
Assert.assertNotNull("thread 1 should have gotten a connection",
gct1.getConnection());
Assert.assertNull ("thread 2 should NOT have gotten a connection",
gct2.getConnection());
mgr.shutdown();
}
@Test
public void testAbortAfterRequestStarts() throws Exception {
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager();
mgr.setMaxTotal(1);
HttpHost target = new HttpHost("www.test.invalid", 80, "http");
HttpRoute route = new HttpRoute(target, null, false);
// get the only connection, then start an extra thread
HttpClientConnection conn = getConnection(mgr, route, 1L, TimeUnit.MILLISECONDS);
ConnectionRequest request = mgr.requestConnection(route, null);
GetConnThread gct = new GetConnThread(request, route, 0L); // no timeout
gct.start();
Thread.sleep(100); // give extra thread time to block
request.cancel();
gct.join(10000);
Assert.assertNotNull("thread should have gotten an exception",
gct.getException());
Assert.assertSame("thread got wrong exception",
InterruptedException.class,
gct.getException().getClass());
// make sure the manager is still working
try {
getConnection(mgr, route, 10L, TimeUnit.MILLISECONDS);
Assert.fail("should have gotten a timeout");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
mgr.releaseConnection(conn, null, -1, null);
// this time: no exception
conn = getConnection(mgr, route, 10L, TimeUnit.MILLISECONDS);
Assert.assertNotNull("should have gotten a connection", conn);
mgr.shutdown();
}
@Test
public void testAbortBeforeRequestStarts() throws Exception {
PoolingHttpClientConnectionManager mgr = new PoolingHttpClientConnectionManager();
mgr.setMaxTotal(1);
HttpHost target = new HttpHost("www.test.invalid", 80, "http");
HttpRoute route = new HttpRoute(target, null, false);
// get the only connection, then start an extra thread
HttpClientConnection conn = getConnection(mgr, route, 1L, TimeUnit.MILLISECONDS);
ConnectionRequest request = mgr.requestConnection(route, null);
request.cancel();
GetConnThread gct = new GetConnThread(request, route, 0L); // no timeout
gct.start();
Thread.sleep(100); // give extra thread time to block
gct.join(10000);
Assert.assertNotNull("thread should have gotten an exception",
gct.getException());
Assert.assertSame("thread got wrong exception",
InterruptedException.class,
gct.getException().getClass());
// make sure the manager is still working
try {
getConnection(mgr, route, 10L, TimeUnit.MILLISECONDS);
Assert.fail("should have gotten a timeout");
} catch (ConnectionPoolTimeoutException e) {
// expected
}
mgr.releaseConnection(conn, null, -1, null);
// this time: no exception
conn = getConnection(mgr, route, 10L, TimeUnit.MILLISECONDS);
Assert.assertNotNull("should have gotten a connection", conn);
mgr.shutdown();
}
public class GetConnThread extends Thread {
private final ConnectionRequest connRequest;
private final long timeout;
private volatile HttpClientConnection connection;
private volatile Exception exception;
public GetConnThread(
final HttpClientConnectionManager mgr,
final HttpRoute route, long timeout) {
this(mgr.requestConnection(route, null), route, timeout);
}
public GetConnThread(
final ConnectionRequest connRequest,
final HttpRoute route, long timeout) {
super();
this.connRequest = connRequest;
this.timeout = timeout;
}
@Override
public void run() {
try {
connection = connRequest.get(timeout, TimeUnit.MILLISECONDS);
} catch (Exception ex) {
exception = ex;
}
}
public Throwable getException() {
return exception;
}
public HttpClientConnection getConnection() {
return connection;
}
Mockito.verify(pool).release(entry, false);
Mockito.verify(entry, Mockito.never()).setState(Mockito.anyObject());
Mockito.verify(entry, Mockito.never()).updateExpiry(Mockito.anyLong(), Mockito.eq(TimeUnit.MILLISECONDS));
} }
} }