From 3d192f98abb052a71ed7fb748e7502a15a55b38a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 3 May 2011 08:57:26 +0000 Subject: [PATCH] Fixes #340040 (Support for a total timeout). git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@3069 7e9141cc-0065-0410-87d8-b60c137991c4 --- VERSION.txt | 1 + .../eclipse/jetty/client/HttpConnection.java | 129 +++++++--------- .../eclipse/jetty/client/HttpDestination.java | 17 ++- .../eclipse/jetty/client/HttpExchange.java | 80 ++++++++-- .../client/HttpDestinationQueueTest.java | 140 ++++++++++++++++++ 5 files changed, 275 insertions(+), 92 deletions(-) diff --git a/VERSION.txt b/VERSION.txt index 139e291f09f..b6c1ec615f6 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -11,6 +11,7 @@ jetty-7.4.1-SNAPSHOT + 344513 Attempting to set ConfigurationClasses in jetty-web.xml causes NPE + JETTY-954 WebAppContext eats any start exceptions instead of stopping the server load + 344529 Ability to customize the error handling of the OSGi HttpService + + 340040 Support for a total timeout jetty-7.4.0.v20110414 + 342504 Scanner Listener diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index fc997942e3d..bb076a475a3 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -60,7 +60,7 @@ public class HttpConnection extends AbstractConnection // The current exchange waiting for a response private volatile HttpExchange _exchange; private HttpExchange _pipeline; - private final Timeout.Task _timeout = new TimeoutTask(); + private final Timeout.Task _idleTimeout = new ConnectionIdleTask(); private AtomicBoolean _idle = new AtomicBoolean(false); public void dump() throws IOException @@ -114,10 +114,17 @@ public class HttpConnection extends AbstractConnection return true; } - if (!_endp.isOpen()) - return false; - _exchange = ex; + _exchange.associate(this); + + // The call to associate() may have closed the connection, check if it's the case + if (!_endp.isOpen()) + { + _exchange.disassociate(); + _exchange = null; + return false; + } + _exchange.setStatus(HttpExchange.STATUS_WAITING_FOR_COMMIT); if (_endp.isBlocking()) @@ -130,41 +137,36 @@ public class HttpConnection extends AbstractConnection scep.scheduleWrite(); } - scheduleTimeout(); + adjustIdleTimeout(); return true; } } - protected void scheduleTimeout() throws IOException + private void adjustIdleTimeout() throws IOException { - HttpClient httpClient = _destination.getHttpClient(); + // Adjusts the idle timeout in case the default or exchange timeout + // are greater. This is needed for long polls, where one wants an + // aggressive releasing of idle connections (so idle timeout is small) + // but still allow long polls to complete normally - long exchangeTimeout = _exchange.getTimeout(); - long timeout = exchangeTimeout; + long timeout = _exchange.getTimeout(); if (timeout <= 0) - timeout = httpClient.getTimeout(); + timeout = _destination.getHttpClient().getTimeout(); long endPointTimeout = _endp.getMaxIdleTime(); if (timeout > 0 && timeout > endPointTimeout) { // Make it larger than the exchange timeout so that there are - // no races in trying to close the endpoint between the 2 timeouts + // no races between the idle timeout and the exchange timeout + // when trying to close the endpoint _endp.setMaxIdleTime(2 * (int)timeout); } - - if (exchangeTimeout > 0) - httpClient.schedule(_timeout, exchangeTimeout); - else - httpClient.schedule(_timeout); } public Connection handle() throws IOException { - if (_exchange != null) - _exchange.associate(this); - try { int no_progress = 0; @@ -207,8 +209,6 @@ public class HttpConnection extends AbstractConnection return this; } } - if (!_exchange.isAssociated()) - _exchange.associate(this); } try @@ -252,7 +252,6 @@ public class HttpConnection extends AbstractConnection if (_requestContentChunk == null || _requestContentChunk.length() == 0) { _requestContentChunk = _exchange.getRequestContentChunk(); - _destination.getHttpClient().schedule(_timeout); if (_requestContentChunk != null) _generator.addContent(_requestContentChunk,false); @@ -313,7 +312,8 @@ public class HttpConnection extends AbstractConnection { // Cancelling the exchange causes an exception as we close the connection, // but we don't report it as it is normal cancelling operation - if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING) + if (_exchange.getStatus() != HttpExchange.STATUS_CANCELLING && + _exchange.getStatus() != HttpExchange.STATUS_CANCELLED) { _exchange.setStatus(HttpExchange.STATUS_EXCEPTED); _exchange.getEventListener().onException(e); @@ -353,13 +353,12 @@ public class HttpConnection extends AbstractConnection // it can be reused or closed out if (_parser.isComplete()) { - _destination.getHttpClient().cancel(_timeout); + _exchange.cancelTimeout(_destination.getHttpClient()); complete = true; } } } - // TODO - this needs to be greatly improved. if (_generator.isComplete() && !_parser.isComplete()) { if (!_endp.isOpen() || _endp.isInputShutdown()) @@ -383,7 +382,6 @@ public class HttpConnection extends AbstractConnection if (_exchange != null) { HttpExchange exchange=_exchange; - exchange.disassociate(); _exchange = null; // Reset the maxIdleTime because it may have been changed @@ -428,7 +426,6 @@ public class HttpConnection extends AbstractConnection send(exchange); } } - } } } @@ -437,11 +434,6 @@ public class HttpConnection extends AbstractConnection } finally { - if (_exchange != null && _exchange.isAssociated()) - { - _exchange.disassociate(); - } - // Do we have more stuff to write? if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint) { @@ -461,9 +453,6 @@ public class HttpConnection extends AbstractConnection } } - /** - * @see org.eclipse.jetty.io.Connection#isSuspended() - */ public boolean isSuspended() { return false; @@ -646,7 +635,7 @@ public class HttpConnection extends AbstractConnection public String toDetailString() { - return toString() + " ex=" + _exchange + " " + _timeout.getAge(); + return toString() + " ex=" + _exchange + " idle for " + _idleTimeout.getAge(); } public void close() throws IOException @@ -677,8 +666,8 @@ public class HttpConnection extends AbstractConnection { synchronized (this) { - if (_idle.compareAndSet(false,true)) - _destination.getHttpClient().scheduleIdle(_timeout); + if (_idle.compareAndSet(false, true)) + _destination.getHttpClient().scheduleIdle(_idleTimeout); else throw new IllegalStateException(); } @@ -688,9 +677,9 @@ public class HttpConnection extends AbstractConnection { synchronized (this) { - if (_idle.compareAndSet(true,false)) + if (_idle.compareAndSet(true, false)) { - _destination.getHttpClient().cancel(_timeout); + _destination.getHttpClient().cancel(_idleTimeout); return true; } } @@ -698,47 +687,35 @@ public class HttpConnection extends AbstractConnection return false; } - private class TimeoutTask extends Timeout.Task + protected void exchangeExpired(HttpExchange exchange) + { + synchronized (this) + { + // We are expiring an exchange, but the exchange is pending + // Cannot reuse the connection because the reply may arrive, so close it + if (_exchange == exchange) + { + try + { + _destination.returnConnection(this, true); + } + catch (IOException x) + { + Log.ignore(x); + } + } + } + } + + private class ConnectionIdleTask extends Timeout.Task { @Override public void expired() { - HttpExchange ex = null; - try + // Connection idle, close it + if (_idle.compareAndSet(true, false)) { - synchronized (HttpConnection.this) - { - ex = _exchange; - _exchange = null; - if (ex != null) - { - ex.disassociate(); - _destination.returnConnection(HttpConnection.this, true); - } - else if (_idle.compareAndSet(true,false)) - { - _destination.returnIdleConnection(HttpConnection.this); - } - } - } - catch (Exception e) - { - Log.debug(e); - } - finally - { - if (ex != null && ex.getStatus() < HttpExchange.STATUS_COMPLETED) - ex.setStatus(HttpExchange.STATUS_EXPIRED); - - try - { - close(); - } - catch (IOException e) - { - Log.ignore(e); - } - + _destination.returnIdleConnection(HttpConnection.this); } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 1a95c7a912e..ee3280562da 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -538,6 +538,10 @@ public class HttpDestination (auth).setCredentials(ex); } + // Schedule the timeout here, before we queue the exchange + // so that we count also the queue time in the timeout + ex.scheduleTimeout(this); + HttpConnection connection = getIdleConnection(); if (connection != null) { @@ -561,6 +565,16 @@ public class HttpDestination } } + protected void exchangeExpired(HttpExchange exchange) + { + // The exchange may expire while waiting in the + // destination queue, make sure it is removed + synchronized (this) + { + _queue.remove(exchange); + } + } + protected void send(HttpConnection connection, HttpExchange exchange) throws IOException { synchronized (this) @@ -569,7 +583,8 @@ public class HttpDestination // to the exchange queue and recycle the connection if (!connection.send(exchange)) { - _queue.add(0, exchange); + if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION) + _queue.add(0, exchange); returnIdleConnection(connection); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index b4d96b1ab40..36b32f5e70d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -30,7 +30,7 @@ import org.eclipse.jetty.io.ByteArrayBuffer; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.log.Log; - +import org.eclipse.jetty.util.thread.Timeout; /** *

An HTTP client API that encapsulates an exchange (a request and its response) with a HTTP server.

@@ -101,11 +101,22 @@ public class HttpExchange // a timeout for this exchange private long _timeout = -1; + private volatile Timeout.Task _timeoutTask; boolean _onRequestCompleteDone; boolean _onResponseCompleteDone; boolean _onDone; // == onConnectionFail || onException || onExpired || onCancelled || onResponseCompleted && onRequestCompleted + protected void expire(HttpDestination destination) + { + if (getStatus() < HttpExchange.STATUS_COMPLETED) + setStatus(HttpExchange.STATUS_EXPIRED); + + destination.exchangeExpired(this); + HttpConnection connection = _connection; + if (connection != null) + connection.exchangeExpired(this); + } public int getStatus() { @@ -154,6 +165,7 @@ public class HttpExchange // might need a version number concept synchronized(this) { + _timeoutTask=null; _onRequestCompleteDone=false; _onResponseCompleteDone=false; _onDone=false; @@ -191,6 +203,10 @@ public class HttpExchange case STATUS_EXCEPTED: set=_status.compareAndSet(oldStatus,newStatus); break; + case STATUS_EXPIRED: + if (set=_status.compareAndSet(oldStatus,newStatus)) + getEventListener().onExpire(); + break; } break; case STATUS_WAITING_FOR_COMMIT: @@ -527,7 +543,7 @@ public class HttpExchange */ public void setRequestHeader(String name, String value) { - getRequestFields().put(name,value); + getRequestFields().put(name, value); } /** @@ -537,7 +553,7 @@ public class HttpExchange */ public void setRequestHeader(Buffer name, Buffer value) { - getRequestFields().put(name,value); + getRequestFields().put(name, value); } /** @@ -545,7 +561,7 @@ public class HttpExchange */ public void setRequestContentType(String value) { - getRequestFields().put(HttpHeaders.CONTENT_TYPE_BUFFER,value); + getRequestFields().put(HttpHeaders.CONTENT_TYPE_BUFFER, value); } /** @@ -670,15 +686,17 @@ public class HttpExchange { Log.debug(x); } + finally + { + disassociate(); + } } } void associate(HttpConnection connection) { - if ( connection.getEndPoint().getLocalHost() != null ) - { - _localAddress = new Address( connection.getEndPoint().getLocalHost(), connection.getEndPoint().getLocalPort() ); - } + if (connection.getEndPoint().getLocalHost() != null) + _localAddress = new Address(connection.getEndPoint().getLocalHost(), connection.getEndPoint().getLocalPort()); _connection = connection; if (getStatus() == STATUS_CANCELLING) @@ -690,9 +708,9 @@ public class HttpExchange return this._connection != null; } - Connection disassociate() + HttpConnection disassociate() { - Connection result = _connection; + HttpConnection result = _connection; this._connection = null; if (getStatus() == STATUS_CANCELLING) setStatus(STATUS_CANCELLED); @@ -850,9 +868,37 @@ public class HttpExchange this._configureListeners = autoConfigure; } + protected void scheduleTimeout(final HttpDestination destination) + { + assert _timeoutTask == null; + + _timeoutTask = new Timeout.Task() + { + @Override + public void expired() + { + HttpExchange.this.expire(destination); + } + }; + + HttpClient httpClient = destination.getHttpClient(); + long timeout = getTimeout(); + if (timeout > 0) + httpClient.schedule(_timeoutTask, timeout); + else + httpClient.schedule(_timeoutTask); + } + + protected void cancelTimeout(HttpClient httpClient) + { + Timeout.Task task = _timeoutTask; + if (task != null) + httpClient.cancel(task); + _timeoutTask = null; + } + private class Listener implements HttpEventListener { - public void onConnectionFailed(Throwable ex) { try @@ -904,8 +950,10 @@ public class HttpExchange { synchronized(HttpExchange.this) { - _onRequestCompleteDone=true; - _onDone=_onResponseCompleteDone; + _onRequestCompleteDone = true; + // Member _onDone may already be true, for example + // because the exchange expired or has been canceled + _onDone |= _onResponseCompleteDone; if (_onDone) disassociate(); HttpExchange.this.notifyAll(); @@ -923,8 +971,10 @@ public class HttpExchange { synchronized(HttpExchange.this) { - _onResponseCompleteDone=true; - _onDone=_onRequestCompleteDone; + _onResponseCompleteDone = true; + // Member _onDone may already be true, for example + // because the exchange expired or has been canceled + _onDone |= _onRequestCompleteDone; if (_onDone) disassociate(); HttpExchange.this.notifyAll(); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpDestinationQueueTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpDestinationQueueTest.java index d43e47baf0d..87a118e2680 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpDestinationQueueTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpDestinationQueueTest.java @@ -81,4 +81,144 @@ public class HttpDestinationQueueTest client.stop(); } + + @Test + public void testDefaultTimeoutIncludesQueuingExchangeExpiresInQueue() throws Exception + { + HttpClient client = new HttpClient(); + client.setMaxConnectionsPerAddress(1); + client.setMaxQueueSizePerAddress(1); + long timeout = 1000; + client.setTimeout(timeout); + client.start(); + + ServerSocket server = new ServerSocket(0); + + // This will keep the connection busy + HttpExchange exchange1 = new HttpExchange(); + exchange1.setTimeout(timeout * 3); // Be sure it does not expire + exchange1.setMethod("GET"); + exchange1.setURL("http://localhost:" + server.getLocalPort() + "/exchange1"); + client.send(exchange1); + + // Read request so we are sure that this exchange is out of the queue + Socket socket = server.accept(); + byte[] buffer = new byte[1024]; + StringBuilder request = new StringBuilder(); + while (true) + { + int read = socket.getInputStream().read(buffer); + request.append(new String(buffer, 0, read, "UTF-8")); + if (request.toString().endsWith("\r\n\r\n")) + break; + } + Assert.assertTrue(request.toString().contains("exchange1")); + + // This will be queued + HttpExchange exchange2 = new HttpExchange(); + exchange2.setMethod("GET"); + exchange2.setURL("http://localhost:" + server.getLocalPort() + "/exchange2"); + client.send(exchange2); + + // Wait until the queued exchange times out in the queue + Thread.sleep(timeout * 2); + + Assert.assertEquals(HttpExchange.STATUS_EXPIRED, exchange2.getStatus()); + + // Send the response to the first exchange to avoid exceptions in the console + socket.getOutputStream().write("HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n".getBytes("UTF-8")); + Assert.assertEquals(HttpExchange.STATUS_COMPLETED, exchange1.waitForDone()); + socket.close(); + + server.close(); + + client.stop(); + } + + @Test + public void testDefaultTimeoutIncludesQueuingExchangeExpiresDuringRequest() throws Exception + { + HttpClient client = new HttpClient(); + client.setMaxConnectionsPerAddress(1); + client.setMaxQueueSizePerAddress(1); + long timeout = 1000; + client.setTimeout(timeout); + client.start(); + + ServerSocket server = new ServerSocket(0); + + HttpExchange exchange1 = new HttpExchange(); + exchange1.setMethod("GET"); + exchange1.setURL("http://localhost:" + server.getLocalPort() + "/exchange1"); + client.send(exchange1); + + // Read request so we are sure that this exchange is out of the queue + Socket socket = server.accept(); + byte[] buffer = new byte[1024]; + StringBuilder request = new StringBuilder(); + while (true) + { + int read = socket.getInputStream().read(buffer); + request.append(new String(buffer, 0, read, "UTF-8")); + if (request.toString().endsWith("\r\n\r\n")) + break; + } + Assert.assertTrue(request.toString().contains("exchange1")); + + // Wait until the exchange times out during the request + Thread.sleep(timeout * 2); + + Assert.assertEquals(HttpExchange.STATUS_EXPIRED, exchange1.getStatus()); + + socket.close(); + + server.close(); + + client.stop(); + } + + @Test + public void testExchangeTimeoutIncludesQueuingExchangeExpiresDuringResponse() throws Exception + { + HttpClient client = new HttpClient(); + client.setMaxConnectionsPerAddress(1); + client.setMaxQueueSizePerAddress(1); + client.start(); + + ServerSocket server = new ServerSocket(0); + + long timeout = 1000; + HttpExchange exchange1 = new HttpExchange(); + exchange1.setTimeout(timeout); + exchange1.setMethod("GET"); + exchange1.setURL("http://localhost:" + server.getLocalPort() + "/exchange1"); + client.send(exchange1); + + // Read request so we are sure that this exchange is out of the queue + Socket socket = server.accept(); + byte[] buffer = new byte[1024]; + StringBuilder request = new StringBuilder(); + while (true) + { + int read = socket.getInputStream().read(buffer); + request.append(new String(buffer, 0, read, "UTF-8")); + if (request.toString().endsWith("\r\n\r\n")) + break; + } + Assert.assertTrue(request.toString().contains("exchange1")); + + // Write part of the response + socket.getOutputStream().write("HTTP/1.1 200 OK\r\nContent-Type: application/octet-stream\r\nContent-Length: 1\r\n\r\n".getBytes("UTF-8")); + + // Wait until the exchange times out during the response + Thread.sleep(timeout * 2); + + Assert.assertEquals(HttpExchange.STATUS_EXPIRED, exchange1.getStatus()); + + socket.close(); + + server.close(); + + client.stop(); + } }