Experimental connections pool implementation that acts as a caching facade in front of a standard ManagedConnPool and shares already leased connections to multiplex message exchanges over active HTTP/2 connections.
This commit is contained in:
parent
90da166323
commit
55cdd9e94f
|
@ -213,4 +213,24 @@ class HttpIntegrationTests {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nested
|
||||||
|
@DisplayName("HTTP message multiplexing (HTTP/2)")
|
||||||
|
class RequestMultiplexing extends TestHttpAsyncRequestMultiplexing {
|
||||||
|
|
||||||
|
public RequestMultiplexing() {
|
||||||
|
super(URIScheme.HTTP);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Nested
|
||||||
|
@DisplayName("HTTP message multiplexing (HTTP/2, TLS)")
|
||||||
|
class RequestMultiplexingTls extends TestHttpAsyncRequestMultiplexing {
|
||||||
|
|
||||||
|
public RequestMultiplexingTls() {
|
||||||
|
super(URIScheme.HTTPS);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
/*
|
||||||
|
* ====================================================================
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
* ====================================================================
|
||||||
|
*
|
||||||
|
* This software consists of voluntary contributions made by many
|
||||||
|
* individuals on behalf of the Apache Software Foundation. For more
|
||||||
|
* information on the Apache Software Foundation, please see
|
||||||
|
* <http://www.apache.org/>.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hc.client5.testing.async;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
import org.apache.hc.client5.http.config.TlsConfig;
|
||||||
|
import org.apache.hc.client5.http.protocol.HttpClientContext;
|
||||||
|
import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
|
||||||
|
import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
|
||||||
|
import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
|
||||||
|
import org.apache.hc.core5.http.ContentType;
|
||||||
|
import org.apache.hc.core5.http.HttpHost;
|
||||||
|
import org.apache.hc.core5.http.HttpResponse;
|
||||||
|
import org.apache.hc.core5.http.Message;
|
||||||
|
import org.apache.hc.core5.http.Method;
|
||||||
|
import org.apache.hc.core5.http.URIScheme;
|
||||||
|
import org.apache.hc.core5.http.nio.entity.AsyncEntityProducers;
|
||||||
|
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
|
||||||
|
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
|
||||||
|
import org.apache.hc.core5.http.nio.support.BasicResponseConsumer;
|
||||||
|
import org.apache.hc.core5.http2.HttpVersionPolicy;
|
||||||
|
import org.hamcrest.CoreMatchers;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
abstract class TestHttpAsyncRequestMultiplexing extends AbstractIntegrationTestBase {
|
||||||
|
|
||||||
|
public TestHttpAsyncRequestMultiplexing(final URIScheme uriScheme) {
|
||||||
|
super(uriScheme, ClientProtocolLevel.MINIMAL, ServerProtocolLevel.H2_ONLY);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testConcurrentPostRequests() throws Exception {
|
||||||
|
configureServer(bootstrap -> bootstrap.register("/echo/*", AsyncEchoHandler::new));
|
||||||
|
configureClient(custimizer -> custimizer
|
||||||
|
.setDefaultTlsConfig(TlsConfig.custom()
|
||||||
|
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
|
||||||
|
.build())
|
||||||
|
.useMessageMultiplexing()
|
||||||
|
);
|
||||||
|
final HttpHost target = startServer();
|
||||||
|
final TestAsyncClient client = startClient();
|
||||||
|
final byte[] b1 = new byte[1024];
|
||||||
|
final Random rnd = new Random(System.currentTimeMillis());
|
||||||
|
rnd.nextBytes(b1);
|
||||||
|
|
||||||
|
final int reqCount = 200;
|
||||||
|
|
||||||
|
final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
|
||||||
|
for (int i = 0; i < reqCount; i++) {
|
||||||
|
final Future<Message<HttpResponse, byte[]>> future = client.execute(
|
||||||
|
new BasicRequestProducer(Method.POST, target, "/echo/",
|
||||||
|
AsyncEntityProducers.create(b1, ContentType.APPLICATION_OCTET_STREAM)),
|
||||||
|
new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
|
||||||
|
queue.add(future);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!queue.isEmpty()) {
|
||||||
|
final Future<Message<HttpResponse, byte[]>> future = queue.remove();
|
||||||
|
final Message<HttpResponse, byte[]> responseMessage = future.get();
|
||||||
|
assertThat(responseMessage, CoreMatchers.notNullValue());
|
||||||
|
final HttpResponse response = responseMessage.getHead();
|
||||||
|
assertThat(response.getCode(), CoreMatchers.equalTo(200));
|
||||||
|
final byte[] b2 = responseMessage.getBody();
|
||||||
|
assertThat(b1, CoreMatchers.equalTo(b2));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -26,6 +26,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hc.client5.testing.compatibility.async;
|
package org.apache.hc.client5.testing.compatibility.async;
|
||||||
|
|
||||||
|
import java.util.Queue;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import org.apache.hc.client5.http.ContextBuilder;
|
import org.apache.hc.client5.http.ContextBuilder;
|
||||||
|
@ -38,10 +41,13 @@ import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
|
||||||
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
|
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
|
||||||
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
|
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
|
||||||
import org.apache.hc.client5.http.protocol.HttpClientContext;
|
import org.apache.hc.client5.http.protocol.HttpClientContext;
|
||||||
|
import org.apache.hc.client5.testing.Result;
|
||||||
import org.apache.hc.client5.testing.extension.async.HttpAsyncClientResource;
|
import org.apache.hc.client5.testing.extension.async.HttpAsyncClientResource;
|
||||||
|
import org.apache.hc.core5.concurrent.FutureCallback;
|
||||||
import org.apache.hc.core5.http.HttpHost;
|
import org.apache.hc.core5.http.HttpHost;
|
||||||
import org.apache.hc.core5.http.HttpStatus;
|
import org.apache.hc.core5.http.HttpStatus;
|
||||||
import org.apache.hc.core5.http.HttpVersion;
|
import org.apache.hc.core5.http.HttpVersion;
|
||||||
|
import org.apache.hc.core5.http.RequestNotExecutedException;
|
||||||
import org.apache.hc.core5.http2.HttpVersionPolicy;
|
import org.apache.hc.core5.http2.HttpVersionPolicy;
|
||||||
import org.apache.hc.core5.util.Timeout;
|
import org.apache.hc.core5.util.Timeout;
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
@ -51,6 +57,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
|
||||||
public abstract class HttpAsyncClientCompatibilityTest {
|
public abstract class HttpAsyncClientCompatibilityTest {
|
||||||
|
|
||||||
static final Timeout TIMEOUT = Timeout.ofSeconds(5);
|
static final Timeout TIMEOUT = Timeout.ofSeconds(5);
|
||||||
|
static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(30);
|
||||||
|
|
||||||
private final HttpVersionPolicy versionPolicy;
|
private final HttpVersionPolicy versionPolicy;
|
||||||
private final HttpHost target;
|
private final HttpHost target;
|
||||||
|
@ -119,6 +126,54 @@ public abstract class HttpAsyncClientCompatibilityTest {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void test_concurrent_gets() throws Exception {
|
||||||
|
final CloseableHttpAsyncClient client = client();
|
||||||
|
|
||||||
|
final String[] requestUris = new String[] {"/111", "/222", "/333"};
|
||||||
|
final int n = 200;
|
||||||
|
final Queue<Result<Void>> queue = new ConcurrentLinkedQueue<>();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(requestUris.length * n);
|
||||||
|
|
||||||
|
for (int i = 0; i < n; i++) {
|
||||||
|
for (final String requestUri: requestUris) {
|
||||||
|
final SimpleHttpRequest request = SimpleRequestBuilder.get()
|
||||||
|
.setHttpHost(target)
|
||||||
|
.setPath(requestUri)
|
||||||
|
.build();
|
||||||
|
final HttpClientContext context = context();
|
||||||
|
client.execute(request, context, new FutureCallback<SimpleHttpResponse>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completed(final SimpleHttpResponse response) {
|
||||||
|
queue.add(new Result<>(request, response, null));
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void failed(final Exception ex) {
|
||||||
|
queue.add(new Result<>(request, ex));
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cancelled() {
|
||||||
|
queue.add(new Result<>(request, new RequestNotExecutedException()));
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assertions.assertTrue(latch.await(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit()));
|
||||||
|
Assertions.assertEquals(requestUris.length * n, queue.size());
|
||||||
|
for (final Result<Void> result : queue) {
|
||||||
|
if (result.isOK()) {
|
||||||
|
Assertions.assertEquals(HttpStatus.SC_OK, result.response.getCode());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void test_auth_failure_wrong_auth_scope() throws Exception {
|
void test_auth_failure_wrong_auth_scope() throws Exception {
|
||||||
addCredentials(
|
addCredentials(
|
||||||
|
|
|
@ -68,6 +68,11 @@ final class H2OnlyMinimalTestClientBuilder implements TestAsyncClientBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestAsyncClientBuilder useMessageMultiplexing() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TestAsyncClient build() throws Exception {
|
public TestAsyncClient build() throws Exception {
|
||||||
final CloseableHttpAsyncClient client = HttpAsyncClients.createHttp2Minimal(
|
final CloseableHttpAsyncClient client = HttpAsyncClients.createHttp2Minimal(
|
||||||
|
|
|
@ -97,6 +97,11 @@ final class H2OnlyTestClientBuilder implements TestAsyncClientBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestAsyncClientBuilder useMessageMultiplexing() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TestAsyncClientBuilder setH2Config(final H2Config h2Config) {
|
public TestAsyncClientBuilder setH2Config(final H2Config h2Config) {
|
||||||
this.h2Config = h2Config;
|
this.h2Config = h2Config;
|
||||||
|
|
|
@ -61,6 +61,7 @@ public class HttpAsyncClientResource implements AfterEachCallback {
|
||||||
.setDefaultTlsConfig(TlsConfig.custom()
|
.setDefaultTlsConfig(TlsConfig.custom()
|
||||||
.setVersionPolicy(versionPolicy)
|
.setVersionPolicy(versionPolicy)
|
||||||
.build())
|
.build())
|
||||||
|
.setMessageMultiplexing(true)
|
||||||
.build());
|
.build());
|
||||||
} catch (final CertificateException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException ex) {
|
} catch (final CertificateException | NoSuchAlgorithmException | KeyStoreException | KeyManagementException ex) {
|
||||||
throw new IllegalStateException(ex);
|
throw new IllegalStateException(ex);
|
||||||
|
|
|
@ -77,6 +77,12 @@ final class MinimalTestClientBuilder implements TestAsyncClientBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestAsyncClientBuilder useMessageMultiplexing() {
|
||||||
|
this.connectionManagerBuilder.setMessageMultiplexing(true);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) {
|
public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) {
|
||||||
this.http1Config = http1Config;
|
this.http1Config = http1Config;
|
||||||
|
|
|
@ -111,6 +111,12 @@ final class StandardTestClientBuilder implements TestAsyncClientBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TestAsyncClientBuilder useMessageMultiplexing() {
|
||||||
|
this.connectionManagerBuilder.setMessageMultiplexing(true);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) {
|
public TestAsyncClientBuilder setHttp1Config(final Http1Config http1Config) {
|
||||||
this.clientBuilder.setHttp1Config(http1Config);
|
this.clientBuilder.setHttp1Config(http1Config);
|
||||||
|
|
|
@ -73,6 +73,10 @@ public interface TestAsyncClientBuilder {
|
||||||
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
|
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default TestAsyncClientBuilder useMessageMultiplexing() {
|
||||||
|
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
|
||||||
|
}
|
||||||
|
|
||||||
default TestAsyncClientBuilder setHttp1Config(Http1Config http1Config) {
|
default TestAsyncClientBuilder setHttp1Config(Http1Config http1Config) {
|
||||||
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
|
throw new UnsupportedOperationException("Operation not supported by " + getProtocolLevel());
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,322 @@
|
||||||
|
/*
|
||||||
|
* ====================================================================
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
* ====================================================================
|
||||||
|
*
|
||||||
|
* This software consists of voluntary contributions made by many
|
||||||
|
* individuals on behalf of the Apache Software Foundation. For more
|
||||||
|
* information on the Apache Software Foundation, please see
|
||||||
|
* <http://www.apache.org/>.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hc.client5.http.impl.nio;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
import org.apache.hc.client5.http.impl.ConnPoolSupport;
|
||||||
|
import org.apache.hc.core5.annotation.Contract;
|
||||||
|
import org.apache.hc.core5.annotation.Experimental;
|
||||||
|
import org.apache.hc.core5.annotation.ThreadingBehavior;
|
||||||
|
import org.apache.hc.core5.concurrent.CallbackContribution;
|
||||||
|
import org.apache.hc.core5.concurrent.CompletedFuture;
|
||||||
|
import org.apache.hc.core5.concurrent.FutureCallback;
|
||||||
|
import org.apache.hc.core5.http.HttpConnection;
|
||||||
|
import org.apache.hc.core5.http.HttpVersion;
|
||||||
|
import org.apache.hc.core5.http.ProtocolVersion;
|
||||||
|
import org.apache.hc.core5.io.CloseMode;
|
||||||
|
import org.apache.hc.core5.pool.ManagedConnPool;
|
||||||
|
import org.apache.hc.core5.pool.PoolEntry;
|
||||||
|
import org.apache.hc.core5.pool.PoolStats;
|
||||||
|
import org.apache.hc.core5.util.Args;
|
||||||
|
import org.apache.hc.core5.util.Asserts;
|
||||||
|
import org.apache.hc.core5.util.TimeValue;
|
||||||
|
import org.apache.hc.core5.util.Timeout;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Experimental connections pool implementation that acts as a caching facade in front of
|
||||||
|
* a standard {@link ManagedConnPool} and shares already leased connections to multiplex
|
||||||
|
* message exchanges over active HTTP/2 connections.
|
||||||
|
* @param <T> route
|
||||||
|
* @param <C> connection object
|
||||||
|
*
|
||||||
|
* @since 5.5
|
||||||
|
*/
|
||||||
|
@Contract(threading = ThreadingBehavior.SAFE)
|
||||||
|
@Experimental
|
||||||
|
public class H2SharingConnPool<T, C extends HttpConnection> implements ManagedConnPool<T, C> {
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(H2SharingConnPool.class);
|
||||||
|
|
||||||
|
private final ManagedConnPool<T, C> pool;
|
||||||
|
private final ConcurrentMap<T, PerRoutePool<T, C>> perRouteCache;
|
||||||
|
private final AtomicBoolean closed;
|
||||||
|
|
||||||
|
public H2SharingConnPool(final ManagedConnPool<T, C> pool) {
|
||||||
|
this.pool = Args.notNull(pool, "Connection pool");
|
||||||
|
this.perRouteCache = new ConcurrentHashMap<>();
|
||||||
|
this.closed = new AtomicBoolean();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close(final CloseMode closeMode) {
|
||||||
|
if (closed.compareAndSet(false, true)) {
|
||||||
|
perRouteCache.clear();
|
||||||
|
pool.close(closeMode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (closed.compareAndSet(false, true)) {
|
||||||
|
perRouteCache.clear();
|
||||||
|
pool.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
PerRoutePool<T, C> getPerRoutePool(final T route) {
|
||||||
|
return perRouteCache.computeIfAbsent(route, r -> new PerRoutePool<>());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<PoolEntry<T, C>> lease(final T route,
|
||||||
|
final Object state,
|
||||||
|
final Timeout requestTimeout,
|
||||||
|
final FutureCallback<PoolEntry<T, C>> callback) {
|
||||||
|
Asserts.check(!closed.get(), "Connection pool shut down");
|
||||||
|
if (state == null) {
|
||||||
|
final PerRoutePool<T, C> perRoutePool = perRouteCache.get(route);
|
||||||
|
if (perRoutePool != null) {
|
||||||
|
final PoolEntry<T, C> entry = perRoutePool.lease();
|
||||||
|
if (entry != null) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Sharing connection {} for message exchange multiplexing (lease count = {})",
|
||||||
|
ConnPoolSupport.getId(entry.getConnection()), perRoutePool.getCount(entry));
|
||||||
|
}
|
||||||
|
final Future<PoolEntry<T, C>> future = new CompletedFuture<>(entry);
|
||||||
|
if (callback != null) {
|
||||||
|
callback.completed(entry);
|
||||||
|
}
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG.debug("No shared connection available");
|
||||||
|
return pool.lease(route,
|
||||||
|
state,
|
||||||
|
requestTimeout,
|
||||||
|
new CallbackContribution<PoolEntry<T, C>>(callback) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completed(final PoolEntry<T, C> entry) {
|
||||||
|
if (state == null) {
|
||||||
|
final C connection = entry.getConnection();
|
||||||
|
final ProtocolVersion ver = connection != null ? connection.getProtocolVersion() : null;
|
||||||
|
if (ver == HttpVersion.HTTP_2_0) {
|
||||||
|
final PerRoutePool<T, C> perRoutePool = getPerRoutePool(route);
|
||||||
|
final long count = perRoutePool.track(entry);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Connection {} can be shared for message exchange multiplexing (lease count = {})",
|
||||||
|
ConnPoolSupport.getId(entry.getConnection()), count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (callback != null) {
|
||||||
|
callback.completed(entry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void release(final PoolEntry<T, C> entry, final boolean reusable) {
|
||||||
|
if (entry == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (closed.get()) {
|
||||||
|
pool.release(entry, reusable);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final T route = entry.getRoute();
|
||||||
|
final PerRoutePool<T, C> perRoutePool = perRouteCache.get(route);
|
||||||
|
if (perRoutePool != null) {
|
||||||
|
final long count = perRoutePool.release(entry, reusable);
|
||||||
|
if (count > 0) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Connection {} is being shared for message exchange multiplexing (lease count = {})",
|
||||||
|
ConnPoolSupport.getId(entry.getConnection()), count);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Releasing connection {} back to the pool", ConnPoolSupport.getId(entry.getConnection()));
|
||||||
|
}
|
||||||
|
pool.release(entry, reusable);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxTotal(final int max) {
|
||||||
|
pool.setMaxTotal(max);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxTotal() {
|
||||||
|
return pool.getMaxTotal();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setDefaultMaxPerRoute(final int max) {
|
||||||
|
pool.setDefaultMaxPerRoute(max);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getDefaultMaxPerRoute() {
|
||||||
|
return pool.getDefaultMaxPerRoute();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setMaxPerRoute(final T route, final int max) {
|
||||||
|
pool.setMaxPerRoute(route, max);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMaxPerRoute(final T route) {
|
||||||
|
return pool.getMaxPerRoute(route);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void closeIdle(final TimeValue idleTime) {
|
||||||
|
pool.closeIdle(idleTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void closeExpired() {
|
||||||
|
pool.closeExpired();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<T> getRoutes() {
|
||||||
|
return pool.getRoutes();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PoolStats getTotalStats() {
|
||||||
|
return pool.getTotalStats();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PoolStats getStats(final T route) {
|
||||||
|
return pool.getStats(route);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return pool.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
static class PerRoutePool<T, C extends HttpConnection> {
|
||||||
|
|
||||||
|
private final Map<PoolEntry<T, C>, AtomicLong> entryMap;
|
||||||
|
private final Lock lock;
|
||||||
|
|
||||||
|
PerRoutePool() {
|
||||||
|
this.entryMap = new HashMap<>();
|
||||||
|
this.lock = new ReentrantLock();
|
||||||
|
}
|
||||||
|
|
||||||
|
AtomicLong getCounter(final PoolEntry<T, C> entry) {
|
||||||
|
return entryMap.computeIfAbsent(entry, e -> new AtomicLong());
|
||||||
|
}
|
||||||
|
|
||||||
|
long track(final PoolEntry<T, C> entry) {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
final AtomicLong counter = getCounter(entry);
|
||||||
|
return counter.incrementAndGet();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
PoolEntry<T, C> lease() {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
final PoolEntry<T, C> entry = entryMap.entrySet().stream()
|
||||||
|
.min(Comparator.comparingLong(e -> e.getValue().get()))
|
||||||
|
.map(Map.Entry::getKey)
|
||||||
|
.orElse(null);
|
||||||
|
if (entry == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final AtomicLong counter = getCounter(entry);
|
||||||
|
counter.incrementAndGet();
|
||||||
|
return entry;
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long release(final PoolEntry<T, C> entry, final boolean reusable) {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
final C connection = entry.getConnection();
|
||||||
|
if (!reusable || connection == null || !connection.isOpen()) {
|
||||||
|
entryMap.remove(entry);
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
final AtomicLong counter = entryMap.compute(entry, (e, c) -> {
|
||||||
|
if (c == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
final long count = c.decrementAndGet();
|
||||||
|
return count > 0 ? c : null;
|
||||||
|
});
|
||||||
|
return counter != null ? counter.get() : 0L;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long getCount(final PoolEntry<T, C> entry) {
|
||||||
|
lock.lock();
|
||||||
|
try {
|
||||||
|
final AtomicLong counter = entryMap.get(entry);
|
||||||
|
return counter == null ? 0L : counter.get();
|
||||||
|
} finally {
|
||||||
|
lock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -160,7 +160,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
|
||||||
final SchemePortResolver schemePortResolver,
|
final SchemePortResolver schemePortResolver,
|
||||||
final DnsResolver dnsResolver) {
|
final DnsResolver dnsResolver) {
|
||||||
this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
|
this(new DefaultAsyncClientConnectionOperator(tlsStrategyLookup, schemePortResolver, dnsResolver),
|
||||||
poolConcurrencyPolicy, poolReusePolicy, timeToLive);
|
poolConcurrencyPolicy, poolReusePolicy, timeToLive, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Internal
|
@Internal
|
||||||
|
@ -168,11 +168,13 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
|
||||||
final AsyncClientConnectionOperator connectionOperator,
|
final AsyncClientConnectionOperator connectionOperator,
|
||||||
final PoolConcurrencyPolicy poolConcurrencyPolicy,
|
final PoolConcurrencyPolicy poolConcurrencyPolicy,
|
||||||
final PoolReusePolicy poolReusePolicy,
|
final PoolReusePolicy poolReusePolicy,
|
||||||
final TimeValue timeToLive) {
|
final TimeValue timeToLive,
|
||||||
|
final boolean messageMultiplexing) {
|
||||||
this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
|
this.connectionOperator = Args.notNull(connectionOperator, "Connection operator");
|
||||||
|
final ManagedConnPool<HttpRoute, ManagedAsyncClientConnection> managedConnPool;
|
||||||
switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
|
switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
|
||||||
case STRICT:
|
case STRICT:
|
||||||
this.pool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
|
managedConnPool = new StrictConnPool<HttpRoute, ManagedAsyncClientConnection>(
|
||||||
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
|
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
|
||||||
DEFAULT_MAX_TOTAL_CONNECTIONS,
|
DEFAULT_MAX_TOTAL_CONNECTIONS,
|
||||||
timeToLive,
|
timeToLive,
|
||||||
|
@ -187,7 +189,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
|
||||||
};
|
};
|
||||||
break;
|
break;
|
||||||
case LAX:
|
case LAX:
|
||||||
this.pool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
|
managedConnPool = new LaxConnPool<HttpRoute, ManagedAsyncClientConnection>(
|
||||||
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
|
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
|
||||||
timeToLive,
|
timeToLive,
|
||||||
poolReusePolicy,
|
poolReusePolicy,
|
||||||
|
@ -203,6 +205,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
|
throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
|
||||||
}
|
}
|
||||||
|
this.pool = messageMultiplexing ? new H2SharingConnPool<>(managedConnPool) : managedConnPool;
|
||||||
this.closed = new AtomicBoolean(false);
|
this.closed = new AtomicBoolean(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hc.client5.http.config.TlsConfig;
|
||||||
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
|
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
|
||||||
import org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy;
|
import org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy;
|
||||||
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
|
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
|
||||||
|
import org.apache.hc.core5.annotation.Experimental;
|
||||||
import org.apache.hc.core5.annotation.Internal;
|
import org.apache.hc.core5.annotation.Internal;
|
||||||
import org.apache.hc.core5.function.Resolver;
|
import org.apache.hc.core5.function.Resolver;
|
||||||
import org.apache.hc.core5.http.HttpHost;
|
import org.apache.hc.core5.http.HttpHost;
|
||||||
|
@ -87,6 +88,7 @@ public class PoolingAsyncClientConnectionManagerBuilder {
|
||||||
|
|
||||||
private Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
|
private Resolver<HttpRoute, ConnectionConfig> connectionConfigResolver;
|
||||||
private Resolver<HttpHost, TlsConfig> tlsConfigResolver;
|
private Resolver<HttpHost, TlsConfig> tlsConfigResolver;
|
||||||
|
private boolean messageMultiplexing;
|
||||||
|
|
||||||
public static PoolingAsyncClientConnectionManagerBuilder create() {
|
public static PoolingAsyncClientConnectionManagerBuilder create() {
|
||||||
return new PoolingAsyncClientConnectionManagerBuilder();
|
return new PoolingAsyncClientConnectionManagerBuilder();
|
||||||
|
@ -254,6 +256,24 @@ public class PoolingAsyncClientConnectionManagerBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use experimental connections pool implementation that acts as a caching facade
|
||||||
|
* in front of a standard connection pool and shares already leased connections
|
||||||
|
* to multiplex message exchanges over active HTTP/2 connections.
|
||||||
|
*<p>
|
||||||
|
* Please note this flag has no effect on HTTP/1.1 and HTTP/1.0 connections.
|
||||||
|
*<p>
|
||||||
|
* This feature is considered experimenal
|
||||||
|
*
|
||||||
|
* @since 5.5
|
||||||
|
* @return this instance.
|
||||||
|
*/
|
||||||
|
@Experimental
|
||||||
|
public final PoolingAsyncClientConnectionManagerBuilder setMessageMultiplexing(final boolean messageMultiplexing) {
|
||||||
|
this.messageMultiplexing = messageMultiplexing;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Internal
|
@Internal
|
||||||
protected AsyncClientConnectionOperator createConnectionOperator(
|
protected AsyncClientConnectionOperator createConnectionOperator(
|
||||||
final TlsStrategy tlsStrategy,
|
final TlsStrategy tlsStrategy,
|
||||||
|
@ -290,7 +310,8 @@ public class PoolingAsyncClientConnectionManagerBuilder {
|
||||||
createConnectionOperator(tlsStrategyCopy, schemePortResolver, dnsResolver),
|
createConnectionOperator(tlsStrategyCopy, schemePortResolver, dnsResolver),
|
||||||
poolConcurrencyPolicy,
|
poolConcurrencyPolicy,
|
||||||
poolReusePolicy,
|
poolReusePolicy,
|
||||||
null);
|
null,
|
||||||
|
messageMultiplexing);
|
||||||
poolingmgr.setConnectionConfigResolver(connectionConfigResolver);
|
poolingmgr.setConnectionConfigResolver(connectionConfigResolver);
|
||||||
poolingmgr.setTlsConfigResolver(tlsConfigResolver);
|
poolingmgr.setTlsConfigResolver(tlsConfigResolver);
|
||||||
if (maxConnTotal > 0) {
|
if (maxConnTotal > 0) {
|
||||||
|
|
|
@ -28,7 +28,6 @@ package org.apache.hc.client5.http.examples;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
|
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
|
||||||
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
|
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
|
||||||
|
@ -36,83 +35,123 @@ import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
|
||||||
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
|
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
|
||||||
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
|
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
|
||||||
import org.apache.hc.client5.http.config.TlsConfig;
|
import org.apache.hc.client5.http.config.TlsConfig;
|
||||||
|
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
|
||||||
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
|
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
|
||||||
import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient;
|
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
|
||||||
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
|
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
|
||||||
import org.apache.hc.core5.concurrent.FutureCallback;
|
import org.apache.hc.core5.concurrent.FutureCallback;
|
||||||
import org.apache.hc.core5.http.HttpHost;
|
import org.apache.hc.core5.http.HttpHost;
|
||||||
import org.apache.hc.core5.http.config.Http1Config;
|
|
||||||
import org.apache.hc.core5.http.message.StatusLine;
|
import org.apache.hc.core5.http.message.StatusLine;
|
||||||
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
|
|
||||||
import org.apache.hc.core5.http2.HttpVersionPolicy;
|
import org.apache.hc.core5.http2.HttpVersionPolicy;
|
||||||
import org.apache.hc.core5.http2.config.H2Config;
|
|
||||||
import org.apache.hc.core5.io.CloseMode;
|
import org.apache.hc.core5.io.CloseMode;
|
||||||
import org.apache.hc.core5.reactor.IOReactorConfig;
|
import org.apache.hc.core5.reactor.IOReactorConfig;
|
||||||
|
import org.apache.hc.core5.util.Timeout;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This example demonstrates concurrent (multiplexed) execution of multiple
|
* Example of asynchronous HTTP/1.1 request execution with message exchange multiplexing
|
||||||
* HTTP/2 message exchanges.
|
* over HTTP/2 connections.
|
||||||
*/
|
*/
|
||||||
public class AsyncClientH2Multiplexing {
|
public class AsyncClientH2Multiplexing {
|
||||||
|
|
||||||
public static void main(final String[] args) throws Exception {
|
public static void main(final String[] args) throws Exception {
|
||||||
|
|
||||||
final MinimalHttpAsyncClient client = HttpAsyncClients.createMinimal(
|
final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
|
||||||
H2Config.DEFAULT,
|
.setSoTimeout(Timeout.ofSeconds(5))
|
||||||
Http1Config.DEFAULT,
|
.build();
|
||||||
IOReactorConfig.DEFAULT,
|
|
||||||
PoolingAsyncClientConnectionManagerBuilder.create()
|
final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
|
||||||
.setDefaultTlsConfig(TlsConfig.custom()
|
.setDefaultTlsConfig(TlsConfig.custom()
|
||||||
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
|
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2)
|
||||||
.build())
|
.build())
|
||||||
.build());
|
.setMessageMultiplexing(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final CloseableHttpAsyncClient client = HttpAsyncClients.custom()
|
||||||
|
.setConnectionManager(connectionManager)
|
||||||
|
.setIOReactorConfig(ioReactorConfig)
|
||||||
|
.build();
|
||||||
|
|
||||||
client.start();
|
client.start();
|
||||||
|
|
||||||
final HttpHost target = new HttpHost("https", "nghttp2.org");
|
final HttpHost target = new HttpHost("https", "nghttp2.org");
|
||||||
final Future<AsyncClientEndpoint> leaseFuture = client.lease(target, null);
|
|
||||||
final AsyncClientEndpoint endpoint = leaseFuture.get(30, TimeUnit.SECONDS);
|
|
||||||
try {
|
|
||||||
final String[] requestUris = new String[] {"/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
|
|
||||||
|
|
||||||
final CountDownLatch latch = new CountDownLatch(requestUris.length);
|
final SimpleHttpRequest warmup = SimpleRequestBuilder.get()
|
||||||
for (final String requestUri: requestUris) {
|
.setHttpHost(target)
|
||||||
|
.setPath("/httpbin")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// Make sure there is an open HTTP/2 connection in the pool
|
||||||
|
System.out.println("Executing warm-up request " + warmup);
|
||||||
|
final Future<SimpleHttpResponse> future = client.execute(
|
||||||
|
SimpleRequestProducer.create(warmup),
|
||||||
|
SimpleResponseConsumer.create(),
|
||||||
|
new FutureCallback<SimpleHttpResponse>() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void completed(final SimpleHttpResponse response) {
|
||||||
|
System.out.println(warmup + "->" + new StatusLine(response));
|
||||||
|
System.out.println(response.getBody());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void failed(final Exception ex) {
|
||||||
|
System.out.println(warmup + "->" + ex);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cancelled() {
|
||||||
|
System.out.println(warmup + " cancelled");
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
future.get();
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
System.out.println("Connection pool stats: " + connectionManager.getTotalStats());
|
||||||
|
|
||||||
|
// Execute multiple requests over the HTTP/2 connection from the pool
|
||||||
|
final String[] requestUris = new String[]{"/httpbin", "/httpbin/ip", "/httpbin/user-agent", "/httpbin/headers"};
|
||||||
|
final CountDownLatch countDownLatch = new CountDownLatch(requestUris.length);
|
||||||
|
|
||||||
|
for (final String requestUri : requestUris) {
|
||||||
final SimpleHttpRequest request = SimpleRequestBuilder.get()
|
final SimpleHttpRequest request = SimpleRequestBuilder.get()
|
||||||
.setHttpHost(target)
|
.setHttpHost(target)
|
||||||
.setPath(requestUri)
|
.setPath(requestUri)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
System.out.println("Executing request " + request);
|
System.out.println("Executing request " + request);
|
||||||
endpoint.execute(
|
client.execute(
|
||||||
SimpleRequestProducer.create(request),
|
SimpleRequestProducer.create(request),
|
||||||
SimpleResponseConsumer.create(),
|
SimpleResponseConsumer.create(),
|
||||||
new FutureCallback<SimpleHttpResponse>() {
|
new FutureCallback<SimpleHttpResponse>() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void completed(final SimpleHttpResponse response) {
|
public void completed(final SimpleHttpResponse response) {
|
||||||
latch.countDown();
|
countDownLatch.countDown();
|
||||||
System.out.println(request + "->" + new StatusLine(response));
|
System.out.println(request + "->" + new StatusLine(response));
|
||||||
System.out.println(response.getBody());
|
System.out.println(response.getBody());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failed(final Exception ex) {
|
public void failed(final Exception ex) {
|
||||||
latch.countDown();
|
countDownLatch.countDown();
|
||||||
System.out.println(request + "->" + ex);
|
System.out.println(request + "->" + ex);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void cancelled() {
|
public void cancelled() {
|
||||||
latch.countDown();
|
countDownLatch.countDown();
|
||||||
System.out.println(request + " cancelled");
|
System.out.println(request + " cancelled");
|
||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
latch.await();
|
|
||||||
} finally {
|
countDownLatch.await();
|
||||||
endpoint.releaseAndReuse();
|
|
||||||
}
|
// There still should be a single connection in the pool
|
||||||
|
System.out.println("Connection pool stats: " + connectionManager.getTotalStats());
|
||||||
|
|
||||||
System.out.println("Shutting down");
|
System.out.println("Shutting down");
|
||||||
client.close(CloseMode.GRACEFUL);
|
client.close(CloseMode.GRACEFUL);
|
||||||
|
|
|
@ -0,0 +1,387 @@
|
||||||
|
/*
|
||||||
|
* ====================================================================
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
* ====================================================================
|
||||||
|
*
|
||||||
|
* This software consists of voluntary contributions made by many
|
||||||
|
* individuals on behalf of the Apache Software Foundation. For more
|
||||||
|
* information on the Apache Software Foundation, please see
|
||||||
|
* <http://www.apache.org/>.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hc.client5.http.impl.nio;
|
||||||
|
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import org.apache.hc.core5.concurrent.BasicFuture;
|
||||||
|
import org.apache.hc.core5.concurrent.FutureCallback;
|
||||||
|
import org.apache.hc.core5.http.HttpConnection;
|
||||||
|
import org.apache.hc.core5.http.HttpVersion;
|
||||||
|
import org.apache.hc.core5.io.CloseMode;
|
||||||
|
import org.apache.hc.core5.pool.ManagedConnPool;
|
||||||
|
import org.apache.hc.core5.pool.PoolEntry;
|
||||||
|
import org.apache.hc.core5.util.Timeout;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
|
||||||
|
public class H2SharingConnPoolTest {
|
||||||
|
|
||||||
|
static final String DEFAULT_ROUTE = "DEFAULT_ROUTE";
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
ManagedConnPool<String, HttpConnection> connPool;
|
||||||
|
@Mock
|
||||||
|
FutureCallback<PoolEntry<String, HttpConnection>> callback;
|
||||||
|
@Mock
|
||||||
|
HttpConnection connection;
|
||||||
|
H2SharingConnPool<String, HttpConnection> h2SharingPool;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setup() {
|
||||||
|
MockitoAnnotations.openMocks(this);
|
||||||
|
h2SharingPool = new H2SharingConnPool<>(connPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeaseFutureReturned() throws Exception {
|
||||||
|
Mockito.when(connPool.lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any())).thenReturn(new BasicFuture<>(null));
|
||||||
|
|
||||||
|
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
|
||||||
|
Assertions.assertNotNull(result);
|
||||||
|
Assertions.assertFalse(result.isDone());
|
||||||
|
|
||||||
|
Mockito.verify(connPool).lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.eq(null),
|
||||||
|
Mockito.eq(Timeout.ONE_MILLISECOND),
|
||||||
|
Mockito.any());
|
||||||
|
Mockito.verify(callback, Mockito.never()).completed(
|
||||||
|
Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeaseExistingConnectionReturned() throws Exception {
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
|
||||||
|
final Future<PoolEntry<String, HttpConnection>> future = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
|
||||||
|
Assertions.assertNotNull(future);
|
||||||
|
Assertions.assertSame(poolEntry, future.get());
|
||||||
|
|
||||||
|
Mockito.verify(connPool, Mockito.never()).lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any());
|
||||||
|
Mockito.verify(callback).completed(
|
||||||
|
Mockito.same(poolEntry));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeaseWithStateCacheBypassed() throws Exception {
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
|
||||||
|
Mockito.when(connPool.lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any())).thenReturn(new BasicFuture<>(null));
|
||||||
|
|
||||||
|
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, "stuff", Timeout.ONE_MILLISECOND, callback);
|
||||||
|
Assertions.assertNotNull(result);
|
||||||
|
Assertions.assertFalse(result.isDone());
|
||||||
|
|
||||||
|
Mockito.verify(connPool).lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.eq("stuff"),
|
||||||
|
Mockito.eq(Timeout.ONE_MILLISECOND),
|
||||||
|
Mockito.any());
|
||||||
|
Mockito.verify(callback, Mockito.never()).completed(
|
||||||
|
Mockito.any());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeaseNewConnectionReturnedAndCached() throws Exception {
|
||||||
|
final AtomicReference<BasicFuture<PoolEntry<String, HttpConnection>>> futureRef = new AtomicReference<>();
|
||||||
|
Mockito.when(connPool.lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any())).thenAnswer(invocationOnMock -> {
|
||||||
|
final BasicFuture<PoolEntry<String, HttpConnection>> future = new BasicFuture<>(invocationOnMock.getArgument(3));
|
||||||
|
futureRef.set(future);
|
||||||
|
return future;
|
||||||
|
});
|
||||||
|
|
||||||
|
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
|
||||||
|
final BasicFuture<PoolEntry<String, HttpConnection>> future = futureRef.get();
|
||||||
|
Assertions.assertNotNull(future);
|
||||||
|
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.assignConnection(connection);
|
||||||
|
Mockito.when(connection.getProtocolVersion()).thenReturn(HttpVersion.HTTP_2);
|
||||||
|
future.completed(poolEntry);
|
||||||
|
|
||||||
|
Assertions.assertTrue(result.isDone());
|
||||||
|
|
||||||
|
Mockito.verify(connPool).lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.eq(null),
|
||||||
|
Mockito.eq(Timeout.ONE_MILLISECOND),
|
||||||
|
Mockito.any());
|
||||||
|
Mockito.verify(callback).completed(
|
||||||
|
Mockito.any());
|
||||||
|
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
Assertions.assertEquals(1, routePool.getCount(poolEntry));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeaseNewConnectionReturnedAndNotCached() throws Exception {
|
||||||
|
final AtomicReference<BasicFuture<PoolEntry<String, HttpConnection>>> futureRef = new AtomicReference<>();
|
||||||
|
Mockito.when(connPool.lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any())).thenAnswer(invocationOnMock -> {
|
||||||
|
final BasicFuture<PoolEntry<String, HttpConnection>> future = new BasicFuture<>(invocationOnMock.getArgument(3));
|
||||||
|
futureRef.set(future);
|
||||||
|
return future;
|
||||||
|
});
|
||||||
|
|
||||||
|
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
|
||||||
|
final BasicFuture<PoolEntry<String, HttpConnection>> future = futureRef.get();
|
||||||
|
Assertions.assertNotNull(future);
|
||||||
|
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.assignConnection(connection);
|
||||||
|
Mockito.when(connection.getProtocolVersion()).thenReturn(HttpVersion.HTTP_1_1);
|
||||||
|
future.completed(poolEntry);
|
||||||
|
|
||||||
|
Assertions.assertTrue(result.isDone());
|
||||||
|
|
||||||
|
Mockito.verify(connPool).lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.eq(null),
|
||||||
|
Mockito.eq(Timeout.ONE_MILLISECOND),
|
||||||
|
Mockito.any());
|
||||||
|
Mockito.verify(callback).completed(
|
||||||
|
Mockito.any());
|
||||||
|
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
Assertions.assertEquals(0, routePool.getCount(poolEntry));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeaseNoConnection() throws Exception {
|
||||||
|
final AtomicReference<BasicFuture<PoolEntry<String, HttpConnection>>> futureRef = new AtomicReference<>();
|
||||||
|
Mockito.when(connPool.lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any())).thenAnswer(invocationOnMock -> {
|
||||||
|
final BasicFuture<PoolEntry<String, HttpConnection>> future = new BasicFuture<>(invocationOnMock.getArgument(3));
|
||||||
|
futureRef.set(future);
|
||||||
|
return future;
|
||||||
|
});
|
||||||
|
|
||||||
|
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback);
|
||||||
|
final BasicFuture<PoolEntry<String, HttpConnection>> future = futureRef.get();
|
||||||
|
Assertions.assertNotNull(future);
|
||||||
|
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.discardConnection(CloseMode.IMMEDIATE);
|
||||||
|
future.completed(poolEntry);
|
||||||
|
|
||||||
|
Assertions.assertTrue(result.isDone());
|
||||||
|
|
||||||
|
Mockito.verify(connPool).lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.eq(null),
|
||||||
|
Mockito.eq(Timeout.ONE_MILLISECOND),
|
||||||
|
Mockito.any());
|
||||||
|
Mockito.verify(callback).completed(
|
||||||
|
Mockito.any());
|
||||||
|
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
Assertions.assertEquals(0, routePool.getCount(poolEntry));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeaseWithStateNewConnectionReturnedAndNotCached() throws Exception {
|
||||||
|
final AtomicReference<BasicFuture<PoolEntry<String, HttpConnection>>> futureRef = new AtomicReference<>();
|
||||||
|
Mockito.when(connPool.lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any(),
|
||||||
|
Mockito.any())).thenAnswer(invocationOnMock -> {
|
||||||
|
final BasicFuture<PoolEntry<String, HttpConnection>> future = new BasicFuture<>(invocationOnMock.getArgument(3));
|
||||||
|
futureRef.set(future);
|
||||||
|
return future;
|
||||||
|
});
|
||||||
|
|
||||||
|
final Future<PoolEntry<String, HttpConnection>> result = h2SharingPool.lease(DEFAULT_ROUTE, "stuff", Timeout.ONE_MILLISECOND, callback);
|
||||||
|
final BasicFuture<PoolEntry<String, HttpConnection>> future = futureRef.get();
|
||||||
|
Assertions.assertNotNull(future);
|
||||||
|
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.assignConnection(connection);
|
||||||
|
Mockito.when(connection.getProtocolVersion()).thenReturn(HttpVersion.HTTP_2);
|
||||||
|
future.completed(poolEntry);
|
||||||
|
|
||||||
|
Assertions.assertTrue(result.isDone());
|
||||||
|
|
||||||
|
Mockito.verify(connPool).lease(
|
||||||
|
Mockito.eq(DEFAULT_ROUTE),
|
||||||
|
Mockito.eq("stuff"),
|
||||||
|
Mockito.eq(Timeout.ONE_MILLISECOND),
|
||||||
|
Mockito.any());
|
||||||
|
Mockito.verify(callback).completed(
|
||||||
|
Mockito.any());
|
||||||
|
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
Assertions.assertEquals(0, routePool.getCount(poolEntry));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseReusableNoCacheReturnedToPool() throws Exception {
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.assignConnection(connection);
|
||||||
|
Mockito.when(connection.isOpen()).thenReturn(true);
|
||||||
|
|
||||||
|
h2SharingPool.release(poolEntry, true);
|
||||||
|
|
||||||
|
Mockito.verify(connPool).release(
|
||||||
|
Mockito.same(poolEntry),
|
||||||
|
Mockito.eq(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseReusableNotInCacheReturnedToPool() throws Exception {
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.assignConnection(connection);
|
||||||
|
Mockito.when(connection.isOpen()).thenReturn(true);
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
|
||||||
|
h2SharingPool.release(poolEntry, true);
|
||||||
|
|
||||||
|
Mockito.verify(connPool).release(
|
||||||
|
Mockito.same(poolEntry),
|
||||||
|
Mockito.eq(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseReusableInCacheNotReturnedToPool() throws Exception {
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.assignConnection(connection);
|
||||||
|
Mockito.when(connection.isOpen()).thenReturn(true);
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
|
||||||
|
h2SharingPool.release(poolEntry, true);
|
||||||
|
|
||||||
|
Mockito.verify(connPool, Mockito.never()).release(
|
||||||
|
Mockito.same(poolEntry),
|
||||||
|
Mockito.anyBoolean());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseNonReusableInCacheReturnedToPool() throws Exception {
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.assignConnection(connection);
|
||||||
|
Mockito.when(connection.isOpen()).thenReturn(true);
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
|
||||||
|
h2SharingPool.release(poolEntry, false);
|
||||||
|
|
||||||
|
Mockito.verify(connPool).release(
|
||||||
|
Mockito.same(poolEntry),
|
||||||
|
Mockito.eq(false));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseReusableAndClosedInCacheReturnedToPool() throws Exception {
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.assignConnection(connection);
|
||||||
|
Mockito.when(connection.isOpen()).thenReturn(false);
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
|
||||||
|
h2SharingPool.release(poolEntry, true);
|
||||||
|
|
||||||
|
Mockito.verify(connPool).release(
|
||||||
|
Mockito.same(poolEntry),
|
||||||
|
Mockito.eq(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testClose() throws Exception {
|
||||||
|
h2SharingPool.close();
|
||||||
|
|
||||||
|
Mockito.verify(connPool).close();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testCloseMode() throws Exception {
|
||||||
|
h2SharingPool.close(CloseMode.IMMEDIATE);
|
||||||
|
|
||||||
|
Mockito.verify(connPool).close(CloseMode.IMMEDIATE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeasePoolClosed() throws Exception {
|
||||||
|
h2SharingPool.close();
|
||||||
|
|
||||||
|
Assertions.assertThrows(IllegalStateException.class, () -> h2SharingPool.lease(DEFAULT_ROUTE, null, Timeout.ONE_MILLISECOND, callback));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleasePoolClosed() throws Exception {
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>(DEFAULT_ROUTE);
|
||||||
|
poolEntry.assignConnection(connection);
|
||||||
|
Mockito.when(connection.isOpen()).thenReturn(false);
|
||||||
|
final H2SharingConnPool.PerRoutePool<String, HttpConnection> routePool = h2SharingPool.getPerRoutePool(DEFAULT_ROUTE);
|
||||||
|
routePool.track(poolEntry);
|
||||||
|
|
||||||
|
h2SharingPool.close();
|
||||||
|
|
||||||
|
h2SharingPool.release(poolEntry, true);
|
||||||
|
|
||||||
|
Mockito.verify(connPool).release(
|
||||||
|
Mockito.same(poolEntry),
|
||||||
|
Mockito.eq(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,130 @@
|
||||||
|
/*
|
||||||
|
* ====================================================================
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
* ====================================================================
|
||||||
|
*
|
||||||
|
* This software consists of voluntary contributions made by many
|
||||||
|
* individuals on behalf of the Apache Software Foundation. For more
|
||||||
|
* information on the Apache Software Foundation, please see
|
||||||
|
* <http://www.apache.org/>.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
package org.apache.hc.client5.http.impl.nio;
|
||||||
|
|
||||||
|
import org.apache.hc.core5.http.HttpConnection;
|
||||||
|
import org.apache.hc.core5.io.CloseMode;
|
||||||
|
import org.apache.hc.core5.pool.PoolEntry;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
public class H2SharingPerRoutePoolTest {
|
||||||
|
|
||||||
|
static PoolEntry<String, HttpConnection> createMockEntry() {
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = new PoolEntry<>("some route");
|
||||||
|
final HttpConnection conn = Mockito.mock(HttpConnection.class);
|
||||||
|
Mockito.when(conn.isOpen()).thenReturn(true);
|
||||||
|
poolEntry.assignConnection(conn);
|
||||||
|
return poolEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
H2SharingConnPool.PerRoutePool<String, HttpConnection> pool;
|
||||||
|
PoolEntry<String, HttpConnection> poolEntry1;
|
||||||
|
PoolEntry<String, HttpConnection> poolEntry2;
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void setup() {
|
||||||
|
pool = new H2SharingConnPool.PerRoutePool<>();
|
||||||
|
poolEntry1 = createMockEntry();
|
||||||
|
poolEntry2 = createMockEntry();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testKeep() {
|
||||||
|
Assertions.assertEquals(1, pool.track(poolEntry1));
|
||||||
|
Assertions.assertEquals(2, pool.track(poolEntry1));
|
||||||
|
Assertions.assertEquals(1, pool.track(poolEntry2));
|
||||||
|
Assertions.assertEquals(3, pool.track(poolEntry1));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeaseLeastUsed() {
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry2);
|
||||||
|
Assertions.assertSame(poolEntry2, pool.lease());
|
||||||
|
Assertions.assertEquals(2, pool.getCount(poolEntry2));
|
||||||
|
|
||||||
|
final PoolEntry<String, HttpConnection> poolEntry = pool.lease();
|
||||||
|
Assertions.assertEquals(3, pool.getCount(poolEntry));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testLeaseEmptyPool() {
|
||||||
|
Assertions.assertNull(pool.lease());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseReusable() {
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
|
||||||
|
Assertions.assertEquals(2, pool.release(poolEntry1, true));
|
||||||
|
Assertions.assertEquals(1, pool.release(poolEntry1, true));
|
||||||
|
Assertions.assertEquals(0, pool.release(poolEntry1, true));
|
||||||
|
Assertions.assertEquals(0, pool.release(poolEntry1, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseNonReusable() {
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
|
||||||
|
Assertions.assertEquals(0, pool.release(poolEntry1, false));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseNonPresent() {
|
||||||
|
Assertions.assertEquals(0, pool.release(poolEntry1, true));
|
||||||
|
Assertions.assertEquals(0, pool.release(poolEntry2, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseConnectionClosed() {
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
|
||||||
|
Mockito.when(poolEntry1.getConnection().isOpen()).thenReturn(false);
|
||||||
|
Assertions.assertEquals(0, pool.release(poolEntry1, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testReleaseConnectionMissing() {
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
pool.track(poolEntry1);
|
||||||
|
|
||||||
|
poolEntry1.discardConnection(CloseMode.IMMEDIATE);
|
||||||
|
Assertions.assertEquals(0, pool.release(poolEntry1, true));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue