From eb920ec3cc56a09e197c025ec5ee330e19ab0b8a Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Mon, 8 Aug 2011 10:59:55 +0000 Subject: [PATCH] Redesign of connection management classes based on new pooling components from HttpCore git-svn-id: https://svn.apache.org/repos/asf/httpcomponents/httpclient/branches/conn-mgmt-redesign@1154916 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/http/impl/conn/HttpConnPool.java | 78 +++ .../apache/http/impl/conn/HttpPoolEntry.java | 77 +++ .../conn/ManagedClientConnectionImpl.java | 503 ++++++++++++++++++ .../conn/PoolingClientConnectionManager.java | 318 +++++++++++ 4 files changed, 976 insertions(+) create mode 100644 httpclient/src/main/java/org/apache/http/impl/conn/HttpConnPool.java create mode 100644 httpclient/src/main/java/org/apache/http/impl/conn/HttpPoolEntry.java create mode 100644 httpclient/src/main/java/org/apache/http/impl/conn/ManagedClientConnectionImpl.java create mode 100644 httpclient/src/main/java/org/apache/http/impl/conn/PoolingClientConnectionManager.java diff --git a/httpclient/src/main/java/org/apache/http/impl/conn/HttpConnPool.java b/httpclient/src/main/java/org/apache/http/impl/conn/HttpConnPool.java new file mode 100644 index 000000000..40b907395 --- /dev/null +++ b/httpclient/src/main/java/org/apache/http/impl/conn/HttpConnPool.java @@ -0,0 +1,78 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.http.impl.conn; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.http.HttpConnection; +import org.apache.http.conn.OperatedClientConnection; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.pool.AbstractConnPool; + +/** + * @since 4.2 + */ +class HttpConnPool extends AbstractConnPool { + + private final Log log; + private final long timeToLive; + private final TimeUnit tunit; + + public HttpConnPool(final Log log, + final int defaultMaxPerRoute, final int maxTotal, + final long timeToLive, final TimeUnit tunit) { + super(defaultMaxPerRoute, maxTotal); + this.log = log; + this.timeToLive = timeToLive; + this.tunit = tunit; + } + + @Override + protected OperatedClientConnection createConnection(final HttpRoute route) throws IOException { + return new DefaultClientConnection(); + } + + @Override + protected HttpPoolEntry createEntry(final HttpRoute route, final OperatedClientConnection conn) { + return new HttpPoolEntry(this.log, route, conn, this.timeToLive, this.tunit); + } + + @Override + protected void closeEntry(final HttpPoolEntry entry) { + HttpConnection conn = entry.getConnection(); + try { + conn.shutdown(); + } catch (IOException ex) { + if (this.log.isDebugEnabled()) { + this.log.debug("I/O error shutting down connection", ex); + } + } + } + +} diff --git a/httpclient/src/main/java/org/apache/http/impl/conn/HttpPoolEntry.java b/httpclient/src/main/java/org/apache/http/impl/conn/HttpPoolEntry.java new file mode 100644 index 000000000..20bc68c51 --- /dev/null +++ b/httpclient/src/main/java/org/apache/http/impl/conn/HttpPoolEntry.java @@ -0,0 +1,77 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.http.impl.conn; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.http.conn.OperatedClientConnection; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.conn.routing.RouteTracker; +import org.apache.http.pool.PoolEntry; + +/** + * @since 4.2 + */ +class HttpPoolEntry extends PoolEntry { + + private final Log log; + private final RouteTracker tracker; + + public HttpPoolEntry( + final Log log, + final HttpRoute route, + final OperatedClientConnection conn, + final long timeToLive, final TimeUnit tunit) { + super(route, conn, timeToLive, tunit); + this.log = log; + this.tracker = new RouteTracker(route); + } + + @Override + public boolean isExpired(long now) { + boolean expired = super.isExpired(now); + if (expired && this.log.isDebugEnabled()) { + this.log.debug("Connection " + this + " expired @ " + new Date(getExpiry())); + } + return expired; + } + + RouteTracker getTracker() { + return this.tracker; + } + + HttpRoute getPlannedRoute() { + return getRoute(); + } + + HttpRoute getEffectiveRoute() { + return this.tracker.toRoute(); + } + +} diff --git a/httpclient/src/main/java/org/apache/http/impl/conn/ManagedClientConnectionImpl.java b/httpclient/src/main/java/org/apache/http/impl/conn/ManagedClientConnectionImpl.java new file mode 100644 index 000000000..d8949ba46 --- /dev/null +++ b/httpclient/src/main/java/org/apache/http/impl/conn/ManagedClientConnectionImpl.java @@ -0,0 +1,503 @@ +/* + * ==================================================================== + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.http.impl.conn; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.InetAddress; +import java.net.Socket; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocket; + +import org.apache.http.HttpConnectionMetrics; +import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpException; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.annotation.NotThreadSafe; +import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.conn.ClientConnectionOperator; +import org.apache.http.conn.ManagedClientConnection; +import org.apache.http.conn.OperatedClientConnection; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.conn.routing.RouteTracker; +import org.apache.http.params.HttpParams; +import org.apache.http.protocol.HttpContext; + +@NotThreadSafe +class ManagedClientConnectionImpl implements ManagedClientConnection { + + private final Lock lock; + private final ClientConnectionManager manager; + private final ClientConnectionOperator operator; + private volatile HttpPoolEntry poolEntry; + private volatile boolean reusable; + private volatile long duration; + + ManagedClientConnectionImpl( + final ClientConnectionManager manager, + final ClientConnectionOperator operator, + final HttpPoolEntry entry) { + super(); + if (manager == null) { + throw new IllegalArgumentException("Connection manager may not be null"); + } + if (operator == null) { + throw new IllegalArgumentException("Connection operator may not be null"); + } + if (entry == null) { + throw new IllegalArgumentException("HTTP pool entry may not be null"); + } + this.lock = new ReentrantLock(); + this.manager = manager; + this.operator = operator; + this.poolEntry = entry; + this.reusable = false; + this.duration = Long.MAX_VALUE; + } + + Lock getLock() { + return this.lock; + } + + HttpPoolEntry getPoolEntry() { + return this.poolEntry; + } + + void detach() { + this.poolEntry = null; + } + + boolean isDetached() { + return this.poolEntry == null; + } + + public ClientConnectionManager getManager() { + return this.manager; + } + + private OperatedClientConnection getConnection() { + this.lock.lock(); + try { + if (this.poolEntry == null) { + return null; + } + return this.poolEntry.getConnection(); + } finally { + this.lock.unlock(); + } + } + + private OperatedClientConnection ensureConnection() { + this.lock.lock(); + try { + if (this.poolEntry == null) { + throw new ConnectionShutdownException(); + } + return this.poolEntry.getConnection(); + } finally { + this.lock.unlock(); + } + } + + public void close() throws IOException { + OperatedClientConnection conn = getConnection(); + if (conn != null) { + conn.close(); + } + } + + public void shutdown() throws IOException { + OperatedClientConnection conn = getConnection(); + if (conn != null) { + conn.shutdown(); + } + } + + public boolean isOpen() { + OperatedClientConnection conn = getConnection(); + if (conn != null) { + return conn.isOpen(); + } else { + return false; + } + } + + public boolean isStale() { + OperatedClientConnection conn = getConnection(); + if (conn != null) { + return conn.isStale(); + } else { + return true; + } + } + + public void setSocketTimeout(int timeout) { + OperatedClientConnection conn = ensureConnection(); + conn.setSocketTimeout(timeout); + } + + public int getSocketTimeout() { + OperatedClientConnection conn = ensureConnection(); + return conn.getSocketTimeout(); + } + + public HttpConnectionMetrics getMetrics() { + OperatedClientConnection conn = ensureConnection(); + return conn.getMetrics(); + } + + public void flush() throws IOException { + OperatedClientConnection conn = ensureConnection(); + conn.flush(); + } + + public boolean isResponseAvailable(int timeout) throws IOException { + OperatedClientConnection conn = ensureConnection(); + return conn.isResponseAvailable(timeout); + } + + public void receiveResponseEntity( + final HttpResponse response) throws HttpException, IOException { + OperatedClientConnection conn = ensureConnection(); + conn.receiveResponseEntity(response); + } + + public HttpResponse receiveResponseHeader() throws HttpException, IOException { + OperatedClientConnection conn = ensureConnection(); + return conn.receiveResponseHeader(); + } + + public void sendRequestEntity( + final HttpEntityEnclosingRequest request) throws HttpException, IOException { + OperatedClientConnection conn = ensureConnection(); + conn.sendRequestEntity(request); + } + + public void sendRequestHeader( + final HttpRequest request) throws HttpException, IOException { + OperatedClientConnection conn = ensureConnection(); + conn.sendRequestHeader(request); + } + + public InetAddress getLocalAddress() { + OperatedClientConnection conn = ensureConnection(); + return conn.getLocalAddress(); + } + + public int getLocalPort() { + OperatedClientConnection conn = ensureConnection(); + return conn.getLocalPort(); + } + + public InetAddress getRemoteAddress() { + OperatedClientConnection conn = ensureConnection(); + return conn.getRemoteAddress(); + } + + public int getRemotePort() { + OperatedClientConnection conn = ensureConnection(); + return conn.getRemotePort(); + } + + public boolean isSecure() { + OperatedClientConnection conn = ensureConnection(); + return conn.isSecure(); + } + + public SSLSession getSSLSession() { + OperatedClientConnection conn = ensureConnection(); + SSLSession result = null; + Socket sock = conn.getSocket(); + if (sock instanceof SSLSocket) { + result = ((SSLSocket)sock).getSession(); + } + return result; + } + + public Object getAttribute(final String id) { + OperatedClientConnection conn = ensureConnection(); + if (conn instanceof HttpContext) { + return ((HttpContext) conn).getAttribute(id); + } else { + return null; + } + } + + public Object removeAttribute(final String id) { + OperatedClientConnection conn = ensureConnection(); + if (conn instanceof HttpContext) { + return ((HttpContext) conn).removeAttribute(id); + } else { + return null; + } + } + + public void setAttribute(final String id, final Object obj) { + OperatedClientConnection conn = ensureConnection(); + if (conn instanceof HttpContext) { + ((HttpContext) conn).setAttribute(id, obj); + } + } + + public HttpRoute getRoute() { + synchronized (this.poolEntry) { + return this.poolEntry.getEffectiveRoute(); + } + } + + public void open( + final HttpRoute route, + final HttpContext context, + final HttpParams params) throws IOException { + if (route == null) { + throw new IllegalArgumentException("Route may not be null"); + } + if (params == null) { + throw new IllegalArgumentException("HTTP parameters may not be null"); + } + OperatedClientConnection conn; + this.lock.lock(); + try { + if (this.poolEntry == null) { + throw new ConnectionShutdownException(); + } + RouteTracker tracker = this.poolEntry.getTracker(); + if (tracker.isConnected()) { + throw new IllegalStateException("Connection already open"); + } + conn = this.poolEntry.getConnection(); + } finally { + this.lock.unlock(); + } + + HttpHost proxy = route.getProxyHost(); + this.operator.openConnection( + conn, + (proxy != null) ? proxy : route.getTargetHost(), + route.getLocalAddress(), + context, params); + + this.lock.lock(); + try { + if (this.poolEntry == null) { + throw new InterruptedIOException(); + } + RouteTracker tracker = this.poolEntry.getTracker(); + if (proxy == null) { + tracker.connectTarget(conn.isSecure()); + } else { + tracker.connectProxy(proxy, conn.isSecure()); + } + } finally { + this.lock.unlock(); + } + } + + public void tunnelTarget( + boolean secure, final HttpParams params) throws IOException { + if (params == null) { + throw new IllegalArgumentException("HTTP parameters may not be null"); + } + HttpHost target; + OperatedClientConnection conn; + this.lock.lock(); + try { + if (this.poolEntry == null) { + throw new ConnectionShutdownException(); + } + RouteTracker tracker = this.poolEntry.getTracker(); + if (!tracker.isConnected()) { + throw new IllegalStateException("Connection not open"); + } + if (tracker.isTunnelled()) { + throw new IllegalStateException("Connection is already tunnelled"); + } + target = tracker.getTargetHost(); + conn = this.poolEntry.getConnection(); + } finally { + this.lock.unlock(); + } + + conn.update(null, target, secure, params); + + this.lock.lock(); + try { + if (this.poolEntry == null) { + throw new InterruptedIOException(); + } + RouteTracker tracker = this.poolEntry.getTracker(); + tracker.tunnelTarget(secure); + } finally { + this.lock.unlock(); + } + } + + public void tunnelProxy( + final HttpHost next, boolean secure, final HttpParams params) throws IOException { + if (next == null) { + throw new IllegalArgumentException("Next proxy amy not be null"); + } + if (params == null) { + throw new IllegalArgumentException("HTTP parameters may not be null"); + } + OperatedClientConnection conn; + this.lock.lock(); + try { + if (this.poolEntry == null) { + throw new ConnectionShutdownException(); + } + RouteTracker tracker = this.poolEntry.getTracker(); + if (!tracker.isConnected()) { + throw new IllegalStateException("Connection not open"); + } + conn = this.poolEntry.getConnection(); + } finally { + this.lock.unlock(); + } + + conn.update(null, next, secure, params); + + this.lock.lock(); + try { + if (this.poolEntry == null) { + throw new InterruptedIOException(); + } + RouteTracker tracker = this.poolEntry.getTracker(); + tracker.tunnelProxy(next, secure); + } finally { + this.lock.unlock(); + } + } + + public void layerProtocol( + final HttpContext context, final HttpParams params) throws IOException { + if (params == null) { + throw new IllegalArgumentException("HTTP parameters may not be null"); + } + HttpHost target; + OperatedClientConnection conn; + this.lock.lock(); + try { + if (this.poolEntry == null) { + throw new ConnectionShutdownException(); + } + RouteTracker tracker = this.poolEntry.getTracker(); + if (!tracker.isConnected()) { + throw new IllegalStateException("Connection not open"); + } + if (!tracker.isTunnelled()) { + throw new IllegalStateException("Protocol layering without a tunnel not supported"); + } + if (tracker.isLayered()) { + throw new IllegalStateException("Multiple protocol layering not supported"); + } + target = tracker.getTargetHost(); + conn = this.poolEntry.getConnection(); + } finally { + this.lock.unlock(); + } + this.operator.updateSecureConnection(conn, target, context, params); + + this.lock.lock(); + try { + if (this.poolEntry == null) { + throw new InterruptedIOException(); + } + RouteTracker tracker = this.poolEntry.getTracker(); + tracker.layerProtocol(conn.isSecure()); + } finally { + this.lock.unlock(); + } + } + + public Object getState() { + return this.poolEntry.getState(); + } + + public void setState(final Object state) { + this.poolEntry.setState(state); + } + + public void markReusable() { + this.reusable = true; + } + + public void unmarkReusable() { + this.reusable = false; + } + + public boolean isMarkedReusable() { + return this.reusable; + } + + public void setIdleDuration(long duration, TimeUnit unit) { + if(duration > 0) { + this.duration = unit.toMillis(duration); + } else { + this.duration = -1; + } + } + + public void releaseConnection() { + this.lock.lock(); + try { + if (this.poolEntry == null) { + return; + } + this.manager.releaseConnection(this, this.duration, TimeUnit.MILLISECONDS); + this.poolEntry = null; + } finally { + this.lock.unlock(); + } + } + + public void abortConnection() { + this.lock.lock(); + try { + if (this.poolEntry == null) { + return; + } + this.reusable = false; + OperatedClientConnection conn = this.poolEntry.getConnection(); + try { + conn.shutdown(); + } catch (IOException ignore) { + } + this.manager.releaseConnection(this, this.duration, TimeUnit.MILLISECONDS); + this.poolEntry = null; + } finally { + this.lock.unlock(); + } + } + +} diff --git a/httpclient/src/main/java/org/apache/http/impl/conn/PoolingClientConnectionManager.java b/httpclient/src/main/java/org/apache/http/impl/conn/PoolingClientConnectionManager.java new file mode 100644 index 000000000..45f02b930 --- /dev/null +++ b/httpclient/src/main/java/org/apache/http/impl/conn/PoolingClientConnectionManager.java @@ -0,0 +1,318 @@ +/* + * ==================================================================== + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.http.impl.conn; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Lock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.annotation.ThreadSafe; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.conn.scheme.SchemeRegistry; +import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.conn.ClientConnectionOperator; +import org.apache.http.conn.ClientConnectionRequest; +import org.apache.http.conn.ConnectionPoolTimeoutException; +import org.apache.http.conn.ManagedClientConnection; +import org.apache.http.conn.OperatedClientConnection; +import org.apache.http.pool.ConnPoolControl; +import org.apache.http.pool.PoolStats; +import org.apache.http.impl.conn.DefaultClientConnectionOperator; +import org.apache.http.impl.conn.SchemeRegistryFactory; + +/** + * Manages a pool of {@link OperatedClientConnection client connections} and + * is able to service connection requests from multiple execution threads. + * Connections are pooled on a per route basis. A request for a route which + * already the manager has persistent connections for available in the pool + * will be services by leasing a connection from the pool rather than + * creating a brand new connection. + *

+ * PoolingConnectionManager maintains a maximum limit of connection on + * a per route basis and in total. Per default this implementation will + * create no more than than 2 concurrent connections per given route + * and no more 20 connections in total. For many real-world applications + * these limits may prove too constraining, especially if they use HTTP + * as a transport protocol for their services. Connection limits, however, + * can be adjusted using HTTP parameters. + * + * @since 4.2 + */ +@ThreadSafe +public class PoolingClientConnectionManager implements ClientConnectionManager, ConnPoolControl { + + private final Log log = LogFactory.getLog(getClass()); + + private final SchemeRegistry schemeRegistry; + + private final HttpConnPool pool; + + private final ClientConnectionOperator operator; + + public PoolingClientConnectionManager(final SchemeRegistry schreg) { + this(schreg, -1, TimeUnit.MILLISECONDS); + } + + public PoolingClientConnectionManager() { + this(SchemeRegistryFactory.createDefault()); + } + + public PoolingClientConnectionManager( + final SchemeRegistry schemeRegistry, + final long timeToLive, final TimeUnit tunit) { + super(); + if (schemeRegistry == null) { + throw new IllegalArgumentException("Scheme registry may not be null"); + } + this.schemeRegistry = schemeRegistry; + this.operator = createConnectionOperator(schemeRegistry); + this.pool = new HttpConnPool(this.log, 2, 20, timeToLive, tunit); + } + + @Override + protected void finalize() throws Throwable { + try { + shutdown(); + } finally { + super.finalize(); + } + } + + /** + * Hook for creating the connection operator. + * It is called by the constructor. + * Derived classes can override this method to change the + * instantiation of the operator. + * The default implementation here instantiates + * {@link DefaultClientConnectionOperator DefaultClientConnectionOperator}. + * + * @param schreg the scheme registry. + * + * @return the connection operator to use + */ + protected ClientConnectionOperator createConnectionOperator(SchemeRegistry schreg) { + return new DefaultClientConnectionOperator(schreg); + } + + public SchemeRegistry getSchemeRegistry() { + return this.schemeRegistry; + } + + private String format(final HttpRoute route, final Object state) { + StringBuilder buf = new StringBuilder(); + buf.append("[route: ").append(route).append("]"); + if (state != null) { + buf.append("[state: ").append(state).append("]"); + } + return buf.toString(); + } + + private String formatStats(final HttpRoute route) { + StringBuilder buf = new StringBuilder(); + PoolStats totals = this.pool.getTotalStats(); + PoolStats stats = this.pool.getStats(route); + buf.append("[total kept alive: ").append(totals.getAvailable()).append("; "); + buf.append("route allocated: ").append(stats.getLeased() + stats.getAvailable()); + buf.append(" of ").append(stats.getMax()).append("; "); + buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable()); + buf.append(" of ").append(totals.getMax()).append("]"); + return buf.toString(); + } + + private String format(final HttpPoolEntry entry) { + StringBuilder buf = new StringBuilder(); + buf.append("[id: ").append(entry.getId()).append("]"); + buf.append("[route: ").append(entry.getRoute()).append("]"); + Object state = entry.getState(); + if (state != null) { + buf.append("[state: ").append(state).append("]"); + } + return buf.toString(); + } + + public ClientConnectionRequest requestConnection( + final HttpRoute route, + final Object state) { + if (route == null) { + throw new IllegalArgumentException("HTTP route may not be null"); + } + if (this.log.isDebugEnabled()) { + this.log.debug("Connection request: " + format(route, state) + formatStats(route)); + } + final Future future = this.pool.lease(route, state); + + return new ClientConnectionRequest() { + + public void abortRequest() { + future.cancel(true); + } + + public ManagedClientConnection getConnection( + final long timeout, + final TimeUnit tunit) throws InterruptedException, ConnectionPoolTimeoutException { + return leaseConnection(future, timeout, tunit); + } + + }; + + } + + ManagedClientConnection leaseConnection( + final Future future, + final long timeout, + final TimeUnit tunit) throws InterruptedException, ConnectionPoolTimeoutException { + HttpPoolEntry entry; + try { + entry = future.get(timeout, tunit); + if (entry == null || future.isCancelled()) { + throw new InterruptedException(); + } + if (entry.getConnection() == null) { + throw new IllegalStateException("Pool entry with no connection"); + } + if (this.log.isDebugEnabled()) { + this.log.debug("Connection leased: " + format(entry) + formatStats(entry.getRoute())); + } + return new ManagedClientConnectionImpl(this, this.operator, entry); + } catch (ExecutionException ex) { + Throwable cause = ex.getCause(); + if (cause == null) { + cause = ex; + } + this.log.error("Unexpected exception leasing connection from pool", cause); + // Should never happen + throw new InterruptedException(); + } catch (TimeoutException ex) { + throw new ConnectionPoolTimeoutException("Timeout waiting for connection"); + } + } + + public void releaseConnection( + final ManagedClientConnection conn, final long keepalive, final TimeUnit tunit) { + + if (!(conn instanceof ManagedClientConnectionImpl)) { + throw new IllegalArgumentException + ("Connection class mismatch, " + + "connection not obtained from this manager."); + } + ManagedClientConnectionImpl managedConn = (ManagedClientConnectionImpl) conn; + if (managedConn.getManager() != this) { + throw new IllegalStateException("Connection not obtained from this manager."); + } + + Lock lock = managedConn.getLock(); + lock.lock(); + try { + HttpPoolEntry entry = managedConn.getPoolEntry(); + if (entry == null) { + return; + } + try { + if (managedConn.isOpen() && !managedConn.isMarkedReusable()) { + try { + managedConn.shutdown(); + } catch (IOException iox) { + if (this.log.isDebugEnabled()) { + this.log.debug("I/O exception shutting down released connection", iox); + } + } + } + entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS); + if (this.log.isDebugEnabled()) { + String s; + if (keepalive > 0) { + s = "for " + keepalive + " " + tunit; + } else { + s = "indefinitely"; + } + this.log.debug("Connection " + format(entry) + " can be kept alive " + s); + } + } finally { + this.pool.release(entry, managedConn.isMarkedReusable()); + managedConn.detach(); + } + if (this.log.isDebugEnabled()) { + this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute())); + } + } finally { + lock.unlock(); + } + } + + public void shutdown(long waitMs) throws IOException { + this.log.debug("Connection manager is shutting down"); + this.pool.shutdown(waitMs); + this.log.debug("Connection manager shut down"); + } + + public void shutdown() { + try { + shutdown(2000); + } catch (IOException ex) { + this.log.error("I/O exception while shutting down connection pool", ex); + } + } + + public void closeIdleConnections(long idleTimeout, TimeUnit tunit) { + if (log.isDebugEnabled()) { + log.debug("Closing connections idle longer than " + idleTimeout + " " + tunit); + } + pool.closeIdle(idleTimeout, tunit); + } + + public void closeExpiredConnections() { + this.log.debug("Closing expired connections"); + this.pool.closeExpired(); + } + + public void setMaxTotal(int max) { + this.pool.setMaxTotal(max); + } + + public void setDefaultMaxPerRoute(int max) { + this.pool.setDefaultMaxPerRoute(max); + } + + public void setMaxPerRoute(final HttpRoute route, int max) { + this.pool.setMaxPerRoute(route, max); + } + + public PoolStats getTotalStats() { + return this.pool.getTotalStats(); + } + + public PoolStats getStats(final HttpRoute route) { + return this.pool.getStats(route); + } + +} +