Multi-home DNS aware implementation of ConnectionInitiator; ConnectTimeoutException and HttpHostConnectException to use NamedEndpoint instead of HttpHost

This commit is contained in:
Oleg Kalnichevski 2017-11-08 15:01:18 +01:00
parent 2ad0370517
commit 7c0a1127be
6 changed files with 285 additions and 97 deletions

View File

@ -32,7 +32,7 @@ import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.util.Arrays;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.net.NamedEndpoint;
/**
* A timeout while connecting to an HTTP server or waiting for an
@ -45,14 +45,14 @@ public class ConnectTimeoutException extends InterruptedIOException {
private static final long serialVersionUID = -4816682903149535989L;
private final HttpHost host;
private final NamedEndpoint namedEndpoint;
/**
* Creates a ConnectTimeoutException with a {@code null} detail message.
*/
public ConnectTimeoutException() {
super();
this.host = null;
this.namedEndpoint = null;
}
/**
@ -60,7 +60,7 @@ public class ConnectTimeoutException extends InterruptedIOException {
*/
public ConnectTimeoutException(final String message) {
super(message);
this.host = null;
this.namedEndpoint = null;
}
/**
@ -70,23 +70,23 @@ public class ConnectTimeoutException extends InterruptedIOException {
*/
public ConnectTimeoutException(
final IOException cause,
final HttpHost host,
final NamedEndpoint namedEndpoint,
final InetAddress... remoteAddresses) {
super("Connect to " +
(host != null ? host.toHostString() : "remote host") +
(namedEndpoint != null ? namedEndpoint : "remote endpoint") +
(remoteAddresses != null && remoteAddresses.length > 0 ?
" " + Arrays.asList(remoteAddresses) : "") +
((cause != null && cause.getMessage() != null) ?
" failed: " + cause.getMessage() : " timed out"));
this.host = host;
this.namedEndpoint = namedEndpoint;
initCause(cause);
}
/**
* @since 4.3
* @since 5.0
*/
public HttpHost getHost() {
return host;
public NamedEndpoint getHost() {
return this.namedEndpoint;
}
}

View File

@ -31,10 +31,10 @@ import java.net.ConnectException;
import java.net.InetAddress;
import java.util.Arrays;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.net.NamedEndpoint;
/**
* A {@link ConnectException} that specifies the {@link HttpHost} that was
* A {@link ConnectException} that specifies the {@link NamedEndpoint} that was
* being connected to.
*
* @since 4.0
@ -43,29 +43,32 @@ public class HttpHostConnectException extends ConnectException {
private static final long serialVersionUID = -3194482710275220224L;
private final HttpHost host;
private final NamedEndpoint namedEndpoint;
/**
* Creates a HttpHostConnectException based on original {@link java.io.IOException}.
*
* @since 4.3
* @since 5.0
*/
public HttpHostConnectException(
final IOException cause,
final HttpHost host,
final NamedEndpoint namedEndpoint,
final InetAddress... remoteAddresses) {
super("Connect to " +
(host != null ? host.toHostString() : "remote host") +
(namedEndpoint != null ? namedEndpoint : "remote endpoint") +
(remoteAddresses != null && remoteAddresses .length > 0 ?
" " + Arrays.asList(remoteAddresses) : "") +
((cause != null && cause.getMessage() != null) ?
" failed: " + cause.getMessage() : " refused"));
this.host = host;
this.namedEndpoint = namedEndpoint;
initCause(cause);
}
public HttpHost getHost() {
return this.host;
/**
* @since 5.0
*/
public NamedEndpoint getHost() {
return this.namedEndpoint;
}
}

View File

@ -27,18 +27,13 @@
package org.apache.hc.client5.http.impl.nio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.SchemePortResolver;
import org.apache.hc.client5.http.SystemDefaultDnsResolver;
import org.apache.hc.client5.http.UnsupportedSchemeException;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
@ -63,7 +58,7 @@ import org.apache.hc.core5.util.TimeValue;
final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectionOperator {
private final SchemePortResolver schemePortResolver;
private final DnsResolver dnsResolver;
private final MultihomeIOSessionRequester sessionRequester;
private final Lookup<TlsStrategy> tlsStrategyLookup;
DefaultAsyncClientConnectionOperator(
@ -72,7 +67,7 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
final DnsResolver dnsResolver) {
this.tlsStrategyLookup = Args.notNull(tlsStrategyLookup, "TLS strategy lookup");
this.schemePortResolver = schemePortResolver != null ? schemePortResolver : DefaultSchemePortResolver.INSTANCE;
this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE;
this.sessionRequester = new MultihomeIOSessionRequester(dnsResolver);
}
@Override
@ -86,79 +81,51 @@ final class DefaultAsyncClientConnectionOperator implements AsyncClientConnectio
Args.notNull(connectionInitiator, "Connection initiator");
Args.notNull(host, "Host");
final ComplexFuture<ManagedAsyncClientConnection> future = new ComplexFuture<>(callback);
final InetAddress[] remoteAddresses;
final HttpHost remoteEndpoint;
try {
remoteAddresses = dnsResolver.resolve(host.getHostName());
} catch (final UnknownHostException ex) {
future.failed(ex);
return future;
}
final int port;
try {
port = schemePortResolver.resolve(host);
remoteEndpoint = host.getPort() > 0 ? host :
new HttpHost(host.getHostName(), schemePortResolver.resolve(host), host.getSchemeName());
} catch (final UnsupportedSchemeException ex) {
future.failed(ex);
return future;
}
final InetAddress remoteAddress = host.getAddress();
final TlsStrategy tlsStrategy = tlsStrategyLookup != null ? tlsStrategyLookup.lookup(host.getSchemeName()) : null;
final Runnable runnable = new Runnable() {
final Future<IOSession> sessionFuture = sessionRequester.connect(
connectionInitiator,
remoteEndpoint,
remoteAddress != null ? new InetSocketAddress(remoteAddress, remoteEndpoint.getPort()) : null,
localAddress,
connectTimeout,
attachment,
new FutureCallback<IOSession>() {
private final AtomicInteger attempt = new AtomicInteger(0);
@Override
public void completed(final IOSession session) {
final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session);
if (tlsStrategy != null) {
tlsStrategy.upgrade(
connection,
host,
session.getLocalAddress(),
session.getRemoteAddress(),
attachment);
}
future.completed(connection);
}
void executeNext() {
final int index = attempt.getAndIncrement();
final InetSocketAddress remoteAddress = new InetSocketAddress(remoteAddresses[index], port);
final Future<IOSession> sessionFuture = connectionInitiator.connect(
host,
remoteAddress,
localAddress,
connectTimeout,
attachment,
new FutureCallback<IOSession>() {
@Override
public void failed(final Exception ex) {
future.failed(ex);
}
@Override
public void completed(final IOSession session) {
final DefaultManagedAsyncClientConnection connection = new DefaultManagedAsyncClientConnection(session);
if (tlsStrategy != null) {
tlsStrategy.upgrade(
connection,
host,
session.getLocalAddress(),
session.getRemoteAddress(),
attachment);
}
future.completed(connection);
}
@Override
public void cancelled() {
future.cancel();
}
@Override
public void failed(final Exception cause) {
if (attempt.get() >= remoteAddresses.length) {
if (cause instanceof IOException) {
future.failed(new HttpHostConnectException((IOException) cause, host, remoteAddresses));
} else {
future.failed(cause);
}
} else {
executeNext();
}
}
@Override
public void cancelled() {
future.cancel();
}
});
future.setDependency(sessionFuture);
}
@Override
public void run() {
executeNext();
}
};
runnable.run();
});
future.setDependency(sessionFuture);
return future;
}

View File

@ -0,0 +1,138 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.nio;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.client5.http.HttpHostConnectException;
import org.apache.hc.client5.http.SystemDefaultDnsResolver;
import org.apache.hc.core5.concurrent.ComplexFuture;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.TimeValue;
final class MultihomeIOSessionRequester {
private final DnsResolver dnsResolver;
MultihomeIOSessionRequester(final DnsResolver dnsResolver) {
this.dnsResolver = dnsResolver != null ? dnsResolver : SystemDefaultDnsResolver.INSTANCE;
}
public Future<IOSession> connect(
final ConnectionInitiator connectionInitiator,
final NamedEndpoint remoteEndpoint,
final SocketAddress remoteAddress,
final SocketAddress localAddress,
final TimeValue connectTimeout,
final Object attachment,
final FutureCallback<IOSession> callback) {
if (remoteAddress != null) {
return connectionInitiator.connect(remoteEndpoint, remoteAddress, localAddress, connectTimeout, attachment, callback);
} else {
final ComplexFuture<IOSession> future = new ComplexFuture<>(callback);
final InetAddress[] remoteAddresses;
try {
remoteAddresses = dnsResolver.resolve(remoteEndpoint.getHostName());
} catch (final UnknownHostException ex) {
future.failed(ex);
return future;
}
final Runnable runnable = new Runnable() {
private final AtomicInteger attempt = new AtomicInteger(0);
void executeNext() {
final int index = attempt.getAndIncrement();
final InetSocketAddress remoteAddress = new InetSocketAddress(remoteAddresses[index], remoteEndpoint.getPort());
final Future<IOSession> sessionFuture = connectionInitiator.connect(
remoteEndpoint,
remoteAddress,
localAddress,
connectTimeout,
attachment,
new FutureCallback<IOSession>() {
@Override
public void completed(final IOSession session) {
future.completed(session);
}
@Override
public void failed(final Exception cause) {
if (attempt.get() >= remoteAddresses.length) {
if (cause instanceof IOException) {
future.failed(new HttpHostConnectException((IOException) cause, remoteEndpoint, remoteAddresses));
} else {
future.failed(cause);
}
} else {
executeNext();
}
}
@Override
public void cancelled() {
future.cancel();
}
});
future.setDependency(sessionFuture);
}
@Override
public void run() {
executeNext();
}
};
runnable.run();
return future;
}
}
public Future<IOSession> connect(
final ConnectionInitiator connectionInitiator,
final NamedEndpoint remoteEndpoint,
final SocketAddress localAddress,
final TimeValue connectTimeout,
final Object attachment,
final FutureCallback<IOSession> callback) {
return connect(connectionInitiator, remoteEndpoint, null, localAddress, connectTimeout, attachment, callback);
}
}

View File

@ -0,0 +1,80 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.nio;
import java.net.SocketAddress;
import java.util.concurrent.Future;
import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.ConnectionInitiator;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
/**
* Multi-home DNS aware implementation of {@link ConnectionInitiator}.
*
* @since 5.0
*/
public final class MultuhomeConnectionInitiator implements ConnectionInitiator {
private final ConnectionInitiator connectionInitiator;
private final MultihomeIOSessionRequester sessionRequester;
public MultuhomeConnectionInitiator(
final ConnectionInitiator connectionInitiator,
final DnsResolver dnsResolver) {
this.connectionInitiator = Args.notNull(connectionInitiator, "Connection initiator");
this.sessionRequester = new MultihomeIOSessionRequester(dnsResolver);
}
@Override
public Future<IOSession> connect(
final NamedEndpoint remoteEndpoint,
final SocketAddress remoteAddress,
final SocketAddress localAddress,
final TimeValue connectTimeout,
final Object attachment,
final FutureCallback<IOSession> callback) {
Args.notNull(remoteEndpoint, "Remote endpoint");
return sessionRequester.connect(connectionInitiator, remoteEndpoint, remoteAddress, localAddress, connectTimeout, attachment, callback);
}
public Future<IOSession> connect(
final NamedEndpoint remoteEndpoint,
final SocketAddress localAddress,
final TimeValue connectTimeout,
final Object attachment,
final FutureCallback<IOSession> callback) {
Args.notNull(remoteEndpoint, "Remote endpoint");
return sessionRequester.connect(connectionInitiator, remoteEndpoint, localAddress, connectTimeout, attachment, callback);
}
}

View File

@ -55,14 +55,14 @@ public class TestExceptions {
@Test
public void testConnectTimeoutExceptionFromNullCause() {
final ConnectTimeoutException ctx = new ConnectTimeoutException(null, null);
Assert.assertEquals("Connect to remote host timed out", ctx.getMessage());
Assert.assertEquals("Connect to remote endpoint timed out", ctx.getMessage());
}
@Test
public void testConnectTimeoutExceptionFromCause() {
final IOException cause = new IOException("something awful");
final ConnectTimeoutException ctx = new ConnectTimeoutException(cause, null);
Assert.assertEquals("Connect to remote host failed: something awful", ctx.getMessage());
Assert.assertEquals("Connect to remote endpoint failed: something awful", ctx.getMessage());
}
@Test
@ -70,7 +70,7 @@ public class TestExceptions {
final HttpHost target = new HttpHost("localhost");
final IOException cause = new IOException();
final ConnectTimeoutException ctx = new ConnectTimeoutException(cause, target);
Assert.assertEquals("Connect to localhost timed out", ctx.getMessage());
Assert.assertEquals("Connect to http://localhost timed out", ctx.getMessage());
}
@Test
@ -79,21 +79,21 @@ public class TestExceptions {
final InetAddress remoteAddress = InetAddress.getByAddress(new byte[] {1,2,3,4});
final IOException cause = new IOException();
final ConnectTimeoutException ctx = new ConnectTimeoutException(cause, target, remoteAddress);
Assert.assertEquals("Connect to localhost [/1.2.3.4] timed out", ctx.getMessage());
Assert.assertEquals("Connect to http://localhost [/1.2.3.4] timed out", ctx.getMessage());
}
@Test
public void testHttpHostConnectExceptionFromNullCause() {
final HttpHostConnectException ctx = new HttpHostConnectException(null, null,
(InetAddress [])null);
Assert.assertEquals("Connect to remote host refused", ctx.getMessage());
Assert.assertEquals("Connect to remote endpoint refused", ctx.getMessage());
}
@Test
public void testHttpHostConnectExceptionFromCause() {
final IOException cause = new IOException("something awful");
final HttpHostConnectException ctx = new HttpHostConnectException(cause, null);
Assert.assertEquals("Connect to remote host failed: something awful", ctx.getMessage());
Assert.assertEquals("Connect to remote endpoint failed: something awful", ctx.getMessage());
}
@Test
@ -101,7 +101,7 @@ public class TestExceptions {
final HttpHost target = new HttpHost("localhost");
final IOException cause = new IOException();
final HttpHostConnectException ctx = new HttpHostConnectException(cause, target);
Assert.assertEquals("Connect to localhost refused", ctx.getMessage());
Assert.assertEquals("Connect to http://localhost refused", ctx.getMessage());
}
@Test
@ -112,7 +112,7 @@ public class TestExceptions {
final IOException cause = new IOException();
final HttpHostConnectException ctx = new HttpHostConnectException(cause, target,
remoteAddress1, remoteAddress2);
Assert.assertEquals("Connect to localhost [/1.2.3.4, /5.6.7.8] refused", ctx.getMessage());
Assert.assertEquals("Connect to http://localhost [/1.2.3.4, /5.6.7.8] refused", ctx.getMessage());
}
}