From 3d93d39b396c3703fe3a6891c2ea54dd530b0ca6 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 16 Aug 2016 16:24:14 +1000 Subject: [PATCH 01/11] Issue #845 data rate limits Initial thoughts --- .../eclipse/jetty/embedded/FileServer.java | 12 +++-- .../org/eclipse/jetty/embedded/SlowGet.java | 30 ++++++++++++ .../org/eclipse/jetty/http/HttpGenerator.java | 1 - .../http2/server/HttpChannelOverHTTP2.java | 12 +++++ .../org/eclipse/jetty/io/ChannelEndPoint.java | 3 ++ .../org/eclipse/jetty/server/HttpChannel.java | 10 ++++ .../jetty/server/HttpConfiguration.java | 46 +++++++++++++++++++ .../org/eclipse/jetty/server/HttpInput.java | 19 +++++++- .../jetty/server/handler/ResourceHandler.java | 2 +- 9 files changed, 128 insertions(+), 7 deletions(-) create mode 100644 examples/embedded/src/main/java/org/eclipse/jetty/embedded/SlowGet.java diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FileServer.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FileServer.java index 2da99b2aeab..cdd1d63c5b5 100644 --- a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FileServer.java +++ b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FileServer.java @@ -45,15 +45,19 @@ public class FileServer // In this example it is the current directory but it can be configured to anything that the jvm has access to. resource_handler.setDirectoriesListed(true); resource_handler.setWelcomeFiles(new String[]{ "index.html" }); - resource_handler.setResourceBase("."); + resource_handler.setResourceBase("/tmp/docroot"); // Add the ResourceHandler to the server. - GzipHandler gzip = new GzipHandler(); - server.setHandler(gzip); HandlerList handlers = new HandlerList(); handlers.setHandlers(new Handler[] { resource_handler, new DefaultHandler() }); + server.setHandler(handlers); + + /* + GzipHandler gzip = new GzipHandler(); + server.setHandler(gzip); gzip.setHandler(handlers); - + */ + // Start things up! By using the server.join() the server thread will join with the current thread. // See "http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/Thread.html#join()" for more details. server.start(); diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/SlowGet.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/SlowGet.java new file mode 100644 index 00000000000..e174dabf99d --- /dev/null +++ b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/SlowGet.java @@ -0,0 +1,30 @@ +package org.eclipse.jetty.embedded; + +import java.io.InputStream; +import java.net.Socket; + +public class SlowGet +{ + public static void main(String... args) throws Exception + { + try(Socket socket = new Socket("localhost",8080)) + { + socket.getOutputStream().write("GET /data.txt HTTP/1.0\r\n\r\n".getBytes()); + socket.getOutputStream().flush(); + + InputStream in = socket.getInputStream(); + byte[] headers = new byte[1024]; + int len = in.read(headers); + + System.err.println("read="+len); + + int b=0; + while (b>=0) + { + b = in.read(); + if ((++len % 1024)==0) + System.err.println("read="+(++len)); + } + } + } +} diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java index f743379eb4b..b03133a96b4 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.http; -import java.io.EOFException; import java.io.IOException; import java.nio.BufferOverflowException; import java.nio.ByteBuffer; diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index b70b65bb402..f2ac9c918b7 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -71,6 +71,18 @@ public class HttpChannelOverHTTP2 extends HttpChannel return _expect100Continue; } + @Override + public void setIdleTimeout(long timeoutMs) + { + getStream().setIdleTimeout(timeoutMs); + } + + @Override + public long getIdleTimeout() + { + return getStream().getIdleTimeout(); + } + public Runnable onRequest(HeadersFrame frame) { try diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index 1952760111d..7ef57077531 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -24,6 +24,8 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.ByteChannel; import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.stream.Collectors; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.log.Log; @@ -168,6 +170,7 @@ public class ChannelEndPoint extends AbstractEndPoint @Override public boolean flush(ByteBuffer... buffers) throws IOException { + System.err.println("FLUSH: "+Arrays.stream(buffers).map(b->BufferUtil.toDetailString(b)).collect(Collectors.toList())); long flushed=0; try { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 05196e33a6a..88b61a5e0aa 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -80,6 +80,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor private final Response _response; private MetaData.Response _committedMetaData; private RequestLog _requestLog; + private long _oldIdleTimeout; /** Bytes written after interception (eg after compression) */ private long _written; @@ -596,6 +597,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor if (_configuration.getSendDateHeader() && !fields.contains(HttpHeader.DATE)) fields.put(_connector.getServer().getDateField()); + long idleTO=_configuration.getIdleTimeout(); + _oldIdleTimeout=getIdleTimeout(); + if (idleTO>=0 && _oldIdleTimeout!=idleTO) + setIdleTimeout(idleTO); + _request.setMetaData(request); if (LOG.isDebugEnabled()) @@ -627,6 +633,10 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor if (_requestLog!=null ) _requestLog.log(_request, _response); + long idleTO=_configuration.getIdleTimeout(); + if (idleTO>=0 && getIdleTimeout()!=_oldIdleTimeout) + setIdleTimeout(_oldIdleTimeout); + _transport.onCompleted(); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java index d2ec7bd0398..8d1026484b6 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java @@ -56,6 +56,7 @@ public class HttpConfiguration private int _responseHeaderSize=8*1024; private int _headerCacheSize=512; private int _securePort; + private long _idleTimeout=-1; private long _blockingTimeout=-1; private String _secureScheme = HttpScheme.HTTPS.asString(); private boolean _sendServerVersion = true; @@ -64,6 +65,7 @@ public class HttpConfiguration private boolean _delayDispatchUntilContent = true; private boolean _persistentConnectionsEnabled = true; private int _maxErrorDispatches = 10; + private int _minRequestDataRate; /* ------------------------------------------------------------ */ /** @@ -113,6 +115,7 @@ public class HttpConfiguration _headerCacheSize=config._headerCacheSize; _secureScheme=config._secureScheme; _securePort=config._securePort; + _idleTimeout=config._idleTimeout; _blockingTimeout=config._blockingTimeout; _sendDateHeader=config._sendDateHeader; _sendServerVersion=config._sendServerVersion; @@ -206,6 +209,31 @@ public class HttpConfiguration return _persistentConnectionsEnabled; } + /* ------------------------------------------------------------ */ + /** Get the max idle time in ms. + *

The max idle time is applied to a HTTP request for IO operations and + * delayed dispatch. + * @return the max idle time in ms or if == 0 implies an infinite timeout, <0 + * implies no HTTP channel timeout and the connection timeout is used instead. + */ + public long getIdleTimeout() + { + return _idleTimeout; + } + + /* ------------------------------------------------------------ */ + /** Set the max idle time in ms. + *

The max idle time is applied to a HTTP request for IO operations and + * delayed dispatch. + * @param timeoutMs the max idle time in ms or if == 0 implies an infinite timeout, <0 + * implies no HTTP channel timeout and the connection timeout is used instead. + */ + public void setIdleTimeout(long timeoutMs) + { + _idleTimeout=timeoutMs; + } + + /* ------------------------------------------------------------ */ /** Get the timeout applied to blocking operations. *

This timeout is in addition to the {@link Connector#getIdleTimeout()}, and applies @@ -479,4 +507,22 @@ public class HttpConfiguration { _maxErrorDispatches=max; } + + /* ------------------------------------------------------------ */ + /** + * @return The minimum request data rate in bytes per second; or <=0 for no limit + */ + public int getMinRequestDataRate() + { + return _minRequestDataRate; + } + + /* ------------------------------------------------------------ */ + /** + * @param bytesPerSecond The minimum request data rate in bytes per second; or <=0 for no limit + */ + public void setMinRequestDataRate(int bytesPerSecond) + { + _minRequestDataRate=bytesPerSecond; + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 98b4c4f5431..146c75039bc 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -56,6 +56,7 @@ public class HttpInput extends ServletInputStream implements Runnable private final HttpChannelState _channelState; private ReadListener _listener; private State _state = STREAM; + private long _contentArrived; private long _contentConsumed; private long _blockingTimeoutAt = -1; @@ -83,6 +84,7 @@ public class HttpInput extends ServletInputStream implements Runnable } _listener = null; _state = STREAM; + _contentArrived = 0; _contentConsumed = 0; } } @@ -139,9 +141,23 @@ public class HttpInput extends ServletInputStream implements Runnable { synchronized (_inputQ) { + long now=System.currentTimeMillis(); + if (_blockingTimeoutAt>=0 && !isAsync()) - _blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); + _blockingTimeoutAt=now+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); + int minRequestDataRate=_channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); + if (minRequestDataRate>0) + { + long period=now-_channelState.getHttpChannel().getRequest().getTimeStamp(); + if (period>=1000) + { + long data_rate = _contentArrived / (now-_channelState.getHttpChannel().getRequest().getTimeStamp()); + if (data_rate0 && + if (_minMemoryMappedContentLength>=0 && resource.length()>_minMemoryMappedContentLength && resource.length() Date: Tue, 16 Aug 2016 17:09:46 +1000 Subject: [PATCH 02/11] Issue #845 data rate limits Added data rate unit tests --- .../org/eclipse/jetty/embedded/SlowGet.java | 30 ----- .../org/eclipse/jetty/io/ChannelEndPoint.java | 1 - .../org/eclipse/jetty/server/HttpInput.java | 6 +- .../jetty/server/ConnectorTimeoutTest.java | 119 +++++++++++++++++- 4 files changed, 122 insertions(+), 34 deletions(-) delete mode 100644 examples/embedded/src/main/java/org/eclipse/jetty/embedded/SlowGet.java diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/SlowGet.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/SlowGet.java deleted file mode 100644 index e174dabf99d..00000000000 --- a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/SlowGet.java +++ /dev/null @@ -1,30 +0,0 @@ -package org.eclipse.jetty.embedded; - -import java.io.InputStream; -import java.net.Socket; - -public class SlowGet -{ - public static void main(String... args) throws Exception - { - try(Socket socket = new Socket("localhost",8080)) - { - socket.getOutputStream().write("GET /data.txt HTTP/1.0\r\n\r\n".getBytes()); - socket.getOutputStream().flush(); - - InputStream in = socket.getInputStream(); - byte[] headers = new byte[1024]; - int len = in.read(headers); - - System.err.println("read="+len); - - int b=0; - while (b>=0) - { - b = in.read(); - if ((++len % 1024)==0) - System.err.println("read="+(++len)); - } - } - } -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java index 7ef57077531..27a914155b5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ChannelEndPoint.java @@ -170,7 +170,6 @@ public class ChannelEndPoint extends AbstractEndPoint @Override public boolean flush(ByteBuffer... buffers) throws IOException { - System.err.println("FLUSH: "+Arrays.stream(buffers).map(b->BufferUtil.toDetailString(b)).collect(Collectors.toList())); long flushed=0; try { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 146c75039bc..0fa4ec79414 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -30,6 +30,8 @@ import java.util.concurrent.TimeoutException; import javax.servlet.ReadListener; import javax.servlet.ServletInputStream; +import org.eclipse.jetty.http.BadMessageException; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.util.BufferUtil; @@ -152,9 +154,9 @@ public class HttpInput extends ServletInputStream implements Runnable long period=now-_channelState.getHttpChannel().getRequest().getTimeStamp(); if (period>=1000) { - long data_rate = _contentArrived / (now-_channelState.getHttpChannel().getRequest().getTimeStamp()); + double data_rate = _contentArrived / (0.001*(now-_channelState.getHttpChannel().getRequest().getTimeStamp())); if (data_rate minimumTestRuntime); Assert.assertTrue(System.currentTimeMillis() - start < maximumTestRuntime); } - + @Test(timeout=60000) + public void testSlowClientRequestNoLimit() throws Exception + { + configureServer(new EchoHandler()); + Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); + client.setSoTimeout(10000); + + Assert.assertFalse(client.isClosed()); + + OutputStream os=client.getOutputStream(); + InputStream is=client.getInputStream(); + + os.write(( + "POST /echo HTTP/1.0\r\n"+ + "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ + "content-type: text/plain; charset=utf-8\r\n"+ + "content-length: 20\r\n"+ + "\r\n").getBytes("utf-8")); + os.flush(); + + for (int i=0;i<4;i++) + { + os.write("123\n".getBytes("utf-8")); + os.flush(); + Thread.sleep(1000); + } + os.write("===\n".getBytes("utf-8")); + os.flush(); + + String response =IO.toString(is); + Assert.assertThat(response,containsString(" 200 ")); + Assert.assertThat(response,containsString("===")); + } + + @Test(timeout=60000) + public void testSlowClientRequestLimited() throws Exception + { + _httpConfiguration.setMinRequestDataRate(20); + configureServer(new EchoHandler()); + Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); + client.setSoTimeout(10000); + + Assert.assertFalse(client.isClosed()); + + OutputStream os=client.getOutputStream(); + InputStream is=client.getInputStream(); + + os.write(( + "POST /echo HTTP/1.0\r\n"+ + "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ + "content-type: text/plain; charset=utf-8\r\n"+ + "content-length: 20\r\n"+ + "\r\n").getBytes("utf-8")); + os.flush(); + + try + { + for (int i=0;i<4;i++) + { + os.write("123\n".getBytes("utf-8")); + os.flush(); + Thread.sleep(500); + } + os.write("===\n".getBytes("utf-8")); + os.flush(); + + String response =IO.toString(is); + Assert.assertThat(response,containsString(" 408 ")); + Assert.assertThat(response,containsString("Request Data rate")); + } + catch (SocketException e) + {} + } + + @Test(timeout=60000) + public void testSlowClientRequestLimitExceeded() throws Exception + { + _httpConfiguration.setMinRequestDataRate(20); + configureServer(new EchoHandler()); + Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); + client.setSoTimeout(10000); + + Assert.assertFalse(client.isClosed()); + + OutputStream os=client.getOutputStream(); + InputStream is=client.getInputStream(); + + os.write(( + "POST /echo HTTP/1.0\r\n"+ + "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ + "content-type: text/plain; charset=utf-8\r\n"+ + "content-length: 100\r\n"+ + "\r\n").getBytes("utf-8")); + os.flush(); + + for (int i=0;i<9;i++) + { + os.write("123456789\n".getBytes("utf-8")); + os.flush(); + Thread.sleep(250); + } + os.write("=========\n".getBytes("utf-8")); + os.flush(); + + String response =IO.toString(is); + Assert.assertThat(response,containsString(" 200 ")); + Assert.assertThat(response,containsString("=========")); + } + + + + protected static class SlowResponseHandler extends AbstractHandler { @Override From 83f503b86f8f56b013886c2ce0868580d9fe037b Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 16 Aug 2016 17:40:18 +1000 Subject: [PATCH 03/11] Issue #845 data rate limits test for variable idle time --- .../jetty/server/ConnectorTimeoutTest.java | 38 +++++++++++++++++++ .../server/ServerConnectorTimeoutTest.java | 2 +- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java index 65453e9b916..c37c2ba5ccb 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java @@ -78,6 +78,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture { _httpConfiguration.setBlockingTimeout(-1L); _httpConfiguration.setMinRequestDataRate(-1); + _httpConfiguration.setIdleTimeout(-1); } } @@ -882,6 +883,43 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture Assert.assertThat(response,containsString("=========")); } + + @Test(timeout=60000) + public void testHttpIdleTime() throws Exception + { + _httpConfiguration.setIdleTimeout(500); + configureServer(new EchoHandler()); + Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); + client.setSoTimeout(10000); + + Assert.assertFalse(client.isClosed()); + + OutputStream os=client.getOutputStream(); + InputStream is=client.getInputStream(); + + try (StacklessLogging scope = new StacklessLogging(HttpChannel.class)) + { + os.write(( + "POST /echo HTTP/1.0\r\n"+ + "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ + "content-type: text/plain; charset=utf-8\r\n"+ + "content-length: 20\r\n"+ + "\r\n").getBytes("utf-8")); + os.flush(); + + os.write("123456789\n".getBytes("utf-8")); + os.flush(); + Thread.sleep(1000); + os.write("=========\n".getBytes("utf-8")); + os.flush(); + + String response =IO.toString(is); + Assert.assertThat(response,containsString(" 500 ")); + Assert.assertThat(response,containsString("/500 ms")); + Assert.assertThat(response,Matchers.not(containsString("========="))); + } + } + diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java index bfc0b16edb4..e10cb8d5de1 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java @@ -38,7 +38,7 @@ public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest public void init() throws Exception { ServerConnector connector = new ServerConnector(_server,1,1); - connector.setIdleTimeout(MAX_IDLE_TIME); // 250 msec max idle + connector.setIdleTimeout(MAX_IDLE_TIME); startServer(connector); } From 6e2db6993b9378f94fa0d69edee4298cdf6adabb Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 16 Aug 2016 18:16:13 +1000 Subject: [PATCH 04/11] Issue #845 data rate limits reverted embedded example --- .../src/main/java/org/eclipse/jetty/embedded/FileServer.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FileServer.java b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FileServer.java index cdd1d63c5b5..273ff1897bd 100644 --- a/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FileServer.java +++ b/examples/embedded/src/main/java/org/eclipse/jetty/embedded/FileServer.java @@ -45,18 +45,17 @@ public class FileServer // In this example it is the current directory but it can be configured to anything that the jvm has access to. resource_handler.setDirectoriesListed(true); resource_handler.setWelcomeFiles(new String[]{ "index.html" }); - resource_handler.setResourceBase("/tmp/docroot"); + resource_handler.setResourceBase("."); // Add the ResourceHandler to the server. HandlerList handlers = new HandlerList(); handlers.setHandlers(new Handler[] { resource_handler, new DefaultHandler() }); server.setHandler(handlers); - /* + // Add GzipHandler GzipHandler gzip = new GzipHandler(); server.setHandler(gzip); gzip.setHandler(handlers); - */ // Start things up! By using the server.join() the server thread will join with the current thread. // See "http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/Thread.html#join()" for more details. From d4ebdd6875e3b199de6a4968fad1f17bb200870b Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 16 Aug 2016 21:47:49 +1000 Subject: [PATCH 05/11] Issue #845 data rate limits added @sbordets feedback --- .../main/java/org/eclipse/jetty/server/HttpInput.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 0fa4ec79414..c5aa8dd6aae 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -58,6 +58,7 @@ public class HttpInput extends ServletInputStream implements Runnable private final HttpChannelState _channelState; private ReadListener _listener; private State _state = STREAM; + private long _firstByteTimeStamp=-1; private long _contentArrived; private long _contentConsumed; private long _blockingTimeoutAt = -1; @@ -88,6 +89,8 @@ public class HttpInput extends ServletInputStream implements Runnable _state = STREAM; _contentArrived = 0; _contentConsumed = 0; + _firstByteTimeStamp = -1; + _blockingTimeoutAt = -1; } } @@ -149,12 +152,12 @@ public class HttpInput extends ServletInputStream implements Runnable _blockingTimeoutAt=now+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); int minRequestDataRate=_channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); - if (minRequestDataRate>0) + if (minRequestDataRate>0 && _firstByteTimeStamp!=-1) { - long period=now-_channelState.getHttpChannel().getRequest().getTimeStamp(); + long period=now-_firstByteTimeStamp; if (period>=1000) { - double data_rate = _contentArrived / (0.001*(now-_channelState.getHttpChannel().getRequest().getTimeStamp())); + double data_rate = _contentArrived / (0.001*period); if (data_rate Date: Tue, 16 Aug 2016 23:15:30 +1000 Subject: [PATCH 06/11] Issue #845 data rate limits use nanotime --- .../main/java/org/eclipse/jetty/server/HttpInput.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index c5aa8dd6aae..cce11cfd6d7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -25,6 +25,7 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.Objects; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.servlet.ReadListener; @@ -146,18 +147,16 @@ public class HttpInput extends ServletInputStream implements Runnable { synchronized (_inputQ) { - long now=System.currentTimeMillis(); - if (_blockingTimeoutAt>=0 && !isAsync()) - _blockingTimeoutAt=now+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); + _blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); int minRequestDataRate=_channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); if (minRequestDataRate>0 && _firstByteTimeStamp!=-1) { - long period=now-_firstByteTimeStamp; + long period=System.nanoTime()-_firstByteTimeStamp; if (period>=1000) { - double data_rate = _contentArrived / (0.001*period); + double data_rate = _contentArrived / ((1.0*period)/TimeUnit.SECONDS.toNanos(1)); if (data_rate Date: Wed, 17 Aug 2016 07:11:36 +1000 Subject: [PATCH 07/11] ssue #845 data rate limits double --- .../src/main/java/org/eclipse/jetty/server/HttpInput.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index cce11cfd6d7..b0c185bb698 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -156,7 +156,7 @@ public class HttpInput extends ServletInputStream implements Runnable long period=System.nanoTime()-_firstByteTimeStamp; if (period>=1000) { - double data_rate = _contentArrived / ((1.0*period)/TimeUnit.SECONDS.toNanos(1)); + double data_rate = _contentArrived / (((double)period)/TimeUnit.SECONDS.toNanos(1)); if (data_rate Date: Wed, 17 Aug 2016 17:24:57 +1000 Subject: [PATCH 08/11] Issue #845 data rate limits Converted to long minimum data rather than the numerically sensitive data rate. --- .../src/main/java/org/eclipse/jetty/server/HttpInput.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index b0c185bb698..634aa747574 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -154,11 +154,11 @@ public class HttpInput extends ServletInputStream implements Runnable if (minRequestDataRate>0 && _firstByteTimeStamp!=-1) { long period=System.nanoTime()-_firstByteTimeStamp; - if (period>=1000) + if (period>0) { - double data_rate = _contentArrived / (((double)period)/TimeUnit.SECONDS.toNanos(1)); - if (data_rate Date: Fri, 26 Aug 2016 10:45:04 +0200 Subject: [PATCH 09/11] Code cleanup. --- .../org/eclipse/jetty/server/HttpInput.java | 197 +++++++++--------- 1 file changed, 103 insertions(+), 94 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 634aa747574..6f045d85c41 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -59,16 +59,16 @@ public class HttpInput extends ServletInputStream implements Runnable private final HttpChannelState _channelState; private ReadListener _listener; private State _state = STREAM; - private long _firstByteTimeStamp=-1; + private long _firstByteTimeStamp = -1; private long _contentArrived; private long _contentConsumed; private long _blockingTimeoutAt = -1; public HttpInput(HttpChannelState state) { - _channelState=state; - if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout()>0) - _blockingTimeoutAt=0; + _channelState = state; + if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout() > 0) + _blockingTimeoutAt = 0; } protected HttpChannelState getHttpChannelState() @@ -98,26 +98,26 @@ public class HttpInput extends ServletInputStream implements Runnable @Override public int available() { - int available=0; - boolean woken=false; + int available = 0; + boolean woken = false; synchronized (_inputQ) { Content content = _inputQ.peek(); - if (content==null) + if (content == null) { try { produceContent(); } - catch(IOException e) + catch (IOException e) { - woken=failed(e); + woken = failed(e); } content = _inputQ.peek(); } - if (content!=null) - available= remaining(content); + if (content != null) + available = remaining(content); } if (woken) @@ -137,7 +137,7 @@ public class HttpInput extends ServletInputStream implements Runnable public int read() throws IOException { int read = read(_oneByteBuffer, 0, 1); - if (read==0) + if (read == 0) throw new IllegalStateException("unready read=0"); return read < 0 ? -1 : _oneByteBuffer[0] & 0xFF; } @@ -147,29 +147,29 @@ public class HttpInput extends ServletInputStream implements Runnable { synchronized (_inputQ) { - if (_blockingTimeoutAt>=0 && !isAsync()) - _blockingTimeoutAt=System.currentTimeMillis()+getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); + if (_blockingTimeoutAt >= 0 && !isAsync()) + _blockingTimeoutAt = System.currentTimeMillis() + getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); - int minRequestDataRate=_channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); - if (minRequestDataRate>0 && _firstByteTimeStamp!=-1) + int minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); + if (minRequestDataRate > 0 && _firstByteTimeStamp != -1) { - long period=System.nanoTime()-_firstByteTimeStamp; - if (period>0) + long period = System.nanoTime() - _firstByteTimeStamp; + if (period > 0) { - long minimum_data = minRequestDataRate * TimeUnit.NANOSECONDS.toMillis(period)/TimeUnit.SECONDS.toMillis(1); - if (_contentArrived0) + if (content == EOF_CONTENT || content == EARLY_EOF_CONTENT || remaining(content) > 0) return content; _inputQ.poll(); @@ -327,17 +332,17 @@ public class HttpInput extends ServletInputStream implements Runnable /** * Copies the given content into the given byte buffer. * - * @param content the content to copy from - * @param buffer the buffer to copy into - * @param offset the buffer offset to start copying from - * @param length the space available in the buffer + * @param content the content to copy from + * @param buffer the buffer to copy into + * @param offset the buffer offset to start copying from + * @param length the space available in the buffer * @return the number of bytes actually copied */ protected int get(Content content, byte[] buffer, int offset, int length) { int l = Math.min(content.remaining(), length); content.getContent().get(buffer, offset, l); - _contentConsumed+=l; + _contentConsumed += l; return l; } @@ -345,16 +350,16 @@ public class HttpInput extends ServletInputStream implements Runnable * Consumes the given content. * Calls the content succeeded if all content consumed. * - * @param content the content to consume - * @param length the number of bytes to consume + * @param content the content to consume + * @param length the number of bytes to consume */ protected void skip(Content content, int length) { int l = Math.min(content.remaining(), length); ByteBuffer buffer = content.getContent(); - buffer.position(buffer.position()+l); - _contentConsumed+=l; - if (l>0 && !content.hasContent()) + buffer.position(buffer.position() + l); + _contentConsumed += l; + if (l > 0 && !content.hasContent()) pollContent(); // hungry succeed } @@ -368,22 +373,22 @@ public class HttpInput extends ServletInputStream implements Runnable { try { - long timeout=0; - if (_blockingTimeoutAt>=0) + long timeout = 0; + if (_blockingTimeoutAt >= 0) { - timeout=_blockingTimeoutAt-System.currentTimeMillis(); - if (timeout<=0) + timeout = _blockingTimeoutAt - System.currentTimeMillis(); + if (timeout <= 0) throw new TimeoutException(); } if (LOG.isDebugEnabled()) - LOG.debug("{} blocking for content timeout={}", this,timeout); - if (timeout>0) + LOG.debug("{} blocking for content timeout={}", this, timeout); + if (timeout > 0) _inputQ.wait(timeout); else _inputQ.wait(); - if (_blockingTimeoutAt>0 && System.currentTimeMillis()>=_blockingTimeoutAt) + if (_blockingTimeoutAt > 0 && System.currentTimeMillis() >= _blockingTimeoutAt) throw new TimeoutException(); } catch (Throwable e) @@ -397,23 +402,24 @@ public class HttpInput extends ServletInputStream implements Runnable *

Typically used to push back content that has * been read, perhaps mutated. The bytes prepended are * deducted for the contentConsumed total

+ * * @param item the content to add * @return true if content channel woken for read */ public boolean prependContent(Content item) { - boolean woken=false; + boolean woken = false; synchronized (_inputQ) { _inputQ.push(item); - _contentConsumed-=item.remaining(); + _contentConsumed -= item.remaining(); if (LOG.isDebugEnabled()) LOG.debug("{} prependContent {}", this, item); - if (_listener==null) + if (_listener == null) _inputQ.notify(); else - woken=_channelState.onReadPossible(); + woken = _channelState.onReadPossible(); } return woken; @@ -427,20 +433,20 @@ public class HttpInput extends ServletInputStream implements Runnable */ public boolean addContent(Content item) { - boolean woken=false; + boolean woken = false; synchronized (_inputQ) { - if (_firstByteTimeStamp==-1) - _firstByteTimeStamp=System.nanoTime(); - _contentArrived+=item.remaining(); + if (_firstByteTimeStamp == -1) + _firstByteTimeStamp = System.nanoTime(); + _contentArrived += item.remaining(); _inputQ.offer(item); if (LOG.isDebugEnabled()) LOG.debug("{} addContent {}", this, item); - if (_listener==null) + if (_listener == null) _inputQ.notify(); else - woken=_channelState.onReadPossible(); + woken = _channelState.onReadPossible(); } return woken; @@ -450,7 +456,7 @@ public class HttpInput extends ServletInputStream implements Runnable { synchronized (_inputQ) { - return _inputQ.size()>0; + return _inputQ.size() > 0; } } @@ -476,6 +482,7 @@ public class HttpInput extends ServletInputStream implements Runnable *

* Typically this will result in an EOFException being thrown * from a subsequent read rather than a -1 return. + * * @return true if content channel woken for read */ public boolean earlyEOF() @@ -486,11 +493,12 @@ public class HttpInput extends ServletInputStream implements Runnable /** * This method should be called to signal that all the expected * content arrived. + * * @return true if content channel woken for read */ public boolean eof() { - return addContent(EOF_CONTENT); + return addContent(EOF_CONTENT); } public boolean consumeAll() @@ -529,7 +537,7 @@ public class HttpInput extends ServletInputStream implements Runnable { synchronized (_inputQ) { - return _state==ASYNC; + return _state == ASYNC; } } @@ -550,18 +558,18 @@ public class HttpInput extends ServletInputStream implements Runnable { synchronized (_inputQ) { - if (_listener == null ) + if (_listener == null) return true; if (_state instanceof EOFState) return true; - if (nextReadable()!=null) + if (nextReadable() != null) return true; _channelState.onReadUnready(); } return false; } - catch(IOException e) + catch (IOException e) { LOG.ignore(e); return true; @@ -572,7 +580,7 @@ public class HttpInput extends ServletInputStream implements Runnable public void setReadListener(ReadListener readListener) { readListener = Objects.requireNonNull(readListener); - boolean woken=false; + boolean woken = false; try { synchronized (_inputQ) @@ -580,11 +588,11 @@ public class HttpInput extends ServletInputStream implements Runnable if (_listener != null) throw new IllegalStateException("ReadListener already set"); if (_state != STREAM) - throw new IllegalStateException("State "+STREAM+" != " + _state); + throw new IllegalStateException("State " + STREAM + " != " + _state); _state = ASYNC; _listener = readListener; - boolean content=nextContent()!=null; + boolean content = nextContent() != null; if (content) woken = _channelState.onReadReady(); @@ -592,7 +600,7 @@ public class HttpInput extends ServletInputStream implements Runnable _channelState.onReadUnready(); } } - catch(IOException e) + catch (IOException e) { throw new RuntimeIOException(e); } @@ -603,7 +611,7 @@ public class HttpInput extends ServletInputStream implements Runnable public boolean failed(Throwable x) { - boolean woken=false; + boolean woken = false; synchronized (_inputQ) { if (_state instanceof ErrorState) @@ -611,16 +619,15 @@ public class HttpInput extends ServletInputStream implements Runnable else _state = new ErrorState(x); - if (_listener==null) + if (_listener == null) _inputQ.notify(); else - woken=_channelState.onReadPossible(); + woken = _channelState.onReadPossible(); } return woken; } - /* ------------------------------------------------------------ */ /* *

* While this class is-a Runnable, it should never be dispatched in it's own thread. It is a @@ -633,26 +640,26 @@ public class HttpInput extends ServletInputStream implements Runnable { final Throwable error; final ReadListener listener; - boolean aeof=false; + boolean aeof = false; synchronized (_inputQ) { - if (_state==EOF) + if (_state == EOF) return; - if (_state==AEOF) + if (_state == AEOF) { - _state=EOF; - aeof=true; + _state = EOF; + aeof = true; } listener = _listener; - error = _state instanceof ErrorState?((ErrorState)_state).getError():null; + error = _state instanceof ErrorState ? ((ErrorState)_state).getError() : null; } try { - if (error!=null) + if (error != null) { _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); listener.onError(error); @@ -672,7 +679,7 @@ public class HttpInput extends ServletInputStream implements Runnable LOG.debug(e); try { - if (aeof || error==null) + if (aeof || error == null) { _channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE); listener.onError(e); @@ -696,10 +703,10 @@ public class HttpInput extends ServletInputStream implements Runnable Content content; synchronized (_inputQ) { - state=_state; - consumed=_contentConsumed; - q=_inputQ.size(); - content=_inputQ.peekFirst(); + state = _state; + consumed = _contentConsumed; + q = _inputQ.size(); + content = _inputQ.peekFirst(); } return String.format("%s@%x[c=%d,q=%d,[0]=%s,s=%s]", getClass().getSimpleName(), @@ -713,10 +720,11 @@ public class HttpInput extends ServletInputStream implements Runnable public static class PoisonPillContent extends Content { private final String _name; + public PoisonPillContent(String name) { super(BufferUtil.EMPTY_BUFFER); - _name=name; + _name = name; } @Override @@ -740,7 +748,7 @@ public class HttpInput extends ServletInputStream implements Runnable public Content(ByteBuffer content) { - _content=content; + _content = content; } @Override @@ -768,7 +776,7 @@ public class HttpInput extends ServletInputStream implements Runnable @Override public String toString() { - return String.format("Content@%x{%s}",hashCode(),BufferUtil.toDetailString(_content)); + return String.format("Content@%x{%s}", hashCode(), BufferUtil.toDetailString(_content)); } } @@ -793,9 +801,10 @@ public class HttpInput extends ServletInputStream implements Runnable protected class ErrorState extends EOFState { final Throwable _error; + ErrorState(Throwable error) { - _error=error; + _error = error; } public Throwable getError() @@ -814,7 +823,7 @@ public class HttpInput extends ServletInputStream implements Runnable @Override public String toString() { - return "ERROR:"+_error; + return "ERROR:" + _error; } } From 0322a1640de987f887e0b725a7142236a2724bca Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Fri, 26 Aug 2016 12:49:31 +0200 Subject: [PATCH 10/11] Code cleanup. --- .../org/eclipse/jetty/server/HttpOutput.java | 334 +++++++++--------- 1 file changed, 171 insertions(+), 163 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 1f83ee1623c..d36f0116caa 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -54,66 +54,67 @@ import org.eclipse.jetty.util.log.Logger; * close the stream, to be reopened after the inclusion ends.

*/ public class HttpOutput extends ServletOutputStream implements Runnable -{ - +{ /** - * The HttpOutput.Inteceptor is a single intercept point for all - * output written to the HttpOutput: via writer; via output stream; + * The HttpOutput.Interceptor is a single intercept point for all + * output written to the HttpOutput: via writer; via output stream; * asynchronously; or blocking. *

- * The Interceptor can be used to implement translations (eg Gzip) or - * additional buffering that acts on all output. Interceptors are + * The Interceptor can be used to implement translations (eg Gzip) or + * additional buffering that acts on all output. Interceptors are * created in a chain, so that multiple concerns may intercept. *

- * The {@link HttpChannel} is an {@link Interceptor} and is always the + * The {@link HttpChannel} is an {@link Interceptor} and is always the * last link in any Interceptor chain. *

- * Responses are committed by the first call to + * Responses are committed by the first call to * {@link #write(ByteBuffer, boolean, Callback)} - * and closed by a call to {@link #write(ByteBuffer, boolean, Callback)} - * with the last boolean set true. If no content is available to commit + * and closed by a call to {@link #write(ByteBuffer, boolean, Callback)} + * with the last boolean set true. If no content is available to commit * or close, then a null buffer is passed. */ public interface Interceptor { - /** + /** * Write content. * The response is committed by the first call to write and is closed by - * a call with last == true. Empty content buffers may be passed to + * a call with last == true. Empty content buffers may be passed to * force a commit or close. - * @param content The content to be written or an empty buffer. - * @param last True if this is the last call to write - * @param callback The callback to use to indicate {@link Callback#succeeded()} - * or {@link Callback#failed(Throwable)}. + * + * @param content The content to be written or an empty buffer. + * @param last True if this is the last call to write + * @param callback The callback to use to indicate {@link Callback#succeeded()} + * or {@link Callback#failed(Throwable)}. */ void write(ByteBuffer content, boolean last, Callback callback); - + /** - * @return The next Interceptor in the chain or null if this is the + * @return The next Interceptor in the chain or null if this is the * last Interceptor in the chain. */ Interceptor getNextInterceptor(); - + /** - * @return True if the Interceptor is optimized to receive direct + * @return True if the Interceptor is optimized to receive direct * {@link ByteBuffer}s in the {@link #write(ByteBuffer, boolean, Callback)} - * method. If false is returned, then passing direct buffers may cause + * method. If false is returned, then passing direct buffers may cause * inefficiencies. */ boolean isOptimizedForDirectBuffers(); - + /** * Reset the buffers. *

If the Interceptor contains buffers then reset them. - * @throws IllegalStateException Thrown if the response has been - * committed and buffers and/or headers cannot be reset. + * + * @throws IllegalStateException Thrown if the response has been + * committed and buffers and/or headers cannot be reset. */ default void resetBuffer() throws IllegalStateException { Interceptor next = getNextInterceptor(); - if (next!=null) + if (next != null) next.resetBuffer(); - }; + } } private static Logger LOG = Log.getLogger(HttpOutput.class); @@ -122,7 +123,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable private final SharedBlockingCallback _writeBlock; private Interceptor _interceptor; - /** Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. */ + /** + * Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written. + */ private long _written; private ByteBuffer _aggregate; @@ -130,6 +133,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private int _commitSize; private WriteListener _writeListener; private volatile Throwable _onError; + /* ACTION OPEN ASYNC READY PENDING UNREADY CLOSED ------------------------------------------------------------------------------------------- @@ -140,8 +144,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true write completed - - - ASYNC READY->owp - */ - private enum OutputState { OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED } - private final AtomicReference _state=new AtomicReference<>(OutputState.OPEN); + private enum OutputState + { + OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED + } + + private final AtomicReference _state = new AtomicReference<>(OutputState.OPEN); public HttpOutput(HttpChannel channel) { @@ -153,9 +161,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable protected long getIdleTimeout() { long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout(); - if (bto>0) + if (bto > 0) return bto; - if (bto<0) + if (bto < 0) return -1; return _channel.getIdleTimeout(); } @@ -163,10 +171,10 @@ public class HttpOutput extends ServletOutputStream implements Runnable HttpConfiguration config = channel.getHttpConfiguration(); _bufferSize = config.getOutputBufferSize(); _commitSize = config.getOutputAggregationSize(); - if (_commitSize>_bufferSize) + if (_commitSize > _bufferSize) { - LOG.warn("OutputAggregationSize {} exceeds bufferSize {}",_commitSize,_bufferSize); - _commitSize=_bufferSize; + LOG.warn("OutputAggregationSize {} exceeds bufferSize {}", _commitSize, _bufferSize); + _commitSize = _bufferSize; } } @@ -182,7 +190,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable public void setInterceptor(Interceptor filter) { - _interceptor=filter; + _interceptor = filter; } public boolean isWritten() @@ -202,7 +210,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private boolean isLastContentToWrite(int len) { - _written+=len; + _written += len; return _channel.getResponse().isAllContentWritten(_written); } @@ -248,9 +256,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void close() { - while(true) + while (true) { - OutputState state=_state.get(); + OutputState state = _state.get(); switch (state) { case CLOSED: @@ -259,18 +267,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable } case UNREADY: { - if (_state.compareAndSet(state,OutputState.ERROR)) - _writeListener.onError(_onError==null?new EofException("Async close"):_onError); + if (_state.compareAndSet(state, OutputState.ERROR)) + _writeListener.onError(_onError == null ? new EofException("Async close") : _onError); break; } default: { - if (!_state.compareAndSet(state,OutputState.CLOSED)) + if (!_state.compareAndSet(state, OutputState.CLOSED)) break; try { - write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); + write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding()); } catch (IOException x) { @@ -293,9 +301,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ void closed() { - while(true) + while (true) { - OutputState state=_state.get(); + OutputState state = _state.get(); switch (state) { case CLOSED: @@ -304,8 +312,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable } case UNREADY: { - if (_state.compareAndSet(state,OutputState.ERROR)) - _writeListener.onError(_onError==null?new EofException("Async closed"):_onError); + if (_state.compareAndSet(state, OutputState.ERROR)) + _writeListener.onError(_onError == null ? new EofException("Async closed") : _onError); break; } default: @@ -345,18 +353,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable public boolean isClosed() { - return _state.get()==OutputState.CLOSED; + return _state.get() == OutputState.CLOSED; } @Override public void flush() throws IOException { - while(true) + while (true) { - switch(_state.get()) + switch (_state.get()) { case OPEN: - write(BufferUtil.hasContent(_aggregate)?_aggregate:BufferUtil.EMPTY_BUFFER, false); + write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false); return; case ASYNC: @@ -388,9 +396,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable public void write(byte[] b, int off, int len) throws IOException { // Async or Blocking ? - while(true) + while (true) { - switch(_state.get()) + switch (_state.get()) { case OPEN: // process blocking below @@ -405,7 +413,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable // Should we aggregate? boolean last = isLastContentToWrite(len); - if (!last && len<=_commitSize) + if (!last && len <= _commitSize) { if (_aggregate == null) _aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers()); @@ -414,7 +422,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable int filled = BufferUtil.fill(_aggregate, b, off, len); // return if we are not complete, not full and filled all the content - if (filled==len && !BufferUtil.isFull(_aggregate)) + if (filled == len && !BufferUtil.isFull(_aggregate)) { if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) throw new IllegalStateException(); @@ -422,12 +430,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable } // adjust offset/length - off+=filled; - len-=filled; + off += filled; + len -= filled; } // Do the asynchronous writing from the callback - new AsyncWrite(b,off,len,last).iterate(); + new AsyncWrite(b, off, len, last).iterate(); return; case PENDING: @@ -451,7 +459,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable // Should we aggregate? int capacity = getBufferSize(); boolean last = isLastContentToWrite(len); - if (!last && len<=_commitSize) + if (!last && len <= _commitSize) { if (_aggregate == null) _aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers()); @@ -460,21 +468,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable int filled = BufferUtil.fill(_aggregate, b, off, len); // return if we are not complete, not full and filled all the content - if (filled==len && !BufferUtil.isFull(_aggregate)) + if (filled == len && !BufferUtil.isFull(_aggregate)) return; // adjust offset/length - off+=filled; - len-=filled; + off += filled; + len -= filled; } // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) { - write(_aggregate, last && len==0); + write(_aggregate, last && len == 0); // should we fill aggregate again from the buffer? - if (len>0 && !last && len<=_commitSize && len<=BufferUtil.space(_aggregate)) + if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate)) { BufferUtil.append(_aggregate, b, off, len); return; @@ -482,26 +490,26 @@ public class HttpOutput extends ServletOutputStream implements Runnable } // write any remaining content in the buffer directly - if (len>0) + if (len > 0) { // write a buffer capacity at a time to avoid JVM pooling large direct buffers // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 ByteBuffer view = ByteBuffer.wrap(b, off, len); - while (len>getBufferSize()) + while (len > getBufferSize()) { - int p=view.position(); - int l=p+getBufferSize(); - view.limit(p+getBufferSize()); - write(view,false); - len-=getBufferSize(); - view.limit(l+Math.min(len,getBufferSize())); + int p = view.position(); + int l = p + getBufferSize(); + view.limit(p + getBufferSize()); + write(view, false); + len -= getBufferSize(); + view.limit(l + Math.min(len, getBufferSize())); view.position(l); } - write(view,last); + write(view, last); } else if (last) { - write(BufferUtil.EMPTY_BUFFER,true); + write(BufferUtil.EMPTY_BUFFER, true); } if (last) @@ -511,9 +519,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable public void write(ByteBuffer buffer) throws IOException { // Async or Blocking ? - while(true) + while (true) { - switch(_state.get()) + switch (_state.get()) { case OPEN: // process blocking below @@ -528,7 +536,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable // Do the asynchronous writing from the callback boolean last = isLastContentToWrite(buffer.remaining()); - new AsyncWrite(buffer,last).iterate(); + new AsyncWrite(buffer, last).iterate(); return; case PENDING: @@ -547,17 +555,16 @@ public class HttpOutput extends ServletOutputStream implements Runnable break; } - // handle blocking write - int len=BufferUtil.length(buffer); + int len = BufferUtil.length(buffer); boolean last = isLastContentToWrite(len); // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) - write(_aggregate, last && len==0); + write(_aggregate, last && len == 0); // write any remaining content in the buffer directly - if (len>0) + if (len > 0) write(buffer, last); else if (last) write(BufferUtil.EMPTY_BUFFER, true); @@ -569,13 +576,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void write(int b) throws IOException { - _written+=1; - boolean complete=_channel.getResponse().isAllContentWritten(_written); + _written += 1; + boolean complete = _channel.getResponse().isAllContentWritten(_written); // Async or Blocking ? - while(true) + while (true) { - switch(_state.get()) + switch (_state.get()) { case OPEN: if (_aggregate == null) @@ -649,7 +656,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable public void sendContent(ByteBuffer content) throws IOException { if (LOG.isDebugEnabled()) - LOG.debug("sendContent({})",BufferUtil.toDetailString(content)); + LOG.debug("sendContent({})", BufferUtil.toDetailString(content)); write(content, true); closed(); @@ -663,7 +670,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(InputStream in) throws IOException { - try(Blocker blocker = _writeBlock.acquire()) + try (Blocker blocker = _writeBlock.acquire()) { new InputStreamWritingCB(in, blocker).iterate(); blocker.block(); @@ -685,7 +692,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(ReadableByteChannel in) throws IOException { - try(Blocker blocker = _writeBlock.acquire()) + try (Blocker blocker = _writeBlock.acquire()) { new ReadableByteChannelWritingCB(in, blocker).iterate(); blocker.block(); @@ -707,7 +714,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(HttpContent content) throws IOException { - try(Blocker blocker = _writeBlock.acquire()) + try (Blocker blocker = _writeBlock.acquire()) { sendContent(content, blocker); blocker.block(); @@ -723,13 +730,14 @@ public class HttpOutput extends ServletOutputStream implements Runnable /** * Asynchronous send of whole content. - * @param content The whole content to send + * + * @param content The whole content to send * @param callback The callback to use to notify success or failure */ public void sendContent(ByteBuffer content, final Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("sendContent(buffer={},{})",BufferUtil.toDetailString(content),callback); + LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback); write(content, true, new Callback.Nested(callback) { @@ -753,13 +761,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable * Asynchronous send of stream content. * The stream will be closed after reading all content. * - * @param in The stream content to send + * @param in The stream content to send * @param callback The callback to use to notify success or failure */ public void sendContent(InputStream in, Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("sendContent(stream={},{})",in,callback); + LOG.debug("sendContent(stream={},{})", in, callback); new InputStreamWritingCB(in, callback).iterate(); } @@ -768,13 +776,13 @@ public class HttpOutput extends ServletOutputStream implements Runnable * Asynchronous send of channel content. * The channel will be closed after reading all content. * - * @param in The channel content to send + * @param in The channel content to send * @param callback The callback to use to notify success or failure */ public void sendContent(ReadableByteChannel in, Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("sendContent(channel={},{})",in,callback); + LOG.debug("sendContent(channel={},{})", in, callback); new ReadableByteChannelWritingCB(in, callback).iterate(); } @@ -783,12 +791,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable * Asynchronous send of HTTP content. * * @param httpContent The HTTP content to send - * @param callback The callback to use to notify success or failure + * @param callback The callback to use to notify success or failure */ public void sendContent(HttpContent httpContent, Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("sendContent(http={},{})",httpContent,callback); + LOG.debug("sendContent(http={},{})", httpContent, callback); if (BufferUtil.hasContent(_aggregate)) { @@ -803,7 +811,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable while (true) { - switch(_state.get()) + switch (_state.get()) { case OPEN: if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING)) @@ -824,37 +832,36 @@ public class HttpOutput extends ServletOutputStream implements Runnable break; } - ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null; if (buffer == null) buffer = httpContent.getIndirectBuffer(); - if (buffer!=null) + if (buffer != null) { - sendContent(buffer,callback); + sendContent(buffer, callback); return; } try { - ReadableByteChannel rbc=httpContent.getReadableByteChannel(); - if (rbc!=null) + ReadableByteChannel rbc = httpContent.getReadableByteChannel(); + if (rbc != null) { // Close of the rbc is done by the async sendContent - sendContent(rbc,callback); + sendContent(rbc, callback); return; } InputStream in = httpContent.getInputStream(); - if (in!=null) + if (in != null) { - sendContent(in,callback); + sendContent(in, callback); return; } - throw new IllegalArgumentException("unknown content for "+httpContent); + throw new IllegalArgumentException("unknown content for " + httpContent); } - catch(Throwable th) + catch (Throwable th) { abort(th); callback.failed(th); @@ -874,7 +881,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable public void recycle() { - _interceptor=_channel; + _interceptor = _channel; if (BufferUtil.hasContent(_aggregate)) BufferUtil.clear(_aggregate); _written = 0; @@ -914,7 +921,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable { while (true) { - switch(_state.get()) + switch (_state.get()) { case OPEN: return true; @@ -950,28 +957,29 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void run() { - loop: while (true) + loop: + while (true) { OutputState state = _state.get(); - if(_onError!=null) + if (_onError != null) { - switch(state) + switch (state) { case CLOSED: case ERROR: { - _onError=null; + _onError = null; break loop; } default: { if (_state.compareAndSet(state, OutputState.ERROR)) { - Throwable th=_onError; - _onError=null; + Throwable th = _onError; + _onError = null; if (LOG.isDebugEnabled()) - LOG.debug("onError",th); + LOG.debug("onError", th); _writeListener.onError(th); close(); break loop; @@ -981,7 +989,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable continue; } - switch(_state.get()) + switch (_state.get()) { case ASYNC: case READY: @@ -1003,7 +1011,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable break; default: - _onError=new IllegalStateException("state="+_state.get()); + _onError = new IllegalStateException("state=" + _state.get()); } } } @@ -1023,25 +1031,25 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public String toString() { - return String.format("%s@%x{%s}",this.getClass().getSimpleName(),hashCode(),_state.get()); + return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state.get()); } private abstract class AsyncICB extends IteratingCallback { final boolean _last; - + AsyncICB(boolean last) { - _last=last; + _last = last; } - + @Override protected void onCompleteSuccess() { - while(true) + while (true) { - OutputState last=_state.get(); - switch(last) + OutputState last = _state.get(); + switch (last) { case PENDING: if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC)) @@ -1070,7 +1078,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable @Override public void onCompleteFailure(Throwable e) { - _onError=e==null?new IOException():e; + _onError = e == null ? new IOException() : e; if (_channel.getState().onWritePossible()) _channel.execute(_channel); } @@ -1090,15 +1098,15 @@ public class HttpOutput extends ServletOutputStream implements Runnable { if (BufferUtil.hasContent(_aggregate)) { - _flushed=true; + _flushed = true; write(_aggregate, false, this); return Action.SCHEDULED; } if (!_flushed) { - _flushed=true; - write(BufferUtil.EMPTY_BUFFER,false,this); + _flushed = true; + write(BufferUtil.EMPTY_BUFFER, false, this); return Action.SCHEDULED; } @@ -1116,23 +1124,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable public AsyncWrite(byte[] b, int off, int len, boolean last) { super(last); - _buffer=ByteBuffer.wrap(b, off, len); - _len=len; + _buffer = ByteBuffer.wrap(b, off, len); + _len = len; // always use a view for large byte arrays to avoid JVM pooling large direct buffers - _slice=_len Date: Wed, 31 Aug 2016 12:15:31 +0200 Subject: [PATCH 11/11] Issue #845 - Improve blocking IO for data rate limiting. Moved tests to run HTTP and HTTP/2 tests, and added more test cases. --- .../AbstractHTTP2ServerConnectionFactory.java | 19 +- .../http2/server/HTTP2ServerConnection.java | 6 +- .../http2/server/HttpChannelOverHTTP2.java | 21 +- .../http2/server/HttpTransportOverHTTP2.java | 111 ++- .../jetty/server/HttpChannelState.java | 7 +- .../jetty/server/HttpConfiguration.java | 6 +- .../eclipse/jetty/server/HttpConnection.java | 9 +- .../org/eclipse/jetty/server/HttpInput.java | 35 +- .../org/eclipse/jetty/server/HttpOutput.java | 45 +- .../jetty/server/ConnectorTimeoutTest.java | 185 ----- .../server/ServerConnectorTimeoutTest.java | 60 +- .../test/resources/jetty-logging.properties | 2 +- .../jetty/util/SharedBlockingCallback.java | 38 +- .../jetty/http/client/AbstractTest.java | 19 +- .../jetty/http/client/ServerTimeoutsTest.java | 732 ++++++++++++++++++ 15 files changed, 1027 insertions(+), 268 deletions(-) create mode 100644 tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java index fa402070f8a..4303d09f79e 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/AbstractHTTP2ServerConnectionFactory.java @@ -50,6 +50,7 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne private int maxHeaderBlockFragment = 0; private FlowControlStrategy.Factory flowControlStrategyFactory = () -> new BufferingFlowControlStrategy(0.5F); private ExecutionStrategy.Factory executionStrategyFactory = new ProduceExecuteConsume.Factory(); + private long streamIdleTimeout; public AbstractHTTP2ServerConnectionFactory(@Name("config") HttpConfiguration httpConfiguration) { @@ -157,6 +158,17 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne this.executionStrategyFactory = executionStrategyFactory; } + @ManagedAttribute("The stream idle timeout in milliseconds") + public long getStreamIdleTimeout() + { + return streamIdleTimeout; + } + + public void setStreamIdleTimeout(long streamIdleTimeout) + { + this.streamIdleTimeout = streamIdleTimeout; + } + public HttpConfiguration getHttpConfiguration() { return httpConfiguration; @@ -177,8 +189,11 @@ public abstract class AbstractHTTP2ServerConnectionFactory extends AbstractConne // For a single stream in a connection, there will be a race between // the stream idle timeout and the connection idle timeout. However, // the typical case is that the connection will be busier and the - // stream idle timeout will expire earlier that the connection's. - session.setStreamIdleTimeout(endPoint.getIdleTimeout()); + // stream idle timeout will expire earlier than the connection's. + long streamIdleTimeout = getStreamIdleTimeout(); + if (streamIdleTimeout <= 0) + streamIdleTimeout = endPoint.getIdleTimeout(); + session.setStreamIdleTimeout(streamIdleTimeout); session.setInitialSessionRecvWindow(getInitialSessionRecvWindow()); ServerParser parser = newServerParser(connector, session); diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java index 99996a2a887..0a462143703 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HTTP2ServerConnection.java @@ -133,9 +133,9 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection public boolean onStreamTimeout(IStream stream, Throwable failure) { HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); - boolean result = !channel.isRequestHandled(); + boolean result = channel.onStreamTimeout(failure); if (LOG.isDebugEnabled()) - LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", stream, failure); + LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure); return result; } @@ -157,7 +157,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection result &= !channel.isRequestHandled(); } if (LOG.isDebugEnabled()) - LOG.debug("{} idle timeout on {}: {}", result ? "Processing" : "Ignoring", session, failure); + LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure); return result; } diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java index f2ac9c918b7..4cf3396dca3 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpChannelOverHTTP2.java @@ -267,11 +267,11 @@ public class HttpChannelOverHTTP2 extends HttpChannel handle); } - boolean delayed = _delayedUntilContent; + boolean wasDelayed = _delayedUntilContent; _delayedUntilContent = false; - if (delayed) + if (wasDelayed) _handled = true; - return handle || delayed ? this : null; + return handle || wasDelayed ? this : null; } public boolean isRequestHandled() @@ -279,6 +279,21 @@ public class HttpChannelOverHTTP2 extends HttpChannel return _handled; } + public boolean onStreamTimeout(Throwable failure) + { + if (!_handled) + return true; + + HttpInput input = getRequest().getHttpInput(); + boolean readFailed = input.failed(failure); + if (readFailed) + handle(); + + boolean writeFailed = getHttpTransport().onStreamTimeout(failure); + + return readFailed || writeFailed; + } + public void onFailure(Throwable failure) { onEarlyEOF(); diff --git a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java index 1541688589a..7ca4974ceb8 100644 --- a/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java +++ b/jetty-http2/http2-server/src/main/java/org/eclipse/jetty/http2/server/HttpTransportOverHTTP2.java @@ -43,7 +43,7 @@ public class HttpTransportOverHTTP2 implements HttpTransport private static final Logger LOG = Log.getLogger(HttpTransportOverHTTP2.class); private final AtomicBoolean commit = new AtomicBoolean(); - private final Callback commitCallback = new CommitCallback(); + private final TransportCallback transportCallback = new TransportCallback(); private final Connector connector; private final HTTP2ServerConnection connection; private IStream stream; @@ -100,12 +100,22 @@ public class HttpTransportOverHTTP2 implements HttpTransport { if (hasContent) { - commit(info, false, commitCallback); - send(content, lastContent, callback); + Callback commitCallback = new Callback.Nested(callback) + { + @Override + public void succeeded() + { + if (transportCallback.start(callback, false)) + send(content, lastContent, transportCallback); + } + }; + if (transportCallback.start(commitCallback, true)) + commit(info, false, transportCallback); } else { - commit(info, lastContent, callback); + if (transportCallback.start(callback, false)) + commit(info, lastContent, transportCallback); } } else @@ -117,7 +127,8 @@ public class HttpTransportOverHTTP2 implements HttpTransport { if (hasContent || lastContent) { - send(content, lastContent, callback); + if (transportCallback.start(callback, false)) + send(content, lastContent, transportCallback); } else { @@ -186,6 +197,11 @@ public class HttpTransportOverHTTP2 implements HttpTransport stream.data(frame, callback); } + public boolean onStreamTimeout(Throwable failure) + { + return transportCallback.onIdleTimeout(failure); + } + @Override public void onCompleted() { @@ -214,20 +230,99 @@ public class HttpTransportOverHTTP2 implements HttpTransport stream.reset(new ResetFrame(stream.getId(), ErrorCode.INTERNAL_ERROR.code), Callback.NOOP); } - private class CommitCallback implements Callback.NonBlocking + private class TransportCallback implements Callback { + private State state = State.IDLE; + private Callback callback; + private boolean commit; + + public boolean start(Callback callback, boolean commit) + { + State state; + synchronized (this) + { + state = this.state; + if (state == State.IDLE) + { + this.state = State.WRITING; + this.callback = callback; + this.commit = commit; + return true; + } + } + callback.failed(new IllegalStateException("Invalid transport state: " + state)); + return false; + } + @Override public void succeeded() { + boolean commit; + Callback callback = null; + synchronized (this) + { + commit = this.commit; + if (state != State.TIMEOUT) + { + callback = this.callback; + this.state = State.IDLE; + } + } if (LOG.isDebugEnabled()) - LOG.debug("HTTP2 Response #{} committed", stream.getId()); + LOG.debug("HTTP2 Response #{} {}", stream.getId(), commit ? "committed" : "flushed content"); + if (callback != null) + callback.succeeded(); } @Override public void failed(Throwable x) { + boolean commit; + Callback callback = null; + synchronized (this) + { + commit = this.commit; + if (state != State.TIMEOUT) + { + callback = this.callback; + this.state = State.FAILED; + } + } if (LOG.isDebugEnabled()) - LOG.debug("HTTP2 Response #" + stream.getId() + " failed to commit", x); + LOG.debug("HTTP2 Response #" + stream.getId() + " failed to " + (commit ? "commit" : "flush"), x); + if (callback != null) + callback.failed(x); + } + + @Override + public boolean isNonBlocking() + { + return callback.isNonBlocking(); + } + + private boolean onIdleTimeout(Throwable failure) + { + boolean result; + Callback callback = null; + synchronized (this) + { + result = state == State.WRITING; + if (result) + { + callback = this.callback; + this.state = State.TIMEOUT; + } + } + if (LOG.isDebugEnabled()) + LOG.debug("HTTP2 Response #" + stream.getId() + " idle timeout", failure); + if (result) + callback.failed(failure); + return result; } } + + private enum State + { + IDLE, WRITING, FAILED, TIMEOUT + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 000f05c50c2..9f8fa19adad 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -419,8 +419,8 @@ public class HttpChannelState _state=State.ASYNC_WAIT; action=Action.WAIT; if (_asyncReadUnready) - _channel.asyncReadFillInterested(); - Scheduler scheduler = _channel.getScheduler(); + read_interested=true; + Scheduler scheduler=_channel.getScheduler(); if (scheduler!=null && _timeoutMs>0) _event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS)); } @@ -454,6 +454,9 @@ public class HttpChannelState } } + if (read_interested) + _channel.asyncReadFillInterested(); + return action; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java index 8d1026484b6..189e1f12eeb 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConfiguration.java @@ -65,7 +65,7 @@ public class HttpConfiguration private boolean _delayDispatchUntilContent = true; private boolean _persistentConnectionsEnabled = true; private int _maxErrorDispatches = 10; - private int _minRequestDataRate; + private long _minRequestDataRate; /* ------------------------------------------------------------ */ /** @@ -512,7 +512,7 @@ public class HttpConfiguration /** * @return The minimum request data rate in bytes per second; or <=0 for no limit */ - public int getMinRequestDataRate() + public long getMinRequestDataRate() { return _minRequestDataRate; } @@ -521,7 +521,7 @@ public class HttpConfiguration /** * @param bytesPerSecond The minimum request data rate in bytes per second; or <=0 for no limit */ - public void setMinRequestDataRate(int bytesPerSecond) + public void setMinRequestDataRate(long bytesPerSecond) { _minRequestDataRate=bytesPerSecond; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 7e636e6d996..a9418d6040f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -124,9 +124,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http protected HttpChannelOverHttp newHttpChannel() { - HttpChannelOverHttp httpChannel = new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this); - - return httpChannel; + return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this); } protected HttpParser newHttpParser(HttpCompliance compliance) @@ -285,9 +283,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http while (_parser.inContentState()) { int filled = fillRequestBuffer(); - boolean handle = parseRequestBuffer(); - handled|=handle; - if (handle || filled<=0 || _channel.getRequest().getHttpInput().hasContent()) + handled = parseRequestBuffer(); + if (handled || filled<=0 || _channel.getRequest().getHttpInput().hasContent()) break; } return handled; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 6f045d85c41..a0ab5891586 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -62,13 +62,11 @@ public class HttpInput extends ServletInputStream implements Runnable private long _firstByteTimeStamp = -1; private long _contentArrived; private long _contentConsumed; - private long _blockingTimeoutAt = -1; + private long _blockUntil; public HttpInput(HttpChannelState state) { _channelState = state; - if (_channelState.getHttpChannel().getHttpConfiguration().getBlockingTimeout() > 0) - _blockingTimeoutAt = 0; } protected HttpChannelState getHttpChannelState() @@ -91,7 +89,7 @@ public class HttpInput extends ServletInputStream implements Runnable _contentArrived = 0; _contentConsumed = 0; _firstByteTimeStamp = -1; - _blockingTimeoutAt = -1; + _blockUntil = 0; } } @@ -132,6 +130,10 @@ public class HttpInput extends ServletInputStream implements Runnable executor.execute(channel); } + private long getBlockingTimeout() + { + return getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); + } @Override public int read() throws IOException @@ -147,10 +149,17 @@ public class HttpInput extends ServletInputStream implements Runnable { synchronized (_inputQ) { - if (_blockingTimeoutAt >= 0 && !isAsync()) - _blockingTimeoutAt = System.currentTimeMillis() + getHttpChannelState().getHttpChannel().getHttpConfiguration().getBlockingTimeout(); + if (!isAsync()) + { + if (_blockUntil == 0) + { + long blockingTimeout = getBlockingTimeout(); + if (blockingTimeout > 0) + _blockUntil = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(blockingTimeout); + } + } - int minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); + long minRequestDataRate = _channelState.getHttpChannel().getHttpConfiguration().getMinRequestDataRate(); if (minRequestDataRate > 0 && _firstByteTimeStamp != -1) { long period = System.nanoTime() - _firstByteTimeStamp; @@ -374,9 +383,9 @@ public class HttpInput extends ServletInputStream implements Runnable try { long timeout = 0; - if (_blockingTimeoutAt >= 0) + if (_blockUntil != 0) { - timeout = _blockingTimeoutAt - System.currentTimeMillis(); + timeout = TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()); if (timeout <= 0) throw new TimeoutException(); } @@ -388,8 +397,11 @@ public class HttpInput extends ServletInputStream implements Runnable else _inputQ.wait(); - if (_blockingTimeoutAt > 0 && System.currentTimeMillis() >= _blockingTimeoutAt) - throw new TimeoutException(); + // TODO: cannot return unless there is content or timeout, + // TODO: so spurious wakeups are not handled correctly. + + if (_blockUntil != 0 && TimeUnit.NANOSECONDS.toMillis(_blockUntil - System.nanoTime()) <= 0) + throw new TimeoutException(String.format("Blocking timeout %d ms", getBlockingTimeout())); } catch (Throwable e) { @@ -550,7 +562,6 @@ public class HttpInput extends ServletInputStream implements Runnable } } - @Override public boolean isReady() { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index d36f0116caa..76471243e03 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -120,7 +120,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private static Logger LOG = Log.getLogger(HttpOutput.class); private final HttpChannel _channel; - private final SharedBlockingCallback _writeBlock; + private final SharedBlockingCallback _writeBlocker; private Interceptor _interceptor; /** @@ -155,19 +155,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable { _channel = channel; _interceptor = channel; - _writeBlock = new SharedBlockingCallback() - { - @Override - protected long getIdleTimeout() - { - long bto = getHttpChannel().getHttpConfiguration().getBlockingTimeout(); - if (bto > 0) - return bto; - if (bto < 0) - return -1; - return _channel.getIdleTimeout(); - } - }; + _writeBlocker = new WriteBlocker(channel); HttpConfiguration config = channel.getHttpConfiguration(); _bufferSize = config.getOutputBufferSize(); _commitSize = config.getOutputAggregationSize(); @@ -221,12 +209,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable protected Blocker acquireWriteBlockingCallback() throws IOException { - return _writeBlock.acquire(); + return _writeBlocker.acquire(); } private void write(ByteBuffer content, boolean complete) throws IOException { - try (Blocker blocker = _writeBlock.acquire()) + try (Blocker blocker = _writeBlocker.acquire()) { write(content, complete, blocker); blocker.block(); @@ -670,7 +658,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(InputStream in) throws IOException { - try (Blocker blocker = _writeBlock.acquire()) + try (Blocker blocker = _writeBlocker.acquire()) { new InputStreamWritingCB(in, blocker).iterate(); blocker.block(); @@ -692,7 +680,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(ReadableByteChannel in) throws IOException { - try (Blocker blocker = _writeBlock.acquire()) + try (Blocker blocker = _writeBlocker.acquire()) { new ReadableByteChannelWritingCB(in, blocker).iterate(); blocker.block(); @@ -714,7 +702,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable */ public void sendContent(HttpContent content) throws IOException { - try (Blocker blocker = _writeBlock.acquire()) + try (Blocker blocker = _writeBlocker.acquire()) { sendContent(content, blocker); blocker.block(); @@ -1325,4 +1313,23 @@ public class HttpOutput extends ServletOutputStream implements Runnable super.onCompleteFailure(x); } } + + private static class WriteBlocker extends SharedBlockingCallback + { + private final HttpChannel _channel; + + private WriteBlocker(HttpChannel channel) + { + _channel = channel; + } + + @Override + protected long getIdleTimeout() + { + long blockingTimeout = _channel.getHttpConfiguration().getBlockingTimeout(); + if (blockingTimeout == 0) + return _channel.getIdleTimeout(); + return blockingTimeout; + } + } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java index c37c2ba5ccb..e5188af3bfb 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ConnectorTimeoutTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.server; -import static org.hamcrest.Matchers.containsString; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -739,190 +737,7 @@ public abstract class ConnectorTimeoutTest extends HttpServerTestFixture int offset=in.indexOf("Hello World"); Assert.assertTrue(offset > 0); } - - - @Test(timeout=60000) - public void testMaxIdleWithDelayedDispatch() throws Exception - { - configureServer(new EchoHandler()); - Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); - client.setSoTimeout(10000); - - Assert.assertFalse(client.isClosed()); - - OutputStream os=client.getOutputStream(); - InputStream is=client.getInputStream(); - - String content="Wibble"; - byte[] contentB=content.getBytes("utf-8"); - os.write(( - "POST /echo HTTP/1.1\r\n"+ - "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ - "content-type: text/plain; charset=utf-8\r\n"+ - "content-length: "+contentB.length+"\r\n"+ - "\r\n").getBytes("utf-8")); - os.flush(); - - long start = System.currentTimeMillis(); - IO.toString(is); - - Thread.sleep(sleepTime); - Assert.assertEquals(-1, is.read()); - - Assert.assertTrue(System.currentTimeMillis() - start > minimumTestRuntime); - Assert.assertTrue(System.currentTimeMillis() - start < maximumTestRuntime); - } - - @Test(timeout=60000) - public void testSlowClientRequestNoLimit() throws Exception - { - configureServer(new EchoHandler()); - Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); - client.setSoTimeout(10000); - - Assert.assertFalse(client.isClosed()); - - OutputStream os=client.getOutputStream(); - InputStream is=client.getInputStream(); - - os.write(( - "POST /echo HTTP/1.0\r\n"+ - "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ - "content-type: text/plain; charset=utf-8\r\n"+ - "content-length: 20\r\n"+ - "\r\n").getBytes("utf-8")); - os.flush(); - - for (int i=0;i<4;i++) - { - os.write("123\n".getBytes("utf-8")); - os.flush(); - Thread.sleep(1000); - } - os.write("===\n".getBytes("utf-8")); - os.flush(); - - String response =IO.toString(is); - Assert.assertThat(response,containsString(" 200 ")); - Assert.assertThat(response,containsString("===")); - } - - @Test(timeout=60000) - public void testSlowClientRequestLimited() throws Exception - { - _httpConfiguration.setMinRequestDataRate(20); - configureServer(new EchoHandler()); - Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); - client.setSoTimeout(10000); - - Assert.assertFalse(client.isClosed()); - - OutputStream os=client.getOutputStream(); - InputStream is=client.getInputStream(); - - os.write(( - "POST /echo HTTP/1.0\r\n"+ - "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ - "content-type: text/plain; charset=utf-8\r\n"+ - "content-length: 20\r\n"+ - "\r\n").getBytes("utf-8")); - os.flush(); - - try - { - for (int i=0;i<4;i++) - { - os.write("123\n".getBytes("utf-8")); - os.flush(); - Thread.sleep(500); - } - os.write("===\n".getBytes("utf-8")); - os.flush(); - - String response =IO.toString(is); - Assert.assertThat(response,containsString(" 408 ")); - Assert.assertThat(response,containsString("Request Data rate")); - } - catch (SocketException e) - {} - } - - @Test(timeout=60000) - public void testSlowClientRequestLimitExceeded() throws Exception - { - _httpConfiguration.setMinRequestDataRate(20); - configureServer(new EchoHandler()); - Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); - client.setSoTimeout(10000); - - Assert.assertFalse(client.isClosed()); - - OutputStream os=client.getOutputStream(); - InputStream is=client.getInputStream(); - - os.write(( - "POST /echo HTTP/1.0\r\n"+ - "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ - "content-type: text/plain; charset=utf-8\r\n"+ - "content-length: 100\r\n"+ - "\r\n").getBytes("utf-8")); - os.flush(); - - for (int i=0;i<9;i++) - { - os.write("123456789\n".getBytes("utf-8")); - os.flush(); - Thread.sleep(250); - } - os.write("=========\n".getBytes("utf-8")); - os.flush(); - - String response =IO.toString(is); - Assert.assertThat(response,containsString(" 200 ")); - Assert.assertThat(response,containsString("=========")); - } - - - @Test(timeout=60000) - public void testHttpIdleTime() throws Exception - { - _httpConfiguration.setIdleTimeout(500); - configureServer(new EchoHandler()); - Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); - client.setSoTimeout(10000); - - Assert.assertFalse(client.isClosed()); - - OutputStream os=client.getOutputStream(); - InputStream is=client.getInputStream(); - - try (StacklessLogging scope = new StacklessLogging(HttpChannel.class)) - { - os.write(( - "POST /echo HTTP/1.0\r\n"+ - "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ - "content-type: text/plain; charset=utf-8\r\n"+ - "content-length: 20\r\n"+ - "\r\n").getBytes("utf-8")); - os.flush(); - - os.write("123456789\n".getBytes("utf-8")); - os.flush(); - Thread.sleep(1000); - os.write("=========\n".getBytes("utf-8")); - os.flush(); - - String response =IO.toString(is); - Assert.assertThat(response,containsString(" 500 ")); - Assert.assertThat(response,containsString("/500 ms")); - Assert.assertThat(response,Matchers.not(containsString("========="))); - } - } - - - - protected static class SlowResponseHandler extends AbstractHandler { @Override diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java index e10cb8d5de1..f8bd2d0997e 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/ServerConnectorTimeoutTest.java @@ -18,20 +18,30 @@ package org.eclipse.jetty.server; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.UnsupportedEncodingException; import java.net.Socket; import java.nio.charset.StandardCharsets; import java.util.Locale; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.session.SessionHandler; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.hamcrest.Matchers; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertTrue; + public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest { @Before @@ -113,4 +123,50 @@ public class ServerConnectorTimeoutTest extends ConnectorTimeoutTest return response; } } + + @Test + public void testHttpWriteIdleTimeout() throws Exception + { + _httpConfiguration.setBlockingTimeout(500); + configureServer(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + IO.copy(request.getInputStream(), response.getOutputStream()); + } + }); + Socket client=newSocket(_serverURI.getHost(),_serverURI.getPort()); + client.setSoTimeout(10000); + + Assert.assertFalse(client.isClosed()); + + OutputStream os=client.getOutputStream(); + InputStream is=client.getInputStream(); + + try (StacklessLogging scope = new StacklessLogging(HttpChannel.class)) + { + os.write(( + "POST /echo HTTP/1.0\r\n"+ + "host: "+_serverURI.getHost()+":"+_serverURI.getPort()+"\r\n"+ + "content-type: text/plain; charset=utf-8\r\n"+ + "content-length: 20\r\n"+ + "\r\n").getBytes("utf-8")); + os.flush(); + + os.write("123456789\n".getBytes("utf-8")); + os.flush(); + Thread.sleep(1000); + os.write("=========\n".getBytes("utf-8")); + os.flush(); + + Thread.sleep(2000); + + String response =IO.toString(is); + Assert.assertThat(response,containsString(" 500 ")); + Assert.assertThat(response,containsString("/500 ms")); + Assert.assertThat(response, Matchers.not(containsString("========="))); + } + } } diff --git a/jetty-server/src/test/resources/jetty-logging.properties b/jetty-server/src/test/resources/jetty-logging.properties index f345cd6cb5e..d2d9f09e73e 100644 --- a/jetty-server/src/test/resources/jetty-logging.properties +++ b/jetty-server/src/test/resources/jetty-logging.properties @@ -1,3 +1,3 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -org.eclipse.jetty.LEVEL=INFO +org.eclipse.jetty.LEVEL=DEBUG #org.eclipse.jetty.server.LEVEL=DEBUG diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java index adfc547242c..ce64842e5d7 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/SharedBlockingCallback.java @@ -78,16 +78,16 @@ public class SharedBlockingCallback private final Condition _idle = _lock.newCondition(); private final Condition _complete = _lock.newCondition(); private Blocker _blocker = new Blocker(); - + protected long getIdleTimeout() { return -1; } - + public Blocker acquire() throws IOException { - _lock.lock(); long idle = getIdleTimeout(); + _lock.lock(); try { while (_blocker._state != IDLE) @@ -102,8 +102,9 @@ public class SharedBlockingCallback _idle.await(); } _blocker._state = null; + return _blocker; } - catch (final InterruptedException e) + catch (InterruptedException x) { throw new InterruptedIOException(); } @@ -111,7 +112,6 @@ public class SharedBlockingCallback { _lock.unlock(); } - return _blocker; } protected void notComplete(Blocker blocker) @@ -173,8 +173,15 @@ public class SharedBlockingCallback _state=cause; _complete.signalAll(); } - else + else if (_state instanceof BlockerTimeoutException) + { + // Failure arrived late, block() already + // modified the state, nothing more to do. + } + else + { throw new IllegalStateException(_state); + } } finally { @@ -191,19 +198,24 @@ public class SharedBlockingCallback */ public void block() throws IOException { - _lock.lock(); long idle = getIdleTimeout(); + _lock.lock(); try { while (_state == null) { - if (idle>0 && (idle < Long.MAX_VALUE/2)) + if (idle > 0) { - // Wait a little bit longer than expected callback idle timeout - if (!_complete.await(idle+idle/2,TimeUnit.MILLISECONDS)) - // The callback has not arrived in sufficient time. - // We will synthesize a TimeoutException - _state=new BlockerTimeoutException(); + // Waiting here may compete with the idle timeout mechanism, + // so here we wait a little bit longer to favor the normal + // idle timeout mechanism that will call failed(Throwable). + long excess = Math.min(idle / 2, 1000); + if (!_complete.await(idle + excess, TimeUnit.MILLISECONDS)) + { + // Method failed(Throwable) has not been called yet, + // so we will synthesize a special TimeoutException. + _state = new BlockerTimeoutException(); + } } else { diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java index 32b52e3c44b..99f79cbc90d 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AbstractTest.java @@ -45,6 +45,7 @@ import org.eclipse.jetty.util.SocketAddressResolver; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.After; +import org.junit.Assume; import org.junit.Rule; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -61,6 +62,7 @@ public abstract class AbstractTest @Rule public final TestTracker tracker = new TestTracker(); + protected final HttpConfiguration httpConfig = new HttpConfiguration(); protected final Transport transport; protected SslContextFactory sslContextFactory; protected Server server; @@ -69,6 +71,7 @@ public abstract class AbstractTest public AbstractTest(Transport transport) { + Assume.assumeNotNull(transport); this.transport = transport; } @@ -118,14 +121,13 @@ public abstract class AbstractTest { case HTTP: { - result.add(new HttpConnectionFactory(new HttpConfiguration())); + result.add(new HttpConnectionFactory(httpConfig)); break; } case HTTPS: { - HttpConfiguration configuration = new HttpConfiguration(); - configuration.addCustomizer(new SecureRequestCustomizer()); - HttpConnectionFactory http = new HttpConnectionFactory(configuration); + httpConfig.addCustomizer(new SecureRequestCustomizer()); + HttpConnectionFactory http = new HttpConnectionFactory(httpConfig); SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, http.getProtocol()); result.add(ssl); result.add(http); @@ -133,14 +135,13 @@ public abstract class AbstractTest } case H2C: { - result.add(new HTTP2CServerConnectionFactory(new HttpConfiguration())); + result.add(new HTTP2CServerConnectionFactory(httpConfig)); break; } case H2: { - HttpConfiguration configuration = new HttpConfiguration(); - configuration.addCustomizer(new SecureRequestCustomizer()); - HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(configuration); + httpConfig.addCustomizer(new SecureRequestCustomizer()); + HTTP2ServerConnectionFactory h2 = new HTTP2ServerConnectionFactory(httpConfig); ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory("h2"); SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, alpn.getProtocol()); result.add(ssl); @@ -150,7 +151,7 @@ public abstract class AbstractTest } case FCGI: { - result.add(new ServerFCGIConnectionFactory(new HttpConfiguration())); + result.add(new ServerFCGIConnectionFactory(httpConfig)); break; } default: diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java new file mode 100644 index 00000000000..eaa4b74bd86 --- /dev/null +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/ServerTimeoutsTest.java @@ -0,0 +1,732 @@ +// +// ======================================================================== +// Copyright (c) 1995-2016 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http.client; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.servlet.AsyncContext; +import javax.servlet.ReadListener; +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.util.DeferredContentProvider; +import org.eclipse.jetty.http.BadMessageException; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.HttpChannel; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.StacklessLogging; +import org.junit.Assert; +import org.junit.Test; + +public class ServerTimeoutsTest extends AbstractTest +{ + public ServerTimeoutsTest(Transport transport) + { + // Skip FCGI for now, not much interested in its server-side behavior. + super(transport == Transport.FCGI ? null : transport); + } + + private void setServerIdleTimeout(long idleTimeout) + { + AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class); + if (h2 != null) + h2.setStreamIdleTimeout(idleTimeout); + else + connector.setIdleTimeout(idleTimeout); + } + + @Test + public void testDelayedDispatchRequestWithDelayedFirstContentIdleTimeoutFires() throws Exception + { + httpConfig.setDelayDispatchUntilContent(true); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + handlerLatch.countDown(); + } + }); + long idleTimeout = 1000; + setServerIdleTimeout(idleTimeout); + + CountDownLatch resultLatch = new CountDownLatch(1); + client.POST(newURI()) + .content(new DeferredContentProvider()) + .send(result -> + { + if (result.isFailed()) + resultLatch.countDown(); + }); + + // We did not send the content, the request was not + // dispatched, the server should have idle timed out. + Assert.assertFalse(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testNoBlockingTimeoutBlockingReadIdleTimeoutFires() throws Exception + { + httpConfig.setBlockingTimeout(-1); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new BlockingReadHandler(handlerLatch)); + long idleTimeout = 1000; + setServerIdleTimeout(idleTimeout); + + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + { + DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1)); + CountDownLatch resultLatch = new CountDownLatch(1); + client.POST(newURI()) + .content(contentProvider) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) + resultLatch.countDown(); + }); + + // Blocking read should timeout. + Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // Complete the request. + contentProvider.close(); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testBlockingTimeoutSmallerThanIdleTimeoutBlockingReadBlockingTimeoutFires() throws Exception + { + long blockingTimeout = 1000; + httpConfig.setBlockingTimeout(blockingTimeout); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new BlockingReadHandler(handlerLatch)); + long idleTimeout = 3 * blockingTimeout; + setServerIdleTimeout(idleTimeout); + + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + { + DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1)); + CountDownLatch resultLatch = new CountDownLatch(1); + client.POST(newURI()) + .content(contentProvider) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) + resultLatch.countDown(); + }); + + // Blocking read should timeout. + Assert.assertTrue(handlerLatch.await(2 * blockingTimeout, TimeUnit.MILLISECONDS)); + // Complete the request. + contentProvider.close(); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testBlockingTimeoutLargerThanIdleTimeoutBlockingReadIdleTimeoutFires() throws Exception + { + long idleTimeout = 1000; + long blockingTimeout = 3 * idleTimeout; + httpConfig.setBlockingTimeout(blockingTimeout); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new BlockingReadHandler(handlerLatch)); + setServerIdleTimeout(idleTimeout); + + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + { + DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1)); + CountDownLatch resultLatch = new CountDownLatch(1); + client.POST(newURI()) + .content(contentProvider) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) + resultLatch.countDown(); + }); + + // Blocking read should timeout. + Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // Complete the request. + contentProvider.close(); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testNoBlockingTimeoutBlockingWriteIdleTimeoutFires() throws Exception + { + httpConfig.setBlockingTimeout(-1); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new BlockingWriteHandler(handlerLatch)); + long idleTimeout = 1000; + setServerIdleTimeout(idleTimeout); + + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + { + BlockingQueue callbacks = new LinkedBlockingQueue<>(); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .onResponseContentAsync((response, content, callback) -> + { + // Do not succeed the callback so the server will block writing. + callbacks.offer(callback); + }) + .send(result -> + { + if (result.isFailed()) + resultLatch.countDown(); + }); + + // Blocking write should timeout. + Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // After the server stopped sending, consume on the client to read the early EOF. + while (true) + { + Callback callback = callbacks.poll(1, TimeUnit.SECONDS); + if (callback == null) + break; + callback.succeeded(); + } + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testBlockingTimeoutSmallerThanIdleTimeoutBlockingWriteBlockingTimeoutFires() throws Exception + { + long blockingTimeout = 1000; + httpConfig.setBlockingTimeout(blockingTimeout); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new BlockingWriteHandler(handlerLatch)); + long idleTimeout = 3 * blockingTimeout; + setServerIdleTimeout(idleTimeout); + + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + { + BlockingQueue callbacks = new LinkedBlockingQueue<>(); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .onResponseContentAsync((response, content, callback) -> + { + // Do not succeed the callback so the server will block writing. + callbacks.offer(callback); + }) + .send(result -> + { + if (result.isFailed()) + resultLatch.countDown(); + }); + + // Blocking write should timeout. + Assert.assertTrue(handlerLatch.await(2 * blockingTimeout, TimeUnit.MILLISECONDS)); + // After the server stopped sending, consume on the client to read the early EOF. + while (true) + { + Callback callback = callbacks.poll(1, TimeUnit.SECONDS); + if (callback == null) + break; + callback.succeeded(); + } + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testBlockingTimeoutLargerThanIdleTimeoutBlockingWriteIdleTimeoutFires() throws Exception + { + long idleTimeout = 1000; + long blockingTimeout = 3 * idleTimeout; + httpConfig.setBlockingTimeout(blockingTimeout); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new BlockingWriteHandler(handlerLatch)); + setServerIdleTimeout(idleTimeout); + + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + { + BlockingQueue callbacks = new LinkedBlockingQueue<>(); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .onResponseContentAsync((response, content, callback) -> + { + // Do not succeed the callback so the server will block writing. + callbacks.offer(callback); + }) + .send(result -> + { + if (result.isFailed()) + resultLatch.countDown(); + }); + + // Blocking read should timeout. + Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // After the server stopped sending, consume on the client to read the early EOF. + while (true) + { + Callback callback = callbacks.poll(1, TimeUnit.SECONDS); + if (callback == null) + break; + callback.succeeded(); + } + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testBlockingTimeoutWithSlowRead() throws Exception + { + long idleTimeout = 1000; + long blockingTimeout = 2 * idleTimeout; + httpConfig.setBlockingTimeout(blockingTimeout); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + baseRequest.setHandled(true); + ServletInputStream input = request.getInputStream(); + while (true) + { + int read = input.read(); + if (read < 0) + break; + } + } + catch (IOException x) + { + handlerLatch.countDown(); + throw x; + } + } + }); + setServerIdleTimeout(idleTimeout); + + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + { + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .content(contentProvider) + .send(result -> + { + // Result may fail to send the whole request body, + // but the response has arrived successfully. + if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) + resultLatch.countDown(); + }); + + // The writes should be slow but not trigger the idle timeout. + long period = idleTimeout / 2; + long writes = 2 * (blockingTimeout / period); + for (long i = 0; i < writes; ++i) + { + contentProvider.offer(ByteBuffer.allocate(1)); + Thread.sleep(period); + } + contentProvider.close(); + + // Blocking read should timeout. + Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testAsyncReadIdleTimeoutFires() throws Exception + { + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + ServletInputStream input = request.getInputStream(); + input.setReadListener(new ReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + Assert.assertEquals(0, input.read()); + Assert.assertFalse(input.isReady()); + } + + @Override + public void onAllDataRead() throws IOException + { + } + + @Override + public void onError(Throwable failure) + { + if (failure instanceof TimeoutException) + { + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500); + asyncContext.complete(); + handlerLatch.countDown(); + } + } + }); + } + }); + long idleTimeout = 1000; + setServerIdleTimeout(idleTimeout); + + DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1)); + CountDownLatch resultLatch = new CountDownLatch(1); + client.POST(newURI()) + .content(contentProvider) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) + resultLatch.countDown(); + }); + + // Async read should timeout. + Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // Complete the request. + contentProvider.close(); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testAsyncWriteIdleTimeoutFires() throws Exception + { + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + ServletOutputStream output = response.getOutputStream(); + output.setWriteListener(new WriteListener() + { + @Override + public void onWritePossible() throws IOException + { + output.write(new byte[64 * 1024 * 1024]); + } + + @Override + public void onError(Throwable failure) + { + if (failure instanceof TimeoutException) + { + asyncContext.complete(); + handlerLatch.countDown(); + } + } + }); + } + }); + long idleTimeout = 1000; + setServerIdleTimeout(idleTimeout); + + BlockingQueue callbacks = new LinkedBlockingQueue<>(); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .onResponseContentAsync((response, content, callback) -> + { + // Do not succeed the callback so the server will block writing. + callbacks.offer(callback); + }) + .send(result -> + { + if (result.isFailed()) + resultLatch.countDown(); + }); + + // Async write should timeout. + Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // After the server stopped sending, consume on the client to read the early EOF. + while (true) + { + Callback callback = callbacks.poll(1, TimeUnit.SECONDS); + if (callback == null) + break; + callback.succeeded(); + } + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testBlockingReadWithMinimumDataRateBelowLimit() throws Exception + { + int bytesPerSecond = 20; + httpConfig.setMinRequestDataRate(bytesPerSecond); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + baseRequest.setHandled(true); + ServletInputStream input = request.getInputStream(); + while (true) + { + int read = input.read(); + if (read < 0) + break; + } + } + catch (BadMessageException x) + { + handlerLatch.countDown(); + throw x; + } + } + }); + + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .content(contentProvider) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.REQUEST_TIMEOUT_408) + resultLatch.countDown(); + }); + + for (int i = 0; i < 3; ++i) + { + contentProvider.offer(ByteBuffer.allocate(bytesPerSecond / 2)); + Thread.sleep(1000); + } + contentProvider.close(); + + // Request should timeout. + Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testBlockingReadWithMinimumDataRateAboveLimit() throws Exception + { + int bytesPerSecond = 20; + httpConfig.setMinRequestDataRate(bytesPerSecond); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + ServletInputStream input = request.getInputStream(); + while (true) + { + int read = input.read(); + if (read < 0) + break; + } + handlerLatch.countDown(); + } + }); + + DeferredContentProvider contentProvider = new DeferredContentProvider(); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest(newURI()) + .content(contentProvider) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.OK_200) + resultLatch.countDown(); + }); + + for (int i = 0; i < 3; ++i) + { + contentProvider.offer(ByteBuffer.allocate(bytesPerSecond * 2)); + Thread.sleep(1000); + } + contentProvider.close(); + + Assert.assertTrue(handlerLatch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testBlockingReadHttpIdleTimeoutOverridesIdleTimeout() throws Exception + { + long httpIdleTimeout = 1000; + long idleTimeout = 3 * httpIdleTimeout; + httpConfig.setIdleTimeout(httpIdleTimeout); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new BlockingReadHandler(handlerLatch)); + setServerIdleTimeout(idleTimeout); + + try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) + { + DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1)); + CountDownLatch resultLatch = new CountDownLatch(1); + client.POST(newURI()) + .content(contentProvider) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) + resultLatch.countDown(); + }); + + // Blocking read should timeout. + Assert.assertTrue(handlerLatch.await(2 * httpIdleTimeout, TimeUnit.MILLISECONDS)); + // Complete the request. + contentProvider.close(); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test + public void testAsyncReadHttpIdleTimeoutOverridesIdleTimeout() throws Exception + { + long httpIdleTimeout = 1000; + long idleTimeout = 3 * httpIdleTimeout; + httpConfig.setIdleTimeout(httpIdleTimeout); + CountDownLatch handlerLatch = new CountDownLatch(1); + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + ServletInputStream input = request.getInputStream(); + input.setReadListener(new ReadListener() + { + @Override + public void onDataAvailable() throws IOException + { + Assert.assertEquals(0, input.read()); + Assert.assertFalse(input.isReady()); + } + + @Override + public void onAllDataRead() throws IOException + { + } + + @Override + public void onError(Throwable failure) + { + if (failure instanceof TimeoutException) + { + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500); + asyncContext.complete(); + handlerLatch.countDown(); + } + } + }); + } + }); + setServerIdleTimeout(idleTimeout); + + DeferredContentProvider contentProvider = new DeferredContentProvider(ByteBuffer.allocate(1)); + CountDownLatch resultLatch = new CountDownLatch(1); + client.POST(newURI()) + .content(contentProvider) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.INTERNAL_SERVER_ERROR_500) + resultLatch.countDown(); + }); + + // Async read should timeout. + Assert.assertTrue(handlerLatch.await(2 * idleTimeout, TimeUnit.MILLISECONDS)); + // Complete the request. + contentProvider.close(); + Assert.assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); + } + + private static class BlockingReadHandler extends AbstractHandler + { + private final CountDownLatch handlerLatch; + + public BlockingReadHandler(CountDownLatch handlerLatch) + { + this.handlerLatch = handlerLatch; + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + ServletInputStream input = request.getInputStream(); + Assert.assertEquals(0, input.read()); + try + { + input.read(); + } + catch (IOException x) + { + handlerLatch.countDown(); + throw x; + } + } + } + + private static class BlockingWriteHandler extends AbstractHandler + { + private final CountDownLatch handlerLatch; + + private BlockingWriteHandler(CountDownLatch handlerLatch) + { + this.handlerLatch = handlerLatch; + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + ServletOutputStream output = response.getOutputStream(); + try + { + output.write(new byte[64 * 1024 * 1024]); + } + catch (IOException x) + { + handlerLatch.countDown(); + throw x; + } + } + } +}