From f7ac968aaf499abe7858eb6cdc45ece0a26dda28 Mon Sep 17 00:00:00 2001 From: Oleg Kalnichevski Date: Thu, 30 Nov 2023 11:38:55 +0100 Subject: [PATCH] HTTPCLIENT-2310: Async Connect exec handler incorrectly pipes CONNECT requests through the main request protocol chain --- .../http/impl/async/AsyncConnectExec.java | 152 +++++++++++++----- 1 file changed, 116 insertions(+), 36 deletions(-) diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java index da534c91c..493ab6c9e 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/AsyncConnectExec.java @@ -29,6 +29,9 @@ package org.apache.hc.client5.http.impl.async; import java.io.IOException; import java.io.InterruptedIOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hc.client5.http.AuthenticationStrategy; import org.apache.hc.client5.http.HttpRoute; @@ -53,6 +56,7 @@ import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.CancellableDependency; import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; import org.apache.hc.core5.http.HttpException; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpRequest; @@ -62,8 +66,13 @@ import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.Method; import org.apache.hc.core5.http.message.BasicHttpRequest; import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncDataConsumer; import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.protocol.HttpContext; import org.apache.hc.core5.http.protocol.HttpCoreContext; import org.apache.hc.core5.http.protocol.HttpProcessor; import org.apache.hc.core5.util.Args; @@ -255,20 +264,20 @@ public final class AsyncConnectExec implements AsyncExecChainHandler { if (LOG.isDebugEnabled()) { LOG.debug("{} create tunnel", exchangeId); } - createTunnel(state, proxy, target, scope, chain, new AsyncExecCallback() { + createTunnel(state, proxy, target, scope, new AsyncExecCallback() { - @Override - public AsyncDataConsumer handleResponse( - final HttpResponse response, - final EntityDetails entityDetails) throws HttpException, IOException { - return asyncExecCallback.handleResponse(response, entityDetails); - } + @Override + public AsyncDataConsumer handleResponse( + final HttpResponse response, + final EntityDetails entityDetails) throws HttpException, IOException { + return asyncExecCallback.handleResponse(response, entityDetails); + } - @Override - public void handleInformationResponse( - final HttpResponse response) throws HttpException, IOException { - asyncExecCallback.handleInformationResponse(response); - } + @Override + public void handleInformationResponse( + final HttpResponse response) throws HttpException, IOException { + asyncExecCallback.handleInformationResponse(response); + } @Override public void completed() { @@ -302,6 +311,7 @@ public final class AsyncConnectExec implements AsyncExecChainHandler { @Override public void failed(final Exception cause) { + execRuntime.markConnectionNonReusable(); asyncExecCallback.failed(cause); } @@ -370,10 +380,12 @@ public final class AsyncConnectExec implements AsyncExecChainHandler { final HttpHost proxy, final HttpHost nextHop, final AsyncExecChain.Scope scope, - final AsyncExecChain chain, final AsyncExecCallback asyncExecCallback) throws HttpException, IOException { + final CancellableDependency operation = scope.cancellableDependency; final HttpClientContext clientContext = scope.clientContext; + final AsyncExecRuntime execRuntime = scope.execRuntime; + final String exchangeId = scope.exchangeId; final AuthExchange proxyAuthExchange = proxy != null ? clientContext.getAuthExchange(proxy) : new AuthExchange(); @@ -381,19 +393,62 @@ public final class AsyncConnectExec implements AsyncExecChainHandler { authCacheKeeper.loadPreemptively(proxy, null, proxyAuthExchange, clientContext); } - final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString()); - connect.setVersion(HttpVersion.HTTP_1_1); + final AsyncClientExchangeHandler internalExchangeHandler = new AsyncClientExchangeHandler() { - proxyHttpProcessor.process(connect, null, clientContext); - authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext); - - chain.proceed(connect, null, scope, new AsyncExecCallback() { + private final AtomicReference entityConsumerRef = new AtomicReference<>(); @Override - public AsyncDataConsumer handleResponse( - final HttpResponse response, - final EntityDetails entityDetails) throws HttpException, IOException { + public void releaseResources() { + final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null); + if (entityConsumer != null) { + entityConsumer.releaseResources(); + } + } + @Override + public void failed(final Exception cause) { + final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null); + if (entityConsumer != null) { + entityConsumer.releaseResources(); + } + asyncExecCallback.failed(cause); + } + + @Override + public void cancel() { + failed(new InterruptedIOException()); + } + + @Override + public void produceRequest(final RequestChannel requestChannel, + final HttpContext httpContext) throws HttpException, IOException { + final HttpRequest connect = new BasicHttpRequest(Method.CONNECT, nextHop, nextHop.toHostString()); + connect.setVersion(HttpVersion.HTTP_1_1); + + proxyHttpProcessor.process(connect, null, clientContext); + authenticator.addAuthResponse(proxy, ChallengeType.PROXY, connect, proxyAuthExchange, clientContext); + + requestChannel.sendRequest(connect, null, clientContext); + } + + @Override + public void produce(final DataStreamChannel dataStreamChannel) throws IOException { + } + + @Override + public int available() { + return 0; + } + + @Override + public void consumeInformation(final HttpResponse httpResponse, + final HttpContext httpContext) throws HttpException, IOException { + } + + @Override + public void consumeResponse(final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext httpContext) throws HttpException, IOException { clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response); proxyHttpProcessor.process(response, entityDetails, clientContext); @@ -404,31 +459,56 @@ public final class AsyncConnectExec implements AsyncExecChainHandler { if (needAuthentication(proxyAuthExchange, proxy, response, clientContext)) { state.challenged = true; - return null; + } else { + state.challenged = false; + if (status >= HttpStatus.SC_REDIRECTION) { + state.tunnelRefused = true; + entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails)); + } else if (status == HttpStatus.SC_OK) { + asyncExecCallback.completed(); + } else { + throw new HttpException("Unexpected response to CONNECT request: " + new StatusLine(response)); + } } - state.challenged = false; - if (status >= HttpStatus.SC_REDIRECTION) { - state.tunnelRefused = true; - return asyncExecCallback.handleResponse(response, entityDetails); - } - return null; } @Override - public void handleInformationResponse(final HttpResponse response) throws HttpException, IOException { + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + final AsyncDataConsumer entityConsumer = entityConsumerRef.get(); + if (entityConsumer != null) { + entityConsumer.updateCapacity(capacityChannel); + } else { + capacityChannel.update(Integer.MAX_VALUE); + } } @Override - public void completed() { + public void consume(final ByteBuffer src) throws IOException { + final AsyncDataConsumer entityConsumer = entityConsumerRef.get(); + if (entityConsumer != null) { + entityConsumer.consume(src); + } + } + + @Override + public void streamEnd(final List trailers) throws HttpException, IOException { + final AsyncDataConsumer entityConsumer = entityConsumerRef.getAndSet(null); + if (entityConsumer != null) { + entityConsumer.streamEnd(trailers); + } asyncExecCallback.completed(); } - @Override - public void failed(final Exception cause) { - asyncExecCallback.failed(cause); - } + }; - }); + if (LOG.isDebugEnabled()) { + operation.setDependency(execRuntime.execute( + exchangeId, + new LoggingAsyncClientExchangeHandler(LOG, exchangeId, internalExchangeHandler), + clientContext)); + } else { + operation.setDependency(execRuntime.execute(exchangeId, internalExchangeHandler, clientContext)); + } }