diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java index a482da34583..335d9c6f924 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncHttpConnection.java @@ -38,6 +38,7 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async private static final Logger LOG = Log.getLogger(AsyncHttpConnection.class); private int _total_no_progress; private final AsyncEndPoint _asyncEndp; + private boolean _readInterested = true; public AsyncHttpConnection(Connector connector, EndPoint endpoint, Server server) { @@ -103,29 +104,44 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async { some_progress|=progress; // Is this request/response round complete and are fully flushed? - if (_parser.isComplete() && _generator.isComplete()) + boolean parserComplete = _parser.isComplete(); + boolean generatorComplete = _generator.isComplete(); + boolean complete = parserComplete && generatorComplete; + if (parserComplete) { - // Reset the parser/generator - progress=true; - - // look for a switched connection instance? - if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) + if (generatorComplete) { - Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection"); - if (switched!=null) - connection=switched; + // Reset the parser/generator + progress=true; + + // look for a switched connection instance? + if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) + { + Connection switched=(Connection)_request.getAttribute("org.eclipse.jetty.io.Connection"); + if (switched!=null) + connection=switched; + } + + reset(); + + // TODO Is this still required? + if (!_generator.isPersistent() && !_endp.isOutputShutdown()) + { + LOG.warn("Safety net oshut!!! IF YOU SEE THIS, PLEASE RAISE BUGZILLA"); + _endp.shutdownOutput(); + } } - - reset(); - - // TODO Is this still required? - if (!_generator.isPersistent() && !_endp.isOutputShutdown()) + else { - LOG.warn("Safety net oshut!!! IF YOU SEE THIS, PLEASE RAISE BUGZILLA"); - _endp.shutdownOutput(); + // We have finished parsing, but not generating so + // we must not be interested in reading until we + // have finished generating and we reset the generator + _readInterested = false; + LOG.debug("Disabled read interest while writing response {}", _endp); } } - else if (_request.getAsyncContinuation().isAsyncStarted()) + + if (!complete && _request.getAsyncContinuation().isAsyncStarted()) { // The request is suspended, so even though progress has been made, // exit the while loop by setting progress to false @@ -177,10 +193,23 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async // then no more can happen, so close. _endp.close(); } - + // Make idle parser seek EOF if (_parser.isIdle()) _parser.setPersistent(false); } + @Override + public void reset() + { + _readInterested = true; + LOG.debug("Enabled read interest {}", _endp); + super.reset(); + } + + @Override + public boolean isSuspended() + { + return !_readInterested || super.isSuspended(); + } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java new file mode 100644 index 00000000000..833e5ff97b0 --- /dev/null +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/SlowClientWithPipelinedRequestTest.java @@ -0,0 +1,159 @@ +package org.eclipse.jetty.server; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicInteger; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.nio.AsyncConnection; +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import static org.hamcrest.Matchers.lessThan; + +public class SlowClientWithPipelinedRequestTest +{ + private final AtomicInteger handles = new AtomicInteger(); + private Server server; + private SelectChannelConnector connector; + + public void startServer(Handler handler) throws Exception + { + server = new Server(); + connector = new SelectChannelConnector() + { + @Override + protected AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint) + { + return new AsyncHttpConnection(this, endpoint, getServer()) + { + @Override + public Connection handle() throws IOException + { + handles.incrementAndGet(); + return super.handle(); + } + }; + } + }; + server.addConnector(connector); + connector.setPort(0); + server.setHandler(handler); + server.start(); + } + + @After + public void stopServer() throws Exception + { + if (server != null) + { + server.stop(); + server.join(); + } + } + + @Test + public void testSlowClientWithPipelinedRequest() throws Exception + { + final int contentLength = 512 * 1024; + startServer(new AbstractHandler() + { + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException + { + baseRequest.setHandled(true); + System.err.println("target = " + target); + if ("/content".equals(target)) + { + // We simulate what the DefaultServlet does, bypassing the blocking + // write mechanism otherwise the test does not reproduce the bug + + OutputStream outputStream = response.getOutputStream(); + AbstractHttpConnection.Output output = (AbstractHttpConnection.Output)outputStream; + // Since the test is via localhost, we need a really big buffer to stall the write + byte[] bytes = new byte[contentLength]; + Arrays.fill(bytes, (byte)'9'); + Buffer buffer = new ByteArrayBuffer(bytes); + // Do a non blocking write + output.sendContent(buffer); + } + } + }); + + Socket client = new Socket("localhost", connector.getLocalPort()); + OutputStream output = client.getOutputStream(); + output.write(("" + + "GET /content HTTP/1.1\r\n" + + "Host: localhost:" + connector.getLocalPort() + "\r\n" + + "\r\n" + + "").getBytes("UTF-8")); + output.flush(); + + InputStream input = client.getInputStream(); + + int read = input.read(); + Assert.assertTrue(read >= 0); + // As soon as we can read the response, send a pipelined request + // so it is a different read for the server and it will trigger NIO + output.write(("" + + "GET /pipelined HTTP/1.1\r\n" + + "Host: localhost:" + connector.getLocalPort() + "\r\n" + + "\r\n" + + "").getBytes("UTF-8")); + output.flush(); + + // Simulate a slow reader + Thread.sleep(1000); + Assert.assertThat(handles.get(), lessThan(10)); + + // We are sure we are not spinning, read the content + StringBuilder lines = new StringBuilder().append((char)read); + int crlfs = 0; + while (true) + { + read = input.read(); + lines.append((char)read); + if (read == '\r' || read == '\n') + ++crlfs; + else + crlfs = 0; + if (crlfs == 4) + break; + } + Assert.assertTrue(lines.toString().contains(" 200 ")); + // Read the body + for (int i = 0; i < contentLength; ++i) + input.read(); + + // Read the pipelined response + lines.setLength(0); + crlfs = 0; + while (true) + { + read = input.read(); + lines.append((char)read); + if (read == '\r' || read == '\n') + ++crlfs; + else + crlfs = 0; + if (crlfs == 4) + break; + } + Assert.assertTrue(lines.toString().contains(" 200 ")); + + client.close(); + } +} diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/GzipWithPipeliningTest.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/GzipWithPipeliningTest.java index 9130dc2ebc0..485df242df2 100644 --- a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/GzipWithPipeliningTest.java +++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/GzipWithPipeliningTest.java @@ -105,7 +105,7 @@ public class GzipWithPipeliningTest client.connect(); // Request text that will be gzipped + chunked in the response - client.issueGET("/lots-of-fantasy-names.txt",true); + client.issueGET("/lots-of-fantasy-names.txt",true, false); respHeader = client.readResponseHeader(); System.out.println("Response Header #1 --\n" + respHeader); @@ -125,7 +125,7 @@ public class GzipWithPipeliningTest System.out.printf("Read %,d bytes%n",readBytes); // Issue another request - client.issueGET("/jetty_logo.png",true); + client.issueGET("/jetty_logo.png",true, false); // Finish reading chunks System.out.println("Finish reading remaining chunks ..."); diff --git a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/PipelineHelper.java b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/PipelineHelper.java index 9dc4944bd6e..3f0e039b3e8 100644 --- a/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/PipelineHelper.java +++ b/jetty-servlets/src/test/java/org/eclipse/jetty/servlets/PipelineHelper.java @@ -1,7 +1,5 @@ package org.eclipse.jetty.servlets; -import static org.hamcrest.Matchers.*; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -17,6 +15,8 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.StdErrLog; import org.junit.Assert; +import static org.hamcrest.Matchers.not; + public class PipelineHelper { private static final Logger LOG = Log.getLogger(PipelineHelper.class); @@ -38,7 +38,7 @@ public class PipelineHelper /** * Open the Socket to the destination endpoint and - * + * * @return the open java Socket. * @throws IOException */ @@ -57,14 +57,14 @@ public class PipelineHelper /** * Issue a HTTP/1.1 GET request with Connection:keep-alive set. - * + * * @param path * the path to GET * @param acceptGzipped * to turn on acceptance of GZIP compressed responses * @throws IOException */ - public void issueGET(String path, boolean acceptGzipped) throws IOException + public void issueGET(String path, boolean acceptGzipped, boolean close) throws IOException { LOG.debug("Issuing GET on " + path); StringBuilder req = new StringBuilder(); @@ -79,7 +79,15 @@ public class PipelineHelper req.append("Accept-Encoding: gzip, deflate\r\n"); } req.append("Cookie: JSESSIONID=spqx8v8szylt1336t96vc6mw0\r\n"); - req.append("Connection: keep-alive\r\n"); + if ( close ) + { + req.append("Connection: close\r\n"); + } + else + { + req.append("Connection: keep-alive\r\n"); + } + req.append("\r\n"); LOG.debug("Request:" + req); @@ -92,7 +100,7 @@ public class PipelineHelper public String readResponseHeader() throws IOException { - // Read Response Header + // Read Response Header socket.setSoTimeout(10000); LOG.debug("Reading http header"); @@ -189,6 +197,15 @@ public class PipelineHelper while (left > 0) { int val = inputStream.read(); + try + { + if (left % 10 == 0) + Thread.sleep(1); + } + catch (InterruptedException e) + { + e.printStackTrace(); + } if (val == (-1)) { Assert.fail(String.format("Encountered an early EOL (expected another %,d bytes)",left)); diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/helper/SafariD00.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/helper/SafariD00.java index 3b5d8fd0b82..69f18c26cf3 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/helper/SafariD00.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/helper/SafariD00.java @@ -15,9 +15,6 @@ *******************************************************************************/ package org.eclipse.jetty.websocket.helper; -import static org.hamcrest.Matchers.*; - -import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -35,6 +32,8 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.StdErrLog; import org.junit.Assert; +import static org.hamcrest.Matchers.is; + public class SafariD00 { private static final Logger LOG = Log.getLogger(SafariD00.class); @@ -56,7 +55,7 @@ public class SafariD00 /** * Open the Socket to the destination endpoint and - * + * * @return the open java Socket. * @throws IOException */ @@ -75,7 +74,7 @@ public class SafariD00 /** * Issue an Http websocket (Draft-0) upgrade request (using an example request captured from OSX/Safari) - * + * * @throws UnsupportedEncodingException */ public void issueHandshake() throws IOException @@ -103,23 +102,22 @@ public class SafariD00 out.write(buf,0,buf.length); out.flush(); - // Read HTTP 101 Upgrade / Handshake Response - InputStreamReader reader = new InputStreamReader(in); - BufferedReader br = new BufferedReader(reader); - socket.setSoTimeout(10000); - LOG.debug("Reading http header"); - boolean foundEnd = false; - String line; - while (!foundEnd) + // Read HTTP 101 Upgrade / Handshake Response + InputStreamReader reader = new InputStreamReader(in); + + LOG.debug("Reading http headers"); + int crlfs = 0; + while (true) { - line = br.readLine(); - // System.out.printf("RESP: %s%n",line); - if (line.length() == 0) - { - foundEnd = true; - } + int read = in.read(); + if (read == '\r' || read == '\n') + ++crlfs; + else + crlfs = 0; + if (crlfs == 4) + break; } // Read expected handshake hixie bytes