Fixes #340040 (Support for a total timeout).

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@3069 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Simone Bordet 2011-05-03 08:57:26 +00:00
parent 911860828a
commit 3d192f98ab
5 changed files with 275 additions and 92 deletions

View File

@ -11,6 +11,7 @@ jetty-7.4.1-SNAPSHOT
+ 344513 Attempting to set ConfigurationClasses in jetty-web.xml causes NPE
+ JETTY-954 WebAppContext eats any start exceptions instead of stopping the server load
+ 344529 Ability to customize the error handling of the OSGi HttpService
+ 340040 Support for a total timeout
jetty-7.4.0.v20110414
+ 342504 Scanner Listener

View File

@ -60,7 +60,7 @@ public class HttpConnection extends AbstractConnection
// The current exchange waiting for a response
private volatile HttpExchange _exchange;
private HttpExchange _pipeline;
private final Timeout.Task _timeout = new TimeoutTask();
private final Timeout.Task _idleTimeout = new ConnectionIdleTask();
private AtomicBoolean _idle = new AtomicBoolean(false);
public void dump() throws IOException
@ -114,10 +114,17 @@ public class HttpConnection extends AbstractConnection
return true;
}
if (!_endp.isOpen())
return false;
_exchange = ex;
_exchange.associate(this);
// The call to associate() may have closed the connection, check if it's the case
if (!_endp.isOpen())
{
_exchange.disassociate();
_exchange = null;
return false;
}
_exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT);
if (_endp.isBlocking())
@ -130,41 +137,36 @@ public class HttpConnection extends AbstractConnection
scep.scheduleWrite();
}
scheduleTimeout();
adjustIdleTimeout();
return true;
}
}
protected void scheduleTimeout() throws IOException
private void adjustIdleTimeout() throws IOException
{
HttpClient httpClient = _destination.getHttpClient();
// Adjusts the idle timeout in case the default or exchange timeout
// are greater. This is needed for long polls, where one wants an
// aggressive releasing of idle connections (so idle timeout is small)
// but still allow long polls to complete normally
long exchangeTimeout = _exchange.getTimeout();
long timeout = exchangeTimeout;
long timeout = _exchange.getTimeout();
if (timeout <= 0)
timeout = httpClient.getTimeout();
timeout = _destination.getHttpClient().getTimeout();
long endPointTimeout = _endp.getMaxIdleTime();
if (timeout > 0 && timeout > endPointTimeout)
{
// Make it larger than the exchange timeout so that there are
// no races in trying to close the endpoint between the 2 timeouts
// no races between the idle timeout and the exchange timeout
// when trying to close the endpoint
_endp.setMaxIdleTime(2 * (int)timeout);
}
if (exchangeTimeout > 0)
httpClient.schedule(_timeout, exchangeTimeout);
else
httpClient.schedule(_timeout);
}
public Connection handle() throws IOException
{
if (_exchange != null)
_exchange.associate(this);
try
{
int no_progress = 0;
@ -207,8 +209,6 @@ public class HttpConnection extends AbstractConnection
return this;
}
}
if (!_exchange.isAssociated())
_exchange.associate(this);
}
try
@ -252,7 +252,6 @@ public class HttpConnection extends AbstractConnection
if (_requestContentChunk == null || _requestContentChunk.length() == 0)
{
_requestContentChunk = _exchange.getRequestContentChunk();
_destination.getHttpClient().schedule(_timeout);
if (_requestContentChunk != null)
_generator.addContent(_requestContentChunk,false);
@ -313,7 +312,8 @@ public class HttpConnection extends AbstractConnection
{
// Cancelling the exchange causes an exception as we close the connection,
// but we don't report it as it is normal cancelling operation
if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING)
if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING &&
_exchange.getStatus() != HttpExchange.STATUS_CANCELLED)
{
_exchange.setStatus(HttpExchange.STATUS_EXCEPTED);
_exchange.getEventListener().onException(e);
@ -353,13 +353,12 @@ public class HttpConnection extends AbstractConnection
// it can be reused or closed out
if (_parser.isComplete())
{
_destination.getHttpClient().cancel(_timeout);
_exchange.cancelTimeout(_destination.getHttpClient());
complete = true;
}
}
}
// TODO - this needs to be greatly improved.
if (_generator.isComplete() && !_parser.isComplete())
{
if (!_endp.isOpen() || _endp.isInputShutdown())
@ -383,7 +382,6 @@ public class HttpConnection extends AbstractConnection
if (_exchange != null)
{
HttpExchange exchange=_exchange;
exchange.disassociate();
_exchange = null;
// Reset the maxIdleTime because it may have been changed
@ -428,7 +426,6 @@ public class HttpConnection extends AbstractConnection
send(exchange);
}
}
}
}
}
@ -437,11 +434,6 @@ public class HttpConnection extends AbstractConnection
}
finally
{
if (_exchange != null && _exchange.isAssociated())
{
_exchange.disassociate();
}
// Do we have more stuff to write?
if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
{
@ -461,9 +453,6 @@ public class HttpConnection extends AbstractConnection
}
}
/**
* @see org.eclipse.jetty.io.Connection#isSuspended()
*/
public boolean isSuspended()
{
return false;
@ -646,7 +635,7 @@ public class HttpConnection extends AbstractConnection
public String toDetailString()
{
return toString() + " ex=" + _exchange + " " + _timeout.getAge();
return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge();
}
public void close() throws IOException
@ -677,8 +666,8 @@ public class HttpConnection extends AbstractConnection
{
synchronized (this)
{
if (_idle.compareAndSet(false,true))
_destination.getHttpClient().scheduleIdle(_timeout);
if (_idle.compareAndSet(false, true))
_destination.getHttpClient().scheduleIdle(_idleTimeout);
else
throw new IllegalStateException();
}
@ -688,9 +677,9 @@ public class HttpConnection extends AbstractConnection
{
synchronized (this)
{
if (_idle.compareAndSet(true,false))
if (_idle.compareAndSet(true, false))
{
_destination.getHttpClient().cancel(_timeout);
_destination.getHttpClient().cancel(_idleTimeout);
return true;
}
}
@ -698,47 +687,35 @@ public class HttpConnection extends AbstractConnection
return false;
}
private class TimeoutTask extends Timeout.Task
protected void exchangeExpired(HttpExchange exchange)
{
synchronized (this)
{
// We are expiring an exchange, but the exchange is pending
// Cannot reuse the connection because the reply may arrive, so close it
if (_exchange == exchange)
{
try
{
_destination.returnConnection(this, true);
}
catch (IOException x)
{
Log.ignore(x);
}
}
}
}
private class ConnectionIdleTask extends Timeout.Task
{
@Override
public void expired()
{
HttpExchange ex = null;
try
// Connection idle, close it
if (_idle.compareAndSet(true, false))
{
synchronized (HttpConnection.this)
{
ex = _exchange;
_exchange = null;
if (ex != null)
{
ex.disassociate();
_destination.returnConnection(HttpConnection.this, true);
}
else if (_idle.compareAndSet(true,false))
{
_destination.returnIdleConnection(HttpConnection.this);
}
}
}
catch (Exception e)
{
Log.debug(e);
}
finally
{
if (ex != null && ex.getStatus() < HttpExchange.STATUS_COMPLETED)
ex.setStatus(HttpExchange.STATUS_EXPIRED);
try
{
close();
}
catch (IOException e)
{
Log.ignore(e);
}
_destination.returnIdleConnection(HttpConnection.this);
}
}
}

View File

@ -538,6 +538,10 @@ public class HttpDestination
(auth).setCredentials(ex);
}
// Schedule the timeout here, before we queue the exchange
// so that we count also the queue time in the timeout
ex.scheduleTimeout(this);
HttpConnection connection = getIdleConnection();
if (connection != null)
{
@ -561,6 +565,16 @@ public class HttpDestination
}
}
protected void exchangeExpired(HttpExchange exchange)
{
// The exchange may expire while waiting in the
// destination queue, make sure it is removed
synchronized (this)
{
_queue.remove(exchange);
}
}
protected void send(HttpConnection connection, HttpExchange exchange) throws IOException
{
synchronized (this)
@ -569,7 +583,8 @@ public class HttpDestination
// to the exchange queue and recycle the connection
if (!connection.send(exchange))
{
_queue.add(0, exchange);
if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
_queue.add(0, exchange);
returnIdleConnection(connection);
}
}

View File

@ -30,7 +30,7 @@ import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.Timeout;
/**
* <p>An HTTP client API that encapsulates an exchange (a request and its response) with a HTTP server.</p>
@ -101,11 +101,22 @@ public class HttpExchange
// a timeout for this exchange
private long _timeout = -1;
private volatile Timeout.Task _timeoutTask;
boolean _onRequestCompleteDone;
boolean _onResponseCompleteDone;
boolean _onDone; // == onConnectionFail || onException || onExpired || onCancelled || onResponseCompleted && onRequestCompleted
protected void expire(HttpDestination destination)
{
if (getStatus() < HttpExchange.STATUS_COMPLETED)
setStatus(HttpExchange.STATUS_EXPIRED);
destination.exchangeExpired(this);
HttpConnection connection = _connection;
if (connection != null)
connection.exchangeExpired(this);
}
public int getStatus()
{
@ -154,6 +165,7 @@ public class HttpExchange
// might need a version number concept
synchronized(this)
{
_timeoutTask=null;
_onRequestCompleteDone=false;
_onResponseCompleteDone=false;
_onDone=false;
@ -191,6 +203,10 @@ public class HttpExchange
case STATUS_EXCEPTED:
set=_status.compareAndSet(oldStatus,newStatus);
break;
case STATUS_EXPIRED:
if (set=_status.compareAndSet(oldStatus,newStatus))
getEventListener().onExpire();
break;
}
break;
case STATUS_WAITING_FOR_COMMIT:
@ -527,7 +543,7 @@ public class HttpExchange
*/
public void setRequestHeader(String name, String value)
{
getRequestFields().put(name,value);
getRequestFields().put(name, value);
}
/**
@ -537,7 +553,7 @@ public class HttpExchange
*/
public void setRequestHeader(Buffer name, Buffer value)
{
getRequestFields().put(name,value);
getRequestFields().put(name, value);
}
/**
@ -545,7 +561,7 @@ public class HttpExchange
*/
public void setRequestContentType(String value)
{
getRequestFields().put(HttpHeaders.CONTENT_TYPE_BUFFER,value);
getRequestFields().put(HttpHeaders.CONTENT_TYPE_BUFFER, value);
}
/**
@ -670,15 +686,17 @@ public class HttpExchange
{
Log.debug(x);
}
finally
{
disassociate();
}
}
}
void associate(HttpConnection connection)
{
if ( connection.getEndPoint().getLocalHost() != null )
{
_localAddress = new Address( connection.getEndPoint().getLocalHost(), connection.getEndPoint().getLocalPort() );
}
if (connection.getEndPoint().getLocalHost() != null)
_localAddress = new Address(connection.getEndPoint().getLocalHost(), connection.getEndPoint().getLocalPort());
_connection = connection;
if (getStatus() == STATUS_CANCELLING)
@ -690,9 +708,9 @@ public class HttpExchange
return this._connection != null;
}
Connection disassociate()
HttpConnection disassociate()
{
Connection result = _connection;
HttpConnection result = _connection;
this._connection = null;
if (getStatus() == STATUS_CANCELLING)
setStatus(STATUS_CANCELLED);
@ -850,9 +868,37 @@ public class HttpExchange
this._configureListeners = autoConfigure;
}
protected void scheduleTimeout(final HttpDestination destination)
{
assert _timeoutTask == null;
_timeoutTask = new Timeout.Task()
{
@Override
public void expired()
{
HttpExchange.this.expire(destination);
}
};
HttpClient httpClient = destination.getHttpClient();
long timeout = getTimeout();
if (timeout > 0)
httpClient.schedule(_timeoutTask, timeout);
else
httpClient.schedule(_timeoutTask);
}
protected void cancelTimeout(HttpClient httpClient)
{
Timeout.Task task = _timeoutTask;
if (task != null)
httpClient.cancel(task);
_timeoutTask = null;
}
private class Listener implements HttpEventListener
{
public void onConnectionFailed(Throwable ex)
{
try
@ -904,8 +950,10 @@ public class HttpExchange
{
synchronized(HttpExchange.this)
{
_onRequestCompleteDone=true;
_onDone=_onResponseCompleteDone;
_onRequestCompleteDone = true;
// Member _onDone may already be true, for example
// because the exchange expired or has been canceled
_onDone |= _onResponseCompleteDone;
if (_onDone)
disassociate();
HttpExchange.this.notifyAll();
@ -923,8 +971,10 @@ public class HttpExchange
{
synchronized(HttpExchange.this)
{
_onResponseCompleteDone=true;
_onDone=_onRequestCompleteDone;
_onResponseCompleteDone = true;
// Member _onDone may already be true, for example
// because the exchange expired or has been canceled
_onDone |= _onRequestCompleteDone;
if (_onDone)
disassociate();
HttpExchange.this.notifyAll();

View File

@ -81,4 +81,144 @@ public class HttpDestinationQueueTest
client.stop();
}
@Test
public void testDefaultTimeoutIncludesQueuingExchangeExpiresInQueue() throws Exception
{
HttpClient client = new HttpClient();
client.setMaxConnectionsPerAddress(1);
client.setMaxQueueSizePerAddress(1);
long timeout = 1000;
client.setTimeout(timeout);
client.start();
ServerSocket server = new ServerSocket(0);
// This will keep the connection busy
HttpExchange exchange1 = new HttpExchange();
exchange1.setTimeout(timeout * 3); // Be sure it does not expire
exchange1.setMethod("GET");
exchange1.setURL("http://localhost:" + server.getLocalPort() + "/exchange1");
client.send(exchange1);
// Read request so we are sure that this exchange is out of the queue
Socket socket = server.accept();
byte[] buffer = new byte[1024];
StringBuilder request = new StringBuilder();
while (true)
{
int read = socket.getInputStream().read(buffer);
request.append(new String(buffer, 0, read, "UTF-8"));
if (request.toString().endsWith("\r\n\r\n"))
break;
}
Assert.assertTrue(request.toString().contains("exchange1"));
// This will be queued
HttpExchange exchange2 = new HttpExchange();
exchange2.setMethod("GET");
exchange2.setURL("http://localhost:" + server.getLocalPort() + "/exchange2");
client.send(exchange2);
// Wait until the queued exchange times out in the queue
Thread.sleep(timeout * 2);
Assert.assertEquals(HttpExchange.STATUS_EXPIRED, exchange2.getStatus());
// Send the response to the first exchange to avoid exceptions in the console
socket.getOutputStream().write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".getBytes("UTF-8"));
Assert.assertEquals(HttpExchange.STATUS_COMPLETED, exchange1.waitForDone());
socket.close();
server.close();
client.stop();
}
@Test
public void testDefaultTimeoutIncludesQueuingExchangeExpiresDuringRequest() throws Exception
{
HttpClient client = new HttpClient();
client.setMaxConnectionsPerAddress(1);
client.setMaxQueueSizePerAddress(1);
long timeout = 1000;
client.setTimeout(timeout);
client.start();
ServerSocket server = new ServerSocket(0);
HttpExchange exchange1 = new HttpExchange();
exchange1.setMethod("GET");
exchange1.setURL("http://localhost:" + server.getLocalPort() + "/exchange1");
client.send(exchange1);
// Read request so we are sure that this exchange is out of the queue
Socket socket = server.accept();
byte[] buffer = new byte[1024];
StringBuilder request = new StringBuilder();
while (true)
{
int read = socket.getInputStream().read(buffer);
request.append(new String(buffer, 0, read, "UTF-8"));
if (request.toString().endsWith("\r\n\r\n"))
break;
}
Assert.assertTrue(request.toString().contains("exchange1"));
// Wait until the exchange times out during the request
Thread.sleep(timeout * 2);
Assert.assertEquals(HttpExchange.STATUS_EXPIRED, exchange1.getStatus());
socket.close();
server.close();
client.stop();
}
@Test
public void testExchangeTimeoutIncludesQueuingExchangeExpiresDuringResponse() throws Exception
{
HttpClient client = new HttpClient();
client.setMaxConnectionsPerAddress(1);
client.setMaxQueueSizePerAddress(1);
client.start();
ServerSocket server = new ServerSocket(0);
long timeout = 1000;
HttpExchange exchange1 = new HttpExchange();
exchange1.setTimeout(timeout);
exchange1.setMethod("GET");
exchange1.setURL("http://localhost:" + server.getLocalPort() + "/exchange1");
client.send(exchange1);
// Read request so we are sure that this exchange is out of the queue
Socket socket = server.accept();
byte[] buffer = new byte[1024];
StringBuilder request = new StringBuilder();
while (true)
{
int read = socket.getInputStream().read(buffer);
request.append(new String(buffer, 0, read, "UTF-8"));
if (request.toString().endsWith("\r\n\r\n"))
break;
}
Assert.assertTrue(request.toString().contains("exchange1"));
// Write part of the response
socket.getOutputStream().write("HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: 1\r\n\r\n".getBytes("UTF-8"));
// Wait until the exchange times out during the response
Thread.sleep(timeout * 2);
Assert.assertEquals(HttpExchange.STATUS_EXPIRED, exchange1.getStatus());
socket.close();
server.close();
client.stop();
}
}