* HTTP/2 multiplexing HttpAsyncClient implementation

* Restructured integration tests to reduce test duplication
This commit is contained in:
Oleg Kalnichevski 2017-11-13 10:50:27 +01:00
parent 703b7968c5
commit 6228a73613
27 changed files with 3131 additions and 884 deletions

View File

@ -33,6 +33,10 @@ import java.util.concurrent.Future;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
@ -45,19 +49,40 @@ import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
public abstract class IntegrationTestBase extends LocalAsyncServerTestBase {
public abstract class AbstractHttp1IntegrationTestBase extends AbstractServerTestBase {
public IntegrationTestBase(final URIScheme scheme) {
public AbstractHttp1IntegrationTestBase(final URIScheme scheme) {
super(scheme);
}
public IntegrationTestBase() {
public AbstractHttp1IntegrationTestBase() {
super(URIScheme.HTTP);
}
protected HttpAsyncClientBuilder clientBuilder;
protected PoolingAsyncClientConnectionManager connManager;
protected CloseableHttpAsyncClient httpclient;
@Rule
public ExternalResource connManagerResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
connManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()))
.build();
}
@Override
protected void after() {
if (connManager != null) {
connManager.close();
connManager = null;
}
}
};
@Rule
public ExternalResource clientResource = new ExternalResource() {

View File

@ -27,12 +27,12 @@
package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hc.client5.http.AuthenticationStrategy;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.auth.AuthChallenge;
@ -45,6 +45,7 @@ import org.apache.hc.client5.http.auth.CredentialsStore;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.auth.BasicScheme;
import org.apache.hc.client5.http.protocol.HttpClientContext;
@ -53,14 +54,15 @@ import org.apache.hc.client5.testing.auth.Authenticator;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.config.Registry;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.impl.HttpProcessors;
@ -68,42 +70,52 @@ import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.impl.Http2Processors;
import org.apache.hc.core5.net.URIAuthority;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestClientAuthentication extends IntegrationTestBase {
public abstract class AbstractHttpAsyncClientAuthentication<T extends CloseableHttpAsyncClient> extends AbstractIntegrationTestBase<T> {
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{URIScheme.HTTP},
{URIScheme.HTTPS},
});
}
protected final HttpVersion protocolVersion;
public TestClientAuthentication(final URIScheme scheme) {
public AbstractHttpAsyncClientAuthentication(final URIScheme scheme, final HttpVersion protocolVersion) {
super(scheme);
this.protocolVersion = protocolVersion;
}
@Override
public HttpHost start() throws Exception {
return super.start(
HttpProcessors.server(),
new Decorator<AsyncServerExchangeHandler>() {
public final HttpHost start() throws Exception {
return start(new Decorator<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler requestHandler) {
return new AuthenticatingAsyncDecorator(requestHandler, new BasicTestAuthenticator("test:test", "test realm"));
}
@Override
public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler requestHandler) {
return new AuthenticatingAsyncDecorator(requestHandler, new BasicTestAuthenticator("test:test", "test realm"));
}
},
H1Config.DEFAULT);
});
}
public final HttpHost start(
final Decorator<AsyncServerExchangeHandler> exchangeHandlerDecorator) throws Exception {
if (protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
return super.start(
Http2Processors.server(),
exchangeHandlerDecorator,
H2Config.DEFAULT);
} else {
return super.start(
HttpProcessors.server(),
exchangeHandlerDecorator,
H1Config.DEFAULT);
}
}
abstract void setDefaultAuthSchemeRegistry(Lookup<AuthSchemeProvider> authSchemeRegistry);
abstract void setTargetAuthenticationStrategy(AuthenticationStrategy targetAuthStrategy);
static class TestCredentialsProvider implements CredentialsStore {
private final Credentials creds;
@ -243,49 +255,6 @@ public class TestClientAuthentication extends IntegrationTestBase {
Assert.assertEquals("test realm", authscope.getRealm());
}
@Test
public void testBasicAuthenticationSuccessNonPersistentConnection() throws Exception {
server.register("*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new AsyncEchoHandler();
}
});
final HttpHost target = start(
HttpProcessors.server(),
new Decorator<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler exchangeHandler) {
return new AuthenticatingAsyncDecorator(exchangeHandler, new BasicTestAuthenticator("test:test", "test realm")) {
@Override
protected void customizeUnauthorizedResponse(final HttpResponse unauthorized) {
unauthorized.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
}
};
}
},
H1Config.DEFAULT);
final TestCredentialsProvider credsProvider = new TestCredentialsProvider(
new UsernamePasswordCredentials("test", "test".toCharArray()));
final HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credsProvider);
final Future<SimpleHttpResponse> future = httpclient.execute(SimpleHttpRequest.get(target, "/"), context, null);
final HttpResponse response = future.get();
Assert.assertNotNull(response);
Assert.assertEquals(HttpStatus.SC_OK, response.getCode());
final AuthScope authscope = credsProvider.getAuthScope();
Assert.assertNotNull(authscope);
Assert.assertEquals("test realm", authscope.getRealm());
}
@Test
public void testBasicAuthenticationExpectationFailure() throws Exception {
server.register("*", new Supplier<AsyncServerExchangeHandler>() {
@ -355,7 +324,7 @@ public class TestClientAuthentication extends IntegrationTestBase {
});
final AtomicLong count = new AtomicLong(0);
this.clientBuilder.setTargetAuthenticationStrategy(new DefaultAuthenticationStrategy() {
setTargetAuthenticationStrategy(new DefaultAuthenticationStrategy() {
@Override
public List<AuthScheme> select(
@ -497,7 +466,7 @@ public class TestClientAuthentication extends IntegrationTestBase {
})
.build();
this.clientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
setDefaultAuthSchemeRegistry(authSchemeRegistry);
final Authenticator authenticator = new BasicTestAuthenticator("test:test", "test realm") {
@ -519,7 +488,6 @@ public class TestClientAuthentication extends IntegrationTestBase {
};
final HttpHost target = start(
HttpProcessors.server(),
new Decorator<AsyncServerExchangeHandler>() {
@Override
@ -535,7 +503,7 @@ public class TestClientAuthentication extends IntegrationTestBase {
};
}
}, H1Config.DEFAULT);
});
final RequestConfig config = RequestConfig.custom()
.setTargetPreferredAuthSchemes(Arrays.asList("MyBasic"))
@ -564,7 +532,6 @@ public class TestClientAuthentication extends IntegrationTestBase {
});
final HttpHost target = start(
HttpProcessors.server(),
new Decorator<AsyncServerExchangeHandler>() {
@Override
@ -579,7 +546,7 @@ public class TestClientAuthentication extends IntegrationTestBase {
};
}
}, H1Config.DEFAULT);
});
final TestCredentialsProvider credsProvider = new TestCredentialsProvider(
new UsernamePasswordCredentials("test", "test".toCharArray()));

View File

@ -26,8 +26,6 @@
*/
package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
@ -37,11 +35,8 @@ import org.apache.hc.client5.http.async.methods.AsyncRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
@ -51,21 +46,10 @@ import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHttpAsync extends IntegrationTestBase {
public abstract class AbstractHttpAsyncFundamentalsTest<T extends CloseableHttpAsyncClient> extends AbstractIntegrationTestBase<T> {
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{ URIScheme.HTTP },
{ URIScheme.HTTPS },
});
}
public TestHttpAsync(final URIScheme scheme) {
public AbstractHttpAsyncFundamentalsTest(final URIScheme scheme) {
super(scheme);
}
@ -98,22 +82,6 @@ public class TestHttpAsync extends IntegrationTestBase {
}
}
@Test
public void testSequenctialGetRequestsCloseConnection() throws Exception {
final HttpHost target = start();
for (int i = 0; i < 3; i++) {
final SimpleHttpRequest get = SimpleHttpRequest.get(target, "/random/2048");
get.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
final Future<SimpleHttpResponse> future = httpclient.execute(get, null);
final SimpleHttpResponse response = future.get();
Assert.assertThat(response, CoreMatchers.notNullValue());
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
final String body = response.getBodyText();
Assert.assertThat(body, CoreMatchers.notNullValue());
Assert.assertThat(body.length(), CoreMatchers.equalTo(2048));
}
}
@Test
public void testSequenctialPostRequests() throws Exception {
final HttpHost target = start();
@ -136,7 +104,7 @@ public class TestHttpAsync extends IntegrationTestBase {
}
@Test
public void testConcurrentPostsOverMultipleConnections() throws Exception {
public void testConcurrentPostRequests() throws Exception {
final HttpHost target = start();
final byte[] b1 = new byte[1024];
final Random rnd = new Random(System.currentTimeMillis());
@ -144,9 +112,6 @@ public class TestHttpAsync extends IntegrationTestBase {
final int reqCount = 20;
connManager.setDefaultMaxPerRoute(reqCount);
connManager.setMaxTotal(100);
final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
for (int i = 0; i < reqCount; i++) {
final Future<Message<HttpResponse, byte[]>> future = httpclient.execute(
@ -168,85 +133,4 @@ public class TestHttpAsync extends IntegrationTestBase {
}
}
@Test
public void testConcurrentPostsOverSingleConnection() throws Exception {
final HttpHost target = start();
final byte[] b1 = new byte[1024];
final Random rnd = new Random(System.currentTimeMillis());
rnd.nextBytes(b1);
final int reqCount = 20;
connManager.setDefaultMaxPerRoute(1);
connManager.setMaxTotal(100);
final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
for (int i = 0; i < reqCount; i++) {
final Future<Message<HttpResponse, byte[]>> future = httpclient.execute(
AsyncRequestBuilder.post(target, "/echo/")
.setEntity(b1, ContentType.APPLICATION_OCTET_STREAM)
.build(),
new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
queue.add(future);
}
while (!queue.isEmpty()) {
final Future<Message<HttpResponse, byte[]>> future = queue.remove();
final Message<HttpResponse, byte[]> responseMessage = future.get();
Assert.assertThat(responseMessage, CoreMatchers.notNullValue());
final HttpResponse response = responseMessage.getHead();
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
final byte[] b2 = responseMessage.getBody();
Assert.assertThat(b1, CoreMatchers.equalTo(b2));
}
}
@Test
public void testSharedPool() throws Exception {
final HttpHost target = start();
final Future<SimpleHttpResponse> future1 = httpclient.execute(
SimpleHttpRequest.get(target, "/random/2048"), null);
final SimpleHttpResponse response1 = future1.get();
Assert.assertThat(response1, CoreMatchers.notNullValue());
Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(200));
final String body1 = response1.getBodyText();
Assert.assertThat(body1, CoreMatchers.notNullValue());
Assert.assertThat(body1.length(), CoreMatchers.equalTo(2048));
try (final CloseableHttpAsyncClient httpclient2 = HttpAsyncClients.custom()
.setConnectionManager(connManager)
.setConnectionManagerShared(true)
.build()) {
httpclient2.start();
final Future<SimpleHttpResponse> future2 = httpclient2.execute(
SimpleHttpRequest.get(target, "/random/2048"), null);
final SimpleHttpResponse response2 = future2.get();
Assert.assertThat(response2, CoreMatchers.notNullValue());
Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(200));
final String body2 = response2.getBodyText();
Assert.assertThat(body2, CoreMatchers.notNullValue());
Assert.assertThat(body2.length(), CoreMatchers.equalTo(2048));
}
final Future<SimpleHttpResponse> future3 = httpclient.execute(
SimpleHttpRequest.get(target, "/random/2048"), null);
final SimpleHttpResponse response3 = future3.get();
Assert.assertThat(response3, CoreMatchers.notNullValue());
Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(200));
final String body3 = response3.getBodyText();
Assert.assertThat(body3, CoreMatchers.notNullValue());
Assert.assertThat(body3.length(), CoreMatchers.equalTo(2048));
}
@Test
public void testBadRequest() throws Exception {
final HttpHost target = start();
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.get(target, "/random/boom"), null);
final SimpleHttpResponse response = future.get();
Assert.assertThat(response, CoreMatchers.notNullValue());
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(400));
}
}

View File

@ -29,10 +29,6 @@ package org.apache.hc.client5.testing.async;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
@ -45,6 +41,7 @@ import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.BasicCookieStore;
import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.cookie.BasicClientCookie;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.testing.SSLTestContexts;
@ -52,7 +49,6 @@ import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
@ -69,40 +65,20 @@ import org.apache.hc.core5.testing.nio.Http2TestServer;
import org.apache.hc.core5.util.TimeValue;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Redirection test cases.
*/
@RunWith(Parameterized.class)
public class TestAsyncRedirects extends IntegrationTestBase {
public abstract class AbstractHttpAsyncRedirectsTest <T extends CloseableHttpAsyncClient> extends AbstractIntegrationTestBase<T> {
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{URIScheme.HTTP},
{URIScheme.HTTPS},
});
}
public TestAsyncRedirects(final URIScheme scheme) {
public AbstractHttpAsyncRedirectsTest(final URIScheme scheme) {
super(scheme);
}
static class BasicRedirectService extends AbstractSimpleServerExchangeHandler {
private final int statuscode;
private final boolean keepAlive;
public BasicRedirectService(final int statuscode, final boolean keepAlive) {
super();
this.statuscode = statuscode;
this.keepAlive = keepAlive;
}
public BasicRedirectService(final int statuscode) {
this(statuscode, true);
super();
this.statuscode = statuscode;
}
@Override
@ -115,9 +91,6 @@ public class TestAsyncRedirects extends IntegrationTestBase {
final SimpleHttpResponse response = new SimpleHttpResponse(statuscode);
response.addHeader(new BasicHeader("Location",
new URIBuilder(requestURI).setPath("/newlocation/").build()));
if (!keepAlive) {
response.addHeader(new BasicHeader("Connection", "close"));
}
return response;
} else if (path.equals("/newlocation/")) {
final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
@ -220,7 +193,7 @@ public class TestAsyncRedirects extends IntegrationTestBase {
@Override
public AsyncServerExchangeHandler get() {
return new BasicRedirectService(HttpStatus.SC_MULTIPLE_CHOICES, false);
return new BasicRedirectService(HttpStatus.SC_MULTIPLE_CHOICES);
}
});
@ -239,37 +212,12 @@ public class TestAsyncRedirects extends IntegrationTestBase {
}
@Test
public void testBasicRedirect301KeepAlive() throws Exception {
public void testBasicRedirect301() throws Exception {
server.register("*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new BasicRedirectService(HttpStatus.SC_MOVED_PERMANENTLY, true);
}
});
final HttpHost target = start();
final HttpClientContext context = HttpClientContext.create();
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.get(target, "/oldlocation/"), context, null);
final HttpResponse response = future.get();
Assert.assertNotNull(response);
final HttpRequest request = context.getRequest();
Assert.assertEquals(HttpStatus.SC_OK, response.getCode());
Assert.assertEquals("/newlocation/", request.getRequestUri());
Assert.assertEquals(target, new HttpHost(request.getAuthority(), request.getScheme()));
}
@Test
public void testBasicRedirect301NoKeepAlive() throws Exception {
server.register("*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new BasicRedirectService(HttpStatus.SC_MOVED_PERMANENTLY, false);
return new BasicRedirectService(HttpStatus.SC_MOVED_PERMANENTLY);
}
});
@ -681,39 +629,6 @@ public class TestAsyncRedirects extends IntegrationTestBase {
Assert.assertEquals("There can only be one (cookie)", 1, headers.length);
}
@Test
public void testDefaultHeadersRedirect() throws Exception {
server.register("*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new BasicRedirectService(HttpStatus.SC_MOVED_TEMPORARILY);
}
});
final List<Header> defaultHeaders = new ArrayList<>(1);
defaultHeaders.add(new BasicHeader(HttpHeaders.USER_AGENT, "my-test-client"));
clientBuilder.setDefaultHeaders(defaultHeaders);
final HttpHost target = start();
final HttpClientContext context = HttpClientContext.create();
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.get(target, "/oldlocation/"), context, null);
final HttpResponse response = future.get();
Assert.assertNotNull(response);
final HttpRequest request = context.getRequest();
Assert.assertEquals(HttpStatus.SC_OK, response.getCode());
Assert.assertEquals("/newlocation/", request.getRequestUri());
final Header header = request.getFirstHeader(HttpHeaders.USER_AGENT);
Assert.assertEquals("my-test-client", header.getValue());
}
static class CrossSiteRedirectService extends AbstractSimpleServerExchangeHandler {
private final HttpHost host;

View File

@ -0,0 +1,114 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.async;
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
public abstract class AbstractIntegrationTestBase<T extends CloseableHttpAsyncClient> extends AbstractServerTestBase {
public AbstractIntegrationTestBase(final URIScheme scheme) {
super(scheme);
}
public AbstractIntegrationTestBase() {
super(URIScheme.HTTP);
}
protected T httpclient;
protected abstract T createClient() throws Exception;
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void after() {
if (httpclient != null) {
httpclient.shutdown(ShutdownType.GRACEFUL);
httpclient = null;
}
}
};
public abstract HttpHost start() throws Exception;
public final HttpHost start(
final HttpProcessor httpProcessor,
final Decorator<AsyncServerExchangeHandler> exchangeHandlerDecorator,
final H1Config h1Config) throws Exception {
server.start(httpProcessor, exchangeHandlerDecorator, h1Config);
final Future<ListenerEndpoint> endpointFuture = server.listen(new InetSocketAddress(0));
httpclient = createClient();
httpclient.start();
final ListenerEndpoint endpoint = endpointFuture.get();
final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
return new HttpHost("localhost", address.getPort(), scheme.name());
}
public final HttpHost start(
final HttpProcessor httpProcessor,
final H1Config h1Config) throws Exception {
return start(httpProcessor, null, h1Config);
}
public final HttpHost start(
final HttpProcessor httpProcessor,
final Decorator<AsyncServerExchangeHandler> exchangeHandlerDecorator,
final H2Config h2Config) throws Exception {
server.start(httpProcessor, exchangeHandlerDecorator, h2Config);
final Future<ListenerEndpoint> endpointFuture = server.listen(new InetSocketAddress(0));
httpclient = createClient();
httpclient.start();
final ListenerEndpoint endpoint = endpointFuture.get();
final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
return new HttpHost("localhost", address.getPort(), scheme.name());
}
public final HttpHost start(
final HttpProcessor httpProcessor,
final H2Config h2Config) throws Exception {
return start(httpProcessor, null, h2Config);
}
}

View File

@ -27,9 +27,6 @@
package org.apache.hc.client5.testing.async;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.URIScheme;
@ -41,23 +38,22 @@ import org.apache.hc.core5.util.Timeout;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
public abstract class LocalAsyncServerTestBase {
public abstract class AbstractServerTestBase {
public static final Timeout TIMEOUT = Timeout.ofSeconds(30);
public static final Timeout LONG_TIMEOUT = Timeout.ofSeconds(60);
protected final URIScheme scheme;
public LocalAsyncServerTestBase(final URIScheme scheme) {
public AbstractServerTestBase(final URIScheme scheme) {
this.scheme = scheme;
}
public LocalAsyncServerTestBase() {
public AbstractServerTestBase() {
this(URIScheme.HTTP);
}
protected Http2TestServer server;
protected PoolingAsyncClientConnectionManager connManager;
@Rule
public ExternalResource serverResource = new ExternalResource() {
@ -97,24 +93,4 @@ public abstract class LocalAsyncServerTestBase {
};
@Rule
public ExternalResource connManagerResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
connManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()))
.build();
}
@Override
protected void after() {
if (connManager != null) {
connManager.close();
connManager = null;
}
}
};
}

View File

@ -0,0 +1,198 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.H1Config;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHttp1Async extends AbstractHttpAsyncFundamentalsTest<CloseableHttpAsyncClient> {
@Parameterized.Parameters(name = "HTTP/1.1 {0}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{ URIScheme.HTTP },
{ URIScheme.HTTPS },
});
}
protected HttpAsyncClientBuilder clientBuilder;
protected PoolingAsyncClientConnectionManager connManager;
@Rule
public ExternalResource connManagerResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
connManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()))
.build();
}
@Override
protected void after() {
if (connManager != null) {
connManager.close();
connManager = null;
}
}
};
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
clientBuilder = HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(TIMEOUT)
.setConnectTimeout(TIMEOUT)
.setConnectionRequestTimeout(TIMEOUT)
.build())
.setConnectionManager(connManager);
}
};
public TestHttp1Async(final URIScheme scheme) {
super(scheme);
}
@Override
protected CloseableHttpAsyncClient createClient() {
return clientBuilder.build();
}
@Override
public HttpHost start() throws Exception {
return super.start(null, H1Config.DEFAULT);
}
@Test
public void testSequenctialGetRequestsCloseConnection() throws Exception {
final HttpHost target = start();
for (int i = 0; i < 3; i++) {
final SimpleHttpRequest get = SimpleHttpRequest.get(target, "/random/2048");
get.setHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
final Future<SimpleHttpResponse> future = httpclient.execute(get, null);
final SimpleHttpResponse response = future.get();
Assert.assertThat(response, CoreMatchers.notNullValue());
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
final String body = response.getBodyText();
Assert.assertThat(body, CoreMatchers.notNullValue());
Assert.assertThat(body.length(), CoreMatchers.equalTo(2048));
}
}
@Test
public void testConcurrentPostsOverMultipleConnections() throws Exception {
connManager.setDefaultMaxPerRoute(20);
connManager.setMaxTotal(100);
super.testConcurrentPostRequests();
}
@Test
public void testConcurrentPostsOverSingleConnection() throws Exception {
connManager.setDefaultMaxPerRoute(1);
connManager.setMaxTotal(100);
super.testConcurrentPostRequests();
}
@Test
public void testSharedPool() throws Exception {
final HttpHost target = start();
final Future<SimpleHttpResponse> future1 = httpclient.execute(
SimpleHttpRequest.get(target, "/random/2048"), null);
final SimpleHttpResponse response1 = future1.get();
Assert.assertThat(response1, CoreMatchers.notNullValue());
Assert.assertThat(response1.getCode(), CoreMatchers.equalTo(200));
final String body1 = response1.getBodyText();
Assert.assertThat(body1, CoreMatchers.notNullValue());
Assert.assertThat(body1.length(), CoreMatchers.equalTo(2048));
try (final CloseableHttpAsyncClient httpclient2 = HttpAsyncClients.custom()
.setConnectionManager(connManager)
.setConnectionManagerShared(true)
.build()) {
httpclient2.start();
final Future<SimpleHttpResponse> future2 = httpclient2.execute(
SimpleHttpRequest.get(target, "/random/2048"), null);
final SimpleHttpResponse response2 = future2.get();
Assert.assertThat(response2, CoreMatchers.notNullValue());
Assert.assertThat(response2.getCode(), CoreMatchers.equalTo(200));
final String body2 = response2.getBodyText();
Assert.assertThat(body2, CoreMatchers.notNullValue());
Assert.assertThat(body2.length(), CoreMatchers.equalTo(2048));
}
final Future<SimpleHttpResponse> future3 = httpclient.execute(
SimpleHttpRequest.get(target, "/random/2048"), null);
final SimpleHttpResponse response3 = future3.get();
Assert.assertThat(response3, CoreMatchers.notNullValue());
Assert.assertThat(response3.getCode(), CoreMatchers.equalTo(200));
final String body3 = response3.getBodyText();
Assert.assertThat(body3, CoreMatchers.notNullValue());
Assert.assertThat(body3.length(), CoreMatchers.equalTo(2048));
}
@Test
public void testBadRequest() throws Exception {
final HttpHost target = start();
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.get(target, "/random/boom"), null);
final SimpleHttpResponse response = future.get();
Assert.assertThat(response, CoreMatchers.notNullValue());
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(400));
}
}

View File

@ -0,0 +1,254 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.async;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.net.URIBuilder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Redirection test cases.
*/
@RunWith(Parameterized.class)
public class TestHttp1AsyncRedirects extends AbstractHttpAsyncRedirectsTest<CloseableHttpAsyncClient> {
@Parameterized.Parameters(name = "HTTP/1.1 {0}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{URIScheme.HTTP},
{URIScheme.HTTPS},
});
}
protected HttpAsyncClientBuilder clientBuilder;
protected PoolingAsyncClientConnectionManager connManager;
@Rule
public ExternalResource connManagerResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
connManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()))
.build();
}
@Override
protected void after() {
if (connManager != null) {
connManager.close();
connManager = null;
}
}
};
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
clientBuilder = HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(TIMEOUT)
.setConnectTimeout(TIMEOUT)
.setConnectionRequestTimeout(TIMEOUT)
.build())
.setConnectionManager(connManager);
}
};
public TestHttp1AsyncRedirects(final URIScheme scheme) {
super(scheme);
}
@Override
protected CloseableHttpAsyncClient createClient() throws Exception {
return clientBuilder.build();
}
@Override
public final HttpHost start() throws Exception {
return super.start(null, H1Config.DEFAULT);
}
static class NoKeepAliveRedirectService extends AbstractSimpleServerExchangeHandler {
private final int statuscode;
public NoKeepAliveRedirectService(final int statuscode) {
super();
this.statuscode = statuscode;
}
@Override
protected SimpleHttpResponse handle(
final SimpleHttpRequest request, final HttpCoreContext context) throws HttpException {
try {
final URI requestURI = request.getUri();
final String path = requestURI.getPath();
if (path.equals("/oldlocation/")) {
final SimpleHttpResponse response = new SimpleHttpResponse(statuscode);
response.addHeader(new BasicHeader("Location",
new URIBuilder(requestURI).setPath("/newlocation/").build()));
response.addHeader(new BasicHeader("Connection", "close"));
return response;
} else if (path.equals("/newlocation/")) {
final SimpleHttpResponse response = new SimpleHttpResponse(HttpStatus.SC_OK);
response.setBodyText("Successful redirect", ContentType.TEXT_PLAIN);
return response;
} else {
return new SimpleHttpResponse(HttpStatus.SC_NOT_FOUND);
}
} catch (final URISyntaxException ex) {
throw new ProtocolException(ex.getMessage(), ex);
}
}
}
@Test
public void testBasicRedirect300() throws Exception {
server.register("*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new NoKeepAliveRedirectService(HttpStatus.SC_MULTIPLE_CHOICES);
}
});
final HttpHost target = start();
final HttpClientContext context = HttpClientContext.create();
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.get(target, "/oldlocation/"), context, null);
final HttpResponse response = future.get();
Assert.assertNotNull(response);
final HttpRequest request = context.getRequest();
Assert.assertEquals(HttpStatus.SC_MULTIPLE_CHOICES, response.getCode());
Assert.assertEquals("/oldlocation/", request.getRequestUri());
}
@Test
public void testBasicRedirect301NoKeepAlive() throws Exception {
server.register("*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new NoKeepAliveRedirectService(HttpStatus.SC_MOVED_PERMANENTLY);
}
});
final HttpHost target = start();
final HttpClientContext context = HttpClientContext.create();
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.get(target, "/oldlocation/"), context, null);
final HttpResponse response = future.get();
Assert.assertNotNull(response);
final HttpRequest request = context.getRequest();
Assert.assertEquals(HttpStatus.SC_OK, response.getCode());
Assert.assertEquals("/newlocation/", request.getRequestUri());
Assert.assertEquals(target, new HttpHost(request.getAuthority(), request.getScheme()));
}
@Test
public void testDefaultHeadersRedirect() throws Exception {
server.register("*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new NoKeepAliveRedirectService(HttpStatus.SC_MOVED_TEMPORARILY);
}
});
final List<Header> defaultHeaders = new ArrayList<>(1);
defaultHeaders.add(new BasicHeader(HttpHeaders.USER_AGENT, "my-test-client"));
clientBuilder.setDefaultHeaders(defaultHeaders);
final HttpHost target = start();
final HttpClientContext context = HttpClientContext.create();
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.get(target, "/oldlocation/"), context, null);
final HttpResponse response = future.get();
Assert.assertNotNull(response);
final HttpRequest request = context.getRequest();
Assert.assertEquals(HttpStatus.SC_OK, response.getCode());
Assert.assertEquals("/newlocation/", request.getRequestUri());
final Header header = request.getFirstHeader(HttpHeaders.USER_AGENT);
Assert.assertEquals("my-test-client", header.getValue());
}
}

View File

@ -32,8 +32,14 @@ import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.UserTokenHandler;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EndpointDetails;
@ -41,14 +47,66 @@ import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.protocol.BasicHttpContext;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
public class TestAsyncStatefulConnManagement extends IntegrationTestBase {
public class TestHttp1AsyncStatefulConnManagement extends AbstractIntegrationTestBase<CloseableHttpAsyncClient> {
protected HttpAsyncClientBuilder clientBuilder;
protected PoolingAsyncClientConnectionManager connManager;
@Rule
public ExternalResource connManagerResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
connManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()))
.build();
}
@Override
protected void after() {
if (connManager != null) {
connManager.close();
connManager = null;
}
}
};
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
clientBuilder = HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(TIMEOUT)
.setConnectTimeout(TIMEOUT)
.setConnectionRequestTimeout(TIMEOUT)
.build())
.setConnectionManager(connManager);
}
};
@Override
protected CloseableHttpAsyncClient createClient() throws Exception {
return clientBuilder.build();
}
@Override
public HttpHost start() throws Exception {
return super.start(null, H1Config.DEFAULT);
}
@Test
public void testStatefulConnections() throws Exception {

View File

@ -0,0 +1,180 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.AuthenticationStrategy;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.auth.AuthSchemeProvider;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.BasicTestAuthenticator;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.HeaderElements;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.impl.HttpProcessors;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHttp1ClientAuthentication extends AbstractHttpAsyncClientAuthentication<CloseableHttpAsyncClient> {
@Parameterized.Parameters(name = "HTTP/1.1 {0}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{URIScheme.HTTP},
{URIScheme.HTTPS},
});
}
protected HttpAsyncClientBuilder clientBuilder;
protected PoolingAsyncClientConnectionManager connManager;
@Rule
public ExternalResource connManagerResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
connManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()))
.build();
}
@Override
protected void after() {
if (connManager != null) {
connManager.close();
connManager = null;
}
}
};
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
clientBuilder = HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(TIMEOUT)
.setConnectTimeout(TIMEOUT)
.setConnectionRequestTimeout(TIMEOUT)
.build())
.setConnectionManager(connManager);
}
};
public TestHttp1ClientAuthentication(final URIScheme scheme) {
super(scheme, HttpVersion.HTTP_1_1);
}
@Override
void setDefaultAuthSchemeRegistry(final Lookup<AuthSchemeProvider> authSchemeRegistry) {
clientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
}
@Override
void setTargetAuthenticationStrategy(final AuthenticationStrategy targetAuthStrategy) {
clientBuilder.setTargetAuthenticationStrategy(targetAuthStrategy);
}
@Override
protected CloseableHttpAsyncClient createClient() throws Exception {
return clientBuilder.build();
}
@Test
public void testBasicAuthenticationSuccessNonPersistentConnection() throws Exception {
server.register("*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new AsyncEchoHandler();
}
});
final HttpHost target = start(
HttpProcessors.server(),
new Decorator<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler decorate(final AsyncServerExchangeHandler exchangeHandler) {
return new AuthenticatingAsyncDecorator(exchangeHandler, new BasicTestAuthenticator("test:test", "test realm")) {
@Override
protected void customizeUnauthorizedResponse(final HttpResponse unauthorized) {
unauthorized.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
}
};
}
},
H1Config.DEFAULT);
final TestCredentialsProvider credsProvider = new TestCredentialsProvider(
new UsernamePasswordCredentials("test", "test".toCharArray()));
final HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credsProvider);
final Future<SimpleHttpResponse> future = httpclient.execute(SimpleHttpRequest.get(target, "/"), context, null);
final HttpResponse response = future.get();
Assert.assertNotNull(response);
Assert.assertEquals(HttpStatus.SC_OK, response.getCode());
final AuthScope authscope = credsProvider.getAuthScope();
Assert.assertNotNull(authscope);
Assert.assertEquals("test realm", authscope.getRealm());
}
}

View File

@ -0,0 +1,88 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.Http2AsyncClientBuilder;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http2.config.H2Config;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHttp2Async extends AbstractHttpAsyncFundamentalsTest<CloseableHttpAsyncClient> {
@Parameterized.Parameters(name = "HTTP/2 {0}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{ URIScheme.HTTP },
{ URIScheme.HTTPS }
});
}
protected Http2AsyncClientBuilder clientBuilder;
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
clientBuilder = Http2AsyncClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(TIMEOUT)
.setConnectTimeout(TIMEOUT)
.setConnectionRequestTimeout(TIMEOUT)
.build())
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()));
}
};
public TestHttp2Async(final URIScheme scheme) {
super(scheme);
}
@Override
protected CloseableHttpAsyncClient createClient() {
return clientBuilder.build();
}
@Override
public HttpHost start() throws Exception {
return super.start(null, H2Config.DEFAULT);
}
}

View File

@ -26,50 +26,22 @@
*/
package org.apache.hc.client5.testing.async;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.async.methods.AsyncRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.async.MinimalHttp2AsyncClient;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.BasicResponseConsumer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apache.hc.core5.testing.nio.Http2TestServer;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHttp2AsyncMinimal {
public static final Timeout TIMEOUT = Timeout.ofSeconds(30);
public class TestHttp2AsyncMinimal extends AbstractHttpAsyncFundamentalsTest<MinimalHttp2AsyncClient> {
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> protocols() {
@ -79,141 +51,22 @@ public class TestHttp2AsyncMinimal {
});
}
protected final URIScheme scheme;
public TestHttp2AsyncMinimal(final URIScheme scheme) {
this.scheme = scheme;
super(scheme);
}
protected Http2TestServer server;
protected MinimalHttp2AsyncClient httpclient;
@Rule
public ExternalResource serverResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
server = new Http2TestServer(
IOReactorConfig.custom()
.setSoTimeout(TIMEOUT)
.build(),
scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null);
server.register("/echo/*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new AsyncEchoHandler();
}
});
server.register("/random/*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new AsyncRandomHandler();
}
});
}
@Override
protected void after() {
if (server != null) {
server.shutdown(TimeValue.ofSeconds(5));
server = null;
}
}
};
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(TIMEOUT)
.build();
httpclient = HttpAsyncClients.createHttp2Minimal(
H2Config.DEFAULT, ioReactorConfig, new H2TlsStrategy(SSLTestContexts.createClientSSLContext()));
}
@Override
protected void after() {
if (httpclient != null) {
httpclient.shutdown(ShutdownType.GRACEFUL);
httpclient = null;
}
}
};
@Override
protected MinimalHttp2AsyncClient createClient() throws Exception {
final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(TIMEOUT)
.build();
return HttpAsyncClients.createHttp2Minimal(
H2Config.DEFAULT, ioReactorConfig, new H2TlsStrategy(SSLTestContexts.createClientSSLContext()));
}
@Override
public HttpHost start() throws Exception {
server.start(H2Config.DEFAULT);
final Future<ListenerEndpoint> endpointFuture = server.listen(new InetSocketAddress(0));
httpclient.start();
final ListenerEndpoint endpoint = endpointFuture.get();
final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
return new HttpHost("localhost", address.getPort(), scheme.name());
}
@Test
public void testSequenctialGetRequests() throws Exception {
final HttpHost target = start();
for (int i = 0; i < 3; i++) {
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.get(target, "/random/2048"), null);
final SimpleHttpResponse response = future.get();
Assert.assertThat(response, CoreMatchers.notNullValue());
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
final String body = response.getBodyText();
Assert.assertThat(body, CoreMatchers.notNullValue());
Assert.assertThat(body.length(), CoreMatchers.equalTo(2048));
}
}
@Test
public void testSequenctialHeadRequests() throws Exception {
final HttpHost target = start();
for (int i = 0; i < 3; i++) {
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.head(target, "/random/2048"), null);
final SimpleHttpResponse response = future.get();
Assert.assertThat(response, CoreMatchers.notNullValue());
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
final String body = response.getBodyText();
Assert.assertThat(body, CoreMatchers.nullValue());
}
}
@Test
public void testConcurrentPostsOver() throws Exception {
final HttpHost target = start();
final byte[] b1 = new byte[1024];
final Random rnd = new Random(System.currentTimeMillis());
rnd.nextBytes(b1);
final int reqCount = 20;
final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
for (int i = 0; i < reqCount; i++) {
final Future<Message<HttpResponse, byte[]>> future = httpclient.execute(
AsyncRequestBuilder.post(target, "/echo/")
.setEntity(b1, ContentType.APPLICATION_OCTET_STREAM)
.build(),
new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
queue.add(future);
}
while (!queue.isEmpty()) {
final Future<Message<HttpResponse, byte[]>> future = queue.remove();
final Message<HttpResponse, byte[]> responseMessage = future.get();
Assert.assertThat(responseMessage, CoreMatchers.notNullValue());
final HttpResponse response = responseMessage.getHead();
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
final byte[] b2 = responseMessage.getBody();
Assert.assertThat(b1, CoreMatchers.equalTo(b2));
}
return super.start(null, H2Config.DEFAULT);
}
}

View File

@ -0,0 +1,88 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.Http2AsyncClientBuilder;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http2.config.H2Config;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHttp2AsyncRedirect extends AbstractHttpAsyncRedirectsTest<CloseableHttpAsyncClient> {
@Parameterized.Parameters(name = "HTTP/2 {0}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{ URIScheme.HTTP },
{ URIScheme.HTTPS }
});
}
protected Http2AsyncClientBuilder clientBuilder;
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
clientBuilder = Http2AsyncClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(TIMEOUT)
.setConnectTimeout(TIMEOUT)
.setConnectionRequestTimeout(TIMEOUT)
.build())
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()));
}
};
public TestHttp2AsyncRedirect(final URIScheme scheme) {
super(scheme);
}
@Override
protected CloseableHttpAsyncClient createClient() {
return clientBuilder.build();
}
@Override
public HttpHost start() throws Exception {
return super.start(null, H2Config.DEFAULT);
}
}

View File

@ -0,0 +1,97 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.async;
import java.util.Arrays;
import java.util.Collection;
import org.apache.hc.client5.http.AuthenticationStrategy;
import org.apache.hc.client5.http.auth.AuthSchemeProvider;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.Http2AsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.Lookup;
import org.junit.Rule;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHttp2ClientAuthentication extends AbstractHttpAsyncClientAuthentication<CloseableHttpAsyncClient> {
@Parameterized.Parameters(name = "HTTP/2 {0}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{URIScheme.HTTP},
{URIScheme.HTTPS},
});
}
protected Http2AsyncClientBuilder clientBuilder;
protected PoolingAsyncClientConnectionManager connManager;
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
clientBuilder = Http2AsyncClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(TIMEOUT)
.setConnectTimeout(TIMEOUT)
.setConnectionRequestTimeout(TIMEOUT)
.build())
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()));
}
};
public TestHttp2ClientAuthentication(final URIScheme scheme) {
super(scheme, HttpVersion.HTTP_2);
}
@Override
void setDefaultAuthSchemeRegistry(final Lookup<AuthSchemeProvider> authSchemeRegistry) {
clientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
}
@Override
void setTargetAuthenticationStrategy(final AuthenticationStrategy targetAuthStrategy) {
clientBuilder.setTargetAuthenticationStrategy(targetAuthStrategy);
}
@Override
protected CloseableHttpAsyncClient createClient() throws Exception {
return clientBuilder.build();
}
}

View File

@ -26,7 +26,6 @@
*/
package org.apache.hc.client5.testing.async;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
@ -36,8 +35,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hc.client5.http.async.methods.AsyncRequestBuilder;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.async.MinimalHttpAsyncClient;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
@ -45,7 +42,6 @@ import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBu
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
@ -54,31 +50,21 @@ import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.H1Config;
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
import org.apache.hc.core5.http.nio.AsyncServerExchangeHandler;
import org.apache.hc.core5.http.nio.BasicResponseConsumer;
import org.apache.hc.core5.http.nio.entity.BasicAsyncEntityConsumer;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.ListenerEndpoint;
import org.apache.hc.core5.testing.nio.Http2TestServer;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.Timeout;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class TestHttpAsyncMinimal {
public class TestHttpAsyncMinimal extends AbstractHttpAsyncFundamentalsTest<MinimalHttpAsyncClient> {
public static final Timeout TIMEOUT = Timeout.ofSeconds(30);
@Parameterized.Parameters(name = "{0} {1}")
@Parameterized.Parameters(name = "Minimal {0} {1}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{ HttpVersion.HTTP_1_1, URIScheme.HTTP },
@ -89,158 +75,40 @@ public class TestHttpAsyncMinimal {
}
protected final HttpVersion version;
protected final URIScheme scheme;
public TestHttpAsyncMinimal(final HttpVersion version, final URIScheme scheme) {
super(scheme);
this.version = version;
this.scheme = scheme;
}
protected Http2TestServer server;
protected MinimalHttpAsyncClient httpclient;
@Rule
public ExternalResource serverResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
server = new Http2TestServer(
IOReactorConfig.custom()
.setSoTimeout(TIMEOUT)
.build(),
scheme == URIScheme.HTTPS ? SSLTestContexts.createServerSSLContext() : null);
server.register("/echo/*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new AsyncEchoHandler();
}
});
server.register("/random/*", new Supplier<AsyncServerExchangeHandler>() {
@Override
public AsyncServerExchangeHandler get() {
return new AsyncRandomHandler();
}
});
@Override
protected MinimalHttpAsyncClient createClient() throws Exception {
final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()))
.build();
final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(TIMEOUT)
.build();
if (version.greaterEquals(HttpVersion.HTTP_2)) {
return HttpAsyncClients.createMinimal(
HttpVersionPolicy.FORCE_HTTP_2, H2Config.DEFAULT, H1Config.DEFAULT, ioReactorConfig, connectionManager);
} else {
return HttpAsyncClients.createMinimal(
HttpVersionPolicy.FORCE_HTTP_1, H2Config.DEFAULT, H1Config.DEFAULT, ioReactorConfig, connectionManager);
}
}
@Override
protected void after() {
if (server != null) {
server.shutdown(TimeValue.ofSeconds(5));
server = null;
}
}
};
@Rule
public ExternalResource clientResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new H2TlsStrategy(SSLTestContexts.createClientSSLContext()))
.build();
final IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
.setSoTimeout(TIMEOUT)
.build();
if (version.greaterEquals(HttpVersion.HTTP_2)) {
httpclient = HttpAsyncClients.createMinimal(
HttpVersionPolicy.FORCE_HTTP_2, H2Config.DEFAULT, H1Config.DEFAULT, ioReactorConfig, connectionManager);
} else {
httpclient = HttpAsyncClients.createMinimal(
HttpVersionPolicy.FORCE_HTTP_1, H2Config.DEFAULT, H1Config.DEFAULT, ioReactorConfig, connectionManager);
}
}
@Override
protected void after() {
if (httpclient != null) {
httpclient.shutdown(ShutdownType.GRACEFUL);
httpclient = null;
}
}
};
@Override
public HttpHost start() throws Exception {
if (version.greaterEquals(HttpVersion.HTTP_2)) {
server.start(H2Config.DEFAULT);
return super.start(null, H2Config.DEFAULT);
} else {
server.start(H1Config.DEFAULT);
}
final Future<ListenerEndpoint> endpointFuture = server.listen(new InetSocketAddress(0));
httpclient.start();
final ListenerEndpoint endpoint = endpointFuture.get();
final InetSocketAddress address = (InetSocketAddress) endpoint.getAddress();
return new HttpHost("localhost", address.getPort(), scheme.name());
}
@Test
public void testSequenctialGetRequests() throws Exception {
final HttpHost target = start();
for (int i = 0; i < 3; i++) {
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.get(target, "/random/2048"), null);
final SimpleHttpResponse response = future.get();
Assert.assertThat(response, CoreMatchers.notNullValue());
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
final String body = response.getBodyText();
Assert.assertThat(body, CoreMatchers.notNullValue());
Assert.assertThat(body.length(), CoreMatchers.equalTo(2048));
return super.start(null, H1Config.DEFAULT);
}
}
@Test
public void testSequenctialHeadRequests() throws Exception {
final HttpHost target = start();
for (int i = 0; i < 3; i++) {
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleHttpRequest.head(target, "/random/2048"), null);
final SimpleHttpResponse response = future.get();
Assert.assertThat(response, CoreMatchers.notNullValue());
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
final String body = response.getBodyText();
Assert.assertThat(body, CoreMatchers.nullValue());
}
}
@Test
public void testConcurrentPostsOverMultipleConnections() throws Exception {
final HttpHost target = start();
final byte[] b1 = new byte[1024];
final Random rnd = new Random(System.currentTimeMillis());
rnd.nextBytes(b1);
final int reqCount = 20;
final Queue<Future<Message<HttpResponse, byte[]>>> queue = new LinkedList<>();
for (int i = 0; i < reqCount; i++) {
final Future<Message<HttpResponse, byte[]>> future = httpclient.execute(
AsyncRequestBuilder.post(target, "/echo/")
.setEntity(b1, ContentType.APPLICATION_OCTET_STREAM)
.build(),
new BasicResponseConsumer<>(new BasicAsyncEntityConsumer()), HttpClientContext.create(), null);
queue.add(future);
}
while (!queue.isEmpty()) {
final Future<Message<HttpResponse, byte[]>> future = queue.remove();
final Message<HttpResponse, byte[]> responseMessage = future.get();
Assert.assertThat(responseMessage, CoreMatchers.notNullValue());
final HttpResponse response = responseMessage.getHead();
Assert.assertThat(response.getCode(), CoreMatchers.equalTo(200));
final byte[] b2 = responseMessage.getBody();
Assert.assertThat(b1, CoreMatchers.equalTo(b2));
}
}
@Test
public void testConcurrentPostsOverSingleConnection() throws Exception {
public void testConcurrentPostRequestsSameEndpoint() throws Exception {
final HttpHost target = start();
final byte[] b1 = new byte[1024];
final Random rnd = new Random(System.currentTimeMillis());

View File

@ -0,0 +1,885 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import org.apache.hc.client5.http.AuthenticationStrategy;
import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.client5.http.HttpRequestRetryHandler;
import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.SystemDefaultDnsResolver;
import org.apache.hc.client5.http.async.AsyncExecChainHandler;
import org.apache.hc.client5.http.auth.AuthSchemeProvider;
import org.apache.hc.client5.http.auth.CredentialsProvider;
import org.apache.hc.client5.http.auth.KerberosConfig;
import org.apache.hc.client5.http.config.AuthSchemes;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.BasicCookieStore;
import org.apache.hc.client5.http.cookie.CookieSpecProvider;
import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.impl.ChainElements;
import org.apache.hc.client5.http.impl.CookieSpecRegistries;
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
import org.apache.hc.client5.http.impl.DefaultHttpRequestRetryHandler;
import org.apache.hc.client5.http.impl.DefaultRedirectStrategy;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.auth.BasicSchemeFactory;
import org.apache.hc.client5.http.impl.auth.CredSspSchemeFactory;
import org.apache.hc.client5.http.impl.auth.DigestSchemeFactory;
import org.apache.hc.client5.http.impl.auth.KerberosSchemeFactory;
import org.apache.hc.client5.http.impl.auth.NTLMSchemeFactory;
import org.apache.hc.client5.http.impl.auth.SPNegoSchemeFactory;
import org.apache.hc.client5.http.impl.auth.SystemDefaultCredentialsProvider;
import org.apache.hc.client5.http.impl.nio.MultuhomeConnectionInitiator;
import org.apache.hc.client5.http.impl.routing.DefaultRoutePlanner;
import org.apache.hc.client5.http.protocol.RedirectStrategy;
import org.apache.hc.client5.http.protocol.RequestAddCookies;
import org.apache.hc.client5.http.protocol.RequestAuthCache;
import org.apache.hc.client5.http.protocol.RequestDefaultHeaders;
import org.apache.hc.client5.http.protocol.RequestExpectContinue;
import org.apache.hc.client5.http.protocol.ResponseProcessCookies;
import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.function.Resolver;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.HttpResponseInterceptor;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.config.NamedElementChain;
import org.apache.hc.core5.http.config.RegistryBuilder;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.nio.command.ShutdownCommand;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http.protocol.HttpProcessorBuilder;
import org.apache.hc.core5.http.protocol.RequestTargetHost;
import org.apache.hc.core5.http.protocol.RequestUserAgent;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
import org.apache.hc.core5.http2.protocol.H2RequestConnControl;
import org.apache.hc.core5.http2.protocol.H2RequestContent;
import org.apache.hc.core5.http2.protocol.H2RequestTargetHost;
import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.apache.hc.core5.util.VersionInfo;
/**
* Builder for HTTP/2 {@link CloseableHttpAsyncClient} instances.
* <p>
* When a particular component is not explicitly set this class will
* use its default implementation.
*
* @since 5.0
*/
public class Http2AsyncClientBuilder {
private static class RequestInterceptorEntry {
enum Postion { FIRST, LAST }
final RequestInterceptorEntry.Postion postion;
final HttpRequestInterceptor interceptor;
private RequestInterceptorEntry(final RequestInterceptorEntry.Postion postion, final HttpRequestInterceptor interceptor) {
this.postion = postion;
this.interceptor = interceptor;
}
}
private static class ResponseInterceptorEntry {
enum Postion { FIRST, LAST }
final ResponseInterceptorEntry.Postion postion;
final HttpResponseInterceptor interceptor;
private ResponseInterceptorEntry(final ResponseInterceptorEntry.Postion postion, final HttpResponseInterceptor interceptor) {
this.postion = postion;
this.interceptor = interceptor;
}
}
private static class ExecInterceptorEntry {
enum Postion { BEFORE, AFTER, REPLACE, FIRST, LAST }
final ExecInterceptorEntry.Postion postion;
final String name;
final AsyncExecChainHandler interceptor;
final String existing;
private ExecInterceptorEntry(
final ExecInterceptorEntry.Postion postion,
final String name,
final AsyncExecChainHandler interceptor,
final String existing) {
this.postion = postion;
this.name = name;
this.interceptor = interceptor;
this.existing = existing;
}
}
private IOReactorConfig ioReactorConfig;
private H2Config h2Config;
private CharCodingConfig charCodingConfig;
private SchemePortResolver schemePortResolver;
private AuthenticationStrategy targetAuthStrategy;
private AuthenticationStrategy proxyAuthStrategy;
private LinkedList<RequestInterceptorEntry> requestInterceptors;
private LinkedList<ResponseInterceptorEntry> responseInterceptors;
private LinkedList<ExecInterceptorEntry> execInterceptors;
private HttpRoutePlanner routePlanner;
private RedirectStrategy redirectStrategy;
private HttpRequestRetryHandler retryHandler;
private Lookup<AuthSchemeProvider> authSchemeRegistry;
private Lookup<CookieSpecProvider> cookieSpecRegistry;
private CookieStore cookieStore;
private CredentialsProvider credentialsProvider;
private String userAgent;
private Collection<? extends Header> defaultHeaders;
private RequestConfig defaultRequestConfig;
private boolean evictIdleConnections;
private TimeValue maxIdleTime;
private boolean systemProperties;
private boolean automaticRetriesDisabled;
private boolean redirectHandlingDisabled;
private boolean cookieManagementDisabled;
private boolean authCachingDisabled;
private DnsResolver dnsResolver;
private TlsStrategy tlsStrategy;
private ThreadFactory threadFactory;
private List<Closeable> closeables;
public static Http2AsyncClientBuilder create() {
return new Http2AsyncClientBuilder();
}
protected Http2AsyncClientBuilder() {
super();
}
/**
* Sets {@link H2Config} configuration.
*/
public final Http2AsyncClientBuilder setH2Config(final H2Config h2Config) {
this.h2Config = h2Config;
return this;
}
/**
* Sets {@link IOReactorConfig} configuration.
*/
public final Http2AsyncClientBuilder setIOReactorConfig(final IOReactorConfig ioReactorConfig) {
this.ioReactorConfig = ioReactorConfig;
return this;
}
/**
* Sets {@link CharCodingConfig} configuration.
*/
public final Http2AsyncClientBuilder setCharCodingConfig(final CharCodingConfig charCodingConfig) {
this.charCodingConfig = charCodingConfig;
return this;
}
/**
* Assigns {@link AuthenticationStrategy} instance for target
* host authentication.
*/
public final Http2AsyncClientBuilder setTargetAuthenticationStrategy(
final AuthenticationStrategy targetAuthStrategy) {
this.targetAuthStrategy = targetAuthStrategy;
return this;
}
/**
* Assigns {@link AuthenticationStrategy} instance for proxy
* authentication.
*/
public final Http2AsyncClientBuilder setProxyAuthenticationStrategy(
final AuthenticationStrategy proxyAuthStrategy) {
this.proxyAuthStrategy = proxyAuthStrategy;
return this;
}
/**
* Adds this protocol interceptor to the head of the protocol processing list.
*/
public final Http2AsyncClientBuilder addRequestInterceptorFirst(final HttpResponseInterceptor interceptor) {
Args.notNull(interceptor, "Interceptor");
if (responseInterceptors == null) {
responseInterceptors = new LinkedList<>();
}
responseInterceptors.add(new ResponseInterceptorEntry(ResponseInterceptorEntry.Postion.FIRST, interceptor));
return this;
}
/**
* Adds this protocol interceptor to the tail of the protocol processing list.
*/
public final Http2AsyncClientBuilder addResponseInterceptorLast(final HttpResponseInterceptor interceptor) {
Args.notNull(interceptor, "Interceptor");
if (responseInterceptors == null) {
responseInterceptors = new LinkedList<>();
}
responseInterceptors.add(new ResponseInterceptorEntry(ResponseInterceptorEntry.Postion.LAST, interceptor));
return this;
}
/**
* Adds this execution interceptor before an existing interceptor.
*/
public final Http2AsyncClientBuilder addExecInterceptorBefore(final String existing, final String name, final AsyncExecChainHandler interceptor) {
Args.notBlank(existing, "Existing");
Args.notBlank(name, "Name");
Args.notNull(interceptor, "Interceptor");
if (execInterceptors == null) {
execInterceptors = new LinkedList<>();
}
execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.BEFORE, name, interceptor, existing));
return this;
}
/**
* Adds this execution interceptor after interceptor with the given name.
*/
public final Http2AsyncClientBuilder addExecInterceptorAfter(final String existing, final String name, final AsyncExecChainHandler interceptor) {
Args.notBlank(existing, "Existing");
Args.notBlank(name, "Name");
Args.notNull(interceptor, "Interceptor");
if (execInterceptors == null) {
execInterceptors = new LinkedList<>();
}
execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.AFTER, name, interceptor, existing));
return this;
}
/**
* Replace an existing interceptor with the given name with new interceptor.
*/
public final Http2AsyncClientBuilder replaceExecInterceptor(final String existing, final AsyncExecChainHandler interceptor) {
Args.notBlank(existing, "Existing");
Args.notNull(interceptor, "Interceptor");
if (execInterceptors == null) {
execInterceptors = new LinkedList<>();
}
execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.REPLACE, existing, interceptor, existing));
return this;
}
/**
* Add an interceptor to the head of the processing list.
*/
public final Http2AsyncClientBuilder addExecInterceptorFirst(final String name, final AsyncExecChainHandler interceptor) {
Args.notNull(name, "Name");
Args.notNull(interceptor, "Interceptor");
execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.FIRST, name, interceptor, null));
return this;
}
/**
* Add an interceptor to the tail of the processing list.
*/
public final Http2AsyncClientBuilder addExecInterceptorLast(final String name, final AsyncExecChainHandler interceptor) {
Args.notNull(name, "Name");
Args.notNull(interceptor, "Interceptor");
execInterceptors.add(new ExecInterceptorEntry(ExecInterceptorEntry.Postion.LAST, name, interceptor, null));
return this;
}
/**
* Adds this protocol interceptor to the head of the protocol processing list.
*/
public final Http2AsyncClientBuilder addRequestInterceptorFirst(final HttpRequestInterceptor interceptor) {
Args.notNull(interceptor, "Interceptor");
if (requestInterceptors == null) {
requestInterceptors = new LinkedList<>();
}
requestInterceptors.add(new RequestInterceptorEntry(RequestInterceptorEntry.Postion.FIRST, interceptor));
return this;
}
/**
* Adds this protocol interceptor to the tail of the protocol processing list.
*/
public final Http2AsyncClientBuilder addResponseInterceptorLast(final HttpRequestInterceptor interceptor) {
Args.notNull(interceptor, "Interceptor");
if (requestInterceptors == null) {
requestInterceptors = new LinkedList<>();
}
requestInterceptors.add(new RequestInterceptorEntry(RequestInterceptorEntry.Postion.LAST, interceptor));
return this;
}
/**
* Assigns {@link HttpRequestRetryHandler} instance.
* <p>
* Please note this value can be overridden by the {@link #disableAutomaticRetries()}
* method.
*/
public final Http2AsyncClientBuilder setRetryHandler(final HttpRequestRetryHandler retryHandler) {
this.retryHandler = retryHandler;
return this;
}
/**
* Assigns {@link RedirectStrategy} instance.
* <p>
* Please note this value can be overridden by the {@link #disableRedirectHandling()}
* method.
* </p>
*/
public Http2AsyncClientBuilder setRedirectStrategy(final RedirectStrategy redirectStrategy) {
this.redirectStrategy = redirectStrategy;
return this;
}
/**
* Assigns {@link SchemePortResolver} instance.
*/
public final Http2AsyncClientBuilder setSchemePortResolver(final SchemePortResolver schemePortResolver) {
this.schemePortResolver = schemePortResolver;
return this;
}
/**
* Assigns {@link DnsResolver} instance.
*/
public final Http2AsyncClientBuilder setDnsResolver(final DnsResolver dnsResolver) {
this.dnsResolver = dnsResolver;
return this;
}
/**
* Assigns {@link TlsStrategy} instance.
*/
public final Http2AsyncClientBuilder setTlsStrategy(final TlsStrategy tlsStrategy) {
this.tlsStrategy = tlsStrategy;
return this;
}
/**
* Assigns {@link ThreadFactory} instance.
*/
public final Http2AsyncClientBuilder setThreadFactory(final ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
/**
* Assigns {@code User-Agent} value.
*/
public final Http2AsyncClientBuilder setUserAgent(final String userAgent) {
this.userAgent = userAgent;
return this;
}
/**
* Assigns default request header values.
*/
public final Http2AsyncClientBuilder setDefaultHeaders(final Collection<? extends Header> defaultHeaders) {
this.defaultHeaders = defaultHeaders;
return this;
}
/**
* Assigns {@link HttpRoutePlanner} instance.
*/
public final Http2AsyncClientBuilder setRoutePlanner(final HttpRoutePlanner routePlanner) {
this.routePlanner = routePlanner;
return this;
}
/**
* Assigns default {@link CredentialsProvider} instance which will be used
* for request execution if not explicitly set in the client execution
* context.
*/
public final Http2AsyncClientBuilder setDefaultCredentialsProvider(final CredentialsProvider credentialsProvider) {
this.credentialsProvider = credentialsProvider;
return this;
}
/**
* Assigns default {@link org.apache.hc.client5.http.auth.AuthScheme} registry which will
* be used for request execution if not explicitly set in the client execution
* context.
*/
public final Http2AsyncClientBuilder setDefaultAuthSchemeRegistry(final Lookup<AuthSchemeProvider> authSchemeRegistry) {
this.authSchemeRegistry = authSchemeRegistry;
return this;
}
/**
* Assigns default {@link org.apache.hc.client5.http.cookie.CookieSpec} registry
* which will be used for request execution if not explicitly set in the client
* execution context.
*/
public final Http2AsyncClientBuilder setDefaultCookieSpecRegistry(final Lookup<CookieSpecProvider> cookieSpecRegistry) {
this.cookieSpecRegistry = cookieSpecRegistry;
return this;
}
/**
* Assigns default {@link CookieStore} instance which will be used for
* request execution if not explicitly set in the client execution context.
*/
public final Http2AsyncClientBuilder setDefaultCookieStore(final CookieStore cookieStore) {
this.cookieStore = cookieStore;
return this;
}
/**
* Assigns default {@link RequestConfig} instance which will be used
* for request execution if not explicitly set in the client execution
* context.
*/
public final Http2AsyncClientBuilder setDefaultRequestConfig(final RequestConfig config) {
this.defaultRequestConfig = config;
return this;
}
/**
* Use system properties when creating and configuring default
* implementations.
*/
public final Http2AsyncClientBuilder useSystemProperties() {
this.systemProperties = true;
return this;
}
/**
* Disables automatic redirect handling.
*/
public final Http2AsyncClientBuilder disableRedirectHandling() {
redirectHandlingDisabled = true;
return this;
}
/**
* Disables automatic request recovery and re-execution.
*/
public final Http2AsyncClientBuilder disableAutomaticRetries() {
automaticRetriesDisabled = true;
return this;
}
/**
* Disables state (cookie) management.
*/
public final Http2AsyncClientBuilder disableCookieManagement() {
this.cookieManagementDisabled = true;
return this;
}
/**
* Disables authentication scheme caching.
*/
public final Http2AsyncClientBuilder disableAuthCaching() {
this.authCachingDisabled = true;
return this;
}
/**
* Makes this instance of HttpClient proactively evict idle connections from the
* connection pool using a background thread.
* <p>
* One MUST explicitly close HttpClient with {@link CloseableHttpAsyncClient#close()}
* in order to stop and release the background thread.
* <p>
* Please note this method has no effect if the instance of HttpClient is configuted to
* use a shared connection manager.
*
* @param maxIdleTime maximum time persistent connections can stay idle while kept alive
* in the connection pool. Connections whose inactivity period exceeds this value will
* get closed and evicted from the pool.
*/
public final Http2AsyncClientBuilder evictIdleConnections(final TimeValue maxIdleTime) {
this.evictIdleConnections = true;
this.maxIdleTime = maxIdleTime;
return this;
}
/**
* Request exec chain customization and extension.
* <p>
* For internal use.
*/
@Internal
protected void customizeExecChain(final NamedElementChain<AsyncExecChainHandler> execChainDefinition) {
}
/**
* Adds to the list of {@link Closeable} resources to be managed by the client.
* <p>
* For internal use.
*/
@Internal
protected void addCloseable(final Closeable closeable) {
if (closeable == null) {
return;
}
if (closeables == null) {
closeables = new ArrayList<>();
}
closeables.add(closeable);
}
public CloseableHttpAsyncClient build() {
final NamedElementChain<AsyncExecChainHandler> execChainDefinition = new NamedElementChain<>();
execChainDefinition.addLast(
new Http2AsyncMainClientExec(),
ChainElements.MAIN_TRANSPORT.name());
AuthenticationStrategy targetAuthStrategyCopy = this.targetAuthStrategy;
if (targetAuthStrategyCopy == null) {
targetAuthStrategyCopy = DefaultAuthenticationStrategy.INSTANCE;
}
AuthenticationStrategy proxyAuthStrategyCopy = this.proxyAuthStrategy;
if (proxyAuthStrategyCopy == null) {
proxyAuthStrategyCopy = DefaultAuthenticationStrategy.INSTANCE;
}
String userAgentCopy = this.userAgent;
if (userAgentCopy == null) {
if (systemProperties) {
userAgentCopy = getProperty("http.agent", null);
}
if (userAgentCopy == null) {
userAgentCopy = VersionInfo.getSoftwareInfo("Apache-HttpAsyncClient",
"org.apache.hc.client5", getClass());
}
}
execChainDefinition.addFirst(
new AsyncConnectExec(
new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
proxyAuthStrategyCopy),
ChainElements.CONNECT.name());
final HttpProcessorBuilder b = HttpProcessorBuilder.create();
if (requestInterceptors != null) {
for (final RequestInterceptorEntry entry: requestInterceptors) {
if (entry.postion == RequestInterceptorEntry.Postion.FIRST) {
b.addFirst(entry.interceptor);
}
}
}
if (responseInterceptors != null) {
for (final ResponseInterceptorEntry entry: responseInterceptors) {
if (entry.postion == ResponseInterceptorEntry.Postion.FIRST) {
b.addFirst(entry.interceptor);
}
}
}
b.addAll(
new RequestDefaultHeaders(defaultHeaders),
new RequestUserAgent(userAgentCopy),
new RequestExpectContinue());
if (!cookieManagementDisabled) {
b.add(new RequestAddCookies());
}
if (!authCachingDisabled) {
b.add(new RequestAuthCache());
}
if (!cookieManagementDisabled) {
b.add(new ResponseProcessCookies());
}
if (requestInterceptors != null) {
for (final RequestInterceptorEntry entry: requestInterceptors) {
if (entry.postion == RequestInterceptorEntry.Postion.LAST) {
b.addFirst(entry.interceptor);
}
}
}
if (responseInterceptors != null) {
for (final ResponseInterceptorEntry entry: responseInterceptors) {
if (entry.postion == ResponseInterceptorEntry.Postion.LAST) {
b.addFirst(entry.interceptor);
}
}
}
final HttpProcessor httpProcessor = b.build();
execChainDefinition.addFirst(
new AsyncProtocolExec(httpProcessor, targetAuthStrategyCopy, proxyAuthStrategyCopy),
ChainElements.PROTOCOL.name());
// Add request retry executor, if not disabled
if (!automaticRetriesDisabled) {
HttpRequestRetryHandler retryHandlerCopy = this.retryHandler;
if (retryHandlerCopy == null) {
retryHandlerCopy = DefaultHttpRequestRetryHandler.INSTANCE;
}
execChainDefinition.addFirst(
new AsyncRetryExec(retryHandlerCopy),
ChainElements.RETRY_IO_ERROR.name());
}
HttpRoutePlanner routePlannerCopy = this.routePlanner;
if (routePlannerCopy == null) {
SchemePortResolver schemePortResolverCopy = this.schemePortResolver;
if (schemePortResolverCopy == null) {
schemePortResolverCopy = DefaultSchemePortResolver.INSTANCE;
}
routePlannerCopy = new DefaultRoutePlanner(schemePortResolverCopy);
}
// Add redirect executor, if not disabled
if (!redirectHandlingDisabled) {
RedirectStrategy redirectStrategyCopy = this.redirectStrategy;
if (redirectStrategyCopy == null) {
redirectStrategyCopy = DefaultRedirectStrategy.INSTANCE;
}
execChainDefinition.addFirst(
new AsyncRedirectExec(routePlannerCopy, redirectStrategyCopy),
ChainElements.REDIRECT.name());
}
final AsyncPushConsumerRegistry pushConsumerRegistry = new AsyncPushConsumerRegistry();
final IOEventHandlerFactory ioEventHandlerFactory = new Http2AsyncClientEventHandlerFactory(
new DefaultHttpProcessor(new H2RequestContent(), new H2RequestTargetHost(), new H2RequestConnControl()),
new HandlerFactory<AsyncPushConsumer>() {
@Override
public AsyncPushConsumer create(final HttpRequest request, final HttpContext context) throws HttpException {
return pushConsumerRegistry.get(request);
}
},
h2Config != null ? h2Config : H2Config.DEFAULT,
charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT);
final DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(
ioEventHandlerFactory,
ioReactorConfig != null ? ioReactorConfig : IOReactorConfig.DEFAULT,
threadFactory != null ? threadFactory : new DefaultThreadFactory("httpclient-dispatch", true),
null,
null,
new Callback<IOSession>() {
@Override
public void execute(final IOSession ioSession) {
ioSession.addFirst(new ShutdownCommand(ShutdownType.GRACEFUL));
}
});
if (execInterceptors != null) {
for (final ExecInterceptorEntry entry: execInterceptors) {
switch (entry.postion) {
case AFTER:
execChainDefinition.addAfter(entry.existing, entry.interceptor, entry.name);
break;
case BEFORE:
execChainDefinition.addBefore(entry.existing, entry.interceptor, entry.name);
break;
case FIRST:
execChainDefinition.addFirst(entry.interceptor, entry.name);
break;
case LAST:
execChainDefinition.addLast(entry.interceptor, entry.name);
break;
}
}
}
customizeExecChain(execChainDefinition);
NamedElementChain<AsyncExecChainHandler>.Node current = execChainDefinition.getLast();
AsyncExecChainElement execChain = null;
while (current != null) {
execChain = new AsyncExecChainElement(current.getValue(), execChain);
current = current.getPrevious();
}
Lookup<AuthSchemeProvider> authSchemeRegistryCopy = this.authSchemeRegistry;
if (authSchemeRegistryCopy == null) {
authSchemeRegistryCopy = RegistryBuilder.<AuthSchemeProvider>create()
.register(AuthSchemes.BASIC, new BasicSchemeFactory())
.register(AuthSchemes.DIGEST, new DigestSchemeFactory())
.register(AuthSchemes.CREDSSP, new CredSspSchemeFactory())
.register(AuthSchemes.NTLM, new NTLMSchemeFactory())
.register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(KerberosConfig.DEFAULT, SystemDefaultDnsResolver.INSTANCE))
.register(AuthSchemes.KERBEROS, new KerberosSchemeFactory(KerberosConfig.DEFAULT, SystemDefaultDnsResolver.INSTANCE))
.build();
}
Lookup<CookieSpecProvider> cookieSpecRegistryCopy = this.cookieSpecRegistry;
if (cookieSpecRegistryCopy == null) {
cookieSpecRegistryCopy = CookieSpecRegistries.createDefault();
}
CookieStore cookieStoreCopy = this.cookieStore;
if (cookieStoreCopy == null) {
cookieStoreCopy = new BasicCookieStore();
}
CredentialsProvider credentialsProviderCopy = this.credentialsProvider;
if (credentialsProviderCopy == null) {
if (systemProperties) {
credentialsProviderCopy = new SystemDefaultCredentialsProvider();
} else {
credentialsProviderCopy = new BasicCredentialsProvider();
}
}
TlsStrategy tlsStrategyCopy = this.tlsStrategy;
if (tlsStrategyCopy == null) {
if (systemProperties) {
tlsStrategyCopy = H2TlsStrategy.getSystemDefault();
} else {
tlsStrategyCopy = H2TlsStrategy.getDefault();
}
}
final MultuhomeConnectionInitiator connectionInitiator = new MultuhomeConnectionInitiator(ioReactor, dnsResolver);
final H2ConnPool connPool = new H2ConnPool(connectionInitiator, new Resolver<HttpHost, InetSocketAddress>() {
@Override
public InetSocketAddress resolve(final HttpHost host) {
return null;
}
}, tlsStrategyCopy);
List<Closeable> closeablesCopy = closeables != null ? new ArrayList<>(closeables) : null;
if (closeablesCopy == null) {
closeablesCopy = new ArrayList<>(1);
}
if (evictIdleConnections) {
final IdleConnectionEvictor connectionEvictor = new IdleConnectionEvictor(connPool,
maxIdleTime != null ? maxIdleTime : TimeValue.ofSeconds(30L));
closeablesCopy.add(new Closeable() {
@Override
public void close() throws IOException {
connectionEvictor.shutdown();
}
});
connectionEvictor.start();
}
closeablesCopy.add(connPool);
return new InternalHttp2AsyncClient(
ioReactor,
execChain,
pushConsumerRegistry,
threadFactory != null ? threadFactory : new DefaultThreadFactory("httpclient-main", true),
connPool,
routePlannerCopy,
cookieSpecRegistryCopy,
authSchemeRegistryCopy,
cookieStoreCopy,
credentialsProviderCopy,
defaultRequestConfig,
closeablesCopy);
}
private static String getProperty(final String key, final String defaultValue) {
return AccessController.doPrivileged(new PrivilegedAction<String>() {
@Override
public String run() {
return System.getProperty(key, defaultValue);
}
});
}
static class IdleConnectionEvictor implements Closeable {
private final Thread thread;
public IdleConnectionEvictor(final H2ConnPool connPool, final TimeValue maxIdleTime) {
this.thread = new DefaultThreadFactory("idle-connection-evictor", true).newThread(new Runnable() {
@Override
public void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(maxIdleTime.toMillis());
connPool.closeIdle(maxIdleTime);
}
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
} catch (final Exception ex) {
}
}
});
}
public void start() {
thread.start();
}
public void shutdown() {
thread.interrupt();
}
@Override
public void close() throws IOException {
shutdown();
}
}
}

View File

@ -0,0 +1,194 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.IOException;
import java.util.List;
import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.config.CharCodingConfig;
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http2.config.H2Config;
import org.apache.hc.core5.http2.frame.FramePrinter;
import org.apache.hc.core5.http2.frame.RawFrame;
import org.apache.hc.core5.http2.impl.nio.ClientHttp2StreamMultiplexerFactory;
import org.apache.hc.core5.http2.impl.nio.Http2OnlyClientProtocolNegotiator;
import org.apache.hc.core5.http2.impl.nio.Http2StreamListener;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
import org.apache.hc.core5.reactor.TlsCapableIOSession;
import org.apache.hc.core5.util.Args;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* @since 5.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE)
class Http2AsyncClientEventHandlerFactory implements IOEventHandlerFactory {
private final Logger wireLog = LogManager.getLogger("org.apache.hc.client5.http.wire");
private final Logger headerLog = LogManager.getLogger("org.apache.hc.client5.http.headers");
private final Logger frameLog = LogManager.getLogger("org.apache.hc.client5.http2.frame");
private final Logger framePayloadLog = LogManager.getLogger("org.apache.hc.client5.http2.frame.payload");
private final Logger flowCtrlLog = LogManager.getLogger("org.apache.hc.client5.http2.flow");
private final HttpProcessor httpProcessor;
private final HandlerFactory<AsyncPushConsumer> exchangeHandlerFactory;
private final H2Config h2Config;
private final CharCodingConfig charCodingConfig;
Http2AsyncClientEventHandlerFactory(
final HttpProcessor httpProcessor,
final HandlerFactory<AsyncPushConsumer> exchangeHandlerFactory,
final H2Config h2Config,
final CharCodingConfig charCodingConfig) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP processor");
this.exchangeHandlerFactory = exchangeHandlerFactory;
this.h2Config = h2Config != null ? h2Config : H2Config.DEFAULT;
this.charCodingConfig = charCodingConfig != null ? charCodingConfig : CharCodingConfig.DEFAULT;
}
@Override
public IOEventHandler createHandler(final TlsCapableIOSession ioSession, final Object attachment) {
final Logger sessionLog = LogManager.getLogger(ioSession.getClass());
if (sessionLog.isDebugEnabled()
|| wireLog.isDebugEnabled()
|| headerLog.isDebugEnabled()
|| frameLog.isDebugEnabled()
|| framePayloadLog.isDebugEnabled()
|| flowCtrlLog.isDebugEnabled()) {
final String id = ConnPoolSupport.getId(ioSession);
final ClientHttp2StreamMultiplexerFactory http2StreamHandlerFactory = new ClientHttp2StreamMultiplexerFactory(
httpProcessor,
exchangeHandlerFactory,
h2Config,
charCodingConfig,
new Http2StreamListener() {
final FramePrinter framePrinter = new FramePrinter();
private void logFrameInfo(final String prefix, final RawFrame frame) {
try {
final LogAppendable logAppendable = new LogAppendable(frameLog, prefix);
framePrinter.printFrameInfo(frame, logAppendable);
logAppendable.flush();
} catch (final IOException ignore) {
}
}
private void logFramePayload(final String prefix, final RawFrame frame) {
try {
final LogAppendable logAppendable = new LogAppendable(framePayloadLog, prefix);
framePrinter.printPayload(frame, logAppendable);
logAppendable.flush();
} catch (final IOException ignore) {
}
}
private void logFlowControl(final String prefix, final int streamId, final int delta, final int actualSize) {
final StringBuilder buffer = new StringBuilder();
buffer.append(prefix).append(" stream ").append(streamId).append(" flow control " )
.append(delta).append(" -> ")
.append(actualSize);
flowCtrlLog.debug(buffer.toString());
}
@Override
public void onHeaderInput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
if (headerLog.isDebugEnabled()) {
for (int i = 0; i < headers.size(); i++) {
headerLog.debug(id + " << " + headers.get(i));
}
}
}
@Override
public void onHeaderOutput(final HttpConnection connection, final int streamId, final List<? extends Header> headers) {
if (headerLog.isDebugEnabled()) {
for (int i = 0; i < headers.size(); i++) {
headerLog.debug(id + " >> " + headers.get(i));
}
}
}
@Override
public void onFrameInput(final HttpConnection connection, final int streamId, final RawFrame frame) {
if (frameLog.isDebugEnabled()) {
logFrameInfo(id + " <<", frame);
}
if (framePayloadLog.isDebugEnabled()) {
logFramePayload(id + " <<", frame);
}
}
@Override
public void onFrameOutput(final HttpConnection connection, final int streamId, final RawFrame frame) {
if (frameLog.isDebugEnabled()) {
logFrameInfo(id + " >>", frame);
}
if (framePayloadLog.isDebugEnabled()) {
logFramePayload(id + " >>", frame);
}
}
@Override
public void onInputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
if (flowCtrlLog.isDebugEnabled()) {
logFlowControl(id + " <<", streamId, delta, actualSize);
}
}
@Override
public void onOutputFlowControl(final HttpConnection connection, final int streamId, final int delta, final int actualSize) {
if (flowCtrlLog.isDebugEnabled()) {
logFlowControl(id + " >>", streamId, delta, actualSize);
}
}
});
final LoggingIOSession loggingIOSession = new LoggingIOSession(ioSession, id, sessionLog, wireLog);
return new Http2OnlyClientProtocolNegotiator(loggingIOSession, http2StreamHandlerFactory);
} else {
final ClientHttp2StreamMultiplexerFactory http2StreamHandlerFactory = new ClientHttp2StreamMultiplexerFactory(
httpProcessor,
exchangeHandlerFactory,
h2Config,
charCodingConfig,
null);
return new Http2OnlyClientProtocolNegotiator(ioSession, http2StreamHandlerFactory);
}
}
}

View File

@ -0,0 +1,170 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecCallback;
import org.apache.hc.client5.http.async.AsyncExecChain;
import org.apache.hc.client5.http.async.AsyncExecChainHandler;
import org.apache.hc.client5.http.async.AsyncExecRuntime;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
class Http2AsyncMainClientExec implements AsyncExecChainHandler {
private final Logger log = LogManager.getLogger(getClass());
@Override
public void execute(
final HttpRequest request,
final AsyncEntityProducer entityProducer,
final AsyncExecChain.Scope scope,
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
final String exchangeId = scope.exchangeId;
final HttpRoute route = scope.route;
final HttpClientContext clientContext = scope.clientContext;
final AsyncExecRuntime execRuntime = scope.execRuntime;
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": executing " + new RequestLine(request));
}
final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() {
private final AtomicReference<AsyncDataConsumer> entityConsumerRef = new AtomicReference<>(null);
@Override
public void releaseResources() {
final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
if (entityConsumer != null) {
entityConsumer.releaseResources();
}
}
@Override
public void failed(final Exception cause) {
execRuntime.markConnectionNonReusable();
asyncExecCallback.failed(cause);
}
@Override
public void cancel() {
failed(new InterruptedIOException());
}
@Override
public void produceRequest(final RequestChannel channel) throws HttpException, IOException {
channel.sendRequest(request, entityProducer);
}
@Override
public int available() {
return entityProducer.available();
}
@Override
public void produce(final DataStreamChannel channel) throws IOException {
entityProducer.produce(channel);
}
@Override
public void consumeInformation(final HttpResponse response) throws HttpException, IOException {
}
@Override
public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails) throws HttpException, IOException {
entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
if (entityDetails == null) {
execRuntime.validateConnection();
asyncExecCallback.completed();
}
}
@Override
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
if (entityConsumer != null) {
entityConsumer.updateCapacity(capacityChannel);
} else {
capacityChannel.update(Integer.MAX_VALUE);
}
}
@Override
public int consume(final ByteBuffer src) throws IOException {
final AsyncDataConsumer entityConsumer = entityConsumerRef.get();
if (entityConsumer != null) {
return entityConsumer.consume(src);
} else {
return Integer.MAX_VALUE;
}
}
@Override
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null);
if (entityConsumer != null) {
entityConsumer.streamEnd(trailers);
} else {
execRuntime.validateConnection();
}
asyncExecCallback.completed();
}
};
if (log.isDebugEnabled()) {
execRuntime.execute(
new LoggingAsyncClientExchangeHandler(log, exchangeId, internalExchangeHandler),
clientContext);
} else {
execRuntime.execute(
internalExchangeHandler, clientContext);
}
}
}

View File

@ -736,7 +736,7 @@ public class HttpAsyncClientBuilder {
final NamedElementChain<AsyncExecChainHandler> execChainDefinition = new NamedElementChain<>();
execChainDefinition.addLast(
new AsyncMainClientExec(keepAliveStrategyCopy, userTokenHandlerCopy),
new HttpAsyncMainClientExec(keepAliveStrategyCopy, userTokenHandlerCopy),
ChainElements.MAIN_TRANSPORT.name());
AuthenticationStrategy targetAuthStrategyCopy = this.targetAuthStrategy;

View File

@ -71,7 +71,7 @@ import org.apache.logging.log4j.Logger;
* @since 5.0
*/
@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class HttpAsyncClientEventHandlerFactory implements IOEventHandlerFactory {
class HttpAsyncClientEventHandlerFactory implements IOEventHandlerFactory {
private final Logger streamLog = LogManager.getLogger(InternalHttpAsyncClient.class);
private final Logger wireLog = LogManager.getLogger("org.apache.hc.client5.http.wire");

View File

@ -88,6 +88,31 @@ public class HttpAsyncClients {
return HttpAsyncClientBuilder.create().useSystemProperties().build();
}
/**
* Creates builder object for construction of custom HTTP/2
* {@link CloseableHttpAsyncClient} instances optimized for HTTP/2 protocol
* and message multiplexing
*/
public static Http2AsyncClientBuilder customHttp2() {
return Http2AsyncClientBuilder.create();
}
/**
* Creates HTTP/2 {@link CloseableHttpAsyncClient} instance with default configuration
* optimized for HTTP/2 protocol and message multiplexing.
*/
public static CloseableHttpAsyncClient createHttp2Default() {
return Http2AsyncClientBuilder.create().build();
}
/**
* Creates HTTP/2 {@link CloseableHttpAsyncClient} instance with default configuration and
* system properties optimized for HTTP/2 protocol and message multiplexing.
*/
public static CloseableHttpAsyncClient createHttp2System() {
return Http2AsyncClientBuilder.create().useSystemProperties().build();
}
private static HttpProcessor createMinimalProtocolProcessor() {
return new DefaultHttpProcessor(
new H2RequestContent(),
@ -220,7 +245,7 @@ public class HttpAsyncClients {
final TlsStrategy tlsStrategy) {
final AsyncPushConsumerRegistry pushConsumerRegistry = new AsyncPushConsumerRegistry();
return createMinimalHttp2AsyncClientImpl(
new HttpAsyncClientEventHandlerFactory(
new Http2AsyncClientEventHandlerFactory(
createMinimalProtocolProcessor(),
new HandlerFactory<AsyncPushConsumer>() {
@ -230,11 +255,8 @@ public class HttpAsyncClients {
}
},
HttpVersionPolicy.FORCE_HTTP_2,
h2Config,
null,
CharCodingConfig.DEFAULT,
DefaultConnectionReuseStrategy.INSTANCE),
CharCodingConfig.DEFAULT),
pushConsumerRegistry,
ioReactorConfig,
dnsResolver,

View File

@ -58,14 +58,14 @@ import org.apache.hc.core5.util.TimeValue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
class AsyncMainClientExec implements AsyncExecChainHandler {
class HttpAsyncMainClientExec implements AsyncExecChainHandler {
private final Logger log = LogManager.getLogger(getClass());
private final ConnectionKeepAliveStrategy keepAliveStrategy;
private final UserTokenHandler userTokenHandler;
AsyncMainClientExec(final ConnectionKeepAliveStrategy keepAliveStrategy, final UserTokenHandler userTokenHandler) {
HttpAsyncMainClientExec(final ConnectionKeepAliveStrategy keepAliveStrategy, final UserTokenHandler userTokenHandler) {
this.keepAliveStrategy = keepAliveStrategy;
this.userTokenHandler = userTokenHandler;
}

View File

@ -0,0 +1,308 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecCallback;
import org.apache.hc.client5.http.async.AsyncExecChain;
import org.apache.hc.client5.http.async.AsyncExecRuntime;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.auth.AuthSchemeProvider;
import org.apache.hc.client5.http.auth.CredentialsProvider;
import org.apache.hc.client5.http.config.Configurable;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.CookieSpecProvider;
import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.impl.ExecSupport;
import org.apache.hc.client5.http.impl.RequestCopier;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBase {
private final AsyncExecChainElement execChain;
private final Lookup<CookieSpecProvider> cookieSpecRegistry;
private final Lookup<AuthSchemeProvider> authSchemeRegistry;
private final CookieStore cookieStore;
private final CredentialsProvider credentialsProvider;
private final RequestConfig defaultConfig;
private final List<Closeable> closeables;
InternalAbstractHttpAsyncClient(
final DefaultConnectingIOReactor ioReactor,
final AsyncPushConsumerRegistry pushConsumerRegistry,
final ThreadFactory threadFactory,
final AsyncExecChainElement execChain,
final Lookup<CookieSpecProvider> cookieSpecRegistry,
final Lookup<AuthSchemeProvider> authSchemeRegistry,
final CookieStore cookieStore,
final CredentialsProvider credentialsProvider,
final RequestConfig defaultConfig,
final List<Closeable> closeables) {
super(ioReactor, pushConsumerRegistry, threadFactory);
this.execChain = execChain;
this.cookieSpecRegistry = cookieSpecRegistry;
this.authSchemeRegistry = authSchemeRegistry;
this.cookieStore = cookieStore;
this.credentialsProvider = credentialsProvider;
this.defaultConfig = defaultConfig;
this.closeables = closeables;
}
@Override
public void close() {
super.close();
if (closeables != null) {
for (final Closeable closeable: closeables) {
try {
closeable.close();
} catch (final IOException ex) {
log.error(ex.getMessage(), ex);
}
}
}
}
private void setupContext(final HttpClientContext context) {
if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
}
if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
}
if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
}
if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
}
if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
}
}
abstract AsyncExecRuntime crerateAsyncExecRuntime();
abstract HttpRoute determineRoute(HttpRequest request, HttpClientContext clientContext) throws HttpException;
@Override
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
ensureRunning();
final BasicFuture<T> future = new BasicFuture<>(callback);
try {
final HttpClientContext clientContext = HttpClientContext.adapt(context);
requestProducer.sendRequest(new RequestChannel() {
@Override
public void sendRequest(
final HttpRequest request,
final EntityDetails entityDetails) throws HttpException, IOException {
RequestConfig requestConfig = null;
if (request instanceof Configurable) {
requestConfig = ((Configurable) request).getConfig();
}
if (requestConfig != null) {
clientContext.setRequestConfig(requestConfig);
}
final HttpRoute route = determineRoute(request, clientContext);
final String exchangeId = String.format("ex-%08X", ExecSupport.getNextExecNumber());
final AsyncExecRuntime execRuntime = crerateAsyncExecRuntime();
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": preparing request execution");
}
setupContext(clientContext);
final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime);
final AtomicReference<T> resultRef = new AtomicReference<>(null);
final AtomicBoolean outputTerminated = new AtomicBoolean(false);
execChain.execute(
RequestCopier.INSTANCE.copy(request),
entityDetails != null ? new AsyncEntityProducer() {
@Override
public void releaseResources() {
requestProducer.releaseResources();
}
@Override
public void failed(final Exception cause) {
requestProducer.failed(cause);
}
@Override
public boolean isRepeatable() {
//TODO: use AsyncRequestProducer#isRepeatable once available
return requestProducer instanceof SimpleRequestProducer;
}
@Override
public long getContentLength() {
return entityDetails.getContentLength();
}
@Override
public String getContentType() {
return entityDetails.getContentType();
}
@Override
public String getContentEncoding() {
return entityDetails.getContentEncoding();
}
@Override
public boolean isChunked() {
return entityDetails.isChunked();
}
@Override
public Set<String> getTrailerNames() {
return entityDetails.getTrailerNames();
}
@Override
public int available() {
return requestProducer.available();
}
@Override
public void produce(final DataStreamChannel channel) throws IOException {
if (outputTerminated.get()) {
channel.endStream();
return;
}
requestProducer.produce(channel);
}
} : null,
scope,
new AsyncExecCallback() {
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
outputTerminated.set(true);
requestProducer.releaseResources();
}
responseConsumer.consumeResponse(response, entityDetails,
//TODO: eliminate this callback after upgrade to HttpCore 5.0b2
new FutureCallback<T>() {
@Override
public void completed(final T result) {
resultRef.set(result);
}
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void cancelled() {
future.cancel();
}
});
return responseConsumer;
}
@Override
public void completed() {
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": message exchange successfully completed");
}
try {
execRuntime.releaseConnection();
future.completed(resultRef.getAndSet(null));
} finally {
responseConsumer.releaseResources();
requestProducer.releaseResources();
}
}
@Override
public void failed(final Exception cause) {
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": request failed: " + cause.getMessage());
}
try {
execRuntime.discardConnection();
responseConsumer.failed(cause);
} finally {
try {
future.failed(cause);
} finally {
responseConsumer.releaseResources();
requestProducer.releaseResources();
}
}
}
});
}
});
} catch (final HttpException | IOException ex) {
future.failed(ex);
}
return future;
}
}

View File

@ -0,0 +1,88 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.Closeable;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecRuntime;
import org.apache.hc.client5.http.auth.AuthSchemeProvider;
import org.apache.hc.client5.http.auth.CredentialsProvider;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.CookieSpecProvider;
import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
class InternalHttp2AsyncClient extends InternalAbstractHttpAsyncClient {
private final HttpRoutePlanner routePlanner;
private final H2ConnPool connPool;
InternalHttp2AsyncClient(
final DefaultConnectingIOReactor ioReactor,
final AsyncExecChainElement execChain,
final AsyncPushConsumerRegistry pushConsumerRegistry,
final ThreadFactory threadFactory,
final H2ConnPool connPool,
final HttpRoutePlanner routePlanner,
final Lookup<CookieSpecProvider> cookieSpecRegistry,
final Lookup<AuthSchemeProvider> authSchemeRegistry,
final CookieStore cookieStore,
final CredentialsProvider credentialsProvider,
final RequestConfig defaultConfig,
final List<Closeable> closeables) {
super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, defaultConfig, closeables);
this.connPool = connPool;
this.routePlanner = routePlanner;
}
@Override
AsyncExecRuntime crerateAsyncExecRuntime() {
return new InternalHttp2AsyncExecRuntime(log, connPool);
}
@Override
HttpRoute determineRoute(final HttpRequest request, final HttpClientContext clientContext) throws HttpException {
final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
final HttpRoute route = routePlanner.determineRoute(target, clientContext);
if (route.isTunnelled()) {
throw new HttpException("HTTP/2 tunneling not supported");
}
return route;
}
}

View File

@ -0,0 +1,245 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.async;
import java.io.InterruptedIOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecRuntime;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.command.ExecutionCommand;
import org.apache.hc.core5.http2.nio.pool.H2ConnPool;
import org.apache.hc.core5.io.ShutdownType;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.TimeValue;
import org.apache.logging.log4j.Logger;
class InternalHttp2AsyncExecRuntime implements AsyncExecRuntime {
private final Logger log;
private final H2ConnPool connPool;
private final AtomicReference<Endpoint> sessionRef;
private volatile boolean reusable;
InternalHttp2AsyncExecRuntime(final Logger log, final H2ConnPool connPool) {
super();
this.log = log;
this.connPool = connPool;
this.sessionRef = new AtomicReference<>(null);
}
@Override
public boolean isConnectionAcquired() {
return sessionRef.get() != null;
}
@Override
public void acquireConnection(
final HttpRoute route,
final Object object,
final HttpClientContext context,
final FutureCallback<AsyncExecRuntime> callback) {
if (sessionRef.get() == null) {
final HttpHost target = route.getTargetHost();
final RequestConfig requestConfig = context.getRequestConfig();
connPool.getSession(target, requestConfig.getConnectTimeout(), new FutureCallback<IOSession>() {
@Override
public void completed(final IOSession ioSession) {
sessionRef.set(new Endpoint(target, ioSession));
reusable = true;
callback.completed(InternalHttp2AsyncExecRuntime.this);
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
});
} else {
callback.completed(this);
}
}
@Override
public void releaseConnection() {
final Endpoint endpoint = sessionRef.getAndSet(null);
if (endpoint != null && !reusable) {
endpoint.session.shutdown(ShutdownType.GRACEFUL);
}
}
@Override
public void discardConnection() {
final Endpoint endpoint = sessionRef.getAndSet(null);
if (endpoint != null) {
endpoint.session.shutdown(ShutdownType.GRACEFUL);
}
}
@Override
public boolean validateConnection() {
if (reusable) {
final Endpoint endpoint = sessionRef.get();
return endpoint != null && !endpoint.session.isClosed();
} else {
final Endpoint endpoint = sessionRef.getAndSet(null);
if (endpoint != null) {
endpoint.session.shutdown(ShutdownType.GRACEFUL);
}
}
return false;
}
@Override
public boolean isConnected() {
final Endpoint endpoint = sessionRef.get();
return endpoint != null && !endpoint.session.isClosed();
}
Endpoint ensureValid() {
final Endpoint endpoint = sessionRef.get();
if (endpoint == null) {
throw new IllegalStateException("I/O session not acquired / already released");
}
return endpoint;
}
@Override
public void connect(
final HttpClientContext context,
final FutureCallback<AsyncExecRuntime> callback) {
final Endpoint endpoint = ensureValid();
if (!endpoint.session.isClosed()) {
callback.completed(this);
} else {
final HttpHost target = endpoint.target;
final RequestConfig requestConfig = context.getRequestConfig();
connPool.getSession(target, requestConfig.getConnectTimeout(), new FutureCallback<IOSession>() {
@Override
public void completed(final IOSession ioSession) {
sessionRef.set(new Endpoint(target, ioSession));
reusable = true;
callback.completed(InternalHttp2AsyncExecRuntime.this);
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
}
});
}
}
@Override
public void upgradeTls(final HttpClientContext context) {
throw new UnsupportedOperationException();
}
@Override
public void execute(final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
final Endpoint endpoint = ensureValid();
final IOSession session = endpoint.session;
if (!session.isClosed()) {
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
}
session.addLast(new ExecutionCommand(exchangeHandler, context));
} else {
final HttpHost target = endpoint.target;
final RequestConfig requestConfig = context.getRequestConfig();
connPool.getSession(target, requestConfig.getConnectTimeout(), new FutureCallback<IOSession>() {
@Override
public void completed(final IOSession ioSession) {
sessionRef.set(new Endpoint(target, ioSession));
reusable = true;
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": executing " + ConnPoolSupport.getId(exchangeHandler));
}
session.addLast(new ExecutionCommand(exchangeHandler, context));
}
@Override
public void failed(final Exception ex) {
exchangeHandler.failed(ex);
}
@Override
public void cancelled() {
exchangeHandler.failed(new InterruptedIOException());
}
});
}
}
@Override
public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
throw new UnsupportedOperationException();
}
@Override
public void markConnectionNonReusable() {
reusable = false;
}
static class Endpoint {
final HttpHost target;
final IOSession session;
Endpoint(final HttpHost target, final IOSession session) {
this.target = target;
this.session = session;
}
}
}

View File

@ -27,63 +27,33 @@
package org.apache.hc.client5.http.impl.async;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecCallback;
import org.apache.hc.client5.http.async.AsyncExecChain;
import org.apache.hc.client5.http.async.AsyncExecRuntime;
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
import org.apache.hc.client5.http.auth.AuthSchemeProvider;
import org.apache.hc.client5.http.auth.CredentialsProvider;
import org.apache.hc.client5.http.config.Configurable;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.cookie.CookieSpecProvider;
import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.impl.ExecSupport;
import org.apache.hc.client5.http.impl.RequestCopier;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.reactor.DefaultConnectingIOReactor;
class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClient {
private final AsyncClientConnectionManager connmgr;
private final AsyncExecChainElement execChain;
private final HttpRoutePlanner routePlanner;
private final HttpVersionPolicy versionPolicy;
private final Lookup<CookieSpecProvider> cookieSpecRegistry;
private final Lookup<AuthSchemeProvider> authSchemeRegistry;
private final CookieStore cookieStore;
private final CredentialsProvider credentialsProvider;
private final RequestConfig defaultConfig;
private final List<Closeable> closeables;
InternalHttpAsyncClient(
final DefaultConnectingIOReactor ioReactor,
@ -99,227 +69,27 @@ class InternalHttpAsyncClient extends AbstractHttpAsyncClientBase {
final CredentialsProvider credentialsProvider,
final RequestConfig defaultConfig,
final List<Closeable> closeables) {
super(ioReactor, pushConsumerRegistry, threadFactory);
super(ioReactor, pushConsumerRegistry, threadFactory, execChain,
cookieSpecRegistry, authSchemeRegistry, cookieStore, credentialsProvider, defaultConfig, closeables);
this.connmgr = connmgr;
this.execChain = execChain;
this.routePlanner = routePlanner;
this.versionPolicy = versionPolicy;
this.cookieSpecRegistry = cookieSpecRegistry;
this.authSchemeRegistry = authSchemeRegistry;
this.cookieStore = cookieStore;
this.credentialsProvider = credentialsProvider;
this.defaultConfig = defaultConfig;
this.closeables = closeables;
}
@Override
public void close() {
super.close();
if (closeables != null) {
for (final Closeable closeable: closeables) {
try {
closeable.close();
} catch (final IOException ex) {
log.error(ex.getMessage(), ex);
}
}
}
}
private void setupContext(final HttpClientContext context) {
if (context.getAttribute(HttpClientContext.AUTHSCHEME_REGISTRY) == null) {
context.setAttribute(HttpClientContext.AUTHSCHEME_REGISTRY, authSchemeRegistry);
}
if (context.getAttribute(HttpClientContext.COOKIESPEC_REGISTRY) == null) {
context.setAttribute(HttpClientContext.COOKIESPEC_REGISTRY, cookieSpecRegistry);
}
if (context.getAttribute(HttpClientContext.COOKIE_STORE) == null) {
context.setAttribute(HttpClientContext.COOKIE_STORE, cookieStore);
}
if (context.getAttribute(HttpClientContext.CREDS_PROVIDER) == null) {
context.setAttribute(HttpClientContext.CREDS_PROVIDER, credentialsProvider);
}
if (context.getAttribute(HttpClientContext.REQUEST_CONFIG) == null) {
context.setAttribute(HttpClientContext.REQUEST_CONFIG, defaultConfig);
}
AsyncExecRuntime crerateAsyncExecRuntime() {
return new InternalHttpAsyncExecRuntime(log, connmgr, getConnectionInitiator(), versionPolicy);
}
@Override
public <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context,
final FutureCallback<T> callback) {
ensureRunning();
final BasicFuture<T> future = new BasicFuture<>(callback);
try {
final HttpClientContext clientContext = HttpClientContext.adapt(context);
requestProducer.sendRequest(new RequestChannel() {
@Override
public void sendRequest(
final HttpRequest request,
final EntityDetails entityDetails) throws HttpException, IOException {
RequestConfig requestConfig = null;
if (request instanceof Configurable) {
requestConfig = ((Configurable) request).getConfig();
}
if (requestConfig != null) {
clientContext.setRequestConfig(requestConfig);
}
final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
final HttpRoute route = routePlanner.determineRoute(target, clientContext);
final String exchangeId = String.format("ex-%08X", ExecSupport.getNextExecNumber());
final AsyncExecRuntime execRuntime = new AsyncExecRuntimeImpl(log, connmgr, getConnectionInitiator(), versionPolicy);
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": preparing request execution");
}
final ProtocolVersion protocolVersion = clientContext.getProtocolVersion();
if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
throw new HttpException("HTTP/2 tunneling not supported");
}
setupContext(clientContext);
final AsyncExecChain.Scope scope = new AsyncExecChain.Scope(exchangeId, route, request, clientContext, execRuntime);
final AtomicReference<T> resultRef = new AtomicReference<>(null);
final AtomicBoolean outputTerminated = new AtomicBoolean(false);
execChain.execute(
RequestCopier.INSTANCE.copy(request),
entityDetails != null ? new AsyncEntityProducer() {
@Override
public void releaseResources() {
requestProducer.releaseResources();
}
@Override
public void failed(final Exception cause) {
requestProducer.failed(cause);
}
@Override
public boolean isRepeatable() {
//TODO: use AsyncRequestProducer#isRepeatable once available
return requestProducer instanceof SimpleRequestProducer;
}
@Override
public long getContentLength() {
return entityDetails.getContentLength();
}
@Override
public String getContentType() {
return entityDetails.getContentType();
}
@Override
public String getContentEncoding() {
return entityDetails.getContentEncoding();
}
@Override
public boolean isChunked() {
return entityDetails.isChunked();
}
@Override
public Set<String> getTrailerNames() {
return entityDetails.getTrailerNames();
}
@Override
public int available() {
return requestProducer.available();
}
@Override
public void produce(final DataStreamChannel channel) throws IOException {
if (outputTerminated.get()) {
channel.endStream();
return;
}
requestProducer.produce(channel);
}
} : null,
scope,
new AsyncExecCallback() {
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
outputTerminated.set(true);
requestProducer.releaseResources();
}
responseConsumer.consumeResponse(response, entityDetails,
//TODO: eliminate this callback after upgrade to HttpCore 5.0b2
new FutureCallback<T>() {
@Override
public void completed(final T result) {
resultRef.set(result);
}
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void cancelled() {
future.cancel();
}
});
return responseConsumer;
}
@Override
public void completed() {
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": message exchange successfully completed");
}
try {
execRuntime.releaseConnection();
future.completed(resultRef.getAndSet(null));
} finally {
responseConsumer.releaseResources();
requestProducer.releaseResources();
}
}
@Override
public void failed(final Exception cause) {
if (log.isDebugEnabled()) {
log.debug(exchangeId + ": request failed: " + cause.getMessage());
}
try {
execRuntime.discardConnection();
responseConsumer.failed(cause);
} finally {
try {
future.failed(cause);
} finally {
responseConsumer.releaseResources();
requestProducer.releaseResources();
}
}
}
});
}
});
} catch (final HttpException | IOException ex) {
future.failed(ex);
HttpRoute determineRoute(final HttpRequest request, final HttpClientContext clientContext) throws HttpException {
final HttpHost target = routePlanner.determineTargetHost(request, clientContext);
final HttpRoute route = routePlanner.determineRoute(target, clientContext);
final ProtocolVersion protocolVersion = clientContext.getProtocolVersion();
if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
throw new HttpException("HTTP/2 tunneling not supported");
}
return future;
return route;
}
}

View File

@ -45,7 +45,7 @@ import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.util.TimeValue;
import org.apache.logging.log4j.Logger;
class AsyncExecRuntimeImpl implements AsyncExecRuntime {
class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
private final Logger log;
private final AsyncClientConnectionManager manager;
@ -56,7 +56,7 @@ class AsyncExecRuntimeImpl implements AsyncExecRuntime {
private volatile Object state;
private volatile TimeValue validDuration;
AsyncExecRuntimeImpl(
InternalHttpAsyncExecRuntime(
final Logger log,
final AsyncClientConnectionManager manager,
final ConnectionInitiator connectionInitiator,
@ -90,7 +90,7 @@ class AsyncExecRuntimeImpl implements AsyncExecRuntime {
public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
endpointRef.set(connectionEndpoint);
reusable = connectionEndpoint.isConnected();
callback.completed(AsyncExecRuntimeImpl.this);
callback.completed(InternalHttpAsyncExecRuntime.this);
}
@Override
@ -197,7 +197,7 @@ class AsyncExecRuntimeImpl implements AsyncExecRuntime {
if (TimeValue.isPositive(socketTimeout)) {
endpoint.setSocketTimeout(socketTimeout.toMillisIntBound());
}
callback.completed(AsyncExecRuntimeImpl.this);
callback.completed(InternalHttpAsyncExecRuntime.this);
}
@Override