Callbacks for Connection initialization

This commit is contained in:
Richard Tippl 2024-04-07 15:05:32 +02:00 committed by Oleg Kalnichevski
parent 17142a7e74
commit 5eb15a1be4
6 changed files with 94 additions and 18 deletions

View File

@ -192,6 +192,7 @@ public class DefaultHttpClientConnectionOperator implements HttpClientConnection
final InetAddress address = remoteAddresses[i]; final InetAddress address = remoteAddresses[i];
final boolean last = i == remoteAddresses.length - 1; final boolean last = i == remoteAddresses.length - 1;
final InetSocketAddress remoteAddress = new InetSocketAddress(address, port); final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
onBeforeSocketConnect(context, endpointHost);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} connecting {}->{} ({})", endpointHost, localAddress, remoteAddress, connectTimeout); LOG.debug("{} connecting {}->{} ({})", endpointHost, localAddress, remoteAddress, connectTimeout);
} }
@ -221,6 +222,7 @@ public class DefaultHttpClientConnectionOperator implements HttpClientConnection
} }
socket.connect(remoteAddress, TimeValue.isPositive(connectTimeout) ? connectTimeout.toMillisecondsIntBound() : 0); socket.connect(remoteAddress, TimeValue.isPositive(connectTimeout) ? connectTimeout.toMillisecondsIntBound() : 0);
conn.bind(socket); conn.bind(socket);
onAfterSocketConnect(context, endpointHost);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} {} connected {}->{}", ConnPoolSupport.getId(conn), endpointHost, LOG.debug("{} {} connected {}->{}", ConnPoolSupport.getId(conn), endpointHost,
conn.getLocalAddress(), conn.getRemoteAddress()); conn.getLocalAddress(), conn.getRemoteAddress());
@ -229,11 +231,16 @@ public class DefaultHttpClientConnectionOperator implements HttpClientConnection
final TlsSocketStrategy tlsSocketStrategy = tlsSocketStrategyLookup != null ? tlsSocketStrategyLookup.lookup(endpointHost.getSchemeName()) : null; final TlsSocketStrategy tlsSocketStrategy = tlsSocketStrategyLookup != null ? tlsSocketStrategyLookup.lookup(endpointHost.getSchemeName()) : null;
if (tlsSocketStrategy != null) { if (tlsSocketStrategy != null) {
final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost; final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
onBeforeTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(conn), tlsName); LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(conn), tlsName);
} }
final Socket upgradedSocket = tlsSocketStrategy.upgrade(socket, tlsName.getHostName(), tlsName.getPort(), attachment, context); final Socket upgradedSocket = tlsSocketStrategy.upgrade(socket, tlsName.getHostName(), tlsName.getPort(), attachment, context);
conn.bind(upgradedSocket); conn.bind(upgradedSocket);
onAfterTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} upgraded to TLS", ConnPoolSupport.getId(conn), tlsName);
}
} }
return; return;
} catch (final RuntimeException ex) { } catch (final RuntimeException ex) {
@ -278,14 +285,31 @@ public class DefaultHttpClientConnectionOperator implements HttpClientConnection
final TlsSocketStrategy tlsSocketStrategy = tlsSocketStrategyLookup != null ? tlsSocketStrategyLookup.lookup(newProtocol) : null; final TlsSocketStrategy tlsSocketStrategy = tlsSocketStrategyLookup != null ? tlsSocketStrategyLookup.lookup(newProtocol) : null;
if (tlsSocketStrategy != null) { if (tlsSocketStrategy != null) {
final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost; final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
onBeforeTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} upgrading to TLS {}:{}", ConnPoolSupport.getId(conn), tlsName.getHostName(), tlsName.getPort()); LOG.debug("{} upgrading to TLS {}:{}", ConnPoolSupport.getId(conn), tlsName.getHostName(), tlsName.getPort());
} }
final SSLSocket upgradedSocket = tlsSocketStrategy.upgrade(socket, tlsName.getHostName(), tlsName.getPort(), attachment, context); final SSLSocket upgradedSocket = tlsSocketStrategy.upgrade(socket, tlsName.getHostName(), tlsName.getPort(), attachment, context);
conn.bind(upgradedSocket); conn.bind(upgradedSocket);
onAfterTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} upgraded to TLS {}:{}", ConnPoolSupport.getId(conn), tlsName.getHostName(), tlsName.getPort());
}
} else { } else {
throw new UnsupportedSchemeException(newProtocol + " protocol is not supported"); throw new UnsupportedSchemeException(newProtocol + " protocol is not supported");
} }
} }
protected void onBeforeSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
}
protected void onAfterSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
}
protected void onBeforeTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
}
protected void onAfterTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
}
} }

View File

@ -206,7 +206,7 @@ public class PoolingHttpClientConnectionManager
} }
@Internal @Internal
protected PoolingHttpClientConnectionManager( public PoolingHttpClientConnectionManager(
final HttpClientConnectionOperator httpClientConnectionOperator, final HttpClientConnectionOperator httpClientConnectionOperator,
final PoolConcurrencyPolicy poolConcurrencyPolicy, final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy, final PoolReusePolicy poolReusePolicy,

View File

@ -34,9 +34,11 @@ import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.SchemePortResolver; import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.TlsConfig; import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.io.HttpClientConnectionOperator;
import org.apache.hc.client5.http.io.ManagedHttpClientConnection; import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.client5.http.ssl.TlsSocketStrategy; import org.apache.hc.client5.http.ssl.TlsSocketStrategy;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.function.Resolver; import org.apache.hc.core5.function.Resolver;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.URIScheme;
@ -94,7 +96,8 @@ public class PoolingHttpClientConnectionManagerBuilder {
return new PoolingHttpClientConnectionManagerBuilder(); return new PoolingHttpClientConnectionManagerBuilder();
} }
PoolingHttpClientConnectionManagerBuilder() { @Internal
protected PoolingHttpClientConnectionManagerBuilder() {
super(); super();
} }
@ -273,15 +276,31 @@ public class PoolingHttpClientConnectionManagerBuilder {
return this; return this;
} }
@Internal
protected HttpClientConnectionOperator createConnectionOperator(
final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver,
final TlsSocketStrategy tlsSocketStrategy) {
return new DefaultHttpClientConnectionOperator(schemePortResolver, dnsResolver,
RegistryBuilder.<TlsSocketStrategy>create()
.register(URIScheme.HTTPS.id, tlsSocketStrategy)
.build());
}
public PoolingHttpClientConnectionManager build() { public PoolingHttpClientConnectionManager build() {
final TlsSocketStrategy tlsSocketStrategyCopy;
if (tlsSocketStrategy != null) {
tlsSocketStrategyCopy = tlsSocketStrategy;
} else {
if (systemProperties) {
tlsSocketStrategyCopy = DefaultClientTlsStrategy.createSystemDefault();
} else {
tlsSocketStrategyCopy = DefaultClientTlsStrategy.createDefault();
}
}
final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager( final PoolingHttpClientConnectionManager poolingmgr = new PoolingHttpClientConnectionManager(
new DefaultHttpClientConnectionOperator(schemePortResolver, dnsResolver, createConnectionOperator(schemePortResolver, dnsResolver, tlsSocketStrategyCopy),
RegistryBuilder.<TlsSocketStrategy>create()
.register(URIScheme.HTTPS.id, tlsSocketStrategy != null ? tlsSocketStrategy :
(systemProperties ?
DefaultClientTlsStrategy.createSystemDefault() :
DefaultClientTlsStrategy.createDefault()))
.build()),
poolConcurrencyPolicy, poolConcurrencyPolicy,
poolReusePolicy, poolReusePolicy,
null, null,

View File

@ -41,6 +41,7 @@ import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator; import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection; import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
import org.apache.hc.client5.http.routing.RoutingSupport; import org.apache.hc.client5.http.routing.RoutingSupport;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.concurrent.CallbackContribution; import org.apache.hc.core5.concurrent.CallbackContribution;
import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback; import org.apache.hc.core5.concurrent.FutureCallback;
@ -59,7 +60,8 @@ import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectionOperator { @Internal
public class DefaultAsyncClientConnectionOperator implements AsyncClientConnectionOperator {
private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncClientConnectionOperator.class); private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncClientConnectionOperator.class);
@ -105,6 +107,7 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
final InetAddress remoteAddress = endpointHost.getAddress(); final InetAddress remoteAddress = endpointHost.getAddress();
final TlsConfig tlsConfig = attachment instanceof TlsConfig ? (TlsConfig) attachment : TlsConfig.DEFAULT; final TlsConfig tlsConfig = attachment instanceof TlsConfig ? (TlsConfig) attachment : TlsConfig.DEFAULT;
onBeforeSocketConnect(context, endpointHost);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} connecting {}->{} ({})", endpointHost, localAddress, remoteAddress, connectTimeout); LOG.debug("{} connecting {}->{} ({})", endpointHost, localAddress, remoteAddress, connectTimeout);
} }
@ -121,6 +124,7 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
@Override @Override
public void completed(final IOSession session) { public void completed(final IOSession session) {
final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session); final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session);
onAfterSocketConnect(context, endpointHost);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} {} connected {}->{}", ConnPoolSupport.getId(connection), endpointHost, LOG.debug("{} {} connected {}->{}", ConnPoolSupport.getId(connection), endpointHost,
connection.getLocalAddress(), connection.getRemoteAddress()); connection.getLocalAddress(), connection.getRemoteAddress());
@ -131,6 +135,7 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
final Timeout socketTimeout = connection.getSocketTimeout(); final Timeout socketTimeout = connection.getSocketTimeout();
final Timeout handshakeTimeout = tlsConfig.getHandshakeTimeout(); final Timeout handshakeTimeout = tlsConfig.getHandshakeTimeout();
final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost; final NamedEndpoint tlsName = endpointName != null ? endpointName : endpointHost;
onBeforeTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(connection), tlsName); LOG.debug("{} {} upgrading to TLS", ConnPoolSupport.getId(connection), tlsName);
} }
@ -145,6 +150,10 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
public void completed(final TransportSecurityLayer transportSecurityLayer) { public void completed(final TransportSecurityLayer transportSecurityLayer) {
connection.setSocketTimeout(socketTimeout); connection.setSocketTimeout(socketTimeout);
future.completed(connection); future.completed(connection);
onAfterTlsHandshake(context, endpointHost);
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} upgraded to TLS", ConnPoolSupport.getId(connection), tlsName);
}
} }
}); });
@ -214,4 +223,16 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
} }
} }
protected void onBeforeSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
}
protected void onAfterSocketConnect(final HttpContext httpContext, final HttpHost endpointHost) {
}
protected void onBeforeTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
}
protected void onAfterTlsHandshake(final HttpContext httpContext, final HttpHost endpointHost) {
}
} }

View File

@ -164,7 +164,7 @@ public class PoolingAsyncClientConnectionManager implements AsyncClientConnectio
} }
@Internal @Internal
protected PoolingAsyncClientConnectionManager( public PoolingAsyncClientConnectionManager(
final AsyncClientConnectionOperator connectionOperator, final AsyncClientConnectionOperator connectionOperator,
final PoolConcurrencyPolicy poolConcurrencyPolicy, final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy, final PoolReusePolicy poolReusePolicy,

View File

@ -32,8 +32,10 @@ import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.SchemePortResolver; import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.TlsConfig; import org.apache.hc.client5.http.config.TlsConfig;
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
import org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy; import org.apache.hc.client5.http.ssl.ConscryptClientTlsStrategy;
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
import org.apache.hc.core5.annotation.Internal;
import org.apache.hc.core5.function.Resolver; import org.apache.hc.core5.function.Resolver;
import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.URIScheme;
@ -90,7 +92,8 @@ public class PoolingAsyncClientConnectionManagerBuilder {
return new PoolingAsyncClientConnectionManagerBuilder(); return new PoolingAsyncClientConnectionManagerBuilder();
} }
PoolingAsyncClientConnectionManagerBuilder() { @Internal
protected PoolingAsyncClientConnectionManagerBuilder() {
super(); super();
} }
@ -229,6 +232,19 @@ public class PoolingAsyncClientConnectionManagerBuilder {
return this; return this;
} }
@Internal
protected AsyncClientConnectionOperator createConnectionOperator(
final TlsStrategy tlsStrategy,
final SchemePortResolver schemePortResolver,
final DnsResolver dnsResolver) {
return new DefaultAsyncClientConnectionOperator(
RegistryBuilder.<TlsStrategy>create()
.register(URIScheme.HTTPS.getId(), tlsStrategy)
.build(),
schemePortResolver,
dnsResolver);
}
public PoolingAsyncClientConnectionManager build() { public PoolingAsyncClientConnectionManager build() {
final TlsStrategy tlsStrategyCopy; final TlsStrategy tlsStrategyCopy;
if (tlsStrategy != null) { if (tlsStrategy != null) {
@ -249,14 +265,10 @@ public class PoolingAsyncClientConnectionManagerBuilder {
} }
} }
final PoolingAsyncClientConnectionManager poolingmgr = new PoolingAsyncClientConnectionManager( final PoolingAsyncClientConnectionManager poolingmgr = new PoolingAsyncClientConnectionManager(
RegistryBuilder.<TlsStrategy>create() createConnectionOperator(tlsStrategyCopy, schemePortResolver, dnsResolver),
.register(URIScheme.HTTPS.getId(), tlsStrategyCopy)
.build(),
poolConcurrencyPolicy, poolConcurrencyPolicy,
poolReusePolicy, poolReusePolicy,
null, null);
schemePortResolver,
dnsResolver);
poolingmgr.setConnectionConfigResolver(connectionConfigResolver); poolingmgr.setConnectionConfigResolver(connectionConfigResolver);
poolingmgr.setTlsConfigResolver(tlsConfigResolver); poolingmgr.setTlsConfigResolver(tlsConfigResolver);
if (maxConnTotal > 0) { if (maxConnTotal > 0) {