Experimental connections pool implementation that acts as a caching facade in front of a standard ManagedConnPool and shares already leased connections to multiplex message exchanges over active HTTP/2 connections.

This commit is contained in:
Oleg Kalnichevski 2024-11-13 18:26:59 +01:00
parent 0b56a628c5
commit 27bb5a0821
15 changed files with 1159 additions and 57 deletions

View File

@ -213,4 +213,24 @@ class HttpIntegrationTests {
} }
@Nested
@DisplayName("HTTP message multiplexing (HTTP/2)")
class RequestMultiplexing extends TestHttpAsyncRequestMultiplexing {
public RequestMultiplexing() {
super(URIScheme.HTTP);
}
}
@Nested
@DisplayName("HTTP message multiplexing (HTTP/2, TLS)")
class RequestMultiplexingTls extends TestHttpAsyncRequestMultiplexing {
public RequestMultiplexingTls() {
super(URIScheme.HTTPS);
}
}
} }

View File

@ -0,0 +1,98 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.async;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.hamcrest.CoreMatchers;
import org.junit.jupiter.api.Test;
abstract class TestHttpAsyncRequestMultiplexing extends AbstractIntegrationTestBase {
public TestHttpAsyncRequestMultiplexing(final URIScheme uriScheme) {
super(uriScheme, ClientProtocolLevel.MINIMAL, ServerProtocolLevel.H2_ONLY);
}
@Test
void testConcurrentPostRequests() throws Exception {
configureServer(bootstrap -> bootstrap.register("/echo/*", AsyncEchoHandler::new));
configureClient(custimizer -> custimizer
.setDefaultTlsConfig(TlsConfig.custom()
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
.build())
.useMessageMultiplexing()
);
final HttpHost target = startServer();
final TestAsyncClient client = startClient();
final byte[] b1 = new byte[1024];
final Random rnd = new Random(System.currentTimeMillis());
rnd.nextBytes(b1);
final int reqCount = 200;
final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
for (int i = 0; i < reqCount; i++) {
final Future<Message<HttpResponse, byte[]>> future = client.execute(
new BasicRequestProducer(Method.POST, target, "/echo/",
AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
queue.add(future);
}
while (!queue.isEmpty()) {
final Future<Message<HttpResponse, byte[]>> future = queue.remove();
final Message<HttpResponse, byte[]> responseMessage = future.get();
assertThat(responseMessage, CoreMatchers.notNullValue());
final HttpResponse response = responseMessage.getHead();
assertThat(response.getCode(), CoreMatchers.equalTo(200));
final byte[] b2 = responseMessage.getBody();
assertThat(b1, CoreMatchers.equalTo(b2));
}
}
}

View File

@ -26,6 +26,9 @@
*/ */
package org.apache.hc.client5.testing.compatibility.async; package org.apache.hc.client5.testing.compatibility.async;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.hc.client5.http.ContextBuilder; import org.apache.hc.client5.http.ContextBuilder;
@ -38,10 +41,13 @@ import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.testing.Result;
import org.apache.hc.client5.testing.extension.async.HttpAsyncClientResource; import org.apache.hc.client5.testing.extension.async.HttpAsyncClientResource;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.RequestNotExecutedException;
import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.util.Timeout; import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Assertions;
@ -51,6 +57,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
public abstract class HttpAsyncClientCompatibilityTest { public abstract class HttpAsyncClientCompatibilityTest {
static final Timeout TIMEOUT = Timeout.ofSeconds(5); static final Timeout TIMEOUT = Timeout.ofSeconds(5);
static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(30);
private final HttpVersionPolicy versionPolicy; private final HttpVersionPolicy versionPolicy;
private final HttpHost target; private final HttpHost target;
@ -119,6 +126,54 @@ public abstract class HttpAsyncClientCompatibilityTest {
} }
} }
@Test
void test_concurrent_gets() throws Exception {
final CloseableHttpAsyncClient client = client();
final String[] requestUris = new String[] {"/111", "/222", "/333"};
final int n = 200;
final Queue<Result<Void>> queue = new ConcurrentLinkedQueue<>();
final CountDownLatch latch = new CountDownLatch(requestUris.length * n);
for (int i = 0; i < n; i++) {
for (final String requestUri: requestUris) {
final SimpleHttpRequest request = SimpleRequestBuilder.get()
.setHttpHost(target)
.setPath(requestUri)
.build();
final HttpClientContext context = context();
client.execute(request, context, new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(final SimpleHttpResponse response) {
queue.add(new Result<>(request, response, null));
latch.countDown();
}
@Override
public void failed(final Exception ex) {
queue.add(new Result<>(request, ex));
latch.countDown();
}
@Override
public void cancelled() {
queue.add(new Result<>(request, new RequestNotExecutedException()));
latch.countDown();
}
});
}
}
Assertions.assertTrue(latch.await(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit()));
Assertions.assertEquals(requestUris.length * n, queue.size());
for (final Result<Void> result : queue) {
if (result.isOK()) {
Assertions.assertEquals(HttpStatus.SC_OK, result.response.getCode());
}
}
}
@Test @Test
void test_auth_failure_wrong_auth_scope() throws Exception { void test_auth_failure_wrong_auth_scope() throws Exception {
addCredentials( addCredentials(

View File

@ -68,6 +68,11 @@ final class H2OnlyMinimalTestClientBuilder implements TestAsyncClientBuilder {
return this; return this;
} }
@Override
public TestAsyncClientBuilder useMessageMultiplexing() {
return this;
}
@Override @Override
public TestAsyncClient build() throws Exception { public TestAsyncClient build() throws Exception {
final CloseableHttpAsyncClient client = HttpAsyncClients.createHttp2Minimal( final CloseableHttpAsyncClient client = HttpAsyncClients.createHttp2Minimal(

View File

@ -97,6 +97,11 @@ final class H2OnlyTestClientBuilder implements TestAsyncClientBuilder {
return this; return this;
} }
@Override
public TestAsyncClientBuilder useMessageMultiplexing() {
return this;
}
@Override @Override
public TestAsyncClientBuilder setH2Config(final H2Config h2Config) { public TestAsyncClientBuilder setH2Config(final H2Config h2Config) {
this.h2Config = h2Config; this.h2Config = h2Config;

View File

@ -61,6 +61,7 @@ public class HttpAsyncClientResource implements AfterEachCallback {
.setDefaultTlsConfig(TlsConfig.custom() .setDefaultTlsConfig(TlsConfig.custom()
.setVersionPolicy(versionPolicy) .setVersionPolicy(versionPolicy)
.build()) .build())
.setMessageMultiplexing(true)
.build()); .build());
} catch (final CertificateException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException ex) { } catch (final CertificateException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException ex) {
throw new IllegalStateException(ex); throw new IllegalStateException(ex);

View File

@ -77,6 +77,12 @@ final class MinimalTestClientBuilder implements TestAsyncClientBuilder {
return this; return this;
} }
@Override
public TestAsyncClientBuilder useMessageMultiplexing() {
this.connectionManagerBuilder.setMessageMultiplexing(true);
return this;
}
@Override @Override
public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) { public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) {
this.http1Config = http1Config; this.http1Config = http1Config;

View File

@ -111,6 +111,12 @@ final class StandardTestClientBuilder implements TestAsyncClientBuilder {
return this; return this;
} }
@Override
public TestAsyncClientBuilder useMessageMultiplexing() {
this.connectionManagerBuilder.setMessageMultiplexing(true);
return this;
}
@Override @Override
public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) { public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) {
this.clientBuilder.setHttp1Config(http1Config); this.clientBuilder.setHttp1Config(http1Config);

View File

@ -73,6 +73,10 @@ public interface TestAsyncClientBuilder {
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel()); throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
} }
default TestAsyncClientBuilder useMessageMultiplexing() {
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
}
default TestAsyncClientBuilder setHttp1Config(Http1Config http1Config) { default TestAsyncClientBuilder setHttp1Config(Http1Config http1Config) {
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel()); throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
} }

View File

@ -0,0 +1,322 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.nio;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.CompletedFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.pool.ManagedConnPool;
import org.apache.hc.core5.pool.PoolEntry;
import org.apache.hc.core5.pool.PoolStats;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Experimental connections pool implementation that acts as a caching facade in front of
* a standard {@link ManagedConnPool} and shares already leased connections to multiplex
* message exchanges over active HTTP/2 connections.
* @param <T> route
* @param <C> connection object
*
* @since 5.5
*/
@Contract(threading = ThreadingBehavior.SAFE)
@Experimental
public class H2SharingConnPool<T, C extends HttpConnection> implements ManagedConnPool<T, C> {
private static final Logger LOG = LoggerFactory.getLogger(H2SharingConnPool.class);
private final ManagedConnPool<T, C> pool;
private final ConcurrentMap<T, PerRoutePool<T, C>> perRouteCache;
private final AtomicBoolean closed;
public H2SharingConnPool(final ManagedConnPool<T, C> pool) {
this.pool = Args.notNull(pool, "Connection pool");
this.perRouteCache = new ConcurrentHashMap<>();
this.closed = new AtomicBoolean();
}
@Override
public void close(final CloseMode closeMode) {
if (closed.compareAndSet(false, true)) {
perRouteCache.clear();
pool.close(closeMode);
}
}
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
perRouteCache.clear();
pool.close();
}
}
PerRoutePool<T, C> getPerRoutePool(final T route) {
return perRouteCache.computeIfAbsent(route, r -> new PerRoutePool<>());
}
@Override
public Future<PoolEntry<T, C>> lease(final T route,
final Object state,
final Timeout requestTimeout,
final FutureCallback<PoolEntry<T, C>> callback) {
Asserts.check(!closed.get(), "Connection pool shut down");
if (state == null) {
final PerRoutePool<T, C> perRoutePool = perRouteCache.get(route);
if (perRoutePool != null) {
final PoolEntry<T, C> entry = perRoutePool.lease();
if (entry != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sharing connection {} for message exchange multiplexing (lease count = {})",
ConnPoolSupport.getId(entry.getConnection()), perRoutePool.getCount(entry));
}
final Future<PoolEntry<T, C>> future = new CompletedFuture<>(entry);
if (callback != null) {
callback.completed(entry);
}
return future;
}
}
}
LOG.debug("No shared connection available");
return pool.lease(route,
state,
requestTimeout,
new CallbackContribution<PoolEntry<T, C>>(callback) {
@Override
public void completed(final PoolEntry<T, C> entry) {
if (state == null) {
final C connection = entry.getConnection();
final ProtocolVersion ver = connection != null ? connection.getProtocolVersion() : null;
if (ver == HttpVersion.HTTP_2_0) {
final PerRoutePool<T, C> perRoutePool = getPerRoutePool(route);
final long count = perRoutePool.track(entry);
if (LOG.isDebugEnabled()) {
LOG.debug("Connection {} can be shared for message exchange multiplexing (lease count = {})",
ConnPoolSupport.getId(entry.getConnection()), count);
}
}
}
if (callback != null) {
callback.completed(entry);
}
}
});
}
@Override
public void release(final PoolEntry<T, C> entry, final boolean reusable) {
if (entry == null) {
return;
}
if (closed.get()) {
pool.release(entry, reusable);
return;
}
final T route = entry.getRoute();
final PerRoutePool<T, C> perRoutePool = perRouteCache.get(route);
if (perRoutePool != null) {
final long count = perRoutePool.release(entry, reusable);
if (count > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Connection {} is being shared for message exchange multiplexing (lease count = {})",
ConnPoolSupport.getId(entry.getConnection()), count);
}
return;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Releasing connection {} back to the pool", ConnPoolSupport.getId(entry.getConnection()));
}
pool.release(entry, reusable);
}
@Override
public void setMaxTotal(final int max) {
pool.setMaxTotal(max);
}
@Override
public int getMaxTotal() {
return pool.getMaxTotal();
}
@Override
public void setDefaultMaxPerRoute(final int max) {
pool.setDefaultMaxPerRoute(max);
}
@Override
public int getDefaultMaxPerRoute() {
return pool.getDefaultMaxPerRoute();
}
@Override
public void setMaxPerRoute(final T route, final int max) {
pool.setMaxPerRoute(route, max);
}
@Override
public int getMaxPerRoute(final T route) {
return pool.getMaxPerRoute(route);
}
@Override
public void closeIdle(final TimeValue idleTime) {
pool.closeIdle(idleTime);
}
@Override
public void closeExpired() {
pool.closeExpired();
}
@Override
public Set<T> getRoutes() {
return pool.getRoutes();
}
@Override
public PoolStats getTotalStats() {
return pool.getTotalStats();
}
@Override
public PoolStats getStats(final T route) {
return pool.getStats(route);
}
@Override
public String toString() {
return pool.toString();
}
static class PerRoutePool<T, C extends HttpConnection> {
private final Map<PoolEntry<T, C>, AtomicLong> entryMap;
private final Lock lock;
PerRoutePool() {
this.entryMap = new HashMap<>();
this.lock = new ReentrantLock();
}
AtomicLong getCounter(final PoolEntry<T, C> entry) {
return entryMap.computeIfAbsent(entry, e -> new AtomicLong());
}
long track(final PoolEntry<T, C> entry) {
lock.lock();
try {
final AtomicLong counter = getCounter(entry);
return counter.incrementAndGet();
} finally {
lock.unlock();
}
}
PoolEntry<T, C> lease() {
lock.lock();
try {
final PoolEntry<T, C> entry = entryMap.entrySet().stream()
.min(Comparator.comparingLong(e -> e.getValue().get()))
.map(Map.Entry::getKey)
.orElse(null);
if (entry == null) {
return null;
}
final AtomicLong counter = getCounter(entry);
counter.incrementAndGet();
return entry;
} finally {
lock.unlock();
}
}
long release(final PoolEntry<T, C> entry, final boolean reusable) {
lock.lock();
try {
final C connection = entry.getConnection();
if (!reusable || connection == null || !connection.isOpen()) {
entryMap.remove(entry);
return 0;
} else {
final AtomicLong counter = entryMap.compute(entry, (e, c) -> {
if (c == null) {
return null;
}
final long count = c.decrementAndGet();
return count > 0 ? c : null;
});
return counter != null ? counter.get() : 0L;
}
} finally {
lock.unlock();
}
}
long getCount(final PoolEntry<T, C> entry) {
lock.lock();
try {
final AtomicLong counter = entryMap.get(entry);
return counter == null ? 0L : counter.get();
} finally {
lock.unlock();
}
}
}
}

View File

@ -160,7 +160,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
final SchemePortResolver schemePortResolver, final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver) { final DnsResolver dnsResolver) {
this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver), this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
poolConcurrencyPolicy, poolReusePolicy, timeToLive); poolConcurrencyPolicy, poolReusePolicy, timeToLive, false);
} }
@Internal @Internal
@ -168,11 +168,13 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
final AsyncClientConnectionOperator connectionOperator, final AsyncClientConnectionOperator connectionOperator,
final PoolConcurrencyPolicy poolConcurrencyPolicy, final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy, final PoolReusePolicy poolReusePolicy,
final TimeValue timeToLive) { final TimeValue timeToLive,
final boolean messageMultiplexing) {
this.connectionOperator = Args.notNull(connectionOperator, "Connection operator"); this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> managedConnPool;
switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) { switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
case STRICT: case STRICT:
this.pool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>( managedConnPool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
DEFAULT_MAX_CONNECTIONS_PER_ROUTE, DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
DEFAULT_MAX_TOTAL_CONNECTIONS, DEFAULT_MAX_TOTAL_CONNECTIONS,
timeToLive, timeToLive,
@ -187,7 +189,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
}; };
break; break;
case LAX: case LAX:
this.pool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>( managedConnPool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
DEFAULT_MAX_CONNECTIONS_PER_ROUTE, DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
timeToLive, timeToLive,
poolReusePolicy, poolReusePolicy,
@ -203,6 +205,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
default: default:
throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy); throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
} }
this.pool = messageMultiplexing ? new H2SharingConnPool<>(managedConnPool) : managedConnPool;
this.closed = new AtomicBoolean(false); this.closed = new AtomicBoolean(false);
} }

View File

@ -35,6 +35,7 @@ import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator; import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
import org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy; import org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.function.Resolver; import org.apache.hc.core5.function.Resolver;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
@ -87,6 +88,7 @@ public class PoolingAsyncClientConnectionManagerBuilder {
private Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver; private Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
private Resolver<HttpHost, TlsConfig> tlsConfigResolver; private Resolver<HttpHost, TlsConfig> tlsConfigResolver;
private boolean messageMultiplexing;
public static PoolingAsyncClientConnectionManagerBuilder create() { public static PoolingAsyncClientConnectionManagerBuilder create() {
return new PoolingAsyncClientConnectionManagerBuilder(); return new PoolingAsyncClientConnectionManagerBuilder();
@ -254,6 +256,24 @@ public class PoolingAsyncClientConnectionManagerBuilder {
return this; return this;
} }
/**
* Use experimental connections pool implementation that acts as a caching facade
* in front of a standard connection pool and shares already leased connections
* to multiplex message exchanges over active HTTP/2 connections.
*<p>
* Please note this flag has no effect on HTTP/1.1 and HTTP/1.0 connections.
*<p>
* This feature is considered experimenal
*
* @since 5.5
* @return this instance.
*/
@Experimental
public final PoolingAsyncClientConnectionManagerBuilder setMessageMultiplexing(final boolean messageMultiplexing) {
this.messageMultiplexing = messageMultiplexing;
return this;
}
@Internal @Internal
protected AsyncClientConnectionOperator createConnectionOperator( protected AsyncClientConnectionOperator createConnectionOperator(
final TlsStrategy tlsStrategy, final TlsStrategy tlsStrategy,
@ -290,7 +310,8 @@ public class PoolingAsyncClientConnectionManagerBuilder {
createConnectionOperator(tlsStrategyCopy, schemePortResolver, dnsResolver), createConnectionOperator(tlsStrategyCopy, schemePortResolver, dnsResolver),
poolConcurrencyPolicy, poolConcurrencyPolicy,
poolReusePolicy, poolReusePolicy,
null); null,
messageMultiplexing);
poolingmgr.setConnectionConfigResolver(connectionConfigResolver); poolingmgr.setConnectionConfigResolver(connectionConfigResolver);
poolingmgr.setTlsConfigResolver(tlsConfigResolver); poolingmgr.setTlsConfigResolver(tlsConfigResolver);
if (maxConnTotal > 0) { if (maxConnTotal > 0) {

View File

@ -28,7 +28,6 @@ package org.apache.hc.client5.http.examples;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
@ -36,46 +35,85 @@ import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer; import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer; import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
import org.apache.hc.client5.http.config.TlsConfig; import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.config.Http1Config;
import org.apache.hc.core5.http.message.StatusLine; import org.apache.hc.core5.http.message.StatusLine;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.IOReactorConfig; import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;
/** /**
* This example demonstrates concurrent (multiplexed) execution of multiple * Example of asynchronous HTTP/1.1 request execution with message exchange multiplexing
* HTTP/2 message exchanges. * over HTTP/2 connections.
*/ */
public class AsyncClientH2Multiplexing { public class AsyncClientH2Multiplexing {
public static void main(final String[] args) throws Exception { public static void main(final String[] args) throws Exception {
final MinimalHttpAsyncClient client = HttpAsyncClients.createMinimal( final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
H2Config.DEFAULT, .setSoTimeout(Timeout.ofSeconds(5))
Http1Config.DEFAULT, .build();
IOReactorConfig.DEFAULT,
PoolingAsyncClientConnectionManagerBuilder.create() final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setDefaultTlsConfig(TlsConfig.custom() .setDefaultTlsConfig(TlsConfig.custom()
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
.build()) .build())
.build()); .setMessageMultiplexing(true)
.build();
final CloseableHttpAsyncClient client = HttpAsyncClients.custom()
.setConnectionManager(connectionManager)
.setIOReactorConfig(ioReactorConfig)
.build();
client.start(); client.start();
final HttpHost target = new HttpHost("https", "nghttp2.org"); final HttpHost target = new HttpHost("https", "nghttp2.org");
final Future<AsyncClientEndpoint> leaseFuture = client.lease(target, null);
final AsyncClientEndpoint endpoint = leaseFuture.get(30, TimeUnit.SECONDS);
try {
final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
final CountDownLatch latch = new CountDownLatch(requestUris.length); final SimpleHttpRequest warmup = SimpleRequestBuilder.get()
.setHttpHost(target)
.setPath("/httpbin")
.build();
// Make sure there is an open HTTP/2 connection in the pool
System.out.println("Executing warm-up request " + warmup);
final Future<SimpleHttpResponse> future = client.execute(
SimpleRequestProducer.create(warmup),
SimpleResponseConsumer.create(),
new FutureCallback<SimpleHttpResponse>() {
@Override
public void completed(final SimpleHttpResponse response) {
System.out.println(warmup + "->" + new StatusLine(response));
System.out.println(response.getBody());
}
@Override
public void failed(final Exception ex) {
System.out.println(warmup + "->" + ex);
}
@Override
public void cancelled() {
System.out.println(warmup + " cancelled");
}
});
future.get();
Thread.sleep(1000);
System.out.println("Connection pool stats: " + connectionManager.getTotalStats());
// Execute multiple requests over the HTTP/2 connection from the pool
final String[] requestUris = new String[]{"/httpbin", "/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
final CountDownLatch countDownLatch = new CountDownLatch(requestUris.length);
for (final String requestUri : requestUris) { for (final String requestUri : requestUris) {
final SimpleHttpRequest request = SimpleRequestBuilder.get() final SimpleHttpRequest request = SimpleRequestBuilder.get()
.setHttpHost(target) .setHttpHost(target)
@ -83,36 +121,37 @@ public class AsyncClientH2Multiplexing {
.build(); .build();
System.out.println("Executing request " + request); System.out.println("Executing request " + request);
endpoint.execute( client.execute(
SimpleRequestProducer.create(request), SimpleRequestProducer.create(request),
SimpleResponseConsumer.create(), SimpleResponseConsumer.create(),
new FutureCallback<SimpleHttpResponse>() { new FutureCallback<SimpleHttpResponse>() {
@Override @Override
public void completed(final SimpleHttpResponse response) { public void completed(final SimpleHttpResponse response) {
latch.countDown(); countDownLatch.countDown();
System.out.println(request + "->" + new StatusLine(response)); System.out.println(request + "->" + new StatusLine(response));
System.out.println(response.getBody()); System.out.println(response.getBody());
} }
@Override @Override
public void failed(final Exception ex) { public void failed(final Exception ex) {
latch.countDown(); countDownLatch.countDown();
System.out.println(request + "->" + ex); System.out.println(request + "->" + ex);
} }
@Override @Override
public void cancelled() { public void cancelled() {
latch.countDown(); countDownLatch.countDown();
System.out.println(request + " cancelled"); System.out.println(request + " cancelled");
} }
}); });
} }
latch.await();
} finally { countDownLatch.await();
endpoint.releaseAndReuse();
} // There still should be a single connection in the pool
System.out.println("Connection pool stats: " + connectionManager.getTotalStats());
System.out.println("Shutting down"); System.out.println("Shutting down");
client.close(CloseMode.GRACEFUL); client.close(CloseMode.GRACEFUL);

View File

@ -0,0 +1,387 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.nio;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.pool.ManagedConnPool;
import org.apache.hc.core5.pool.PoolEntry;
import org.apache.hc.core5.util.Timeout;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
public class H2SharingConnPoolTest {
static final String DEFAULT_ROUTE = "DEFAULT_ROUTE";
@Mock
ManagedConnPool<String, HttpConnection> connPool;
@Mock
FutureCallback<PoolEntry<String, HttpConnection>> callback;
@Mock
HttpConnection connection;
H2SharingConnPool<String, HttpConnection> h2SharingPool;
@BeforeEach
void setup() {
MockitoAnnotations.openMocks(this);
h2SharingPool = new H2SharingConnPool<>(connPool);
}
@Test
void testLeaseFutureReturned() throws Exception {
Mockito.when(connPool.lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.any(),
Mockito.any(),
Mockito.any())).thenReturn(new BasicFuture<>(null));
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
Assertions.assertNotNull(result);
Assertions.assertFalse(result.isDone());
Mockito.verify(connPool).lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.eq(null),
Mockito.eq(Timeout.ONE_MILLISECOND),
Mockito.any());
Mockito.verify(callback, Mockito.never()).completed(
Mockito.any());
}
@Test
void testLeaseExistingConnectionReturned() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
final Future<PoolEntry<String, HttpConnection>> future = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
Assertions.assertNotNull(future);
Assertions.assertSame(poolEntry, future.get());
Mockito.verify(connPool, Mockito.never()).lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.any(),
Mockito.any(),
Mockito.any());
Mockito.verify(callback).completed(
Mockito.same(poolEntry));
}
@Test
void testLeaseWithStateCacheBypassed() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
Mockito.when(connPool.lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.any(),
Mockito.any(),
Mockito.any())).thenReturn(new BasicFuture<>(null));
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, "stuff", Timeout.ONE_MILLISECOND, callback);
Assertions.assertNotNull(result);
Assertions.assertFalse(result.isDone());
Mockito.verify(connPool).lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.eq("stuff"),
Mockito.eq(Timeout.ONE_MILLISECOND),
Mockito.any());
Mockito.verify(callback, Mockito.never()).completed(
Mockito.any());
}
@Test
void testLeaseNewConnectionReturnedAndCached() throws Exception {
final AtomicReference<BasicFuture<PoolEntry<String, HttpConnection>>> futureRef = new AtomicReference<>();
Mockito.when(connPool.lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.any(),
Mockito.any(),
Mockito.any())).thenAnswer(invocationOnMock -> {
final BasicFuture<PoolEntry<String, HttpConnection>> future = new BasicFuture<>(invocationOnMock.getArgument(3));
futureRef.set(future);
return future;
});
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
final BasicFuture<PoolEntry<String, HttpConnection>> future = futureRef.get();
Assertions.assertNotNull(future);
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.getProtocolVersion()).thenReturn(HttpVersion.HTTP_2);
future.completed(poolEntry);
Assertions.assertTrue(result.isDone());
Mockito.verify(connPool).lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.eq(null),
Mockito.eq(Timeout.ONE_MILLISECOND),
Mockito.any());
Mockito.verify(callback).completed(
Mockito.any());
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
Assertions.assertEquals(1, routePool.getCount(poolEntry));
}
@Test
void testLeaseNewConnectionReturnedAndNotCached() throws Exception {
final AtomicReference<BasicFuture<PoolEntry<String, HttpConnection>>> futureRef = new AtomicReference<>();
Mockito.when(connPool.lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.any(),
Mockito.any(),
Mockito.any())).thenAnswer(invocationOnMock -> {
final BasicFuture<PoolEntry<String, HttpConnection>> future = new BasicFuture<>(invocationOnMock.getArgument(3));
futureRef.set(future);
return future;
});
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
final BasicFuture<PoolEntry<String, HttpConnection>> future = futureRef.get();
Assertions.assertNotNull(future);
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.getProtocolVersion()).thenReturn(HttpVersion.HTTP_1_1);
future.completed(poolEntry);
Assertions.assertTrue(result.isDone());
Mockito.verify(connPool).lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.eq(null),
Mockito.eq(Timeout.ONE_MILLISECOND),
Mockito.any());
Mockito.verify(callback).completed(
Mockito.any());
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
Assertions.assertEquals(0, routePool.getCount(poolEntry));
}
@Test
void testLeaseNoConnection() throws Exception {
final AtomicReference<BasicFuture<PoolEntry<String, HttpConnection>>> futureRef = new AtomicReference<>();
Mockito.when(connPool.lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.any(),
Mockito.any(),
Mockito.any())).thenAnswer(invocationOnMock -> {
final BasicFuture<PoolEntry<String, HttpConnection>> future = new BasicFuture<>(invocationOnMock.getArgument(3));
futureRef.set(future);
return future;
});
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
final BasicFuture<PoolEntry<String, HttpConnection>> future = futureRef.get();
Assertions.assertNotNull(future);
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.discardConnection(CloseMode.IMMEDIATE);
future.completed(poolEntry);
Assertions.assertTrue(result.isDone());
Mockito.verify(connPool).lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.eq(null),
Mockito.eq(Timeout.ONE_MILLISECOND),
Mockito.any());
Mockito.verify(callback).completed(
Mockito.any());
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
Assertions.assertEquals(0, routePool.getCount(poolEntry));
}
@Test
void testLeaseWithStateNewConnectionReturnedAndNotCached() throws Exception {
final AtomicReference<BasicFuture<PoolEntry<String, HttpConnection>>> futureRef = new AtomicReference<>();
Mockito.when(connPool.lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.any(),
Mockito.any(),
Mockito.any())).thenAnswer(invocationOnMock -> {
final BasicFuture<PoolEntry<String, HttpConnection>> future = new BasicFuture<>(invocationOnMock.getArgument(3));
futureRef.set(future);
return future;
});
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, "stuff", Timeout.ONE_MILLISECOND, callback);
final BasicFuture<PoolEntry<String, HttpConnection>> future = futureRef.get();
Assertions.assertNotNull(future);
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.getProtocolVersion()).thenReturn(HttpVersion.HTTP_2);
future.completed(poolEntry);
Assertions.assertTrue(result.isDone());
Mockito.verify(connPool).lease(
Mockito.eq(DEFAULT_ROUTE),
Mockito.eq("stuff"),
Mockito.eq(Timeout.ONE_MILLISECOND),
Mockito.any());
Mockito.verify(callback).completed(
Mockito.any());
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
Assertions.assertEquals(0, routePool.getCount(poolEntry));
}
@Test
void testReleaseReusableNoCacheReturnedToPool() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.isOpen()).thenReturn(true);
h2SharingPool.release(poolEntry, true);
Mockito.verify(connPool).release(
Mockito.same(poolEntry),
Mockito.eq(true));
}
@Test
void testReleaseReusableNotInCacheReturnedToPool() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.isOpen()).thenReturn(true);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
h2SharingPool.release(poolEntry, true);
Mockito.verify(connPool).release(
Mockito.same(poolEntry),
Mockito.eq(true));
}
@Test
void testReleaseReusableInCacheNotReturnedToPool() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.isOpen()).thenReturn(true);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
routePool.track(poolEntry);
h2SharingPool.release(poolEntry, true);
Mockito.verify(connPool, Mockito.never()).release(
Mockito.same(poolEntry),
Mockito.anyBoolean());
}
@Test
void testReleaseNonReusableInCacheReturnedToPool() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.isOpen()).thenReturn(true);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
routePool.track(poolEntry);
h2SharingPool.release(poolEntry, false);
Mockito.verify(connPool).release(
Mockito.same(poolEntry),
Mockito.eq(false));
}
@Test
void testReleaseReusableAndClosedInCacheReturnedToPool() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.isOpen()).thenReturn(false);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
routePool.track(poolEntry);
h2SharingPool.release(poolEntry, true);
Mockito.verify(connPool).release(
Mockito.same(poolEntry),
Mockito.eq(true));
}
@Test
void testClose() throws Exception {
h2SharingPool.close();
Mockito.verify(connPool).close();
}
@Test
void testCloseMode() throws Exception {
h2SharingPool.close(CloseMode.IMMEDIATE);
Mockito.verify(connPool).close(CloseMode.IMMEDIATE);
}
@Test
void testLeasePoolClosed() throws Exception {
h2SharingPool.close();
Assertions.assertThrows(IllegalStateException.class, () -> h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback));
}
@Test
void testReleasePoolClosed() throws Exception {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
poolEntry.assignConnection(connection);
Mockito.when(connection.isOpen()).thenReturn(false);
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
routePool.track(poolEntry);
h2SharingPool.close();
h2SharingPool.release(poolEntry, true);
Mockito.verify(connPool).release(
Mockito.same(poolEntry),
Mockito.eq(true));
}
}

View File

@ -0,0 +1,130 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.nio;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.pool.PoolEntry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
public class H2SharingPerRoutePoolTest {
static PoolEntry<String, HttpConnection> createMockEntry() {
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>("some route");
final HttpConnection conn = Mockito.mock(HttpConnection.class);
Mockito.when(conn.isOpen()).thenReturn(true);
poolEntry.assignConnection(conn);
return poolEntry;
}
H2SharingConnPool.PerRoutePool<String, HttpConnection> pool;
PoolEntry<String, HttpConnection> poolEntry1;
PoolEntry<String, HttpConnection> poolEntry2;
@BeforeEach
void setup() {
pool = new H2SharingConnPool.PerRoutePool<>();
poolEntry1 = createMockEntry();
poolEntry2 = createMockEntry();
}
@Test
void testKeep() {
Assertions.assertEquals(1, pool.track(poolEntry1));
Assertions.assertEquals(2, pool.track(poolEntry1));
Assertions.assertEquals(1, pool.track(poolEntry2));
Assertions.assertEquals(3, pool.track(poolEntry1));
}
@Test
void testLeaseLeastUsed() {
pool.track(poolEntry1);
pool.track(poolEntry1);
pool.track(poolEntry2);
Assertions.assertSame(poolEntry2, pool.lease());
Assertions.assertEquals(2, pool.getCount(poolEntry2));
final PoolEntry<String, HttpConnection> poolEntry = pool.lease();
Assertions.assertEquals(3, pool.getCount(poolEntry));
}
@Test
void testLeaseEmptyPool() {
Assertions.assertNull(pool.lease());
}
@Test
void testReleaseReusable() {
pool.track(poolEntry1);
pool.track(poolEntry1);
pool.track(poolEntry1);
Assertions.assertEquals(2, pool.release(poolEntry1, true));
Assertions.assertEquals(1, pool.release(poolEntry1, true));
Assertions.assertEquals(0, pool.release(poolEntry1, true));
Assertions.assertEquals(0, pool.release(poolEntry1, true));
}
@Test
void testReleaseNonReusable() {
pool.track(poolEntry1);
pool.track(poolEntry1);
pool.track(poolEntry1);
Assertions.assertEquals(0, pool.release(poolEntry1, false));
}
@Test
void testReleaseNonPresent() {
Assertions.assertEquals(0, pool.release(poolEntry1, true));
Assertions.assertEquals(0, pool.release(poolEntry2, true));
}
@Test
void testReleaseConnectionClosed() {
pool.track(poolEntry1);
pool.track(poolEntry1);
pool.track(poolEntry1);
Mockito.when(poolEntry1.getConnection().isOpen()).thenReturn(false);
Assertions.assertEquals(0, pool.release(poolEntry1, true));
}
@Test
void testReleaseConnectionMissing() {
pool.track(poolEntry1);
pool.track(poolEntry1);
pool.track(poolEntry1);
poolEntry1.discardConnection(CloseMode.IMMEDIATE);
Assertions.assertEquals(0, pool.release(poolEntry1, true));
}
}