From d351e0790a75fd77344f6d426fa463ba757312cb Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 3 May 2013 15:15:03 +1000 Subject: [PATCH] 406617 Spin in Request.recycle Numerous code cleanups with the handling of early closes of requests, specially when the response has already been sent. --- .../eclipse/jetty/client/HttpReceiver.java | 3 +- .../org/eclipse/jetty/http/HttpParser.java | 24 +- .../org/eclipse/jetty/http/HttpTester.java | 3 +- .../jetty/http/HttpGeneratorServerTest.java | 3 +- .../eclipse/jetty/http/HttpParserTest.java | 3 +- .../org/eclipse/jetty/server/HttpChannel.java | 3 +- .../eclipse/jetty/server/HttpConnection.java | 14 +- .../org/eclipse/jetty/server/HttpInput.java | 35 +- .../org/eclipse/jetty/server/Response.java | 7 +- .../jetty/server/AsyncRequestReadTest.java | 338 +++++++++++++----- .../server/proxy/ProxyHTTPSPDYConnection.java | 3 +- 11 files changed, 318 insertions(+), 118 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 081e4b602f3..2f337313903 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -374,10 +374,9 @@ public class HttpReceiver implements HttpParser.ResponseHandler } @Override - public boolean earlyEOF() + public void earlyEOF() { failAndClose(new EOFException()); - return false; } private void failAndClose(Throwable failure) diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java index eb2753e8aea..9b89bfac70e 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java @@ -1151,16 +1151,14 @@ public class HttpParser case CLOSED: if (BufferUtil.hasContent(buffer)) { - int len=buffer.remaining(); - _headerBytes+=len; + // Just ignore data when closed + _headerBytes+=buffer.remaining(); + BufferUtil.clear(buffer); if (_headerBytes>_maxHeaderBytes) { - Thread.sleep(100); - String chars = BufferUtil.toDetailString(buffer); - BufferUtil.clear(buffer); - throw new IllegalStateException(String.format("%s %d/%d>%d data when CLOSED:%s",this,len,_headerBytes,_maxHeaderBytes,chars)); + // Don't want to waste time reading data of a closed request + throw new IllegalStateException("too much data after closed"); } - BufferUtil.clear(buffer); } return false; default: break; @@ -1473,8 +1471,18 @@ public class HttpParser */ public boolean parsedHeader(HttpField field); - public boolean earlyEOF(); + /* ------------------------------------------------------------ */ + /** Called to signal that an EOF was received unexpectedly + * during the parsing of a HTTP message + * @return True if the parser should return to its caller + */ + public void earlyEOF(); + /* ------------------------------------------------------------ */ + /** Called to signal that a bad HTTP message has been received. + * @param status The bad status to send + * @param reason The textual reason for badness + */ public void badMessage(int status, String reason); /* ------------------------------------------------------------ */ diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpTester.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpTester.java index 24f2e7aad82..b14221d98d2 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpTester.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpTester.java @@ -138,9 +138,8 @@ public class HttpTester } @Override - public boolean earlyEOF() + public void earlyEOF() { - return true; } @Override diff --git a/jetty-http/src/test/java/org/eclipse/jetty/http/HttpGeneratorServerTest.java b/jetty-http/src/test/java/org/eclipse/jetty/http/HttpGeneratorServerTest.java index 7ca440f0aab..262c0acb0e2 100644 --- a/jetty-http/src/test/java/org/eclipse/jetty/http/HttpGeneratorServerTest.java +++ b/jetty-http/src/test/java/org/eclipse/jetty/http/HttpGeneratorServerTest.java @@ -51,9 +51,8 @@ public class HttpGeneratorServerTest } @Override - public boolean earlyEOF() + public void earlyEOF() { - return true; } @Override diff --git a/jetty-http/src/test/java/org/eclipse/jetty/http/HttpParserTest.java b/jetty-http/src/test/java/org/eclipse/jetty/http/HttpParserTest.java index 99506f400b1..b65e10a12a4 100644 --- a/jetty-http/src/test/java/org/eclipse/jetty/http/HttpParserTest.java +++ b/jetty-http/src/test/java/org/eclipse/jetty/http/HttpParserTest.java @@ -1147,9 +1147,8 @@ public class HttpParserTest } @Override - public boolean earlyEOF() + public void earlyEOF() { - return true; } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 1d7edd9b0d3..1f25642f2d2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -566,10 +566,9 @@ public class HttpChannel implements HttpParser.RequestHandler, Runnable } @Override - public boolean earlyEOF() + public void earlyEOF() { _request.getHttpInput().earlyEOF(); - return false; } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 88c53c7d719..bc7ad1b1227 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -277,7 +277,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http { LOG.debug(e); } - catch (IOException e) + catch (Exception e) { if (_parser.isIdle()) LOG.debug(e); @@ -285,11 +285,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http LOG.warn(this.toString(), e); close(); } - catch (Exception e) - { - LOG.warn(this.toString(), e); - close(); - } finally { setCurrentConnection(null); @@ -621,7 +616,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http */ } - @Override protected void onAllContentConsumed() { @@ -631,6 +625,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http */ releaseRequestBuffer(); } + + @Override + public String toString() + { + return super.toString()+"{"+HttpConnection.this+","+getEndPoint()+","+_parser+"}"; + } } private class HttpChannelOverHttp extends HttpChannel diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java index 8f19b56bd46..403b31676d8 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpInput.java @@ -139,7 +139,7 @@ public abstract class HttpInput extends ServletInputStream // blockForContent will only return with no // content if it is closed. if (!isShutdown()) - LOG.warn("unexpected !EOF state"); + LOG.warn("Unexpected !EOF "); onEOF(); return -1; @@ -173,20 +173,34 @@ public abstract class HttpInput extends ServletInputStream } } + /* ------------------------------------------------------------ */ + /** Called by this HttpInput to signal new content has been queued + * @param item + */ protected void onContentQueued(T item) { lock().notify(); } + /* ------------------------------------------------------------ */ + /** Called by this HttpInput to signal all available content has been consumed + */ protected void onAllContentConsumed() { } + /* ------------------------------------------------------------ */ + /** Called by this HttpInput to signal it has reached EOF + */ protected void onEOF() { } - public boolean content(T item) + /* ------------------------------------------------------------ */ + /** Add some content to the input stream + * @param item + */ + public void content(T item) { synchronized (lock()) { @@ -197,19 +211,26 @@ public abstract class HttpInput extends ServletInputStream onContentQueued(item); LOG.debug("{} queued {}", this, item); } - return true; } + /* ------------------------------------------------------------ */ + /** This method should be called to signal to the HttpInput + * that an EOF has arrived before all the expected content. + * Typically this will result in an EOFException being thrown + * from a subsequent read rather than a -1 return. + */ public void earlyEOF() { synchronized (lock()) { _earlyEOF = true; + _inputEOF = true; lock().notify(); LOG.debug("{} early EOF", this); } } + /* ------------------------------------------------------------ */ public boolean isEarlyEOF() { synchronized (lock()) @@ -218,6 +239,7 @@ public abstract class HttpInput extends ServletInputStream } } + /* ------------------------------------------------------------ */ public void shutdown() { synchronized (lock()) @@ -228,6 +250,7 @@ public abstract class HttpInput extends ServletInputStream } } + /* ------------------------------------------------------------ */ public boolean isShutdown() { synchronized (lock()) @@ -236,13 +259,14 @@ public abstract class HttpInput extends ServletInputStream } } + /* ------------------------------------------------------------ */ public void consumeAll() { synchronized (lock()) { + T item = _inputQ.peekUnsafe(); while (!isShutdown() && !isEarlyEOF()) { - T item = _inputQ.peekUnsafe(); while (item != null) { _inputQ.pollUnsafe(); @@ -256,6 +280,9 @@ public abstract class HttpInput extends ServletInputStream try { blockForContent(); + item = _inputQ.peekUnsafe(); + if (item==null) + break; } catch (IOException e) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index d1e90a23f39..d18366acec1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -753,6 +753,8 @@ public class Response implements HttpServletResponse break; case STREAM: getOutputStream().close(); + break; + default: } return true; } @@ -926,6 +928,7 @@ public class Response implements HttpServletResponse case TE: _fields.put(HttpHeader.CONNECTION, HttpHeaderValue.TE.toString()); break; + default: } } } @@ -965,6 +968,8 @@ public class Response implements HttpServletResponse case STREAM: case WRITER: _out.reset(); + break; + default: } _out.resetBuffer(); @@ -1023,7 +1028,7 @@ public class Response implements HttpServletResponse return _reason; } - public void complete() throws IOException + public void complete() { _out.close(); } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java index 9162374a649..a1b673d18f3 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncRequestReadTest.java @@ -18,26 +18,38 @@ package org.eclipse.jetty.server; +import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.PrintWriter; import java.net.Socket; import java.util.Arrays; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import javax.servlet.AsyncContext; import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.StringUtil; +import org.hamcrest.Matchers; +import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -46,116 +58,129 @@ public class AsyncRequestReadTest { private static Server server; private static ServerConnector connector; - private final static Exchanger __total=new Exchanger(); + private final static BlockingQueue __total=new BlockingArrayQueue<>(); - @BeforeClass - public static void startServer() throws Exception + @Before + public void startServer() throws Exception { server = new Server(); connector = new ServerConnector(server); connector.setIdleTimeout(10000); server.addConnector(connector); - server.setHandler(new EmptyHandler()); - server.start(); } - @AfterClass - public static void stopServer() throws Exception + @After + public void stopServer() throws Exception { server.stop(); server.join(); } @Test - public void test() throws Exception + public void testPipelined() throws Exception { - final Socket socket = new Socket("localhost",connector.getLocalPort()); + server.setHandler(new AsyncStreamHandler()); + server.start(); + + try (final Socket socket = new Socket("localhost",connector.getLocalPort())) + { + socket.setSoTimeout(1000); + + byte[] content = new byte[32*4096]; + Arrays.fill(content, (byte)120); - byte[] content = new byte[16*4096]; - Arrays.fill(content, (byte)120); + OutputStream out = socket.getOutputStream(); + String header= + "POST / HTTP/1.1\r\n"+ + "Host: localhost\r\n"+ + "Content-Length: "+content.length+"\r\n"+ + "Content-Type: bytes\r\n"+ + "\r\n"; + byte[] h=header.getBytes(StringUtil.__ISO_8859_1); + out.write(h); + out.write(content); + + + header= + "POST / HTTP/1.1\r\n"+ + "Host: localhost\r\n"+ + "Content-Length: "+content.length+"\r\n"+ + "Content-Type: bytes\r\n"+ + "Connection: close\r\n"+ + "\r\n"; + h=header.getBytes(StringUtil.__ISO_8859_1); + out.write(h); + out.write(content); + out.flush(); - OutputStream out = socket.getOutputStream(); - String header= - "POST / HTTP/1.1\r\n"+ - "Host: localhost\r\n"+ - "Content-Length: "+content.length+"\r\n"+ - "Content-Type: bytes\r\n"+ - "Connection: close\r\n"+ - "\r\n"; - byte[] h=header.getBytes(StringUtil.__ISO_8859_1); + InputStream in = socket.getInputStream(); + String response = IO.toString(in); + assertTrue(response.indexOf("200 OK")>0); - out.write(h); - out.flush(); - - out.write(content,0,4*4096); - Thread.sleep(100); - out.write(content,8192,4*4096); - Thread.sleep(100); - out.write(content,8*4096,content.length-8*4096); - - out.flush(); - - InputStream in = socket.getInputStream(); - String response = IO.toString(in); - assertTrue(response.indexOf("200 OK")>0); - - long total=__total.exchange(0L,30,TimeUnit.SECONDS); - assertEquals(content.length, total); + long total=__total.poll(5,TimeUnit.SECONDS); + assertEquals(content.length, total); + total=__total.poll(5,TimeUnit.SECONDS); + assertEquals(content.length, total); + } } @Test - @Ignore - public void tests() throws Exception + public void testAsyncReadsWithDelays() throws Exception { - runTest(64,4,4,20); - runTest(256,16,16,50); - runTest(256,1,128,10); - runTest(128*1024,1,64,10); - runTest(256*1024,5321,10,100); - runTest(512*1024,32*1024,10,10); + server.setHandler(new AsyncStreamHandler()); + server.start(); + + asyncReadTest(64,4,4,20); + asyncReadTest(256,16,16,50); + asyncReadTest(256,1,128,10); + asyncReadTest(128*1024,1,64,10); + asyncReadTest(256*1024,5321,10,100); + asyncReadTest(512*1024,32*1024,10,10); } - public void runTest(int contentSize, int chunkSize, int chunks, int delayMS) throws Exception + public void asyncReadTest(int contentSize, int chunkSize, int chunks, int delayMS) throws Exception { String tst=contentSize+","+chunkSize+","+chunks+","+delayMS; //System.err.println(tst); - final Socket socket = new Socket("localhost",connector.getLocalPort()); - - byte[] content = new byte[contentSize]; - Arrays.fill(content, (byte)120); - - OutputStream out = socket.getOutputStream(); - out.write("POST / HTTP/1.1\r\n".getBytes()); - out.write("Host: localhost\r\n".getBytes()); - out.write(("Content-Length: "+content.length+"\r\n").getBytes()); - out.write("Content-Type: bytes\r\n".getBytes()); - out.write("Connection: close\r\n".getBytes()); - out.write("\r\n".getBytes()); - out.flush(); - - int offset=0; - for (int i=0;i0); + + long total=__total.poll(30,TimeUnit.SECONDS); + assertEquals(tst,content.length, total); } - out.write(content,offset,content.length-offset); - - out.flush(); - - InputStream in = socket.getInputStream(); - String response = IO.toString(in); - assertTrue(tst,response.indexOf("200 OK")>0); - - long total=__total.exchange(0L,30,TimeUnit.SECONDS); - assertEquals(tst,content.length, total); } - private static class EmptyHandler extends AbstractHandler + private static class AsyncStreamHandler extends AbstractHandler { @Override public void handle(String path, final Request request, HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws IOException, ServletException @@ -164,6 +189,7 @@ public class AsyncRequestReadTest request.setHandled(true); final AsyncContext async = request.startAsync(); + // System.err.println("handle "+request.getContentLength()); new Thread() { @@ -171,9 +197,10 @@ public class AsyncRequestReadTest public void run() { long total=0; - try + try(InputStream in = request.getInputStream();) { - InputStream in = request.getInputStream(); + // System.err.println("reading..."); + byte[] b = new byte[4*4096]; int read; while((read =in.read(b))>=0) @@ -188,17 +215,156 @@ public class AsyncRequestReadTest { httpResponse.setStatus(200); async.complete(); - try - { - __total.exchange(total); - } - catch (InterruptedException e) - { - e.printStackTrace(); - } + // System.err.println("read "+total); + __total.offer(total); } } }.start(); } } + + + @Test + public void testPartialRead() throws Exception + { + server.setHandler(new PartialReaderHandler()); + server.start(); + + try (final Socket socket = new Socket("localhost",connector.getLocalPort())) + { + socket.setSoTimeout(1000); + + byte[] content = new byte[32*4096]; + Arrays.fill(content, (byte)88); + + OutputStream out = socket.getOutputStream(); + String header= + "POST /?read=10 HTTP/1.1\r\n"+ + "Host: localhost\r\n"+ + "Content-Length: "+content.length+"\r\n"+ + "Content-Type: bytes\r\n"+ + "\r\n"; + byte[] h=header.getBytes(StringUtil.__ISO_8859_1); + out.write(h); + out.write(content); + + header= "POST /?read=10 HTTP/1.1\r\n"+ + "Host: localhost\r\n"+ + "Content-Length: "+content.length+"\r\n"+ + "Content-Type: bytes\r\n"+ + "Connection: close\r\n"+ + "\r\n"; + h=header.getBytes(StringUtil.__ISO_8859_1); + out.write(h); + out.write(content); + out.flush(); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + assertThat(in.readLine(),containsString("HTTP/1.1 200 OK")); + assertThat(in.readLine(),containsString("Content-Length:")); + assertThat(in.readLine(),containsString("Server:")); + in.readLine(); + assertThat(in.readLine(),containsString("XXXXXXX")); + assertThat(in.readLine(),containsString("HTTP/1.1 200 OK")); + assertThat(in.readLine(),containsString("Connection: close")); + assertThat(in.readLine(),containsString("Server:")); + in.readLine(); + assertThat(in.readLine(),containsString("XXXXXXX")); + + } + } + + @Test + public void testPartialReadThenShutdown() throws Exception + { + server.setHandler(new PartialReaderHandler()); + server.start(); + + try (final Socket socket = new Socket("localhost",connector.getLocalPort())) + { + socket.setSoTimeout(10000); + + byte[] content = new byte[32*4096]; + Arrays.fill(content, (byte)88); + + OutputStream out = socket.getOutputStream(); + String header= + "POST /?read=10 HTTP/1.1\r\n"+ + "Host: localhost\r\n"+ + "Content-Length: "+content.length+"\r\n"+ + "Content-Type: bytes\r\n"+ + "\r\n"; + byte[] h=header.getBytes(StringUtil.__ISO_8859_1); + out.write(h); + out.write(content,0,4096); + out.flush(); + socket.shutdownOutput(); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + assertThat(in.readLine(),containsString("HTTP/1.1 200 OK")); + assertThat(in.readLine(),containsString("Content-Length:")); + assertThat(in.readLine(),containsString("Server:")); + in.readLine(); + assertThat(in.readLine(),containsString("XXXXXXX")); + } + } + + @Test + public void testPartialReadThenClose() throws Exception + { + server.setHandler(new PartialReaderHandler()); + server.start(); + + try (final Socket socket = new Socket("localhost",connector.getLocalPort())) + { + socket.setSoTimeout(1000); + + byte[] content = new byte[32*4096]; + Arrays.fill(content, (byte)88); + + OutputStream out = socket.getOutputStream(); + String header= + "POST /?read=10 HTTP/1.1\r\n"+ + "Host: localhost\r\n"+ + "Content-Length: "+content.length+"\r\n"+ + "Content-Type: bytes\r\n"+ + "\r\n"; + byte[] h=header.getBytes(StringUtil.__ISO_8859_1); + out.write(h); + out.write(content,0,4096); + out.flush(); + + BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + assertThat(in.readLine(),containsString("HTTP/1.1 200 OK")); + assertThat(in.readLine(),containsString("Content-Length:")); + assertThat(in.readLine(),containsString("Server:")); + in.readLine(); + assertThat(in.readLine(),containsString("XXXXXXX")); + + socket.close(); + } + } + + private static class PartialReaderHandler extends AbstractHandler + { + @Override + public void handle(String path, final Request request, HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws IOException, ServletException + { + httpResponse.setStatus(200); + request.setHandled(true); + + BufferedReader in = request.getReader(); + PrintWriter out =httpResponse.getWriter(); + int read=Integer.valueOf(request.getParameter("read")); + // System.err.println("read="+read); + for (int i=read;i-->0;) + { + int c=in.read(); + if (c<0) + break; + out.write(c); + } + out.println(); + } + } } diff --git a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java index e960d12a94b..2c05ec77899 100644 --- a/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java +++ b/jetty-spdy/spdy-http-server/src/main/java/org/eclipse/jetty/spdy/server/proxy/ProxyHTTPSPDYConnection.java @@ -158,10 +158,9 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse } @Override - public boolean earlyEOF() + public void earlyEOF() { // TODO - return false; } @Override