Redesign of classic and asynchronous connection manager APIs

This commit is contained in:
Oleg Kalnichevski 2017-09-05 22:02:10 +02:00
parent a65c8e9e1a
commit d2b3385ba2
20 changed files with 388 additions and 133 deletions

View File

@ -27,7 +27,6 @@
package org.apache.hc.client5.testing.sync; package org.apache.hc.client5.testing.sync;
import java.util.Collections;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -35,10 +34,14 @@ import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.io.ConnectionEndpoint; import org.apache.hc.client5.http.io.ConnectionEndpoint;
import org.apache.hc.client5.http.io.LeaseRequest; import org.apache.hc.client5.http.io.LeaseRequest;
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.impl.io.HttpRequestExecutor; import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
import org.apache.hc.core5.http.message.BasicClassicHttpRequest; import org.apache.hc.core5.http.message.BasicClassicHttpRequest;
import org.apache.hc.core5.http.protocol.BasicHttpContext; import org.apache.hc.core5.http.protocol.BasicHttpContext;
@ -48,6 +51,8 @@ import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http.protocol.RequestConnControl; import org.apache.hc.core5.http.protocol.RequestConnControl;
import org.apache.hc.core5.http.protocol.RequestContent; import org.apache.hc.core5.http.protocol.RequestContent;
import org.apache.hc.core5.http.protocol.RequestTargetHost; import org.apache.hc.core5.http.protocol.RequestTargetHost;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -220,21 +225,18 @@ public class TestConnectionManagement extends LocalServerTestBase {
final ConnectionEndpoint endpoint1 = leaseRequest1.get(0, TimeUnit.MILLISECONDS); final ConnectionEndpoint endpoint1 = leaseRequest1.get(0, TimeUnit.MILLISECONDS);
this.connManager.connect(endpoint1, TimeValue.NEG_ONE_MILLISECONDS, context); this.connManager.connect(endpoint1, TimeValue.NEG_ONE_MILLISECONDS, context);
Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
Assert.assertEquals(1, this.connManager.getTotalStats().getLeased()); Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
Assert.assertEquals(1, this.connManager.getStats(route).getLeased()); Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
this.connManager.release(endpoint1, null, TimeValue.ofMillis(100)); this.connManager.release(endpoint1, null, TimeValue.ofMillis(100));
// Released, still active. // Released, still active.
Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
Assert.assertEquals(1, this.connManager.getStats(route).getAvailable()); Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
this.connManager.closeExpired(); this.connManager.closeExpired();
// Time has not expired yet. // Time has not expired yet.
Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
Assert.assertEquals(1, this.connManager.getStats(route).getAvailable()); Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
@ -243,7 +245,6 @@ public class TestConnectionManagement extends LocalServerTestBase {
this.connManager.closeExpired(); this.connManager.closeExpired();
// Time expired now, connections are destroyed. // Time expired now, connections are destroyed.
Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
Assert.assertEquals(0, this.connManager.getStats(route).getAvailable()); Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());
@ -253,7 +254,14 @@ public class TestConnectionManagement extends LocalServerTestBase {
@Test @Test
public void testCloseExpiredTTLConnections() throws Exception { public void testCloseExpiredTTLConnections() throws Exception {
this.connManager = new PoolingHttpClientConnectionManager(TimeValue.ofMillis(100)); this.connManager = new PoolingHttpClientConnectionManager(
RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", SSLConnectionSocketFactory.getSocketFactory())
.build(),
PoolConcurrencyPolicy.STRICT,
PoolReusePolicy.LIFO,
TimeValue.ofMillis(100));
this.clientBuilder.setConnectionManager(this.connManager); this.clientBuilder.setConnectionManager(this.connManager);
this.connManager.setMaxTotal(1); this.connManager.setMaxTotal(1);
@ -266,21 +274,18 @@ public class TestConnectionManagement extends LocalServerTestBase {
final ConnectionEndpoint endpoint1 = leaseRequest1.get(0, TimeUnit.MILLISECONDS); final ConnectionEndpoint endpoint1 = leaseRequest1.get(0, TimeUnit.MILLISECONDS);
this.connManager.connect(endpoint1, TimeValue.NEG_ONE_MILLISECONDS, context); this.connManager.connect(endpoint1, TimeValue.NEG_ONE_MILLISECONDS, context);
Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
Assert.assertEquals(1, this.connManager.getTotalStats().getLeased()); Assert.assertEquals(1, this.connManager.getTotalStats().getLeased());
Assert.assertEquals(1, this.connManager.getStats(route).getLeased()); Assert.assertEquals(1, this.connManager.getStats(route).getLeased());
// Release, let remain idle for forever // Release, let remain idle for forever
this.connManager.release(endpoint1, null, TimeValue.NEG_ONE_MILLISECONDS); this.connManager.release(endpoint1, null, TimeValue.NEG_ONE_MILLISECONDS);
// Released, still active. // Released, still active.
Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
Assert.assertEquals(1, this.connManager.getStats(route).getAvailable()); Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
this.connManager.closeExpired(); this.connManager.closeExpired();
// Time has not expired yet. // Time has not expired yet.
Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes());
Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable());
Assert.assertEquals(1, this.connManager.getStats(route).getAvailable()); Assert.assertEquals(1, this.connManager.getStats(route).getAvailable());
@ -289,7 +294,6 @@ public class TestConnectionManagement extends LocalServerTestBase {
this.connManager.closeExpired(); this.connManager.closeExpired();
// TTL expired now, connections are destroyed. // TTL expired now, connections are destroyed.
Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes());
Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable());
Assert.assertEquals(0, this.connManager.getStats(route).getAvailable()); Assert.assertEquals(0, this.connManager.getStats(route).getAvailable());

View File

@ -78,6 +78,8 @@ import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader; import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.message.BasicLineParser; import org.apache.hc.core5.http.message.BasicLineParser;
import org.apache.hc.core5.http.message.LineParser; import org.apache.hc.core5.http.message.LineParser;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.ssl.SSLContexts; import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.CharArrayBuffer; import org.apache.hc.core5.util.CharArrayBuffer;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
@ -166,7 +168,8 @@ public class ClientConfiguration {
// Create a connection manager with custom configuration. // Create a connection manager with custom configuration.
final PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager( final PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(
socketFactoryRegistry, connFactory, dnsResolver); socketFactoryRegistry, PoolConcurrencyPolicy.STRICT, PoolReusePolicy.LIFO, TimeValue.ofMinutes(5),
null, dnsResolver, null);
// Create socket configuration // Create socket configuration
final SocketConfig socketConfig = SocketConfig.custom() final SocketConfig socketConfig = SocketConfig.custom()

View File

@ -49,6 +49,7 @@ import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
import org.apache.hc.core5.io.ShutdownType; import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
class ExecRuntimeImpl implements ExecRuntime, Cancellable { class ExecRuntimeImpl implements ExecRuntime, Cancellable {
@ -92,7 +93,9 @@ class ExecRuntimeImpl implements ExecRuntime, Cancellable {
public void acquireConnection(final HttpRoute route, final Object object, final HttpClientContext context) throws IOException { public void acquireConnection(final HttpRoute route, final Object object, final HttpClientContext context) throws IOException {
Args.notNull(route, "Route"); Args.notNull(route, "Route");
if (endpointRef.get() == null) { if (endpointRef.get() == null) {
final LeaseRequest connRequest = manager.lease(route, object); final RequestConfig requestConfig = context.getRequestConfig();
final Timeout requestTimeout = requestConfig.getConnectionRequestTimeout();
final LeaseRequest connRequest = manager.lease(route, requestTimeout, object);
state = object; state = object;
if (cancellableAware != null) { if (cancellableAware != null) {
if (cancellableAware.isCancelled()) { if (cancellableAware.isCancelled()) {
@ -102,9 +105,7 @@ class ExecRuntimeImpl implements ExecRuntime, Cancellable {
cancellableAware.setCancellable(connRequest); cancellableAware.setCancellable(connRequest);
} }
try { try {
final RequestConfig requestConfig = context.getRequestConfig(); final ConnectionEndpoint connectionEndpoint = connRequest.get(requestTimeout.getDuration(), requestTimeout.getTimeUnit());
final TimeValue timeout = requestConfig.getConnectionRequestTimeout();
final ConnectionEndpoint connectionEndpoint = connRequest.get(timeout.getDuration(), timeout.getTimeUnit());
endpointRef.set(connectionEndpoint); endpointRef.set(connectionEndpoint);
reusable = connectionEndpoint.isConnected(); reusable = connectionEndpoint.isConnected();
if (cancellableAware != null) { if (cancellableAware != null) {

View File

@ -65,6 +65,7 @@ import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts; import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.LangUtils; import org.apache.hc.core5.util.LangUtils;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -181,8 +182,12 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT; this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
} }
@Override
public LeaseRequest lease(final HttpRoute route, final Object state) { public LeaseRequest lease(final HttpRoute route, final Object state) {
return lease(route, Timeout.DISABLED, state);
}
@Override
public LeaseRequest lease(final HttpRoute route, final Timeout requestTimeout, final Object state) {
return new LeaseRequest() { return new LeaseRequest() {
@Override @Override

View File

@ -47,6 +47,7 @@ import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.socket.ConnectionSocketFactory; import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
import org.apache.hc.client5.http.socket.LayeredConnectionSocketFactory; import org.apache.hc.client5.http.socket.LayeredConnectionSocketFactory;
import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.ConnectionClosedException; import org.apache.hc.core5.http.ConnectionClosedException;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
@ -65,6 +66,7 @@ import org.apache.logging.log4j.Logger;
* *
* @since 4.4 * @since 4.4
*/ */
@Internal
@Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL) @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL)
public class DefaultHttpClientConnectionOperator implements HttpClientConnectionOperator { public class DefaultHttpClientConnectionOperator implements HttpClientConnectionOperator {

View File

@ -56,9 +56,10 @@ import org.apache.logging.log4j.Logger;
/** /**
* Default {@link ManagedHttpClientConnection} implementation. * Default {@link ManagedHttpClientConnection} implementation.
*
* @since 4.3 * @since 4.3
*/ */
public class DefaultManagedHttpClientConnection final class DefaultManagedHttpClientConnection
extends DefaultBHttpClientConnection implements ManagedHttpClientConnection, Identifiable { extends DefaultBHttpClientConnection implements ManagedHttpClientConnection, Identifiable {
private final Logger log = LogManager.getLogger(DefaultManagedHttpClientConnection.class); private final Logger log = LogManager.getLogger(DefaultManagedHttpClientConnection.class);

View File

@ -27,7 +27,6 @@
package org.apache.hc.client5.http.impl.io; package org.apache.hc.client5.http.impl.io;
import java.io.IOException; import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -49,13 +48,12 @@ import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.config.Registry; import org.apache.hc.core5.http.config.Registry;
import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.config.SocketConfig; import org.apache.hc.core5.http.config.SocketConfig;
@ -64,6 +62,9 @@ import org.apache.hc.core5.http.io.HttpConnectionFactory;
import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.io.ShutdownType; import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.pool.ConnPoolControl; import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.pool.LaxConnPool;
import org.apache.hc.core5.pool.ManagedConnPool;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolEntry; import org.apache.hc.core5.pool.PoolEntry;
import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.pool.PoolStats; import org.apache.hc.core5.pool.PoolStats;
@ -71,6 +72,7 @@ import org.apache.hc.core5.pool.StrictConnPool;
import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts; import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -107,95 +109,113 @@ public class PoolingHttpClientConnectionManager
public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25; public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5; public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
private final StrictConnPool<HttpRoute, ManagedHttpClientConnection> pool;
private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
private final HttpClientConnectionOperator connectionOperator; private final HttpClientConnectionOperator connectionOperator;
private final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool;
private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
private final AtomicBoolean closed; private final AtomicBoolean closed;
private volatile SocketConfig defaultSocketConfig; private volatile SocketConfig defaultSocketConfig;
private volatile TimeValue validateAfterInactivity; private volatile TimeValue validateAfterInactivity;
private static Registry<ConnectionSocketFactory> getDefaultRegistry() { public PoolingHttpClientConnectionManager() {
return RegistryBuilder.<ConnectionSocketFactory>create() this(RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", PlainConnectionSocketFactory.getSocketFactory()) .register("http", PlainConnectionSocketFactory.getSocketFactory())
.register("https", SSLConnectionSocketFactory.getSocketFactory()) .register("https", SSLConnectionSocketFactory.getSocketFactory())
.build(); .build());
}
public PoolingHttpClientConnectionManager() {
this(getDefaultRegistry());
}
public PoolingHttpClientConnectionManager(final TimeValue timeToLive) {
this(getDefaultRegistry(), null, null ,null, PoolReusePolicy.LIFO, timeToLive);
} }
public PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry) { final Registry<ConnectionSocketFactory> socketFactoryRegistry) {
this(socketFactoryRegistry, null, null); this(socketFactoryRegistry, null);
}
public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final DnsResolver dnsResolver) {
this(socketFactoryRegistry, null, dnsResolver);
} }
public PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry, final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) { final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
this(socketFactoryRegistry, connFactory, null); this(socketFactoryRegistry, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECONDS, connFactory);
} }
public PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final PoolConcurrencyPolicy poolConcurrencyPolicy,
final TimeValue timeToLive,
final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) { final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
this(getDefaultRegistry(), connFactory, null); this(socketFactoryRegistry, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive, connFactory);
} }
public PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry, final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final HttpConnectionFactory<ManagedHttpClientConnection> connFactory, final PoolConcurrencyPolicy poolConcurrencyPolicy,
final DnsResolver dnsResolver) { final PoolReusePolicy poolReusePolicy,
this(socketFactoryRegistry, connFactory, null, dnsResolver, PoolReusePolicy.LIFO, TimeValue.NEG_ONE_MILLISECONDS); final TimeValue timeToLive) {
this(socketFactoryRegistry, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null);
} }
public PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry, final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final HttpConnectionFactory<ManagedHttpClientConnection> connFactory, final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy,
final TimeValue timeToLive,
final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
this(socketFactoryRegistry, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null, connFactory);
}
public PoolingHttpClientConnectionManager(
final Registry<ConnectionSocketFactory> socketFactoryRegistry,
final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy,
final TimeValue timeToLive,
final SchemePortResolver schemePortResolver, final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver, final DnsResolver dnsResolver,
final PoolReusePolicy poolReusePolicy, final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
final TimeValue timeToLive) {
this(new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver), this(new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver),
connFactory, poolReusePolicy, timeToLive); poolConcurrencyPolicy,
poolReusePolicy,
timeToLive,
connFactory);
} }
public PoolingHttpClientConnectionManager( @Internal
protected PoolingHttpClientConnectionManager(
final HttpClientConnectionOperator httpClientConnectionOperator, final HttpClientConnectionOperator httpClientConnectionOperator,
final HttpConnectionFactory<ManagedHttpClientConnection> connFactory, final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy, final PoolReusePolicy poolReusePolicy,
final TimeValue timeToLive) { final TimeValue timeToLive,
final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
super(); super();
this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator"); this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
case STRICT:
this.pool = new StrictConnPool<>(
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
DEFAULT_MAX_TOTAL_CONNECTIONS,
timeToLive,
poolReusePolicy,
null);
break;
case LAX:
this.pool = new LaxConnPool<>(
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
timeToLive,
poolReusePolicy,
null);
break;
default:
throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
}
this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE; this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
this.pool = new StrictConnPool<>(DEFAULT_MAX_CONNECTIONS_PER_ROUTE, DEFAULT_MAX_TOTAL_CONNECTIONS, timeToLive,
poolReusePolicy, null);
this.closed = new AtomicBoolean(false); this.closed = new AtomicBoolean(false);
} }
/** @Internal
* Visible for test. protected PoolingHttpClientConnectionManager(
*/ final HttpClientConnectionOperator httpClientConnectionOperator,
PoolingHttpClientConnectionManager( final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool,
final StrictConnPool<HttpRoute, ManagedHttpClientConnection> pool, final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
final Lookup<ConnectionSocketFactory> socketFactoryRegistry,
final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver) {
super(); super();
this.connectionOperator = new DefaultHttpClientConnectionOperator( this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
socketFactoryRegistry, schemePortResolver, dnsResolver); this.pool = Args.notNull(pool, "Connection pool");
this.connFactory = ManagedHttpClientConnectionFactory.INSTANCE; this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
this.pool = pool;
this.closed = new AtomicBoolean(false); this.closed = new AtomicBoolean(false);
} }
@ -225,15 +245,24 @@ public class PoolingHttpClientConnectionManager
} }
} }
public LeaseRequest lease(final HttpRoute route, final Object state) {
return lease(route, Timeout.DISABLED, state);
}
@Override @Override
public LeaseRequest lease( public LeaseRequest lease(
final HttpRoute route, final HttpRoute route,
final Timeout requestTimeout,
final Object state) { final Object state) {
Args.notNull(route, "HTTP route"); Args.notNull(route, "HTTP route");
if (this.log.isDebugEnabled()) { if (this.log.isDebugEnabled()) {
this.log.debug("Connection request: " + ConnPoolSupport.formatStats(null, route, state, this.pool)); this.log.debug("Connection request: " + ConnPoolSupport.formatStats(null, route, state, this.pool));
} }
final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, null); //TODO: fix me.
if (log.isWarnEnabled() && Timeout.isPositive(requestTimeout)) {
log.warn("Connection request timeout is not supported");
}
final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, /** requestTimeout, */ null);
return new LeaseRequest() { return new LeaseRequest() {
private volatile ConnectionEndpoint endpoint; private volatile ConnectionEndpoint endpoint;
@ -391,14 +420,6 @@ public class PoolingHttpClientConnectionManager
this.pool.closeExpired(); this.pool.closeExpired();
} }
protected void enumAvailable(final Callback<PoolEntry<HttpRoute, ManagedHttpClientConnection>> callback) {
this.pool.enumAvailable(callback);
}
protected void enumLeased(final Callback<PoolEntry<HttpRoute, ManagedHttpClientConnection>> callback) {
this.pool.enumLeased(callback);
}
@Override @Override
public int getMaxTotal() { public int getMaxTotal() {
return this.pool.getMaxTotal(); return this.pool.getMaxTotal();
@ -439,13 +460,6 @@ public class PoolingHttpClientConnectionManager
return this.pool.getStats(route); return this.pool.getStats(route);
} }
/**
* @since 4.4
*/
public Set<HttpRoute> getRoutes() {
return this.pool.getRoutes();
}
public SocketConfig getDefaultSocketConfig() { public SocketConfig getDefaultSocketConfig() {
return this.defaultSocketConfig; return this.defaultSocketConfig;
} }

View File

@ -37,6 +37,7 @@ import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.config.SocketConfig; import org.apache.hc.core5.http.config.SocketConfig;
import org.apache.hc.core5.http.io.HttpConnectionFactory; import org.apache.hc.core5.http.io.HttpConnectionFactory;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
@ -72,6 +73,7 @@ public class PoolingHttpClientConnectionManagerBuilder {
private LayeredConnectionSocketFactory sslSocketFactory; private LayeredConnectionSocketFactory sslSocketFactory;
private SchemePortResolver schemePortResolver; private SchemePortResolver schemePortResolver;
private DnsResolver dnsResolver; private DnsResolver dnsResolver;
private PoolConcurrencyPolicy poolConcurrencyPolicy;
private PoolReusePolicy poolReusePolicy; private PoolReusePolicy poolReusePolicy;
private SocketConfig defaultSocketConfig; private SocketConfig defaultSocketConfig;
@ -125,6 +127,14 @@ public class PoolingHttpClientConnectionManagerBuilder {
return this; return this;
} }
/**
* Assigns {@link PoolConcurrencyPolicy} value.
*/
public final PoolingHttpClientConnectionManagerBuilder setPoolConcurrencyPolicy(final PoolConcurrencyPolicy poolConcurrencyPolicy) {
this.poolConcurrencyPolicy = poolConcurrencyPolicy;
return this;
}
/** /**
* Assigns {@link PoolReusePolicy} value. * Assigns {@link PoolReusePolicy} value.
*/ */
@ -195,11 +205,12 @@ public class PoolingHttpClientConnectionManagerBuilder {
SSLConnectionSocketFactory.getSystemSocketFactory() : SSLConnectionSocketFactory.getSystemSocketFactory() :
SSLConnectionSocketFactory.getSocketFactory())) SSLConnectionSocketFactory.getSocketFactory()))
.build(), .build(),
connectionFactory, poolConcurrencyPolicy,
poolReusePolicy,
timeToLive != null ? timeToLive : TimeValue.NEG_ONE_MILLISECONDS,
schemePortResolver, schemePortResolver,
dnsResolver, dnsResolver,
poolReusePolicy, connectionFactory);
timeToLive != null ? timeToLive : TimeValue.NEG_ONE_MILLISECONDS);
poolingmgr.setValidateAfterInactivity(this.validateAfterInactivity); poolingmgr.setValidateAfterInactivity(this.validateAfterInactivity);
if (defaultSocketConfig != null) { if (defaultSocketConfig != null) {
poolingmgr.setDefaultSocketConfig(defaultSocketConfig); poolingmgr.setDefaultSocketConfig(defaultSocketConfig);

View File

@ -41,6 +41,9 @@ import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.SystemDefaultDnsResolver; import org.apache.hc.client5.http.SystemDefaultDnsResolver;
import org.apache.hc.client5.http.UnsupportedSchemeException; import org.apache.hc.client5.http.UnsupportedSchemeException;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver; import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
@ -51,21 +54,28 @@ import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
final class AsyncClientConnectionOperator { /**
* Default {@link AsyncClientConnectionOperator} implementation.
*
* @since 5.0
*/
@Internal
final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectionOperator {
private final SchemePortResolver schemePortResolver; private final SchemePortResolver schemePortResolver;
private final DnsResolver dnsResolver; private final DnsResolver dnsResolver;
private final Lookup<TlsStrategy> tlsStrategyLookup; private final Lookup<TlsStrategy> tlsStrategyLookup;
AsyncClientConnectionOperator( DefaultAsyncClientConnectionOperator(
final Lookup<TlsStrategy> tlsStrategyLookup,
final SchemePortResolver schemePortResolver, final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver, final DnsResolver dnsResolver) {
final Lookup<TlsStrategy> tlsStrategyLookup) { this.tlsStrategyLookup = Args.notNull(tlsStrategyLookup, "TLS strategy lookup");
this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE; this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE; this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE;
this.tlsStrategyLookup = tlsStrategyLookup;
} }
@Override
public Future<ManagedAsyncClientConnection> connect( public Future<ManagedAsyncClientConnection> connect(
final ConnectionInitiator connectionInitiator, final ConnectionInitiator connectionInitiator,
final HttpHost host, final HttpHost host,
@ -108,7 +118,7 @@ final class AsyncClientConnectionOperator {
@Override @Override
public void completed(final IOSession session) { public void completed(final IOSession session) {
final ManagedAsyncClientConnection connection = new ManagedAsyncClientConnection(session); final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session);
if (tlsStrategy != null) { if (tlsStrategy != null) {
tlsStrategy.upgrade( tlsStrategy.upgrade(
connection, connection,
@ -152,6 +162,7 @@ final class AsyncClientConnectionOperator {
return future; return future;
} }
@Override
public void upgrade(final ManagedAsyncClientConnection connection, final HttpHost host, final Object attachment) { public void upgrade(final ManagedAsyncClientConnection connection, final HttpHost host, final Object attachment) {
final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getSchemeName()) : null; final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getSchemeName()) : null;
if (tlsStrategy != null) { if (tlsStrategy != null) {

View File

@ -35,6 +35,8 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import org.apache.hc.client5.http.impl.ConnPoolSupport; import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.http.EndpointDetails; import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.HttpConnection; import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.HttpVersion;
@ -53,14 +55,20 @@ import org.apache.hc.core5.util.Identifiable;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
final class ManagedAsyncClientConnection implements Identifiable, HttpConnection, TransportSecurityLayer { /**
* Default {@link ManagedAsyncClientConnection} implementation.
*
* @since 5.0
*/
@Internal
final class DefaultManagedAsyncClientConnection implements ManagedAsyncClientConnection, Identifiable {
private final Logger log = LogManager.getLogger(getClass()); private final Logger log = LogManager.getLogger(getClass());
private final IOSession ioSession; private final IOSession ioSession;
private final AtomicBoolean closed; private final AtomicBoolean closed;
public ManagedAsyncClientConnection(final IOSession ioSession) { public DefaultManagedAsyncClientConnection(final IOSession ioSession) {
this.ioSession = ioSession; this.ioSession = ioSession;
this.closed = new AtomicBoolean(); this.closed = new AtomicBoolean();
} }
@ -162,6 +170,7 @@ final class ManagedAsyncClientConnection implements Identifiable, HttpConnection
return tlsDetails != null ? tlsDetails.getSSLSession() : null; return tlsDetails != null ? tlsDetails.getSSLSession() : null;
} }
@Override
public void submitPriorityCommand(final Command command) { public void submitPriorityCommand(final Command command) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug(getId() + ": priority command " + command); log.debug(getId() + ": priority command " + command);
@ -169,6 +178,7 @@ final class ManagedAsyncClientConnection implements Identifiable, HttpConnection
ioSession.addFirst(command); ioSession.addFirst(command);
} }
@Override
public void submitCommand(final Command command) { public void submitCommand(final Command command) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug(getId() + ": command " + command); log.debug(getId() + ": command " + command);

View File

@ -40,8 +40,12 @@ import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.impl.ConnPoolSupport; import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.client5.http.impl.ConnectionShutdownException; import org.apache.hc.client5.http.impl.ConnectionShutdownException;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager; import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint; import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.concurrent.FutureCallback;
@ -50,6 +54,7 @@ import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.command.ExecutionCommand; import org.apache.hc.core5.http.nio.command.ExecutionCommand;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy; import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
@ -58,6 +63,9 @@ import org.apache.hc.core5.http2.nio.command.PingCommand;
import org.apache.hc.core5.http2.nio.support.BasicPingHandler; import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
import org.apache.hc.core5.io.ShutdownType; import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.pool.ConnPoolControl; import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.pool.LaxConnPool;
import org.apache.hc.core5.pool.ManagedConnPool;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolEntry; import org.apache.hc.core5.pool.PoolEntry;
import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.pool.PoolStats; import org.apache.hc.core5.pool.PoolStats;
@ -94,20 +102,86 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
private final Logger log = LogManager.getLogger(getClass()); private final Logger log = LogManager.getLogger(getClass());
public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25;
public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
private final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
private final AsyncClientConnectionOperator connectionOperator; private final AsyncClientConnectionOperator connectionOperator;
private final StrictConnPool<HttpRoute, ManagedAsyncClientConnection> pool;
private final AtomicBoolean closed; private final AtomicBoolean closed;
private volatile TimeValue validateAfterInactivity; private volatile TimeValue validateAfterInactivity;
public PoolingAsyncClientConnectionManager() {
this(RegistryBuilder.<TlsStrategy>create()
.register("https", H2TlsStrategy.getDefault())
.build());
}
public PoolingAsyncClientConnectionManager(final Lookup<TlsStrategy> tlsStrategyLookup) {
this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECONDS);
}
public PoolingAsyncClientConnectionManager( public PoolingAsyncClientConnectionManager(
final Lookup<TlsStrategy> tlsStrategyLookup, final Lookup<TlsStrategy> tlsStrategyLookup,
final SchemePortResolver schemePortResolver, final PoolConcurrencyPolicy poolConcurrencyPolicy,
final DnsResolver dnsResolver, final TimeValue timeToLive) {
this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive);
}
public PoolingAsyncClientConnectionManager(
final Lookup<TlsStrategy> tlsStrategyLookup,
final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy,
final TimeValue timeToLive) {
this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null);
}
public PoolingAsyncClientConnectionManager(
final Lookup<TlsStrategy> tlsStrategyLookup,
final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy,
final TimeValue timeToLive, final TimeValue timeToLive,
final PoolReusePolicy poolReusePolicy) { final SchemePortResolver schemePortResolver,
this.connectionOperator = new AsyncClientConnectionOperator(schemePortResolver, dnsResolver, tlsStrategyLookup); final DnsResolver dnsResolver) {
this.pool = new StrictConnPool<>(20, 50, timeToLive, poolReusePolicy != null ? poolReusePolicy : PoolReusePolicy.LIFO, null); this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
poolConcurrencyPolicy, poolReusePolicy, timeToLive);
}
@Internal
protected PoolingAsyncClientConnectionManager(
final AsyncClientConnectionOperator connectionOperator,
final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy,
final TimeValue timeToLive) {
this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
case STRICT:
this.pool = new StrictConnPool<>(
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
DEFAULT_MAX_TOTAL_CONNECTIONS,
timeToLive,
poolReusePolicy,
null);
break;
case LAX:
this.pool = new LaxConnPool<>(
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
timeToLive,
poolReusePolicy,
null);
break;
default:
throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
}
this.closed = new AtomicBoolean(false);
}
@Internal
protected PoolingAsyncClientConnectionManager(
final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> pool,
final AsyncClientConnectionOperator connectionOperator) {
this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
this.pool = Args.notNull(pool, "Connection pool");
this.closed = new AtomicBoolean(false); this.closed = new AtomicBoolean(false);
} }
@ -141,14 +215,18 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
public Future<AsyncConnectionEndpoint> lease( public Future<AsyncConnectionEndpoint> lease(
final HttpRoute route, final HttpRoute route,
final Object state, final Object state,
final Timeout timeout, final Timeout requestTimeout,
final FutureCallback<AsyncConnectionEndpoint> callback) { final FutureCallback<AsyncConnectionEndpoint> callback) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Connection request: " + ConnPoolSupport.formatStats(null, route, state, pool)); log.debug("Connection request: " + ConnPoolSupport.formatStats(null, route, state, pool));
} }
final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback); final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
//TODO: fix me.
if (log.isWarnEnabled() && Timeout.isPositive(requestTimeout)) {
log.warn("Connection request timeout is not supported");
}
final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease( final Future<PoolEntry<HttpRoute, ManagedAsyncClientConnection>> leaseFuture = pool.lease(
route, state, timeout, new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() { route, state, /** requestTimeout, **/ new FutureCallback<PoolEntry<HttpRoute, ManagedAsyncClientConnection>>() {
void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) { void leaseCompleted(final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {

View File

@ -35,6 +35,7 @@ import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.ssl.H2TlsStrategy; import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy; import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.pool.PoolConcurrencyPolicy;
import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.pool.PoolReusePolicy;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
@ -69,6 +70,7 @@ public class PoolingAsyncClientConnectionManagerBuilder {
private TlsStrategy tlsStrategy; private TlsStrategy tlsStrategy;
private SchemePortResolver schemePortResolver; private SchemePortResolver schemePortResolver;
private DnsResolver dnsResolver; private DnsResolver dnsResolver;
private PoolConcurrencyPolicy poolConcurrencyPolicy;
private PoolReusePolicy poolReusePolicy; private PoolReusePolicy poolReusePolicy;
private boolean systemProperties; private boolean systemProperties;
@ -112,6 +114,14 @@ public class PoolingAsyncClientConnectionManagerBuilder {
return this; return this;
} }
/**
* Assigns {@link PoolConcurrencyPolicy} value.
*/
public final PoolingAsyncClientConnectionManagerBuilder setPoolConcurrencyPolicy(final PoolConcurrencyPolicy poolConcurrencyPolicy) {
this.poolConcurrencyPolicy = poolConcurrencyPolicy;
return this;
}
/** /**
* Assigns {@link PoolReusePolicy} value. * Assigns {@link PoolReusePolicy} value.
*/ */
@ -165,15 +175,29 @@ public class PoolingAsyncClientConnectionManagerBuilder {
} }
public PoolingAsyncClientConnectionManager build() { public PoolingAsyncClientConnectionManager build() {
final TlsStrategy tlsStrategyCopy;
if (tlsStrategy != null) {
tlsStrategyCopy = tlsStrategy;
} else if (systemProperties) {
tlsStrategyCopy = AccessController.doPrivileged(new PrivilegedAction<TlsStrategy>() {
@Override
public TlsStrategy run() {
return H2TlsStrategy.getSystemDefault();
}
});
} else {
tlsStrategyCopy = H2TlsStrategy.getDefault();
}
@SuppressWarnings("resource") @SuppressWarnings("resource")
final PoolingAsyncClientConnectionManager poolingmgr = new PoolingAsyncClientConnectionManager( final PoolingAsyncClientConnectionManager poolingmgr = new PoolingAsyncClientConnectionManager(
RegistryBuilder.<TlsStrategy>create() RegistryBuilder.<TlsStrategy>create()
.register("https", getTlsStrategy()) .register("https", tlsStrategyCopy)
.build(), .build(),
schemePortResolver, poolConcurrencyPolicy,
dnsResolver, poolReusePolicy,
timeToLive, timeToLive,
poolReusePolicy); schemePortResolver,
dnsResolver);
poolingmgr.setValidateAfterInactivity(this.validateAfterInactivity); poolingmgr.setValidateAfterInactivity(this.validateAfterInactivity);
if (maxConnTotal > 0) { if (maxConnTotal > 0) {
poolingmgr.setMaxTotal(maxConnTotal); poolingmgr.setMaxTotal(maxConnTotal);
@ -184,18 +208,4 @@ public class PoolingAsyncClientConnectionManagerBuilder {
return poolingmgr; return poolingmgr;
} }
private TlsStrategy getTlsStrategy() {
if (tlsStrategy != null) {
return tlsStrategy;
} else if (systemProperties) {
return AccessController.doPrivileged(new PrivilegedAction<TlsStrategy>() {
@Override
public TlsStrategy run() {
return H2TlsStrategy.getSystemDefault();
}
});
} else {
return H2TlsStrategy.getDefault();
}
}
} }

View File

@ -32,6 +32,7 @@ import java.io.IOException;
import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
/** /**
* Represents a manager of persistent client connections. * Represents a manager of persistent client connections.
@ -64,10 +65,12 @@ public interface HttpClientConnectionManager extends Closeable {
* executed a {@code CONNECT} method to all intermediate proxy hops. * executed a {@code CONNECT} method to all intermediate proxy hops.
* *
* @param route HTTP route of the requested connection. * @param route HTTP route of the requested connection.
* @param requestTimeout lease request timeout.
* @param state expected state of the connection or {@code null} * @param state expected state of the connection or {@code null}
* if the connection is not expected to carry any state. * if the connection is not expected to carry any state.
* @since 5.0
*/ */
LeaseRequest lease(HttpRoute route, Object state); LeaseRequest lease(HttpRoute route, Timeout requestTimeout, Object state);
/** /**
* Releases the endpoint back to the manager making it potentially * Releases the endpoint back to the manager making it potentially

View File

@ -30,6 +30,7 @@ package org.apache.hc.client5.http.io;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.config.SocketConfig; import org.apache.hc.core5.http.config.SocketConfig;
import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpContext;
@ -44,6 +45,7 @@ import org.apache.hc.core5.util.TimeValue;
* *
* @since 4.4 * @since 4.4
*/ */
@Internal
public interface HttpClientConnectionOperator { public interface HttpClientConnectionOperator {
void connect( void connect(

View File

@ -32,6 +32,7 @@ import java.net.Socket;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.http.io.HttpClientConnection; import org.apache.hc.core5.http.io.HttpClientConnection;
/** /**
@ -42,6 +43,7 @@ import org.apache.hc.core5.http.io.HttpClientConnection;
* *
* @since 4.3 * @since 4.3
*/ */
@Internal
public interface ManagedHttpClientConnection extends HttpClientConnection { public interface ManagedHttpClientConnection extends HttpClientConnection {
/** /**

View File

@ -71,13 +71,13 @@ public interface AsyncClientConnectionManager extends Closeable {
* @param route HTTP route of the requested connection. * @param route HTTP route of the requested connection.
* @param state expected state of the connection or {@code null} * @param state expected state of the connection or {@code null}
* if the connection is not expected to carry any state. * if the connection is not expected to carry any state.
* @param timeout lease request timeout. * @param requestTimeout lease request timeout.
* @param callback result callback. * @param callback result callback.
*/ */
Future<AsyncConnectionEndpoint> lease( Future<AsyncConnectionEndpoint> lease(
HttpRoute route, HttpRoute route,
Object state, Object state,
Timeout timeout, Timeout requestTimeout,
FutureCallback<AsyncConnectionEndpoint> callback); FutureCallback<AsyncConnectionEndpoint> callback);
/** /**

View File

@ -0,0 +1,55 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.nio;
import java.net.SocketAddress;
import java.util.concurrent.Future;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.util.TimeValue;
/**
* @since 5.0
*/
@Internal
public interface AsyncClientConnectionOperator {
Future<ManagedAsyncClientConnection> connect(
ConnectionInitiator connectionInitiator,
HttpHost host,
SocketAddress localAddress,
TimeValue connectTimeout,
Object attachment,
FutureCallback<ManagedAsyncClientConnection> callback);
void upgrade(ManagedAsyncClientConnection connection, HttpHost host, Object attachment);
}

View File

@ -0,0 +1,42 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.nio;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
@Internal
public interface ManagedAsyncClientConnection extends HttpConnection, TransportSecurityLayer {
void submitPriorityCommand(Command command);
void submitCommand(Command command);
}

View File

@ -43,6 +43,7 @@ import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.impl.io.HttpRequestExecutor; import org.apache.hc.core5.http.impl.io.HttpRequestExecutor;
import org.apache.hc.core5.io.ShutdownType; import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.util.TimeValue; import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -88,7 +89,7 @@ public class TestExecRuntimeImpl {
context.setRequestConfig(config); context.setRequestConfig(config);
final HttpRoute route = new HttpRoute(new HttpHost("host", 80)); final HttpRoute route = new HttpRoute(new HttpHost("host", 80));
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint);
@ -109,7 +110,7 @@ public class TestExecRuntimeImpl {
public void testAcquireEndpointAlreadyAcquired() throws Exception { public void testAcquireEndpointAlreadyAcquired() throws Exception {
final HttpClientContext context = HttpClientContext.create(); final HttpClientContext context = HttpClientContext.create();
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint);
@ -125,7 +126,7 @@ public class TestExecRuntimeImpl {
public void testAcquireEndpointLeaseRequestTimeout() throws Exception { public void testAcquireEndpointLeaseRequestTimeout() throws Exception {
final HttpClientContext context = HttpClientContext.create(); final HttpClientContext context = HttpClientContext.create();
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenThrow(new TimeoutException()); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenThrow(new TimeoutException());
@ -136,7 +137,7 @@ public class TestExecRuntimeImpl {
public void testAcquireEndpointLeaseRequestFailure() throws Exception { public void testAcquireEndpointLeaseRequestFailure() throws Exception {
final HttpClientContext context = HttpClientContext.create(); final HttpClientContext context = HttpClientContext.create();
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenThrow(new ExecutionException(new IllegalStateException())); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenThrow(new ExecutionException(new IllegalStateException()));
@ -146,7 +147,7 @@ public class TestExecRuntimeImpl {
@Test @Test
public void testAbortEndpoint() throws Exception { public void testAbortEndpoint() throws Exception {
final HttpClientContext context = HttpClientContext.create(); final HttpClientContext context = HttpClientContext.create();
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint);
@ -172,7 +173,7 @@ public class TestExecRuntimeImpl {
public void testCancell() throws Exception { public void testCancell() throws Exception {
final HttpClientContext context = HttpClientContext.create(); final HttpClientContext context = HttpClientContext.create();
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint);
@ -199,7 +200,7 @@ public class TestExecRuntimeImpl {
public void testReleaseEndpointReusable() throws Exception { public void testReleaseEndpointReusable() throws Exception {
final HttpClientContext context = HttpClientContext.create(); final HttpClientContext context = HttpClientContext.create();
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint);
@ -229,7 +230,7 @@ public class TestExecRuntimeImpl {
public void testReleaseEndpointNonReusable() throws Exception { public void testReleaseEndpointNonReusable() throws Exception {
final HttpClientContext context = HttpClientContext.create(); final HttpClientContext context = HttpClientContext.create();
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint);
@ -265,7 +266,7 @@ public class TestExecRuntimeImpl {
.build(); .build();
context.setRequestConfig(config); context.setRequestConfig(config);
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint);
@ -285,7 +286,7 @@ public class TestExecRuntimeImpl {
public void testDisonnectEndpoint() throws Exception { public void testDisonnectEndpoint() throws Exception {
final HttpClientContext context = HttpClientContext.create(); final HttpClientContext context = HttpClientContext.create();
Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); Mockito.when(mgr.lease(Mockito.eq(route), Mockito.<Timeout>any(), Mockito.any())).thenReturn(leaseRequest);
Mockito.when(leaseRequest.get( Mockito.when(leaseRequest.get(
Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint); Mockito.anyLong(), Mockito.<TimeUnit>any())).thenReturn(connectionEndpoint);

View File

@ -87,7 +87,7 @@ public class TestPoolingHttpClientConnectionManager {
public void setup() throws Exception { public void setup() throws Exception {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
mgr = new PoolingHttpClientConnectionManager( mgr = new PoolingHttpClientConnectionManager(
pool, socketFactoryRegistry, schemePortResolver, dnsResolver); new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver), pool, null);
} }
@Test @Test