CloseableHttpAsyncClient to support explicit HttpHost execution parameter

This commit is contained in:
Oleg Kalnichevski 2019-12-04 17:29:21 +01:00
parent d0541123c2
commit f3c418c50a
7 changed files with 76 additions and 30 deletions

View File

@ -33,6 +33,7 @@ import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.Cancellable; import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncRequestProducer;
@ -52,7 +53,8 @@ abstract class AbstractMinimalHttpAsyncClientBase extends AbstractHttpAsyncClien
} }
@Override @Override
public final <T> Future<T> execute( protected <T> Future<T> doExecute(
final HttpHost httphost,
final AsyncRequestProducer requestProducer, final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer, final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory, final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,

View File

@ -39,9 +39,11 @@ import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture; import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Supplier; import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.AsyncRequestProducer; import org.apache.hc.core5.http.nio.AsyncRequestProducer;
import org.apache.hc.core5.http.nio.AsyncResponseConsumer; import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory;
import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.io.CloseMode; import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.ModalCloseable; import org.apache.hc.core5.io.ModalCloseable;
@ -73,11 +75,45 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Modal
close(closeMode); close(closeMode);
} }
protected abstract <T> Future<T> doExecute(
final HttpHost target,
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback);
public final <T> Future<T> execute(
final HttpHost target,
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
Args.notNull(requestProducer, "Request producer");
Args.notNull(responseConsumer, "Response consumer");
return doExecute(target, requestProducer, responseConsumer, pushHandlerFactory, context, callback);
}
@Override
public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
final HttpContext context,
final FutureCallback<T> callback) {
Args.notNull(requestProducer, "Request producer");
Args.notNull(responseConsumer, "Response consumer");
return doExecute(null, requestProducer, responseConsumer, pushHandlerFactory, context, callback);
}
public final <T> Future<T> execute( public final <T> Future<T> execute(
final AsyncRequestProducer requestProducer, final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer, final AsyncResponseConsumer<T> responseConsumer,
final HttpContext context, final HttpContext context,
final FutureCallback<T> callback) { final FutureCallback<T> callback) {
Args.notNull(requestProducer, "Request producer");
Args.notNull(responseConsumer, "Response consumer");
return execute(requestProducer, responseConsumer, null, context, callback); return execute(requestProducer, responseConsumer, null, context, callback);
} }
@ -85,6 +121,8 @@ public abstract class CloseableHttpAsyncClient implements HttpAsyncClient, Modal
final AsyncRequestProducer requestProducer, final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer, final AsyncResponseConsumer<T> responseConsumer,
final FutureCallback<T> callback) { final FutureCallback<T> callback) {
Args.notNull(requestProducer, "Request producer");
Args.notNull(responseConsumer, "Response consumer");
return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback); return execute(requestProducer, responseConsumer, HttpClientContext.create(), callback);
} }

View File

@ -48,10 +48,12 @@ import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.impl.ExecSupport; import org.apache.hc.client5.http.impl.ExecSupport;
import org.apache.hc.client5.http.impl.RequestCopier; import org.apache.hc.client5.http.impl.RequestCopier;
import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EntityDetails; import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpResponse; import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.HttpStatus; import org.apache.hc.core5.http.HttpStatus;
@ -138,10 +140,11 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory); abstract AsyncExecRuntime createAsyncExecRuntime(HandlerFactory<AsyncPushConsumer> pushHandlerFactory);
abstract HttpRoute determineRoute(HttpRequest request, HttpClientContext clientContext) throws HttpException; abstract HttpRoute determineRoute(HttpHost httpHost, HttpClientContext clientContext) throws HttpException;
@Override @Override
public <T> Future<T> execute( protected <T> Future<T> doExecute(
final HttpHost httpHost,
final AsyncRequestProducer requestProducer, final AsyncRequestProducer requestProducer,
final AsyncResponseConsumer<T> responseConsumer, final AsyncResponseConsumer<T> responseConsumer,
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory, final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
@ -150,7 +153,7 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
ensureRunning(); ensureRunning();
final ComplexFuture<T> future = new ComplexFuture<>(callback); final ComplexFuture<T> future = new ComplexFuture<>(callback);
try { try {
final HttpClientContext clientContext = HttpClientContext.adapt(context); final HttpClientContext clientContext = context != null ? HttpClientContext.adapt(context) : HttpClientContext.create();
requestProducer.sendRequest(new RequestChannel() { requestProducer.sendRequest(new RequestChannel() {
@Override @Override
@ -166,7 +169,9 @@ abstract class InternalAbstractHttpAsyncClient extends AbstractHttpAsyncClientBa
if (requestConfig != null) { if (requestConfig != null) {
clientContext.setRequestConfig(requestConfig); clientContext.setRequestConfig(requestConfig);
} }
final HttpRoute route = determineRoute(request, clientContext); final HttpRoute route = determineRoute(
httpHost != null ? httpHost : RoutingSupport.determineHost(request),
clientContext);
final String exchangeId = ExecSupport.getNextExchangeId(); final String exchangeId = ExecSupport.getNextExchangeId();
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug(exchangeId + ": preparing request execution"); log.debug(exchangeId + ": preparing request execution");

View File

@ -39,12 +39,11 @@ import org.apache.hc.client5.http.cookie.CookieSpecProvider;
import org.apache.hc.client5.http.cookie.CookieStore; import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.routing.HttpRoutePlanner; import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncPushConsumer;
import org.apache.hc.core5.http.nio.HandlerFactory; import org.apache.hc.core5.http.nio.HandlerFactory;
@ -93,8 +92,8 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
} }
@Override @Override
HttpRoute determineRoute(final HttpRequest request, final HttpClientContext clientContext) throws HttpException { HttpRoute determineRoute(final HttpHost httpHost, final HttpClientContext clientContext) throws HttpException {
final HttpRoute route = routePlanner.determineRoute(RoutingSupport.determineHost(request), clientContext); final HttpRoute route = routePlanner.determineRoute(httpHost, clientContext);
if (route.isTunnelled()) { if (route.isTunnelled()) {
throw new HttpException("HTTP/2 tunneling not supported"); throw new HttpException("HTTP/2 tunneling not supported");
} }

View File

@ -40,12 +40,11 @@ import org.apache.hc.client5.http.cookie.CookieStore;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager; import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.routing.HttpRoutePlanner; import org.apache.hc.client5.http.routing.HttpRoutePlanner;
import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal; import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion; import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.config.Lookup;
@ -100,8 +99,8 @@ public final class InternalHttpAsyncClient extends InternalAbstractHttpAsyncClie
} }
@Override @Override
HttpRoute determineRoute(final HttpRequest request, final HttpClientContext clientContext) throws HttpException { HttpRoute determineRoute(final HttpHost httpHost, final HttpClientContext clientContext) throws HttpException {
final HttpRoute route = routePlanner.determineRoute(RoutingSupport.determineHost(request), clientContext); final HttpRoute route = routePlanner.determineRoute(httpHost, clientContext);
final ProtocolVersion protocolVersion = clientContext.getProtocolVersion(); final ProtocolVersion protocolVersion = clientContext.getProtocolVersion();
if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) { if (route.isTunnelled() && protocolVersion.greaterEquals(HttpVersion.HTTP_2_0)) {
throw new HttpException("HTTP/2 tunneling not supported"); throw new HttpException("HTTP/2 tunneling not supported");

View File

@ -28,12 +28,10 @@
package org.apache.hc.client5.http.impl.classic; package org.apache.hc.client5.http.impl.classic;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hc.client5.http.ClientProtocolException; import org.apache.hc.client5.http.ClientProtocolException;
import org.apache.hc.client5.http.classic.HttpClient; import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.client5.http.utils.URIUtils; import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.ClassicHttpRequest; import org.apache.hc.core5.http.ClassicHttpRequest;
@ -78,22 +76,11 @@ public abstract class CloseableHttpClient implements HttpClient, ModalCloseable
} }
private static HttpHost determineTarget(final ClassicHttpRequest request) throws ClientProtocolException { private static HttpHost determineTarget(final ClassicHttpRequest request) throws ClientProtocolException {
// A null target may be acceptable if there is a default target.
// Otherwise, the null target is detected in the director.
HttpHost target = null;
URI requestURI = null;
try { try {
requestURI = request.getUri(); return RoutingSupport.determineHost(request);
} catch (final URISyntaxException ignore) { } catch (final HttpException ex) {
throw new ClientProtocolException(ex);
} }
if (requestURI != null && requestURI.isAbsolute()) {
target = URIUtils.extractHost(requestURI);
if (target == null) {
throw new ClientProtocolException("URI does not specify a valid host name: "
+ requestURI);
}
}
return target;
} }
@Override @Override

View File

@ -26,8 +26,12 @@
*/ */
package org.apache.hc.client5.http.routing; package org.apache.hc.client5.http.routing;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hc.client5.http.SchemePortResolver; import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver; import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.utils.URIUtils;
import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest; import org.apache.hc.core5.http.HttpRequest;
@ -47,6 +51,18 @@ public final class RoutingSupport {
throw new ProtocolException("Protocol scheme is not specified"); throw new ProtocolException("Protocol scheme is not specified");
} }
return new HttpHost(scheme, authority); return new HttpHost(scheme, authority);
} else {
try {
final URI requestURI = request.getUri();
if (requestURI.isAbsolute()) {
final HttpHost httpHost = URIUtils.extractHost(requestURI);
if (httpHost == null) {
throw new ProtocolException("URI does not specify a valid host name: " + requestURI);
}
return httpHost;
}
} catch (final URISyntaxException ignore) {
}
} }
return null; return null;
} }