diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 57ea48d3c98..deb137606f3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -461,7 +461,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor } else if (isCommitted()) { - _transport.abort(x); + abort(x); if (!(x instanceof EofException)) LOG.warn("Could not send response error 500: "+x); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index c999441f0c0..fa795dd8417 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -349,11 +349,27 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http // Finish consuming the request // If we are still expecting if (_channel.isExpecting100Continue()) + { // close to seek EOF _parser.close(); + } else if (_parser.inContentState() && _generator.isPersistent()) - // Complete reading the request - _channel.getRequest().getHttpInput().consumeAll(); + { + // If we are async, then we have problems to complete neatly + if (_channel.getRequest().getHttpInput().isAsync()) + { + if (LOG.isDebugEnabled()) + LOG.debug("unconsumed async input {}", this); + _channel.abort(); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("unconsumed input {}", this); + // Complete reading the request + _channel.getRequest().getHttpInput().consumeAll(); + } + } // Reset the channel, parsers and generator _channel.recycle(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 26fd95379b1..98dd1a15d49 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -322,7 +322,7 @@ public abstract class HttpInput extends ServletInputStream implements Runnable public void consumeAll() { synchronized (lock()) - { + { try { while (!isFinished()) diff --git a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java index 5cd51d8d785..19ef2b10b8c 100644 --- a/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java +++ b/jetty-servlet/src/test/java/org/eclipse/jetty/servlet/AsyncServletIOTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; @@ -65,6 +66,7 @@ public class AsyncServletIOTest private static final Logger LOG = Log.getLogger(AsyncServletIOTest.class); protected AsyncIOServlet _servlet0=new AsyncIOServlet(); protected AsyncIOServlet2 _servlet2=new AsyncIOServlet2(); + protected AsyncIOServlet3 _servlet3=new AsyncIOServlet3(); protected int _port; protected Server _server = new Server(); protected ServletHandler _servletHandler; @@ -89,9 +91,13 @@ public class AsyncServletIOTest _servletHandler.addServletWithMapping(holder,"/path/*"); ServletHolder holder2=new ServletHolder(_servlet2); - holder.setAsyncSupported(true); + holder2.setAsyncSupported(true); _servletHandler.addServletWithMapping(holder2,"/path2/*"); + ServletHolder holder3=new ServletHolder(_servlet3); + holder3.setAsyncSupported(true); + _servletHandler.addServletWithMapping(holder3,"/path3/*"); + _server.start(); _port=_connector.getLocalPort(); @@ -209,6 +215,50 @@ public class AsyncServletIOTest Assert.assertTrue(_servlet2.completed.await(5, TimeUnit.SECONDS)); } + @Test + public void testAsyncConsumeAll() throws Exception + { + StringBuilder request = new StringBuilder(512); + request.append("GET /ctx/path3/info HTTP/1.1\r\n") + .append("Host: localhost\r\n") + .append("Content-Type: text/plain\r\n") + .append("Content-Length: 10\r\n") + .append("\r\n"); + + int port=_port; + try (Socket socket = new Socket("localhost",port)) + { + socket.setSoTimeout(1000000); + OutputStream out = socket.getOutputStream(); + out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1)); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400); + + // response line + String line = in.readLine(); + LOG.debug("response-line: "+line); + Assert.assertThat(line,startsWith("HTTP/1.1 200 OK")); + + // Skip headers + while (line!=null) + { + line = in.readLine(); + LOG.debug("header-line: "+line); + if (line.length()==0) + break; + } + + // Get body + line = in.readLine(); + LOG.debug("body: "+line); + Assert.assertEquals("DONE",line); + + // The connection should be aborted + line = in.readLine(); + Assert.assertNull(line); + } + } + public synchronized List process(String content,int... writes) throws Exception { return process(content.getBytes(StandardCharsets.ISO_8859_1),writes); @@ -507,4 +557,40 @@ public class AsyncServletIOTest } } } + + + @SuppressWarnings("serial") + public class AsyncIOServlet3 extends HttpServlet + { + public CountDownLatch completed = new CountDownLatch(1); + + @Override + public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException + { + AsyncContext async = request.startAsync(); + + request.getInputStream().setReadListener(new ReadListener() + { + + @Override + public void onError(Throwable t) + { + } + + @Override + public void onDataAvailable() throws IOException + { + } + + @Override + public void onAllDataRead() throws IOException + { + } + }); + + response.setStatus(200); + response.getOutputStream().print("DONE"); + async.complete(); + } + } }