HTTPCLIENT-2200: Protocol interceptors are executed before the connection route has been fully established

This commit is contained in:
Oleg Kalnichevski 2022-01-27 11:59:10 +01:00
parent a3bbcc82ae
commit f00ce5da9e
13 changed files with 337 additions and 144 deletions

View File

@ -0,0 +1,161 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.testing.async;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.client5.testing.SSLTestContexts;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpVersion;
import org.apache.hc.core5.http.ProtocolVersion;
import org.apache.hc.core5.http.URIScheme;
import org.apache.hc.core5.http.config.Http1Config;
import org.apache.hc.core5.http2.HttpVersionPolicy;
import org.apache.hc.core5.http2.config.H2Config;
import org.hamcrest.CoreMatchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.migrationsupport.rules.EnableRuleMigrationSupport;
import org.junit.rules.ExternalResource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@EnableRuleMigrationSupport
@RunWith(Parameterized.class)
public class TestAsyncRequestContext extends AbstractIntegrationTestBase<CloseableHttpAsyncClient> {
@Parameterized.Parameters(name = "{0} {1}")
public static Collection<Object[]> protocols() {
return Arrays.asList(new Object[][]{
{ HttpVersion.HTTP_1_1, URIScheme.HTTP },
{ HttpVersion.HTTP_1_1, URIScheme.HTTPS },
{ HttpVersion.HTTP_2, URIScheme.HTTP },
{ HttpVersion.HTTP_2, URIScheme.HTTPS }
});
}
protected final HttpVersion version;
public TestAsyncRequestContext(final HttpVersion version, final URIScheme scheme) {
super(scheme);
this.version = version;
}
HttpAsyncClientBuilder clientBuilder;
PoolingAsyncClientConnectionManager connManager;
@Rule
public ExternalResource connManagerResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
connManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setTlsStrategy(new DefaultClientTlsStrategy(SSLTestContexts.createClientSSLContext()))
.setDefaultConnectionConfig(ConnectionConfig.custom()
.setConnectTimeout(TIMEOUT)
.setSocketTimeout(TIMEOUT)
.build())
.build();
}
@Override
protected void after() {
if (connManager != null) {
connManager.close();
connManager = null;
}
}
};
@Rule
public ExternalResource clientBuilderResource = new ExternalResource() {
@Override
protected void before() throws Throwable {
connManager.setDefaultTlsConfig(TlsConfig.custom()
.setVersionPolicy(version.greaterEquals(HttpVersion.HTTP_2) ? HttpVersionPolicy.FORCE_HTTP_2 : HttpVersionPolicy.FORCE_HTTP_1)
.build());
clientBuilder = HttpAsyncClientBuilder.create()
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectionRequestTimeout(TIMEOUT)
.build())
.setConnectionManager(connManager);
}
};
@Override
public final HttpHost start() throws Exception {
if (version.greaterEquals(HttpVersion.HTTP_2)) {
return super.start(null, H2Config.DEFAULT);
} else {
return super.start(null, Http1Config.DEFAULT);
}
}
@Override
protected CloseableHttpAsyncClient createClient() throws Exception {
return clientBuilder.build();
}
@Test
public void testRequestContext() throws Exception {
final AtomicReference<ProtocolVersion> versionRef = new AtomicReference<>();
clientBuilder.addRequestInterceptorFirst((request, entity, context) ->
versionRef.set(context.getProtocolVersion()));
final HttpHost target = start();
final Future<SimpleHttpResponse> future = httpclient.execute(
SimpleRequestBuilder.get()
.setHttpHost(target)
.setPath("/random/2048")
.build(), null);
final SimpleHttpResponse response = future.get();
assertThat(response, CoreMatchers.notNullValue());
assertThat(response.getCode(), CoreMatchers.equalTo(200));
final String body = response.getBodyText();
assertThat(body, CoreMatchers.notNullValue());
assertThat(body.length(), CoreMatchers.equalTo(2048));
assertThat(versionRef.get(), CoreMatchers.equalTo(version));
}
}

View File

@ -59,8 +59,6 @@ import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.nio.AsyncDataConsumer;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.http.support.BasicRequestBuilder;
import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.util.Args;
@ -84,7 +82,6 @@ public final class AsyncProtocolExec implements AsyncExecChainHandler {
private static final Logger LOG = LoggerFactory.getLogger(AsyncProtocolExec.class);
private final HttpProcessor httpProcessor;
private final AuthenticationStrategy targetAuthStrategy;
private final AuthenticationStrategy proxyAuthStrategy;
private final HttpAuthenticator authenticator;
@ -92,12 +89,10 @@ public final class AsyncProtocolExec implements AsyncExecChainHandler {
private final AuthCacheKeeper authCacheKeeper;
AsyncProtocolExec(
final HttpProcessor httpProcessor,
final AuthenticationStrategy targetAuthStrategy,
final AuthenticationStrategy proxyAuthStrategy,
final SchemePortResolver schemePortResolver,
final boolean authCachingDisabled) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
this.targetAuthStrategy = Args.notNull(targetAuthStrategy, "Target authentication strategy");
this.proxyAuthStrategy = Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
this.authenticator = new HttpAuthenticator();
@ -196,10 +191,6 @@ public final class AsyncProtocolExec implements AsyncExecChainHandler {
final HttpHost proxy = route.getProxyHost();
clientContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
httpProcessor.process(request, entityProducer, clientContext);
if (!request.containsHeader(HttpHeaders.AUTHORIZATION)) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} target auth state: {}", exchangeId, targetAuthExchange.getState());
@ -220,9 +211,6 @@ public final class AsyncProtocolExec implements AsyncExecChainHandler {
final HttpResponse response,
final EntityDetails entityDetails) throws HttpException, IOException {
clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
httpProcessor.process(response, entityDetails, clientContext);
if (Method.TRACE.isSame(request.getMethod())) {
// Do not perform authentication for TRACE request
return asyncExecCallback.handleResponse(response, entityDetails);

View File

@ -650,11 +650,6 @@ public class H2AsyncClientBuilder {
}
public CloseableHttpAsyncClient build() {
final NamedElementChain<AsyncExecChainHandler> execChainDefinition = new NamedElementChain<>();
execChainDefinition.addLast(
new H2AsyncMainClientExec(),
ChainElement.MAIN_TRANSPORT.name());
AuthenticationStrategy targetAuthStrategyCopy = this.targetAuthStrategy;
if (targetAuthStrategyCopy == null) {
targetAuthStrategyCopy = DefaultAuthenticationStrategy.INSTANCE;
@ -675,14 +670,6 @@ public class H2AsyncClientBuilder {
}
}
execChainDefinition.addFirst(
new AsyncConnectExec(
new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
proxyAuthStrategyCopy,
schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE,
authCachingDisabled),
ChainElement.CONNECT.name());
final HttpProcessorBuilder b = HttpProcessorBuilder.create();
if (requestInterceptors != null) {
for (final RequestInterceptorEntry entry: requestInterceptors) {
@ -724,9 +711,22 @@ public class H2AsyncClientBuilder {
}
final HttpProcessor httpProcessor = b.build();
final NamedElementChain<AsyncExecChainHandler> execChainDefinition = new NamedElementChain<>();
execChainDefinition.addLast(
new H2AsyncMainClientExec(httpProcessor),
ChainElement.MAIN_TRANSPORT.name());
execChainDefinition.addFirst(
new AsyncConnectExec(
new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
proxyAuthStrategyCopy,
schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE,
authCachingDisabled),
ChainElement.CONNECT.name());
execChainDefinition.addFirst(
new AsyncProtocolExec(
httpProcessor,
targetAuthStrategyCopy,
proxyAuthStrategyCopy,
schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE,

View File

@ -32,6 +32,7 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.async.AsyncExecCallback;
import org.apache.hc.client5.http.async.AsyncExecChain;
import org.apache.hc.client5.http.async.AsyncExecChainHandler;
@ -54,6 +55,9 @@ import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.util.Args;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,6 +74,12 @@ public class H2AsyncMainClientExec implements AsyncExecChainHandler {
private static final Logger LOG = LoggerFactory.getLogger(H2AsyncMainClientExec.class);
private final HttpProcessor httpProcessor;
H2AsyncMainClientExec(final HttpProcessor httpProcessor) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
}
@Override
public void execute(
final HttpRequest request,
@ -78,6 +88,7 @@ public class H2AsyncMainClientExec implements AsyncExecChainHandler {
final AsyncExecChain chain,
final AsyncExecCallback asyncExecCallback) throws HttpException, IOException {
final String exchangeId = scope.exchangeId;
final HttpRoute route = scope.route;
final CancellableDependency operation = scope.cancellableDependency;
final HttpClientContext clientContext = scope.clientContext;
final AsyncExecRuntime execRuntime = scope.execRuntime;
@ -115,6 +126,11 @@ public class H2AsyncMainClientExec implements AsyncExecChainHandler {
@Override
public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
clientContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
httpProcessor.process(request, entityProducer, clientContext);
channel.sendRequest(request, entityProducer, context);
}
@ -137,6 +153,10 @@ public class H2AsyncMainClientExec implements AsyncExecChainHandler {
final HttpResponse response,
final EntityDetails entityDetails,
final HttpContext context) throws HttpException, IOException {
clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
httpProcessor.process(response, entityDetails, clientContext);
entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
if (entityDetails == null) {
execRuntime.validateConnection();

View File

@ -782,11 +782,6 @@ public class HttpAsyncClientBuilder {
}
}
final NamedElementChain<AsyncExecChainHandler> execChainDefinition = new NamedElementChain<>();
execChainDefinition.addLast(
new HttpAsyncMainClientExec(keepAliveStrategyCopy, userTokenHandlerCopy),
ChainElement.MAIN_TRANSPORT.name());
AuthenticationStrategy targetAuthStrategyCopy = this.targetAuthStrategy;
if (targetAuthStrategyCopy == null) {
targetAuthStrategyCopy = DefaultAuthenticationStrategy.INSTANCE;
@ -807,14 +802,6 @@ public class HttpAsyncClientBuilder {
}
}
execChainDefinition.addFirst(
new AsyncConnectExec(
new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
proxyAuthStrategyCopy,
schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE,
authCachingDisabled),
ChainElement.CONNECT.name());
final HttpProcessorBuilder b = HttpProcessorBuilder.create();
if (requestInterceptors != null) {
for (final RequestInterceptorEntry entry: requestInterceptors) {
@ -856,9 +843,22 @@ public class HttpAsyncClientBuilder {
}
final HttpProcessor httpProcessor = b.build();
final NamedElementChain<AsyncExecChainHandler> execChainDefinition = new NamedElementChain<>();
execChainDefinition.addLast(
new HttpAsyncMainClientExec(httpProcessor, keepAliveStrategyCopy, userTokenHandlerCopy),
ChainElement.MAIN_TRANSPORT.name());
execChainDefinition.addFirst(
new AsyncConnectExec(
new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
proxyAuthStrategyCopy,
schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE,
authCachingDisabled),
ChainElement.CONNECT.name());
execChainDefinition.addFirst(
new AsyncProtocolExec(
httpProcessor,
targetAuthStrategyCopy,
proxyAuthStrategyCopy,
schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE,

View File

@ -59,6 +59,9 @@ import org.apache.hc.core5.http.nio.CapacityChannel;
import org.apache.hc.core5.http.nio.DataStreamChannel;
import org.apache.hc.core5.http.nio.RequestChannel;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -76,10 +79,14 @@ class HttpAsyncMainClientExec implements AsyncExecChainHandler {
private static final Logger LOG = LoggerFactory.getLogger(HttpAsyncMainClientExec.class);
private final HttpProcessor httpProcessor;
private final ConnectionKeepAliveStrategy keepAliveStrategy;
private final UserTokenHandler userTokenHandler;
HttpAsyncMainClientExec(final ConnectionKeepAliveStrategy keepAliveStrategy, final UserTokenHandler userTokenHandler) {
HttpAsyncMainClientExec(final HttpProcessor httpProcessor,
final ConnectionKeepAliveStrategy keepAliveStrategy,
final UserTokenHandler userTokenHandler) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
this.keepAliveStrategy = keepAliveStrategy;
this.userTokenHandler = userTokenHandler;
}
@ -133,6 +140,11 @@ class HttpAsyncMainClientExec implements AsyncExecChainHandler {
public void produceRequest(
final RequestChannel channel,
final HttpContext context) throws HttpException, IOException {
clientContext.setAttribute(HttpClientContext.HTTP_ROUTE, route);
clientContext.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
httpProcessor.process(request, entityProducer, clientContext);
channel.sendRequest(request, entityProducer, context);
if (entityProducer == null) {
messageCountDown.decrementAndGet();
@ -189,6 +201,10 @@ class HttpAsyncMainClientExec implements AsyncExecChainHandler {
final HttpResponse response,
final EntityDetails entityDetails,
final HttpContext context) throws HttpException, IOException {
clientContext.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
httpProcessor.process(response, entityDetails, clientContext);
entityConsumerRef.set(asyncExecCallback.handleResponse(response, entityDetails));
if (response.getCode() >= HttpStatus.SC_CLIENT_ERROR) {
messageCountDown.decrementAndGet();

View File

@ -782,19 +782,6 @@ public class HttpClientBuilder {
}
}
final NamedElementChain<ExecChainHandler> execChainDefinition = new NamedElementChain<>();
execChainDefinition.addLast(
new MainClientExec(connManagerCopy, reuseStrategyCopy, keepAliveStrategyCopy, userTokenHandlerCopy),
ChainElement.MAIN_TRANSPORT.name());
execChainDefinition.addFirst(
new ConnectExec(
reuseStrategyCopy,
new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
proxyAuthStrategyCopy,
schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE,
authCachingDisabled),
ChainElement.CONNECT.name());
final HttpProcessorBuilder b = HttpProcessorBuilder.create();
if (requestInterceptors != null) {
for (final RequestInterceptorEntry entry: requestInterceptors) {
@ -838,9 +825,22 @@ public class HttpClientBuilder {
}
}
final HttpProcessor httpProcessor = b.build();
final NamedElementChain<ExecChainHandler> execChainDefinition = new NamedElementChain<>();
execChainDefinition.addLast(
new MainClientExec(connManagerCopy, httpProcessor, reuseStrategyCopy, keepAliveStrategyCopy, userTokenHandlerCopy),
ChainElement.MAIN_TRANSPORT.name());
execChainDefinition.addFirst(
new ConnectExec(
reuseStrategyCopy,
new DefaultHttpProcessor(new RequestTargetHost(), new RequestUserAgent(userAgentCopy)),
proxyAuthStrategyCopy,
schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE,
authCachingDisabled),
ChainElement.CONNECT.name());
execChainDefinition.addFirst(
new ProtocolExec(
httpProcessor,
targetAuthStrategyCopy,
proxyAuthStrategyCopy,
schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE,

View File

@ -48,6 +48,8 @@ import org.apache.hc.core5.http.ConnectionReuseStrategy;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.message.RequestLine;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
@ -68,6 +70,7 @@ public final class MainClientExec implements ExecChainHandler {
private static final Logger LOG = LoggerFactory.getLogger(MainClientExec.class);
private final HttpClientConnectionManager connectionManager;
private final HttpProcessor httpProcessor;
private final ConnectionReuseStrategy reuseStrategy;
private final ConnectionKeepAliveStrategy keepAliveStrategy;
private final UserTokenHandler userTokenHandler;
@ -77,10 +80,12 @@ public final class MainClientExec implements ExecChainHandler {
*/
public MainClientExec(
final HttpClientConnectionManager connectionManager,
final HttpProcessor httpProcessor,
final ConnectionReuseStrategy reuseStrategy,
final ConnectionKeepAliveStrategy keepAliveStrategy,
final UserTokenHandler userTokenHandler) {
this.connectionManager = Args.notNull(connectionManager, "Connection manager");
this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
this.reuseStrategy = Args.notNull(reuseStrategy, "Connection reuse strategy");
this.keepAliveStrategy = Args.notNull(keepAliveStrategy, "Connection keep alive strategy");
this.userTokenHandler = Args.notNull(userTokenHandler, "User token handler");
@ -102,8 +107,17 @@ public final class MainClientExec implements ExecChainHandler {
LOG.debug("{} executing {}", exchangeId, new RequestLine(request));
}
try {
// Run request protocol interceptors
context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
httpProcessor.process(request, request.getEntity(), context);
final ClassicHttpResponse response = execRuntime.execute(exchangeId, request, context);
context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
httpProcessor.process(response, response.getEntity(), context);
Object userToken = context.getUserToken();
if (userToken == null) {
userToken = userTokenHandler.getUserToken(route, request, context);

View File

@ -59,8 +59,6 @@ import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.support.ClassicRequestBuilder;
import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.net.URIAuthority;
import org.apache.hc.core5.util.Args;
import org.slf4j.Logger;
@ -83,7 +81,6 @@ public final class ProtocolExec implements ExecChainHandler {
private static final Logger LOG = LoggerFactory.getLogger(ProtocolExec.class);
private final HttpProcessor httpProcessor;
private final AuthenticationStrategy targetAuthStrategy;
private final AuthenticationStrategy proxyAuthStrategy;
private final HttpAuthenticator authenticator;
@ -91,12 +88,10 @@ public final class ProtocolExec implements ExecChainHandler {
private final AuthCacheKeeper authCacheKeeper;
public ProtocolExec(
final HttpProcessor httpProcessor,
final AuthenticationStrategy targetAuthStrategy,
final AuthenticationStrategy proxyAuthStrategy,
final SchemePortResolver schemePortResolver,
final boolean authCachingDisabled) {
this.httpProcessor = Args.notNull(httpProcessor, "HTTP protocol processor");
this.targetAuthStrategy = Args.notNull(targetAuthStrategy, "Target authentication strategy");
this.proxyAuthStrategy = Args.notNull(proxyAuthStrategy, "Proxy authentication strategy");
this.authenticator = new HttpAuthenticator();
@ -181,12 +176,6 @@ public final class ProtocolExec implements ExecChainHandler {
for (;;) {
// Run request protocol interceptors
context.setAttribute(HttpClientContext.HTTP_ROUTE, route);
context.setAttribute(HttpCoreContext.HTTP_REQUEST, request);
httpProcessor.process(request, request.getEntity(), context);
if (!request.containsHeader(HttpHeaders.AUTHORIZATION)) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} target auth state: {}", exchangeId, targetAuthExchange.getState());
@ -202,9 +191,6 @@ public final class ProtocolExec implements ExecChainHandler {
final ClassicHttpResponse response = chain.proceed(request, scope);
context.setAttribute(HttpCoreContext.HTTP_RESPONSE, response);
httpProcessor.process(response, response.getEntity(), context);
if (Method.TRACE.isSame(request.getMethod())) {
// Do not perform authentication for TRACE request
ResponseEntityProxy.enhance(response, execRuntime);

View File

@ -134,7 +134,10 @@ final class DefaultManagedAsyncClientConnection implements ManagedAsyncClientCon
public ProtocolVersion getProtocolVersion() {
final IOEventHandler handler = ioSession.getHandler();
if (handler instanceof HttpConnection) {
return ((HttpConnection) handler).getProtocolVersion();
final ProtocolVersion version = ((HttpConnection) handler).getProtocolVersion();
if (version != null) {
return version;
}
}
return HttpVersion.DEFAULT;
}

View File

@ -450,6 +450,8 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
if (LOG.isDebugEnabled()) {
LOG.debug("{} connected {}", ConnPoolSupport.getId(endpoint), ConnPoolSupport.getId(connection));
}
final ProtocolVersion protocolVersion = connection.getProtocolVersion();
context.setProtocolVersion(protocolVersion);
final Timeout socketTimeout = connectionConfig.getSocketTimeout();
if (socketTimeout != null) {
connection.setSocketTimeout(socketTimeout);

View File

@ -43,9 +43,11 @@ import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ConnectionReuseStrategy;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.apache.hc.core5.util.TimeValue;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@ -61,13 +63,15 @@ public class TestMainClientExec {
@Mock
private HttpClientConnectionManager connectionManager;
@Mock
private HttpProcessor httpProcessor;
@Mock
private ConnectionReuseStrategy reuseStrategy;
@Mock
private ConnectionKeepAliveStrategy keepAliveStrategy;
@Mock
private UserTokenHandler userTokenHandler;
@Mock
private ExecRuntime endpoint;
private ExecRuntime execRuntime;
private MainClientExec mainClientExec;
private HttpHost target;
@ -75,10 +79,39 @@ public class TestMainClientExec {
@BeforeEach
public void setup() throws Exception {
MockitoAnnotations.openMocks(this);
mainClientExec = new MainClientExec(connectionManager, reuseStrategy, keepAliveStrategy, userTokenHandler);
mainClientExec = new MainClientExec(connectionManager, httpProcessor, reuseStrategy, keepAliveStrategy, userTokenHandler);
target = new HttpHost("foo", 80);
}
@Test
public void testFundamentals() throws Exception {
final HttpRoute route = new HttpRoute(target);
final ClassicHttpRequest request = new HttpGet("/test");
final HttpClientContext context = HttpClientContext.create();
final ClassicHttpResponse response = new BasicClassicHttpResponse(200, "OK");
final HttpEntity responseEntity = EntityBuilder.create()
.setStream(new ByteArrayInputStream(new byte[]{}))
.build();
response.setEntity(responseEntity);
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenReturn(response);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
final ClassicHttpResponse finalResponse = mainClientExec.execute(request, scope, null);
Mockito.verify(httpProcessor).process(request, null, context);
Mockito.verify(execRuntime).execute("test", request, context);
Mockito.verify(httpProcessor).process(response, responseEntity, context);
Assertions.assertEquals(route, context.getHttpRoute());
Assertions.assertSame(request, context.getRequest());
Assertions.assertSame(response, context.getResponse());
}
@Test
public void testExecRequestNonPersistentConnection() throws Exception {
final HttpRoute route = new HttpRoute(target);
@ -89,7 +122,7 @@ public class TestMainClientExec {
.setStream(new ByteArrayInputStream(new byte[]{}))
.build());
Mockito.when(endpoint.execute(
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenReturn(response);
@ -98,11 +131,11 @@ public class TestMainClientExec {
Mockito.same(response),
Mockito.<HttpClientContext>any())).thenReturn(false);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, endpoint, context);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
final ClassicHttpResponse finalResponse = mainClientExec.execute(request, scope, null);
Mockito.verify(endpoint).execute("test", request, context);
Mockito.verify(endpoint, Mockito.times(1)).markConnectionNonReusable();
Mockito.verify(endpoint, Mockito.never()).releaseEndpoint();
Mockito.verify(execRuntime).execute("test", request, context);
Mockito.verify(execRuntime, Mockito.times(1)).markConnectionNonReusable();
Mockito.verify(execRuntime, Mockito.never()).releaseEndpoint();
Assertions.assertNull(context.getUserToken());
Assertions.assertNotNull(finalResponse);
@ -117,7 +150,7 @@ public class TestMainClientExec {
final ClassicHttpResponse response = new BasicClassicHttpResponse(200, "OK");
response.setEntity(null);
Mockito.when(endpoint.execute(
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenReturn(response);
@ -126,12 +159,12 @@ public class TestMainClientExec {
Mockito.same(response),
Mockito.<HttpClientContext>any())).thenReturn(false);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, endpoint, context);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
final ClassicHttpResponse finalResponse = mainClientExec.execute(request, scope, null);
Mockito.verify(endpoint).execute("test", request, context);
Mockito.verify(endpoint).markConnectionNonReusable();
Mockito.verify(endpoint).releaseEndpoint();
Mockito.verify(execRuntime).execute("test", request, context);
Mockito.verify(execRuntime).markConnectionNonReusable();
Mockito.verify(execRuntime).releaseEndpoint();
Assertions.assertNotNull(finalResponse);
Assertions.assertTrue(finalResponse instanceof CloseableHttpResponse);
@ -149,7 +182,7 @@ public class TestMainClientExec {
.build());
final ConnectionState connectionState = new ConnectionState();
Mockito.when(endpoint.execute(
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenReturn(response);
@ -161,12 +194,12 @@ public class TestMainClientExec {
Mockito.same(response),
Mockito.<HttpClientContext>any())).thenReturn(TimeValue.ofMilliseconds(678L));
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, endpoint, context);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
final ClassicHttpResponse finalResponse = mainClientExec.execute(request, scope, null);
Mockito.verify(endpoint).execute("test", request, context);
Mockito.verify(endpoint).markConnectionReusable(null, TimeValue.ofMilliseconds(678L));
Mockito.verify(endpoint, Mockito.never()).releaseEndpoint();
Mockito.verify(execRuntime).execute("test", request, context);
Mockito.verify(execRuntime).markConnectionReusable(null, TimeValue.ofMilliseconds(678L));
Mockito.verify(execRuntime, Mockito.never()).releaseEndpoint();
Assertions.assertNotNull(finalResponse);
Assertions.assertTrue(finalResponse instanceof CloseableHttpResponse);
@ -179,7 +212,7 @@ public class TestMainClientExec {
final ClassicHttpRequest request = new HttpGet("http://bar/test");
final ClassicHttpResponse response = new BasicClassicHttpResponse(200, "OK");
Mockito.when(endpoint.execute(
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenReturn(response);
@ -191,11 +224,11 @@ public class TestMainClientExec {
Mockito.same(response),
Mockito.<HttpClientContext>any())).thenReturn(TimeValue.ofMilliseconds(678L));
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, endpoint, context);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
final ClassicHttpResponse finalResponse = mainClientExec.execute(request, scope, null);
Mockito.verify(endpoint).execute("test", request, context);
Mockito.verify(endpoint).releaseEndpoint();
Mockito.verify(execRuntime).execute("test", request, context);
Mockito.verify(execRuntime).releaseEndpoint();
Assertions.assertNotNull(finalResponse);
Assertions.assertTrue(finalResponse instanceof CloseableHttpResponse);
@ -213,7 +246,7 @@ public class TestMainClientExec {
.build());
final ConnectionState connectionState = new ConnectionState();
Mockito.when(endpoint.execute(
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenReturn(response);
@ -222,18 +255,18 @@ public class TestMainClientExec {
Mockito.same(response),
Mockito.<HttpClientContext>any())).thenReturn(Boolean.FALSE);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, endpoint, context);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
final ClassicHttpResponse finalResponse = mainClientExec.execute(request, scope, null);
Mockito.verify(endpoint, Mockito.times(1)).execute("test", request, context);
Mockito.verify(endpoint, Mockito.never()).disconnectEndpoint();
Mockito.verify(endpoint, Mockito.never()).releaseEndpoint();
Mockito.verify(execRuntime, Mockito.times(1)).execute("test", request, context);
Mockito.verify(execRuntime, Mockito.never()).disconnectEndpoint();
Mockito.verify(execRuntime, Mockito.never()).releaseEndpoint();
Assertions.assertNotNull(finalResponse);
Assertions.assertTrue(finalResponse instanceof CloseableHttpResponse);
finalResponse.close();
Mockito.verify(endpoint).disconnectEndpoint();
Mockito.verify(endpoint).discardEndpoint();
Mockito.verify(execRuntime).disconnectEndpoint();
Mockito.verify(execRuntime).discardEndpoint();
}
@Test
@ -242,15 +275,15 @@ public class TestMainClientExec {
final HttpClientContext context = new HttpClientContext();
final ClassicHttpRequest request = new HttpGet("http://bar/test");
Mockito.when(endpoint.execute(
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenThrow(new ConnectionShutdownException());
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, endpoint, context);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
Assertions.assertThrows(InterruptedIOException.class, () ->
mainClientExec.execute(request, scope, null));
Mockito.verify(endpoint).discardEndpoint();
Mockito.verify(execRuntime).discardEndpoint();
}
@Test
@ -259,15 +292,15 @@ public class TestMainClientExec {
final HttpClientContext context = new HttpClientContext();
final ClassicHttpRequest request = new HttpGet("http://bar/test");
Mockito.when(endpoint.execute(
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenThrow(new RuntimeException("Ka-boom"));
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, endpoint, context);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
Assertions.assertThrows(RuntimeException.class, () ->
mainClientExec.execute(request, scope, null));
Mockito.verify(endpoint).discardEndpoint();
Mockito.verify(execRuntime).discardEndpoint();
}
@Test
@ -276,15 +309,15 @@ public class TestMainClientExec {
final HttpClientContext context = new HttpClientContext();
final ClassicHttpRequest request = new HttpGet("http://bar/test");
Mockito.when(endpoint.execute(
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenThrow(new HttpException("Ka-boom"));
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, endpoint, context);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
Assertions.assertThrows(HttpException.class, () ->
mainClientExec.execute(request, scope, null));
Mockito.verify(endpoint).discardEndpoint();
Mockito.verify(execRuntime).discardEndpoint();
}
@Test
@ -293,15 +326,15 @@ public class TestMainClientExec {
final HttpClientContext context = new HttpClientContext();
final ClassicHttpRequest request = new HttpGet("http://bar/test");
Mockito.when(endpoint.execute(
Mockito.when(execRuntime.execute(
Mockito.anyString(),
Mockito.same(request),
Mockito.any())).thenThrow(new IOException("Ka-boom"));
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, endpoint, context);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
Assertions.assertThrows(IOException.class, () ->
mainClientExec.execute(request, scope, null));
Mockito.verify(endpoint).discardEndpoint();
Mockito.verify(execRuntime).discardEndpoint();
}
static class ConnectionState {

View File

@ -55,7 +55,6 @@ import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.ProtocolException;
import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
import org.apache.hc.core5.http.protocol.HttpProcessor;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -67,8 +66,6 @@ import org.mockito.stubbing.Answer;
@SuppressWarnings({"static-access"}) // test code
public class TestProtocolExec {
@Mock
private HttpProcessor httpProcessor;
@Mock
private AuthenticationStrategy targetAuthStrategy;
@Mock
@ -85,35 +82,11 @@ public class TestProtocolExec {
@BeforeEach
public void setup() throws Exception {
MockitoAnnotations.openMocks(this);
protocolExec = new ProtocolExec(httpProcessor, targetAuthStrategy, proxyAuthStrategy, null, true);
protocolExec = new ProtocolExec(targetAuthStrategy, proxyAuthStrategy, null, true);
target = new HttpHost("foo", 80);
proxy = new HttpHost("bar", 8888);
}
@Test
public void testFundamentals() throws Exception {
final HttpRoute route = new HttpRoute(target);
final ClassicHttpRequest request = new HttpGet("/test");
final HttpClientContext context = HttpClientContext.create();
final ClassicHttpResponse response = Mockito.mock(ClassicHttpResponse.class);
Mockito.when(chain.proceed(
Mockito.any(),
Mockito.any())).thenReturn(response);
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
protocolExec.execute(request, scope, chain);
Mockito.verify(httpProcessor).process(request, null, context);
Mockito.verify(chain).proceed(request, scope);
Mockito.verify(httpProcessor).process(response, null, context);
Assertions.assertEquals(route, context.getHttpRoute());
Assertions.assertSame(request, context.getRequest());
Assertions.assertSame(response, context.getResponse());
}
@Test
public void testUserInfoInRequestURI() throws Exception {
final HttpRoute route = new HttpRoute(new HttpHost("somehost", 8080));
@ -135,8 +108,7 @@ public class TestProtocolExec {
Mockito.when(chain.proceed(
Mockito.any(),
Mockito.any())).thenReturn(response);
Mockito.doThrow(new HttpException("Ooopsie")).when(httpProcessor).process(
Mockito.same(response), Mockito.isNull(), Mockito.any());
Mockito.doThrow(new HttpException("Ooopsie")).when(chain).proceed(Mockito.any(), Mockito.any());
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
Assertions.assertThrows(HttpException.class, () ->
protocolExec.execute(request, scope, chain));
@ -153,8 +125,7 @@ public class TestProtocolExec {
Mockito.when(chain.proceed(
Mockito.any(),
Mockito.any())).thenReturn(response);
Mockito.doThrow(new IOException("Ooopsie")).when(httpProcessor).process(
Mockito.same(response), Mockito.isNull(), Mockito.any());
Mockito.doThrow(new IOException("Ooopsie")).when(chain).proceed(Mockito.any(), Mockito.any());
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
Assertions.assertThrows(IOException.class, () ->
protocolExec.execute(request, scope, chain));
@ -171,8 +142,7 @@ public class TestProtocolExec {
Mockito.when(chain.proceed(
Mockito.any(),
Mockito.any())).thenReturn(response);
Mockito.doThrow(new RuntimeException("Ooopsie")).when(httpProcessor).process(
Mockito.same(response), Mockito.isNull(), Mockito.any());
Mockito.doThrow(new RuntimeException("Ooopsie")).when(chain).proceed(Mockito.any(), Mockito.any());
final ExecChain.Scope scope = new ExecChain.Scope("test", route, request, execRuntime, context);
Assertions.assertThrows(RuntimeException.class, () ->
protocolExec.execute(request, scope, chain));