From d2b3385ba2b655d9942263964aad78bcca391bda Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Tue, 5 Sep 2017 22:02:10 +0200 Subject: [PATCH] Redesign of classic and asynchronous connection manager APIs --- .../sync/TestConnectionManagement.java | 24 +-- .../http/examples/ClientConfiguration.java | 5 +- .../http/impl/classic/ExecRuntimeImpl.java | 9 +- .../io/BasicHttpClientConnectionManager.java | 7 +- .../DefaultHttpClientConnectionOperator.java | 2 + .../DefaultManagedHttpClientConnection.java | 3 +- .../PoolingHttpClientConnectionManager.java | 144 ++++++++++-------- ...ingHttpClientConnectionManagerBuilder.java | 17 ++- ...DefaultAsyncClientConnectionOperator.java} | 23 ++- ... DefaultManagedAsyncClientConnection.java} | 14 +- .../PoolingAsyncClientConnectionManager.java | 94 +++++++++++- ...ngAsyncClientConnectionManagerBuilder.java | 46 +++--- .../http/io/HttpClientConnectionManager.java | 5 +- .../http/io/HttpClientConnectionOperator.java | 2 + .../http/io/ManagedHttpClientConnection.java | 2 + .../nio/AsyncClientConnectionManager.java | 4 +- .../nio/AsyncClientConnectionOperator.java | 55 +++++++ .../nio/ManagedAsyncClientConnection.java | 42 +++++ .../impl/classic/TestExecRuntimeImpl.java | 21 +-- ...estPoolingHttpClientConnectionManager.java | 2 +- 20 files changed, 388 insertions(+), 133 deletions(-) rename httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/{AsyncClientConnectionOperator.java => DefaultAsyncClientConnectionOperator.java} (89%) rename httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/{ManagedAsyncClientConnection.java => DefaultManagedAsyncClientConnection.java} (93%) create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionOperator.java create mode 100644 httpclient5/src/main/java/org/apache/hc/client5/http/nio/ManagedAsyncClientConnection.java diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionManagement.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionManagement.java index f67f96cb2..bf76397e4 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionManagement.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/sync/TestConnectionManagement.java @@ -27,7 +27,6 @@ package org.apache.hc.client5.testing.sync; -import java.util.Collections; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -35,10 +34,14 @@ import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.hc.client5.http.io.ConnectionEndpoint; import org.apache.hc.client5.http.io.LeaseRequest; +import org.apache.hc.client5.http.socket.ConnectionSocketFactory; +import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; +import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.impl.io.HttpRequestExecutor; import org.apache.hc.core5.http.message.BasicClassicHttpRequest; import org.apache.hc.core5.http.protocol.BasicHttpContext; @@ -48,6 +51,8 @@ import org.apache.hc.core5.http.protocol.HttpProcessor; import org.apache.hc.core5.http.protocol.RequestConnControl; import org.apache.hc.core5.http.protocol.RequestContent; import org.apache.hc.core5.http.protocol.RequestTargetHost; +import org.apache.hc.core5.pool.PoolConcurrencyPolicy; +import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.util.TimeValue; import org.junit.Assert; import org.junit.Test; @@ -220,21 +225,18 @@ public class TestConnectionManagement extends LocalServerTestBase { final ConnectionEndpoint endpoint1 = leaseRequest1.get(0, TimeUnit.MILLISECONDS); this.connManager.connect(endpoint1, TimeValue.NEG_ONE_MILLISECONDS, context); - Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes()); Assert.assertEquals(1, this.connManager.getTotalStats().getLeased()); Assert.assertEquals(1, this.connManager.getStats(route).getLeased()); this.connManager.release(endpoint1, null, TimeValue.ofMillis(100)); // Released, still active. - Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes()); Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(1, this.connManager.getStats(route).getAvailable()); this.connManager.closeExpired(); // Time has not expired yet. - Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes()); Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(1, this.connManager.getStats(route).getAvailable()); @@ -243,7 +245,6 @@ public class TestConnectionManagement extends LocalServerTestBase { this.connManager.closeExpired(); // Time expired now, connections are destroyed. - Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes()); Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(0, this.connManager.getStats(route).getAvailable()); @@ -253,7 +254,14 @@ public class TestConnectionManagement extends LocalServerTestBase { @Test public void testCloseExpiredTTLConnections() throws Exception { - this.connManager = new PoolingHttpClientConnectionManager(TimeValue.ofMillis(100)); + this.connManager = new PoolingHttpClientConnectionManager( + RegistryBuilder.create() + .register("http", PlainConnectionSocketFactory.getSocketFactory()) + .register("https", SSLConnectionSocketFactory.getSocketFactory()) + .build(), + PoolConcurrencyPolicy.STRICT, + PoolReusePolicy.LIFO, + TimeValue.ofMillis(100)); this.clientBuilder.setConnectionManager(this.connManager); this.connManager.setMaxTotal(1); @@ -266,21 +274,18 @@ public class TestConnectionManagement extends LocalServerTestBase { final ConnectionEndpoint endpoint1 = leaseRequest1.get(0, TimeUnit.MILLISECONDS); this.connManager.connect(endpoint1, TimeValue.NEG_ONE_MILLISECONDS, context); - Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes()); Assert.assertEquals(1, this.connManager.getTotalStats().getLeased()); Assert.assertEquals(1, this.connManager.getStats(route).getLeased()); // Release, let remain idle for forever this.connManager.release(endpoint1, null, TimeValue.NEG_ONE_MILLISECONDS); // Released, still active. - Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes()); Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(1, this.connManager.getStats(route).getAvailable()); this.connManager.closeExpired(); // Time has not expired yet. - Assert.assertEquals(Collections.singleton(route), this.connManager.getRoutes()); Assert.assertEquals(1, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(1, this.connManager.getStats(route).getAvailable()); @@ -289,7 +294,6 @@ public class TestConnectionManagement extends LocalServerTestBase { this.connManager.closeExpired(); // TTL expired now, connections are destroyed. - Assert.assertEquals(Collections.emptySet(), this.connManager.getRoutes()); Assert.assertEquals(0, this.connManager.getTotalStats().getAvailable()); Assert.assertEquals(0, this.connManager.getStats(route).getAvailable()); diff --git a/httpclient5/src/examples/org/apache/hc/client5/http/examples/ClientConfiguration.java b/httpclient5/src/examples/org/apache/hc/client5/http/examples/ClientConfiguration.java index 8fcd99674..1868ac429 100644 --- a/httpclient5/src/examples/org/apache/hc/client5/http/examples/ClientConfiguration.java +++ b/httpclient5/src/examples/org/apache/hc/client5/http/examples/ClientConfiguration.java @@ -78,6 +78,8 @@ import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.hc.core5.http.message.BasicHeader; import org.apache.hc.core5.http.message.BasicLineParser; import org.apache.hc.core5.http.message.LineParser; +import org.apache.hc.core5.pool.PoolConcurrencyPolicy; +import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.ssl.SSLContexts; import org.apache.hc.core5.util.CharArrayBuffer; import org.apache.hc.core5.util.TimeValue; @@ -166,7 +168,8 @@ public class ClientConfiguration { // Create a connection manager with custom configuration. final PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager( - socketFactoryRegistry, connFactory, dnsResolver); + socketFactoryRegistry, PoolConcurrencyPolicy.STRICT, PoolReusePolicy.LIFO, TimeValue.ofMinutes(5), + null, dnsResolver, null); // Create socket configuration final SocketConfig socketConfig = SocketConfig.custom() diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/ExecRuntimeImpl.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/ExecRuntimeImpl.java index b9a0666f3..75e876e2b 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/ExecRuntimeImpl.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/classic/ExecRuntimeImpl.java @@ -49,6 +49,7 @@ import org.apache.hc.core5.http.impl.io.HttpRequestExecutor; import org.apache.hc.core5.io.ShutdownType; import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; import org.apache.logging.log4j.Logger; class ExecRuntimeImpl implements ExecRuntime, Cancellable { @@ -92,7 +93,9 @@ class ExecRuntimeImpl implements ExecRuntime, Cancellable { public void acquireConnection(final HttpRoute route, final Object object, final HttpClientContext context) throws IOException { Args.notNull(route, "Route"); if (endpointRef.get() == null) { - final LeaseRequest connRequest = manager.lease(route, object); + final RequestConfig requestConfig = context.getRequestConfig(); + final Timeout requestTimeout = requestConfig.getConnectionRequestTimeout(); + final LeaseRequest connRequest = manager.lease(route, requestTimeout, object); state = object; if (cancellableAware != null) { if (cancellableAware.isCancelled()) { @@ -102,9 +105,7 @@ class ExecRuntimeImpl implements ExecRuntime, Cancellable { cancellableAware.setCancellable(connRequest); } try { - final RequestConfig requestConfig = context.getRequestConfig(); - final TimeValue timeout = requestConfig.getConnectionRequestTimeout(); - final ConnectionEndpoint connectionEndpoint = connRequest.get(timeout.getDuration(), timeout.getTimeUnit()); + final ConnectionEndpoint connectionEndpoint = connRequest.get(requestTimeout.getDuration(), requestTimeout.getTimeUnit()); endpointRef.set(connectionEndpoint); reusable = connectionEndpoint.isConnected(); if (cancellableAware != null) { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/BasicHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/BasicHttpClientConnectionManager.java index 8d61e4112..ecde0f203 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/BasicHttpClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/BasicHttpClientConnectionManager.java @@ -65,6 +65,7 @@ import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.Asserts; import org.apache.hc.core5.util.LangUtils; import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -181,8 +182,12 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT; } - @Override public LeaseRequest lease(final HttpRoute route, final Object state) { + return lease(route, Timeout.DISABLED, state); + } + + @Override + public LeaseRequest lease(final HttpRoute route, final Timeout requestTimeout, final Object state) { return new LeaseRequest() { @Override diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultHttpClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultHttpClientConnectionOperator.java index cee5af700..0a380184a 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultHttpClientConnectionOperator.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultHttpClientConnectionOperator.java @@ -47,6 +47,7 @@ import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.socket.ConnectionSocketFactory; import org.apache.hc.client5.http.socket.LayeredConnectionSocketFactory; import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.http.ConnectionClosedException; import org.apache.hc.core5.http.HttpHost; @@ -65,6 +66,7 @@ import org.apache.logging.log4j.Logger; * * @since 4.4 */ +@Internal @Contract(threading = ThreadingBehavior.IMMUTABLE_CONDITIONAL) public class DefaultHttpClientConnectionOperator implements HttpClientConnectionOperator { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultManagedHttpClientConnection.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultManagedHttpClientConnection.java index 52dea8639..dafa8e259 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultManagedHttpClientConnection.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/DefaultManagedHttpClientConnection.java @@ -56,9 +56,10 @@ import org.apache.logging.log4j.Logger; /** * Default {@link ManagedHttpClientConnection} implementation. + * * @since 4.3 */ -public class DefaultManagedHttpClientConnection +final class DefaultManagedHttpClientConnection extends DefaultBHttpClientConnection implements ManagedHttpClientConnection, Identifiable { private final Logger log = LogManager.getLogger(DefaultManagedHttpClientConnection.class); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java index ecb237eff..e517ef861 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManager.java @@ -27,7 +27,6 @@ package org.apache.hc.client5.http.impl.io; import java.io.IOException; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -49,13 +48,12 @@ import org.apache.hc.client5.http.socket.ConnectionSocketFactory; import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.ThreadingBehavior; -import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpResponse; import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpHost; -import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.config.Registry; import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.config.SocketConfig; @@ -64,6 +62,9 @@ import org.apache.hc.core5.http.io.HttpConnectionFactory; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.io.ShutdownType; import org.apache.hc.core5.pool.ConnPoolControl; +import org.apache.hc.core5.pool.LaxConnPool; +import org.apache.hc.core5.pool.ManagedConnPool; +import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.pool.PoolEntry; import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.pool.PoolStats; @@ -71,6 +72,7 @@ import org.apache.hc.core5.pool.StrictConnPool; import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.Asserts; import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -107,95 +109,113 @@ public class PoolingHttpClientConnectionManager public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25; public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5; - private final StrictConnPool pool; - private final HttpConnectionFactory connFactory; private final HttpClientConnectionOperator connectionOperator; + private final ManagedConnPool pool; + private final HttpConnectionFactory connFactory; private final AtomicBoolean closed; private volatile SocketConfig defaultSocketConfig; private volatile TimeValue validateAfterInactivity; - private static Registry getDefaultRegistry() { - return RegistryBuilder.create() + public PoolingHttpClientConnectionManager() { + this(RegistryBuilder.create() .register("http", PlainConnectionSocketFactory.getSocketFactory()) .register("https", SSLConnectionSocketFactory.getSocketFactory()) - .build(); - } - - public PoolingHttpClientConnectionManager() { - this(getDefaultRegistry()); - } - - public PoolingHttpClientConnectionManager(final TimeValue timeToLive) { - this(getDefaultRegistry(), null, null ,null, PoolReusePolicy.LIFO, timeToLive); + .build()); } public PoolingHttpClientConnectionManager( final Registry socketFactoryRegistry) { - this(socketFactoryRegistry, null, null); - } - - public PoolingHttpClientConnectionManager( - final Registry socketFactoryRegistry, - final DnsResolver dnsResolver) { - this(socketFactoryRegistry, null, dnsResolver); + this(socketFactoryRegistry, null); } public PoolingHttpClientConnectionManager( final Registry socketFactoryRegistry, final HttpConnectionFactory connFactory) { - this(socketFactoryRegistry, connFactory, null); + this(socketFactoryRegistry, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECONDS, connFactory); } public PoolingHttpClientConnectionManager( + final Registry socketFactoryRegistry, + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final TimeValue timeToLive, final HttpConnectionFactory connFactory) { - this(getDefaultRegistry(), connFactory, null); + this(socketFactoryRegistry, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive, connFactory); } public PoolingHttpClientConnectionManager( final Registry socketFactoryRegistry, - final HttpConnectionFactory connFactory, - final DnsResolver dnsResolver) { - this(socketFactoryRegistry, connFactory, null, dnsResolver, PoolReusePolicy.LIFO, TimeValue.NEG_ONE_MILLISECONDS); + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final PoolReusePolicy poolReusePolicy, + final TimeValue timeToLive) { + this(socketFactoryRegistry, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null); } public PoolingHttpClientConnectionManager( final Registry socketFactoryRegistry, - final HttpConnectionFactory connFactory, + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final PoolReusePolicy poolReusePolicy, + final TimeValue timeToLive, + final HttpConnectionFactory connFactory) { + this(socketFactoryRegistry, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null, connFactory); + } + + public PoolingHttpClientConnectionManager( + final Registry socketFactoryRegistry, + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final PoolReusePolicy poolReusePolicy, + final TimeValue timeToLive, final SchemePortResolver schemePortResolver, final DnsResolver dnsResolver, - final PoolReusePolicy poolReusePolicy, - final TimeValue timeToLive) { + final HttpConnectionFactory connFactory) { this(new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver), - connFactory, poolReusePolicy, timeToLive); + poolConcurrencyPolicy, + poolReusePolicy, + timeToLive, + connFactory); } - public PoolingHttpClientConnectionManager( + @Internal + protected PoolingHttpClientConnectionManager( final HttpClientConnectionOperator httpClientConnectionOperator, - final HttpConnectionFactory connFactory, + final PoolConcurrencyPolicy poolConcurrencyPolicy, final PoolReusePolicy poolReusePolicy, - final TimeValue timeToLive) { + final TimeValue timeToLive, + final HttpConnectionFactory connFactory) { super(); this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator"); + switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) { + case STRICT: + this.pool = new StrictConnPool<>( + DEFAULT_MAX_CONNECTIONS_PER_ROUTE, + DEFAULT_MAX_TOTAL_CONNECTIONS, + timeToLive, + poolReusePolicy, + null); + break; + case LAX: + this.pool = new LaxConnPool<>( + DEFAULT_MAX_CONNECTIONS_PER_ROUTE, + timeToLive, + poolReusePolicy, + null); + break; + default: + throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy); + } this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE; - this.pool = new StrictConnPool<>(DEFAULT_MAX_CONNECTIONS_PER_ROUTE, DEFAULT_MAX_TOTAL_CONNECTIONS, timeToLive, - poolReusePolicy, null); this.closed = new AtomicBoolean(false); } - /** - * Visible for test. - */ - PoolingHttpClientConnectionManager( - final StrictConnPool pool, - final Lookup socketFactoryRegistry, - final SchemePortResolver schemePortResolver, - final DnsResolver dnsResolver) { + @Internal + protected PoolingHttpClientConnectionManager( + final HttpClientConnectionOperator httpClientConnectionOperator, + final ManagedConnPool pool, + final HttpConnectionFactory connFactory) { super(); - this.connectionOperator = new DefaultHttpClientConnectionOperator( - socketFactoryRegistry, schemePortResolver, dnsResolver); - this.connFactory = ManagedHttpClientConnectionFactory.INSTANCE; - this.pool = pool; + this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator"); + this.pool = Args.notNull(pool, "Connection pool"); + this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE; this.closed = new AtomicBoolean(false); } @@ -225,15 +245,24 @@ public class PoolingHttpClientConnectionManager } } + public LeaseRequest lease(final HttpRoute route, final Object state) { + return lease(route, Timeout.DISABLED, state); + } + @Override public LeaseRequest lease( final HttpRoute route, + final Timeout requestTimeout, final Object state) { Args.notNull(route, "HTTP route"); if (this.log.isDebugEnabled()) { this.log.debug("Connection request: " + ConnPoolSupport.formatStats(null, route, state, this.pool)); } - final Future> leaseFuture = this.pool.lease(route, state, null); + //TODO: fix me. + if (log.isWarnEnabled() && Timeout.isPositive(requestTimeout)) { + log.warn("Connection request timeout is not supported"); + } + final Future> leaseFuture = this.pool.lease(route, state, /** requestTimeout, */ null); return new LeaseRequest() { private volatile ConnectionEndpoint endpoint; @@ -391,14 +420,6 @@ public class PoolingHttpClientConnectionManager this.pool.closeExpired(); } - protected void enumAvailable(final Callback> callback) { - this.pool.enumAvailable(callback); - } - - protected void enumLeased(final Callback> callback) { - this.pool.enumLeased(callback); - } - @Override public int getMaxTotal() { return this.pool.getMaxTotal(); @@ -439,13 +460,6 @@ public class PoolingHttpClientConnectionManager return this.pool.getStats(route); } - /** - * @since 4.4 - */ - public Set getRoutes() { - return this.pool.getRoutes(); - } - public SocketConfig getDefaultSocketConfig() { return this.defaultSocketConfig; } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java index 0e3b5539b..037c7330d 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/io/PoolingHttpClientConnectionManagerBuilder.java @@ -37,6 +37,7 @@ import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.config.SocketConfig; import org.apache.hc.core5.http.io.HttpConnectionFactory; +import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.util.TimeValue; @@ -72,6 +73,7 @@ public class PoolingHttpClientConnectionManagerBuilder { private LayeredConnectionSocketFactory sslSocketFactory; private SchemePortResolver schemePortResolver; private DnsResolver dnsResolver; + private PoolConcurrencyPolicy poolConcurrencyPolicy; private PoolReusePolicy poolReusePolicy; private SocketConfig defaultSocketConfig; @@ -125,6 +127,14 @@ public class PoolingHttpClientConnectionManagerBuilder { return this; } + /** + * Assigns {@link PoolConcurrencyPolicy} value. + */ + public final PoolingHttpClientConnectionManagerBuilder setPoolConcurrencyPolicy(final PoolConcurrencyPolicy poolConcurrencyPolicy) { + this.poolConcurrencyPolicy = poolConcurrencyPolicy; + return this; + } + /** * Assigns {@link PoolReusePolicy} value. */ @@ -195,11 +205,12 @@ public class PoolingHttpClientConnectionManagerBuilder { SSLConnectionSocketFactory.getSystemSocketFactory() : SSLConnectionSocketFactory.getSocketFactory())) .build(), - connectionFactory, + poolConcurrencyPolicy, + poolReusePolicy, + timeToLive != null ? timeToLive : TimeValue.NEG_ONE_MILLISECONDS, schemePortResolver, dnsResolver, - poolReusePolicy, - timeToLive != null ? timeToLive : TimeValue.NEG_ONE_MILLISECONDS); + connectionFactory); poolingmgr.setValidateAfterInactivity(this.validateAfterInactivity); if (defaultSocketConfig != null) { poolingmgr.setDefaultSocketConfig(defaultSocketConfig); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java similarity index 89% rename from httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java rename to httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java index 04c07e2df..c5af9a2e4 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/AsyncClientConnectionOperator.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultAsyncClientConnectionOperator.java @@ -41,6 +41,9 @@ import org.apache.hc.client5.http.SchemePortResolver; import org.apache.hc.client5.http.SystemDefaultDnsResolver; import org.apache.hc.client5.http.UnsupportedSchemeException; import org.apache.hc.client5.http.impl.DefaultSchemePortResolver; +import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator; +import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.HttpHost; @@ -51,21 +54,28 @@ import org.apache.hc.core5.reactor.IOSession; import org.apache.hc.core5.util.Args; import org.apache.hc.core5.util.TimeValue; -final class AsyncClientConnectionOperator { +/** + * Default {@link AsyncClientConnectionOperator} implementation. + * + * @since 5.0 + */ +@Internal +final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectionOperator { private final SchemePortResolver schemePortResolver; private final DnsResolver dnsResolver; private final Lookup tlsStrategyLookup; - AsyncClientConnectionOperator( + DefaultAsyncClientConnectionOperator( + final Lookup tlsStrategyLookup, final SchemePortResolver schemePortResolver, - final DnsResolver dnsResolver, - final Lookup tlsStrategyLookup) { + final DnsResolver dnsResolver) { + this.tlsStrategyLookup = Args.notNull(tlsStrategyLookup, "TLS strategy lookup"); this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE; this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE; - this.tlsStrategyLookup = tlsStrategyLookup; } + @Override public Future connect( final ConnectionInitiator connectionInitiator, final HttpHost host, @@ -108,7 +118,7 @@ final class AsyncClientConnectionOperator { @Override public void completed(final IOSession session) { - final ManagedAsyncClientConnection connection = new ManagedAsyncClientConnection(session); + final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session); if (tlsStrategy != null) { tlsStrategy.upgrade( connection, @@ -152,6 +162,7 @@ final class AsyncClientConnectionOperator { return future; } + @Override public void upgrade(final ManagedAsyncClientConnection connection, final HttpHost host, final Object attachment) { final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getSchemeName()) : null; if (tlsStrategy != null) { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/ManagedAsyncClientConnection.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultManagedAsyncClientConnection.java similarity index 93% rename from httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/ManagedAsyncClientConnection.java rename to httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultManagedAsyncClientConnection.java index ef8bebaac..1132ffe18 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/ManagedAsyncClientConnection.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/DefaultManagedAsyncClientConnection.java @@ -35,6 +35,8 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSession; import org.apache.hc.client5.http.impl.ConnPoolSupport; +import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.http.EndpointDetails; import org.apache.hc.core5.http.HttpConnection; import org.apache.hc.core5.http.HttpVersion; @@ -53,14 +55,20 @@ import org.apache.hc.core5.util.Identifiable; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -final class ManagedAsyncClientConnection implements Identifiable, HttpConnection, TransportSecurityLayer { +/** + * Default {@link ManagedAsyncClientConnection} implementation. + * + * @since 5.0 + */ +@Internal +final class DefaultManagedAsyncClientConnection implements ManagedAsyncClientConnection, Identifiable { private final Logger log = LogManager.getLogger(getClass()); private final IOSession ioSession; private final AtomicBoolean closed; - public ManagedAsyncClientConnection(final IOSession ioSession) { + public DefaultManagedAsyncClientConnection(final IOSession ioSession) { this.ioSession = ioSession; this.closed = new AtomicBoolean(); } @@ -162,6 +170,7 @@ final class ManagedAsyncClientConnection implements Identifiable, HttpConnection return tlsDetails != null ? tlsDetails.getSSLSession() : null; } + @Override public void submitPriorityCommand(final Command command) { if (log.isDebugEnabled()) { log.debug(getId() + ": priority command " + command); @@ -169,6 +178,7 @@ final class ManagedAsyncClientConnection implements Identifiable, HttpConnection ioSession.addFirst(command); } + @Override public void submitCommand(final Command command) { if (log.isDebugEnabled()) { log.debug(getId() + ": command " + command); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java index 93ce8eb38..8dbfc8acb 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java @@ -40,8 +40,12 @@ import org.apache.hc.client5.http.SchemePortResolver; import org.apache.hc.client5.http.impl.ConnPoolSupport; import org.apache.hc.client5.http.impl.ConnectionShutdownException; import org.apache.hc.client5.http.nio.AsyncClientConnectionManager; +import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator; import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint; +import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection; +import org.apache.hc.client5.http.ssl.H2TlsStrategy; import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.FutureCallback; @@ -50,6 +54,7 @@ import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.config.Lookup; +import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.command.ExecutionCommand; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; @@ -58,6 +63,9 @@ import org.apache.hc.core5.http2.nio.command.PingCommand; import org.apache.hc.core5.http2.nio.support.BasicPingHandler; import org.apache.hc.core5.io.ShutdownType; import org.apache.hc.core5.pool.ConnPoolControl; +import org.apache.hc.core5.pool.LaxConnPool; +import org.apache.hc.core5.pool.ManagedConnPool; +import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.pool.PoolEntry; import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.pool.PoolStats; @@ -94,20 +102,86 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio private final Logger log = LogManager.getLogger(getClass()); + public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 25; + public static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5; + + private final ManagedConnPool pool; private final AsyncClientConnectionOperator connectionOperator; - private final StrictConnPool pool; private final AtomicBoolean closed; private volatile TimeValue validateAfterInactivity; + public PoolingAsyncClientConnectionManager() { + this(RegistryBuilder.create() + .register("https", H2TlsStrategy.getDefault()) + .build()); + } + + public PoolingAsyncClientConnectionManager(final Lookup tlsStrategyLookup) { + this(tlsStrategyLookup, PoolConcurrencyPolicy.STRICT, TimeValue.NEG_ONE_MILLISECONDS); + } + public PoolingAsyncClientConnectionManager( final Lookup tlsStrategyLookup, - final SchemePortResolver schemePortResolver, - final DnsResolver dnsResolver, + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final TimeValue timeToLive) { + this(tlsStrategyLookup, poolConcurrencyPolicy, PoolReusePolicy.LIFO, timeToLive); + } + + public PoolingAsyncClientConnectionManager( + final Lookup tlsStrategyLookup, + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final PoolReusePolicy poolReusePolicy, + final TimeValue timeToLive) { + this(tlsStrategyLookup, poolConcurrencyPolicy, poolReusePolicy, timeToLive, null, null); + } + + public PoolingAsyncClientConnectionManager( + final Lookup tlsStrategyLookup, + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final PoolReusePolicy poolReusePolicy, final TimeValue timeToLive, - final PoolReusePolicy poolReusePolicy) { - this.connectionOperator = new AsyncClientConnectionOperator(schemePortResolver, dnsResolver, tlsStrategyLookup); - this.pool = new StrictConnPool<>(20, 50, timeToLive, poolReusePolicy != null ? poolReusePolicy : PoolReusePolicy.LIFO, null); + final SchemePortResolver schemePortResolver, + final DnsResolver dnsResolver) { + this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver), + poolConcurrencyPolicy, poolReusePolicy, timeToLive); + } + + @Internal + protected PoolingAsyncClientConnectionManager( + final AsyncClientConnectionOperator connectionOperator, + final PoolConcurrencyPolicy poolConcurrencyPolicy, + final PoolReusePolicy poolReusePolicy, + final TimeValue timeToLive) { + this.connectionOperator = Args.notNull(connectionOperator, "Connection operator"); + switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) { + case STRICT: + this.pool = new StrictConnPool<>( + DEFAULT_MAX_CONNECTIONS_PER_ROUTE, + DEFAULT_MAX_TOTAL_CONNECTIONS, + timeToLive, + poolReusePolicy, + null); + break; + case LAX: + this.pool = new LaxConnPool<>( + DEFAULT_MAX_CONNECTIONS_PER_ROUTE, + timeToLive, + poolReusePolicy, + null); + break; + default: + throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy); + } + this.closed = new AtomicBoolean(false); + } + + @Internal + protected PoolingAsyncClientConnectionManager( + final ManagedConnPool pool, + final AsyncClientConnectionOperator connectionOperator) { + this.connectionOperator = Args.notNull(connectionOperator, "Connection operator"); + this.pool = Args.notNull(pool, "Connection pool"); this.closed = new AtomicBoolean(false); } @@ -141,14 +215,18 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio public Future lease( final HttpRoute route, final Object state, - final Timeout timeout, + final Timeout requestTimeout, final FutureCallback callback) { if (log.isDebugEnabled()) { log.debug("Connection request: " + ConnPoolSupport.formatStats(null, route, state, pool)); } final ComplexFuture resultFuture = new ComplexFuture<>(callback); + //TODO: fix me. + if (log.isWarnEnabled() && Timeout.isPositive(requestTimeout)) { + log.warn("Connection request timeout is not supported"); + } final Future> leaseFuture = pool.lease( - route, state, timeout, new FutureCallback>() { + route, state, /** requestTimeout, **/ new FutureCallback>() { void leaseCompleted(final PoolEntry poolEntry) { if (log.isDebugEnabled()) { diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java index ed2e42d20..63d1d00cd 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManagerBuilder.java @@ -35,6 +35,7 @@ import org.apache.hc.client5.http.SchemePortResolver; import org.apache.hc.client5.http.ssl.H2TlsStrategy; import org.apache.hc.core5.http.config.RegistryBuilder; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.pool.PoolConcurrencyPolicy; import org.apache.hc.core5.pool.PoolReusePolicy; import org.apache.hc.core5.util.TimeValue; @@ -69,6 +70,7 @@ public class PoolingAsyncClientConnectionManagerBuilder { private TlsStrategy tlsStrategy; private SchemePortResolver schemePortResolver; private DnsResolver dnsResolver; + private PoolConcurrencyPolicy poolConcurrencyPolicy; private PoolReusePolicy poolReusePolicy; private boolean systemProperties; @@ -112,6 +114,14 @@ public class PoolingAsyncClientConnectionManagerBuilder { return this; } + /** + * Assigns {@link PoolConcurrencyPolicy} value. + */ + public final PoolingAsyncClientConnectionManagerBuilder setPoolConcurrencyPolicy(final PoolConcurrencyPolicy poolConcurrencyPolicy) { + this.poolConcurrencyPolicy = poolConcurrencyPolicy; + return this; + } + /** * Assigns {@link PoolReusePolicy} value. */ @@ -165,15 +175,29 @@ public class PoolingAsyncClientConnectionManagerBuilder { } public PoolingAsyncClientConnectionManager build() { + final TlsStrategy tlsStrategyCopy; + if (tlsStrategy != null) { + tlsStrategyCopy = tlsStrategy; + } else if (systemProperties) { + tlsStrategyCopy = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public TlsStrategy run() { + return H2TlsStrategy.getSystemDefault(); + } + }); + } else { + tlsStrategyCopy = H2TlsStrategy.getDefault(); + } @SuppressWarnings("resource") final PoolingAsyncClientConnectionManager poolingmgr = new PoolingAsyncClientConnectionManager( RegistryBuilder.create() - .register("https", getTlsStrategy()) + .register("https", tlsStrategyCopy) .build(), - schemePortResolver, - dnsResolver, + poolConcurrencyPolicy, + poolReusePolicy, timeToLive, - poolReusePolicy); + schemePortResolver, + dnsResolver); poolingmgr.setValidateAfterInactivity(this.validateAfterInactivity); if (maxConnTotal > 0) { poolingmgr.setMaxTotal(maxConnTotal); @@ -184,18 +208,4 @@ public class PoolingAsyncClientConnectionManagerBuilder { return poolingmgr; } - private TlsStrategy getTlsStrategy() { - if (tlsStrategy != null) { - return tlsStrategy; - } else if (systemProperties) { - return AccessController.doPrivileged(new PrivilegedAction() { - @Override - public TlsStrategy run() { - return H2TlsStrategy.getSystemDefault(); - } - }); - } else { - return H2TlsStrategy.getDefault(); - } - } } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionManager.java index 41bff4a80..f689101cc 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionManager.java @@ -32,6 +32,7 @@ import java.io.IOException; import org.apache.hc.client5.http.HttpRoute; import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; /** * Represents a manager of persistent client connections. @@ -64,10 +65,12 @@ public interface HttpClientConnectionManager extends Closeable { * executed a {@code CONNECT} method to all intermediate proxy hops. * * @param route HTTP route of the requested connection. + * @param requestTimeout lease request timeout. * @param state expected state of the connection or {@code null} * if the connection is not expected to carry any state. + * @since 5.0 */ - LeaseRequest lease(HttpRoute route, Object state); + LeaseRequest lease(HttpRoute route, Timeout requestTimeout, Object state); /** * Releases the endpoint back to the manager making it potentially diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionOperator.java index ed239bad2..c472a9540 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionOperator.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/io/HttpClientConnectionOperator.java @@ -30,6 +30,7 @@ package org.apache.hc.client5.http.io; import java.io.IOException; import java.net.InetSocketAddress; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.config.SocketConfig; import org.apache.hc.core5.http.protocol.HttpContext; @@ -44,6 +45,7 @@ import org.apache.hc.core5.util.TimeValue; * * @since 4.4 */ +@Internal public interface HttpClientConnectionOperator { void connect( diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/io/ManagedHttpClientConnection.java b/httpclient5/src/main/java/org/apache/hc/client5/http/io/ManagedHttpClientConnection.java index af23aefc2..fa0e92d1c 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/io/ManagedHttpClientConnection.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/io/ManagedHttpClientConnection.java @@ -32,6 +32,7 @@ import java.net.Socket; import javax.net.ssl.SSLSession; +import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.http.io.HttpClientConnection; /** @@ -42,6 +43,7 @@ import org.apache.hc.core5.http.io.HttpClientConnection; * * @since 4.3 */ +@Internal public interface ManagedHttpClientConnection extends HttpClientConnection { /** diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java index 1b119bf82..55b878b92 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionManager.java @@ -71,13 +71,13 @@ public interface AsyncClientConnectionManager extends Closeable { * @param route HTTP route of the requested connection. * @param state expected state of the connection or {@code null} * if the connection is not expected to carry any state. - * @param timeout lease request timeout. + * @param requestTimeout lease request timeout. * @param callback result callback. */ Future lease( HttpRoute route, Object state, - Timeout timeout, + Timeout requestTimeout, FutureCallback callback); /** diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionOperator.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionOperator.java new file mode 100644 index 000000000..4bea69c7d --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncClientConnectionOperator.java @@ -0,0 +1,55 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.nio; + +import java.net.SocketAddress; +import java.util.concurrent.Future; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.reactor.ConnectionInitiator; +import org.apache.hc.core5.util.TimeValue; + +/** + * @since 5.0 + */ +@Internal +public interface AsyncClientConnectionOperator { + + Future connect( + ConnectionInitiator connectionInitiator, + HttpHost host, + SocketAddress localAddress, + TimeValue connectTimeout, + Object attachment, + FutureCallback callback); + + void upgrade(ManagedAsyncClientConnection connection, HttpHost host, Object attachment); + +} diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/ManagedAsyncClientConnection.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/ManagedAsyncClientConnection.java new file mode 100644 index 000000000..67266604f --- /dev/null +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/ManagedAsyncClientConnection.java @@ -0,0 +1,42 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ + +package org.apache.hc.client5.http.nio; + +import org.apache.hc.core5.annotation.Internal; +import org.apache.hc.core5.http.HttpConnection; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer; + +@Internal +public interface ManagedAsyncClientConnection extends HttpConnection, TransportSecurityLayer { + + void submitPriorityCommand(Command command); + + void submitCommand(Command command); + +} diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/classic/TestExecRuntimeImpl.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/classic/TestExecRuntimeImpl.java index 2e53cd118..5b6cf3506 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/classic/TestExecRuntimeImpl.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/classic/TestExecRuntimeImpl.java @@ -43,6 +43,7 @@ import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.impl.io.HttpRequestExecutor; import org.apache.hc.core5.io.ShutdownType; import org.apache.hc.core5.util.TimeValue; +import org.apache.hc.core5.util.Timeout; import org.apache.logging.log4j.Logger; import org.junit.Assert; import org.junit.Before; @@ -88,7 +89,7 @@ public class TestExecRuntimeImpl { context.setRequestConfig(config); final HttpRoute route = new HttpRoute(new HttpHost("host", 80)); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenReturn(connectionEndpoint); @@ -109,7 +110,7 @@ public class TestExecRuntimeImpl { public void testAcquireEndpointAlreadyAcquired() throws Exception { final HttpClientContext context = HttpClientContext.create(); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenReturn(connectionEndpoint); @@ -125,7 +126,7 @@ public class TestExecRuntimeImpl { public void testAcquireEndpointLeaseRequestTimeout() throws Exception { final HttpClientContext context = HttpClientContext.create(); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenThrow(new TimeoutException()); @@ -136,7 +137,7 @@ public class TestExecRuntimeImpl { public void testAcquireEndpointLeaseRequestFailure() throws Exception { final HttpClientContext context = HttpClientContext.create(); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenThrow(new ExecutionException(new IllegalStateException())); @@ -146,7 +147,7 @@ public class TestExecRuntimeImpl { @Test public void testAbortEndpoint() throws Exception { final HttpClientContext context = HttpClientContext.create(); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenReturn(connectionEndpoint); @@ -172,7 +173,7 @@ public class TestExecRuntimeImpl { public void testCancell() throws Exception { final HttpClientContext context = HttpClientContext.create(); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenReturn(connectionEndpoint); @@ -199,7 +200,7 @@ public class TestExecRuntimeImpl { public void testReleaseEndpointReusable() throws Exception { final HttpClientContext context = HttpClientContext.create(); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenReturn(connectionEndpoint); @@ -229,7 +230,7 @@ public class TestExecRuntimeImpl { public void testReleaseEndpointNonReusable() throws Exception { final HttpClientContext context = HttpClientContext.create(); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenReturn(connectionEndpoint); @@ -265,7 +266,7 @@ public class TestExecRuntimeImpl { .build(); context.setRequestConfig(config); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenReturn(connectionEndpoint); @@ -285,7 +286,7 @@ public class TestExecRuntimeImpl { public void testDisonnectEndpoint() throws Exception { final HttpClientContext context = HttpClientContext.create(); - Mockito.when(mgr.lease(route, null)).thenReturn(leaseRequest); + Mockito.when(mgr.lease(Mockito.eq(route), Mockito.any(), Mockito.any())).thenReturn(leaseRequest); Mockito.when(leaseRequest.get( Mockito.anyLong(), Mockito.any())).thenReturn(connectionEndpoint); diff --git a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java index 9f9db80d0..0c46b0ef6 100644 --- a/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java +++ b/httpclient5/src/test/java/org/apache/hc/client5/http/impl/io/TestPoolingHttpClientConnectionManager.java @@ -87,7 +87,7 @@ public class TestPoolingHttpClientConnectionManager { public void setup() throws Exception { MockitoAnnotations.initMocks(this); mgr = new PoolingHttpClientConnectionManager( - pool, socketFactoryRegistry, schemePortResolver, dnsResolver); + new DefaultHttpClientConnectionOperator(socketFactoryRegistry, schemePortResolver, dnsResolver), pool, null); } @Test