JETTY-912 add exchange timeout

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@2204 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Jesse McConnell 2010-08-03 14:51:41 +00:00
parent 16caeeb478
commit 129184744a
5 changed files with 356 additions and 1 deletions

View File

@ -8,6 +8,7 @@ jetty-7.2-SNAPSHOT
+ 320112 Websocket in aggregate jars
+ 320264 Removed duplicate mime.property entries
+ 320457 Added rfc2045 support to B64Code
+ JETTY-912 added per exchange timeout api
+ JETTY-1245 Do not use direct buffers with NIO SSL
+ JETTY-1249 Apply max idle time to all connectors
+ JETTY-1250 Parallel start of HandlerCollection

View File

@ -242,6 +242,11 @@ public class HttpClient extends HttpBuffers implements Attributes
{
_timeoutQ.schedule(task);
}
public void schedule(Timeout.Task task, long timeout)
{
_timeoutQ.schedule(task, timeout);
}
/* ------------------------------------------------------------ */
public void scheduleIdle(Timeout.Task task)

View File

@ -131,7 +131,16 @@ public class HttpConnection implements Connection
AsyncEndPoint scep = (AsyncEndPoint)_endp;
scep.scheduleWrite();
}
_destination.getHttpClient().schedule(_timeout);
long exchTimeout = _exchange.getTimeout();
if (exchTimeout > 0)
{
_destination.getHttpClient().schedule(_timeout, exchTimeout);
}
else
{
_destination.getHttpClient().schedule(_timeout);
}
return true;
}

View File

@ -96,6 +96,9 @@ public class HttpExchange
private boolean _configureListeners = true;
private HttpEventListener _listener = new Listener();
private volatile HttpConnection _connection;
// a timeout for this exchange
private long _timeout = -1;
boolean _onRequestCompleteDone;
boolean _onResponseCompleteDone;
@ -349,6 +352,16 @@ public class HttpExchange
_listener=listener;
}
public void setTimeout( long timeout )
{
_timeout = timeout;
}
public long getTimeout()
{
return _timeout;
}
/**
* @param url Including protocol, host and port
*/

View File

@ -0,0 +1,327 @@
package org.eclipse.jetty.client;
// ========================================================================
//All rights reserved. This program and the accompanying materials
//are made available under the terms of the Eclipse Public License v1.0
//and Apache License v2.0 which accompanies this distribution.
//The Eclipse Public License is available at
//http://www.eclipse.org/legal/epl-v10.html
//The Apache License v2.0 is available at
//http://www.opensource.org/licenses/apache2.0.php
//You may elect to redistribute this code under either of these licenses.
// ========================================================================
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpHeaders;
import org.eclipse.jetty.http.HttpMethods;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import junit.framework.Assert;
import junit.framework.TestCase;
public class TimeoutExchangeTest extends TestCase
{
protected HttpClient _httpClient;
protected int _maxConnectionsPerAddress = 2;
protected String _scheme = "http://";
protected Server _server;
protected int _port;
protected Connector _connector;
public void setUp() throws Exception
{
startServer();
createClient();
}
public void tearDown() throws Exception
{
_httpClient.stop();
Thread.sleep(500);
stopServer();
}
private void startServer() throws Exception
{
_server = new Server();
_server.setGracefulShutdown(500);
_connector = new SelectChannelConnector();
_connector.setPort(0);
_server.setConnectors(new Connector[]
{ _connector });
Handler handler = new AbstractHandler()
{
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException,
ServletException
{
try
{
// let's sleep for 0.7 sec as the default timeout is 0.5 sec
Thread.sleep(700);
}
catch (Exception e)
{
e.printStackTrace();
}
response.setContentType("text/html");
response.setStatus(HttpServletResponse.SC_OK);
response.getWriter().println("<h1>Hello</h1>");
((Request)request).setHandled(true);
}
};
_server.setHandler(handler);
_server.start();
_port = _connector.getLocalPort();
}
private void stopServer() throws Exception
{
if (_server != null)
{
_server.stop();
_server = null;
}
}
private void createClient() throws Exception
{
_httpClient = new HttpClient();
_httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
_httpClient.setMaxConnectionsPerAddress(_maxConnectionsPerAddress);
// default timeout = 500 ms
_httpClient.setTimeout(500);
_httpClient.start();
}
public void testTimeouts() throws Exception
{
CustomContentExchange httpExchange = new CustomContentExchange();
httpExchange.setURL(_scheme + "localhost:" + _port);
httpExchange.setMethod(HttpMethods.POST);
httpExchange.setRequestContent(new ByteArrayBuffer("<h1>??</h1>"));
// let's use the default timeout - the one set on the HttpClient
// (500 ms)
_httpClient.send(httpExchange);
httpExchange.getDoneLatch().await(900,TimeUnit.MILLISECONDS);
// we should get a timeout - the server sleeps for 700 ms
Assert.assertTrue(httpExchange.isTimeoutOccurred());
Assert.assertFalse(httpExchange.isResponseReceived());
// let's do it again - with a custom timeout
httpExchange = new CustomContentExchange();
httpExchange.setURL(_scheme + "localhost:" + _port);
httpExchange.setMethod(HttpMethods.POST);
httpExchange.setRequestContent(new ByteArrayBuffer("<h1>??</h1>"));
httpExchange.setTimeout(500);
// let's use a custom timeout - 500 ms (the default one) + 500 ms
// delay = 1000 ms
_httpClient.send(httpExchange);
httpExchange.getDoneLatch().await(1100,TimeUnit.MILLISECONDS);
// we should not get a timeout - the server sleeps for 700 ms
// while we wait for 1000 ms
Assert.assertFalse(httpExchange.isTimeoutOccurred());
Assert.assertTrue(httpExchange.isResponseReceived());
}
class CustomContentExchange extends ContentExchange
{
protected final CountDownLatch _doneLatch = new CountDownLatch(1);
protected boolean _errorOccurred = false;
protected boolean _timeoutOccurred = false;
protected boolean _responseReceived = false;
public boolean isErrorOccurred()
{
return _errorOccurred;
}
public boolean isTimeoutOccurred()
{
return _timeoutOccurred;
}
public boolean isResponseReceived()
{
return _responseReceived;
}
public CustomContentExchange()
{
super(true);
}
@Override
protected void onRequestComplete() throws IOException
{
// close the input stream when its not needed anymore
InputStream is = getRequestContentSource();
if (is != null)
{
try
{
is.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
@Override
protected void onResponseComplete() throws IOException
{
try
{
super.onResponseComplete();
}
finally
{
doTaskCompleted();
}
}
@Override
protected void onExpire()
{
try
{
super.onExpire();
}
finally
{
doTaskCompleted();
}
}
@Override
protected void onException(Throwable ex)
{
try
{
super.onException(ex);
}
finally
{
doTaskCompleted(ex);
}
}
@Override
protected void onConnectionFailed(Throwable ex)
{
try
{
super.onConnectionFailed(ex);
}
finally
{
doTaskCompleted(ex);
}
}
public String getBody() throws UnsupportedEncodingException
{
return super.getResponseContent();
}
public String getUrl()
{
String params = getRequestFields().getStringField(HttpHeaders.CONTENT_ENCODING);
return getScheme() + "//" + getAddress().toString() + getURI() + (params != null?"?" + params:"");
}
protected void doTaskCompleted()
{
int exchangeState = getStatus();
try
{
if (exchangeState == HttpExchange.STATUS_COMPLETED)
{
// process the response as the state is ok
try
{
int responseCode = getResponseStatus();
if (responseCode >= HttpStatus.CONTINUE_100 && responseCode < HttpStatus.MULTIPLE_CHOICES_300)
{
_responseReceived = true;
}
else
{
_errorOccurred = true;
}
}
catch (Exception e)
{
_errorOccurred = true;
e.printStackTrace();
}
}
else if (exchangeState == HttpExchange.STATUS_EXPIRED)
{
_timeoutOccurred = true;
}
else
{
_errorOccurred = true;
}
}
finally
{
// make sure to lower the latch
getDoneLatch().countDown();
}
}
protected void doTaskCompleted(Throwable ex)
{
try
{
_errorOccurred = true;
}
finally
{
// make sure to lower the latch
getDoneLatch().countDown();
}
}
public CountDownLatch getDoneLatch()
{
return _doneLatch;
}
}
}