400014 - Http async client DNS performance.

Introduced SocketAddressResolver to perform DNS resolution in a separate thread,
and updated HttpClient to make use of it.
This commit is contained in:
Simone Bordet 2013-02-14 11:26:44 +01:00
parent dd96cc50b2
commit df56bd3c27
8 changed files with 302 additions and 49 deletions

View File

@ -64,6 +64,7 @@ import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Jetty;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -122,6 +123,7 @@ public class HttpClient extends ContainerLifeCycle
private volatile Executor executor;
private volatile ByteBufferPool byteBufferPool;
private volatile Scheduler scheduler;
private volatile SocketAddressResolver resolver;
private volatile SelectorManager selectorManager;
private volatile HttpField agentField = new HttpField(HttpHeader.USER_AGENT, "Jetty/" + Jetty.VERSION);
private volatile boolean followRedirects = true;
@ -132,6 +134,7 @@ public class HttpClient extends ContainerLifeCycle
private volatile int maxRedirects = 8;
private volatile SocketAddress bindAddress;
private volatile long connectTimeout = 15000;
private volatile long addressResolutionTimeout = 15000;
private volatile long idleTimeout;
private volatile boolean tcpNoDelay = true;
private volatile boolean dispatchIO = true;
@ -198,6 +201,8 @@ public class HttpClient extends ContainerLifeCycle
scheduler = new ScheduledExecutorScheduler(name + "-scheduler", false);
addBean(scheduler);
resolver = new SocketAddressResolver(executor, scheduler, getAddressResolutionTimeout());
selectorManager = newSelectorManager();
selectorManager.setConnectTimeout(getConnectTimeout());
addBean(selectorManager);
@ -484,30 +489,44 @@ public class HttpClient extends ContainerLifeCycle
destination.send(request, listeners);
}
protected void newConnection(HttpDestination destination, Promise<Connection> promise)
protected void newConnection(final HttpDestination destination, final Promise<Connection> promise)
{
SocketChannel channel = null;
try
Destination.Address address = destination.getConnectAddress();
resolver.resolve(address.getHost(), address.getPort(), new Promise<SocketAddress>()
{
channel = SocketChannel.open();
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(channel);
channel.configureBlocking(false);
channel.connect(destination.getConnectAddress());
@Override
public void succeeded(SocketAddress socketAddress)
{
SocketChannel channel = null;
try
{
channel = SocketChannel.open();
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
channel.bind(bindAddress);
configure(channel);
channel.configureBlocking(false);
channel.connect(socketAddress);
Future<Connection> result = new ConnectionCallback(destination, promise);
selectorManager.connect(channel, result);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Exception x)
{
if (channel != null)
close(channel);
promise.failed(x);
}
Future<Connection> futureConnection = new ConnectionCallback(destination, promise);
selectorManager.connect(channel, futureConnection);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
if (channel != null)
close(channel);
promise.failed(x);
}
}
@Override
public void failed(Throwable x)
{
promise.failed(x);
}
});
}
protected void configure(SocketChannel channel) throws SocketException
@ -599,6 +618,22 @@ public class HttpClient extends ContainerLifeCycle
this.connectTimeout = connectTimeout;
}
/**
* @return the timeout, in milliseconds, for the DNS resolution of host addresses
*/
public long getAddressResolutionTimeout()
{
return addressResolutionTimeout;
}
/**
* @param addressResolutionTimeout the timeout, in milliseconds, for the DNS resolution of host addresses
*/
public void setAddressResolutionTimeout(long addressResolutionTimeout)
{
this.addressResolutionTimeout = addressResolutionTimeout;
}
/**
* @return the max time a connection can be idle (that is, without traffic of bytes in either direction)
*/

View File

@ -19,13 +19,11 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -41,7 +39,6 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
@ -56,13 +53,13 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
private final HttpClient client;
private final String scheme;
private final String host;
private final InetSocketAddress address;
private final Address address;
private final Queue<RequestContext> requests;
private final BlockingQueue<Connection> idleConnections;
private final BlockingQueue<Connection> activeConnections;
private final RequestNotifier requestNotifier;
private final ResponseNotifier responseNotifier;
private final InetSocketAddress proxyAddress;
private final Address proxyAddress;
private final HttpField hostField;
public HttpDestination(HttpClient client, String scheme, String host, int port)
@ -70,7 +67,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
this.client = client;
this.scheme = scheme;
this.host = host;
this.address = new InetSocketAddress(host, port);
this.address = new Address(host, port);
int maxRequestsQueued = client.getMaxRequestsQueuedPerDestination();
int capacity = Math.min(32, maxRequestsQueued);
@ -86,7 +83,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
ProxyConfiguration proxyConfig = client.getProxyConfiguration();
proxyAddress = proxyConfig != null && proxyConfig.matches(host, port) ?
new InetSocketAddress(proxyConfig.getHost(), proxyConfig.getPort()) : null;
new Address(proxyConfig.getHost(), proxyConfig.getPort()) : null;
hostField = new HttpField(HttpHeader.HOST, host + ":" + port);
}
@ -121,7 +118,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
return address.getPort();
}
public InetSocketAddress getConnectAddress()
public Address getConnectAddress()
{
return isProxied() ? proxyAddress : address;
}
@ -175,14 +172,12 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
}
}
public Future<Connection> newConnection()
public void newConnection(Promise<Connection> promise)
{
FuturePromise<Connection> result = new FuturePromise<>();
newConnection(new ProxyPromise(result));
return result;
createConnection(new ProxyPromise(promise));
}
protected void newConnection(Promise<Connection> promise)
protected void createConnection(Promise<Connection> promise)
{
client.newConnection(this, promise);
}
@ -236,7 +231,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
// Create a new connection, and pass a ProxyPromise to establish a proxy tunnel, if needed.
// Differently from the case where the connection is created explicitly by applications, here
// we need to do a bit more logging and keep track of the connection count in case of failures.
newConnection(new ProxyPromise(promise)
createConnection(new ProxyPromise(promise)
{
@Override
public void succeeded(Connection connection)
@ -449,7 +444,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
getScheme(),
getHost(),
getPort(),
proxyAddress == null ? "" : " via " + proxyAddress.getHostString() + ":" + proxyAddress.getPort());
proxyAddress == null ? "" : " via " + proxyAddress.getHost() + ":" + proxyAddress.getPort());
}
private static class RequestContext
@ -499,8 +494,8 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
private void tunnel(final Connection connection)
{
String target = address.getHostString() + ":" + address.getPort();
Request connect = client.newRequest(proxyAddress.getHostString(), proxyAddress.getPort())
String target = address.getHost() + ":" + address.getPort();
Request connect = client.newRequest(proxyAddress.getHost(), proxyAddress.getPort())
.scheme(HttpScheme.HTTP.asString())
.method(HttpMethod.CONNECT)
.path(target)

View File

@ -18,13 +18,15 @@
package org.eclipse.jetty.client.api;
import org.eclipse.jetty.util.Promise;
/**
* {@link Connection} represent a connection to a {@link Destination} and allow applications to send
* requests via {@link #send(Request, Response.CompleteListener)}.
* <p />
* {@link Connection}s are normally pooled by {@link Destination}s, but unpooled {@link Connection}s
* may be created by applications that want to do their own connection management via
* {@link Destination#newConnection()} and {@link Connection#close()}.
* {@link Destination#newConnection(Promise)} and {@link Connection#close()}.
*/
public interface Connection extends AutoCloseable
{

View File

@ -18,16 +18,16 @@
package org.eclipse.jetty.client.api;
import java.util.concurrent.Future;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.Promise;
/**
* {@link Destination} represents the triple made of the {@link #getScheme}, the {@link #getHost}
* and the {@link #getPort}.
* <p />
* {@link Destination} holds a pool of {@link Connection}s, but allows to create unpooled
* connections if the application wants full control over connection management via {@link #newConnection()}.
* connections if the application wants full control over connection management via {@link #newConnection(Promise)}.
* <p />
* {@link Destination}s may be obtained via {@link HttpClient#getDestination(String, String, int)}
*/
@ -49,7 +49,40 @@ public interface Destination
int getPort();
/**
* @return a future to a new, unpooled, {@link Connection}
* Creates asynchronously a new, unpooled, {@link Connection} that will be returned
* at a later time through the given {@link Promise}.
* <p />
* Use {@link FuturePromise} to wait for the connection:
* <pre>
* Destination destination = ...;
* FuturePromise&lt;Connection&gt; futureConnection = new FuturePromise&lt;&gt;();
* destination.newConnection(futureConnection);
* Connection connection = futureConnection.get(5, TimeUnit.SECONDS);
* </pre>
*
* @param promise the promise of a new, unpooled, {@link Connection}
*/
Future<Connection> newConnection();
void newConnection(Promise<Connection> promise);
public static class Address
{
private final String host;
private final int port;
public Address(String host, int port)
{
this.host = host;
this.port = port;
}
public String getHost()
{
return host;
}
public int getPort()
{
return port;
}
}
}

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.Assert;
import org.junit.Test;
@ -43,7 +44,9 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
start(new EmptyServerHandler());
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
FuturePromise<Connection> futureConnection = new FuturePromise<>();
destination.newConnection(futureConnection);
try (Connection connection = futureConnection.get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest(destination.getHost(), destination.getPort()).scheme(scheme);
FutureResponseListener listener = new FutureResponseListener(request);
@ -66,7 +69,9 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
start(new EmptyServerHandler());
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS);
FuturePromise<Connection> futureConnection = new FuturePromise<>();
destination.newConnection(futureConnection);
Connection connection = futureConnection.get(5, TimeUnit.SECONDS);
Request request = client.newRequest(destination.getHost(), destination.getPort()).scheme(scheme);
FutureResponseListener listener = new FutureResponseListener(request);
connection.send(request, listener);

View File

@ -40,6 +40,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
@ -175,7 +176,9 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
FuturePromise<Connection> futureConnection = new FuturePromise<>();
destination.newConnection(futureConnection);
try (Connection connection = futureConnection.get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
@ -203,7 +206,9 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
Destination destination = client.getDestination(scheme, "localhost", connector.getLocalPort());
try (Connection connection = destination.newConnection().get(5, TimeUnit.SECONDS))
FuturePromise<Connection> futureConnection = new FuturePromise<>();
destination.newConnection(futureConnection);
try (Connection connection = futureConnection.get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest(destination.getHost(), destination.getPort())
.scheme(scheme)

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.OutputStreamContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.FuturePromise;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@ -149,7 +150,9 @@ public class Usage
client.start();
// Create an explicit connection, and use try-with-resources to manage it
try (Connection connection = client.getDestination("http", "localhost", 8080).newConnection().get(5, TimeUnit.SECONDS))
FuturePromise<Connection> futureConnection = new FuturePromise<>();
client.getDestination("http", "localhost", 8080).newConnection(futureConnection);
try (Connection connection = futureConnection.get(5, TimeUnit.SECONDS))
{
Request request = client.newRequest("localhost", 8080);

View File

@ -0,0 +1,175 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.UnresolvedAddressException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* Creates asynchronously {@link SocketAddress} instances, returning them through a {@link Promise},
* in order to avoid blocking on DNS lookup.
* <p />
* {@link InetSocketAddress#InetSocketAddress(String, int)} attempts to perform a DNS resolution of
* the host name, and this may block for several seconds.
* This class creates the {@link InetSocketAddress} in a separate thread and provides the result
* through a {@link Promise}, with the possibility to specify a timeout for the operation.
* <p />
* Example usage:
* <pre>
* SocketAddressResolver resolver = new SocketAddressResolver(executor, scheduler);
* resolver.resolve("www.google.com", 80, new Promise&lt;SocketAddress&gt;()
* {
* public void succeeded(SocketAddress result)
* {
* // The address was resolved
* }
*
* public void failed(Throwable failure)
* {
* // The address resolution failed
* }
* });
* </pre>
*/
public class SocketAddressResolver
{
private static final Logger LOG = Log.getLogger(SocketAddressResolver.class);
private final Executor executor;
private final Scheduler scheduler;
private final long timeout;
/**
* Creates a new instance with the given executor (to perform DNS resolution in a separate thread),
* the given scheduler (to cancel the operation if it takes too long) and the given timeout, in milliseconds.
*
* @param executor the thread pool to use to perform DNS resolution in pooled threads
* @param scheduler the scheduler to schedule tasks to cancel DNS resolution if it takes too long
* @param timeout the timeout, in milliseconds, for the DNS resolution to complete
*/
public SocketAddressResolver(Executor executor, Scheduler scheduler, long timeout)
{
this.executor = executor;
this.scheduler = scheduler;
this.timeout = timeout;
}
public Executor getExecutor()
{
return executor;
}
public Scheduler getScheduler()
{
return scheduler;
}
public long getTimeout()
{
return timeout;
}
/**
* Resolves the given host and port, returning a {@link SocketAddress} through the given {@link Promise}
* with the default timeout.
*
* @param host the host to resolve
* @param port the port of the resulting socket address
* @param promise the callback invoked when the resolution succeeds or fails
* @see #resolve(String, int, long, Promise)
*/
public void resolve(String host, int port, Promise<SocketAddress> promise)
{
resolve(host, port, timeout, promise);
}
/**
* Resolves the given host and port, returning a {@link SocketAddress} through the given {@link Promise}
* with the given timeout.
*
* @param host the host to resolve
* @param port the port of the resulting socket address
* @param timeout the timeout, in milliseconds, for the DNS resolution to complete
* @param promise the callback invoked when the resolution succeeds or fails
*/
protected void resolve(final String host, final int port, final long timeout, final Promise<SocketAddress> promise)
{
executor.execute(new Runnable()
{
@Override
public void run()
{
Scheduler.Task task = null;
final AtomicBoolean complete = new AtomicBoolean();
if (timeout > 0)
{
final Thread thread = Thread.currentThread();
task = scheduler.schedule(new Runnable()
{
@Override
public void run()
{
if (complete.compareAndSet(false, true))
{
promise.failed(new TimeoutException());
thread.interrupt();
}
}
}, timeout, TimeUnit.MILLISECONDS);
}
try
{
long start = System.nanoTime();
InetSocketAddress result = new InetSocketAddress(host, port);
long elapsed = System.nanoTime() - start;
LOG.debug("Resolved {} in {} ms", host, TimeUnit.NANOSECONDS.toMillis(elapsed));
if (complete.compareAndSet(false, true))
{
if (result.isUnresolved())
promise.failed(new UnresolvedAddressException());
else
promise.succeeded(result);
}
}
catch (Throwable x)
{
if (complete.compareAndSet(false, true))
promise.failed(x);
}
finally
{
if (task != null)
task.cancel();
// Reset the interrupted status before releasing the thread to the pool
Thread.interrupted();
}
}
});
}
}