Minimal HttpAsyncClient to resolve default protocol scheme port when leasing endpoints

This commit is contained in:
Oleg Kalnichevski 2018-03-13 16:43:44 +01:00
parent 56cc24525e
commit 70ee2d4912
2 changed files with 18 additions and 6 deletions

View File

@ -28,7 +28,9 @@
package org.apache.hc.client5.http.impl.async;
import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.SystemDefaultDnsResolver;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.ssl.H2TlsStrategy;
@ -127,7 +129,8 @@ public class HttpAsyncClients {
final AsyncPushConsumerRegistry pushConsumerRegistry,
final HttpVersionPolicy versionPolicy,
final IOReactorConfig ioReactorConfig,
final AsyncClientConnectionManager connmgr) {
final AsyncClientConnectionManager connmgr,
final SchemePortResolver schemePortResolver) {
return new MinimalHttpAsyncClient(
eventHandlerFactory,
pushConsumerRegistry,
@ -135,7 +138,8 @@ public class HttpAsyncClients {
ioReactorConfig,
new DefaultThreadFactory("httpclient-main", true),
new DefaultThreadFactory("httpclient-dispatch", true),
connmgr);
connmgr,
schemePortResolver);
}
/**
@ -169,7 +173,8 @@ public class HttpAsyncClients {
pushConsumerRegistry,
versionPolicy,
ioReactorConfig,
connmgr);
connmgr,
DefaultSchemePortResolver.INSTANCE);
}
/**

View File

@ -35,14 +35,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.config.Configurable;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.ConnPoolSupport;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.impl.ExecSupport;
import org.apache.hc.client5.http.impl.classic.RequestFailedException;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
@ -76,6 +79,7 @@ import org.apache.hc.core5.util.Timeout;
public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClientBase {
private final AsyncClientConnectionManager connmgr;
private final SchemePortResolver schemePortResolver;
private final HttpVersionPolicy versionPolicy;
MinimalHttpAsyncClient(
@ -85,7 +89,8 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
final IOReactorConfig reactorConfig,
final ThreadFactory threadFactory,
final ThreadFactory workerThreadFactory,
final AsyncClientConnectionManager connmgr) {
final AsyncClientConnectionManager connmgr,
final SchemePortResolver schemePortResolver) {
super(new DefaultConnectingIOReactor(
eventHandlerFactory,
reactorConfig,
@ -102,8 +107,9 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
}),
pushConsumerRegistry,
threadFactory);
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
this.connmgr = connmgr;
this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
this.versionPolicy = versionPolicy != null ? versionPolicy : HttpVersionPolicy.NEGOTIATE;
}
private Future<AsyncConnectionEndpoint> leaseEndpoint(
@ -111,9 +117,10 @@ public final class MinimalHttpAsyncClient extends AbstractMinimalHttpAsyncClient
final Timeout connectTimeout,
final HttpClientContext clientContext,
final FutureCallback<AsyncConnectionEndpoint> callback) {
final HttpRoute route = new HttpRoute(RoutingSupport.normalize(host, schemePortResolver));
final ComplexFuture<AsyncConnectionEndpoint> resultFuture = new ComplexFuture<>(callback);
final Future<AsyncConnectionEndpoint> leaseFuture = connmgr.lease(
new HttpRoute(host), null, connectTimeout,
route, null, connectTimeout,
new FutureCallback<AsyncConnectionEndpoint>() {
@Override