406617 Spin in Request.recycle

Numerous code cleanups with the handling of early closes of requests, specially when the response has already been sent.
This commit is contained in:
Greg Wilkins 2013-05-03 15:15:03 +10:00
parent 5bea4cc781
commit d351e0790a
11 changed files with 318 additions and 118 deletions

View File

@ -374,10 +374,9 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
} }
@Override @Override
public boolean earlyEOF() public void earlyEOF()
{ {
failAndClose(new EOFException()); failAndClose(new EOFException());
return false;
} }
private void failAndClose(Throwable failure) private void failAndClose(Throwable failure)

View File

@ -1151,16 +1151,14 @@ public class HttpParser
case CLOSED: case CLOSED:
if (BufferUtil.hasContent(buffer)) if (BufferUtil.hasContent(buffer))
{ {
int len=buffer.remaining(); // Just ignore data when closed
_headerBytes+=len; _headerBytes+=buffer.remaining();
BufferUtil.clear(buffer);
if (_headerBytes>_maxHeaderBytes) if (_headerBytes>_maxHeaderBytes)
{ {
Thread.sleep(100); // Don't want to waste time reading data of a closed request
String chars = BufferUtil.toDetailString(buffer); throw new IllegalStateException("too much data after closed");
BufferUtil.clear(buffer);
throw new IllegalStateException(String.format("%s %d/%d>%d data when CLOSED:%s",this,len,_headerBytes,_maxHeaderBytes,chars));
} }
BufferUtil.clear(buffer);
} }
return false; return false;
default: break; default: break;
@ -1473,8 +1471,18 @@ public class HttpParser
*/ */
public boolean parsedHeader(HttpField field); public boolean parsedHeader(HttpField field);
public boolean earlyEOF(); /* ------------------------------------------------------------ */
/** Called to signal that an EOF was received unexpectedly
* during the parsing of a HTTP message
* @return True if the parser should return to its caller
*/
public void earlyEOF();
/* ------------------------------------------------------------ */
/** Called to signal that a bad HTTP message has been received.
* @param status The bad status to send
* @param reason The textual reason for badness
*/
public void badMessage(int status, String reason); public void badMessage(int status, String reason);
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -138,9 +138,8 @@ public class HttpTester
} }
@Override @Override
public boolean earlyEOF() public void earlyEOF()
{ {
return true;
} }
@Override @Override

View File

@ -51,9 +51,8 @@ public class HttpGeneratorServerTest
} }
@Override @Override
public boolean earlyEOF() public void earlyEOF()
{ {
return true;
} }
@Override @Override

View File

@ -1147,9 +1147,8 @@ public class HttpParserTest
} }
@Override @Override
public boolean earlyEOF() public void earlyEOF()
{ {
return true;
} }
@Override @Override

View File

@ -566,10 +566,9 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
} }
@Override @Override
public boolean earlyEOF() public void earlyEOF()
{ {
_request.getHttpInput().earlyEOF(); _request.getHttpInput().earlyEOF();
return false;
} }
@Override @Override

View File

@ -277,7 +277,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{ {
LOG.debug(e); LOG.debug(e);
} }
catch (IOException e) catch (Exception e)
{ {
if (_parser.isIdle()) if (_parser.isIdle())
LOG.debug(e); LOG.debug(e);
@ -285,11 +285,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
LOG.warn(this.toString(), e); LOG.warn(this.toString(), e);
close(); close();
} }
catch (Exception e)
{
LOG.warn(this.toString(), e);
close();
}
finally finally
{ {
setCurrentConnection(null); setCurrentConnection(null);
@ -621,7 +616,6 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
*/ */
} }
@Override @Override
protected void onAllContentConsumed() protected void onAllContentConsumed()
{ {
@ -631,6 +625,12 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
*/ */
releaseRequestBuffer(); releaseRequestBuffer();
} }
@Override
public String toString()
{
return super.toString()+"{"+HttpConnection.this+","+getEndPoint()+","+_parser+"}";
}
} }
private class HttpChannelOverHttp extends HttpChannel<ByteBuffer> private class HttpChannelOverHttp extends HttpChannel<ByteBuffer>

View File

@ -139,7 +139,7 @@ public abstract class HttpInput<T> extends ServletInputStream
// blockForContent will only return with no // blockForContent will only return with no
// content if it is closed. // content if it is closed.
if (!isShutdown()) if (!isShutdown())
LOG.warn("unexpected !EOF state"); LOG.warn("Unexpected !EOF ");
onEOF(); onEOF();
return -1; return -1;
@ -173,20 +173,34 @@ public abstract class HttpInput<T> extends ServletInputStream
} }
} }
/* ------------------------------------------------------------ */
/** Called by this HttpInput to signal new content has been queued
* @param item
*/
protected void onContentQueued(T item) protected void onContentQueued(T item)
{ {
lock().notify(); lock().notify();
} }
/* ------------------------------------------------------------ */
/** Called by this HttpInput to signal all available content has been consumed
*/
protected void onAllContentConsumed() protected void onAllContentConsumed()
{ {
} }
/* ------------------------------------------------------------ */
/** Called by this HttpInput to signal it has reached EOF
*/
protected void onEOF() protected void onEOF()
{ {
} }
public boolean content(T item) /* ------------------------------------------------------------ */
/** Add some content to the input stream
* @param item
*/
public void content(T item)
{ {
synchronized (lock()) synchronized (lock())
{ {
@ -197,19 +211,26 @@ public abstract class HttpInput<T> extends ServletInputStream
onContentQueued(item); onContentQueued(item);
LOG.debug("{} queued {}", this, item); LOG.debug("{} queued {}", this, item);
} }
return true;
} }
/* ------------------------------------------------------------ */
/** This method should be called to signal to the HttpInput
* that an EOF has arrived before all the expected content.
* Typically this will result in an EOFException being thrown
* from a subsequent read rather than a -1 return.
*/
public void earlyEOF() public void earlyEOF()
{ {
synchronized (lock()) synchronized (lock())
{ {
_earlyEOF = true; _earlyEOF = true;
_inputEOF = true;
lock().notify(); lock().notify();
LOG.debug("{} early EOF", this); LOG.debug("{} early EOF", this);
} }
} }
/* ------------------------------------------------------------ */
public boolean isEarlyEOF() public boolean isEarlyEOF()
{ {
synchronized (lock()) synchronized (lock())
@ -218,6 +239,7 @@ public abstract class HttpInput<T> extends ServletInputStream
} }
} }
/* ------------------------------------------------------------ */
public void shutdown() public void shutdown()
{ {
synchronized (lock()) synchronized (lock())
@ -228,6 +250,7 @@ public abstract class HttpInput<T> extends ServletInputStream
} }
} }
/* ------------------------------------------------------------ */
public boolean isShutdown() public boolean isShutdown()
{ {
synchronized (lock()) synchronized (lock())
@ -236,13 +259,14 @@ public abstract class HttpInput<T> extends ServletInputStream
} }
} }
/* ------------------------------------------------------------ */
public void consumeAll() public void consumeAll()
{ {
synchronized (lock()) synchronized (lock())
{ {
T item = _inputQ.peekUnsafe();
while (!isShutdown() && !isEarlyEOF()) while (!isShutdown() && !isEarlyEOF())
{ {
T item = _inputQ.peekUnsafe();
while (item != null) while (item != null)
{ {
_inputQ.pollUnsafe(); _inputQ.pollUnsafe();
@ -256,6 +280,9 @@ public abstract class HttpInput<T> extends ServletInputStream
try try
{ {
blockForContent(); blockForContent();
item = _inputQ.peekUnsafe();
if (item==null)
break;
} }
catch (IOException e) catch (IOException e)
{ {

View File

@ -753,6 +753,8 @@ public class Response implements HttpServletResponse
break; break;
case STREAM: case STREAM:
getOutputStream().close(); getOutputStream().close();
break;
default:
} }
return true; return true;
} }
@ -926,6 +928,7 @@ public class Response implements HttpServletResponse
case TE: case TE:
_fields.put(HttpHeader.CONNECTION, HttpHeaderValue.TE.toString()); _fields.put(HttpHeader.CONNECTION, HttpHeaderValue.TE.toString());
break; break;
default:
} }
} }
} }
@ -965,6 +968,8 @@ public class Response implements HttpServletResponse
case STREAM: case STREAM:
case WRITER: case WRITER:
_out.reset(); _out.reset();
break;
default:
} }
_out.resetBuffer(); _out.resetBuffer();
@ -1023,7 +1028,7 @@ public class Response implements HttpServletResponse
return _reason; return _reason;
} }
public void complete() throws IOException public void complete()
{ {
_out.close(); _out.close();
} }

View File

@ -18,26 +18,38 @@
package org.eclipse.jetty.server; package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket; import java.net.Socket;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Exchanger; import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext; import javax.servlet.AsyncContext;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
@ -46,116 +58,129 @@ public class AsyncRequestReadTest
{ {
private static Server server; private static Server server;
private static ServerConnector connector; private static ServerConnector connector;
private final static Exchanger<Long> __total=new Exchanger<Long>(); private final static BlockingQueue<Long> __total=new BlockingArrayQueue<>();
@BeforeClass @Before
public static void startServer() throws Exception public void startServer() throws Exception
{ {
server = new Server(); server = new Server();
connector = new ServerConnector(server); connector = new ServerConnector(server);
connector.setIdleTimeout(10000); connector.setIdleTimeout(10000);
server.addConnector(connector); server.addConnector(connector);
server.setHandler(new EmptyHandler());
server.start();
} }
@AfterClass @After
public static void stopServer() throws Exception public void stopServer() throws Exception
{ {
server.stop(); server.stop();
server.join(); server.join();
} }
@Test @Test
public void test() throws Exception public void testPipelined() throws Exception
{ {
final Socket socket = new Socket("localhost",connector.getLocalPort()); server.setHandler(new AsyncStreamHandler());
server.start();
try (final Socket socket = new Socket("localhost",connector.getLocalPort()))
{
socket.setSoTimeout(1000);
byte[] content = new byte[32*4096];
Arrays.fill(content, (byte)120);
byte[] content = new byte[16*4096]; OutputStream out = socket.getOutputStream();
Arrays.fill(content, (byte)120); String header=
"POST / HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: "+content.length+"\r\n"+
"Content-Type: bytes\r\n"+
"\r\n";
byte[] h=header.getBytes(StringUtil.__ISO_8859_1);
out.write(h);
out.write(content);
header=
"POST / HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: "+content.length+"\r\n"+
"Content-Type: bytes\r\n"+
"Connection: close\r\n"+
"\r\n";
h=header.getBytes(StringUtil.__ISO_8859_1);
out.write(h);
out.write(content);
out.flush();
OutputStream out = socket.getOutputStream(); InputStream in = socket.getInputStream();
String header= String response = IO.toString(in);
"POST / HTTP/1.1\r\n"+ assertTrue(response.indexOf("200 OK")>0);
"Host: localhost\r\n"+
"Content-Length: "+content.length+"\r\n"+
"Content-Type: bytes\r\n"+
"Connection: close\r\n"+
"\r\n";
byte[] h=header.getBytes(StringUtil.__ISO_8859_1);
out.write(h); long total=__total.poll(5,TimeUnit.SECONDS);
out.flush(); assertEquals(content.length, total);
total=__total.poll(5,TimeUnit.SECONDS);
out.write(content,0,4*4096); assertEquals(content.length, total);
Thread.sleep(100); }
out.write(content,8192,4*4096);
Thread.sleep(100);
out.write(content,8*4096,content.length-8*4096);
out.flush();
InputStream in = socket.getInputStream();
String response = IO.toString(in);
assertTrue(response.indexOf("200 OK")>0);
long total=__total.exchange(0L,30,TimeUnit.SECONDS);
assertEquals(content.length, total);
} }
@Test @Test
@Ignore public void testAsyncReadsWithDelays() throws Exception
public void tests() throws Exception
{ {
runTest(64,4,4,20); server.setHandler(new AsyncStreamHandler());
runTest(256,16,16,50); server.start();
runTest(256,1,128,10);
runTest(128*1024,1,64,10); asyncReadTest(64,4,4,20);
runTest(256*1024,5321,10,100); asyncReadTest(256,16,16,50);
runTest(512*1024,32*1024,10,10); asyncReadTest(256,1,128,10);
asyncReadTest(128*1024,1,64,10);
asyncReadTest(256*1024,5321,10,100);
asyncReadTest(512*1024,32*1024,10,10);
} }
public void runTest(int contentSize, int chunkSize, int chunks, int delayMS) throws Exception public void asyncReadTest(int contentSize, int chunkSize, int chunks, int delayMS) throws Exception
{ {
String tst=contentSize+","+chunkSize+","+chunks+","+delayMS; String tst=contentSize+","+chunkSize+","+chunks+","+delayMS;
//System.err.println(tst); //System.err.println(tst);
final Socket socket = new Socket("localhost",connector.getLocalPort()); try(final Socket socket = new Socket("localhost",connector.getLocalPort()))
byte[] content = new byte[contentSize];
Arrays.fill(content, (byte)120);
OutputStream out = socket.getOutputStream();
out.write("POST / HTTP/1.1\r\n".getBytes());
out.write("Host: localhost\r\n".getBytes());
out.write(("Content-Length: "+content.length+"\r\n").getBytes());
out.write("Content-Type: bytes\r\n".getBytes());
out.write("Connection: close\r\n".getBytes());
out.write("\r\n".getBytes());
out.flush();
int offset=0;
for (int i=0;i<chunks;i++)
{ {
out.write(content,offset,chunkSize);
offset+=chunkSize; byte[] content = new byte[contentSize];
Thread.sleep(delayMS); Arrays.fill(content, (byte)120);
OutputStream out = socket.getOutputStream();
out.write("POST / HTTP/1.1\r\n".getBytes());
out.write("Host: localhost\r\n".getBytes());
out.write(("Content-Length: "+content.length+"\r\n").getBytes());
out.write("Content-Type: bytes\r\n".getBytes());
out.write("Connection: close\r\n".getBytes());
out.write("\r\n".getBytes());
out.flush();
int offset=0;
for (int i=0;i<chunks;i++)
{
out.write(content,offset,chunkSize);
offset+=chunkSize;
Thread.sleep(delayMS);
}
out.write(content,offset,content.length-offset);
out.flush();
InputStream in = socket.getInputStream();
String response = IO.toString(in);
assertTrue(tst,response.indexOf("200 OK")>0);
long total=__total.poll(30,TimeUnit.SECONDS);
assertEquals(tst,content.length, total);
} }
out.write(content,offset,content.length-offset);
out.flush();
InputStream in = socket.getInputStream();
String response = IO.toString(in);
assertTrue(tst,response.indexOf("200 OK")>0);
long total=__total.exchange(0L,30,TimeUnit.SECONDS);
assertEquals(tst,content.length, total);
} }
private static class EmptyHandler extends AbstractHandler private static class AsyncStreamHandler extends AbstractHandler
{ {
@Override @Override
public void handle(String path, final Request request, HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws IOException, ServletException public void handle(String path, final Request request, HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws IOException, ServletException
@ -164,6 +189,7 @@ public class AsyncRequestReadTest
request.setHandled(true); request.setHandled(true);
final AsyncContext async = request.startAsync(); final AsyncContext async = request.startAsync();
// System.err.println("handle "+request.getContentLength());
new Thread() new Thread()
{ {
@ -171,9 +197,10 @@ public class AsyncRequestReadTest
public void run() public void run()
{ {
long total=0; long total=0;
try try(InputStream in = request.getInputStream();)
{ {
InputStream in = request.getInputStream(); // System.err.println("reading...");
byte[] b = new byte[4*4096]; byte[] b = new byte[4*4096];
int read; int read;
while((read =in.read(b))>=0) while((read =in.read(b))>=0)
@ -188,17 +215,156 @@ public class AsyncRequestReadTest
{ {
httpResponse.setStatus(200); httpResponse.setStatus(200);
async.complete(); async.complete();
try // System.err.println("read "+total);
{ __total.offer(total);
__total.exchange(total);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
} }
} }
}.start(); }.start();
} }
} }
@Test
public void testPartialRead() throws Exception
{
server.setHandler(new PartialReaderHandler());
server.start();
try (final Socket socket = new Socket("localhost",connector.getLocalPort()))
{
socket.setSoTimeout(1000);
byte[] content = new byte[32*4096];
Arrays.fill(content, (byte)88);
OutputStream out = socket.getOutputStream();
String header=
"POST /?read=10 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: "+content.length+"\r\n"+
"Content-Type: bytes\r\n"+
"\r\n";
byte[] h=header.getBytes(StringUtil.__ISO_8859_1);
out.write(h);
out.write(content);
header= "POST /?read=10 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: "+content.length+"\r\n"+
"Content-Type: bytes\r\n"+
"Connection: close\r\n"+
"\r\n";
h=header.getBytes(StringUtil.__ISO_8859_1);
out.write(h);
out.write(content);
out.flush();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
assertThat(in.readLine(),containsString("HTTP/1.1 200 OK"));
assertThat(in.readLine(),containsString("Content-Length:"));
assertThat(in.readLine(),containsString("Server:"));
in.readLine();
assertThat(in.readLine(),containsString("XXXXXXX"));
assertThat(in.readLine(),containsString("HTTP/1.1 200 OK"));
assertThat(in.readLine(),containsString("Connection: close"));
assertThat(in.readLine(),containsString("Server:"));
in.readLine();
assertThat(in.readLine(),containsString("XXXXXXX"));
}
}
@Test
public void testPartialReadThenShutdown() throws Exception
{
server.setHandler(new PartialReaderHandler());
server.start();
try (final Socket socket = new Socket("localhost",connector.getLocalPort()))
{
socket.setSoTimeout(10000);
byte[] content = new byte[32*4096];
Arrays.fill(content, (byte)88);
OutputStream out = socket.getOutputStream();
String header=
"POST /?read=10 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: "+content.length+"\r\n"+
"Content-Type: bytes\r\n"+
"\r\n";
byte[] h=header.getBytes(StringUtil.__ISO_8859_1);
out.write(h);
out.write(content,0,4096);
out.flush();
socket.shutdownOutput();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
assertThat(in.readLine(),containsString("HTTP/1.1 200 OK"));
assertThat(in.readLine(),containsString("Content-Length:"));
assertThat(in.readLine(),containsString("Server:"));
in.readLine();
assertThat(in.readLine(),containsString("XXXXXXX"));
}
}
@Test
public void testPartialReadThenClose() throws Exception
{
server.setHandler(new PartialReaderHandler());
server.start();
try (final Socket socket = new Socket("localhost",connector.getLocalPort()))
{
socket.setSoTimeout(1000);
byte[] content = new byte[32*4096];
Arrays.fill(content, (byte)88);
OutputStream out = socket.getOutputStream();
String header=
"POST /?read=10 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: "+content.length+"\r\n"+
"Content-Type: bytes\r\n"+
"\r\n";
byte[] h=header.getBytes(StringUtil.__ISO_8859_1);
out.write(h);
out.write(content,0,4096);
out.flush();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
assertThat(in.readLine(),containsString("HTTP/1.1 200 OK"));
assertThat(in.readLine(),containsString("Content-Length:"));
assertThat(in.readLine(),containsString("Server:"));
in.readLine();
assertThat(in.readLine(),containsString("XXXXXXX"));
socket.close();
}
}
private static class PartialReaderHandler extends AbstractHandler
{
@Override
public void handle(String path, final Request request, HttpServletRequest httpRequest, final HttpServletResponse httpResponse) throws IOException, ServletException
{
httpResponse.setStatus(200);
request.setHandled(true);
BufferedReader in = request.getReader();
PrintWriter out =httpResponse.getWriter();
int read=Integer.valueOf(request.getParameter("read"));
// System.err.println("read="+read);
for (int i=read;i-->0;)
{
int c=in.read();
if (c<0)
break;
out.write(c);
}
out.println();
}
}
} }

View File

@ -158,10 +158,9 @@ public class ProxyHTTPSPDYConnection extends HttpConnection implements HttpParse
} }
@Override @Override
public boolean earlyEOF() public void earlyEOF()
{ {
// TODO // TODO
return false;
} }
@Override @Override