475605 - Add support for multi-homed destinations.

If DNS lookup returns multiple IP addresses, HttpClient tries to
connect to the first; failing that, to the second, and so on.
This commit is contained in:
Simone Bordet 2015-08-24 12:31:08 +02:00
parent aca2aa56ad
commit 545fa0f72b
6 changed files with 140 additions and 73 deletions

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
@ -85,7 +86,7 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
} }
@Override @Override
public void connect(SocketAddress address, Map<String, Object> context) public void connect(InetSocketAddress address, Map<String, Object> context)
{ {
SocketChannel channel = null; SocketChannel channel = null;
try try

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.CookieManager; import java.net.CookieManager;
import java.net.CookiePolicy; import java.net.CookiePolicy;
import java.net.CookieStore; import java.net.CookieStore;
import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.URI; import java.net.URI;
@ -547,15 +548,14 @@ public class HttpClient extends ContainerLifeCycle
protected void newConnection(final HttpDestination destination, final Promise<Connection> promise) protected void newConnection(final HttpDestination destination, final Promise<Connection> promise)
{ {
Origin.Address address = destination.getConnectAddress(); Origin.Address address = destination.getConnectAddress();
resolver.resolve(address.getHost(), address.getPort(), new Promise<SocketAddress>() resolver.resolve(address.getHost(), address.getPort(), new Promise<List<InetSocketAddress>>()
{ {
@Override @Override
public void succeeded(SocketAddress socketAddress) public void succeeded(List<InetSocketAddress> socketAddresses)
{ {
Map<String, Object> context = new HashMap<>(); Map<String, Object> context = new HashMap<>();
context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination); context.put(HttpClientTransport.HTTP_DESTINATION_CONTEXT_KEY, destination);
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, promise); connect(socketAddresses, 0, context);
transport.connect(socketAddress, context);
} }
@Override @Override
@ -563,6 +563,29 @@ public class HttpClient extends ContainerLifeCycle
{ {
promise.failed(x); promise.failed(x);
} }
private void connect(List<InetSocketAddress> socketAddresses, int index, Map<String, Object> context)
{
context.put(HttpClientTransport.HTTP_CONNECTION_PROMISE_CONTEXT_KEY, new Promise<Connection>()
{
@Override
public void succeeded(Connection result)
{
promise.succeeded(result);
}
@Override
public void failed(Throwable x)
{
int nextIndex = index + 1;
if (nextIndex == socketAddresses.size())
promise.failed(x);
else
connect(socketAddresses, nextIndex, context);
}
});
transport.connect(socketAddresses.get(index), context);
}
}); });
} }

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.net.SocketAddress; import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.ClientConnectionFactory;
@ -64,8 +64,8 @@ public interface HttpClientTransport extends ClientConnectionFactory
/** /**
* Establishes a physical connection to the given {@code address}. * Establishes a physical connection to the given {@code address}.
* *
* @param address the address to connect to * @param address the address to connect to
* @param context the context information to establish the connection * @param context the context information to establish the connection
*/ */
public void connect(SocketAddress address, Map<String, Object> context); public void connect(InetSocketAddress address, Map<String, Object> context);
} }

View File

@ -22,10 +22,12 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.HttpCookie; import java.net.HttpCookie;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URLEncoder; import java.net.URLEncoder;
import java.net.UnknownHostException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@ -77,6 +79,8 @@ import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume; import org.junit.Assume;
@ -852,7 +856,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
} }
@Test @Test
public void testConnectThrowsUnresolvedAddressException() throws Exception public void testConnectThrowsUnknownHostException() throws Exception
{ {
start(new EmptyServerHandler()); start(new EmptyServerHandler());
@ -864,13 +868,55 @@ public class HttpClientTest extends AbstractHttpClientServerTest
public void onComplete(Result result) public void onComplete(Result result)
{ {
Assert.assertTrue(result.isFailed()); Assert.assertTrue(result.isFailed());
Assert.assertTrue(result.getFailure() instanceof UnresolvedAddressException); Throwable failure = result.getFailure();
Assert.assertTrue(failure instanceof UnknownHostException);
latch.countDown(); latch.countDown();
} }
}); });
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
} }
@Test
public void testConnectHostWithMultipleAddresses() throws Exception
{
// Likely that the DNS for google.com returns multiple addresses.
String host = "google.com";
Assume.assumeTrue(InetAddress.getAllByName(host).length > 1);
startClient();
client.setFollowRedirects(false); // Avoid redirects from 80 to 443.
client.setSocketAddressResolver(new SocketAddressResolver.Async(client.getExecutor(), client.getScheduler(), client.getConnectTimeout())
{
@Override
public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
{
super.resolve(host, port, new Promise<List<InetSocketAddress>>()
{
@Override
public void succeeded(List<InetSocketAddress> result)
{
// Replace the first address with an invalid address so that we
// test that the connect operation iterates over the addresses.
result.set(0, new InetSocketAddress("idontexist", 80));
promise.succeeded(result);
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
});
}
});
// Response code may be 200 or 302;
// if no exceptions the test passes.
client.newRequest(host, 80)
.header(HttpHeader.CONNECTION, "close")
.send();
}
@Test @Test
public void testCustomUserAgent() throws Exception public void testCustomUserAgent() throws Exception
{ {

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.http2.client.http;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Map; import java.util.Map;
import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.HttpClient;
@ -61,14 +60,10 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
addBean(client); addBean(client);
super.doStart(); super.doStart();
this.connectionFactory = client.getClientConnectionFactory(); this.connectionFactory = client.getClientConnectionFactory();
client.setClientConnectionFactory(new ClientConnectionFactory() client.setClientConnectionFactory((endPoint, context) ->
{ {
@Override HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException return destination.getClientConnectionFactory().newConnection(endPoint, context);
{
HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
return destination.getClientConnectionFactory().newConnection(endPoint, context);
}
}); });
} }
@ -92,7 +87,7 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
} }
@Override @Override
public void connect(SocketAddress address, Map<String, Object> context) public void connect(InetSocketAddress address, Map<String, Object> context)
{ {
client.setConnectTimeout(httpClient.getConnectTimeout()); client.setConnectTimeout(httpClient.getConnectTimeout());
@ -124,7 +119,7 @@ public class HttpClientTransportOverHTTP2 extends ContainerLifeCycle implements
} }
}; };
client.connect(httpClient.getSslContextFactory(), (InetSocketAddress)address, listener, promise, context); client.connect(httpClient.getSslContextFactory(), address, listener, promise, context);
} }
@Override @Override

View File

@ -18,9 +18,12 @@
package org.eclipse.jetty.util; package org.eclipse.jetty.util;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.channels.UnresolvedAddressException; import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -40,12 +43,11 @@ public interface SocketAddressResolver
/** /**
* Resolves the given host and port, returning a {@link SocketAddress} through the given {@link Promise} * Resolves the given host and port, returning a {@link SocketAddress} through the given {@link Promise}
* with the default timeout. * with the default timeout.
* * @param host the host to resolve
* @param host the host to resolve
* @param port the port of the resulting socket address * @param port the port of the resulting socket address
* @param promise the callback invoked when the resolution succeeds or fails * @param promise the callback invoked when the resolution succeeds or fails
*/ */
public void resolve(String host, int port, Promise<SocketAddress> promise); public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise);
/** /**
* <p>Creates {@link SocketAddress} instances synchronously in the caller thread.</p> * <p>Creates {@link SocketAddress} instances synchronously in the caller thread.</p>
@ -54,13 +56,18 @@ public interface SocketAddressResolver
public static class Sync implements SocketAddressResolver public static class Sync implements SocketAddressResolver
{ {
@Override @Override
public void resolve(String host, int port, Promise<SocketAddress> promise) public void resolve(String host, int port, Promise<List<InetSocketAddress>> promise)
{ {
try try
{ {
InetSocketAddress result = new InetSocketAddress(host, port); InetAddress[] addresses = InetAddress.getAllByName(host);
if (result.isUnresolved())
promise.failed(new UnresolvedAddressException()); List<InetSocketAddress> result = new ArrayList<>(addresses.length);
for (InetAddress address : addresses)
result.add(new InetSocketAddress(address, port));
if (result.isEmpty())
promise.failed(new UnknownHostException());
else else
promise.succeeded(result); promise.succeeded(result);
} }
@ -135,60 +142,55 @@ public interface SocketAddressResolver
} }
@Override @Override
public void resolve(final String host, final int port, final Promise<SocketAddress> promise) public void resolve(final String host, final int port, final Promise<List<InetSocketAddress>> promise)
{ {
executor.execute(new Runnable() executor.execute(() ->
{ {
@Override Scheduler.Task task = null;
public void run() final AtomicBoolean complete = new AtomicBoolean();
if (timeout > 0)
{ {
Scheduler.Task task = null; final Thread thread = Thread.currentThread();
final AtomicBoolean complete = new AtomicBoolean(); task = scheduler.schedule(() ->
if (timeout > 0)
{ {
final Thread thread = Thread.currentThread();
task = scheduler.schedule(new Runnable()
{
@Override
public void run()
{
if (complete.compareAndSet(false, true))
{
promise.failed(new TimeoutException());
thread.interrupt();
}
}
}, timeout, TimeUnit.MILLISECONDS);
}
try
{
long start = System.nanoTime();
InetSocketAddress result = new InetSocketAddress(host, port);
long elapsed = System.nanoTime() - start;
if (LOG.isDebugEnabled())
LOG.debug("Resolved {} in {} ms", host, TimeUnit.NANOSECONDS.toMillis(elapsed));
if (complete.compareAndSet(false, true)) if (complete.compareAndSet(false, true))
{ {
if (result.isUnresolved()) promise.failed(new TimeoutException());
promise.failed(new UnresolvedAddressException()); thread.interrupt();
else
promise.succeeded(result);
} }
} }, timeout, TimeUnit.MILLISECONDS);
catch (Throwable x) }
try
{
long start = System.nanoTime();
InetAddress[] addresses = InetAddress.getAllByName(host);
long elapsed = System.nanoTime() - start;
if (LOG.isDebugEnabled())
LOG.debug("Resolved {} in {} ms", host, TimeUnit.NANOSECONDS.toMillis(elapsed));
List<InetSocketAddress> result = new ArrayList<>(addresses.length);
for (InetAddress address : addresses)
result.add(new InetSocketAddress(address, port));
if (complete.compareAndSet(false, true))
{ {
if (complete.compareAndSet(false, true)) if (result.isEmpty())
promise.failed(x); promise.failed(new UnknownHostException());
} else
finally promise.succeeded(result);
{
if (task != null)
task.cancel();
// Reset the interrupted status before releasing the thread to the pool
Thread.interrupted();
} }
} }
catch (Throwable x)
{
if (complete.compareAndSet(false, true))
promise.failed(x);
}
finally
{
if (task != null)
task.cancel();
}
}); });
} }
} }