HTTPCLIENT-2120: support for H2 via HTTP/1.1 proxy

This commit is contained in:
Oleg Kalnichevski 2020-12-25 12:34:13 +01:00
parent bdc7f3b93e
commit 92f757eee3
11 changed files with 341 additions and 140 deletions

View File

@ -88,7 +88,14 @@ public class HttpAsyncClientCompatibilityTest {
new HttpHost("http", "localhost", 8080), null, null),
new HttpAsyncClientCompatibilityTest(
HttpVersionPolicy.FORCE_HTTP_2,
new HttpHost("https", "localhost", 8443), null, null)
new HttpHost("https", "localhost", 8443), null, null),
new HttpAsyncClientCompatibilityTest(
HttpVersionPolicy.NEGOTIATE,
new HttpHost("https", "test-httpd", 8443), new HttpHost("localhost", 8888), null),
new HttpAsyncClientCompatibilityTest(
HttpVersionPolicy.NEGOTIATE,
new HttpHost("https", "test-httpd", 8443), new HttpHost("localhost", 8889),
new UsernamePasswordCredentials("squid", "nopassword".toCharArray()))
};
for (final HttpAsyncClientCompatibilityTest test: tests) {
try {

View File

@ -113,6 +113,21 @@ public interface AsyncExecRuntime {
*/
void upgradeTls(HttpClientContext context);
/**
* Upgrades transport security of the active connection by using the TLS security protocol.
*
* @param context the execution context.
*
* @since 5.2
*/
default void upgradeTls(HttpClientContext context,
FutureCallback<AsyncExecRuntime> callback) {
upgradeTls(context);
if (callback != null) {
callback.completed(this);
}
}
/**
* Validates the connection making sure it can be used to execute requests.
*

View File

@ -187,137 +187,152 @@ public final class AsyncConnectExec implements AsyncExecChainHandler {
final CancellableDependency operation = scope.cancellableDependency;
final HttpClientContext clientContext = scope.clientContext;
int step;
do {
final HttpRoute fact = tracker.toRoute();
step = routeDirector.nextStep(route, fact);
switch (step) {
case HttpRouteDirector.CONNECT_TARGET:
operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
final HttpRoute fact = tracker.toRoute();
final int step = routeDirector.nextStep(route, fact);
switch (step) {
case HttpRouteDirector.CONNECT_TARGET:
operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime execRuntime) {
tracker.connectTarget(route.isSecure());
if (LOG.isDebugEnabled()) {
LOG.debug("{} connected to target", exchangeId);
}
proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
}
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
break;
case HttpRouteDirector.CONNECT_PROXY:
operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime execRuntime) {
final HttpHost proxy = route.getProxyHost();
tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
if (LOG.isDebugEnabled()) {
LOG.debug("{} connected to proxy", exchangeId);
}
proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
}
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
break;
case HttpRouteDirector.TUNNEL_TARGET:
try {
final HttpHost proxy = route.getProxyHost();
final HttpHost target = route.getTargetHost();
createTunnel(state, proxy ,target, scope, chain, new AsyncExecCallback() {
@Override
public void completed(final AsyncExecRuntime execRuntime) {
tracker.connectTarget(route.isSecure());
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
return asyncExecCallback.handleResponse(response, entityDetails);
}
@Override
public void handleInformationResponse(
final HttpResponse response) throws HttpException, IOException {
asyncExecCallback.handleInformationResponse(response);
}
@Override
public void completed() {
if (LOG.isDebugEnabled()) {
LOG.debug("{} connected to target", exchangeId);
LOG.debug("{} tunnel to target created", exchangeId);
}
tracker.tunnelTarget(false);
proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
}
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
});
} catch (final HttpException | IOException ex) {
asyncExecCallback.failed(ex);
}
break;
case HttpRouteDirector.TUNNEL_PROXY:
// The most simple example for this case is a proxy chain
// of two proxies, where P1 must be tunnelled to P2.
// route: Source -> P1 -> P2 -> Target (3 hops)
// fact: Source -> P1 -> Target (2 hops)
asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
break;
case HttpRouteDirector.LAYER_PROTOCOL:
execRuntime.upgradeTls(clientContext, new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime asyncExecRuntime) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} upgraded to TLS", exchangeId);
}
tracker.layerProtocol(route.isSecure());
proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
}
}));
return;
case HttpRouteDirector.CONNECT_PROXY:
operation.setDependency(execRuntime.connectEndpoint(clientContext, new FutureCallback<AsyncExecRuntime>() {
@Override
public void completed(final AsyncExecRuntime execRuntime) {
final HttpHost proxy = route.getProxyHost();
tracker.connectProxy(proxy, route.isSecure() && !route.isTunnelled());
if (LOG.isDebugEnabled()) {
LOG.debug("{} connected to proxy", exchangeId);
}
proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
}
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
}
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
}));
return;
case HttpRouteDirector.TUNNEL_TARGET:
try {
final HttpHost proxy = route.getProxyHost();
final HttpHost target = route.getTargetHost();
createTunnel(state, proxy ,target, scope, chain, new AsyncExecCallback() {
@Override
public AsyncDataConsumer handleResponse(
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
return asyncExecCallback.handleResponse(response, entityDetails);
}
@Override
public void handleInformationResponse(
final HttpResponse response) throws HttpException, IOException {
asyncExecCallback.handleInformationResponse(response);
}
@Override
public void completed() {
if (LOG.isDebugEnabled()) {
LOG.debug("{} tunnel to target created", exchangeId);
}
tracker.tunnelTarget(false);
proceedToNextHop(state, request, entityProducer, scope, chain, asyncExecCallback);
}
@Override
public void failed(final Exception cause) {
asyncExecCallback.failed(cause);
}
});
} catch (final HttpException | IOException ex) {
@Override
public void failed(final Exception ex) {
asyncExecCallback.failed(ex);
}
return;
case HttpRouteDirector.TUNNEL_PROXY:
// The most simple example for this case is a proxy chain
// of two proxies, where P1 must be tunnelled to P2.
// route: Source -> P1 -> P2 -> Target (3 hops)
// fact: Source -> P1 -> Target (2 hops)
asyncExecCallback.failed(new HttpException("Proxy chains are not supported"));
return;
case HttpRouteDirector.LAYER_PROTOCOL:
execRuntime.upgradeTls(clientContext);
if (LOG.isDebugEnabled()) {
LOG.debug("{} upgraded to TLS", exchangeId);
@Override
public void cancelled() {
asyncExecCallback.failed(new InterruptedIOException());
}
tracker.layerProtocol(route.isSecure());
break;
case HttpRouteDirector.UNREACHABLE:
asyncExecCallback.failed(new HttpException("Unable to establish route: " +
"planned = " + route + "; current = " + fact));
return;
});
break;
case HttpRouteDirector.COMPLETE:
if (LOG.isDebugEnabled()) {
LOG.debug("{} route fully established", exchangeId);
}
try {
chain.proceed(request, entityProducer, scope, asyncExecCallback);
} catch (final HttpException | IOException ex) {
asyncExecCallback.failed(ex);
}
break;
case HttpRouteDirector.UNREACHABLE:
asyncExecCallback.failed(new HttpException("Unable to establish route: " +
"planned = " + route + "; current = " + fact));
break;
default:
throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
}
} while (step > HttpRouteDirector.COMPLETE);
case HttpRouteDirector.COMPLETE:
if (LOG.isDebugEnabled()) {
LOG.debug("{} route fully established", exchangeId);
}
try {
chain.proceed(request, entityProducer, scope, asyncExecCallback);
} catch (final HttpException | IOException ex) {
asyncExecCallback.failed(ex);
}
break;
default:
throw new IllegalStateException("Unknown step indicator " + step + " from RouteDirector.");
}
}
private void createTunnel(

View File

@ -219,6 +219,11 @@ class InternalH2AsyncExecRuntime implements AsyncExecRuntime {
throw new UnsupportedOperationException();
}
@Override
public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
throw new UnsupportedOperationException();
}
@Override
public Cancellable execute(
final String id,

View File

@ -38,6 +38,7 @@ import org.apache.hc.client5.http.impl.Operations;
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.Cancellable;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
@ -214,24 +215,16 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
connectTimeout,
versionPolicy,
context,
new FutureCallback<AsyncConnectionEndpoint>() {
new CallbackContribution<AsyncConnectionEndpoint>(callback) {
@Override
public void completed(final AsyncConnectionEndpoint endpoint) {
if (log.isDebugEnabled()) {
log.debug("{} endpoint connected", ConnPoolSupport.getId(endpoint));
}
callback.completed(InternalHttpAsyncExecRuntime.this);
}
@Override
public void failed(final Exception ex) {
callback.failed(ex);
}
@Override
public void cancelled() {
callback.cancelled();
if (callback != null) {
callback.completed(InternalHttpAsyncExecRuntime.this);
}
}
}));
@ -240,11 +233,25 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
@Override
public void upgradeTls(final HttpClientContext context) {
upgradeTls(context, null);
}
@Override
public void upgradeTls(final HttpClientContext context, final FutureCallback<AsyncExecRuntime> callback) {
final AsyncConnectionEndpoint endpoint = ensureValid();
if (log.isDebugEnabled()) {
log.debug("{} upgrading endpoint", ConnPoolSupport.getId(endpoint));
}
manager.upgrade(endpoint, versionPolicy, context);
manager.upgrade(endpoint, versionPolicy, context, new CallbackContribution<AsyncConnectionEndpoint>(callback) {
@Override
public void completed(final AsyncConnectionEndpoint endpoint) {
if (callback != null) {
callback.completed(InternalHttpAsyncExecRuntime.this);
}
}
});
}
@Override

View File

@ -38,13 +38,17 @@ import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.concurrent.FutureContribution;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.Lookup;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Timeout;
@ -89,19 +93,27 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
@Override
public void completed(final IOSession session) {
final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session);
if (tlsStrategy != null) {
if (tlsStrategy != null && URIScheme.HTTPS.same(host.getSchemeName())) {
try {
tlsStrategy.upgrade(
connection,
host,
attachment,
connectTimeout, null);
null,
new FutureContribution<TransportSecurityLayer>(future) {
@Override
public void completed(final TransportSecurityLayer transportSecurityLayer) {
future.completed(connection);
}
});
} catch (final Exception ex) {
future.failed(ex);
return;
}
} else {
future.completed(connection);
}
future.completed(connection);
}
@Override
@ -120,7 +132,19 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
}
@Override
public void upgrade(final ManagedAsyncClientConnection connection, final HttpHost host, final Object attachment) {
public void upgrade(
final ManagedAsyncClientConnection connection,
final HttpHost host,
final Object attachment) {
upgrade(connection, host, attachment, null);
}
@Override
public void upgrade(
final ManagedAsyncClientConnection connection,
final HttpHost host,
final Object attachment,
final FutureCallback<ManagedAsyncClientConnection> callback) {
final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getSchemeName()) : null;
if (tlsStrategy != null) {
tlsStrategy.upgrade(
@ -128,8 +152,18 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
host,
attachment,
null,
null);
new CallbackContribution<TransportSecurityLayer>(callback) {
@Override
public void completed(final TransportSecurityLayer transportSecurityLayer) {
if (callback != null) {
callback.completed(connection);
}
}
});
}
}
}

View File

@ -35,6 +35,7 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.EndpointDetails;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpVersion;
@ -45,6 +46,7 @@ import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer;
import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier;
@ -144,18 +146,30 @@ final class DefaultManagedAsyncClientConnection implements ManagedAsyncClientCon
final SSLBufferMode sslBufferMode,
final SSLSessionInitializer initializer,
final SSLSessionVerifier verifier,
final Timeout handshakeTimeout) throws UnsupportedOperationException {
final Timeout handshakeTimeout,
final FutureCallback<TransportSecurityLayer> callback) throws UnsupportedOperationException {
if (LOG.isDebugEnabled()) {
LOG.debug("{} start TLS", getId());
}
if (ioSession instanceof TransportSecurityLayer) {
((TransportSecurityLayer) ioSession).startTls(sslContext, endpoint, sslBufferMode, initializer, verifier,
handshakeTimeout);
handshakeTimeout, callback);
} else {
throw new UnsupportedOperationException("TLS upgrade not supported");
}
}
@Override
public void startTls(
final SSLContext sslContext,
final NamedEndpoint endpoint,
final SSLBufferMode sslBufferMode,
final SSLSessionInitializer initializer,
final SSLSessionVerifier verifier,
final Timeout handshakeTimeout) throws UnsupportedOperationException {
startTls(sslContext, endpoint, sslBufferMode, initializer, verifier, handshakeTimeout, null);
}
@Override
public TlsDetails getTlsDetails() {
return ioSession instanceof TransportSecurityLayer ? ((TransportSecurityLayer) ioSession).getTlsDetails() : null;
@ -185,4 +199,14 @@ final class DefaultManagedAsyncClientConnection implements ManagedAsyncClientCon
ioSession.setSocketTimeout(socketTimeout);
}
@Override
public void switchProtocol(final String protocolId,
final FutureCallback<ProtocolIOSession> callback) throws UnsupportedOperationException {
if (ioSession instanceof ProtocolIOSession) {
((ProtocolIOSession) ioSession).switchProtocol(protocolId, callback);
} else {
throw new UnsupportedOperationException("Protocol switch not supported");
}
}
}

View File

@ -52,6 +52,7 @@ import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.concurrent.BasicFuture;
import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.function.Resolver;
@ -68,6 +69,7 @@ import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http2.nio.command.PingCommand;
import org.apache.hc.core5.http2.nio.support.BasicPingHandler;
import org.apache.hc.core5.http2.ssl.ApplicationProtocol;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.pool.LaxConnPool;
@ -79,6 +81,8 @@ import org.apache.hc.core5.pool.PoolStats;
import org.apache.hc.core5.pool.StrictConnPool;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Asserts;
import org.apache.hc.core5.util.Identifiable;
@ -439,16 +443,48 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
public void upgrade(
final AsyncConnectionEndpoint endpoint,
final Object attachment,
final HttpContext context) {
final HttpContext context,
final FutureCallback<AsyncConnectionEndpoint> callback) {
Args.notNull(endpoint, "Managed endpoint");
final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = internalEndpoint.getValidatedPoolEntry();
final HttpRoute route = poolEntry.getRoute();
final ManagedAsyncClientConnection connection = poolEntry.getConnection();
connectionOperator.upgrade(poolEntry.getConnection(), route.getTargetHost(), attachment);
if (LOG.isDebugEnabled()) {
LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
}
connectionOperator.upgrade(
poolEntry.getConnection(),
route.getTargetHost(),
attachment,
new CallbackContribution<ManagedAsyncClientConnection>(callback) {
@Override
public void completed(final ManagedAsyncClientConnection connection) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} upgraded {}", ConnPoolSupport.getId(internalEndpoint), ConnPoolSupport.getId(connection));
}
final TlsDetails tlsDetails = connection.getTlsDetails();
if (tlsDetails != null && ApplicationProtocol.HTTP_2.id.equals(tlsDetails.getApplicationProtocol())) {
connection.switchProtocol(ApplicationProtocol.HTTP_2.id, new CallbackContribution<ProtocolIOSession>(callback) {
@Override
public void completed(final ProtocolIOSession protocolIOSession) {
if (callback != null) {
callback.completed(endpoint);
}
}
});
} else {
if (callback != null) {
callback.completed(endpoint);
}
}
}
});
}
@Override
public void upgrade(final AsyncConnectionEndpoint endpoint, final Object attachment, final HttpContext context) {
upgrade(endpoint, attachment, context, null);
}
@Override

View File

@ -131,4 +131,25 @@ public interface AsyncClientConnectionManager extends ModalCloseable {
Object attachment,
HttpContext context);
/**
* Upgrades transport security of the given endpoint by using the TLS security protocol.
*
* @param endpoint the managed endpoint.
* @param attachment the attachment the upgrade attachment object.
* @param context the actual HTTP context.
* @param callback result callback.
*
* @since 5.2
*/
default void upgrade(
AsyncConnectionEndpoint endpoint,
Object attachment,
HttpContext context,
FutureCallback<AsyncConnectionEndpoint> callback) {
upgrade(endpoint, attachment, context);
if (callback != null) {
callback.completed(endpoint);
}
}
}

View File

@ -79,4 +79,26 @@ public interface AsyncClientConnectionOperator {
*/
void upgrade(ManagedAsyncClientConnection conn, HttpHost host, Object attachment);
/**
* Upgrades transport security of the given managed connection
* by using the TLS security protocol.
*
* @param conn the managed connection.
* @param host the address of the opposite endpoint with TLS security.
* @param attachment the attachment, which can be any object representing custom parameter
* of the operation.
*
* @since 5.2
*/
default void upgrade(
ManagedAsyncClientConnection conn,
HttpHost host,
Object attachment,
FutureCallback<ManagedAsyncClientConnection> callback) {
upgrade(conn, host, attachment);
if (callback != null) {
callback.completed(conn);
}
}
}

View File

@ -28,8 +28,10 @@
package org.apache.hc.client5.http.nio;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
/**
@ -59,4 +61,17 @@ public interface ManagedAsyncClientConnection extends HttpConnection, TransportS
*/
void activate();
/**
* Switches this I/O session to the application protocol with the given ID.
* @param protocolId the application protocol ID
* @param callback the result callback
* @throws UnsupportedOperationException if application protocol switch
* is not supported.
*
* @since 5.2
*/
default void switchProtocol(String protocolId, FutureCallback<ProtocolIOSession> callback) throws UnsupportedOperationException {
throw new UnsupportedOperationException("Protocol switch not supported");
}
}