Implemented HTTP2Client connect timeout.

This commit is contained in:
Simone Bordet 2015-02-09 16:22:21 +01:00
parent 85edb7e573
commit f974c74329
8 changed files with 280 additions and 186 deletions

View File

@ -88,6 +88,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
public void failed(Throwable x)
{
connect.set(ConnectState.DISCONNECTED);
abort(x);
}
protected boolean process(final C connection, boolean dispatch)

View File

@ -28,7 +28,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@ -299,114 +298,6 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
}
}
@Slow
@Test
public void testConnectTimeoutFailsRequest() throws Exception
{
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
start(new EmptyServerHandler());
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
request.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
latch.countDown();
}
});
Assert.assertTrue(latch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
Assert.assertNotNull(request.getAbortCause());
}
@Slow
@Test
public void testConnectTimeoutIsCancelledByShorterRequestTimeout() throws Exception
{
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 2000;
assumeConnectTimeout(host, port, connectTimeout);
start(new EmptyServerHandler());
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final AtomicInteger completes = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(2);
Request request = client.newRequest(host, port);
request.scheme(scheme)
.timeout(connectTimeout / 2, TimeUnit.MILLISECONDS)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
completes.incrementAndGet();
latch.countDown();
}
});
Assert.assertFalse(latch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
Assert.assertEquals(1, completes.get());
Assert.assertNotNull(request.getAbortCause());
}
@Test
public void retryAfterConnectTimeout() throws Exception
{
final String host = "10.255.255.1";
final int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
start(new EmptyServerHandler());
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
request.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
{
// Retry
client.newRequest(host, port)
.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
latch.countDown();
}
});
}
}
});
Assert.assertTrue(latch.await(333 * connectTimeout, TimeUnit.MILLISECONDS));
Assert.assertNotNull(request.getAbortCause());
}
@Test
public void testVeryShortTimeout() throws Exception
{

View File

@ -176,6 +176,9 @@ public class HTTP2Client extends ContainerLifeCycle
public void setConnectTimeout(long connectTimeout)
{
this.connectTimeout = connectTimeout;
SelectorManager selector = this.selector;
if (selector != null)
selector.setConnectTimeout(connectTimeout);
}
public void connect(InetSocketAddress address, Session.Listener listener, Promise<Session> promise)
@ -265,5 +268,21 @@ public class HTTP2Client extends ContainerLifeCycle
return factory.newConnection(endpoint, context);
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable failure, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
if (LOG.isDebugEnabled())
{
Object host = context.get(SslClientConnectionFactory.SSL_PEER_HOST_CONTEXT_KEY);
Object port = context.get(SslClientConnectionFactory.SSL_PEER_PORT_CONTEXT_KEY);
LOG.debug("Could not connect to {}:{}", host, port);
}
@SuppressWarnings("unchecked")
Promise<Session> promise = (Promise<Session>)context.get(HTTP2ClientConnectionFactory.SESSION_PROMISE_CONTEXT_KEY);
promise.failed(failure);
}
}
}

View File

@ -0,0 +1,86 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.util.Promise;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
public class ConnectTimeoutTest extends AbstractTest
{
@Test
public void testConnectTimeout() throws Exception
{
final String host = "10.255.255.1";
final int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
start(new ServerSessionListener.Adapter());
client.setConnectTimeout(connectTimeout);
InetSocketAddress address = new InetSocketAddress(host, port);
final CountDownLatch latch = new CountDownLatch(1);
client.connect(address, new Session.Listener.Adapter(), new Promise.Adapter<Session>()
{
@Override
public void failed(Throwable x)
{
Assert.assertTrue(x instanceof SocketTimeoutException);
latch.countDown();
}
});
Assert.assertTrue(latch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
}
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException
{
try (Socket socket = new Socket())
{
// Try to connect to a private address in the 10.x.y.z range.
// These addresses are usually not routed, so an attempt to
// connect to them will hang the connection attempt, which is
// what we want to simulate in this test.
socket.connect(new InetSocketAddress(host, port), connectTimeout);
// Abort the test if we can connect.
Assume.assumeTrue(false);
}
catch (SocketTimeoutException x)
{
// Expected timeout during connect, continue the test.
Assume.assumeTrue(true);
}
catch (Throwable x)
{
// Abort if any other exception happens.
Assume.assumeTrue(false);
}
}
}

View File

@ -58,6 +58,8 @@ public class HttpClientTransportOverHTTP2 implements HttpClientTransport
@Override
public void connect(SocketAddress address, Map<String, Object> context)
{
client.setConnectTimeout(httpClient.getConnectTimeout());
final HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
@SuppressWarnings("unchecked")
final Promise<Connection> connection = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
@ -85,6 +87,7 @@ public class HttpClientTransportOverHTTP2 implements HttpClientTransport
connection.failed(failure);
}
};
client.connect(httpClient.getSslContextFactory(), (InetSocketAddress)address, listener, promise, context);
}

View File

@ -49,9 +49,10 @@ public abstract class AbstractTest
static
{
http2Client = new HTTP2Client();
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("h2-client");
http2Client = HTTP2Client.builder().executor(clientThreads).build();
http2Client.setExecutor(clientThreads);
}
@Parameterized.Parameters(name = "{index}: mod:{0}")

View File

@ -0,0 +1,169 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.server.ConnectionFactory;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
public class HttpClientConnectTimeoutTest extends AbstractTest
{
public HttpClientConnectTimeoutTest(HttpClientTransport httpClientTransport, ConnectionFactory serverConnectionFactory)
{
super(httpClientTransport, serverConnectionFactory);
}
@Test
public void testConnectTimeout() throws Exception
{
final String host = "10.255.255.1";
final int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
start(null);
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
request.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
latch.countDown();
}
});
Assert.assertTrue(latch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
Assert.assertNotNull(request.getAbortCause());
}
@Test
public void testConnectTimeoutIsCancelledByShorterRequestTimeout() throws Exception
{
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 2000;
assumeConnectTimeout(host, port, connectTimeout);
start(null);
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final AtomicInteger completes = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(2);
Request request = client.newRequest(host, port);
request.timeout(connectTimeout / 2, TimeUnit.MILLISECONDS)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
completes.incrementAndGet();
latch.countDown();
}
});
Assert.assertFalse(latch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
Assert.assertEquals(1, completes.get());
Assert.assertNotNull(request.getAbortCause());
}
@Test
public void retryAfterConnectTimeout() throws Exception
{
final String host = "10.255.255.1";
final int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
start(null);
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
request.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
{
// Retry
client.newRequest(host, port).send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
latch.countDown();
}
});
}
}
});
Assert.assertTrue(latch.await(3 * connectTimeout, TimeUnit.MILLISECONDS));
Assert.assertNotNull(request.getAbortCause());
}
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException
{
try (Socket socket = new Socket())
{
// Try to connect to a private address in the 10.x.y.z range.
// These addresses are usually not routed, so an attempt to
// connect to them will hang the connection attempt, which is
// what we want to simulate in this test.
socket.connect(new InetSocketAddress(host, port), connectTimeout);
// Abort the test if we can connect.
Assume.assumeTrue(false);
}
catch (SocketTimeoutException x)
{
// Expected timeout during connect, continue the test.
Assume.assumeTrue(true);
}
catch (Throwable x)
{
// Abort if any other exception happens.
Assume.assumeTrue(false);
}
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
@ -31,7 +30,6 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
@ -193,78 +191,4 @@ public class HttpClientTest extends AbstractTest
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(0, response.getContent().length);
}
@Test
public void testUploadLargeWithExceptionThrownWhileReadingOnServer() throws Exception
{
final byte[] bytes = new byte[1024 * 1024];
new Random().nextBytes(bytes);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
byte[] buffer = new byte[512];
input.read(buffer);
throw new IOException();
}
});
try
{
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.content(new BytesContentProvider(bytes))
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.assertEquals(500, response.getStatus());
}
catch (Exception e)
{
Thread.sleep(1000);
}
}
@Test
public void testUploadChunkedWithExceptionThrownWhileReadingOnServer() throws Exception
{
final byte[] chunk1 = new byte[512];
final byte[] chunk2 = new byte[512];
Random random = new Random();
random.nextBytes(chunk1);
random.nextBytes(chunk2);
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
ServletInputStream input = request.getInputStream();
byte[] buffer = new byte[512];
input.read(buffer);
throw new IOException();
}
});
org.eclipse.jetty.client.api.Request request = client.newRequest("localhost", connector.getLocalPort());
DeferredContentProvider content = new DeferredContentProvider();
FutureResponseListener listener = new FutureResponseListener(request);
request.method(HttpMethod.POST)
.content(content)
.timeout(5, TimeUnit.SECONDS)
.send(listener);
content.offer(ByteBuffer.wrap(chunk1));
ContentResponse response = listener.get();
// content.offer(ByteBuffer.wrap(chunk2));
// content.close();
Assert.assertEquals(500, response.getStatus());
}
}