Merged branch 'jetty-9.2.x' into 'master'.
This commit is contained in:
commit
542ef22ba8
10
VERSION.txt
10
VERSION.txt
|
@ -24,6 +24,16 @@ jetty-9.3.2.v20150730 - 30 July 2015
|
|||
before query when using prefix.
|
||||
+ 473832 SslConnection flips back buffers on handshake exception
|
||||
|
||||
jetty-9.2.13.v20150730 - 30 July 2015
|
||||
+ 472859 ConcatServlet may expose protected resources.
|
||||
+ 473006 Encode addPath in URLResource
|
||||
+ 473243 Delay resource close for async default content
|
||||
+ 473266 Better handling of MultiException
|
||||
+ 473322 GatherWrite limit handling
|
||||
+ 473624 ProxyServlet.Transparent / TransparentDelegate add trailing slash
|
||||
before query when using prefix.
|
||||
+ 473832 SslConnection flips back buffers on handshake exception
|
||||
|
||||
jetty-9.3.1.v20150714 - 14 July 2015
|
||||
+ 441020 Support HEADERS followed by CONTINUATION+.
|
||||
+ 460671 Rationalize property names (fix for jetty.sh)
|
||||
|
|
|
@ -88,15 +88,24 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
|
|||
if (bindAddress != null)
|
||||
channel.bind(bindAddress);
|
||||
configure(client, channel);
|
||||
channel.configureBlocking(false);
|
||||
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY, destination.getHost());
|
||||
context.put(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY, destination.getPort());
|
||||
|
||||
if (channel.connect(address))
|
||||
if (client.isConnectBlocking())
|
||||
{
|
||||
channel.socket().connect(address, (int)client.getConnectTimeout());
|
||||
channel.configureBlocking(false);
|
||||
selectorManager.accept(channel, context);
|
||||
}
|
||||
else
|
||||
selectorManager.connect(channel, context);
|
||||
{
|
||||
channel.configureBlocking(false);
|
||||
if (channel.connect(address))
|
||||
selectorManager.accept(channel, context);
|
||||
else
|
||||
selectorManager.connect(channel, context);
|
||||
}
|
||||
}
|
||||
// Must catch all exceptions, since some like
|
||||
// UnresolvedAddressException are not IOExceptions.
|
||||
|
|
|
@ -22,8 +22,10 @@ import java.io.IOException;
|
|||
import java.net.CookieManager;
|
||||
import java.net.CookiePolicy;
|
||||
import java.net.CookieStore;
|
||||
import java.net.Socket;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -137,6 +139,7 @@ public class HttpClient extends ContainerLifeCycle
|
|||
private volatile boolean strictEventOrdering = false;
|
||||
private volatile HttpField encodingField;
|
||||
private volatile boolean removeIdleDestinations = false;
|
||||
private volatile boolean connectBlocking = false;
|
||||
|
||||
/**
|
||||
* Creates a {@link HttpClient} instance that can perform requests to non-TLS destinations only
|
||||
|
@ -208,7 +211,9 @@ public class HttpClient extends ContainerLifeCycle
|
|||
transport.setHttpClient(this);
|
||||
addBean(transport);
|
||||
|
||||
resolver = new SocketAddressResolver(executor, scheduler, getAddressResolutionTimeout());
|
||||
if (resolver == null)
|
||||
resolver = new SocketAddressResolver.Async(executor, scheduler, getAddressResolutionTimeout());
|
||||
addBean(resolver);
|
||||
|
||||
handlers.put(new ContinueProtocolHandler());
|
||||
handlers.put(new RedirectProtocolHandler(this));
|
||||
|
@ -604,7 +609,8 @@ public class HttpClient extends ContainerLifeCycle
|
|||
}
|
||||
|
||||
/**
|
||||
* @return the timeout, in milliseconds, for the DNS resolution of host addresses
|
||||
* @return the timeout, in milliseconds, for the default {@link SocketAddressResolver} created at startup
|
||||
* @see #getSocketAddressResolver()
|
||||
*/
|
||||
public long getAddressResolutionTimeout()
|
||||
{
|
||||
|
@ -612,7 +618,13 @@ public class HttpClient extends ContainerLifeCycle
|
|||
}
|
||||
|
||||
/**
|
||||
* @param addressResolutionTimeout the timeout, in milliseconds, for the DNS resolution of host addresses
|
||||
* <p>Sets the socket address resolution timeout used by the default {@link SocketAddressResolver}
|
||||
* created by this {@link HttpClient} at startup.</p>
|
||||
* <p>For more fine tuned configuration of socket address resolution, see
|
||||
* {@link #setSocketAddressResolver(SocketAddressResolver)}.</p>
|
||||
*
|
||||
* @param addressResolutionTimeout the timeout, in milliseconds, for the default {@link SocketAddressResolver} created at startup
|
||||
* @see #setSocketAddressResolver(SocketAddressResolver)
|
||||
*/
|
||||
public void setAddressResolutionTimeout(long addressResolutionTimeout)
|
||||
{
|
||||
|
@ -722,6 +734,22 @@ public class HttpClient extends ContainerLifeCycle
|
|||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the {@link SocketAddressResolver} of this {@link HttpClient}
|
||||
*/
|
||||
public SocketAddressResolver getSocketAddressResolver()
|
||||
{
|
||||
return resolver;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param resolver the {@link SocketAddressResolver} of this {@link HttpClient}
|
||||
*/
|
||||
public void setSocketAddressResolver(SocketAddressResolver resolver)
|
||||
{
|
||||
this.resolver = resolver;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the max number of connections that this {@link HttpClient} opens to {@link Destination}s
|
||||
*/
|
||||
|
@ -934,6 +962,29 @@ public class HttpClient extends ContainerLifeCycle
|
|||
this.removeIdleDestinations = removeIdleDestinations;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return whether {@code connect()} operations are performed in blocking mode
|
||||
*/
|
||||
public boolean isConnectBlocking()
|
||||
{
|
||||
return connectBlocking;
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Whether {@code connect()} operations are performed in blocking mode.</p>
|
||||
* <p>If {@code connect()} are performed in blocking mode, then {@link Socket#connect(SocketAddress, int)}
|
||||
* will be used to connect to servers.</p>
|
||||
* <p>Otherwise, {@link SocketChannel#connect(SocketAddress)} will be used in non-blocking mode,
|
||||
* therefore registering for {@link SelectionKey#OP_CONNECT} and finishing the connect operation
|
||||
* when the NIO system emits that event.</p>
|
||||
*
|
||||
* @param connectBlocking whether {@code connect()} operations are performed in blocking mode
|
||||
*/
|
||||
public void setConnectBlocking(boolean connectBlocking)
|
||||
{
|
||||
this.connectBlocking = connectBlocking;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the forward proxy configuration
|
||||
*/
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.eclipse.jetty.server.ServerConnector;
|
|||
import org.eclipse.jetty.server.handler.AbstractHandler;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.LeakDetector;
|
||||
import org.eclipse.jetty.util.SocketAddressResolver;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
|
@ -84,13 +85,13 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
server.removeConnector(connector);
|
||||
LeakTrackingByteBufferPool serverBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
|
||||
connector = new ServerConnector(server, connector.getExecutor(), connector.getScheduler(),
|
||||
serverBufferPool , 1, Math.min(1, cores / 2),
|
||||
serverBufferPool , 1, Math.min(1, cores / 2),
|
||||
AbstractConnectionFactory.getFactories(sslContextFactory, new HttpConnectionFactory()));
|
||||
server.addConnector(connector);
|
||||
server.start();
|
||||
|
||||
client.stop();
|
||||
|
||||
|
||||
HttpClient newClient = new HttpClient(new HttpClientTransportOverHTTP()
|
||||
{
|
||||
@Override
|
||||
|
@ -114,6 +115,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
}, sslContextFactory);
|
||||
newClient.setExecutor(client.getExecutor());
|
||||
newClient.setSocketAddressResolver(new SocketAddressResolver.Sync());
|
||||
client = newClient;
|
||||
LeakTrackingByteBufferPool clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
|
||||
client.setByteBufferPool(clientBufferPool);
|
||||
|
@ -144,11 +146,11 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
|
|||
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), is(0L));
|
||||
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), is(0L));
|
||||
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), is(0L));
|
||||
|
||||
|
||||
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), is(0L));
|
||||
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), is(0L));
|
||||
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), is(0L));
|
||||
|
||||
|
||||
assertThat("Connection Leaks", connectionLeaks.get(), is(0L));
|
||||
}
|
||||
|
||||
|
|
|
@ -173,6 +173,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
});
|
||||
|
||||
client.setConnectBlocking(true);
|
||||
ContentResponse response = client.GET(scheme + "://localhost:" + connector.getLocalPort());
|
||||
|
||||
Assert.assertNotNull(response);
|
||||
|
@ -1135,16 +1136,16 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.scheme(scheme)
|
||||
.send(listener);
|
||||
|
||||
|
||||
Response response = ex.exchange(null);
|
||||
|
||||
Assert.assertEquals(200, response.getStatus());
|
||||
Assert.assertArrayEquals(content, listener.getContent());
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1381,7 +1382,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
|||
|
||||
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCompleteNotInvokedUntilContentConsumed() throws Exception
|
||||
{
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.concurrent.Executor;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.servlet.ServletException;
|
||||
|
@ -299,6 +300,127 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
|
|||
}
|
||||
}
|
||||
|
||||
@Slow
|
||||
@Test
|
||||
public void testBlockingConnectTimeoutFailsRequest() throws Exception
|
||||
{
|
||||
testConnectTimeoutFailsRequest(true);
|
||||
}
|
||||
|
||||
@Slow
|
||||
@Test
|
||||
public void testNonBlockingConnectTimeoutFailsRequest() throws Exception
|
||||
{
|
||||
testConnectTimeoutFailsRequest(false);
|
||||
}
|
||||
|
||||
private void testConnectTimeoutFailsRequest(boolean blocking) throws Exception
|
||||
{
|
||||
String host = "10.255.255.1";
|
||||
int port = 80;
|
||||
int connectTimeout = 1000;
|
||||
assumeConnectTimeout(host, port, connectTimeout);
|
||||
|
||||
start(new EmptyServerHandler());
|
||||
client.stop();
|
||||
client.setConnectTimeout(connectTimeout);
|
||||
client.setConnectBlocking(blocking);
|
||||
client.start();
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Request request = client.newRequest(host, port);
|
||||
request.scheme(scheme)
|
||||
.send(new Response.CompleteListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
if (result.isFailed())
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
|
||||
Assert.assertNotNull(request.getAbortCause());
|
||||
}
|
||||
|
||||
@Slow
|
||||
@Test
|
||||
public void testConnectTimeoutIsCancelledByShorterRequestTimeout() throws Exception
|
||||
{
|
||||
String host = "10.255.255.1";
|
||||
int port = 80;
|
||||
int connectTimeout = 2000;
|
||||
assumeConnectTimeout(host, port, connectTimeout);
|
||||
|
||||
start(new EmptyServerHandler());
|
||||
client.stop();
|
||||
client.setConnectTimeout(connectTimeout);
|
||||
client.start();
|
||||
|
||||
final AtomicInteger completes = new AtomicInteger();
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
Request request = client.newRequest(host, port);
|
||||
request.scheme(scheme)
|
||||
.timeout(connectTimeout / 2, TimeUnit.MILLISECONDS)
|
||||
.send(new Response.CompleteListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
completes.incrementAndGet();
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertFalse(latch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
|
||||
Assert.assertEquals(1, completes.get());
|
||||
Assert.assertNotNull(request.getAbortCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void retryAfterConnectTimeout() throws Exception
|
||||
{
|
||||
final String host = "10.255.255.1";
|
||||
final int port = 80;
|
||||
int connectTimeout = 1000;
|
||||
assumeConnectTimeout(host, port, connectTimeout);
|
||||
|
||||
start(new EmptyServerHandler());
|
||||
client.stop();
|
||||
client.setConnectTimeout(connectTimeout);
|
||||
client.start();
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Request request = client.newRequest(host, port);
|
||||
request.scheme(scheme)
|
||||
.send(new Response.CompleteListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
if (result.isFailed())
|
||||
{
|
||||
// Retry
|
||||
client.newRequest(host, port)
|
||||
.scheme(scheme)
|
||||
.send(new Response.CompleteListener()
|
||||
{
|
||||
@Override
|
||||
public void onComplete(Result result)
|
||||
{
|
||||
if (result.isFailed())
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Assert.assertTrue(latch.await(333 * connectTimeout, TimeUnit.MILLISECONDS));
|
||||
Assert.assertNotNull(request.getAbortCause());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVeryShortTimeout() throws Exception
|
||||
{
|
||||
|
|
|
@ -31,69 +31,10 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
import org.eclipse.jetty.util.thread.Scheduler;
|
||||
|
||||
/**
|
||||
* Creates asynchronously {@link SocketAddress} instances, returning them through a {@link Promise},
|
||||
* in order to avoid blocking on DNS lookup.
|
||||
* <p>
|
||||
* {@link InetSocketAddress#InetSocketAddress(String, int)} attempts to perform a DNS resolution of
|
||||
* the host name, and this may block for several seconds.
|
||||
* This class creates the {@link InetSocketAddress} in a separate thread and provides the result
|
||||
* through a {@link Promise}, with the possibility to specify a timeout for the operation.
|
||||
* <p>
|
||||
* Example usage:
|
||||
* <pre>
|
||||
* SocketAddressResolver resolver = new SocketAddressResolver(executor, scheduler);
|
||||
* resolver.resolve("www.google.com", 80, new Promise<SocketAddress>()
|
||||
* {
|
||||
* public void succeeded(SocketAddress result)
|
||||
* {
|
||||
* // The address was resolved
|
||||
* }
|
||||
*
|
||||
* public void failed(Throwable failure)
|
||||
* {
|
||||
* // The address resolution failed
|
||||
* }
|
||||
* });
|
||||
* </pre>
|
||||
* <p>Creates {@link SocketAddress} instances, returning them through a {@link Promise}.</p>
|
||||
*/
|
||||
public class SocketAddressResolver
|
||||
public interface SocketAddressResolver
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(SocketAddressResolver.class);
|
||||
|
||||
private final Executor executor;
|
||||
private final Scheduler scheduler;
|
||||
private final long timeout;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the given executor (to perform DNS resolution in a separate thread),
|
||||
* the given scheduler (to cancel the operation if it takes too long) and the given timeout, in milliseconds.
|
||||
*
|
||||
* @param executor the thread pool to use to perform DNS resolution in pooled threads
|
||||
* @param scheduler the scheduler to schedule tasks to cancel DNS resolution if it takes too long
|
||||
* @param timeout the timeout, in milliseconds, for the DNS resolution to complete
|
||||
*/
|
||||
public SocketAddressResolver(Executor executor, Scheduler scheduler, long timeout)
|
||||
{
|
||||
this.executor = executor;
|
||||
this.scheduler = scheduler;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return executor;
|
||||
}
|
||||
|
||||
public Scheduler getScheduler()
|
||||
{
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
public long getTimeout()
|
||||
{
|
||||
return timeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the given host and port, returning a {@link SocketAddress} through the given {@link Promise}
|
||||
* with the default timeout.
|
||||
|
@ -101,76 +42,149 @@ public class SocketAddressResolver
|
|||
* @param host the host to resolve
|
||||
* @param port the port of the resulting socket address
|
||||
* @param promise the callback invoked when the resolution succeeds or fails
|
||||
* @see #resolve(String, int, long, Promise)
|
||||
*/
|
||||
public void resolve(String host, int port, Promise<SocketAddress> promise)
|
||||
public void resolve(String host, int port, Promise<SocketAddress> promise);
|
||||
|
||||
/**
|
||||
* <p>Creates {@link SocketAddress} instances synchronously in the caller thread.</p>
|
||||
*/
|
||||
public static class Sync implements SocketAddressResolver
|
||||
{
|
||||
resolve(host, port, timeout, promise);
|
||||
@Override
|
||||
public void resolve(String host, int port, Promise<SocketAddress> promise)
|
||||
{
|
||||
try
|
||||
{
|
||||
InetSocketAddress result = new InetSocketAddress(host, port);
|
||||
if (result.isUnresolved())
|
||||
promise.failed(new UnresolvedAddressException());
|
||||
else
|
||||
promise.succeeded(result);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
promise.failed(x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves the given host and port, returning a {@link SocketAddress} through the given {@link Promise}
|
||||
* with the given timeout.
|
||||
* <p>Creates {@link SocketAddress} instances asynchronously in a different thread.</p>
|
||||
* <p>{@link InetSocketAddress#InetSocketAddress(String, int)} attempts to perform a DNS
|
||||
* resolution of the host name, and this may block for several seconds.
|
||||
* This class creates the {@link InetSocketAddress} in a separate thread and provides the result
|
||||
* through a {@link Promise}, with the possibility to specify a timeout for the operation.</p>
|
||||
* <p>Example usage:</p>
|
||||
* <pre>
|
||||
* SocketAddressResolver resolver = new SocketAddressResolver.Async(executor, scheduler, timeout);
|
||||
* resolver.resolve("www.google.com", 80, new Promise<SocketAddress>()
|
||||
* {
|
||||
* public void succeeded(SocketAddress result)
|
||||
* {
|
||||
* // The address was resolved
|
||||
* }
|
||||
*
|
||||
* @param host the host to resolve
|
||||
* @param port the port of the resulting socket address
|
||||
* @param timeout the timeout, in milliseconds, for the DNS resolution to complete
|
||||
* @param promise the callback invoked when the resolution succeeds or fails
|
||||
* public void failed(Throwable failure)
|
||||
* {
|
||||
* // The address resolution failed
|
||||
* }
|
||||
* });
|
||||
* </pre>
|
||||
*/
|
||||
protected void resolve(final String host, final int port, final long timeout, final Promise<SocketAddress> promise)
|
||||
public static class Async implements SocketAddressResolver
|
||||
{
|
||||
executor.execute(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
Scheduler.Task task = null;
|
||||
final AtomicBoolean complete = new AtomicBoolean();
|
||||
if (timeout > 0)
|
||||
{
|
||||
final Thread thread = Thread.currentThread();
|
||||
task = scheduler.schedule(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
if (complete.compareAndSet(false, true))
|
||||
{
|
||||
promise.failed(new TimeoutException());
|
||||
thread.interrupt();
|
||||
}
|
||||
}
|
||||
}, timeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
private static final Logger LOG = Log.getLogger(SocketAddressResolver.class);
|
||||
|
||||
try
|
||||
private final Executor executor;
|
||||
private final Scheduler scheduler;
|
||||
private final long timeout;
|
||||
|
||||
/**
|
||||
* Creates a new instance with the given executor (to perform DNS resolution in a separate thread),
|
||||
* the given scheduler (to cancel the operation if it takes too long) and the given timeout, in milliseconds.
|
||||
*
|
||||
* @param executor the thread pool to use to perform DNS resolution in pooled threads
|
||||
* @param scheduler the scheduler to schedule tasks to cancel DNS resolution if it takes too long
|
||||
* @param timeout the timeout, in milliseconds, for the DNS resolution to complete
|
||||
*/
|
||||
public Async(Executor executor, Scheduler scheduler, long timeout)
|
||||
{
|
||||
this.executor = executor;
|
||||
this.scheduler = scheduler;
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public Executor getExecutor()
|
||||
{
|
||||
return executor;
|
||||
}
|
||||
|
||||
public Scheduler getScheduler()
|
||||
{
|
||||
return scheduler;
|
||||
}
|
||||
|
||||
public long getTimeout()
|
||||
{
|
||||
return timeout;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resolve(final String host, final int port, final Promise<SocketAddress> promise)
|
||||
{
|
||||
executor.execute(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
long start = System.nanoTime();
|
||||
InetSocketAddress result = new InetSocketAddress(host, port);
|
||||
long elapsed = System.nanoTime() - start;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Resolved {} in {} ms", host, TimeUnit.NANOSECONDS.toMillis(elapsed));
|
||||
if (complete.compareAndSet(false, true))
|
||||
Scheduler.Task task = null;
|
||||
final AtomicBoolean complete = new AtomicBoolean();
|
||||
if (timeout > 0)
|
||||
{
|
||||
if (result.isUnresolved())
|
||||
promise.failed(new UnresolvedAddressException());
|
||||
else
|
||||
promise.succeeded(result);
|
||||
final Thread thread = Thread.currentThread();
|
||||
task = scheduler.schedule(new Runnable()
|
||||
{
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
if (complete.compareAndSet(false, true))
|
||||
{
|
||||
promise.failed(new TimeoutException());
|
||||
thread.interrupt();
|
||||
}
|
||||
}
|
||||
}, timeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
long start = System.nanoTime();
|
||||
InetSocketAddress result = new InetSocketAddress(host, port);
|
||||
long elapsed = System.nanoTime() - start;
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Resolved {} in {} ms", host, TimeUnit.NANOSECONDS.toMillis(elapsed));
|
||||
if (complete.compareAndSet(false, true))
|
||||
{
|
||||
if (result.isUnresolved())
|
||||
promise.failed(new UnresolvedAddressException());
|
||||
else
|
||||
promise.succeeded(result);
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (complete.compareAndSet(false, true))
|
||||
promise.failed(x);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (task != null)
|
||||
task.cancel();
|
||||
// Reset the interrupted status before releasing the thread to the pool
|
||||
Thread.interrupted();
|
||||
}
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
if (complete.compareAndSet(false, true))
|
||||
promise.failed(x);
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (task != null)
|
||||
task.cancel();
|
||||
// Reset the interrupted status before releasing the thread to the pool
|
||||
Thread.interrupted();
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue